spring-boot - 如何使用 Spring Cloud Stream 4.x 生产者和消费者检测 Spring Boot 3.x 以关联记录器中的跟踪信息

标签 spring-boot spring-cloud spring-kafka spring-cloud-stream micrometer

升级到 Spring Boot 3 后,我不得不更新跟踪/关联配置以从 Spring Cloud Sleuth 切换到新的 Micrometer Tracing 库。

此时,我可以在日志中看到 traceId/spanId 信息,这些信息已通过自动检测的 WebClient 使用 HTTP 调用正确传输到其他服务。

但是,Spring Cloud Streams Kafka 生产者和消费者似乎没有被检测到。

这里有一个生产者的例子:

logger.debug("Sending message to kafka queue {}", message)
streamBridge.send(bindingName, message)

带有 traceId,spanId 的日志:

[producer,638b36642e4fe0b203b7f32b746b22de,03b7f32b746b22de] 52233 --- [ctor-http-nio-3] i.s.m.p.p.ProjectTaskEventProducer       : Sending message to kafka queue GenericMessage [xxx]
[producer,638b36642e4fe0b203b7f32b746b22de,03b7f32b746b22de] 52233 --- [ctor-http-nio-3] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: kafka

在消费者方面,我有这个简单的 KStream:

    @Bean
    fun processEvent() =
        Function<KStream<EventKey, EventValue>, KStream<EventKey, EventValue?>> { events ->
            events.process(
                ProcessorSupplier {
                    Processor<EventKey, EventValue, EventKey, EventValue> {
                        logger.info("{}", it.headers())
                    }
                }
            )
        }

日志

[consumer,,] 52544 --- [-StreamThread-1] ventKStreamConfiguration$$SpringCGLIB$$0 : RecordHeaders(headers = [RecordHeader(key = target-protocol, value = [107, 97, 102, 107, 97]), RecordHeader(key = spring_json_header_types, value = [123, 34, 116, 97, 114, 103, 101, 116, 45, 112, 114, 111, 116, 111, 99, 111, 108, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false)

如您所见,仅传输了两个 header (target-protocolspring_json_header_types),缺少 b3 header 。因此,也未设置 MDC 日志。

Micrometer documentation关于消息传递检测非常稀疏,因此不清楚如何在 Spring Cloud Stream 的上下文中进行。

  • StreamBridge 是否应该像 WebClient 一样自动检测?
  • 消费者方面也是如此。

更新 1:

我已经按照说明添加了一个 ProducerMessageHandlerCustomizer,从而可以观察底层的 KafkaTemplate

@Configuration
class KafkaProducerConfiguration {

    @Bean
    fun kafkaProducerObservationCustomizer () : ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<EventKey, EventValue>> {
        return ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<EventKey, EventValue>> {
                handler, destinationName ->
            handler.kafkaTemplate.setObservationEnabled(true)
        }
    }
}

StreamBridge 被调用时,执行在将 observationEnabled 属性设置为 true 的定制器中结束:

enter image description here

但是,消费者仍然只得到两个 header :

enter image description here

如果您比较关联 HTTP 调用日志的 ObservationRegistry:

enter image description here

和KafkaTemplate里面的不一样:

enter image description here

问题似乎出在KafkaTemplate:

enter image description here

observationRegistry 在应用程序启动期间初始化,此时 ProducerMessageHandlerCustomizer 尚未被调用。因此,observationEnabled 的值将始终为 false,不执行 if block 并默认为 NOOP 注册表:

private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;

更新 2:

我试过这个解决方法

@Configuration
class KafkaProducerConfiguration {

    @Bean
    fun kafkaProducerObservationCustomizer (applicationContext: ApplicationContext, observationRegistry: ObservationRegistry) : ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<ClientProjectId, ProjectTaskEvent>> {
        return ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<ClientProjectId, ProjectTaskEvent>> {
                handler, destinationName ->
            handler.kafkaTemplate.setObservationEnabled(true)
            handler.kafkaTemplate.setApplicationContext(applicationContext)
            handler.kafkaTemplate.afterSingletonsInstantiated()
        }
    }
}

但它不起作用。它似乎扰乱了生产者的配置,覆盖了它的值(value)。在我的例子中,它寻找本地 Kafka 集群而不是配置的集群:

2022-12-05T17:36:06.815+01:00  INFO [metrics-ingestor,,] 8056 --- [| adminclient-2] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-2] Node -1 disconnected.
2022-12-05T17:36:06.816+01:00  WARN [metrics-ingestor,,] 8056 --- [| adminclient-2] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

最佳答案

底层KafkaTemplate默认不启用千分尺追踪,您必须将observationEnabled设置为true

https://docs.spring.io/spring-kafka/docs/current/reference/html/#observation

借助 Spring Cloud Stream,您可以使用 ProducerMessageHandlerCustomizer @Bean

实现此目的

https://docs.spring.io/spring-cloud-stream/docs/4.0.0-M3/reference/html/spring-cloud-stream.html#_advanced_producer_configuration

处理程序类型是KafkaProducerMessageHandler;所以使用 handler.getKafkaTemplate().set... 来改变它的属性。

关于spring-boot - 如何使用 Spring Cloud Stream 4.x 生产者和消费者检测 Spring Boot 3.x 以关联记录器中的跟踪信息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74666562/

相关文章:

java - 在 oAuth2 资源服务器应用程序中使用@WithMockUser(与@SpringBootTest)

java - spring cloud netflix jersey 版本冲突

spring-cloud-stream 在测试中与 spring-boot-data-mongodb 冲突

java - Spring-Kafka消费者不会自动接收消息

backbone.js - Spring Boot Gradle插件bootRun任务无法识别Yeoman生成的静态文件并使用grunt更新时

css - 我应该将 Spring MVC 的 CSS 和 JS 文件放在哪里?

java - 如何将Eureka客户端注册到带有负载均衡器的Eureka服务器集群?

java - Spring 批处理-Kafka : KafkaItemReader reads the data ALWAYS from beginning

java - Spring Kafka 并发与 spring-integration

java - 启动应用程序 spring-boot 后执行类中的某些方法