apache-kafka - 在不提交来自 Kafka 10 消费者的情况下使用消息

标签 apache-kafka kafka-consumer-api

我需要从主题中读取消息,对其进行批处理并将批处理推送到外部系统。如果批处理因任何原因失败,我需要再次使用同一组消息并重复该过程。因此,对于每个批处理,每个分区的起始和终止偏移量都存储在数据库中。为了实现这一点,我通过将分区分配给读取器来为每个分区创建一个 Kafka 消费者,基于先前存储的偏移量,消费者寻找该位置并开始读取。我已经关闭了自动提交并且我不提交来自消费者的偏移量。对于每个批处理,我为每个分区创建一个新的消费者,从存储的最后一个偏移量读取消息并发布到外部系统。您是否发现在不提交偏移量和跨批处理使用相同的消费者组的情况下消费消息有任何问题,但在任何时候每个分区都不会超过一个消费者?

最佳答案

我觉得你的设计很合理。

将偏移量提交给 Kafka 只是 Kafka 中一种方便的内置机制,用于跟踪偏移量。但是,没有任何使用它的要求——您也可以使用任何其他机制来跟踪偏移量(比如在您的情况下使用数据库)。

而且,如果你手动分配分区,无论如何都没有分组管理。所以参数 group.id 没有效果。参见 http://docs.confluent.io/current/clients/consumer.html了解更多详情。

关于apache-kafka - 在不提交来自 Kafka 10 消费者的情况下使用消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40446454/

相关文章:

java - 如何像使用 avro console producer 一样生成 Kafka avro 记录?

java - kafka-设置java启动参数的正确方法

c# - Confluence Kafka Consumer 类中 key 的反序列化意味着什么?

apache-kafka - 卡夫卡有重复的消息

go - Golang Consumer连接Kafka后延迟接收Kafka消息

java - 为什么我的具有相同组 ID 的 Kafka 消费者不平衡?

java - 合适的 Apache Kafka 客户端 jar 是什么

java - 在消费者中反序列化加密的kafka消息

apache-kafka - 是否可以将偏移量重置为 kafka 连接器中的 kafka 消费者组的主题?

java - 同一 IntelliJ 项目中的 Kafka 消费者和生产者