我似乎无法在具体的 avro 实现中使用消息,我收到以下异常:
class org.apache.avro.generic.GenericData$Record cannot be cast to class my.package.MyConcreteClass
这是代码(我使用Spring Boot
)
MyProducer.java
private final KafkaTemplate<String, MyConcreteClass> kafkaTemplate;
public PositionProducer(KafkaTemplate<String, MyConcreteClass> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(MyConcreteClass myConcreteClass) {
this.kafkaTemplate.send(topic, myConcreteClass);
}
MyConsumer.java
@KafkaListener(topics = "#{'${consumer.topic.name}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void listen(MyConcreteClass incomingMsg) {
//handle
}
请注意,如果我将所有内容更改为 GenericRecord
,反序列化会正常工作,因此我知道所有配置(未粘贴)都已正确配置。
也许还需要注意的是,我自己没有注册架构,而是让我的客户端代码为我注册。
有什么想法吗?
编辑:
配置:
@Bean
public ConsumerFactory<String, MyConcreteClass> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
return new DefaultKafkaConsumerFactory<>(props);
}
最佳答案
MyConcreteClass 需要扩展 SpecificRecord
您可以使用 Avro maven 插件从架构生成它
然后您必须配置序列化器以了解您想要使用特定记录
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true") ;
关于java - Kafka Avro 使用模式注册表序列化/反序列化为具体类型失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60344848/