java - 使用@KafkaListener 对 Kafka 消费者进行勇敢追踪

标签 java spring apache-kafka spring-kafka zipkin

我正在使用 Brave 库 https://github.com/openzipkin/brave用于跟踪,现在我想将它也用于 Kafka 消费者。我想避免添加 Spring Sleuth 并仅利用 Brave Kafka 检测 https://github.com/openzipkin/brave/tree/master/instrumentation/kafka-clients .
对于 Kafka 消费者,我使用 @KafkaListener .代码如下所示:
TestKafkaEndpoint.java

@Service
public class TestKafkaEndpoint {

    @KafkaListener(topics = "myTestTopic", containerFactory = "testKafkaListenerContainerFactory")
    public void procesMyRequest(@Payload final MyRequest request) {
       // do some magic...
    }
}
和配置类 TestKafkaConfig.java

@Configuration
@EnableKafka
@ComponentScan
public class TestKafkaConfig {

    @Bean
    public ConsumerFactory<String, MyRequest> testConsumerFactory() {
        final Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01-localhost:9092");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "TestGROUP");
        return new DefaultKafkaConsumerFactory<>(consumerProperties, new StringDeserializer(), new JsonDeserializer<>(MyRequest.class));
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MyRequest>> testKafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, MyRequest> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(testConsumerFactory());
        factory.getContainerProperties().setErrorHandler(new LoggingErrorHandler());
        return factory;
    }

但是不知道怎么用卡夫卡消费者 使用 Kafka 工厂或利用 时KafkaTracing .有没有人有这方面的经验并让它工作?

最佳答案

我不熟悉它,但它看起来像TracingConsumer是一个简单的消费者包装器:https://github.com/openzipkin/brave/blob/363ceb4c922305ffb4a68ac47dc152e1d15da0fb/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java#L69-L79
您应该能够创建 DefaultKafkaConsumerFactory 的子类;覆盖 createConsumer方法 - 监听器容器使用...

this.consumer =
        KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
                this.consumerGroupId,
                this.containerProperties.getClientId(),
                KafkaMessageListenerContainer.this.clientIdSuffix,
                consumerProperties);
... 调用 super.createConsumer(...) 并将其包装在 TracingConsumer 中.
如果您使用的是 2.5.3 或更高版本,则可以添加 ConsumerPostProcessor到 DKCF。
这就是侦探的工作方式:
https://github.com/spring-cloud/spring-cloud-sleuth/blob/6e306e594d20361483fd19739e0f5f8e82354bf5/spring-cloud-sleuth-brave/src/main/java/org/springframework/cloud/sleuth/brave/instrument/messaging/TraceMessagingAutoConfiguration.java#L263-L285

关于java - 使用@KafkaListener 对 Kafka 消费者进行勇敢追踪,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64896332/

相关文章:

java - 如何使用crawler4j提取页面上的所有链接?

java - cmake找不到java,但是已经安装了

java - 在 Tomcat 中使用 JNDI 关闭连接

apache-kafka - 如果将新分区添加到 Kafka 主题,消费者偏移量会发生什么?

security - 即使 Kafka 提供 TLS/SSL 服务,Kafka 能否通过 TCP 与 ZooKeeper 通信?

java - String 方法 isEmpty 在 Android API Levels < 10 中真的不可用吗?

java - 如何让RESTful java客户端在Openstack上发送GET/POST请求?

java jdbc xml sql 条件与 cdata

java - 如何在 Spring 中使用枚举设置 int 属性?

apache-kafka - 扩展 200 多个 Kafka 主题