django - python celery : How to append a task to an old chain

标签 django rabbitmq celery chain

我保存在我的数据库中,对链的引用。

from tasks import t1, t2, t3
from celery import chain
res = chain(t1.s(123)|t2.s()|t3.s())()
res.get()

如何将其他任务附加到此特定链?

res.append(t2.s())

我的目标是确保链按照我在代码中指定的顺序执行。
如果我的链中的任务失败,则不会执行以下任务。

知道我在指定队列中使用超大任务。

最佳答案

所有信息都包含在消息中。

消息可能在传输中,可能在世界的另一端,或者可能被中间处理器使用。因此,在发送消息后无法修改消息。

http://docs.celeryproject.org/en/latest/userguide/tasks.html#state

My goal is to be sure that chains are executed in the same order I specified in my code. And if a task fail in my chain, the following tasks are not executed.



您可以确定,订单是作为消息的一部分发送的,不会继续
如果任何任务失败。

现在,如果您真的希望能够在运行时添加任务,那么您可以存储
数据库中的信息,并让任务本身检查并调用新任务。
但是,在执行此操作时存在一些挑战:

1)链中的第一个任务如果成功将调用下一个任务,
然后下一个任务将在此之后调用下一个任务,依此类推。

2)如果你在这个过程中添加一个任务,如果第一个任务已经执行会发生什么?
还是第二个,还是第三个?

因此,您可能会猜到这将需要一些繁重的同步才能工作。

我想一个更简单的解决方案是创建一个等待一个任务完成的任务
然后应用回调:
from celery import subtask
from celery.result import from_serializable

@app.task(bind=True)
def after_task(self, result, callback, errback=None):
    result = from_serializable(result)
    if not result.ready():
        raise self.retry(countdown=1)
    if task.successful():
        subtask(callback).delay(result.get())
    else:
        if errback:
            subtask(errback)()


def add_to_chain(result, callback, errback=None):
    callback = callback.clone()     # do not modify caller
    new_result = callback.freeze()  # sets id for callback, returns AsyncResult
    new_result.parent = result
    after_task.delay(result.serializable(), callback, errback)
    return new_result

然后你可以像这样使用它:
from tasks import t1, t2, t3

res = (t1.s(123) | t2.s() | t3.s())()
res = add_to_chain(t2.s())

笔记:
bind=True在即将到来的 3.1 版本中是新的,对于旧版本
您必须删除 self 参数并使用 current_task.retry (得到这个 from celery import current_task )。
Signature.freeze也是 3.1 中的新功能,用于执行
在旧版本中您可以使用相同的:
from celery import uuid

def freeze(sig, _id=None):
    opts = sig.options
    try:
        tid = opts['task_id']
    except KeyError:
        tid = opts['task_id'] = _id or uuid()
    return sig.AsyncResult(tid)

关于django - python celery : How to append a task to an old chain,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19400305/

相关文章:

django - AWS + Django 定时任务

django - 如何将 order_by 添加到 django rest 框架中的自定义序列化程序字段

c# - RabbitMQ C# 验证消息已发送

python - 在运行时添加、修改、删除 celery.schedules

python - Django 属性错误。 'module' 对象没有属性 'rindex'

Django - 转到 HTML 中的 #id 标签

amazon-ec2 - 在 Kubernetes 上运行时更改主机名会破坏 Rabbitmq

java - 在 Java Spring 中,如何在没有 @KafkaListener 的情况下使用 KafkaTemplate 消费来自 Kafka 的消息? RabbitMQ 类似物

python - 在 Celery 中设置组中任务之间的延迟

python - 如何解决这个错误? django+celery+rabbitmq+mysql+redis中的 "RestartFreqExceeded: 5 in 1s"