python - 以 DAG 方式调度作业

标签 python linux cron scheduled-tasks airflow

我们有一个包含不同类型工作的系统。例如,我们称它们为:

job_1
job_2
job_3

它们都需要不同的参数集(和可选参数)。 IE。我们为不同的 x= A, B, C ... 运行 job_1(x)job_2 运行一组参数,这些参数取决于 job_1(x) 的结果,并且 job_2 加载 job_A( x) 存储。等等。

结果是依赖关系的树结构。现在,这些工作偶尔会因为某种原因而失败。因此,如果 x=Bjob_A 失败,树的分支将完全失败,不应运行。不过,所有其他分支都应该运行。

所有作业均用 Python 编写并使用并行性(基于产生 SLURM 作业)。它们是用 cron 安排的。这显然不是很好并且有两个主要缺点:

  • 很难调试。无论树中较高的作业是否失败,所有作业都会运行。如果不深入了解依赖关系,很难看出问题出在哪里。
  • 如果更高的作业(例如 job_A)未完成,job_B 可能会按计划运行,但会失败或根据过时日期运行。

为了解决这个问题,我们正在寻找用于调度或可视化的 Airflow ,因为它是用 Python 编写的,它似乎大致符合我们的需求。不过,我看到了不同的挑战:

  • 作业的依赖树要么非常普遍(即 job_B 依赖于 job_A)要么非常广泛(即 job_B(y) 100 个参数取决于 job_A(x=A)。第一种情况下的可视化树大约有 10 个叶子,但会使调试变得非常困难,因为作业可能只是因为某个参数而失败。后一种情况下的可视化树会非常宽,大约有 300 个叶子。它会更准确,但可视化可能难以阅读。我们可以过滤失败的作业,只查看它们的依赖关系吗?
  • 我们在工作中有并行性(我们需要它,否则工作会运行超过一天,我们想每天重新运行整个批处理)这会搞砸我们的日程安排吗?
  • 我们希望尽可能少地改变我们的工作和数据管理。
  • 我们能否以易于理解的方式实现下一个生成哪些作业的规则系统?

airflow 是一个不错的选择吗?我知道还有其他一些(luigi、Azkaban 等)与 Hadoop 堆栈有些相关(我们没有使用它,因为它不是大数据)。需要多少黑客攻击?多少黑客行为是明智的?

最佳答案

我真的不能代表airflow,但我可以代表luigi。

luigi 的简要概述: Luigi 是为数据流和数据依赖而设计的,就像 airflow 一样,但它是在 Spotify 开发的。数据流中的每个步骤都表示为一个继承自 luigi.Task 的类,我们将每个步骤称为一个任务。每个任务都由三个主要函数组成,并且也有参数声明。三个函数及其说明:

  1. 要求:在此函数中,您通过返回这些任务来指定手头的任务取决于哪些任务。
  2. 输出:在此函数中,您通过返回类 Luigi.LocalTarget(或类似但用于远程)的对象指定保存此任务结果的位置。
  3. 运行:在此函数中,您指定运行任务时实际发生的情况。

注意:luigi 中央调度程序通过检查文件是否存在来知道任务何时完成 - 特别是在要运行的任务的 requires 函数中返回的任务输出函数中指定的文件。

Can we filter failed jobs, and just look at their dependencies?

Luigi 记录传递给每个任务的所有参数以及运行每个任务的每次尝试。默认情况下,luigi 不保存日志,但您可以轻松设置它。去年夏天我做了一个很大的 luigi 管道,我让它保存了日志。然后它使用模糊字符串比较(使用 Levenshtein 库)来删除重复行并大量压缩日志,然后基本上搜索“error”这个词,如果出现,它会向我发送一封电子邮件,其中包含压缩文件登录。

We have parallelism within the job (and we need it otherwise the jobs run for more than a day, and we want to rerun the whole lot every day) does that screw up our scheduling?

我在其中以并行方式运行任务,没有任何问题。但是,某些库可能会导致问题,例如gensim.

We want to change our jobs and data management as little as possible.

您通常可以将大部分计算粘贴到 luigi 任务的运行函数中。

Can we implement the rule system of what jobs to spawn next in a easily understandable way?

我相信是的,是的。对于每个任务,您在任务的 requires 函数中指定它依赖于哪些任务。

另外需要考虑的是文档。 Luigi 的文档非常好,但它并没有尽可能地流行起来。 Luigi 的社区不是很大,也不是非常活跃。据我所知,Airflow 具有相当的可比性,但它较新,因此目前可能有一个更活跃的社区。

Here这是 luigi 作者的博客文章,其中对 luigi 和更新的替代品进行了一些简要比较。他的结论是:它们都很糟糕。包括路易吉。

关于python - 以 DAG 方式调度作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34331543/

相关文章:

python - 如何从python smtp服务器检索电子邮件

c++ - 可能的编译器错误 : Weird results using boost bessel functions with Intel compiler between two machines?

linux - netinet/sctp.h : No such file or directory

php - Cron 运行脚本但代码不执行

cron - 如何设置 cron 每 40 分钟/25 分钟运行一次我的脚本?

javascript - 安排抓取作业 - Meteor JS

python - 从多个 Excel 文件创建 Pandas 数据框

python - 如何在 Python 中创建 session 变量?

python - Python 中的迭代函数生成

python - UWSGI 堆栈转储