java - Spring Boot Kafka consumer crashes with multiple types

Tags: java spring-boot apache-kafka

As I understand it, if I have topic1=ClassA and topic2=ClassB in the same process, I need two container factories?

My configuration class:

@Bean
public ConsumerFactory<String, MessageADto> xxxConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(MessageADto.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MessageADto> xxxListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, MessageADto> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(xxxConsumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String, MessageBDto> xxx2ConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(MessageBDto.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MessageBDto> xxx2ListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, MessageBDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(xxx2ConsumerFactory());
    return factory;
}

private Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaHost());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "xxx");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return props;
}

In my REST controller class I have the following listeners (for a POC only):

@KafkaListener(topics=KafkaTopicConfig.xxx_TOPIC, containerFactory="xxxListenerContainerFactory")
public void xxxListener(MessageADto message) {
    System.out.println(message.getMessage());
}

@KafkaListener(topics=KafkaTopicConfig.xxx2_TOPIC, containerFactory="xxx2ListenerContainerFactory")
public void xxx2Listener(MessageBDto message) {
    System.out.println(message.getMessage() + " : " + message.getCount());
}

In the REST controller method I send both a MessageADto and a MessageBDto:

        MessageBDto messageBDto = new MessageBDto() {{ setMessage(message.getMessage()); setCount(17); }};
        this.kafkaService.sendMessageB(messageBDto);
        return convertToDto(this.kafkaService.sendMessageA(message).get().getRecordMetadata());

This produces the following exception:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition dtms2-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'org.xxx.xxx.controllers.KafkaController$1' is not in the trusted packages: [java.util, java.lang, org.xxx.xxx.dtos]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:125) ~[spring-kafka-2.4.1.RELEASE.jar:2.4.1.RELEASE]
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:99) ~[spring-kafka-2.4.1.RELEASE.jar:2.4.1.RELEASE]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:425) ~[spring-kafka-2.4.1.RELEASE.jar:2.4.1.RELEASE]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1263) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) ~[kafka-clients-2.3.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1012) ~[spring-kafka-2.4.1.RELEASE.jar:2.4.1.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:968) ~[spring-kafka-2.4.1.RELEASE.jar:2.4.1.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:905) ~[spring-kafka-2.4.1.RELEASE.jar:2.4.1.RELEASE]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

I don't understand this exception. What does it have to do with my controller class?

Best answer

You need to add the controller class to the list of trusted packages.

Try adding this to your properties file:

spring.kafka.consumer.properties.spring.json.trusted.packages=*

If that works, you can then work out which packages you actually need and replace * with a comma-separated list of package names.
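
As background on why a controller class shows up in the error at all: the `new MessageBDto() {{ ... }}` expression in the question is double-brace initialization, which Java compiles to an anonymous subclass of `MessageBDto` nested inside the enclosing class, hence the name `KafkaController$1`. Presumably the producer side recorded that runtime class name in a type header, and `DefaultJackson2JavaTypeMapper.getClassIdType` in the stack trace is rejecting it. Below is a minimal plain-Java sketch of the naming behaviour only; `AnonymousClassNameDemo` and `Dto` are hypothetical stand-ins for the question's `KafkaController` and `MessageBDto`, and no Kafka is involved.

```java
public class AnonymousClassNameDemo {

    // Hypothetical stand-in for the question's MessageBDto.
    static class Dto {
        private String message;

        public String getMessage() {
            return message;
        }

        public void setMessage(String message) {
            this.message = message;
        }
    }

    // Double-brace initialization: the outer braces declare an anonymous
    // subclass of Dto, the inner braces are an instance initializer block.
    static Dto doubleBrace(String message) {
        return new Dto() {{ setMessage(message); }};
    }

    // Plain construction: the runtime class is Dto itself.
    static Dto plain(String message) {
        Dto dto = new Dto();
        dto.setMessage(message);
        return dto;
    }

    public static void main(String[] args) {
        // The anonymous subclass is named after the enclosing class.
        System.out.println(doubleBrace("hi").getClass().getName()); // AnonymousClassNameDemo$1
        System.out.println(plain("hi").getClass().getName());       // AnonymousClassNameDemo$Dto
    }
}
```

If that is what is happening here, building the DTO with a plain constructor call and setters would send `MessageBDto` itself as the payload class, which lives in the already trusted `org.xxx.xxx.dtos` package, rather than an anonymous class in the controllers package.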

Source: Stack Overflow, https://stackoverflow.com/questions/60033306/
