使用 Airflow,如果下游任务失败,是否可以重新启动上游任务?这似乎与 DAG 术语的“非循环”部分相悖。我认为这是一个常见问题。
背景
我正在考虑使用 Airflow 来管理手动管理的数据处理工作流程。
如果参数 x 设置得太高,有一个任务将会失败,但增加参数值会带来更好的质量结果。我们还没有找到一种方法来计算安全但最大的参数 x。如果使用较低的参数失败,则手动过程会重新启 Action 业,直到作业正常为止。
工作流程如下所示:
任务 A - 收集原始数据
任务 B - 为作业生成配置文件
任务C - 修改配置文件参数x
任务 D - 运行数据操作作业
任务 E - 处理作业结果
任务 F - 生成报告
问题
如果任务D由于参数x太高而失败,我想重新运行任务C和任务D。这似乎不受支持。我非常感谢有关如何处理此问题的指导。
最佳答案
首先:这是一个很好的问题,我想知道为什么直到现在才得到广泛讨论
<小时/>我可以想到两种可能的方法
融合
运算符
:正如 @Kris 所指出的那样, CombiningOperators
together似乎是最明显的解决方法单独的顶级
DAG
:阅读下文
单独的顶级 DAG 方法
给定
- 假设您有任务 A 和 B
- A 位于 B 的上游
- 如果 B 失败,您希望从 A 恢复(重试)执行
(可能)想法:如果你喜欢冒险
- 将任务 A 和 B 放在单独的顶级
DAG
中,例如 DAG-A 和 DAG-B - 在 DAG-A 结束时,使用 TriggerDagRunOperator 触发 DAG-B
- 很可能,您还必须在
TriggerDagRunOperator
之后使用ExternalTaskSensor
- 很可能,您还必须在
- 在 DAG-B 中,在 Task-B 之后放置一个
BranchPythonOperator
并使用trigger_rule=all_done
- 此
BranchPythonOperator
应分支到另一个TriggerDagRunOperator
,然后再调用 DAG-A(再次!)
有用的引用
<小时/>EDIT-1
这是一种可以实现类似行为的更简单的方法
How can you re-run upstream task if a downstream task fails in Airflow (using Sub Dags)
关于python - 失败的 Airflow DAG 任务是否可以使用更改的参数重试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54223655/