上下文: 我们有一个 Dataflow 作业,它将 PubSub 消息转换为 Avro GenericRecords,并将它们作为“.avro”写入 GCS。 PubSub 消息和 GenericRecords 之间的转换需要一个模式。此架构每周都会更改,仅添加字段。我们希望能够在不更新数据流作业的情况下更新字段。
我们做了什么: 我们听取了this post的建议并创建了一个 Guava Cache,每分钟刷新一次内容。刷新功能将从 GCS 中提取模式。然后,我们让 FileIO.write 查询 Guava Cache 以获取最新的架构,并将具有该架构的元素转换为 GenericRecord。我们还有 FileIO.write 输出到 Avro 接收器,该接收器也是使用该架构创建的。
代码如下:
genericRecordsAsByteArrays.apply(FileIO.<byte[]>write()
.via(fn((input, c) -> {
Map<String, Object> schemaInfo = cache.get("");
Descriptors.Descriptor paymentRecordFd =
(Descriptors.Descriptor) schemaInfo.get(DESCRIPTOR_KEY);
DynamicMessage paymentRecordMsg = DynamicMessage.parseFrom(paymentRecordFd, input);
Schema schema = (Schema) schemaInfo.get(SCHEMA_KEY);
//From concrete PaymentRecord bytes to DynamicMessage
try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
ProtobufDatumWriter<DynamicMessage> pbWriter = new ProtobufDatumWriter<>(schema);
pbWriter.write(paymentRecordMsg, encoder);
encoder.flush();
// From dynamic message to GenericRecord
byte[] avroContents = output.toByteArray();
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(avroContents, null);
return reader.read(null, decoder);
}
}, requiresSideInputs()),
fn((output, c) -> {
Map<String, Object> schemaInfo = cache.get("");
Schema schema = (Schema) schemaInfo.get(SCHEMA_KEY);
return AvroIO.sink(schema).withCodec(CodecFactory.snappyCodec());
}, requiresSideInputs()))
.withNumShards(5)
.withNaming(new PerWindowFilenames(baseDir, ".avro"))
.to(baseDir.toString()));
我的问题:
- 当我们写入一个 Avro 文件时,突然发生架构更新,现在我们将新架构写入使用旧架构创建的 Avro 文件中,会发生什么情况?
- Dataflow 看到新架构时是否会启动新文件?
- 在创建新文件之前,Dataflow 是否会忽略新架构和附加字段?
每个 Avro 文件在文件的开头都有自己的架构,因此我不确定预期的行为是什么。
最佳答案
now we are writing the new schema into an Avro file created with the old schema
这是不可能的。每个 Avro 文件只有一个架构。如果它发生变化,根据定义,您将写入一个新文件。
我怀疑 Dataflow 会忽略字段。
关于java - 写入 Avro 文件时架构更新,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59903206/