我正在使用 Java 编写一个 kafka
消费者。我想保持消息的实时性,所以如果等待消费的消息太多,比如1000条或更多,我应该放弃未消费的消息,从最后一个偏移量开始消费。
针对这个问题,我尝试比较一个topic(只有1个partition)的最后提交的偏移量和结束偏移量,如果这两个偏移量的差大于一定的量,我会将最后提交的偏移量设置为主题作为下一个偏移量,以便我可以放弃那些多余的消息。
现在我的问题是如何获取一个话题的结束偏移量,有人说我可以用老消费者,但是太复杂了,新消费者有这个功能吗?
最佳答案
新的消费者也很复杂。
//分配主题
consumer.assign();
//寻找话题的结尾
consumer.seekToEnd();
//位置是最新的偏移量
consumer.position();
关于java - 如何获取 kafka 主题分区的最后/结束偏移量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38428196/