java - 监控 Kafka 主题的消费者数量

标签 java apache-kafka grafana apache-kafka-streams prometheus

我们使用 Prometheus 和 Grafana 来监控我们的 Kafka 集群。

在我们的应用程序中,我们使用 Kafka 流,Kafka 流有可能因异常而停止。我们正在记录事件 setUnCaughtExceptionHandler 但是,我们还需要在流停止时发出某种警报。

我们目前拥有的是,jmx_exporter 作为代理运行并通过端点公开 Kafka 指标,而 prometheus 从端点获取指标。

我们没有看到任何类型的指标来提供每个主题的活跃消费者数量。我们错过了什么吗?关于如何获取活跃消费者的数量并在消费者停止时发送警报的任何建议。

最佳答案

我们有类似的需求,并将每个分区的 Kafka 消费者滞后添加到 Grafana 中,并且还在滞后超过指定阈值时添加了警报(每个主题的阈值应该不同,具体取决于负载,例如,对于某些主题,它可能是 10,对于高负载 - 100000)。所以如果你有更多,例如1000 条未处理的消息,您将收到警报。

您可以为每个 kafka 流添加状态监听器,如果流处于错误状态,记录错误或发送电子邮件:

kafkaStream.setStateListener((newState, oldState) -> {
    log.info("Kafka stream state changed [{}] >>>>> [{}]", oldState, newState);
    if (newState == KafkaStreams.State.ERROR || newState == KafkaStreams.State.PENDING_SHUTDOWN) {
        log.error("Kafka Stream is in [{}] state. Application should be restarted", newState);
    }
});

您还可以添加健康检查指示器(例如通过 REST 端点或通过 spring-boot HealthIndicator)来提供流是否正在运行的信息:

KafkaStreams.State streamState = kafkaStream.state(); state.isRunning();

我也没有找到任何 kafka 流指标来提供有关活跃消费者或可用连接分区的信息,但对我来说,如果 kafka 流提供此类数据(并希望它在未来的版本中可用),那就太好了。

关于java - 监控 Kafka 主题的消费者数量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51445293/

相关文章:

java - 使用 JCalendar 使用复杂的 SQL 查询来检查日期/房间的可用性

java - 读取超过 90k 条记录的 Excel 文件时出现内存不足异常

grafana - 计算 Group By 语句中的值

nginx 403禁止错误

java - 在 Java 中编译时可能存在有损转换

java - 将 Gluon UI 库添加到 JavaFX 桌面应用程序

Azure 事件中心限制及其与纯 Kafka 集群的比较

java - 警告 org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Bootstrap 代理 127.0.0.1 :9092 (id: -1 rack: null) disconnected

apache-kafka - Kafka-confluent : How to use pk. mode=record_key 用于 JDBC 接收器连接器中的 upsert 和删除模式?

elasticsearch - Grafana中没有提供带有Elasticsearch数据源的指标