java - Apache Beam/Google Dataflow 最后一步仅运行一次

标签 java google-cloud-dataflow apache-beam

我有一个管道,可以下载数千个文件,然后将它们转换并以 CSV 形式存储在 Google 云存储上,然后再在 bigquery 上运行加载作业。

这工作正常,但当我运行数千个加载作业(每个下载的文件一个)时,我达到了导入配额。

我更改了代码,因此它列出了存储桶中的所有文件,并运行一个作业,并将所有文件作为作业的参数。

所以基本上,当所有数据都已处理完毕时,我只需要运行一次最后一步。我想我可以使用 groupBy 转换来确保所有数据都已被处理,但我想知道是否有更好/更标准的方法。

最佳答案

如果我正确理解你的问题,我们可能在我们的一个数据流中遇到类似的问题 - 我们正在点击 'Load jobs per table per day' BigQuery limit由于 GCS 中的每个文件分别触发数据流执行,并且我们的存储桶中有 1000 多个文件。

最后,我们问题的解决方案非常简单 - 我们修改了 TextIO.read 转换以使用通配符而不是单个文件名

i.e TextIO.read().from("gs://<BUCKET_NAME>/<FOLDER_NAME>/**")

通过这种方式,仅执行一个数据流作业,因此,尽管存在多个源,但写入 BigQuery 的所有数据都被视为单个加载作业。

不确定您是否可以应用相同的方法。

关于java - Apache Beam/Google Dataflow 最后一步仅运行一次,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49759079/

相关文章:

java - "adapter"放在 MVP(被动 View )中的什么位置?

java - 如何为 JFileChooser 添加文件过滤器

google-cloud-platform - 如何重新启动已取消的Cloud Dataflow流作业?

将批处理数据与存储在 BigTable 中的数据合并

java - 想要并行运行 Apache Beam Pipeline

python - 在 python apache beam 中,是否可以按特定顺序编写元素?

java - 我需要在给定条件下从弹出菜单中隐藏/显示特定项目

java - 找不到类路径的速度模板

java - BigQuery : java. lang.NoClassDefFoundError:com/google/api/gax/retrying/ExceptionRetryAlgorithm

google-cloud-dataflow - 使用 Google DataFlow 将数据直接流式传输到 Cloud SQL 的简单查询