python - How can we create a table with "Clustered by" columns using BigQueryCreateEmptyTableOperator?

Tags: python google-bigquery airflow

I am trying to create a table with clustered columns from a Python script in my GCP Composer task, using BigQueryCreateEmptyTableOperator. I pass the column via `cluster_fields`, but it does not work. What is the correct way to do this?

Below is the code I am using.

stop_op = BigQueryCreateEmptyTableOperator(
    task_id='BigQuery_CreateTable',
    dataset_id=dataset_nm,
    table_id=table_nm,
    project_id=project_nm,
    schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
                   {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
                   {"name": "distribution_name", "type": "STRING", "mode": "NULLABLE"},
                   {"name": "transaction_date", "type": "DATE", "mode": "NULLABLE"}],
    time_partitioning={'type': 'DAY', 'field': 'transaction_date'},
    cluster_fields='distribution_name',
    bigquery_conn_id='bigquery_default',
    google_cloud_storage_conn_id='bigquery_default',
    autodetect=True,
    dag=dag
)

Best Answer

This feature is not available in the latest version of Airflow at the time of writing (1.10.5).

However, you can create a new operator as follows and use it instead.

from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook, _parse_gcs_url
from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator
import json

class BQCreateEmptyTableWithClusteredFieldsOp(BigQueryCreateEmptyTableOperator):
    template_fields = ('dataset_id', 'table_id', 'project_id',
                       'gcs_schema_object', 'labels')
    ui_color = '#f0eee4'

    # pylint: disable=too-many-arguments
    @apply_defaults
    def __init__(self,
                 dataset_id,
                 table_id,
                 project_id=None,
                 schema_fields=None,
                 gcs_schema_object=None,
                 time_partitioning=None,
                 bigquery_conn_id='bigquery_default',
                 google_cloud_storage_conn_id='google_cloud_default',
                 delegate_to=None,
                 labels=None,
                 encryption_configuration=None,
                 cluster_fields=None,
                 *args, **kwargs):

        # Deliberately skip BigQueryCreateEmptyTableOperator.__init__ (which does
        # not accept cluster_fields) and call BaseOperator.__init__ directly.
        super(BigQueryCreateEmptyTableOperator, self).__init__(*args, **kwargs)

        self.project_id = project_id
        self.dataset_id = dataset_id
        self.table_id = table_id
        self.schema_fields = schema_fields
        self.gcs_schema_object = gcs_schema_object
        self.bigquery_conn_id = bigquery_conn_id
        self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
        self.delegate_to = delegate_to
        self.time_partitioning = {} if time_partitioning is None else time_partitioning
        self.labels = labels
        self.encryption_configuration = encryption_configuration
        self.cluster_fields = cluster_fields or []

    def execute(self, context):
        bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                               delegate_to=self.delegate_to)

        if not self.schema_fields and self.gcs_schema_object:

            gcs_bucket, gcs_object = _parse_gcs_url(self.gcs_schema_object)

            gcs_hook = GoogleCloudStorageHook(
                google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
                delegate_to=self.delegate_to)
            schema_fields = json.loads(gcs_hook.download(
                gcs_bucket,
                gcs_object).decode("utf-8"))
        else:
            schema_fields = self.schema_fields

        conn = bq_hook.get_conn()
        cursor = conn.cursor()

        cursor.create_empty_table(
            project_id=self.project_id,
            dataset_id=self.dataset_id,
            table_id=self.table_id,
            schema_fields=schema_fields,
            time_partitioning=self.time_partitioning,
            labels=self.labels,
            cluster_fields=self.cluster_fields,
            encryption_configuration=self.encryption_configuration
        )

Now you can use it as follows:

stop_op = BQCreateEmptyTableWithClusteredFieldsOp(
    task_id='BigQuery_CreateTable',
    dataset_id=dataset_nm,
    table_id=table_nm,
    project_id=project_nm,
    schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
                   {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
                   {"name": "distribution_name", "type": "STRING", "mode": "NULLABLE"},
                   {"name": "transaction_date", "type": "DATE", "mode": "NULLABLE"}],
    time_partitioning={'type': 'DAY', 'field': 'transaction_date'},
    cluster_fields=['distribution_name'],  # must be a list of column names, not a bare string
    bigquery_conn_id='bigquery_default',
    google_cloud_storage_conn_id='bigquery_default',
    dag=dag
)

The original question and answer for "How can we create a table with "Clustered by" columns using BigQueryCreateEmptyTableOperator?" can be found on Stack Overflow: https://stackoverflow.com/questions/57955515/
