apache-nifi - 在将流文件移动到 NiFi 中的下一个处理器之前引入时间延迟

标签 apache-nifi

在 NiFi 中,存在从 MQTT(ConsumeMQTT)消费并发布到 HDFS 路径(PutHDFS)的数据流。我需要在将消耗的数据推送到 HDFS 路径之前引入 60 分钟的延迟。发现 ControlRate 和 MergeContent 处理器是可能的解决方案但不确定。

What is the ideal solution to introduce time delay?



示例:上午 9:00 使用的流文件应在上午 10:00 发布到 HDFS

enter image description here

最佳答案

您可以使用 ExecuteScript处理器运行 sleep(60*60*1000)循环,但这会不必要地使用系统资源。

相反,我会介绍一个 RouteOnAttribute输出关系为one_hour_elapsed的处理器去PutHDFS , 和 unmatched循环回到自身。 RouteOnAttribute处理器应该有 路由策略设置为 Route to Property Name 和一个名为 的动态属性(单击“属性”选项卡右上角的 + 按钮) one_hour_elapsed .表达式语言值应为 ${now():toNumber():gt(${entryDate:toNumber():plus(3600000)})} .

这个表达:

  • 获取当前时间并将其转换为自纪元以来的毫秒数 ( now():toNumber() )
  • 获取 entryDate流文件的属性(当它进入 NiFi 时)并将其转换为毫秒并增加一小时( entryDate:toNumber():plus(3600000) [ 3600000 == 60*60*1000 ])
  • 比较两个数字 ( a:gt(${b}) )

  • 如果这实际上不是您流程的开始,您可以使用 UpdateAttribute处理器在流程的任何点插入任意时间戳并从那里计算。

    我还建议设置 产量持续时间运行计划 RouteOnAttribute处理器比平常高得多,因为您不希望该处理器持续运行,因为它不会工作。我建议将其设置为 1 或 5 分钟开始,因为您已经引入了一个小时的延迟。

    Flow showing timing loop

    关于apache-nifi - 在将流文件移动到 NiFi 中的下一个处理器之前引入时间延迟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61875809/

    相关文章:

    ssl - 使用 SSL Comodo 证书保护 NIFI

    apache-nifi - 我们可以通过cli与nifi通信吗?

    apache-nifi - NiFi - 如何在 ExecuteStreamCommand 中引用 flowFile?

    java - 如何使用NiFi进程 session 迁移功能?

    Apache NiFi 中的 Json 字段匹配

    apache-nifi - NIFI QueryDatabaes org.apache.avro.SchemaParseException : Illegal character in: COUNT(*);

    apache-nifi - Apache NiFi 和 MTConnect

    ssl - 发送数据时NIFI 1.8 Site-to-Site SSLHandshakeException

    java - 如何使用 NiFi 中的新 DBCPConnectionPoolLookup 列出一组数据库中的数据库表?

    java - 在 Apache NiFi 中读取 UCS-2 LE BOM 编码文件时出现问题