空 Airflow 动。仅当第一个任务的结果为 true 时才运行第二个任务

标签 airflow

我在 Airflow DAG 中有两个任务,如下所示。

def check_condition(**kwargs):
    # do something
    return True # or return False
 
task1 = PythonOperator(
    task_id='condition_task',
    python_callable=check_condition,
    provide_context=True,
    dag=dag
)

task2 = DummyOperator(
    task_id='following_task',
    dag=dag
)

我应该做什么
仅当task 1的返回值为True时才调用task 2

最佳答案

你只需要做:

task1 >> task2

运算符具有 trigger_rule 参数,用于设置何时运行的条件。 默认值为all_success,因此无需特别提及。 更多关于触发规则的信息可以查看here

这里的问题是,当您返回False时,您期望task1的状态是什么。您预计任务 1 会失败还是成功? 如果您预计它会失败,那么您需要将 return False 替换为 raise Exception() 如果您希望它成功,那么您将需要 task2task1 获取返回值(通过 Xcom),然后处理该值。 例如:

def following(**kwargs):
    ti = kwargs['ti']
    pulled_value = ti.xcom_pull(task_ids='task1')
    if bool(pulled_value) = True:
        do_something
    else
        do_something_else

task2 = PythonOperator(
    task_id='following_task',
    python_callable=following,
    provide_context=True,
    dag=dag
)

关于空 Airflow 动。仅当第一个任务的结果为 true 时才运行第二个任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65302573/

相关文章:

python - Airflow 没有以 airflow_home 目录中的目录命名的模块

google-cloud-platform - 在apache airflow中安装Scrapy会导致INVALID_ARGUMENT

python-3.x - 如何禁用 Airflow 登录以进行身份​​验证和授权?

airflow - DAG cli 和追赶

amazon-web-services - AWS Airflow v2.0.2 不显示 Google Cloud 连接类型

airflow - 如何在 DEV 和 PROD 环境之间迁移 Airflow 变量?

java - 约 10 秒后与 Airflow docker 容器断开连接

python - 在Python的Airflow中,如何在一定时间后停止任务运行?

airflow - 在 airflow 2.0 taskflow API 中定义复杂的工作流依赖

python - Airflow ,标记任务成功或在 dag 运行之前跳过它