我正在使用 Spring Integration Kafka 扩展来读取和处理来自 Kafka in Java 应用程序的消息。据我所知,它使用不允许在 Zookeeper 中完全管理偏移量的高级消费者 API。
在我的例子中,我们有 auto.commit.enable=false 以便在处理消息后向 Zookeeper 提交偏移量。如果处理失败,则不会提交偏移量,我们应该尝试在某个配置的时间内再次处理相同的消息,从 Zookeeper 的偏移量开始。但它不起作用,因为我认为 Apache Kafka 客户端在内存中保持偏移量。
我发现 kafka.consumer.ConsumerIterator 处理偏移量,如果其中的 consumedOffset 大于 Zookeeper 中的偏移量,那么它将使用已消耗的偏移量读取消息。
所以,我想知道是否有任何方法可以重置 Kafka 客户端中的偏移量以从 Zookeeper 中的偏移量开始读取?
最佳答案
你不介意分享你的<int-kafka>
吗?配置并指出哪里有问题?
也许这样做就够了 MessageLeftOverTracker.clearMessagesLeftOver()
?
我不太了解 Kafka,但知道 Spring Integration 的作用。
关于java - Spring 集成 Kafka 和管理偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24286876/