java - Spring 集成 Kafka 和管理偏移量

标签 java spring spring-integration apache-kafka

我正在使用 Spring Integration Kafka 扩展来读取和处理来自 Kafka in Java 应用程序的消息。据我所知,它使用不允许在 Zookeeper 中完全管理偏移量的高级消费者 API。

在我的例子中,我们有 auto.commit.enable=false 以便在处理消息后向 Zookeeper 提交偏移量。如果处理失败,则不会提交偏移量,我们应该尝试在某个配置的时间内再次处理相同的消息,从 Zookeeper 的偏移量开始。但它不起作用,因为我认为 Apache Kafka 客户端在内存中保持偏移量。

我发现 kafka.consumer.ConsumerIterator 处理偏移量,如果其中的 consumedOffset 大于 Zookeeper 中的偏移量,那么它将使用已消耗的偏移量读取消息。

所以,我想知道是否有任何方法可以重置 Kafka 客户端中的偏移量以从 Zookeeper 中的偏移量开始读取?

最佳答案

你不介意分享你的<int-kafka>吗?配置并指出哪里有问题?

也许这样做就够了 MessageLeftOverTracker.clearMessagesLeftOver()

我不太了解 Kafka,但知道 Spring Integration 的作用。

关于java - Spring 集成 Kafka 和管理偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24286876/

相关文章:

java - 如何更新特定的 Grails 域类属性?

java - 使用 Spring JPA 存储库进行不同计数

java - Kerberos AD Spnego 身份验证在一台计算机上失败,但在另一台计算机上失败

java - 在服务器上运行spring项目

java - 如何使用 spring 5 响应式(Reactive)存储库以“即发即忘”模式进行保存

java - 如何在 Spring-Integration 中使用相同的 MessageChannel 进行发送和接收?

java - 从java(android)中的文件路径获取目录

java - Spring Security 在运行时注销用户

java - 我们可以使用 Axis2 与 spring 集成吗

java - 通过 .handle() 配置服务激活器的输出 channel