java - 如何在 Java 中从特定偏移量开始消费来自 Kafka 的消息

标签 java apache-kafka kafka-consumer-api

从最早的开始阅读:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

阅读最新内容:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

但是如果我想开始,例如:从第 18 次提交开始,我应该使用哪一行?

最佳答案

您可以使用seek()为了强制消费者从特定的偏移量开始消费:

public void seek(TopicPartition partition, long offset)

Overrides the fetch offsets that the consumer will use on the next poll(timeout). If this API is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets


例如,假设您想从偏移量 18 开始:

TopicPartition topicPartition = new TopicPartition("myTopic", 0);
Long startOffset = 18L;
    
List<TopicPartition> topics = Arrays.asList(topicPartition);
consumer.assign(topics);
consumer.seek(topicPartition, startOffset);

// Then consume messages as normal..

关于java - 如何在 Java 中从特定偏移量开始消费来自 Kafka 的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60375581/

相关文章:

java - JOOQ Log4j - 缺少控制台输出

apache-kafka - kappa-architecture 和 lambda-architecture 有什么区别

java - Kafka Streams App - 计数和总和

java - 卡夫卡不从头开始就无法消费-Java

java - 从多个 Kafka 主题同时读取 1 条消息

java - Spark - MultiMap可以转换为JAVA中的DataFrame吗

java - 当相同键的值的数据类型发生变化时,将 JSON 解析为 POJO

ios - 如何通过 Web 服务器作为中间层将 Kafka 消费者集成到移动应用程序?

apache-kafka - 卡夫卡0.11重置消费组的偏移量--to-datetime

Java - 对象引用或标识符?