我有
- 设置
CELERY_CREATE_MISSING_QUEUES = True
- 未定义
CELERY_QUEUES
- 定义
CELERY_DEFAULT_QUEUE = 'default'
(直接类型) - 一个自定义路由器类,它可以动态创建路由,如中所示 这张票 ( https://github.com/celery/celery/issues/150 )。
我看到自定义路由器返回的路由中的新队列已创建,我认为这是因为 CELERY_CREATE_MISSING_QUEUES
。
现在在我运行的工作节点中,我没有传递 -Q
参数,它只从似乎与文档一致的“默认”队列中使用 -
By default it will consume from all queues defined in the CELERY_QUEUES setting (which if not specified defaults to the queue named celery).
有没有办法让我的工作节点从所有队列(包括动态创建的队列)中消费?
谢谢,
最佳答案
工作人员需要了解这些自动或动态创建的队列,因此您需要一种方法来获取这些队列名称并在创建它们时存储它们,或者从 rabbitmqctl list_queues
获取它们,如果您正在使用 RabbitMQ 作为代理,例如添加一个信号处理程序以将这些动态队列添加到工作线程以供使用。
例如使用 celeryd_after_setup
信号:
from celery.signals import celeryd_after_setup
@celeryd_after_setup.connect
def add_dynamic_queue(sender, instance, **kwargs):
# get the dynamic queue, maybe stored somewhere
queue = 'dynamic_queue'
instance.app.amqp.queues.select_add(queue)
如果您总是创建新的动态队列,您还可以命令工作人员在运行时使用这些队列开始消费:
#command all workers to consume from the 'dynamic_queue' queue
app.control.add_consumer('dynamic_queue', reply=True)
# command specific workers
app.control.add_consumer('dynamic_queue', reply=True, destination=[w1@example])
参见 Adding Consumers .
我希望这会有所帮助,当我获得更多相关信息时,我会编辑问题。
关于python - celery worker : How to consume from all queues?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26118506/