java - 使用 KafkaAvroDecoder 将 Avro 消息反序列化为特定数据

标签 java apache-kafka avro

我正在阅读 Kafka 主题,其中包含使用 KafkaAvroEncoder(自动将模式注册到主题)序列化的 Avro 消息。我正在使用 maven-avro-plugin 生成纯 Java 类,我想在阅读时使用它。

KafkaAvroDecoder 仅支持反序列化为 GenericData.Record 类型,这(在我看来)忽略了静态类型语言的全部意义。我的反序列化代码目前看起来像这样:

    SpecificDatumReader<event> reader = new SpecificDatumReader<>(
        event.getClassSchema() // event is my class generated from the schema
    );
    byte[] in = ...; // my input bytes;
    ByteBuffer stuff = ByteBuffer.wrap(in);
    // the KafkaAvroEncoder puts a magic byte and the ID of the schema (as stored 
    //   in the schema-registry) before the serialized message
    if (stuff.get() != 0x0) {
        return;
    }
    int id = stuff.getInt();

    // lets just ignore those special bytes
    int length = stuff.limit() - 4 - 1;
    int start = stuff.position() + stuff.arrayOffset();

    Decoder decoder = DecoderFactory.get().binaryDecoder(
        stuff.array(), start, length, null
    );
    try {
        event ev = reader.read(null, decoder);
    } catch (IOException e) {
        e.printStackTrace();
    }

我发现我的解决方案很麻烦,所以我想知道是否有更简单的解决方案。

最佳答案

多亏了评论,我才能找到答案。秘诀是实例化 KafkaAvroDecoderProperties指定特定 Avro 阅读器的使用,即:

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
            io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_C‌ONFIG, "...");
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    VerifiableProp vProps = new VerifiableProperties(props);

    KafkaAvroDecoder decoder = new KafkaAvroDecoder(vProps);
    MyLittleData data = (MyLittleData) decoder.fromBytes(input);

同样的配置适用于直接使用KafkaConsumer<K, V>的情况。类(我在 Storm 中使用来自 storm-kafka 项目的 KafkaSpout 消费,它使用 SimpleConsumer ,所以我必须手动反序列化消息。对于勇敢的人来说,有 storm-kafka-client 项目,它通过使用新样式的消费者自动执行此操作)。

关于java - 使用 KafkaAvroDecoder 将 Avro 消息反序列化为特定数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37159607/

相关文章:

java - 试图解决NoSuchMethodError

java - 如何摆脱对已实现接口(interface)的多余强制转换?

docker - 没有这样的主机,我通过端口转发连接到服务器上的 kafka 代理

python - Spark 2.4.4 Avro Pyspark Shell 配置

java - 添加 ArrayList<String> 输入,每个输入都通过枚举引用 double 值

java - 如何在3个独立的java进程之间传递cron作业的值?

java - 从 bootstrap server 和 zookeeper 消费消息有什么区别?

python - 带有 Python 消费者的 Docker Kafka

java - 如何在扩展 http 应用程序启动器时访问 Kafka 消息头

java - SQLContext.read() Spark 中的 NullPointerException