我正在使用 Apache Beam
从 PubSub 读取消息并将其写入 BigQuery。我想做的是根据输入中的信息写入多个表。为了减少写入量,我对 PubSub 的输入使用窗口化。
一个小例子:
messages
.apply(new PubsubMessageToTableRow(options))
.get(TRANSFORM_OUT)
.apply(ParDo.of(new CreateKVFromRow())
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L))))
// group by key
.apply(GroupByKey.create())
// Are these two rows what I want?
.apply(Values.create())
.apply(Flatten.iterables())
.apply(BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.to((SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>) input -> {
// Simplified for readability
Integer destination = (Integer) input.getValue().get("key");
return new TableDestination(
new TableReference()
.setProjectId(options.getProjectID())
.setDatasetId(options.getDatasetID())
.setTableId(destination + "_Table"),
"Table Destination");
}));
我在文档中找不到任何内容,但我想知道每个窗口进行了多少次写入?如果有多个表,是否需要为窗口中的所有元素对每个表进行一次写入?或者是每个元素一次,因为每个表对于每个元素可能不同?
最佳答案
由于您使用 PubSub 作为来源,因此您的工作似乎是一项流媒体工作。因此,默认插入方法是 STREAMING_INSERTS
(请参阅 docs )。我没有看到使用此方法减少写入的任何好处或理由,因为 billig 是基于数据大小的。顺便说一句,您的示例或多或少并没有真正有效地减少写入。
虽然它是一个流作业,但由于某些版本也支持 FILE_LOADS
方法。如果 withMethod
设置为 FILE_LOADS
,您可以在 BigQueryIO
上定义 withTriggeringFrequency
。此设置定义加载作业发生的频率。这里连接器为您处理所有事情,您不需要按键或窗口数据进行分组。将为每个表启动加载作业。
如果您的数据需要一些时间才能进入 BigQuery,那么我建议使用 FILE_LOADS
,因为加载是免费的,而不是流式插入。请注意 quotas当定义触发频率时。
关于java - Apache Beam 使用多个表时有多少写入次数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59436563/