java - 卡夫卡不从头开始就无法消费-Java

标签 java apache-kafka kafka-consumer-api kafka-producer-api

我是 kafka 的新手,正在尝试使用 kafka 构建生产者-消费者应用程序。在这里,我能够向 kalka 生成消息,但是当我尝试使用消费者消费它时,它返回 0 条记录。

我检查我的消费者组的偏移量,我可以看到偏移量等于日志长度是相同的(在我的例子中是 1M - 与记录数相同)。

如果我在创建我的消费者时使用这个配置属性,它会从头开始读取。

configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "最早");

但我的要求是,如果我重启消费者,它应该像AMQ一样从之前的终点开始。

这里有什么我遗漏的吗?我认为抵消应该只在消费者投票后改变。为什么在开始时将其设置为最大记录长度?

最佳答案

如链接所述,您需要考虑几种情况:

  1. 启动一个新的消费者(new group.id):对于这种情况,不会有committed offset,因此消费者根据参数设置开始读取auto .offset.reset

  2. 重新启动消费者(重用 group.id):对于这种情况,消费者将从它停止的地方恢复。参数设置 auto.offset.reset 被忽略。

因此,对于场景 (1),您只需“配置”您的起始位置即可。对于场景 (2),您的起始位置是“固定的”(即,始终是最后提交的偏移量)并且不能通过配置更改。但是,您始终可以在首次调用 poll() 之前执行 .seekToBeginning().seekToEnd() 并阅读整个主题或从主题末尾开始。调用 .seekXX() 将“覆盖”最后提交的偏移量,并允许您以您喜欢的任何偏移量开始消费。请注意,还有 seek() 接受“偏移参数”,因此您可以指定要开始使用的任何偏移量。

关于java - 卡夫卡不从头开始就无法消费-Java,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43073725/

相关文章:

java - 如何使用Spring Kafka的Acknowledgement.acknowledge()方法进行手动提交

java - JUnit - 没有可运行的方法

java - 如何使用 Java 运行带参数的 python 代码,./AdafruitDHT.py 22 4

java - 编译成JAR形式的Java编译器?

hadoop - 在 Kerberos 下为 Kafka 启动 Spark-Submit 作业

apache-kafka - 如何使用 kafka-configs 查看和设置 offsets.retention.minutes

java - 重新启动在远程计算机上运行的远程 tomcat?

mongodb - jhipster - 如何掌握数据管理和微服务通信?

java - Kafka 2消费者工厂监听器没有持续连接

apache-kafka - 生产者和消费者是否需要指定partition