在 NiFi 中,我有一个 cron 驱动的处理器序列,每天提供一组流文件,其中包含我感兴趣的 2 个属性:product_code
和 publication_date
.
我的需要是每个 product_code
只保留一个流文件: 最近的publication_date
.
例如:
对于这个输入:
flow_1: product_code: A / publication_date : 2018-01-01
flow_2: product_code: B / publication_date : 2018-01-01
flow_3: product_code: C / publication_date : 2018-01-01
flow_4: product_code: A / publication_date : 2018-04-12
flow_5: product_code: A / publication_date : 2000-12-31
flow_6: product_code: B / publication_date : 2018-02-02
flow_7: product_code: B / publication_date : 2018-03-03
预期的输出应该是:
flow_3: product_code: C / publication_date : 2018-01-01
flow_4: product_code: A / publication_date : 2018-04-12
flow_7: product_code: B / publication_date : 2018-03-03
我测试的算法
- 使用
UpdateAttribute
处理器添加属性priority
每个流文件,基于publication_date
. - 这些更新的流文件被重定向到
PriorityAttributePrioritizer
排队。 - 流文件留在这个队列中,因为只有一个消费处理器,它是 cron 驱动的。通过这种方式,我确定队列中的流文件是根据
publication_date
排序的。 . - 然后 CRON 触发下一个处理器,a
DetectDuplicate
基于product_code
属性。由于流文件是从最近的项目到最旧的项目处理的,我确信当product_code
被检测为重复,这是因为相同的product_code
最近的publication_date
已经可以了.
问题
遗憾的是,当 cron 触发 DetectDuplicate
processor,只有一条消息被消费,其他的留在队列中。
如果我将“调度策略”更改为“定时器驱动”且“运行调度”为 0,则我的所有流文件都将被消耗并且输出符合预期。
有没有办法问我的DetectDuplicate
处理器在开始工作时消耗队列中的所有消息(而不仅仅是一条消息)?
或者有没有办法设置一个调度策略,比如“凌晨 2:00 开始工作,凌晨 4:00 停止”?
您有没有想出更好的策略来满足需求?
问候,
值(value)。
更新1
(2018-04-13) 更多信息,以及 Bryan Bende 的评论。
我知道 CRON 不是最好的解决方案,但我不知道如何改进我的算法来摆脱它。
在我的例子中,排队等待重复数据删除的流文件是通过一系列 3 个 REST 调用生成的:
- 第一次调用“GetAllCategories”,
- 然后为每个类别调用“GetSubCategories”,
- 并为每个子类别调用“GetProducts”。
这个流文件生成部分通常持续 5 分钟左右:昨晚第一个流文件在凌晨 2:00:16 到达队列,最后一个在凌晨 2:04:58 到达队列。 (这就是我将 DetectDuplicate
安排在凌晨 3:00 运行的原因。)
如果我的DetectDuplicate
处理器将被“定时器驱动”调度,第一个到达队列的流文件将被处理器消耗,然后所有流文件都在那里。
这会破坏整套流文件的顺序。
我觉得我必须等待所有流文件在 DetectDuplicate
之前进入队列处理器开始工作。
您有改进我的算法的潜在建议吗?
最佳答案
您通常应该对启动流程的源处理器使用 CRON 调度,然后所有其他处理器都应该使用 Run Schedule 为 0 的 Timer Driven。
例如,如果您每天凌晨 2:00 从目录中获取文件,则应使用 CRON 表达式安排 GetFile 在凌晨 2:00 开始流程,但除此之外的任何内容都不需要 CRON 安排,因为它们除非 GetFile 运行,否则永远不会接收数据。
在您希望处理器等待所有流文件可用之前执行的情况下,您可以使用 Wait/Notify 处理器,这样所有流文件在等待处理器之前建立起来释放到 DetectDuplicate 处理器。
关于cron - 如何在 CRON 驱动的 DetectDuplicate 中摄取所有流文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49796261/