cron - 如何在 CRON 驱动的 DetectDuplicate 中摄取所有流文件?

标签 cron duplicates priority-queue apache-nifi

在 NiFi 中,我有一个 cron 驱动的处理器序列,每天提供一组流文件,其中包含我感兴趣的 2 个属性:product_codepublication_date .

我的需要是每个 product_code 只保留一个流文件: 最近的publication_date .

例如:

对于这个输入:

flow_1: product_code: A / publication_date : 2018-01-01
flow_2: product_code: B / publication_date : 2018-01-01
flow_3: product_code: C / publication_date : 2018-01-01
flow_4: product_code: A / publication_date : 2018-04-12
flow_5: product_code: A / publication_date : 2000-12-31
flow_6: product_code: B / publication_date : 2018-02-02
flow_7: product_code: B / publication_date : 2018-03-03

预期的输出应该是:

flow_3: product_code: C / publication_date : 2018-01-01
flow_4: product_code: A / publication_date : 2018-04-12
flow_7: product_code: B / publication_date : 2018-03-03

我测试的算法

  1. 使用UpdateAttribute处理器添加属性 priority每个流文件,基于 publication_date .
  2. 这些更新的流文件被重定向到 PriorityAttributePrioritizer排队。
  3. 流文件留在这个队列中,因为只有一个消费处理器,它是 cron 驱动的。通过这种方式,我确定队列中的流文件是根据 publication_date 排序的。 .
  4. 然后 CRON 触发下一个处理器,a DetectDuplicate基于 product_code属性。由于流文件是从最近的项目到最旧的项目处理的,我确信当 product_code被检测为重复,这是因为相同的product_code最近的 publication_date 已经可以了.

问题

遗憾的是,当 cron 触发 DetectDuplicate processor,只有一条消息被消费,其他的留在队列中。

如果我将“调度策略”更改为“定时器驱动”且“运行调度”为 0,则我的所有流文件都将被消耗并且输出符合预期。

有没有办法问我的DetectDuplicate处理器在开始工作时消耗队列中的所有消息(而不仅仅是一条消息)?

或者有没有办法设置一个调度策略,比如“凌晨 2:00 开始工作,凌晨 4:00 停止”?

您有没有想出更好的策略来满足需求?

问候,

值(value)。


更新1

(2018-04-13) 更多信息,以及 Bryan Bende 的评论。

我知道 CRON 不是最好的解决方案,但我不知道如何改进我的算法来摆脱它。

在我的例子中,排队等待重复数据删除的流文件是通过一系列 3 个 REST 调用生成的:

  • 第一次调用“GetAllCategories”,
  • 然后为每个类别调用“GetSubCategories”,
  • 并为每个子类别调用“GetProducts”。

这个流文件生成部分通常持续 5 分钟左右:昨晚第一个流文件在凌晨 2:00:16 到达队列,最后一个在凌晨 2:04:58 到达队列。 (这就是我将 DetectDuplicate 安排在凌晨 3:00 运行的原因。)

如果我的DetectDuplicate处理器将被“定时器驱动”调度,第一个到达队列的流文件将被处理器消耗,然后所有流文件都在那里。

这会破坏整套流文件的顺序。

我觉得我必须等待所有流文件在 DetectDuplicate 之前进入队列处理器开始工作。

您有改进我的算法的潜在建议吗?

最佳答案

您通常应该对启动流程的源处理器使用 CRON 调度,然后所有其他处理器都应该使用 Run Schedule 为 0 的 Timer Driven。

例如,如果您每天凌晨 2:00 从目录中获取文件,则应使用 CRON 表达式安排 GetFile 在凌晨 2:00 开始流程,但除此之外的任何内容都不需要 CRON 安排,因为它们除非 GetFile 运行,否则永远不会接收数据。

在您希望处理器等待所有流文件可用之前执行的情况下,您可以使用 Wait/Notify 处理器,这样所有流文件在等待处理器之前建立起来释放到 DetectDuplicate 处理器。

关于cron - 如何在 CRON 驱动的 DetectDuplicate 中摄取所有流文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49796261/

相关文章:

php - Cron 作业的工作目录

ruby-on-rails - 在 Rails 应用程序中发送自动电子邮件

cron - 与 phpmyadmin 文件相比,mysqldump 文件导入速度较慢

Java序列化和重复对象

javascript - 如何每天在 11 :00pm 在 Node js 中运行 API GET 调用

Java 8 :How to remove duplicates from the List based on multiple properties preserving the order

java - 具有自定义比较器的 TreeSet 未产生正确的结果

c++ - 实现最小函数

使用 ScanF 在循环中创建新变量

java - 两条规则中的一条未触发