java - 当偏移量存在时,如何消耗kafka frome的最大偏移量?

标签 java apache-kafka kafka-producer-api

我已阅读文档并找到了配置“auto.offset.reset”:

当 ZooKeeper 中没有初始偏移量或偏移量超出范围时该怎么办:

问题是我曾经使用组id来消费kafka,我想保留组id但放弃旧消息。

我该怎么做?

最佳答案

您可以尝试在消费者中的 ConsumerRebalanceListener.onPartitionsAssigned 中执行此操作:

public void onPartitionsAssigned(Collection<TopicPartition> partitions) {   
    kafkaConsumer.seekToEnd(partitions);
}

关于java - 当偏移量存在时,如何消耗kafka frome的最大偏移量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48125786/

相关文章:

java - Kafka 0.9.0.1 Java Consumer陷入awaitMetadataUpdate()

java - 将 Instant 从大纪元时间转换为微秒

java - cassandra db 慢速连接

java - Android - Imageview Fragment 延迟

java - 是否可以使用 KafkaIO.read 为单个管道的两个不同集群指定 Kafka 引导服务器?

python - 在 docker 中运行时,python 脚本无法导入 kafka 库

kubernetes - 在 Kubernetes 上创建 kafka 集群

java - 如何从 Selenium Java 中禁用的输入字段中获取文本

java - KafkaProducer不发送记录

apache-kafka - 带有 .Net 客户端的 Kafka