python - 如何忽略在另一个任务的 run() 中触发的 Luigi 任务失败

标签 python etl luigi

考虑以下任务:

import luigi


class YieldFailTaskInBatches(luigi.Task):
    def run(self):
        for i in range(5):
            yield [
                FailTask(i, j)
                for j in range(2)
            ]


class YieldAllFailTasksAtOnce(luigi.Task):
    def run(self):
        yield [
            FailTask(i, j)
            for j in range(2)
            for i in range(5)
        ]

class FailTask(luigi.Task):
    i = luigi.IntParameter()
    j = luigi.IntParameter()

    def run(self):
        print("i: %d, j: %d" % (self.i, self.j))
        if self.j > 0:
            raise Exception("i: %d, j: %d" % (self.i, self.j))

如果 j > 0,则 FailTask​​ 失败。 YieldFailTask​​InBatches 在 for 循环内多次生成 FailTask​​,而 YieldAllFailTask​​sAtOnce 生成数组中的所有任务。

如果我运行 YieldFailTask​​InBatches,Luigi 会运行在第一个循环中产生的任务,并且当其中一个失败时(i = 0, j = 1),Luigi 不会'让出其余部分。如果我运行 YieldAllFailTask​​sAtOnce,Luigi 会按预期运行所有任务。

我的问题是:如何告诉 Luigi 在 YieldFailTask​​sInBatches 上继续运行剩余任务,即使某些任务失败了?有可能吗?

我问的原因是我有大约 40 万个任务要触发。我不想一次触发它们,因为这会让 Luigi 花费太多时间来构建每个任务的需求(他们可以有 1 到 400 个需求)。我目前的解决方案是分批生成它们,一次生成几个,但是如果其中任何一个失败,任务就会停止,其余的不会生成。

似乎this issue如果实现可以解决这个问题,但我想知道是否有其他方法。

最佳答案

这非常 hackish,但它应该做你想做的:

class YieldAll(luigi.Task):
    def run(self):
        errors = list()
        for i in range(5):
            for j in range(2):
                try:
                    FailTask(i, j).run()
                except Exception as e:
                    errors.append(e)

        if errors:
            raise ValueError(f' all traceback: {errors}')

class FailTask(luigi.Task):
    i = luigi.IntParameter()
    j = luigi.IntParameter()

    def run(self):
        print("i: %d, j: %d" % (self.i, self.j))
        if self.j > 0:
            raise Exception("i: %d, j: %d" % (self.i, self.j))

所以基本上您是在 luigi 上下文之外运行任务。除非你输出一个目标,否则 luigi 永远不会知道任务是否已经运行。

luigi 唯一知道的任务是 YieldAll。如果任何 YieldAll 产生错误,代码将捕获它并将 YieldAll 任务设置为失败状态。

关于python - 如何忽略在另一个任务的 run() 中触发的 Luigi 任务失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53523339/

相关文章:

python - 在 Python 中将输入转换为字符串

python - 使用 python 更新 Google 搜索方法

java - 使用 Smooks 读取 CSV

sql - SSIS Oracle 显示数据但导入 NULL

python - 如何在 Python Luigi 中使用参数

hadoop - 用于构建基于 hadoop 的数据管道的调度工具的建议

python - 调整DataFrame中的异常格式,函数继续返回None

python - 如何在python中从第k列开始删除具有空值的行

sql - 将 Excel 日期序列号转换为常规日期

python - 如何使参数可用于所有 Luigi 任务?