python - 回调 celery apply_async

标签 python celery

我在我的应用程序中使用 celery 来运行周期性任务。让我们看下面的简单示例

from myqueue import Queue
@perodic_task(run_every=timedelta(minutes=1))
def process_queue():
    queue = Queue()
    uid, questions = queue.pop()
    if uid is None:
        return

    job = group(do_stuff(q) for q in questions)
    job.apply_async()

def do_stuff(question):
    try:
        ...
    except:
        ...
        raise

正如您在上面的示例中看到的,我使用 celery 来运行异步任务,但是(因为它是一个队列)我需要执行 queue.fail(uid) 如果在 do_stuffqueue.ack(uid) 中出现异常,否则。在这种情况下,在这两种情况下从我的任务中获得一些回调是非常清楚和有用的 - on_failureon_success

我看到了一些documentation ,但从未见过将回调与 apply_async 结合使用的做法。有可能吗?

最佳答案

子类化 Task 类并重载 on_success 和 on_failure 函数:

from celery import Task


class CallbackTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        '''
        retval – The return value of the task.
        task_id – Unique id of the executed task.
        args – Original arguments for the executed task.
        kwargs – Original keyword arguments for the executed task.
        '''
        pass
        
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        '''
        exc – The exception raised by the task.
        task_id – Unique id of the failed task.
        args – Original arguments for the task that failed.
        kwargs – Original keyword arguments for the task that failed.
        '''
        pass

使用:

@celery.task(base=CallbackTask)  # this does the trick
def add(x, y):
    return x + y

关于python - 回调 celery apply_async,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12526606/

相关文章:

python - 绘制 networkx.Graph : how to change node position instead of resetting every node?

django - Heroku 上的 Redis 连接被拒绝

python - 让 celery 广播所有 worker 的返回结果

amazon-web-services - celery 与亚马逊 SQS

python - 如果未指定,默认的 Celery 日志级别是多少?

java - 检查连接 jpype - java

python - 如何从 pgAdmin4 连接到 Cloud SQL?

python - 从 str 转换为 float 时保持尾随 0

python - 在 Keras 中 reshape 自定义损失函数中的张量

django - 凯拉斯预测 celery 任务不归队