hadoop - 寻找一种方法来连续处理写入 hdfs 的文件

标签 hadoop apache-spark hdfs apache-flink bigdata

我正在寻找可以:

  1. 监控 hdfs 目录中的新文件并在它们出现时进行处理。
  2. 它还应该处理作业/应用程序开始工作之前目录中的文件。
  3. 它应该有检查点以在重新启动时从它离开的地方继续。

我查看了 apache spark:它可以读取新添加的文件并且可以处理重新启动以从它离开的地方继续。我找不到一种方法让它也处理同一作业范围内的旧文件(所以只有 1 和 3)。

我查看了 apache flink:它确实处理新旧文件。然而,一旦作业重新启动,它就会再次开始处理所有这些(1 和 2)。

这是一个应该很常见的用例。我是否在 spark/flink 中遗漏了一些使之成为可能的东西?这里有其他工具可以使用吗?

最佳答案

使用 Flink 流式处理,您可以完全按照您的建议处理目录中的文件,当您重新启动时,它将从它停止的地方开始处理。它被称为连续文件处理。

您唯一需要做的就是 1) 为您的工作启用检查点和 2) 启动您的程序:

    Time period = Time.minutes(10)
    env.readFile(inputFormat, "hdfs:// … /logs",
                 PROCESS_CONTINUOUSLY, 
                 period.toMilliseconds, 
                 FilePathFilter.createDefaultFilter())

该功能相当新,并且在开发邮件列表中积极讨论如何进一步改进其功能。

希望这对您有所帮助!

关于hadoop - 寻找一种方法来连续处理写入 hdfs 的文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40913037/

相关文章:

powershell - HDInsight Hive日志位置

hadoop - 不应该在包安装期间配置 Oozie/Sqoop jar 位置吗?

python - Apache Pig - Jython UDF 内存错误

scala - 如何计算 Apache Spark 中 RowMatrix 的倒数?

hadoop - s3distcp : can not create path from empty string

eclipse - eclipse 调试hadoop wordcount,并发生异常

java.lang.Long 和 scala.Long

apache-spark - 齐柏林飞艇 [0.7.2] : NullPointerException on executing paragraph from a new Notebook

hadoop - 格式化本地文件系统HDFS并启动Hadoop

linux - Bash 脚本 - for 循环和 if-else