我正在使用 Brave 库 https://github.com/openzipkin/brave用于跟踪,现在我想将它也用于 Kafka 消费者。我想避免添加 Spring Sleuth 并仅利用 Brave Kafka 检测 https://github.com/openzipkin/brave/tree/master/instrumentation/kafka-clients .
对于 Kafka 消费者,我使用 @KafkaListener .代码如下所示:
TestKafkaEndpoint.java
@Service
public class TestKafkaEndpoint {
@KafkaListener(topics = "myTestTopic", containerFactory = "testKafkaListenerContainerFactory")
public void procesMyRequest(@Payload final MyRequest request) {
// do some magic...
}
}
和配置类 TestKafkaConfig.java
@Configuration
@EnableKafka
@ComponentScan
public class TestKafkaConfig {
@Bean
public ConsumerFactory<String, MyRequest> testConsumerFactory() {
final Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01-localhost:9092");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "TestGROUP");
return new DefaultKafkaConsumerFactory<>(consumerProperties, new StringDeserializer(), new JsonDeserializer<>(MyRequest.class));
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MyRequest>> testKafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, MyRequest> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(testConsumerFactory());
factory.getContainerProperties().setErrorHandler(new LoggingErrorHandler());
return factory;
}
但是不知道怎么用卡夫卡消费者 使用 Kafka 工厂或利用 时KafkaTracing .有没有人有这方面的经验并让它工作?
最佳答案
我不熟悉它,但它看起来像TracingConsumer
是一个简单的消费者包装器:https://github.com/openzipkin/brave/blob/363ceb4c922305ffb4a68ac47dc152e1d15da0fb/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java#L69-L79
您应该能够创建 DefaultKafkaConsumerFactory
的子类;覆盖 createConsumer
方法 - 监听器容器使用...
this.consumer =
KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
this.consumerGroupId,
this.containerProperties.getClientId(),
KafkaMessageListenerContainer.this.clientIdSuffix,
consumerProperties);
... 调用 super.createConsumer(...) 并将其包装在 TracingConsumer
中.如果您使用的是 2.5.3 或更高版本,则可以添加
ConsumerPostProcessor
到 DKCF。这就是侦探的工作方式:
https://github.com/spring-cloud/spring-cloud-sleuth/blob/6e306e594d20361483fd19739e0f5f8e82354bf5/spring-cloud-sleuth-brave/src/main/java/org/springframework/cloud/sleuth/brave/instrument/messaging/TraceMessagingAutoConfiguration.java#L263-L285
关于java - 使用@KafkaListener 对 Kafka 消费者进行勇敢追踪,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64896332/