apache-kafka - Kafka 一次性生产者消费者

标签 apache-kafka kafka-consumer-api kafka-producer-api

我正在为一个简单的数据管道实现 Exactly-once 语义,并使用 Kafka 作为消息代理。我可以通过设置 set enable.idempotence=true 强制 Kafka 生产者将每个生成的记录写入一次。

但是,在消费方面,我需要保证消费者只读取每条记录一次(我对将消费的记录存储到外部系统或另一个刚刚处理的 Kafka 主题不感兴趣)。为了实现这一点,我必须确保轮询记录得到处理,并且它们的偏移量以原子/事务方式提交给 __consumer_offsets 主题(同时成功/失败)。

在这种情况下,我是否需要求助于 Kafka 事务 API 在消费者轮询循环中创建事务生产者,在事务中我执行:(1) 处理已消耗的记录并 (2) 在之前提交它们的偏移量结束交易。在这种情况下,正常的 commitSync/commitAsync 会起作用吗?

最佳答案

"on the consumption front I need to guarantee that the consumer reads each record exactly once"

Gopinath 的答案很好地解释了如何在 KafkaProducer 和 KafkaConsumer 之间实现恰好一次。这些配置(连同 KafkaProducer 中 Transaction API 的应用)保证了生产者发送的所有数据将在 Kafka 中仅存储一次。但是,它并不能保证 Consumer 只读取一次数据。当然,这取决于您的偏移量管理。

无论如何,我理解您的问题,您想知道消费者本身如何一次性处理已消费的消息。

为此,您需要以原子方式自行管理偏移量。这意味着,您需要围绕构建自己的“交易”

  • 从 Kafka 获取数据,
  • 处理数据,以及
  • 在外部存储已处理的偏移量。

commitSync 和 commitAsync 方法不会让您走得太远,因为它们只能确保 Consumer 内最多一次或至少一次处理。此外,您的处理幂等也是有益的。

有一个不错的blog这解释了如何使用ConsumerRebalanceListener 并将偏移量存储在本地文件系统中的实现。还提供了完整的代码示例。

"do I need to resort to Kafka transaction APIs to create a transactional producer in the consumer polling loop"

Transaction API 仅适用于 KafkaProducers,据我所知不能用于您的偏移量管理。

关于apache-kafka - Kafka 一次性生产者消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64292775/

相关文章:

java - 为什么元数据添加到这个 Kafka 连接器的输出中?

python - 如何在一条消息中为kafka生成音频(.wav)

apache-kafka - 有没有办法为 kafka 生产者发送的消息设置延迟?

scala - 如何在 Scala 中编写 Kafka Producer

linux - 将 kafka 代理 ID(zookeeper-shell.sh 结果)保存到 bash 脚本中的变量

multithreading - Kafka 高级消费者 : Can partitions have multiple threads consuming it?

java - 卡夫卡不从头开始就无法消费-Java

java - 我在某些计算机上收到 "Topic not present in metadata after 60000 ms"消息

java - Kafka - 如何在 Producer 类中获取失败的消息详细信息

java - 等待 kafkaTemplate 待处理的 future