我们正在通过 Kafka 主题从上游系统接收 Json 消息。要求是以一定的时间间隔将这些消息存储到HDFS中。由于我们要存储到 HDFS 中,因此我们希望将一定数量的这些记录合并到单个文件中。根据 NiFi documentation我们正在使用"MergeRecords"处理器。
关于即将到来的记录:##
- 这些是具有嵌套结构的多行 JSON 消息。
- 它们基于相同的架构(它们是从单个 Kafka 主题中选取的)
- 这些是经过验证的消息,甚至 NiFi 处理器也能够解析它。所以从 Schema 的角度来看,JSon 消息显然没有问题
当前配置
下面是处理器配置的快照。 NiFi版本:1.8
预期行为
对于上述配置,预计 MergeRecords 应针对阈值之一进行加权,即最大记录(100000) 或最大 Bean 大小(100KB)。
观察到的行为
但据观察,在达到任一阈值之前,bean 就已经被捆绑在一起了。它仅针对 2 个 5KB 大小的记录触发 Bean 形成。
如果您可以帮助分析和/或指出为什么 MergeRecord 处理器未按照配置运行?
最佳答案
也许它没有等待最大记录(100000)或最大 Bean 大小(100KB),因为它达到了您首先指定的最大 Bin Age(1 分钟)。
最大 Bin Age 在文档中定义为:
The maximum age of a Bin that will trigger a Bin to be complete.
关于java - 为什么 Apache NiFi MergeRecords 处理器没有按照配置合并记录数量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55392882/