我正在尝试使用 spring kafka 消费 JSON 消息。消费者消费的消息是这样的。
{
"EventHeader": {
"entityName": "Account",
"changedFields": ["Id", "Name"]
},
"NewFields": {
"Id": "001",
"Name": "Test Account",
},
"OldFields": {}
}
到目前为止,我已经为“EventHeader”、“NewFields”、“OldFields”和“KafkaPayload”创建了类。我还创建了一个自定义反序列化器来反序列化此 JSON 负载。这是我的自定义反序列化器。
public class CustomDeserailizer <T extends Serializable> implements Deserializer<T> {
private ObjectMapper objectMapper = new ObjectMapper();
public static final String VALUE_CLASS_NAME_CONFIG = "value.class.name";
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
Deserializer.super.configure(configs, isKey);
}
@Override
public T deserialize(String topic, byte[] objectData) {
return (objectData == null) ? null : (T) SerializationUtils.deserialize(objectData);
}
@Override
public T deserialize(String topic, Headers headers, byte[] data) {
return Deserializer.super.deserialize(topic, headers, data);
}
@Override
public void close() {
Deserializer.super.close();
}
}
我已将消费者配置设置如下。
public class KafkaConfig {
@Bean
public KafkaConsumer<String, KafkaPayload> consumerFactory(){
Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);
return new KafkaConsumer<>(config);
}
}
现在我需要通过 @KafkaListener 将消费者设置为 ConsumerFactory 来显示消费的消息。但我不明白该怎么做。这是我第一次使用 kafka。有人能给我一些关于这个的想法吗?
这就是我正在努力做到的。
@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaPayload> kafkaListener(){
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
这是我的 KafkaListener
public class ConsumerService {
@KafkaListener(topics = "Topic", groupId = "sample-group",containerFactory = "kafkaListener")
public void consume(KafkaPayload kafkaPayload){
System.out.println("Consumed Message :"+ kafkaPayload);
}
}
最佳答案
由于您使用的是 Spring Boot,只需将值反序列化器类名设置为属性,Boot 就会自动将其连接到 @KafkaListener
的容器工厂中。无需定义自己的消费者工厂或容器工厂。
spring.kafka.consumer.value-deserializer=com.acme.CustomDeserializer
关于spring - 如何配置自定义 Kafka 反序列化器并使用 KafkaListener 获取使用的 JSON 数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72533340/