python - 将参数从 BranchPythonOperator 传递到 PythonOperator

标签 python airflow

我对 Airflow 还很陌生,并试图弄清楚在任务/DAG 之间传递参数的逻辑。我的问题是 - 是否可以将参数从 BranchPythonOperator 任务传递到它调用的 task_id 中。

即:

@task
def task_a():
    ***print(a)***
    return {}

def get_task_run(**kwargs):
    a = 'Pass-Argument'
    return 'task_a'

tasks = BranchPythonOperator(
        task_id='get_task_run',
        python_callable=get_task_run,
    )

例如,在上面的代码中,是否可以以某种方式获取从 BranchPythonOperator 调用的“task_a”内的变量“a”?

最佳答案

实现此目的的一种方法是使用 get_task_run 函数执行 xcom_push,然后使用 task_a 中提取它获取当前上下文

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import get_current_context, BranchPythonOperator

default_args = {
    'owner': 'airflow',
}

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(1),
     catchup=False, tags=['example'])
def decorated_dag():

    @task
    def task_a():
        context = get_current_context()
        var_from_branch_task = context['ti'].xcom_pull(
            task_ids='branch_task', key='a')
        print(f"Result: {var_from_branch_task}")

    @task
    def task_b():
        print('task_b')

    def _get_task_run(ti):
        if 'something':
            ti.xcom_push(key='a', value='var_pushed_from_branch task')
            return 'task_a'
        else:
            return 'task_b'

    branch_task = BranchPythonOperator(
        task_id='branch_task',
        python_callable=_get_task_run,
    )
    task_a_exec = task_a()
    task_b_exec = task_b()
    branch_task >> [task_a_exec, task_b_exec]

example_decorated_dag = decorated_dag()

请记住,BranchPythonOperator 应返回单个task_id 或要遵循的task_ids 列表。这就是为什么你不能返回 dictlisttuple 将其用作 XcomArg 与另一个装饰任务。让我知道这是否对您有用!

关于python - 将参数从 BranchPythonOperator 传递到 PythonOperator,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67801458/

相关文章:

python - 如何使用 `return` 从循环中取回多个值?我可以把它们放在一个列表中吗?

python - 完整性错误 : NOT NULL constraint failed: core_userprofile. 用户 ID

python - 查找 cv2.findContours() 的面积(Python、OpenCV)

airflow - 如何从 Airflow 列表中删除损坏的DAG?

python - 如何将参数传递给 Airflow on_success_callback 和 on_failure_callback

python - 创建时间序列数据框的最快方法

airflow - 计算 Airflow 传感器的尝试次数

python - Airflow :如何安排 dag 在工作日的第二天开始?

amazon-web-services - 如何从Airflow提交Spark作业到EMR集群?

python - 如何使用 brew 在 Mac OS Sierra 上安装更新的 Python?