java - Kafka Avro 使用模式注册表序列化/反序列化为具体类型失败

标签 java apache-kafka avro confluent-platform confluent-schema-registry

我似乎无法在具体的 avro 实现中使用消息,我收到以下异常:

class org.apache.avro.generic.GenericData$Record cannot be cast to class my.package.MyConcreteClass

这是代码(我使用Spring Boot)

MyProducer.java

private final KafkaTemplate<String, MyConcreteClass> kafkaTemplate;

public PositionProducer(KafkaTemplate<String, MyConcreteClass> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}

public void sendMessage(MyConcreteClass myConcreteClass) {
    this.kafkaTemplate.send(topic, myConcreteClass);
}

MyConsumer.java

@KafkaListener(topics = "#{'${consumer.topic.name}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void listen(MyConcreteClass incomingMsg) {
    //handle
}

请注意,如果我将所有内容更改为 GenericRecord,反序列化会正常工作,因此我知道所有配置(未粘贴)都已正确配置。

也许还需要注意的是,我自己没有注册架构,而是让我的客户端代码为我注册。

有什么想法吗?

编辑:

配置:

@Bean
public ConsumerFactory<String, MyConcreteClass> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    return new DefaultKafkaConsumerFactory<>(props);
}

最佳答案

MyConcreteClass 需要扩展 SpecificRecord

您可以使用 Avro maven 插件从架构生成它

然后您必须配置序列化器以了解您想要使用特定记录

props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true") ;

关于java - Kafka Avro 使用模式注册表序列化/反序列化为具体类型失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60344848/

相关文章:

scala - 读取多个AVRO文件时对象无法序列化错误

apache-kafka - 如何在 Kafka Source Connector 中编写自定义 SMT 来混淆私有(private)数据

shell - 如何从输入的 avro 文件创建一个 ".avsc"文件?

java - Orika:如何使用嵌套映射器进行映射

java - 用 Java 编写 removeAll 的实现

java - 不变性和可读性

java - Android Studio 错误 : class, 界面,或预期的枚举

java - KafkaStreams serde异常

Spring Boot 测试未使用测试属性

c - 如何使用 Avro C 库解码从 Kafka 读取的二进制文件?