在 NiFi 中,存在从 MQTT(ConsumeMQTT)消费并发布到 HDFS 路径(PutHDFS)的数据流。我需要在将消耗的数据推送到 HDFS 路径之前引入 60 分钟的延迟。发现 ControlRate 和 MergeContent 处理器是可能的解决方案但不确定。
What is the ideal solution to introduce time delay?
示例:上午 9:00 使用的流文件应在上午 10:00 发布到 HDFS
最佳答案
您可以使用 ExecuteScript
处理器运行 sleep(60*60*1000)
循环,但这会不必要地使用系统资源。
相反,我会介绍一个 RouteOnAttribute
输出关系为one_hour_elapsed
的处理器去PutHDFS
, 和 unmatched
循环回到自身。 RouteOnAttribute
处理器应该有 路由策略设置为 Route to Property Name 和一个名为 的动态属性(单击“属性”选项卡右上角的 + 按钮) one_hour_elapsed .表达式语言值应为 ${now():toNumber():gt(${entryDate:toNumber():plus(3600000)})}
.
这个表达:
now():toNumber()
) entryDate
流文件的属性(当它进入 NiFi 时)并将其转换为毫秒并增加一小时( entryDate:toNumber():plus(3600000)
[ 3600000 == 60*60*1000
]) a:gt(${b})
) 如果这实际上不是您流程的开始,您可以使用
UpdateAttribute
处理器在流程的任何点插入任意时间戳并从那里计算。我还建议设置 产量持续时间和 运行计划 的
RouteOnAttribute
处理器比平常高得多,因为您不希望该处理器持续运行,因为它不会工作。我建议将其设置为 1 或 5 分钟开始,因为您已经引入了一个小时的延迟。关于apache-nifi - 在将流文件移动到 NiFi 中的下一个处理器之前引入时间延迟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61875809/