python - 失败的 Airflow DAG 任务是否可以使用更改的参数重试

标签 python pipeline airflow directed-acyclic-graphs

使用 Airflow,如果下游任务失败,是否可以重新启动上游任务?这似乎与 DAG 术语的“非循环”部分相悖。我认为这是一个常见问题。

背景

我正在考虑使用 Airflow 来管理手动管理的数据处理工作流程。

如果参数 x 设置得太高,有一个任务将会失败,但增加参数值会带来更好的质量结果。我们还没有找到一种方法来计算安全但最大的参数 x。如果使用较低的参数失败,则手动过程会重新启 Action 业,直到作业正常为止。

工作流程如下所示:

任务 A - 收集原始数据

任务 B - 为作业生成配置文件

任务C - 修改配置文件参数x

任务 D - 运行数据操作作业

任务 E - 处理作业结果

任务 F - 生成报告

问题

如果任务D由于参数x太高而失败,我想重新运行任务C和任务D。这似乎不受支持。我非常感谢有关如何处理此问题的指导。

最佳答案

首先:这是一个很好的问题,我想知道为什么直到现在才得到广泛讨论

<小时/>

我可以想到两种可能的方法

  1. 融合运算符:正如 @Kris 所指出的那样, Combining Operators together似乎是最明显的解决方法

  2. 单独的顶级 DAG:阅读下文

<小时/>

单独的顶级 DAG 方法

给定

  • 假设您有任务 A 和 B
  • A 位于 B 的上游
  • 如果 B 失败,您希望从 A 恢复(重试)执行

(可能)想法:如果你喜欢冒险

  • 将任务 A 和 B 放在单独的顶级 DAG 中,例如 DAG-A 和 DAG-B
  • 在 DAG-A 结束时,使用 TriggerDagRunOperator 触发 DAG-B
    • 很可能,您还必须在 TriggerDagRunOperator 之后使用 ExternalTask​​Sensor
  • 在 DAG-B 中,在 Task-B 之后放置一个 BranchPythonOperator 并使用 trigger_rule=all_done
  • BranchPythonOperator 应分支到另一个 TriggerDagRunOperator,然后再调用 DAG-A(再次!)
<小时/>

有用的引用

<小时/>

EDIT-1

这是一种可以实现类似行为的更简单的方法

How can you re-run upstream task if a downstream task fails in Airflow (using Sub Dags)

关于python - 失败的 Airflow DAG 任务是否可以使用更改的参数重试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54223655/

相关文章:

postgresql - Airflow "Something Bad Has Happened"错误 : Session Table does not exist

调度程序不会选择 Airflow 清除的回填任务

regex - 按模式查找线条,仅保留模式但保留不匹配的线条

Python - 千位分隔符也适用于小数,保留 4 位有效数字

python - 无法使用executemany执行ST_GEOMFROMTEXT

python - 导入错误的文件不存在

jenkins - 包装 Jenkins 管道中的几个阶段

r - str_replace_all 不在管道中工作

airflow - Prometheus:如何根据任何 Airflow Dag 而不是特定 Airflow Dag 的结果创建警报

python - 从python中的另一个文件调用__main__