Airflow - xcom 值访问自定义操作符

标签 airflow

自过去 6 个月以来,我一直在使用 Airlfow。我很高兴在 Airflow 中定义工作流程。
我有以下场景,我无法获得 xcom 值(以黄色突出显示)。

请在以下示例代码中找到代码:

工作流程

def push_function(**context):
context['ti'].xcom_push(key='reportid', value='xyz')

dummy_operator = DummyOperator(
task_id='Start',
dag=main_dag
)

push_function_task = PythonOperator(
    task_id='push_function',
    provide_context=True,
    python_callable=push_function,
    op_kwargs={},
    dag=main_dag)


push_function_task .set_upstream(dummy_operator)

custom_task = CustomOperator(
        dag=main_dag,
        task_id='import_data',
        provide_context=True,
        url="http://www.google.com/{}".format("{{task_instance.xcom_pull(task_ids='push_function')}}")

     )

custom_task .set_upstream(push_function_task)

备注 :
1. CustomOperator 是我自己的操作符,用于下载给定 URL 的数据

请帮我。

谢谢,
萨曼思

最佳答案

我相信您在推拉 XCom 时按键不匹配。每个 XCom 值都与 DAG ID、任务 ID 和 key 相关联。如果您正在插入 report_id关键,那么你也需要用它来拉。

请注意,如果未将 key 指定给 xcom_pull() ,它使用默认值 return_value .这是因为如果一个任务返回一个结果,Airflow 会自动将其推送到 return_value 下的 XCom。 key 。

这为您提供了两种解决问题的选项:

1)继续推送到report_id key ,并确保你也从中拉出

def push_function(**context):
    context['ti'].xcom_push(key='reportid', value='xyz')

...

custom_task = CustomOperator(
    ...
    url="http://www.google.com/{}".format("{{ task_instance.xcom_pull(task_ids='push_function', key='reportid') }}")
)

2) 有 push_function()将您要推送到 XCom 的值返回,然后从默认 key 中提取。
def push_function(**context):
    return 'xyz'

...

custom_task = CustomOperator(
    ...
    url="http://www.google.com/{}".format("{{ task_instance.xcom_pull(task_ids='push_function') }}")
)

关于Airflow - xcom 值访问自定义操作符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51999742/

相关文章:

airflow - 无法导入 Airflow 插件

airflow - 解释depends_on_past 功能

python - 为什么 Airflow Scheduler 只能作为非守护进程工作,而作为守护进程却失败?

airflow-scheduler - Airflow Web 服务器主页持续加载并显示 html 页面中的错误

python-2.7 - 如何使用 Cloud Composer/Apache Airflow 运行带有设置文件的 Dataflow 管道?

使用 Airflow 进行 Azure AD 身份验证

airflow - 具有许多子标签的工作流程可以高效执行吗?

docker - Mac M1 上的 Apache Airflow 安装问题

具有不同 conda 环境的 Python 任务和 DAG

airflow - 无需Web服务器即可重新启动apag Airflow