java - 有没有办法在 Beam 的 ParDo 转换中创建 SpecificRecord 列表以写入 Parquet 文件?

标签 java google-cloud-dataflow apache-beam dataflow

我正在尝试在 Beam/Java 中编写一个数据流作业来处理来自 Pub/Sub 并写入 Parquet 的一系列事件。 Pub/Sub 中的事件采用 JSON 格式,每个事件可以生成一行或多行。我能够编写一个非常简单的示例,编写仅返回 1 条记录的 ParDo 转换。 ParDo 看起来像这样

    static class GenerateRecords extends DoFn<String, GenericRecord> {
        @ProcessElement
        public void processElement(ProcessContext context) {
            final GenericData.Record record = new GenericData.Record(schema);
            String msg = context.element();

            com.tsp.de.schema.mschema pRecord = GenerateParquetRecord(msg);


            context.output(pRecord);
        }
    }

以及管道的写入部分

                .apply("Write to file",
                FileIO.<GenericRecord>
                        write()
                        .via(
                                ParquetIO.sink(schema)
                                        .withCompressionCodec(CompressionCodecName.SNAPPY)
                        )
                        .to(options.getOutputDirectory())
                        .withNumShards(options.getNumShards())
                        .withSuffix("pfile")
                );

我的问题是,如何概括此 ParDo 转换以返回记录列表?我尝试了 List,但不起作用,ParquetIO.sink(schema) 发出“无法通过以下方式解析方法”的警告。

最佳答案

您可以根据需要多次在 DoFn 中调用 context.output()。因此,如果您知道在哪种情况下需要发出多条记录的业务逻辑,那么您只需为每个输出记录调用 context.output(record) 即可。它应该比拥有容器的 PCollection 更简单。

PS:顺便说一句,我有一个 simple example如何使用 ParquetIOAvroCoder 编写 GenericRecord 可能会有所帮助。

关于java - 有没有办法在 Beam 的 ParDo 转换中创建 SpecificRecord 列表以写入 Parquet 文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58721495/

相关文章:

java - 如何使用java中给定的公钥使用rsa/ecb/pkcs1填充模式对文本进行编码?

java - 邮件中HTML数据内容不正确

java - 是否可以在从 Pub/Sub 写入 BigQuery 的 Google Cloud Dataflow 管道中捕获丢失的数据集 java.lang.RuntimeException?

google-cloud-dataflow - 通过 Apache Beam 使用 ParquetIO 读写 parquet 文件的示例

kubernetes - 在Kubernetes的远程Flink集群上运行Apache Beam作业的问题

javascript - 如何将 id ="getdobval"插入输入值?

java - Android:将本地文件保存到下载文件夹并使其可见

google-cloud-dataflow - Python SDK中等效的Apache Beam : DoFn.安装程序

java - 如何在 Apache Beam 中以 byte[] 形式读取文件?

python - Beam Python Dataflow Runner 在 apply_WriteToBigQuery 中使用已弃用的 BigQuerySink 而不是 WriteToBigQuery