java - Spring-Kafka @KafkaHandlers 不消耗各自的 java 对象

标签 java spring-boot apache-kafka spring-kafka

我知道这个问题已经被问过多次,但是,我还找不到解决方案。我能够通过我的@KafkaListener(在类级别)使用特定的Java对象并且工作得很好,但是,当我尝试使用来自同一主题的多个不同的JSON对象时(我在类级别使用@KafkaListener和@KafkaHandler)在方法级别,每个 @KafkaHandler 方法需要不同的对象),它总是生成 LinkedHashMap。我可以解析它并获取数据并执行工厂模式来基于 json 字段生成不同的实例,但是,当 Spring 可以自动检测用于路由消息的特定 @KafkaHandler 时,我不想这样做。 如何使用单个 @KafkaListener 使用单个主题中的不同 JSON 对象。

我正在使用以下配置:

public ConsumerFactory<String, Object> abcConsumerFactory() {       
    Map<String, Object> config = new HashMap<>();       
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG,"group-1");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class );
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(Object.class, false)); 

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> abcListenerContainerFactory() {      
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(abcConsumerFactory());
    return factory;
    }

如果我在上面的配置中使用实际的类(Foo 或 Bar)而不是 Object,它对于特定对象来说效果很好,但是,当我尝试概括时,它不会转到特定的 @KafkaHandler,而是转到有效负载类型为 LinkedHashMap 的 @KafkaHandler(我试图检查它是否被传递到 @KafkaHandler)

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> abcListenerContainerFactory() {      
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(abcConsumerFactory());
    return factory;
    }

Class:  

    @KafkaListener(topics={"abc_gh"}, containerFactory = "abcListenerContainerFactory")
    @Service
    public class MyListener {

    @KafkaHandler
    public void consumeMessage(@Payload Foo f) {
        //only comes here when i use Foo in my configs instead of Object

    }

    @KafkaHandler
    public void consumeMessage22(@Payload Bar b) {
        //only comes here when i use Bar in my configs instead of Object

    }   

    @KafkaHandler
    public void consumeMessage77(@Payload LinkedHashMap bc) {

        //comes here when i use Object in the configs, even if i expect a Foo or a Bar object

    }               
    }

我想分享的一件事是 Producer 没有使用 Spring-Kafka。

我不知道我错过了什么,我尝试了很多东西,但没有运气。

最佳答案

the documentation说:

When using @KafkaHandler methods, the payload must have already been converted to the domain object (so the match can be performed). Use a custom deserializer, the JsonDeserializer, or the JsonMessageConverter with its TypePrecedence set to TYPE_ID. See Serialization, Deserialization, and Message Conversion for more information.

为了路由到正确的方法,ConsumerRecord 中必须已经具有正确的类型。

config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class );

您没有向反序列化器提供有关要创建哪个对象的任何信息。

当 Spring 是生产者时,它会在记录头中添加类型信息,反序列化器可以使用这些信息来构造正确的类型。

如果没有类型信息,您将获得一个Map

生产者必须设置类型 header 才能使其工作。

@KaflaListener处于方法级别时,我们可以根据方法参数确定要创建的类型。在类级别,我们有一个 catch-22 - 除非记录已经转换,否则我们无法选择方法。

您的生产者不必知道实际类型,但它至少必须提供一个 header ,可用于查找我们需要转换为的类型。

参见Mapping Types .

或者,生产者的 JSON 序列化器必须配置为将类型信息添加到 JSON 本身中。

另一个选项是自定义反序列化器,它“查看”JSON 以确定要实例化的类。

关于java - Spring-Kafka @KafkaHandlers 不消耗各自的 java 对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61418440/

相关文章:

java - 当没有 kafka 代理运行时,如何出于开发目的禁用 Spring Cloud 流?

apache-kafka - 消息处理失败后Kafka消费者恢复

java - grails中转换日期错误

java - 多个Spring Boot错误: package org. springframework.boot不存在

spring - 由于 "file",字段类型为 "MultipartException: The current request is not a multipart request"的表单未提交到服务器

java - 如何使用 spring-boot 创建 REST 服务?

mongodb - kafka mongo db源连接器,在kubernetes上运行mongo db

java - jdbc 使用自增字段向数据库中插入数据

Java 四舍五入到最接近的 .5

java - 当内存增强器杀死我的应用程序时,它会强制关闭