java - Avro - 反序列化 POJO

标签 java web-services serialization apache-kafka avro

我是 Avro 新手,最近几天 Kafka 一直在发送有关 Kafka 主题的序列化数据...但没有成功。

让我解释一下我想要实现的目标:

在生产者方面,我通过 SOAP 接收数据并发送有关 Kafka 主题的内容。我正在使用 CXF 从 WSDL 生成 POJO,并且我已经编写了相应的架构。 我想做的是序列化由 CXF 解码的对象并将它们发送到我的 Kafka 主题。

在网络上找到的大多数示例中,Avro 记录是使用已知架构(或数据类型)生成的,但在这种情况下,我不知道序列化数据时将使用哪个架构。 所以我动态获取消息类型(通过 CXF 拦截器)并以这种方式序列化:

// get unmarshaled POJO
MessageContentsList objs = MessageContentsList.getContentsList(message);
Object obj = objs.get(0);

EncoderFactory factory = EncoderFactory.get();
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = factory.directBinaryEncoder(out, null);

// getting schema from class name (first approach)
String scName = obj.getClass().getSimpleName();
InputStream avroRes = this.getClass().getClassLoader().getResourceAsStream(scName);
Schema schema = new Schema.Parser().parse(avroRes);

ReflectDatumWriter<Object> writer = new ReflectDatumWriter<Object>(schema);
writer.write(obj, encoder);
encoder.flush();
out.close();

KeyedMessage< String, byte[]> kMessage = new KeyedMessage<String, byte[]>("mytopic", out.toByteArray());
producer.send(kMessage);

这样我就可以发送有关我的主题的数据,但无法从传入消息中获取架构。

有没有办法:

  • 从 Kafka 主题读取消息并获取用于序列化的架构?
  • 在消费和反序列化时将通用记录映射到 POJO?

当数据类型未知时,发送有关 Kafka 主题的 Avro 记录的“最佳”实践是什么?

也许我在阅读 Avro 文档时错过了一些东西,并且没有按预期使用它。

感谢您的帮助...

最佳答案

发送到 Kafka 主题的消息应该对架构和 Avro 记录进行编码。如果在每条消息中发送架构的开销太大,则发送架构的标识符。消息使用者可以使用标识符从 schema registry 检索完整的架构定义。 。例如这个code to serialize a Kafka message将模式标识符写入消息的第一个字节中:

ByteArrayOutputStream out = new ByteArrayOutputStream();

schema = getSchema(object);
int id = schemaRegistry.register(subject, schema);
out.write(MAGIC_BYTE);
out.write(ByteBuffer.allocate(idSize).putInt(id).array());

BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
DatumWriter<Object> writer;
if (object instanceof SpecificRecord) {
  writer = new SpecificDatumWriter<Object>(schema);
} else {
  writer = new GenericDatumWriter<Object>(schema);
}
writer.write(object, encoder);
encoder.flush();

byte[] bytes = out.toByteArray();
out.close();
return bytes;

关于java - Avro - 反序列化 POJO,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30462066/

相关文章:

java - 使用 java lang 对象类型作为数据类型的 Avro 模式

c# - 反序列化包含字典的对象列表

java - 通过 Socket 发送的图像流 - 服务器死锁问题

c# - 无法将对象发送到 SOAP Web 服务

android - "&"我的类的字符串中的符号不​​被视为它的一部分

java - 如何解决类似于 readLine 的 readObject

java - 如何在 JSP 中保留下拉列表的选定值?

java - GWT 2.1 : ResettableEventBus doesn't reset?

java - 快速分解算法?

web-services - ColdFusion Lucee 或 Railo RestFull Web 服务映射问题