我在 Airflow DAG 中有两个任务,如下所示。
def check_condition(**kwargs):
# do something
return True # or return False
task1 = PythonOperator(
task_id='condition_task',
python_callable=check_condition,
provide_context=True,
dag=dag
)
task2 = DummyOperator(
task_id='following_task',
dag=dag
)
我应该做什么
仅当task 1
的返回值为True
时才调用task 2
?
最佳答案
你只需要做:
task1 >> task2
运算符具有 trigger_rule
参数,用于设置何时运行的条件。
默认值为all_success
,因此无需特别提及。
更多关于触发规则的信息可以查看here
这里的问题是,当您返回False
时,您期望task1的状态是什么。您预计任务 1 会失败还是成功?
如果您预计它会失败,那么您需要将 return False
替换为 raise Exception()
如果您希望它成功,那么您将需要 task2
从 task1
获取返回值(通过 Xcom),然后处理该值。
例如:
def following(**kwargs):
ti = kwargs['ti']
pulled_value = ti.xcom_pull(task_ids='task1')
if bool(pulled_value) = True:
do_something
else
do_something_else
task2 = PythonOperator(
task_id='following_task',
python_callable=following,
provide_context=True,
dag=dag
)
关于空 Airflow 动。仅当第一个任务的结果为 true 时才运行第二个任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65302573/