python - 从 Airflow 中的 BigQueryOperator 获取结果

标签 python google-bigquery airflow

我正在尝试使用 Airflow 从 BigQueryOperator 获取结果,但我找不到执行此操作的方法。我尝试调用 bq_cursor 成员(在 1.10 中可用)中的 next() 方法,但它返回 None。这是我尝试过的方式

import datetime
import logging

from airflow import models
from airflow.contrib.operators import bigquery_operator
from airflow.operators import python_operator


yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time()
)

def MyChequer(**kwargs):
    big_query_count = bigquery_operator.BigQueryOperator(
        task_id='my_bq_query',
        sql='select count(*) from mydataset.mytable'
    )

    big_query_count.execute(context=kwargs)

    logging.info(big_query_count)
    logging.info(big_query_count.__dict__)
    logging.info(big_query_count.bq_cursor.next())

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'project_id': 'myproject'
}

with models.DAG(
        'bigquery_results_execution',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    myoperator = python_operator.PythonOperator(
        task_id='threshold_operator',
        provide_context=True,
        python_callable=MyChequer
    )

    # Define DAG
    myoperator

查看 bigquery_hook.pybigquery_operator.py这似乎是获取结果的唯一可用方法。

最佳答案

每当我需要从 BigQuery 查询中获取数据并将其用于某些事情时,我都会使用 BigQuery Hook 创建自己的运算符。我通常将其称为 BigQueryToXOperator,我们有很多用于将 BigQuery 数据发送到其他内部系统的运算符.

例如,我有一个 BigQueryToPubSub 运算符,您可能会发现它可以作为一个有用的示例,说明如何查询 BigQuery,然后逐行处理结果,并将它们发送到 Google PubSub。请考虑以下通用示例代码,了解如何自行执行此操作:

class BigQueryToXOperator(BaseOperator):
    template_fields = ['sql']
    ui_color = '#000000'

    @apply_defaults
    def __init__(
            self,
            sql,
            keys,
            bigquery_conn_id='bigquery_default',
            delegate_to=None,
            *args,
            **kwargs):
        super(BigQueryToXOperator, self).__init__(*args, **kwargs)
        self.sql = sql
        self.keys = keys # A list of keys for the columns in the result set of sql
        self.bigquery_conn_id = bigquery_conn_id
        self.delegate_to = delegate_to


    def execute(self, context):
        """
        Run query and handle results row by row.
        """
        cursor = self._query_bigquery()
        for row in cursor.fetchall():
            # Zip keys and row together because the cursor returns a list of list (not list of dicts)
            row_dict = dumps(dict(zip(self.keys,row))).encode('utf-8')

            # Do what you want with the row...
            handle_row(row_dict)


    def _query_bigquery(self):
        """
        Queries BigQuery and returns a cursor to the results.
        """
        bq = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                          use_legacy_sql=False)
        conn = bq.get_conn()
        cursor = conn.cursor()
        cursor.execute(self.sql)
        return cursor

关于python - 从 Airflow 中的 BigQueryOperator 获取结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53565834/

相关文章:

python - 如何使用不同版本的 Python 运行脚本

python - 将代理模型实例分配给外键

python - 如何获取 Atom 的脚本包以从脚本的当前工作目录运行脚本?

docker-compose env 文件在 yml 中工作但不使用命令行参数

python - 在 BranchPython Operator 之后跳过 Airflow 2.0 任务

python - 无法在Raspberry Pi 3上通过pip3安装opencv-python

java - 如何从 BigQuery 检索数据并将数据插入到 mySQL?

java - 如何访问 BigQuery 结果集的 cacheHit 属性?

google-bigquery - 在 Big Query Standard SQL 中将 AM/PM 转换为 24 小时制

airflow - dag 中 Airflow 任务的状态