java - 如何获取 kafka 主题分区的最后/结束偏移量?

标签 java apache-kafka kafka-consumer-api

我正在使用 Java 编写一个 kafka 消费者。我想保持消息的实时性,所以如果等待消费的消息太多,比如1000条或更多,我应该放弃未消费的消息,从最后一个偏移量开始消费。

针对这个问题,我尝试比较一个topic(只有1个partition)的最后提交的偏移量和结束偏移量,如果这两个偏移量的差大于一定的量,我会将最后提交的偏移量设置为主题作为下一个偏移量,以便我可以放弃那些多余的消息。

现在我的问题是如何获取一个话题的结束偏移量,有人说我可以用老消费者,但是太复杂了,新消费者有这个功能吗?

最佳答案

新的消费者也很复杂。

//分配主题 consumer.assign();

//寻找话题的结尾 consumer.seekToEnd();

//位置是最新的偏移量 consumer.position();

关于java - 如何获取 kafka 主题分区的最后/结束偏移量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38428196/

相关文章:

java - kafka spout不发出数据

spring-boot - 休息 Controller 通过Spring kafka返回kafka中的记录

apache-kafka - kafka 新版本 2.1.0 broker 无故挂起

apache-kafka - Kafka自动提交间隔最佳实践

用于将 XML 转换为语法彩色 HTML 的 Java 库

java - 修剪输入框并添加到上下文 JSTL

java - Stream.of(int[]) 返回 Stream<int[]> 而 Stream.of(String[]) 返回 Stream<String> 为什么?

java - 如何解析 Joda Time 中没有年份的日期时间字符串?

Java,如何在 apache kafka 中获取主题中的消息数

apache-kafka - 这个特定的 Kafka Streams 拓扑是否引入了竞争条件?