python - Airflow :避免任务状态=跳过

标签 python triggers conditional-statements airflow

我想按照下面的模式在 Airflow 中创建一个条件任务。预期的情况如下:

  • 任务 1:Start_cluster 执行
  • 如果任务 1 成功,则执行任务 2
  • 如果任务 2 成功,则执行任务 3
  • 如果所有任务都成功或一个任务失败,则执行任务 4:terminate_cluster

enter image description here

我试过:

trigger_rule=TriggerRule.ONE_FAILED

任务 4 保持跳过状态,all_done 也是

trigger_rule=TriggerRule.ALL_DONE

我找到了这个解决方案:How to create a conditional task in Airflow但它对我不起作用。

最佳答案

我认为无论前面的任务是否成功,您都希望终止集群,因此 ALL_DONE 听起来很合适。除了 Start_Cluster。如果失败,则可能没有要终止的集群,但您可能需要检查/尝试以防万一。

默认的 trigger_rule 是 ALL_SUCCESS 因此,例如,如果任务 1 失败,则整个 Dag 都会失败,因为任务 2 需要任务 1 成功才能运行。

如果任何任务有可能失败,但您仍然想终止集群,您将需要一些备用路径让 dag 遵循,例如使用 PythonBranchOperator 和 Python 回调函数。

另一种可能性是仅使用以“ONE_FAILURE”的 trigger_rule 运行的虚拟运算符,然后运行终止集群任务。

例如,如果您将虚拟任务命名为“Task_Failure”,这将是依赖链:

Start_Cluster >> Task_2 >> Task_3 >> Terminate_Cluster
Task_2 >> Task_Failure
Task_3 >> Task_Failure
Task_Failure >> Terminate_Cluster

在那种情况下,Task_Failure 可能必须将 Terminate_Cluster trigger_rule 设置为 ONE_SUCCESS,因为有些任务可能永远不会运行。如果您将最终任务设置为 ALL_DONE并且之前的一些任务没有状态,它可能只是挂起或可能失败。

ALL_DONE 和 ALL_SUCCESS 的区别:https://stackoverflow.com/a/47716981/1335793

关于python - Airflow :避免任务状态=跳过,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44048818/

相关文章:

java - Jython,我应该把我的 .py 文件放在哪里

python - pandas - 高效的迭代和替换

vba - If...Then...Else 在 Then 之后有多个语句

MySQL - 连接 a 或 b

python - 如何从 if else 条件中获取值?

python - 在 Python 2.7 中实现 Python 3 的 round 函数

python - Python 中的 match.arg 等价物?

mysql - 为多表插入操作创建触发器

oracle - 动态读取:NEW object in an oracle trigger的列

sql - 在 SQL Server 中向触发器添加条件