python - 在 Apache Airflow 中保存算子的结果

标签 python python-3.x airflow

几个运算符允许提取数据,但我从未设法使用结果。

例如:
https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/bigquery_get_data.py

该运算符可以按如下方式调用:

get_data = BigQueryGetDataOperator(
      task_id='get_data_from_bq',
      dataset_id='test_dataset',
      table_id='Transaction_partitions',
      max_results='100',
      selected_fields='DATE',
      bigquery_conn_id='airflow-service-account'
      )

然而,get_data 是 DAG 类型,但第 116 行说“返回 table_data”。
需要明确的是,运算符(operator)工作并检索数据,我只是不明白如何使用数据检索/数据所在的位置。

如何使用上面的“get_data”获取数据?

最佳答案

您将使用的方式 get_data是在下一个任务可以是PythonOperator然后您可以使用它来处理数据。

get_data = BigQueryGetDataOperator(
      task_id='get_data_from_bq',
      dataset_id='test_dataset',
      table_id='Transaction_partitions',
      max_results='100',
      selected_fields='DATE',
      bigquery_conn_id='airflow-service-account'
      )

def process_data_from_bq(**kwargs):
      ti = kwargs['ti']
      bq_data = ti.xcom_pull(task_ids='get_data_from_bq')
      # Now bq_data here would have your data in Python list
      print(bq_data)

process_data = PythonOperator(
      task_id='process_data_from_bq',
      python_callable=process_bq_data,
      provide_context=True
      )

get_data >> process_data

PS:我是BigQueryGetDataOperator的作者和 Airflow 提交者/PMC

关于python - 在 Apache Airflow 中保存算子的结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53960327/

相关文章:

python - Python2.7、3.4 的 pip 安装/升级错误

python - 在 python 中使用 pop 时获取索引超出范围错误

python - 使用df到excel时不断出现多页错误,只有1页可以正常工作

python - 如何减小从 Pydub 导出的 .wav 文件的大小

airflow - 有人可以帮我启动 apache Airflow 吗?

python-2.7 - 为什么任务在 trigger_dag 之后在 Airflow 1.10.2 中停留在 None 状态

python - 如何向 Airflow 添加新的 DAG?

python - docker-compose exec web python manage.py makemigrations问题

python - 在 Windows 上使用 IDLE 安装 python 模块/包

python-3.x - ImportError: libtinfo.so.5: 无法打开共享对象文件: 没有这样的文件或目录