python - 使用 Dataflow 批量插入到 Bigquery

标签 python google-cloud-platform google-bigquery google-cloud-dataflow apache-beam

我正在使用 apache beam 管道,我想使用 python 批量插入到 bigquery。我的数据来自不受限制的 Pub/Sub。根据我的研究,带触发器的 GlobalWindows 应该可以解决我的问题。我用窗口尝试了我的管道,但它仍然进行流式插入。我的管道代码如下:

p2 = (p | 'Read ' >> beam.io.ReadFromPubSub(subscription=subscription_path,
    with_attributes=True,
    timestamp_attribute=None,id_label=None)
       | 'Windowing' >>  beam.WindowInto(window.GlobalWindows(),
           trigger=Repeatedly(
                   AfterAny(
                AfterCount(100),
           AfterProcessingTime(1 * 60))), 
        accumulation_mode=AccumulationMode.DISCARDING)
      | 'Process ' >> beam.Map(getAttributes))
p3 = (p2 | 'Filter ' >> beam.Filter(lambda msg: (("xx" in msg) and (msg["xx"].lower() == "true")))
         | 'Delete ' >> beam.Map(deleteAttribute)
         | 'Write '  >> writeTable(bq_table_test, bq_batch_size))

def writeTable(table_name):
return beam.io.WriteToBigQuery(
    table=table_name,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    batch_size=100)

我正在从账单报告中检查插入内容是批处理还是流处理。当 Streming 插入使用量增加时,我了解到批量插入没有发生。还有其他功能可以检查插入是流式还是批式吗?还有我怎样才能对 bigquery 进行批量插入?

最佳答案

根据documentation您无法指定插入类型,它会根据您的输入自动检测 PCollection:

The Beam SDK for Python does not currently support specifying the insertion method.

BigQueryIO supports two methods of inserting data into BigQuery: load jobs and streaming inserts. Each insertion method provides different tradeoffs of cost, quota, and data consistency. See the BigQuery documentation for load jobs and streaming inserts for more information about these tradeoffs.

BigQueryIO chooses a default insertion method based on the input PCollection.

BigQueryIO uses load jobs when you apply a BigQueryIO write transform to a bounded PCollection.

BigQueryIO uses streaming inserts when you apply a BigQueryIO write transform to an unbounded PCollection.

在您的情况下,您正在从无限制的源 (Pubsub) 中读取,因此在这种情况下它始终是流式写入。开窗不会改变数据的性质。

我能想到的一种解决方法是拆分管道,例如流式管道将写入某个存储 (GCS) 中的文件集合,然后另一个管道将读取并上传这些文件(文件是有界的)。

关于python - 使用 Dataflow 批量插入到 Bigquery,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54037337/

相关文章:

python - 为什么XGBoost安装中没有booster参数,而文档中却有?

go - 将服务帐户 key 文件传递给 bigquery 客户端

google-cloud-platform - 如何查看 Google Cloud Function 实例使用的 IP 地址?

google-cloud-platform - 创建新的数据实验室实例时参数无效

google-bigquery - 从 BigQuery 导出数据

python - 父进程退出后让子进程继续运行

javascript - 如何使用 selenium python 从动态网站检索所有链接

python - 设置 App Engine mapreduce 分片大小

google-bigquery - Google Pub/Sub to Dataflow,避免与记录 ID 重复

google-bigquery - Google BigQuery 查询将点 "."转换为逗号 ","