java - 为什么 Apache NiFi MergeRecords 处理器没有按照配置合并记录数量?

标签 java apache-nifi

我们正在通过 Kafka 主题从上游系统接收 Json 消息。要求是以一定的时间间隔将这些消息存储到HDFS中。由于我们要存储到 HDFS 中,因此我们希望将一定数量的这些记录合并到单个文件中。根据 NiFi documentation我们正在使用"MergeRecords"处理器。

关于即将到来的记录:##

  • 这些是具有嵌套结构的多行 JSON 消息。
  • 它们基于相同的架构(它们是从单个 Kafka 主题中选取的)
  • 这些是经过验证的消息,甚至 NiFi 处理器也能够解析它。所以从 Schema 的角度来看,JSon 消息显然没有问题

当前配置

下面是处理器配置的快照。 NiFi版本:1.8

enter image description here

预期行为

对于上述配置,预计 MergeRecords 应针对阈值之一进行加权,即最大记录(100000) 或最大 Bean 大小(100KB)。

观察到的行为

但据观察,在达到任一阈值之前,bean 就已经被捆绑在一起了。它仅针对 2 个 5KB 大小的记录触发 Bean 形成。

如果您可以帮助分析和/或指出为什么 MergeRecord 处理器未按照配置运行?

最佳答案

也许它没有等待最大记录(100000)或最大 Bean 大小(100KB),因为它达到了您首先指定的最大 Bin Age(1 分钟)。

最大 Bin Age 在文档中定义为:

The maximum age of a Bin that will trigger a Bin to be complete.

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.8.0/org.apache.nifi.processors.standard.MergeRecord/index.html

关于java - 为什么 Apache NiFi MergeRecords 处理器没有按照配置合并记录数量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55392882/

相关文章:

java - SQL Server 的 DBCPConnectionPool Controller 服务,jdbc 异常

java - 将 HashMap<Integer, List<String>> 转换为 HashMap<String, HashSet<Integer>>

java - 当输入值jtextfield然后点击按钮java时如何生成ireport

json - 如果某些属性为空,则使用 nifi 中的 jolt 变换规范从 Json 数组中删除 Json 元素

java - 两个 Java 异常,但对 Linux 上的 Java 来说是新的。通过 Apache 使用 NiFi

excel - 在 Nifi 数据流中将 .xls 文件转换为 json 文件的位置?

java - 空白页面作为 servlet 中的输出出现

java - MySQL 连接消失 Java

java - 如何在多态树中使用Java泛型类型参数

java - 根据NiFi中的内容更新属性