python - Topic exchanges with Celery and RabbitMQ

Tags: python rabbitmq celery amqp rabbitmq-exchange

I'm a little confused about what my configuration should look like to set up a topic exchange.

http://www.rabbitmq.com/tutorials/tutorial-five-python.html

This is what I'm trying to accomplish:

Task1 -> send to QueueOne and QueueFirehose
Task2 -> send to QueueTwo and QueueFirehose

Then:

Task1 -> consume from QueueOne
Task2 -> consume from QueueTwo
TaskFirehose -> consume from QueueFirehose

I only want Task1 to consume from QueueOne, and Task2 to consume from QueueTwo.

The problem right now is that when Task1 and Task2 run, they also drain QueueFirehose, and the TaskFirehose task never executes.

Is something wrong with my configuration, or am I misunderstanding something?

CELERY_QUEUES = {
    "QueueOne": {
        "exchange_type": "topic",
        "binding_key": "pipeline.one",
    },
    "QueueTwo": {
        "exchange_type": "topic",
        "binding_key": "pipeline.two",
    },
    "QueueFirehose": {
        "exchange_type": "topic",
        "binding_key": "pipeline.#",
    },
}

CELERY_ROUTES = {
    "tasks.task1": {
        "queue": "QueueOne",
        "routing_key": "pipeline.one",
    },
    "tasks.task2": {
        "queue": "QueueTwo",
        "routing_key": "pipeline.two",
    },
    "tasks.firehose": {
        "queue": "QueueFirehose",
        "routing_key": "pipeline.#",
    },
}
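For reference, the fan-out described above is exactly what topic bindings prescribe: a message published with routing key `pipeline.one` matches both the `pipeline.one` binding and the `pipeline.#` binding, so the broker copies it to QueueOne *and* QueueFirehose. A small sketch of AMQP topic-pattern matching (an illustration of the rule, not the broker's actual code):

```python
def topic_matches(pattern, routing_key):
    """Simplified AMQP topic matching: '*' matches exactly one
    dot-separated word, '#' matches zero or more words."""
    def match(p, k):
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may swallow zero or more words
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if head == "*" or (k and head == k[0]):
            return bool(k) and match(rest, k[1:])
        return False
    return match(pattern.split("."), routing_key.split("."))

bindings = {
    "QueueOne": "pipeline.one",
    "QueueTwo": "pipeline.two",
    "QueueFirehose": "pipeline.#",
}

# A message published with routing key 'pipeline.one' is delivered to
# every queue whose binding pattern matches it.
hits = [q for q, pat in bindings.items()
        if topic_matches(pat, "pipeline.one")]
print(hits)  # → ['QueueOne', 'QueueFirehose']
```

So the duplication into QueueFirehose is done by the exchange itself; the remaining question is only which *workers* consume from which queues, which is what the answer below addresses.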

Best answer

Assuming you mean something like this:

Task1 -> send to QueueOne
Task2 -> send to QueueTwo
TaskFirehose -> send to QueueFirehose

Then:

Worker1 -> consume from QueueOne, QueueFirehose
Worker2 -> consume from QueueTwo, QueueFirehose
WorkerFirehose -> consume from QueueFirehose

That may not be exactly what you meant, but I think it should cover many scenarios, hopefully including yours. Something like this should work:

# Advanced example starting 10 workers in the background:
#   * Three of the workers process the images and video queue
#   * Two of the workers process the data queue with loglevel DEBUG
#   * The rest process the 'default' queue.

$ celery multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data \
    -Q default -L:4,5 DEBUG

More options and references: http://celery.readthedocs.org/en/latest/reference/celery.bin.multi.html

This is taken straight from the docs.
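Mapped onto the queues from this question, the same idea might look like the following (a sketch, assuming the app module is called `proj` and using `celery multi`'s per-node `-Q:` syntax; the node names Worker1/Worker2/WorkerFirehose are made up here):

```shell
# Hypothetical adaptation to this question's queues: three named workers
# started in the background, each subscribed to its own queue list.
celery multi start Worker1 Worker2 WorkerFirehose -A proj -l INFO \
    -Q:Worker1 QueueOne,QueueFirehose \
    -Q:Worker2 QueueTwo,QueueFirehose \
    -Q:WorkerFirehose QueueFirehose
```

With this layout, messages in QueueFirehose can be picked up by any of the three workers; if you want them handled *only* by WorkerFirehose, drop QueueFirehose from the other two `-Q:` lists.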

I ran into a similar situation and handled it slightly differently. I couldn't use celery multi together with supervisord, so I created a separate program in supervisord for each worker. The workers end up in different processes anyway, so just let supervisord handle everything for you. The config file looks like:

; ==================================
; celery worker supervisor example
; ==================================

[program:Worker1]
; Set full path to celery program if using virtualenv
command=celery worker -A proj --loglevel=INFO -Q QueueOne,QueueFirehose

directory=/path/to/project
user=nobody
numprocs=1
stdout_logfile=/var/log/celery/worker1.log
stderr_logfile=/var/log/celery/worker1.log
autostart=true
autorestart=true
startsecs=10

; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 600

; When resorting to send SIGKILL to the program to terminate it
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true

; if rabbitmq is supervised, set its priority higher
; so it starts first
priority=998

Similarly, for Worker2 and WorkerFirehose, edit the corresponding lines so that:

[program:Worker2]
; Set full path to celery program if using virtualenv
command=celery worker -A proj --loglevel=INFO -Q QueueTwo,QueueFirehose

[program:WorkerFirehose]
; Set full path to celery program if using virtualenv
command=celery worker -A proj --loglevel=INFO -Q QueueFirehose

Include them all in the supervisord.conf file, and that should do it.
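After adding the three `[program:…]` sections, a running supervisord has to be told to pick them up. With a standard supervisord install that is typically done via `supervisorctl` (shown here as operational commands, not part of the config file):

```shell
# Re-read the config, apply the changes, and check the new programs
supervisorctl reread
supervisorctl update
supervisorctl status Worker1 Worker2 WorkerFirehose
```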

Regarding "python - Topic exchanges with Celery and RabbitMQ", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/9792727/
