有时可以在反序列化之前根据 header 值过滤掉消息。使用 spring kafka 的这种情况是否有任何现有模式。我正在考虑实现类似于 ErrorHandlingDeserializer 除了委托(delegate)也将过滤器谓词也作为属性。有什么建议么?谢谢。
最佳答案
是的,您可以使用 ErrorHandlingDeserializer
使用的相同技术来返回“标记”对象而不是进行反序列化,然后添加一个 RecordFilterStrategy
,过滤记录对于此类对象,监听器(使用 @KafkaListener
时的容器工厂或为显式监听器使用过滤适配器)。
编辑
Spring Boot 和添加过滤器...
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
kafkaConsumerFactory.setRecordFilterStrategy(myFilter());
return factory;
}
关于spring-boot - 根据 header 在反序列化之前过滤消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53843328/