我已阅读文档并找到了配置“auto.offset.reset”:
当 ZooKeeper 中没有初始偏移量或偏移量超出范围时该怎么办:
问题是我曾经使用组id来消费kafka,我想保留组id但放弃旧消息。
我该怎么做?
最佳答案
您可以尝试在消费者中的 ConsumerRebalanceListener.onPartitionsAssigned
中执行此操作:
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
kafkaConsumer.seekToEnd(partitions);
}
关于java - 当偏移量存在时,如何消耗kafka frome的最大偏移量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48125786/