java - Kafka 0.9新的consumer api ---如何只观察consumer offsets

标签 java kafka-consumer-api

我正在尝试使用 Java API 监视给定组的消费者偏移量。我创建了一个额外的消费者,它不订阅任何主题,而只是调用 consumer.comfilled(topic) 来获取偏移量信息。这种方法可行,但是:

为了进行测试,我仅使用一个真实的消费者(即订阅该主题的消费者)。当我使用 close() 将其关闭并稍后重新启动时,订阅和第一次消费消息之间需要 27 秒,尽管我使用了 poll(1000).

我猜测这与重新平衡可能被非订阅消费者混淆有关。这可能吗?有没有更好的方法来使用Java API监视偏移量(我知道命令行工具,但需要使用API​​)。

最佳答案

检查主题偏移量的方法有多种,具体取决于您想要的目的,除了上面描述的“ promise ”之外,还有两个选项:

1) 如果你想知道消费者下次线程启动时从broker开始获取数据的偏移量id,那么你必须使用“position”作为

long offsetPosition;
TopicPartition tPartition = new TopicPartition(topic,partitionToReview);
    offsetPosition = kafkaConsumer.position(tPartition);
    System.out.println("offset of the next record to fetch is : " + position);

2) 在从 kafkaConsumer 执行轮询后,从 ConsumerRecord 对象调用“offset()”方法

Iterator<ConsumerRecord<byte[],byte[]>> it = kafkaConsumer.poll(1000).iterator();
while(it.hasNext()){
ConsumerRecord<byte[],byte[]> record = it.next();
System.out.println("offset : " + record.offset());
}

关于java - Kafka 0.9新的consumer api ---如何只观察consumer offsets,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35333935/

相关文章:

Java HashSet 包含预期值以外的值,而不是预定义值

apache-kafka - 在使用 kafka-python 时定期轮询 Kafka 消费者的最佳方法是什么?

java - Java spark kafka 流中的对象不可序列化(org.apache.kafka.clients.consumer.ConsumerRecord)

apache-kafka - shell脚本中的kafka消费者

java - 如何将记录从 Kafka 传递到方法?

java - 递归输出歧义

java - 我们可以重写 Java 中的构造函数吗?构造函数可以是私有(private)的吗?

java - Spring AOP AspectJ 尝试建议与切入点不匹配的类

java - 使用 glassfish 服务器的 JMS 客户端

apache-kafka - 卡夫卡生产者 : How to fairly balance the messages between kafka consumers (not between partitions)