我使用 beam python 库设计了一个 beam/数据流管道。管道大致执行以下操作:
- ParDo:从 API 收集 JSON 数据
- ParDo:转换 JSON 数据
- I/O:将转换后的数据写入 BigQuery 表
通常,代码会执行它应该执行的操作。但是,当从 API 收集大数据集(大约 500.000 个 JSON 文件)时,bigquery 插入作业在使用 DataflowRunner 启动后立即停止(=在一秒内),没有特定的错误消息(它正在与我的 DirectRunner 一起执行电脑)。使用较小的数据集时,一切正常。
数据流日志如下:
2019-04-22 (00:41:29) Executing BigQuery import job "dataflow_job_14675275193414385105". You can check its status with the...
Executing BigQuery import job "dataflow_job_14675275193414385105". You can check its status with the bq tool: "bq show -j --project_id=X dataflow_job_14675275193414385105".
2019-04-22 (00:41:29) Workflow failed. Causes: S01:Create Dummy Element/Read+Call API+Transform JSON+Write to Bigquery /Wr...
Workflow failed. Causes: S01:Create Dummy Element/Read+Call API+Transform JSON+Write to Bigquery /WriteToBigQuery/NativeWrite failed., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on:
beamapp-X-04212005-04211305-sf4k-harness-lqjg,
beamapp-X-04212005-04211305-sf4k-harness-lgg2,
beamapp-X-04212005-04211305-sf4k-harness-qn55,
beamapp-X-04212005-04211305-sf4k-harness-hcsn
按照建议使用 bq cli 工具获取有关 BQ 加载作业的更多信息不起作用。找不到作业(由于即时失败,我怀疑它是否已创建)。
我想我遇到了某种配额/bq 限制,甚至是内存不足问题(参见:https://beam.apache.org/documentation/io/built-in/google-bigquery/)
Limitations BigQueryIO currently has the following limitations.
You can’t sequence the completion of a BigQuery write with other steps of >your pipeline.
If you are using the Beam SDK for Python, you might have import size quota >issues if you write a very large dataset. As a workaround, you can partition >the dataset (for example, using Beam’s Partition transform) and write to >multiple BigQuery tables. The Beam SDK for Java does not have this >limitation as it partitions your dataset for you.
对于如何缩小此问题的根本原因的任何提示,我将不胜感激。
我也想尝试分区 Fn,但没有找到任何 python 源代码示例如何将分区 pcollection 写入 BigQuery 表。
最佳答案
可能有助于调试的一件事是查看 Stackdriver 日志。
如果您在 Google console 中拉起 Dataflow 作业然后单击图形面板右上角的 LOGS
,这应该会打开底部的日志面板。日志面板的右上角有一个指向 Stackdriver 的链接。这将为您提供大量有关您的工作人员/洗牌/等的日志记录信息。对于这个特定的工作。
其中有很多内容,很难过滤掉相关内容,但希望您能找到比A work item was attempted 4 times without success
更有帮助的内容。例如,每个工作人员偶尔会记录其使用的内存量,可以将其与每个工作人员拥有的内存量(基于机器类型)进行比较,以查看他们是否确实内存不足,或者是否发生了您的错误别处。
祝你好运!
关于python - Dataflow BigQuery 插入作业因大数据集而立即失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55794087/