java - 无法从所有分区获取 Kafka 滞后

标签 java apache-kafka

有没有办法找到分配给同一消费者组的所有消费者的整个 kafka 滞后?

我只能获取指定分区的延迟。例如假设只有一个分区分配给一个消费者,下面的代码只会给该分区带来延迟。不适用于其他分区。

Set<TopicPartition> partitionSet = consumer.assignment();
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitionSet);
for(TopicPartition tp : partitionSet) {
            LOG.info("Topic:{}, EndOffset:{}, currentOffset:{}, LAG:{}",
                    tp.topic(), endOffsets.get(tp), consumer.position(tp), endOffsets.get(tp)-consumer.position(tp));
        }

基本上,想要找到所有分区的滞后总和,以了解某个主题的所有消费者(同一组)滞后了多少。

此外,是否有任何类似于 kafka-consumer-groups 的可用 api,并传递 bootstrap-server 和 group 作为参数来查找滞后?

./kafka-consumer-groups.sh --bootstrap-server --group --describe

最佳答案

以编程方式实现此目的的正确方法是使用 AdminClient API:

  1. 使用 listConsumerGroupOffsets() 获取该组的提交偏移量.

  2. 获取日志结束偏移量。目前您需要启动一个 Consumer 并调用 endOffsets()对于步骤 1 中检索到的所有分区。

    在 Kafka 2.5(预计 2020 年 2 月结束)中,有一个新的 AdminClient API 用于检索日志结束偏移量 listOffsets(),这样就可以单独使用 AdminClient 来检索延迟。

  3. 对于每个分区,从日志结束偏移量(步骤 2)中减去提交的偏移量(步骤 1)。

这基本上就是kafka-consumer-groups.sh在幕后所做的事情。所以检查implementation of this tool如果你愿意的话。

关于java - 无法从所有分区获取 Kafka 滞后,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59988199/

相关文章:

java - 连接 https,我无法定义 ssl 上下文,Google App Engine,Java

java - Selenium:启动 IE 时出现意外错误。浏览器缩放级别设置为 122%。应设置为 100%

apache-kafka - 事件溯源是基于编排的 SAGA 模式的增强模式吗?

java - 如何将KStream聚合到固定大小的列表?

java - 更改 JPA 生成的表列的数据类型

java - ND4J在GPU上运行缓慢,但在CPU上运行速度很快

java - 2 个 kafka 消费者使用不同的 SSL 配置读取 2 个不同的主题

ssl - Kafka 中间代理 SSL 握手失败

java - 使用 SoapUI 生成 JAX-WS 客户端而不使用 JAXBElement

apache-kafka - 如何在 Kafka Consumer Group 中将偏移量重置为任意值?