java - 写入 Avro 文件时架构更新

标签 java google-cloud-dataflow avro apache-beam dataflow

上下文: 我们有一个 Dataflow 作业,它将 PubSub 消息转换为 Avro GenericRecords,并将它们作为“.avro”写入 GCS。 PubSub 消息和 GenericRecords 之间的转换需要一个模式。此架构每周都会更改,仅添加字段。我们希望能够在不更新数据流作业的情况下更新字段。

我们做了什么: 我们听取了this post的建议并创建了一个 Guava Cache,每分钟刷新一次内容。刷新功能将从 GCS 中提取模式。然后,我们让 FileIO.write 查询 Guava Cache 以获取最新的架构,并将具有该架构的元素转换为 GenericRecord。我们还有 FileIO.write 输出到 Avro 接收器,该接收器也是使用该架构创建的。

代码如下:

genericRecordsAsByteArrays.apply(FileIO.<byte[]>write()
    .via(fn((input, c) -> {
          Map<String, Object> schemaInfo = cache.get("");
          Descriptors.Descriptor paymentRecordFd =
              (Descriptors.Descriptor) schemaInfo.get(DESCRIPTOR_KEY);
          DynamicMessage paymentRecordMsg = DynamicMessage.parseFrom(paymentRecordFd, input);
          Schema schema = (Schema) schemaInfo.get(SCHEMA_KEY);

          //From concrete PaymentRecord bytes to DynamicMessage
          try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
            ProtobufDatumWriter<DynamicMessage> pbWriter = new ProtobufDatumWriter<>(schema);
            pbWriter.write(paymentRecordMsg, encoder);
            encoder.flush();

            // From dynamic message to GenericRecord
            byte[] avroContents = output.toByteArray();
            DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(avroContents, null);
            return reader.read(null, decoder);
          }
        }, requiresSideInputs()),
        fn((output, c) -> {
          Map<String, Object> schemaInfo = cache.get("");
          Schema schema = (Schema) schemaInfo.get(SCHEMA_KEY);
          return AvroIO.sink(schema).withCodec(CodecFactory.snappyCodec());
        }, requiresSideInputs()))
    .withNumShards(5)
    .withNaming(new PerWindowFilenames(baseDir, ".avro"))
    .to(baseDir.toString()));

我的问题:

  1. 当我们写入一个 Avro 文件时,突然发生架构更新,现在我们将新架构写入使用旧架构创建的 Avro 文件中,会发生什么情况?
  2. Dataflow 看到新架构时是否会启动新文件?
  3. 在创建新文件之前,Dataflow 是否会忽略新架构和附加字段?

每个 Avro 文件在文件的开头都有自己的架构,因此我不确定预期的行为是什么。

最佳答案

now we are writing the new schema into an Avro file created with the old schema

这是不可能的。每个 Avro 文件只有一个架构。如果它发生变化,根据定义,您将写入一个新文件。

我怀疑 Dataflow 会忽略字段。

关于java - 写入 Avro 文件时架构更新,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59903206/

相关文章:

google-cloud-dataflow - 带有行号的 Apache Beam TextIO.Read

google-cloud-dataflow - 具体版本 : PubSub/Dataflow acknowledgement of unbounded data

java - 将 XML 转换为 Avro 并生成 AVRO 架构

java - Android 错误 : You must specifiy a valid layout reference. 布局 ID。 eclipse 中

Java 简单的 "isNull"/"isNotNull"函数有什么用?

java - 为什么将内部类注入(inject)同一个外部类中的另一个内部类不起作用?

java - 无法使用 Java 通过 CLI 命令创建 Google 云数据流模板

avro - 如何在 Avro 中将记录与 map 混合?

java - 目标 org.apache.avro :avro-maven-plugin:1 . 10.2 的执行默认值:架构失败:字段默认值无效

java - Esper 中 OUTPUT WHEN 和 WHERE 的区别