python - 使用 TaskFlowAPI 在 Apache Airflow 中进行分支

标签 python airflow airflow-taskflow

我找不到 Airflow 的 TaskFlowAPI 中的分支文档。我尝试以“Pythonic”方式执行此操作,但运行时,无论前一个任务返回的真值如何,DAG 都看不到 task_2_execute_if_true

@dag(
    schedule_interval=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=['test'],
)
def my_dag():
    @task()
    def task_1_returns_boolean():
        # evaluate and return boolean value
        return boolean_value
    
    @task()
    def task_2_execute_if_true():
        # do_something...

    outcome_1 = task_1_returns_boolean()
    if outcome_1:
        outcome_2 = task_2_execute_if_true() 


executed = my_dag()

TaskFlowAPI 中正确的分支方式是什么?我应该再添加一个专门用于分支的函数吗?

最佳答案

源代码中有一个 DAG 示例:https://github.com/apache/airflow/blob/f1a9a9e3727443ffba496de9b9650322fdc98c5f/airflow/example_dags/example_branch_operator_decorator.py#L43 .

语法是:

from airflow.decorators import task

@task.branch(task_id="branching_task_id")
def random_choice():
    return "task_id_to_run"

它是在 Airflow 2.3.0 中引入的。

关于python - 使用 TaskFlowAPI 在 Apache Airflow 中进行分支,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72845443/

相关文章:

python - 无法在带有 Airflow 的 Jinja 模板中使用 Python 变量

python - 模块 'six.moves' 没有属性 'collections_abc'

Airflow 任务流程——并行运行任务

python - 以下伪代码的运行时复杂度(大 O)是多少?

python - 子图 matplotlib 的常见图例

python - 如何从github python安装 "sparse_dot_topn"

Python 两个函数的线程

python - 在Airflow的docker内部运行python脚本