java - AVRO 原始类型的 Serde 类

标签 java apache-kafka avro apache-kafka-streams confluent-platform

我正在用 Java 编写一个 Kafka 流应用程序,它接受由连接器创建的输入主题,该连接器使用架构注册表和 avro 作为键和值转换器。连接器生成以下架构:

key-schema: "int"
value-schema:{
"type": "record",
"name": "User",
"fields": [
    {"name": "firstname", "type": "string"},
    {"name": "lastname",  "type": "string"}
]}

实际上,有几个主题,键模式始终是“int”,值模式始终是某种类型的记录(用户、产品等)。我的代码包含以下定义

Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);

Serde<User> userSerde = new SpecificAvroSerde<>();
userSerde.configure(serdeConfig, false);

起初我尝试用类似的东西来消费这个主题 Consumed.with(Serdes.Integer(), userSerde);但这不起作用,因为 Serdes.Integer() 期望使用 4 个字节对整数进行编码,但 avro 使用可变长度编码。使用Consumed.with(Serdes.Bytes(), userSerde);有效,但我真的想要 int 而不是 bytes,所以我将代码更改为这个

KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer()
KafkaAvroSerializer keySerializer = new KafkaAvroSerializer();
keyDeserializer.configure(serdeConfig, true); 
keySerializer.configure(serdeConfig, true);
Serde<Integer> keySerde = (Serde<Integer>)(Serde)Serdes.serdeFrom(keySerializer, keyDeserializer);

这使得编译器产生警告(它不喜欢 (Serde<Integer>)(Serde) 转换),但它允许我使用

Consumed.with(keySerde, userSerde);并获取一个整数作为 key 。这工作得很好,我的应用程序的行为符合预期(太棒了!!!)。但现在我想为键/值定义默认的 serde,但我无法让它工作。

设置默认值 serde 很简单:

streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);

但是我不知道如何定义默认 key serde。

我试过了

  1. streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde.getClass().getName()); 产生运行时错误:找不到 org.apache.kafka.common.serialization.Serdes$WrapperSerde 的公共(public)无参构造函数
  2. streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); 产生运行时错误:java.lang.Integer 无法转换为 org.apache.avro.specific.SpecificRecord

我错过了什么? 谢谢。

最佳答案

更新 (版本 5.5 及更高版本)

Confluence 版本 5.5 通过 PrimitiveAvroSerde 添加了对原始 Avro 类型的 native 支持(参见 https://github.com/confluentinc/schema-registry/blob/5.5.x/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro/PrimitiveAvroSerde.java )

原始答案 (版本 5.4 及更早版本):

这是一个已知问题。原始 Avro 类型不能与 Confluence 的 AvroSerdes 很好地配合使用,因为 Serdes 仅适用于 GenericAvroRecordSpecificAvroRecord

比较 https://github.com/confluentinc/schema-registry/tree/master/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro .

因此,基于 KafkaAvroSerializer 和 KafkaAvroDeserializer 构建您自己的 Serde 是正确的方法。为了能够将其作为默认 Serde 传递到配置中,您不能使用 Serdes.serdeFrom,因为类型信息由于 genrics 类型删除而丢失。

但是,您可以实现自己的类来扩展 Serde 接口(interface),并将自定义类传递到配置中:

public class MySerde extends Serde<Integer> {
    // use KafkaAvroSerializer and KafkaAvroDeserializer and cast `Object` to `Integer`
}

config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MySerde.class);

关于java - AVRO 原始类型的 Serde 类,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51955921/

相关文章:

java - 在 grails 服务器内运行 .sh 文件

java - Pagefactory 在页面对象结构中抛出 null

apache-spark - Azure DataBricks Stream foreach 因 NotSerializableException 而失败

java - 使用 slf4j 和 kafka 进行日志记录

apache-kafka - 只读取来自 kafka 主题的特定消息

hadoop - 在Hadoop中使用Avro输入格式控制拆分大小

apache-kafka - Avro 架构 : is adding an enum value to existing schema backward compatible?

java - CentOS OpenJDK 组件是否已通过 TCK 合规性测试?

java - 在 Java 中使用各种日历时区(不使用 Joda Time)

java - 如何为 Map<Integer,Map<Integer,Float>> 创建 Avro 模式?