java - 如何将 Kafka Producer 的指标报告给 prometheus(使用 spring boot)

标签 java apache-kafka prometheus spring-kafka spring-micrometer

我正在使用 spring-integration 来处理从 UDP 端点到 kafka 的数据流。我已经使用消费者和生产者配置在 @Configuration 中将回复KafkaTemplate 初始化为@Bean。当我的服务器启动并发送一些 udp 请求后,我可以看到消费者的指标。但是,即使在生产者配置中设置了 jmx 报告器后,我也看不到生产者的指标。

我试图不设置生产者指标报告器,假设它会像消费者指标一样自动出现(那里没有额外的配置)

生产者配置

Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                KafkaAvroSerializer.class);
        configProps.put("schema.registry.url", "http://schema-regisry-server:8081");
        configProps.put(
                ProducerConfig.RETRIES_CONFIG,
                3);
        configProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 500);
        configProps.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 5000);
        configProps.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
        configProps.put(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO");

        printConfigProps(configProps);
        return new DefaultKafkaProducerFactory<>(configProps);

消费者配置
Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        properties.put("schema.registry.url", "http://schema-regisry-server:8081");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "spring-integration");
        // automatically reset the offset to the earliest offset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return properties;

kafka 模板创建
@Bean
    public ReplyingKafkaTemplate<String, DataModel, DataModel> replyKafkaTemplate(ProducerFactory<String, DataModel> pf, KafkaMessageListenerContainer<String, DataModel> container) {
        ReplyingKafkaTemplate<String, DataModel, DataModel> template = new ReplyingKafkaTemplate<>(pf, container);
        template.start();
        return template;
    }

监听器容器创建:
@Bean
    public KafkaMessageListenerContainer<String, DataModel> replyContainer(ConsumerFactory<String, DataModel> cf) {
        ContainerProperties containerProperties = new ContainerProperties(destinationTopic);
        containerProperties.setGroupId("test");
        return new KafkaMessageListenerContainer<>(cf, containerProperties);
    }

消费者工厂创建
@Bean
    public ConsumerFactory<?, ?> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

最佳答案

2.3.0 之前的 Spring Boot 2 版本默认仅公开消费者指标。
Spring Boot 2.3.0(几周前发布)依赖于 Micrometer 1.4,它默认公开消费者和生产者指标。
如果您不能使用最新版本的 Spring Boot,则必须自己实现。

关于java - 如何将 Kafka Producer 的指标报告给 prometheus(使用 spring boot),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55916697/

相关文章:

apache-kafka - CDH 5.8.3 中的 Spark Streaming Kafka 以 yarn-cluster 模式集成

docker - 领导者去世时改变动物园管理员集群的领导

java - 在 Java 中读取可能被另一个进程修改的文件

java - 使用 AI 旋转灰度图像增加对比度

java - 如何在 Qt for android 上获取应用程序参数

spring-boot - Springboot网关Prometheus采集海量数据

docker - 优雅地停止 prometheus docker 容器

java - 从堆中删除随机元素

java - 使用kafka+java流式处理数据

kubernetes - Sumo Logic kubernetes 集成要求不存在 Prometheus