我有一个工厂,对于我的四个不同的 @KafkaListers 来说是通用的,并且每个监听器都应该从其各自的主题中使用各自的 JSON 对象。我无法让它工作,因为我收到异常,上面写着: 类“com.abc.MyObject”不在受信任的包中:[java.util,java.lang]。如果您认为反序列化此类是安全的,请提供其名称。如果序列化仅由可信来源完成,您还可以启用全部信任 (*)。即使在我添加下面之后,我仍然得到相同的异常: config.put(JsonDeserializer.TRUSTED_PACKAGES, ""); 如果我不使用通用工厂并将其与一个特定对象一起使用,那么它对于该对象来说效果很好,但这样我就必须为四个不同的 Kafkalistener 创建四个工厂。
我的卡夫卡列表:
@KafkaListener( topics="number_1_event", groupId="abc-group", containerFactory="kafkaABCListenerContainerFactory")
public void consumeMyMessage(MyTopic1Class data, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
@Header(KafkaHeaders.OFFSET) int offsetId) {
// code here
}
@KafkaListener( topics="number_2_event", groupId="abc2-group", containerFactory="kafkaABCListenerContainerFactory")
/* Similar method signature here*/
@KafkaListener( topics="number_3_event", groupId="abc3-group", containerFactory="kafkaABCListenerContainerFactory")
@KafkaListener( topics="number_4_event", groupId="abc4-group", containerFactory="kafkaABCListenerContainerFactory")
配置:
@Bean
public ConsumerFactory<String, Object> abcdConsumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(Object.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaABCListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(abcdConsumerFactory());
return factory;
}
我目前正在使用上述配置,并且收到如上所述的信任包错误。 如果我将上述配置中的对象替换为 MyTopic1Class 或 MyTopic2Class, 然后运行它,它将适用于该特定对象。 请帮忙!!
最佳答案
Do you mean that i can just use
ConcurrentKafkaListenerContainerFactory<String, MyTopic1Class>
andConsumerFactory<String, MyTopic1Class> abcdConsumerFactory()
. and this factory signature would work for when my number_1_event topic receives a MyTopic1Class. Will it work when topic number_2_event receives MyTopic2Class and using the same factory signature
是的,它会起作用,但使用起来更干净
ConcurrentKafkaListenerContainerFactory<String, Object> abcdContainerFactory()
ConsumerFactory<String, Object> abcdConsumerFactory()
您的听众可以使用较窄的类型 MyTopic1Class
, MyTopic2Class
等等
工厂中的泛型类型仅在编译时有效;它们在运行时被删除。
关于java - spring-kafka 具有不同 Json 对象的多个 @KafkaListener 不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61947200/