google-cloud-dataflow - 通过 Dataflow 从 Google Cloud Storage 读取大型 gzip JSON 文件到 BigQuery

标签 google-cloud-dataflow

我正在尝试从 Google Cloud Storage (GCS) 读取大约 90 个 gzipped JSON 日志文件,每个大约 2GB(未压缩 10GB),解析它们,然后通过 Google 将它们写入日期分区表到 BigQuery(BQ)云数据流 (GCDF)。

每个文件包含 7 天的数据,整个日期范围约为 2 年(730 天,并且还在增加)。我当前的管道如下所示:

p.apply("Read logfile", TextIO.Read.from(bucket))
 .apply("Repartition", Repartition.of())
 .apply("Parse JSON", ParDo.of(new JacksonDeserializer()))
 .apply("Extract and attach timestamp", ParDo.of(new ExtractTimestamps()))
 .apply("Format output to TableRow", ParDo.of(new TableRowConverter()))
 .apply("Window into partitions", Window.into(new TablePartWindowFun()))
 .apply("Write to BigQuery", BigQueryIO.Write
         .to(new DayPartitionFunc("someproject:somedataset", tableName))
         .withSchema(TableRowConverter.getSchema())
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

Repartition 是我在尝试制作管道时内置的东西 reshuffle after decompressing ,我尝试过使用和不使用它来运行管道。按照建议通过 Jackon ObjectMapper 和相应的类解析 JSON here . TablePartWindowFun 取自 here , 它用于为 PCollection 中的每个条目分配一个分区。

管道适用于较小的文件而不是太多文件,但对于我的真实数据集会中断。我选择了足够大的机器类型并尝试设置最大工作人员数量,以及使用自动缩放至最多 100 台 n1-highmem-16 机器。我已经尝试过流式和批处理模式以及每个工作人员 250 到 1200 GB 的 disSizeGb 值。

目前我能想到的可能的解决方案是:

  1. 解压缩 GCS 上的所有文件,从而在工作人员之间启用动态工作拆分,因为无法利用 GCS 的 gzip transcoding
  2. 在一个循环中构建“许多”并行管道,每个管道仅处理 90 个文件的一个子集。

选项 2 在我看来像是“围绕”框架进行编程,还有其他解决方案吗?

附录:

使用 Repartition after Reading 以批处理模式处理 gzip JSON 文件,最多 100 个工作人员(类型为 n1-highmem-4),管道运行大约一个小时,有 12 个工作人员,并完成读取以及 Repartition 的第一阶段。然后它扩展到 100 个 worker 并处理重新分区的 PCollection。完成后图形如下所示:

Write to BQ Service Graph

有趣的是,到了这个阶段,先是处理到150万个element/s,然后进度降到0。图中GroupByKey步骤OutputCollection的size从3亿左右先升后降到 0(总共有大约 18 亿个元素)。像是在丢弃什么。此外,ExpandIterableParDo(Streaming Write) run-time 最后为 0。图片显示它在“向后”运行之前略微显示。 在 worker 的日志中,我看到一些 exception thrown while executing request来自 com.google.api.client.http.HttpTransport 的消息记录器,但我无法在 Stackdriver 中找到更多信息。

读取后没有重新分区 管道失败使用 n1-highmem-2在完全相同的步骤(GroupByKey 之后的所有内容)出现内存不足错误的实例 - 使用更大的实例类型会导致异常,如

java.util.concurrent.ExecutionException: java.io.IOException: 
CANCELLED: Received RST_STREAM with error code 8 dataflow-...-harness-5l3s 
talking to frontendpipeline-..-harness-pc98:12346

最佳答案

感谢 Google Cloud Dataflow 团队的 Dan 和他提供的示例 here ,我能够解决这个问题。我所做的唯一更改:

  • 在 175 =(25 周)大块的日子里循环,一个接一个地运行管道,以免系统不堪重负。在循环中,确保重新处理上一次迭代的最后一个文件,并以与基础数据相同的速度(175 天)向前移动 startDate。当使用 WriteDisposition.WRITE_TRUNCATE 时, block 末尾的不完整日期将以这种方式被正确的完整数据覆盖。

  • 使用上面提到的 Repartition/Reshuffle 转换,在读取 gzip 文件后,加快进程并允许更平滑的自动缩放

  • 使用 DateTime 而不是 Instant 类型,因为我的数据不是 UTC

更新(Apache Beam 2.0):

随着 Apache Beam 2.0 的发布,解决方案变得更加容易。现在支持对 BigQuery 输出表进行分片 out of the box .

关于google-cloud-dataflow - 通过 Dataflow 从 Google Cloud Storage 读取大型 gzip JSON 文件到 BigQuery,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42246396/

相关文章:

python - 通过 Airflow 中的 PythonVirtualenvOperator 成功运行多次数据流管道

google-cloud-dataflow - 如何根据处理的元素数量动态触发窗口?

google-cloud-dataflow - 谷歌数据流 Apache Beam

java - 通过 Google DataFlow Transformer 查询关系数据库

go - 当前用于 Google Dataflow 的 GoLang SDK 是否支持自动缩放和并行处理?

java - 谷歌数据流: Read unbound PCollection from Google Cloud Storage

google-cloud-platform - 如何以 CSV 格式将数据从 Bigquery 导出到外部服务器?

google-cloud-dataflow - com.google.common.base.Preconditions.checkState 的 NoSuchMethodError

python - 如何使用 Apache Beam 在 Python 中将有界 pcollection 转换为无界?

eclipse - Eclipse 中的数据流 : Cannot run program "gcloud"