celery - 重试属于链的 celery 失败的任务

标签 celery django-celery celery-task

我有一个运行一些任务的 celery 链。每个任务都可能失败并重试。请参阅下面的快速示例:

from celery import task

@task(ignore_result=True)
def add(x, y, fail=True):
    try:
        if fail:
            raise Exception('Ugly exception.')
        print '%d + %d = %d' % (x, y, x+y)
    except Exception as e:
        raise add.retry(args=(x, y, False), exc=e, countdown=10)

@task(ignore_result=True)
def mul(x, y):
    print '%d * %d = %d' % (x, y, x*y)

和链条:

from celery.canvas import chain
chain(add.si(1, 2), mul.si(3, 4)).apply_async()

运行这两个任务(并假设没有失败),你会得到/看到打印:
1 + 2 = 3
3 * 4 = 12

但是,当添加任务第一次失败并在随后的重试调用中成功时,链中的其余任务不会运行,即添加任务失败,链中的所有其他任务都不会运行,几秒钟后,添加任务再次运行并成功,链中的其余任务(在本例中为 mul.si(3, 4))不运行。

celery 是否提供了一种方法来从失败的任务中继续失败的链?如果不是,那么完成此任务并确保链的任务以指定的顺序运行并且仅在前一个任务成功执行之后(即使该任务重试几次)的最佳方法是什么?

注1:这个问题可以通过做来解决

add.delay(1, 2).get()
mul.delay(3, 4).get()

但我有兴趣了解为什么链不适用于失败的任务。

最佳答案

你发现了一个错误:)

已修复 https://github.com/celery/celery/commit/b2b9d922fdaed5571cf685249bdc46f28acacde3
将成为 3.0.4 的一部分。

关于celery - 重试属于链的 celery 失败的任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11508112/

相关文章:

python - 测量 Celery 任务执行时间

Python SEM_OPEN 错误

django-celery - Django Celery 应用程序 - 没有名为 celery 的模块错误

flask - 将 Web 请求上下文透明地传递给 celery 任务

通过 'eta' 选项进行调度时不遵守 Celeryrate_limit

rabbitmq - 从一个 Docker 容器连接到另一个

http - 如何通过http在 celery 任务调用上设置http_headers

python - celery ,组任务,AttributeError : 'NoneType' object has no attribute 'app'

python - celery - "WorkerLostError: Worker exited prematurely: signal 11 (SIGSEGV)"

celery - 如何在内存中加载对象并在 Celery worker 的不同执行之间共享?