我已经设置了 Confluence 数据平台并开始开发 SourceConnector,并在相应的 SourceTask.poll() 方法中执行以下操作(下面是伪 Java 代码):
public List<SourceRecord> poll() throws InterruptedException {
....
Envelope envelope = new Envelope();
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder enc = EncoderFactory.get().binaryEncoder(out, null);
DatumWriter<Envelope> dw = new ReflectDatumWriter<Envelope>(Envelope.class);
dw.write((Envelope)envelope, enc);
enc.flush();
out.close();
Map<String, String> sourcePartition = new HashMap<String, String>();
sourcePartition.put("stream", streamName);
Map<String, Integer> sourceOffset = new HashMap<String, Integer>();
sourceOffset.put("position", Integer.parseInt(envelope.getTimestamp()));
records.add(new SourceRecord(sourcePartition, sourceOffset, topic, org.apache.kafka.connect.data.Schema.BYTES_SCHEMA, envelope));
....
我想使用模式注册表,以便使用注册表中的模式 id 标记正在序列化的对象,序列化,然后通过 poll() 函数发布到 Kafka 主题。如果任意对象的模式不驻留在注册表中,我希望它被注册,并将相应的生成 ID 返回到序列化程序进程,以便它成为序列化对象的一部分,使其可反序列化。
我需要在上面的代码中做什么才能实现这一点?
最佳答案
要使用 SchemaRegistry,您必须使用 Confluence 提供的类序列化/反序列化数据:
- io.confluence.kafka.serializers.KafkaAvroSerializer
- io.confluence.kafka.serializers.KafkaAvroDeserializer
这些类包含从注册表注册和请求架构的所有逻辑。
如果您使用maven,您可以添加此依赖项:
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>2.0.1</version>
</dependency>
关于serialization - 如何将 AVRO 序列化器与 Kafka Connect SourceTask 中的架构注册表结合使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37317567/