python - Celery 相当于一个 JoinableQueue

标签 python queue celery python-multiprocessing gevent

Celery 的 multiprocessing.JoinableQueue 是什么? (或 gevent.queue.JoinableQueue)?

我正在寻找的功能是能够 .join() 来自发布者的 Celery 任务队列,等待队列中的所有任务完成。

等待初始的 AsyncResultGroupResult 是不够的,因为队列会由 worker 自己动态填满。

最佳答案

它可能并不完美,但这是我最终想到的。

它基本上是一个基于共享 Redis 计数器和列表监听器的现有 Celery 队列之上的 JoinableQueue 包装器。它要求队列名称与其路由键相同(由于 before_task_publishtask_postrun 信号的内部实现细节)。

joinableceleryqueue.py:

from celery.signals import before_task_publish, task_postrun
from redis import Redis
import settings

memdb = Redis.from_url(settings.REDIS_URL)

class JoinableCeleryQueue(object):
    def __init__(self, queue):
        self.queue = queue
        self.register_queue_hooks()

    def begin(self):
        memdb.set(self.count_prop, 0)

    @property
    def count_prop(self):
        return "jqueue:%s:count" % self.queue

    @property
    def finished_prop(self):
        return "jqueue:%s:finished" % self.queue

    def task_add(self, routing_key, **kw):
        if routing_key != self.queue:
            return

        memdb.incr(self.count_prop)

    def task_done(self, task, **kw):
        if task.queue != self.queue:
            return

        memdb.decr(self.count_prop)
        if memdb.get(self.count_prop) == "0":
            memdb.rpush(self.finished_prop, 1)

    def register_queue_hooks(self):
        before_task_publish.connect(self.task_add)
        task_postrun.connect(self.task_done)

    def join(self):
        memdb.brpop(self.finished_prop)

我选择使用 BRPOP 而不是发布/订阅,因为我只需要一个监听器来监听“所有任务已完成”事件(发布者)。

使用 JoinableCeleryQueue 非常简单 - begin() 在将任何任务添加到队列之前,使用常规 Celery API 添加任务,.join() 等待所有任务完成。

关于python - Celery 相当于一个 JoinableQueue,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39401374/

相关文章:

python - 使用python递归打印星号

python - 使用python将带有特殊字符的值插入mysql表时出错

python - 我如何知道是否可以禁用 SQLALCHEMY_TRACK_MODIFICATIONS?

python - OS X - 在 anaconda 和 Homebrew Python 环境之间做出决定

JavaScript 堆栈、队列和事件循环?

python - 卡在 Celery 队列中的任务

java - 从独立代码访问队列

java - 如何更好地测试具有公共(public)队列的多线程程序?

python - 每个用户的 Celery PeriodicTask

python - celery.service : Failed with result 'signal'