serialization - 如何将 AVRO 序列化器与 Kafka Connect SourceTask 中的架构注册表结合使用

标签 serialization schema apache-kafka avro

我已经设置了 Confluence 数据平台并开始开发 SourceConnector,并在相应的 SourceTask.poll() 方法中执行以下操作(下面是伪 Java 代码):

    public List<SourceRecord> poll() throws InterruptedException {

....

    Envelope envelope = new Envelope();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Encoder enc = EncoderFactory.get().binaryEncoder(out, null);
    DatumWriter<Envelope> dw = new ReflectDatumWriter<Envelope>(Envelope.class);
    dw.write((Envelope)envelope, enc);
    enc.flush();
    out.close();
    Map<String, String> sourcePartition = new HashMap<String, String>();
    sourcePartition.put("stream", streamName);
    Map<String, Integer> sourceOffset = new HashMap<String, Integer>();
    sourceOffset.put("position", Integer.parseInt(envelope.getTimestamp()));
    records.add(new SourceRecord(sourcePartition, sourceOffset, topic, org.apache.kafka.connect.data.Schema.BYTES_SCHEMA, envelope));

....

我想使用模式注册表,以便使用注册表中的模式 id 标记正在序列化的对象,序列化,然后通过 poll() 函数发布到 Kafka 主题。如果任意对象的模式不驻留在注册表中,我希望它被注册,并将相应的生成 ID 返回到序列化程序进程,以便它成为序列化对象的一部分,使其可反序列化。

我需要在上面的代码中做什么才能实现这一点?

最佳答案

要使用 SchemaRegistry,您必须使用 Confluence 提供的类序列化/反序列化数据:

  • io.confluence.kafka.serializers.KafkaAvroSerializer
  • io.confluence.kafka.serializers.KafkaAvroDeserializer

这些类包含从注册表注册和请求架构的所有逻辑。

如果您使用maven,您可以添加此依赖项:

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>2.0.1</version>
</dependency>

关于serialization - 如何将 AVRO 序列化器与 Kafka Connect SourceTask 中的架构注册表结合使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37317567/

相关文章:

c++ - 尽管一切看起来都正确,为什么 boost::serialize 不起作用? ("unregistered class")

java - 通过套接字将 Java 文件对象发送到服务器

json - 为两个几乎相同的实例定义一个 JSON 模式

mysql - 从备份创建新的 MySql 模式。SQL 源文件重置自动递增 id 列?

java - 测试正确 JSON 序列化的最佳方法

c++ - 将数据从 GPS 传输到 SIGFOX

sql - 如何授予数据库角色对架构的执行权限?我究竟做错了什么?

java - Jar 文件有类,但我仍然得到 java.lang.ClassNotFoundException : org. apache.kafka.clients.consumer.ConsumerRecord

command-line - Kafka/PubSub 连接器 : Example pipeline: ERROR Task Converting byte[], 无法识别的 token ,需要 ('true'、 'false' 或 'null')

java - Confluent Cloud 上的 Kafka 流 : 'segment.ms' with value '600000' exceeded min limit of 14400000 for internal repartition topic