我们使用 Prometheus 和 Grafana 来监控我们的 Kafka 集群。
在我们的应用程序中,我们使用 Kafka 流,Kafka 流有可能因异常而停止。我们正在记录事件 setUnCaughtExceptionHandler
但是,我们还需要在流停止时发出某种警报。
我们目前拥有的是,jmx_exporter 作为代理运行并通过端点公开 Kafka 指标,而 prometheus 从端点获取指标。
我们没有看到任何类型的指标来提供每个主题的活跃消费者数量。我们错过了什么吗?关于如何获取活跃消费者的数量并在消费者停止时发送警报的任何建议。
最佳答案
我们有类似的需求,并将每个分区的 Kafka 消费者滞后添加到 Grafana 中,并且还在滞后超过指定阈值时添加了警报(每个主题的阈值应该不同,具体取决于负载,例如,对于某些主题,它可能是 10,对于高负载 - 100000)。所以如果你有更多,例如1000 条未处理的消息,您将收到警报。
您可以为每个 kafka 流添加状态监听器,如果流处于错误状态,记录错误或发送电子邮件:
kafkaStream.setStateListener((newState, oldState) -> {
log.info("Kafka stream state changed [{}] >>>>> [{}]", oldState, newState);
if (newState == KafkaStreams.State.ERROR || newState == KafkaStreams.State.PENDING_SHUTDOWN) {
log.error("Kafka Stream is in [{}] state. Application should be restarted", newState);
}
});
您还可以添加健康检查指示器(例如通过 REST 端点或通过 spring-boot
HealthIndicator
)来提供流是否正在运行的信息:
KafkaStreams.State streamState = kafkaStream.state();
state.isRunning();
我也没有找到任何 kafka 流指标来提供有关活跃消费者或可用连接分区的信息,但对我来说,如果 kafka 流提供此类数据(并希望它在未来的版本中可用),那就太好了。
关于java - 监控 Kafka 主题的消费者数量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51445293/