我一直想知道当使用静态类型语言(如 Java)使用来自 Kafka 主题的消息时是否需要 Avro 架构注册表。我正在使用来自 Kafka 主题设置的消息,如下所示:
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", kafkaProperties.getServers()));
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaProperties.getSchemaRegistryUrl());
KafkaConsumer<byte[], FooClass> kafkaConsumer = new KafkaConsumer<>(props);;
在我的项目中,我有 .avsc
文件定义 FooClass 类的架构。我还配置了 avro-maven-plugin
以在构建时为我生成类 FooClass
。
为什么我仍然需要指定架构注册表 URL?我的消费者是否无法使用项目中的 .avsc
文件反序列化我的 Kafka 消息的值?
最佳答案
您正在使用 Confluence 库 ( io.confluent.kafka.serializers.KafkaAvroDeserializer ),它定义了它们的 own Confluent Avro format并强制使用 Confluence 架构注册表。
从技术上讲,您不需要 Apache Avro 注册表。
Avro 需要编写器模式来解码消息,虽然这包含在 Avro 文件中,使它们具有 self 描述性,但它不包含在 the streaming format 中。或Confluent Avro .
因此,客户端需要某种方式来查找架构。这可以通过 Confluence Avro 格式的 Confluence 架构注册表来解决,也可以通过您自己的 org.apache.avro.message.SchemaStore 来解决。 。请参阅this example ,我使用 SchemaStore.Cache预先填充已知的模式。
请注意,该示例使用 Apache Avro format ,与 Confluent Avro 不兼容.
Confluence Avro 反序列化器需要 Confluence 模式注册表,并且没有用于“使用已知模式运行”的 API。
关于java - 为什么静态类型语言需要 Avro 架构注册表?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61320666/