我正在研究计划每天运行一次的hadoop程序。它需要一堆json文档,并且每个文档都有一个时间戳,用于显示文档的添加时间。我的程序只应处理自上次运行以来添加的文档。因此,我需要保持一个状态,该状态是显示我的Hadoop作业上次运行的时间的时间戳。我当时想将此状态存储在SQL Server中,并在我的工作的驱动程序中查询该状态。这是一个好的解决方案还是一个更好的解决方案?
ps。我的Hadoop工作正在HDInsight上运行。话虽如此,仍然可以从我的驱动程序查询SQL Server吗?
最佳答案
我们已经针对在AWS(Amazon Web Services)中运行的工作流以及S3中存储的数据解决了此问题。
我们的设置:
问题:
我们正在使用Flume将数据摄取到Amazon S3中。所有摄取的数据都在同一个文件夹中(S3是键/值存储,没有文件夹的概念。这里的文件夹意味着所有数据都具有相同的前缀。例如/tmp/1.txt、/tmp/2.txt等。/ tmp /是键前缀)。
我们有一个ETL工作流,该工作流计划每小时运行一次。但是,由于所有数据都被提取到同一文件夹中,因此我们必须区分处理过的文件和未处理的文件。
例如摄取的第一小时数据为:
/tmp/1.txt
/tmp/2.txt
当工作流首次启动时,它应该处理来自“1.txt”和“2.txt”的数据,并将它们标记为处理过的。
如果是第二个小时,则提取的数据为:
/tmp/3.txt
/tmp/4.txt
/tmp/5.txt
然后,两小时后文件夹中的总数据将为:
/tmp/1.txt
/tmp/2.txt
/tmp/3.txt
/tmp/4.txt
/tmp/5.txt
由于“1.txt”和“2.txt”已被处理并标记为已处理,因此在第二次运行期间,作业应仅处理“3.txt”,“4.txt”和“5.txt” 。
解决方案:
我们开发了一个库(我们将其称为
FileManager
),用于管理已处理文件的列表。我们将此库作为Java Action 插入到Oozie工作流程中。这是工作流程中的第一步。该库还负责忽略当前由Flume写入的文件。当Flume将数据写入文件时,这些文件的后缀为“_current”。因此,这些文件将被忽略以进行处理,直到它们被完全写入。
提取的文件以时间戳作为后缀生成。例如“hourly_feed.1234567”。因此,文件名按其创建的升序排列。
为了获得未处理文件的列表,我们使用了S3的使用标记进行查询的功能(例如,如果您的文件夹中有10,000个文件,如果您将标记指定为第5,000个文件的名称,那么S3会将您的文件从5001返回到10,000)。
每个文件都有以下3种状态:
对于每个文件,我们在MySQL数据库中存储了以下详细信息:
FileManager
公开了以下接口(interface):GetLatestFiles
:返回最新的未处理的文件列表UpdateFileStatus
:处理文件后,更新文件以下是识别尚未处理的文件的步骤:
order by created desc
)。 我们使用Oozie进行工作流管理。 Oozie工作流程包括以下步骤:
重复数据删除:
当您实现这样的库时,可能会重复记录(在某些特殊情况下,同一文件可能会被拾取两次以进行处理)。我们已经实现了重复数据删除逻辑以删除重复的记录。
关于hadoop - 如何在Hadoop作业中保持状态?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34031477/