spring-boot - 根据 header 在反序列化之前过滤消息

标签 spring-boot spring-kafka

有时可以在反序列化之前根据 header 值过滤掉消息。使用 spring kafka 的这种情况是否有任何现有模式。我正在考虑实现类似于 ErrorHandlingDeserializer 除了委托(delegate)也将过滤器谓词也作为属性。有什么建议么?谢谢。

最佳答案

是的,您可以使用 ErrorHandlingDeserializer 使用的相同技术来返回“标记”对象而不是进行反序列化,然后添加一个 RecordFilterStrategy,过滤记录对于此类对象,监听器(使用 @KafkaListener 时的容器工厂或为显式监听器使用过滤适配器)。

编辑

Spring Boot 和添加过滤器...

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        kafkaConsumerFactory.setRecordFilterStrategy(myFilter());
        return factory;
    }

关于spring-boot - 根据 header 在反序列化之前过滤消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53843328/

相关文章:

spring-boot - Spring Security 5.7 - 如何返回自定义 UserDetails

java - 使用spring kafka时如何在生产者中压缩数据

spring-boot - Rest API 的 Kafka 实现

spring - 如何在 Spring 测试中排除存储库?

java - 在 Webflux 中解决 Pageable

java - 如何在 Spring Boot 应用程序中实现安全性?

使用 Spring Cloud Stream 处理 Avro 反序列化异常

spring-kafka - Spring Kafka中检测broker断开连接

spring-boot - Spring Boot 和 Kafka : Broker disconnected

java - Spring 安全5 : There is no PasswordEncoder mapped for the id "null"