python - Airflow 中是否有运算符可以根据 BigQuery 中的查询创建表?

标签 python google-bigquery airflow

我正在寻找类似的东西

CreateBQTableOperator(
    query='select * from my_table',
    output_table='my_other_table'
)

我正在寻找现有的运算符或此类运算符的代码。运算符(operator)应该使用另一个参数来决定是否在重新创建表或将查询附加到当前表之前删除该表(如果该表存在)。

最佳答案

对于 Airflow >= 1.10 以及提供程序,您可以使用 BigQueryInsertJobOperator 此运算符正在使用 JobConfigurationQuery您可以使用 configuration 参数配置 API 支持的任何选项:

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

execute_query_save = BigQueryInsertJobOperator(
    task_id="execute_query_save",
    configuration={
        "query": {
            "query": "select * from my_table",
            "useLegacySql": False,
            "writeDisposition": "WRITE_EMPTY",
            'destinationTable': {
                'projectId': "my-project",
                'datasetId': "my_data_set",
                'tableId': "table2"
            },
        }
    },
)

对于较旧的 Airflow 版本,您可以使用 BigQueryExecuteQueryOperator:

运营商有destination_dataset_table:

from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

execute_query_save = BigQueryExecuteQueryOperator(
    task_id="execute_query_save",
    sql="SELECT * FROM my_data_set.table1",
    use_legacy_sql=False,
    destination_dataset_table="my_data_set.table2",
    location="southamerica-east1",
    write_disposition="WRITE_EMPTY",
    create_disposition="CREATE_IF_NEEDED",
)

您可以通过设置参数值来控制请求的行为(引用 Google docs 中的值)。

write_disposition 选项为:

WRITE_TRUNCATE:如果表已存在,BigQuery 将覆盖表数据并使用查询结果中的架构。

WRITE_APPEND:如果表已存在,BigQuery 会将数据附加到表中。

WRITE_EMPTY:如果表已存在并包含数据,则作业结果中将返回“重复”错误。

create_disposition 选项是:

CREATE_IF_NEEDED:如果该表不存在,BigQuery 将创建该表。

CREATE_NEVER:该表必须已经存在。如果没有,作业结果中将返回“notFound”错误。

关于python - Airflow 中是否有运算符可以根据 BigQuery 中的查询创建表?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68575367/

相关文章:

sql - SQL 中偏移每一行的 Sumproduct

java - 如何在没有不成对的代理字符的情况下将表情符号等字符编码为 UTF8?

snowflake-cloud-data-platform - 将 GCP Snowflake 连接到 Airflow 证书问题

google-cloud-platform - Cloud Composer - 未找到 DAG 任务日志文件

调度程序不会选择 Airflow 清除的回填任务

python - Django相关模型未更新管理中的相关对象

python - 在无堆栈 Python 中,您可以通过 channel 发送 channel 吗?

python - 将多个列表写入 csv。 Python 中的文件

python - 检测不规则图像中的形状

git - 在 GitHub 上搜索具有特定框架和演示的项目