google-bigquery - Apache 光束 : Transform an objects having a list of objects to multiple TableRows to write to BigQuery

标签 google-bigquery google-cloud-dataflow apache-beam apache-beam-io

我正在使用 Beam 管道来处理 json 并将其写入 bigquery。 JSON 是这样的。

{
"message": [{
    "name": "abc",
    "itemId": "2123",
    "itemName": "test"

}, {
    "name": "vfg",
    "itemId": "56457",
    "itemName": "Chicken"
}],
"publishDate": "2017-10-26T04:54:16.207Z"

我使用 Jackson 将其解析为以下结构。

class Feed{
List<Message> messages; 
TimeStamp  publishDate;

}

public class Message implements Serializable{

/**
 * 
 */
private static final long serialVersionUID = 1L;
private String key;
private String value;

private Map<String, String> eventItemMap = new HashMap<>();
this property translate the list of map as a single map with all the key-value pair together. because,  the messages property will be parsed as list of HashMap objets for each key/value. This will be translated to a single map. 

现在在我的管道中,我会将集合转换为

PCollection<KV<String, Feed>>

根据类中的属性将其写入不同的表。我写了一个转换来做到这一点。 需求是根据消息对象的数量创建多个 TableRow。我在 JSON 中还有一些属性以及将添加到 tableRow 和每个消息属性的 publishDate。 所以表格如下。

id, name, field1, field2, message1.property1, message1.property2...

id, name, field1, field2, message2.property1, message2.property2...

我尝试创建以下转换。但是,不确定它将如何根据消息列表输出多行。

private class BuildRowListFn extends DoFn<KV<String, Feed>, List<TableRow>> {

    @ProcessElement
    public void processElement(ProcessContext context) {
        Feed feed = context.element().getValue();

        List<Message> messages = feed.getMessage();
        List<TableRow> rows = new ArrayList<>();
        messages.forEach((message) -> {
            TableRow row = new TableRow();
            row.set("column1", feed.getPublishDate());
            row.set("column2", message.getEventItemMap().get("key1"));
            row.set("column3", message.getEventItemMap().get("key2"));
            rows.add(row);
        }

        );

    }

但是,这也将是一个列表,我将无法应用 BigQueryIO.write 转换。


根据“Eugene”又名@jkff 的评论更新

感谢@jkff。现在,我已经按照您在第二段中提到的那样更改了代码。 messages.forEach 中的 context.output(row),在将表格行设置为之后

List<Message> messages = feed.getMessage();
        messages.forEach((message) -> {
            TableRow row = new TableRow();
            row.set("column2", message.getEventItemMap().get("key1"));
            context.output(row);
            }

现在,当我尝试将此集合写入 BigQuery 时,如

rows.apply(BigQueryIO.writeTableRows().to(getTable(projectId, datasetId, tableName)).withSchema(getSchema())
                    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND));

我收到以下异常。

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
at com.chefd.gcloud.analytics.pipeline.MyPipeline.main(MyPipeline.java:284)


Caused by: java.lang.NullPointerException
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:759)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:809)
at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:126)
at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:96)

请帮忙。

谢谢。

最佳答案

您似乎假设 DoFn每个元素只能输出一个值。事实并非如此:它可以为每个元素输出任意数量的值——无值、一个值、多个值等。DoFn甚至可以output values to multiple PCollection's .

在您的情况下,您只需调用 c.output(row)对于 @ProcessElement 中的每一行方法,例如:rows.forEach(c::output) .当然,您还需要更改 DoFn 的类型至 DoFn<KV<String, Feed>, TableRow> ,因为其输出中元素的类型 PCollectionTableRow , 不是 List<TableRow> - 您只是为每个输入元素将多个元素生成到集合中,但这不会改变类型。

另一种方法是做你目前所做的,也做 c.output(rows)然后申请 Flatten.iterables()压平 PCollection<List<TableRow>>进入PCollection<TableRow> (您可能需要将 List 替换为 Iterable 才能使其正常工作)。但另一种方法更简单。

关于google-bigquery - Apache 光束 : Transform an objects having a list of objects to multiple TableRows to write to BigQuery,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46947335/

相关文章:

python-3.x - 如何从 Dataflow 批量(有效)发布到 Pub/Sub?

java - 在数据流模板中调用 waitUntilFinish() 后可以运行代码吗?

python - 在美国位置未找到数据集

java - BigQueryIO 读取获取 TableSchema

在 Google bigquery 的 On 子句中使用 <= 进行连接

google-bigquery - BigQuery : How to overwrite a table with bigquery. Client()。copy_table方法

sql - 在 Google Dataflow 上使用 JdbcIO 的吞吐量非常低

google-bigquery - BigQuery 中跨多个数组列的高级 UNNEST

google-cloud-dataflow - 从数据流加载 Bigquery 表时,我们如何设置 maximum_bad_records?

java - 如何使用 avro 在 parquet 文件模式中创建 REPEATED 类型?