hadoop - 如何在Hadoop作业中保持状态?

标签 hadoop mapreduce azure-hdinsight

我正在研究计划每天运行一次的hadoop程序。它需要一堆json文档,并且每个文档都有一个时间戳,用于显示文档的添加时间。我的程序只应处理自上次运行以来添加的文档。因此,我需要保持一个状态,该状态是显示我的Hadoop作业上次运行的时间的时间戳。我当时想将此状态存储在SQL Server中,并在我的工作的驱动程序中查询该状态。这是一个好的解决方案还是一个更好的解决方案?

ps。我的Hadoop工作正在HDInsight上运行。话虽如此,仍然可以从我的驱动程序查询SQL Server吗?

最佳答案

我们已经针对在AWS(Amazon Web Services)中运行的工作流以及S3中存储的数据解决了此问题。

我们的设置:

  • 数据存储:AWS S3
  • 数据摄取机制:Flume
  • 工作流程管理:Oozie
  • 文件状态的存储:MySQL

  • 问题:

    我们正在使用Flume将数据摄取到Amazon S3中。所有摄取的数据都在同一个文件夹中(S3是键/值存储,没有文件夹的概念。这里的文件夹意味着所有数据都具有相同的前缀。例如/tmp/1.txt、/tmp/2.txt等。/ tmp /是键前缀)。

    我们有一个ETL工作流,该工作流计划每小时运行一次。但是,由于所有数据都被提取到同一文件夹中,因此我们必须区分处理过的文件和未处理的文件。

    例如摄取的第一小时数据为:
    /tmp/1.txt
    /tmp/2.txt
    

    当工作流首次启动时,它应该处理来自“1.txt”和“2.txt”的数据,并将它们标记为处理过的

    如果是第二个小时,则提取的数据为:
    /tmp/3.txt
    /tmp/4.txt
    /tmp/5.txt
    

    然后,两小时后文件夹中的总数据将为:
    /tmp/1.txt
    /tmp/2.txt
    /tmp/3.txt
    /tmp/4.txt
    /tmp/5.txt
    

    由于“1.txt”和“2.txt”已被处理并标记为已处理,因此在第二次运行期间,作业应仅处理“3.txt”,“4.txt”和“5.txt” 。

    解决方案:

    我们开发了一个库(我们将其称为FileManager),用于管理已处理文件的列表。我们将此库作为Java Action 插入到Oozie工作流程中。这是工作流程中的第一步。

    该库还负责忽略当前由Flume写入的文件。当Flume将数据写入文件时,这些文件的后缀为“_current”。因此,这些文件将被忽略以进行处理,直到它们被完全写入。

    提取的文件以时间戳作为后缀生成。例如“hourly_feed.1234567”。因此,文件名按其创建的升序排列。

    为了获得未处理文件的列表,我们使用了S3的使用标记进行查询的功能(例如,如果您的文件夹中有10,000个文件,如果您将标记指定为第5,000个文件的名称,那么S3会将您的文件从5001返回到10,000)。

    每个文件都有以下3种状态:
  • 成功-已成功处理的文件
  • 错误-已选择要处理的文件,但处理这些文件时出错。因此,需要再次拾取这些文件以处理
  • IN_PROGRESS -作业中已拾取并正在处理的文件

  • 对于每个文件,我们在MySQL数据库中存储了以下详细信息:
  • 文件名
  • 上次修改时间-我们用它来处理一些极端情况
  • 文件的状态( IN_PROGRESS SUCCESS 错误)
  • FileManager公开了以下接口(interface):
  • GetLatestFiles:返回最新的未处理的文件列表
  • UpdateFileStatus:处理文件后,更新文件
  • 的状态

    以下是识别尚未处理的文件的步骤:
  • 查询数据库(MySql),以检查最后一个文件,该文件的状态为 SUCCESS (查询:order by created desc)。
  • 如果第一步返回文件,则使用文件标记设置为最后一个成功处理的文件来查询S3。这将返回所有上次成功处理的文件之后提取的文件。
  • 还查询数据库以检查是否有任何文件处于错误状态。这些文件需要重新处理,因为以前的工作流程没有成功处理它们。
  • 返回从步骤2和3获得的文件列表(返回文件之前,将其状态标记为 IN_PROGRESS )。
  • 作业成功完成后,将所有已处理文件的状态更新为 SUCCESS 。如果在处理文件时出错,则将所有文件的状态更新为错误(以便下次可以拾取它们进行处理)

  • 我们使用Oozie进行工作流管理。 Oozie工作流程包括以下步骤:
  • 步骤1:获取下一组要处理的文件,将每个文件的状态标记为 IN_PROGRESS 并将它们传递给下一阶段
  • 步骤2:处理文件
  • 步骤3:更新处理状态( SUCCESS 错误)

  • 重复数据删除:
    当您实现这样的库时,可能会重复记录(在某些特殊情况下,同一文件可能会被拾取两次以进行处理)。我们已经实现了重复数据删除逻辑以删除重复的记录。

    关于hadoop - 如何在Hadoop作业中保持状态?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34031477/

    相关文章:

    hadoop - 如何将新文件权限设置为其在 hdfs 中的父文件夹?

    hadoop - Hadoop MapReduce程序是否可以访问本地资源?

    hadoop - 将文件从远程 Unix 和 Windows 服务器复制到 HDFS,无需中间暂存

    hadoop - 如何防止由于 reduce task 失败而导致 hadoop 失败

    azure - Hive:两点之间的距离

    azure - 使用函数包装器的 Invoke-AzResourceAction cmdlet 出现问题

    linux - ambari hadoop cluster + ambari 2.5.0 到新版本 - 2.6.0 有什么区别

    java - 借助log4j输出hadoop程序的变量

    python - MapReduce:ValueError:要解压的值太多(预期为 2)

    apache-spark - 在HDIinsight群集上运行Spark作业时如何解决此 fatal error ? session 681意外达到最终状态 'dead'。查看日志: