google-bigquery - 使用bigquery运算符设置 Airflow

标签 google-bigquery airflow

我正在试验用于数据管道的 Airflow 。不幸的是,到目前为止,我无法使其与bigquery运算符一起使用。我一直在寻找一种力所能及的解决方案,但我仍然受困。.我正在使用在本地运行的顺序执行程序。

这是我的代码:

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['example@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    dag_id='bigQueryPipeline', 
    default_args=default_args, 
    schedule_interval=timedelta(1)
)

t1 = BigQueryOperator(
    task_id='bigquery_test',
    bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
    destination_dataset_table=False,
    bigquery_conn_id='bigquery_default',
    delegate_to=False,
    udf_config=False,
    dag=dag,
)

错误信息:
[2016-08-27 00:13:14,665] {models.py:1327} ERROR - 'project'
Traceback (most recent call last):
  File "/Users/jean.rodrigue/anaconda/bin/airflow", line 15, in <module>
    args.func(args)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/bin/cli.py", line 352, in test
    ti.run(force=True, ignore_dependencies=True, test_mode=True)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/models.py", line 1245, in run
    result = task_copy.execute(context=context)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/operators/bigquery_operator.py", line 57, in execute
    conn = hook.get_conn()
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 54, in get_conn
    project = connection_extras['project']

最佳答案

花了我一段时间才终于找到它,因为它没有被非常清楚地记录下来。在 Airflow 用户界面中,转到“管理”->“连接”。该连接ID是参数bigquery_connection_id所引用的。您必须在“extras”字段中添加一个json对象,该对象定义了一对k,v对的“project”:“”

如果您尚未在运行Airflow的盒子上明确授权某个帐户,则还必须为“service_account”和“key_path”添加 key 。 (gcloud身份验证)

关于google-bigquery - 使用bigquery运算符设置 Airflow ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39178629/

相关文章:

mysql - 大查询转置

python - 编排小型 Python 任务的最佳实践(大多数在 BigQuery 中执行 SQL)

python - 重置 Airflow DAG 执行时间

airflow - 任务重试次数超过Airflow中指定的重试次数

python - Airflow :将 {{ ds }} 作为参数传递给 PostgresOperator

database - 谷歌大查询 : SUM returns different results even on SUM alias changes

google-bigquery - 创建 View 失败。未知 TVF : myFunc

python - BigQuery API 查询作业针对现有数据集返回 404

airflow - Apache Airflow/ Composer : how to connect to https using http connector with untrusted certificate

airflow - 如何检查 Airflow 测试的输出?