java - Apache Beam 使用多个表时有多少写入次数

标签 java google-bigquery google-cloud-dataflow apache-beam

我正在使用 Apache Beam 从 PubSub 读取消息并将其写入 BigQuery。我想做的是根据输入中的信息写入多个表。为了减少写入量,我对 PubSub 的输入使用窗口化。

一个小例子:

messages
    .apply(new PubsubMessageToTableRow(options))
    .get(TRANSFORM_OUT)
    .apply(ParDo.of(new CreateKVFromRow())
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L))))
    // group by key
    .apply(GroupByKey.create())
    // Are these two rows what I want?
    .apply(Values.create())
    .apply(Flatten.iterables())
    .apply(BigQueryIO.writeTableRows()
          .withoutValidation()
          .withCreateDisposition(CreateDisposition.CREATE_NEVER)
          .withWriteDisposition(WriteDisposition.WRITE_APPEND)
          .withExtendedErrorInfo()
          .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
          .to((SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>) input -> {
                                        // Simplified for readability
                                        Integer destination = (Integer) input.getValue().get("key");
                                        return new TableDestination(
                                                new TableReference()
                                                        .setProjectId(options.getProjectID())
                                                        .setDatasetId(options.getDatasetID())
                                                        .setTableId(destination + "_Table"),
                                                "Table Destination");
                                    }));

我在文档中找不到任何内容,但我想知道每个窗口进行了多少次写入?如果有多个表,是否需要为窗口中的所有元素对每个表进行一次写入?或者是每个元素一次,因为每个表对于每个元素可能不同?

最佳答案

由于您使用 PubSub 作为来源,因此您的工作似乎是一项流媒体工作。因此,默认插入方法是 STREAMING_INSERTS(请参阅 docs )。我没有看到使用此方法减少写入的任何好处或理由,因为 billig 是基于数据大小的。顺便说一句,您的示例或多或少并没有真正有效地减少写入。

虽然它是一个流作业,但由于某些版本也支持 FILE_LOADS 方法。如果 withMethod 设置为 FILE_LOADS,您可以在 BigQueryIO 上定义 withTriggeringFrequency。此设置定义加载作业发生的频率。这里连接器为您处理所有事情,您不需要按键或窗口数据进行分组。将为每个表启动加载作业。

如果您的数据需要一些时间才能进入 BigQuery,那么我建议使用 FILE_LOADS,因为加载是免费的,而不是流式插入。请注意 quotas当定义触发频率时。

关于java - Apache Beam 使用多个表时有多少写入次数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59436563/

相关文章:

google-play - 尝试将Google Play统计信息加载到Google BigQuery时出错

google-bigquery - 选择分组依据不起作用

python - 在 python Google Cloud Dataflow 中通过 bigquery reader 读取行时出现 AssertError

python - 是否可以在 Apache Beam 或 google Cloud Dataflow 中运行自定义 Python 脚本

java - 在不同的 jcombobox 中选择后如何设置 jcombobox 的不同字符串数组?

java - 在字符串中查找重复的单词并计算重复次数

java - uri.getQueryParameter() 返回空值

Java多线程文件下载性能

google-bigquery - 使用 GROUP BY 计算组的百分比

database - 计算 TB 数据集中分位数的高效算法