我正在尝试使用 Java API 监视给定组的消费者偏移量。我创建了一个额外的消费者,它不订阅任何主题,而只是调用 consumer.comfilled(topic)
来获取偏移量信息。这种方法可行,但是:
为了进行测试,我仅使用一个真实的消费者(即订阅该主题的消费者)。当我使用 close()
将其关闭并稍后重新启动时,订阅和第一次消费消息之间需要 27 秒,尽管我使用了 poll(1000)
.
我猜测这与重新平衡可能被非订阅消费者混淆有关。这可能吗?有没有更好的方法来使用Java API监视偏移量(我知道命令行工具,但需要使用API)。
最佳答案
检查主题偏移量的方法有多种,具体取决于您想要的目的,除了上面描述的“ promise ”之外,还有两个选项:
1) 如果你想知道消费者下次线程启动时从broker开始获取数据的偏移量id,那么你必须使用“position”作为
long offsetPosition;
TopicPartition tPartition = new TopicPartition(topic,partitionToReview);
offsetPosition = kafkaConsumer.position(tPartition);
System.out.println("offset of the next record to fetch is : " + position);
2) 在从 kafkaConsumer 执行轮询后,从 ConsumerRecord 对象调用“offset()”方法
Iterator<ConsumerRecord<byte[],byte[]>> it = kafkaConsumer.poll(1000).iterator();
while(it.hasNext()){
ConsumerRecord<byte[],byte[]> record = it.next();
System.out.println("offset : " + record.offset());
}
关于java - Kafka 0.9新的consumer api ---如何只观察consumer offsets,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35333935/