python - 数据流: update BigQuery rows with python pipeline

标签 python google-bigquery google-cloud-dataflow apache-beam

想象一个简单的 Google 数据流管道。在此管道中,您使用 apache beam 函数从 BQ 读取数据,并且根据返回的 pcollection,您必须更新这些行

Journeys = (p
                    | 'Read from BQ' >> beam.io.Read(
                    beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True)))

Update = ( Journeys
                   | 'Updating Journey Table' >> beam.Map(UpdateBQ))

Write = (Journeys
                    | 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))

此管道的问题是,当您读取表(beam.Map)时,会对返回的 pcollection 中的每个项目执行 UpdateBQ

<小时/>

哪一种是对 BigQuery 表执行更新的更好方法?

我想这可以在不使用beam.Map的情况下完成,并且只执行并更新一次处理所有输入pcollection。

<小时/>

额外

def UpdateBQ(input):
    from google.cloud import bigquery
    import uuid
    import time
    client = bigquery.Client()
    STD = "#standardSQL"
    QUERY = STD + "\n" + """UPDATE table SET Field= 'YYY' WHERE Field2='XXX'"""
    client.use_legacy_sql = False
    query_job = client.run_async_query(query=QUERY, job_name='temp-query-job_{}'.format(uuid.uuid4()))  # API request
    query_job.begin()
    <...>
<小时/>

可能的解决方案

with beam.Pipeline(options=options) as p:
    Journeys = (p
                | 'Read from BQ' >> beam.io.Read(
                beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True))
                )

    Write = (Journeys
                | 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))


UpdateBQ();

最佳答案

从 BQ 读取数据后,您是否使用光束管道进行进一步的转换?或者这只是您在代码中显示的方式,即从 BQ 读取,然后在 BQ 中触发更新命令?在这种情况下,您根本不需要光束。只需使用 BQ 查询即可使用另一个表更新表中的数据。 BQ best practices建议避免一次插入/更新单行。

关于python - 数据流: update BigQuery rows with python pipeline,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53629480/

相关文章:

java - 谷歌云数据流对压缩文件的处理发生变化

java - 使用 Google Cloud Dataflow 的 Java API for Datastore 将属性设置为 null?

python - 继承模型元类的正确语法?

python - 在 Python 2.7 中模拟按键事件

python - 更快地刷新 sqlalchemy 表模型中的数据

python - 如何使用 Python 删除 CSV 文件的第二行

sql - 当我尝试取消嵌套数组字段时,BigQuery 中出现 "Column name id is ambiguous"错误

python - 如何将数据帧从 Cloud Datalab 导出到 BigQuery 表?

google-bigquery - 使用 require_partition_filter 列出 BigQuery 分区表中的所有分区

python - 使用 PYTHON 运行 Google 数据流模板