有没有办法找到分配给同一消费者组的所有消费者的整个 kafka 滞后?
我只能获取指定分区的延迟。例如假设只有一个分区分配给一个消费者,下面的代码只会给该分区带来延迟。不适用于其他分区。
Set<TopicPartition> partitionSet = consumer.assignment();
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitionSet);
for(TopicPartition tp : partitionSet) {
LOG.info("Topic:{}, EndOffset:{}, currentOffset:{}, LAG:{}",
tp.topic(), endOffsets.get(tp), consumer.position(tp), endOffsets.get(tp)-consumer.position(tp));
}
基本上,想要找到所有分区的滞后总和,以了解某个主题的所有消费者(同一组)滞后了多少。
此外,是否有任何类似于 kafka-consumer-groups 的可用 api,并传递 bootstrap-server 和 group 作为参数来查找滞后?
./kafka-consumer-groups.sh --bootstrap-server --group --describe
最佳答案
以编程方式实现此目的的正确方法是使用 AdminClient API:
使用
listConsumerGroupOffsets()
获取该组的提交偏移量.获取日志结束偏移量。目前您需要启动一个 Consumer 并调用
endOffsets()
对于步骤 1 中检索到的所有分区。在 Kafka 2.5(预计 2020 年 2 月结束)中,有一个新的 AdminClient API 用于检索日志结束偏移量
listOffsets()
,这样就可以单独使用 AdminClient 来检索延迟。对于每个分区,从日志结束偏移量(步骤 2)中减去提交的偏移量(步骤 1)。
这基本上就是kafka-consumer-groups.sh
在幕后所做的事情。所以检查implementation of this tool如果你愿意的话。
关于java - 无法从所有分区获取 Kafka 滞后,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59988199/