Django&Celery-路由问题

标签 django celery django-celery kombu

我正在使用Django和Celery,并且尝试设置到多个队列的路由。当我指定任务的routing_keyexchange(在任务装饰器中或使用apply_async())时,该任务未添加到代理(即Kombu连接到我的MySQL数据库)。

如果我在任务装饰器中指定队列名称(这将意味着忽略路由键),则该任务将正常运行。路由/交换设置似乎存在问题。

知道可能是什么问题吗?

设置如下:

settings.py

INSTALLED_APPS = (
    ...
    'kombu.transport.django',
    'djcelery',
)
BROKER_BACKEND = 'django'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "task.default"
CELERY_QUEUES = {
    'default': {
        'binding_key':'task.#',
    },
    'i_tasks': {
        'binding_key':'important_task.#',
    },
}

tasks.py
from celery.task import task

@task(routing_key='important_task.update')
def my_important_task():
    try:
        ...
    except Exception as exc:
        my_important_task.retry(exc=exc)

启动任务:
from tasks import my_important_task
my_important_task.delay()

最佳答案

您将Django ORM用作代理,这意味着声明仅存储在内存中
(请参阅http://readthedocs.org/docs/kombu/en/latest/introduction.html#transport-comparison上的运输比较表,这无疑是很难找到的)

因此,当您使用routing_key important_task.update应用此任务时,它将无法执行
进行路由,因为它尚未声明队列。

如果您这样做,它将起作用:

@task(queue="i_tasks", routing_key="important_tasks.update")
def important_task():
    print("IMPORTANT")

但是使用自动路由功能会更加简单,
由于此处没有任何内容表明您需要使用“主题”交换,
要使用自动路由,只需删除设置即可:
  • CELERY_DEFAULT_QUEUE
  • CELERY_DEFAULT_EXCHANGE
  • CELERY_DEFAULT_EXCHANGE_TYPE
  • CELERY_DEFAULT_ROUTING_KEY
  • CELERY_QUEUES

  • 并像这样声明您的任务:
    @task(queue="important")
    def important_task():
        return "IMPORTANT"
    

    然后从该队列开始消耗一个工作线程:
    $ python manage.py celeryd -l info -Q important
    

    或从默认(celery)队列和important队列中使用:
    $ python manage.py celeryd -l info -Q celery,important
    

    另一个好的做法是不要将队列名称硬编码到
    任务,并使用CELERY_ROUTES代替:
    @task
    def important_task():
        return "DEFAULT"
    

    然后在您的设置中:
    CELERY_ROUTES = {"myapp.tasks.important_task": {"queue": "important"}}
    

    如果您仍然坚持使用主题交流,那么您可以
    添加此路由器以在第一时间自动声明所有队列
    发送任务:
    class PredeclareRouter(object):
        setup = False
    
        def route_for_task(self, *args, **kwargs):
            if self.setup:
                return
            self.setup = True
            from celery import current_app, VERSION as celery_version
            # will not connect anywhere when using the Django transport
            # because declarations happen in memory.
            with current_app.broker_connection() as conn:
                queues = current_app.amqp.queues
                channel = conn.default_channel
                if celery_version >= (2, 6):
                    for queue in queues.itervalues():
                        queue(channel).declare()
                else:
                    from kombu.common import entry_to_queue
                    for name, opts in queues.iteritems():
                        entry_to_queue(name, **opts)(channel).declare()
    CELERY_ROUTES = (PredeclareRouter(), )
    

    关于Django&Celery-路由问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10707287/

    相关文章:

    Python/Django 列表对象没有属性 'status_code'

    html - Bootstrap 3 在 Django 模板中显示 col-md-6 之外的内容

    django - 在 Pinax/Django 中,登录后如何重定向到链接?

    python - 如何在django中更改celeryd的权限

    python - Celery 和 Redis 后端的问题

    python - Django - 导入错误: cannot import name simplejson

    python - celery 任务不会执行

    python - 模拟 Celery 任务方法 apply_async

    python - 设置 docker-compose.yml 以运行 celery worker 和 celery beat 以使用 redis 作为代理的 django 项目

    python - 使用类方法作为 celery 任务