apache-kafka - Apache NiFi 和 Kafka 集成

标签 apache-kafka apache-nifi

我不确定这个问题是否已经在某个地方得到解决,但我在互联网上的任何地方都找不到有用的答案。

我正在尝试将 Apache NiFi 与 Kafka 集成 - 使用 Apache NiFi 使用来自 Kafka 的数据。在继续此之前,我想到了以下几个问题。

Q-1) 我们的用例是 - 从 Kafka 实时读取数据,解析数据,对数据进行一些基本验证,然后将数据推送到 HBase。我知道 Apache NiFi 是进行这种处理的合适人选,但是如果我们正在处理的 JSON 是复杂的,那么构建工作流有多容易?我们曾经 最初考虑使用 Java 代码做同样的事情,但后来意识到这可以在 NiFi 中以最小的努力完成。请注意,我们正在处理的 80% 的数据来自 Kafka 会是简单的 JSON,但 20% 会是复杂的(涉及数组)

Q-2) 编写 Kafka 消费者时最棘手的部分是正确处理偏移量。 Apache NiFi 在消费 Kafka 主题时如何处理偏移量?如何抵消 如果在处理时触发重新平衡,是否会正确提交? Spring-Kafka 等框架提供了提交偏移量的选项(在某种程度上)以防万一 重新平衡在处理过程中触发。 NiFi 如何处理这个问题?

最佳答案

我在生产中的 3 节点 NiFi 集群中部署了多个管道,其中一个与您的用例相似。

Q-1) 为您的用例构建管道非常简单易行。由于您没有提到 processing json 中涉及的任务类型,我假设是通用任务。涉及 JSON 的通用任务可以是模式验证,可以使用 ValidateRecord 处理器实现,使用 JoltTransformRecord 处理器进行转换,使用 EvaluateJsonPath 提取属性值,转换将 json 转换为其他格式说 avro 使用 ConvertJSONToAvro 处理器等。 Nifi 让您可以灵活地独立扩展管道中的每个阶段/处理器。例如,如果使用 JoltTransformRecord 的转换非常耗时,您可以通过在 Scheduling 下配置 Concurrent Tasks 来扩展它以在每个节点中运行 N 个并发任务标签。

Q-2) 就 ConsumeKafka_2_0 处理器而言,偏移量管理是通过先提交 NiFi 处理器 session 然后提交 Kafka 偏移量来处理的,这意味着我们至少有一次保证默认。 当 Kafka 触发给定分区的消费者重新平衡时,处理器会快速提交(处理器 session 和 Kafka 偏移量)它所拥有的一切,并将消费者返回池以供重用。

ConsumeKafka_2_0 在消费者组的成员发生变化或成员的订阅发生变化时处理提交偏移。这可能发生在进程死亡、添加新进程实例或旧实例在失败后恢复生机时。还注意订阅主题的分区数量进行管理调整的情况。

关于apache-kafka - Apache NiFi 和 Kafka 集成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55741543/

相关文章:

python - 如何在 NiFi 的 ExecuteStreamCommand 处理器中读取文件

apache-nifi - ListenHttp Apache NIfi 通用基本路径

java - 来自多个主题 Kafka 的消息顺序

hadoop - 合流 HDFS 连接器 : How can I read from the latest offset when there are no hdfs files?

nginx - 允许通过nginx访问kafka

rest - 使用维基百科的 RecentChanges API 进行实时数据流

apache-nifi - Apache NIFI : Recovering from Flowfile repository issue

hadoop - 在大数据平台上从 'Near Real Time'中的新闻Web API提取数据的最佳方法

docker - 设置太阳鸟遥测Kafka DRUID和超集

python - 卡夫卡超时错误 ('Failed to update metadata after 60.0 secs.' )