我不确定这个问题是否已经在某个地方得到解决,但我在互联网上的任何地方都找不到有用的答案。
我正在尝试将 Apache NiFi 与 Kafka 集成 - 使用 Apache NiFi 使用来自 Kafka 的数据。在继续此之前,我想到了以下几个问题。
Q-1) 我们的用例是 - 从 Kafka 实时读取数据,解析数据,对数据进行一些基本验证,然后将数据推送到 HBase。我知道 Apache NiFi 是进行这种处理的合适人选,但是如果我们正在处理的 JSON 是复杂的,那么构建工作流有多容易?我们曾经 最初考虑使用 Java 代码做同样的事情,但后来意识到这可以在 NiFi 中以最小的努力完成。请注意,我们正在处理的 80% 的数据来自 Kafka 会是简单的 JSON,但 20% 会是复杂的(涉及数组)
Q-2) 编写 Kafka 消费者时最棘手的部分是正确处理偏移量。 Apache NiFi 在消费 Kafka 主题时如何处理偏移量?如何抵消 如果在处理时触发重新平衡,是否会正确提交? Spring-Kafka 等框架提供了提交偏移量的选项(在某种程度上)以防万一 重新平衡在处理过程中触发。 NiFi 如何处理这个问题?
最佳答案
我在生产中的 3 节点 NiFi 集群中部署了多个管道,其中一个与您的用例相似。
Q-1) 为您的用例构建管道非常简单易行。由于您没有提到 processing
json 中涉及的任务类型,我假设是通用任务。涉及 JSON 的通用任务可以是模式验证,可以使用 ValidateRecord
处理器实现,使用 JoltTransformRecord
处理器进行转换,使用 EvaluateJsonPath
提取属性值,转换将 json 转换为其他格式说 avro 使用 ConvertJSONToAvro
处理器等。
Nifi 让您可以灵活地独立扩展管道中的每个阶段/处理器。例如,如果使用 JoltTransformRecord 的转换非常耗时,您可以通过在 Scheduling
下配置 Concurrent Tasks
来扩展它以在每个节点中运行 N
个并发任务标签。
Q-2) 就 ConsumeKafka_2_0
处理器而言,偏移量管理是通过先提交 NiFi 处理器 session 然后提交 Kafka 偏移量来处理的,这意味着我们至少有一次保证默认。
当 Kafka 触发给定分区的消费者重新平衡时,处理器会快速提交(处理器 session 和 Kafka 偏移量)它所拥有的一切,并将消费者返回池以供重用。
ConsumeKafka_2_0 在消费者组的成员发生变化或成员的订阅发生变化时处理提交偏移。这可能发生在进程死亡、添加新进程实例或旧实例在失败后恢复生机时。还注意订阅主题的分区数量进行管理调整的情况。
关于apache-kafka - Apache NiFi 和 Kafka 集成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55741543/