我知道这个问题已经被问过多次,但是,我还找不到解决方案。我能够通过我的@KafkaListener(在类级别)使用特定的Java对象并且工作得很好,但是,当我尝试使用来自同一主题的多个不同的JSON对象时(我在类级别使用@KafkaListener和@KafkaHandler)在方法级别,每个 @KafkaHandler 方法需要不同的对象),它总是生成 LinkedHashMap。我可以解析它并获取数据并执行工厂模式来基于 json 字段生成不同的实例,但是,当 Spring 可以自动检测用于路由消息的特定 @KafkaHandler 时,我不想这样做。 如何使用单个 @KafkaListener 使用单个主题中的不同 JSON 对象。
我正在使用以下配置:
public ConsumerFactory<String, Object> abcConsumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG,"group-1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class );
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(Object.class, false));
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> abcListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(abcConsumerFactory());
return factory;
}
如果我在上面的配置中使用实际的类(Foo 或 Bar)而不是 Object,它对于特定对象来说效果很好,但是,当我尝试概括时,它不会转到特定的 @KafkaHandler,而是转到有效负载类型为 LinkedHashMap 的 @KafkaHandler(我试图检查它是否被传递到 @KafkaHandler)
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> abcListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(abcConsumerFactory());
return factory;
}
Class:
@KafkaListener(topics={"abc_gh"}, containerFactory = "abcListenerContainerFactory")
@Service
public class MyListener {
@KafkaHandler
public void consumeMessage(@Payload Foo f) {
//only comes here when i use Foo in my configs instead of Object
}
@KafkaHandler
public void consumeMessage22(@Payload Bar b) {
//only comes here when i use Bar in my configs instead of Object
}
@KafkaHandler
public void consumeMessage77(@Payload LinkedHashMap bc) {
//comes here when i use Object in the configs, even if i expect a Foo or a Bar object
}
}
我想分享的一件事是 Producer 没有使用 Spring-Kafka。
我不知道我错过了什么,我尝试了很多东西,但没有运气。
最佳答案
When using @KafkaHandler methods, the payload must have already been converted to the domain object (so the match can be performed). Use a custom deserializer, the
JsonDeserializer
, or theJsonMessageConverter
with its TypePrecedence set toTYPE_ID
. See Serialization, Deserialization, and Message Conversion for more information.
为了路由到正确的方法,ConsumerRecord
中必须已经具有正确的类型。
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class );
您没有向反序列化器提供有关要创建哪个对象的任何信息。
当 Spring 是生产者时,它会在记录头中添加类型信息,反序列化器可以使用这些信息来构造正确的类型。
如果没有类型信息,您将获得一个Map
。
生产者必须设置类型 header 才能使其工作。
当@KaflaListener
处于方法级别时,我们可以根据方法参数确定要创建的类型。在类级别,我们有一个 catch-22 - 除非记录已经转换,否则我们无法选择方法。
您的生产者不必知道实际类型,但它至少必须提供一个 header ,可用于查找我们需要转换为的类型。
参见Mapping Types .
或者,生产者的 JSON 序列化器必须配置为将类型信息添加到 JSON 本身中。
另一个选项是自定义反序列化器,它“查看”JSON 以确定要实例化的类。
关于java - Spring-Kafka @KafkaHandlers 不消耗各自的 java 对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61418440/