python - 将返回值从运算符传递给 Airflow 中的后续运算符

标签 python airflow google-cloud-composer

我尝试向 GoogleCloudStorageToBigQueryOperatorsource_objects 字段提供一个字符串列表,但使用以下代码时出现错误:

string indices must be integers, not unicode

我不知道的事情:

  • 如何在 DAG 范围内使用 XCOM 获取 get_file_name返回值?
  • 如何在 DAG 范围内调用 xcom_pull 函数而无需提供上下文?在我看来,任务实例不需要提供上下文。

我想到的事情:

  • 重写运算符并采用 XCOM 作为参数

我想做的事:

  • 我希望能够调用接线员

此外,似乎某些运算符的字段使用了名为 templated_field 的功能,模板字段背后的机制是什么?不仅仅适用于 PythonOperatorBashOperator

最后一个问题,为什么 PythonOperator 不返回 TaskInstance

with DAG('bq_load_file_from_cloud_function', default_args=default_args) as dag:

    def get_file_name_from_conf(ds, **kwargs):
        fileName = kwargs['dag_run'].conf['fileName']
        return [fileName]

    get_file_name = PythonOperator(
        task_id='get_file_name',
        provide_context=True,
        python_callable=get_file_name_from_conf)

    bq_load = GoogleCloudStorageToBigQueryOperator(
        task_id='bq_load', 
        bucket='src_bucket', 
        #source_objects=['data.csv'], 
        source_objects=get_file_name.xcom_pull(context='', task_ids='get_file_name'), 
        destination_project_dataset_table='project:dataset.table', 
        write_disposition='WRITE_EMPTY')

    bq_load.set_upstream(get_file_name)

我对 Python 和 Airflow 有点陌生。我想这些事情应该是微不足道的。我确信我在这里误解了一些东西。

最佳答案

经过多次测试,我想出了这个解决方案,感谢 tobi6 的评论,为我指明了正确的方向。我必须使用template_fields功能。

当我尝试返回包含单个字符串的列表时,出现连接错误,因此我必须在 XCOM 中返回单个字符串,并用方括号将模板调用括起来,以使结果成为列表。

这是最终的代码:

with DAG('bq_load_file_from_cloud_function', default_args=default_args) as dag:

    def get_file_name_from_conf(ds, **kwargs):
        return kwargs['dag_run'].conf['fileName']

    get_file_name = PythonOperator(
        task_id='get_file_name',
        provide_context=True,
        python_callable=get_file_name_from_conf)

    bq_load = GoogleCloudStorageToBigQueryOperator(
        task_id='bq_load', 
        bucket='src_bucket', 
        source_objects=["{{ task_instance.xcom_pull(task_ids='get_file_name') }}"],
        destination_project_dataset_table='project:dataset.table', 
        write_disposition='WRITE_APPEND')

    bq_load.set_upstream(get_file_name)

关于python - 将返回值从运算符传递给 Airflow 中的后续运算符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51892190/

相关文章:

python - Python 列表的奇怪行为

Python numpy 方法/属性比 numpy 函数更快?

python - 使用 Apache Airflow 执行包含 PySpark 代码的 Databricks Notebook

google-cloud-platform - 如何使用 Python 使用 Google Data Prep API

google-cloud-platform - Airflow 或 Composer 上的用户事件日志?

Python 类型 long 与 C 'long long'

python - 如何让球像打乒乓球一样保持移动?

python - Airflow - 按执行日期查找特定 dag id 的 dag 运行,无需时间

airflow - 我们应该让 Airflow 调度程序运行多长时间?

python - Google 平台上的 Composer 不适用于 Python 3