我正在使用 Spring Kafka 2.2.7,我已经使用 kafkaListenerContainerFactory
配置了 @EnableKafka
并使用 @KafkaListener
来消费消息,一切正在按预期工作。
我想添加一个 RecordInterceptor
来记录所有消费的消息,但发现很难配置它。 documentation指出可以在容器上设置 RecordInterceptor,但是我不确定如何获取容器的实例。
Starting with version 2.2.7, you can add a RecordInterceptor to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Bytes> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(createConsumerFactory());
factory.setConcurrency(consumerCount);
return factory;
}
我查看了 Spring 文档,但没有找到解决方案,这似乎是一件简单的事情,但也许我错过了一些东西。
如有任何帮助,我们将不胜感激。
提前致谢。
最佳答案
有一个方法setRecordInterceptor自 2.2.7
factory.setRecordInterceptor(new RecordInterceptor);
还有另一个信息RecordInterceptor不适用于批处理监听器
Starting with version 2.2.7, you can add a RecordInterceptor to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record. If the interceptor returns null, the listener is not called. The interceptor is not invoked when the listener is a batch listener.
关于java - 如何将 RecordInterceptor 设置为 ConcurrentKafkaListenerContainerFactory,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58584773/