java - 如何将 RecordInterceptor 设置为 ConcurrentKafkaListenerContainerFactory

标签 java apache-kafka spring-kafka

我正在使用 Spring Kafka 2.2.7,我已经使用 kafkaListenerContainerFactory 配置了 @EnableKafka 并使用 @KafkaListener 来消费消息,一切正在按预期工作。

我想添加一个 RecordInterceptor 来记录所有消费的消息,但发现很难配置它。 documentation指出可以在容器上设置 RecordInterceptor,但是我不确定如何获取容器的实例。

Starting with version 2.2.7, you can add a RecordInterceptor to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record.

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Bytes> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(createConsumerFactory());
        factory.setConcurrency(consumerCount);
        return factory;
    }

我查看了 Spring 文档,但没有找到解决方案,这似乎是一件简单的事情,但也许我错过了一些东西。

如有任何帮助,我们将不胜感激。

提前致谢。

最佳答案

有一个方法setRecordInterceptor2.2.7

factory.setRecordInterceptor(new RecordInterceptor);

还有另一个信息RecordInterceptor不适用于批处理监听器

Starting with version 2.2.7, you can add a RecordInterceptor to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record. If the interceptor returns null, the listener is not called. The interceptor is not invoked when the listener is a batch listener.

关于java - 如何将 RecordInterceptor 设置为 ConcurrentKafkaListenerContainerFactory,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58584773/

相关文章:

java - 在单个 try-catch block 中创建多个PreparedStatements 是不好的做法吗?

java - Spring集成和Kafka消费者: Stop message-driven-channel-adapter right after records are sucessfully fetched

java - 如何为 onFailure 事件设置超时(Spring,Kafka)?

java - Java中 "parent"组件的调用方法

java - Repaint() 和 PaintComponent() 未正确更新

java - 在Java中使用正则表达式获取 "get this"中的字符串 "adfadf[somestring] get-this adfaf"

java - Apache Camel Kafka - 聚合 kafka 消息并定期发布到不同的主题

apache-spark - 有没有办法动态停止 Spark Structured Streaming?

apache-kafka - 当 Kafka 代理在消费者组协调方面失败时会发生什么?

java - Spring-Boot 和 Kafka : How to handle broker not available?