java - spring-kafka 具有不同 Json 对象的多个 @KafkaListener 不起作用

标签 java spring-boot apache-kafka spring-kafka

我有一个工厂,对于我的四个不同的 @KafkaListers 来说是通用的,并且每个监听器都应该从其各自的主题中使用各自的 JSON 对象。我无法让它工作,因为我收到异常,上面写着: 类“com.abc.MyObject”不在受信任的包中:[java.util,java.lang]。如果您认为反序列化此类是安全的,请提供其名称。如果序列化仅由可信来源完成,您还可以启用全部信任 (*)。即使在我添加下面之后,我仍然得到相同的异常: config.put(JsonDeserializer.TRUSTED_PACKAGES, ""); 如果我不使用通用工厂并将其与一个特定对象一起使用,那么它对于该对象来说效果很好,但这样我就必须为四个不同的 Kafkalistener 创建四个工厂。

我的卡夫卡列表:

@KafkaListener( topics="number_1_event", groupId="abc-group", containerFactory="kafkaABCListenerContainerFactory")
public void consumeMyMessage(MyTopic1Class data, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
            @Header(KafkaHeaders.OFFSET) int offsetId) {
                // code here
            }



@KafkaListener( topics="number_2_event", groupId="abc2-group", containerFactory="kafkaABCListenerContainerFactory")
/* Similar method signature here*/

@KafkaListener( topics="number_3_event", groupId="abc3-group", containerFactory="kafkaABCListenerContainerFactory")

@KafkaListener( topics="number_4_event", groupId="abc4-group", containerFactory="kafkaABCListenerContainerFactory")

配置:

@Bean
public ConsumerFactory<String, Object> abcdConsumerFactory() {
    Map<String, Object> config = new HashMap<>();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");


    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
            new JsonDeserializer<>(Object.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaABCListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();      
    factory.setConsumerFactory(abcdConsumerFactory());

    return factory;
    }

我目前正在使用上述配置,并且收到如上所述的信任包错误。 如果我将上述配置中的对象替换为 MyTopic1Class 或 MyTopic2Class, 然后运行它,它将适用于该特定对象。 请帮忙!!

最佳答案

Do you mean that i can just use ConcurrentKafkaListenerContainerFactory<String, MyTopic1Class> and ConsumerFactory<String, MyTopic1Class> abcdConsumerFactory(). and this factory signature would work for when my number_1_event topic receives a MyTopic1Class. Will it work when topic number_2_event receives MyTopic2Class and using the same factory signature

是的,它会起作用,但使用起来更干净

ConcurrentKafkaListenerContainerFactory<String, Object> abcdContainerFactory()
ConsumerFactory<String, Object> abcdConsumerFactory()

您的听众可以使用较窄的类型 MyTopic1Class , MyTopic2Class等等

工厂中的泛型类型仅在编译时有效;它们在运行时被删除。

关于java - spring-kafka 具有不同 Json 对象的多个 @KafkaListener 不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61947200/

相关文章:

java - 如何在模型中包含列表?

java - 无法使用 Gradle 配置 Spring AOP

java - 如何创建由 gradle 预编译运行的注释处理器以将代码添加到方法中?

elasticsearch - 融合的Elasticsearch接收器连接器索引名操作

java - 父类中的私有(private)字段被子类隐藏/隐藏 - 检查以捕获它

java - 如何保持 Accordion 面板的顺序相同?

java - 如何用汉诺塔调用递归

java - Spring boot @Qualifier 不适用于数据源

apache-kafka - Kafka Broker - 它和 Zookeeper 一样吗?或者 KafkaProducer 是经纪人?

amazon-s3 - 实时流量批量上传至S3