python - 写入 BigQuery 动态表名 Python SDK

标签 python google-bigquery google-cloud-dataflow apache-beam

我正在研究一个 ETL,它从数据库中提取数据,进行较小的转换并输出到 BigQuery。我使用 Python SDK 在 Apache Beam 2.26.0 中编写了我的管道。我正在加载十几个表,并将它们的名称作为参数传递给 beam.io.WriteToBigQuery

现在,文档说 ( https://beam.apache.org/documentation/io/built-in/google-bigquery ):

When writing to BigQuery, you must supply a table schema for the destination table that you want to write to, unless you specify a create disposition of CREATE_NEVER.

我相信这不完全正确。在我的测试中,我发现这种情况仅在传递静态表名时

如果你有一堆表并想将表名作为参数传递,那么它会抛出一个错误:

ErrorProto message: 'No schema specified on job or table.'

我的代码:

    bq_data | "Load data to BQ" >> beam.io.WriteToBigQuery(
                      table=lambda row: bg_config[row['table_name']],
                      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                      create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER
                  )

bq_data 是 pandas 数据框行的字典。我有一列 table_name。 bq_config 是一个字典,其中 key = row['table_name'] 值的格式为:

[project_id]:[dataset_id].[table_id]

有人对此有什么想法吗?

最佳答案

看看这个thread ,我在那里解决了。简而言之;在执行 python BigQuery API 请求之前,我使用了内部 python 时间/日期函数来呈现变量。

关于python - 写入 BigQuery 动态表名 Python SDK,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65612939/

相关文章:

google-apps-script - Google Apps 脚本和 Bigquery - tabledata.insertAll

google-bigquery - BIGQUERY SELECT 列表表达式引用了 CHANNEL_ID 列,该列既未在 [10 :13] 处分组也未聚合

google-cloud-dataflow - 跨多个 DataFlow 作业/管道共享实例 - 可能吗?

google-cloud-dataflow - GroupBy/Combine 之后如何创建数据流包?

python - Pandas 如何对 MultiIndex 进行条件选择

Python 比较日期范围列表

python - 无法使用 BigQuery Python API 设置目标表

python - 如何按值删除列表中的几个元素?

python - 列表python中的相对大小

python - 如何在 Apache Beam (Python) 中通过键在静态查找表上以流模式加入 PCollection