spring - 如何配置自定义 Kafka 反序列化器并使用 KafkaListener 获取使用的 JSON 数据

标签 spring spring-boot apache-kafka apache-kafka-streams spring-kafka

我正在尝试使用 spring kafka 消费 JSON 消息。消费者消费的消息是这样的。

{
    "EventHeader": {
        "entityName": "Account",
        "changedFields": ["Id", "Name"]
    },
    "NewFields": {
        "Id": "001",
        "Name": "Test Account",
    },
    "OldFields": {}
}

到目前为止,我已经为“EventHeader”、“NewFields”、“OldFields”和“KafkaPayload”创建了类。我还创建了一个自定义反序列化器来反序列化此 JSON 负载。这是我的自定义反序列化器。

   public class CustomDeserailizer <T extends Serializable> implements Deserializer<T> {
private ObjectMapper objectMapper = new ObjectMapper();
    public static final String VALUE_CLASS_NAME_CONFIG = "value.class.name";
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            Deserializer.super.configure(configs, isKey);
        }
    
        @Override
        public T deserialize(String topic, byte[] objectData) {
            return (objectData == null) ? null : (T) SerializationUtils.deserialize(objectData);
        }
    
        @Override
        public T deserialize(String topic, Headers headers, byte[] data) {
            return Deserializer.super.deserialize(topic, headers, data);
        }
    
        @Override
        public void close() {
            Deserializer.super.close();
        }
    }
 

我已将消费者配置设置如下。

public class KafkaConfig {
    @Bean
    public KafkaConsumer<String, KafkaPayload> consumerFactory(){
        Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);
        return new KafkaConsumer<>(config);
    }
}

现在我需要通过 @KafkaListener 将消费者设置为 ConsumerFactory 来显示消费的消息。但我不明白该怎么做。这是我第一次使用 kafka。有人能给我一些关于这个的想法吗?

这就是我正在努力做到的。

@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaPayload> kafkaListener(){
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

这是我的 KafkaListener

public class ConsumerService {

    @KafkaListener(topics = "Topic", groupId = "sample-group",containerFactory = "kafkaListener")
    public void consume(KafkaPayload kafkaPayload){

        System.out.println("Consumed Message :"+ kafkaPayload);
    }

}

最佳答案

由于您使用的是 Spring Boot,只需将值反序列化器类名设置为属性,Boot 就会自动将其连接到 @KafkaListener 的容器工厂中。无需定义自己的消费者工厂或容器工厂。

spring.kafka.consumer.value-deserializer=com.acme.CustomDeserializer

https://docs.spring.io/spring-boot/docs/current/reference/html/application-properties.html#application-properties.integration.spring.kafka.consumer.value-deserializer

关于spring - 如何配置自定义 Kafka 反序列化器并使用 KafkaListener 获取使用的 JSON 数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72533340/

相关文章:

java - 无法推断基本网址。这在使用动态 servlet 注册或 API 位于 API 网关 swagger 2.10.5 之后时很常见

mysql - 没有数据库的Spring Boot

linux - Spring Security getprincipal() 方法返回字符串(用户名)代替类 UserDetails

hadoop - 如何在 Flink 独立集群上的 Flink 作业中使用两个 Kerberos key 表(用于 Kafka 和 Hadoop HDFS)?

java - 使用 Twilio 进行 Spring MVC Controller 重定向

java - 同一台服务器上的 Spring Boot + Angular2

java - Hibernate - 多对多实现

java - SQL语句MyBatis中未知元素<select>

apache-kafka - kafka.server :type=KafkaServer, name=BrokerState 与 4 是什么意思?

java - Spring Kafka - 代理重新连接事件