我是 Avro 新手,最近几天 Kafka 一直在发送有关 Kafka 主题的序列化数据...但没有成功。
让我解释一下我想要实现的目标:
在生产者方面,我通过 SOAP 接收数据并发送有关 Kafka 主题的内容。我正在使用 CXF 从 WSDL 生成 POJO,并且我已经编写了相应的架构。 我想做的是序列化由 CXF 解码的对象并将它们发送到我的 Kafka 主题。
在网络上找到的大多数示例中,Avro 记录是使用已知架构(或数据类型)生成的,但在这种情况下,我不知道序列化数据时将使用哪个架构。 所以我动态获取消息类型(通过 CXF 拦截器)并以这种方式序列化:
// get unmarshaled POJO
MessageContentsList objs = MessageContentsList.getContentsList(message);
Object obj = objs.get(0);
EncoderFactory factory = EncoderFactory.get();
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = factory.directBinaryEncoder(out, null);
// getting schema from class name (first approach)
String scName = obj.getClass().getSimpleName();
InputStream avroRes = this.getClass().getClassLoader().getResourceAsStream(scName);
Schema schema = new Schema.Parser().parse(avroRes);
ReflectDatumWriter<Object> writer = new ReflectDatumWriter<Object>(schema);
writer.write(obj, encoder);
encoder.flush();
out.close();
KeyedMessage< String, byte[]> kMessage = new KeyedMessage<String, byte[]>("mytopic", out.toByteArray());
producer.send(kMessage);
这样我就可以发送有关我的主题的数据,但无法从传入消息中获取架构。
有没有办法:
- 从 Kafka 主题读取消息并获取用于序列化的架构?
- 在消费和反序列化时将通用记录映射到 POJO?
当数据类型未知时,发送有关 Kafka 主题的 Avro 记录的“最佳”实践是什么?
也许我在阅读 Avro 文档时错过了一些东西,并且没有按预期使用它。
感谢您的帮助...
最佳答案
发送到 Kafka 主题的消息应该对架构和 Avro 记录进行编码。如果在每条消息中发送架构的开销太大,则发送架构的标识符。消息使用者可以使用标识符从 schema registry 检索完整的架构定义。 。例如这个code to serialize a Kafka message将模式标识符写入消息的第一个字节中:
ByteArrayOutputStream out = new ByteArrayOutputStream();
schema = getSchema(object);
int id = schemaRegistry.register(subject, schema);
out.write(MAGIC_BYTE);
out.write(ByteBuffer.allocate(idSize).putInt(id).array());
BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
DatumWriter<Object> writer;
if (object instanceof SpecificRecord) {
writer = new SpecificDatumWriter<Object>(schema);
} else {
writer = new GenericDatumWriter<Object>(schema);
}
writer.write(object, encoder);
encoder.flush();
byte[] bytes = out.toByteArray();
out.close();
return bytes;
关于java - Avro - 反序列化 POJO,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30462066/