我想按照下面的模式在 Airflow 中创建一个条件任务。预期的情况如下:
- 任务 1:Start_cluster 执行
- 如果任务 1 成功,则执行任务 2
- 如果任务 2 成功,则执行任务 3
- 如果所有任务都成功或一个任务失败,则执行任务 4:terminate_cluster
我试过:
trigger_rule=TriggerRule.ONE_FAILED
任务 4 保持跳过状态,all_done 也是
trigger_rule=TriggerRule.ALL_DONE
我找到了这个解决方案:How to create a conditional task in Airflow但它对我不起作用。
最佳答案
我认为无论前面的任务是否成功,您都希望终止集群,因此 ALL_DONE
听起来很合适。除了 Start_Cluster。如果失败,则可能没有要终止的集群,但您可能需要检查/尝试以防万一。
默认的 trigger_rule 是 ALL_SUCCESS
因此,例如,如果任务 1 失败,则整个 Dag 都会失败,因为任务 2 需要任务 1 成功才能运行。
如果任何任务有可能失败,但您仍然想终止集群,您将需要一些备用路径让 dag 遵循,例如使用 PythonBranchOperator
和 Python 回调函数。
另一种可能性是仅使用以“ONE_FAILURE”的 trigger_rule 运行的虚拟运算符,然后运行终止集群任务。
例如,如果您将虚拟任务命名为“Task_Failure”,这将是依赖链:
Start_Cluster >> Task_2 >> Task_3 >> Terminate_Cluster
Task_2 >> Task_Failure
Task_3 >> Task_Failure
Task_Failure >> Terminate_Cluster
在那种情况下,Task_Failure 可能必须将 Terminate_Cluster trigger_rule 设置为 ONE_SUCCESS
,因为有些任务可能永远不会运行。如果您将最终任务设置为 ALL_DONE
并且之前的一些任务没有状态,它可能只是挂起或可能失败。
ALL_DONE 和 ALL_SUCCESS 的区别:https://stackoverflow.com/a/47716981/1335793
关于python - Airflow :避免任务状态=跳过,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44048818/