python - celery worker : How to consume from all queues?

标签 python celery

我有

  • 设置 CELERY_CREATE_MISSING_QUEUES = True
  • 未定义 CELERY_QUEUES
  • 定义CELERY_DEFAULT_QUEUE = 'default'(直接类型)
  • 一个自定义路由器类,它可以动态创建路由,如中所示 这张票 ( https://github.com/celery/celery/issues/150 )。

我看到自定义路由器返回的路由中的新队列已创建,我认为这是因为 CELERY_CREATE_MISSING_QUEUES

现在在我运行的工作节点中,我没有传递 -Q 参数,它只从似乎与文档一致的“默认”队列中使用 -

By default it will consume from all queues defined in the CELERY_QUEUES setting (which if not specified defaults to the queue named celery).

有没有办法让我的工作节点从所有队列(包括动态创建的队列)中消费?

谢谢,

最佳答案

工作人员需要了解这些自动或动态创建的队列,因此您需要一种方法来获取这些队列名称并在创建它们时存储它们,或者从 rabbitmqctl list_queues 获取它们,如果您正在使用 RabbitMQ 作为代理,例如添加一个信号处理程序以将这些动态队列添加到工作线程以供使用。

例如使用 celeryd_after_setup信号:

from celery.signals import celeryd_after_setup

@celeryd_after_setup.connect
def add_dynamic_queue(sender, instance, **kwargs):
    # get the dynamic queue, maybe stored somewhere
    queue = 'dynamic_queue'
    instance.app.amqp.queues.select_add(queue)

如果您总是创建新的动态队列,您还可以命令工作人员在运行时使用这些队列开始消费:

#command all workers to consume from the 'dynamic_queue' queue
app.control.add_consumer('dynamic_queue', reply=True)

# command specific workers
app.control.add_consumer('dynamic_queue', reply=True, destination=[w1@example])

参见 Adding Consumers .

我希望这会有所帮助,当我获得更多相关信息时,我会编辑问题。

关于python - celery worker : How to consume from all queues?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26118506/

相关文章:

python - 比较Python中的关键内容

python - celery + RabbitMQ + "A socket error ocurred"

python - 如何模拟实例变量

python - 如何给 celery 中的每个节点命名

python - Celery:将任务结果存储在 MySQL 还是 Redis 中更好?

用于重试退避的 Celery 配置

python - celery 不采摘 CELERY_ALWAYS_EAGER 设置

python - 使用 scipy、python、numpy 进行非线性 e^(-x) 回归

Python Selenium xPath 从 div 类 a rel 中选择

python - pyglet最近有变化吗?