java - Kafka 0.9新的consumer api ---如何只观察consumer offsets

标签 java kafka-consumer-api

我正在尝试使用 Java API 监视给定组的消费者偏移量。我创建了一个额外的消费者,它不订阅任何主题,而只是调用 consumer.comfilled(topic) 来获取偏移量信息。这种方法可行,但是:

为了进行测试,我仅使用一个真实的消费者(即订阅该主题的消费者)。当我使用 close() 将其关闭并稍后重新启动时,订阅和第一次消费消息之间需要 27 秒,尽管我使用了 poll(1000).

我猜测这与重新平衡可能被非订阅消费者混淆有关。这可能吗?有没有更好的方法来使用Java API监视偏移量(我知道命令行工具,但需要使用API​​)。

最佳答案

检查主题偏移量的方法有多种,具体取决于您想要的目的,除了上面描述的“ promise ”之外,还有两个选项:

1) 如果你想知道消费者下次线程启动时从broker开始获取数据的偏移量id,那么你必须使用“position”作为

long offsetPosition;
TopicPartition tPartition = new TopicPartition(topic,partitionToReview);
    offsetPosition = kafkaConsumer.position(tPartition);
    System.out.println("offset of the next record to fetch is : " + position);

2) 在从 kafkaConsumer 执行轮询后,从 ConsumerRecord 对象调用“offset()”方法

Iterator<ConsumerRecord<byte[],byte[]>> it = kafkaConsumer.poll(1000).iterator();
while(it.hasNext()){
ConsumerRecord<byte[],byte[]> record = it.next();
System.out.println("offset : " + record.offset());
}

关于java - Kafka 0.9新的consumer api ---如何只观察consumer offsets,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35333935/

相关文章:

java - kafka连接错误: cannot find or load main class

apache-kafka - 从副本消费

java - 如何使我的自定义位图按钮字段使用透明背景?

java - SQLite3 和 System.currentTimeMillis()

java - 设置构建路径遇到问题 - 资源存在不同情况

java - Java Applet 中的缓存

java - 通用 3D 演示软件的建议框架

apache-kafka - 卡夫卡 : How to get last modified time for a topic i. e.添加到主题任何分区的最后一条消息

java - 如何在 spring-kafka 中的每条 kafka 消息后提交?

apache-kafka - 卡夫卡重新分区