我正在使用Django和Celery,并且尝试设置到多个队列的路由。当我指定任务的routing_key
和exchange
(在任务装饰器中或使用apply_async()
)时,该任务未添加到代理(即Kombu连接到我的MySQL数据库)。
如果我在任务装饰器中指定队列名称(这将意味着忽略路由键),则该任务将正常运行。路由/交换设置似乎存在问题。
知道可能是什么问题吗?
设置如下:
settings.py
INSTALLED_APPS = (
...
'kombu.transport.django',
'djcelery',
)
BROKER_BACKEND = 'django'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "task.default"
CELERY_QUEUES = {
'default': {
'binding_key':'task.#',
},
'i_tasks': {
'binding_key':'important_task.#',
},
}
tasks.py
from celery.task import task
@task(routing_key='important_task.update')
def my_important_task():
try:
...
except Exception as exc:
my_important_task.retry(exc=exc)
启动任务:
from tasks import my_important_task
my_important_task.delay()
最佳答案
您将Django ORM用作代理,这意味着声明仅存储在内存中
(请参阅http://readthedocs.org/docs/kombu/en/latest/introduction.html#transport-comparison上的运输比较表,这无疑是很难找到的)
因此,当您使用routing_key important_task.update
应用此任务时,它将无法执行
进行路由,因为它尚未声明队列。
如果您这样做,它将起作用:
@task(queue="i_tasks", routing_key="important_tasks.update")
def important_task():
print("IMPORTANT")
但是使用自动路由功能会更加简单,
由于此处没有任何内容表明您需要使用“主题”交换,
要使用自动路由,只需删除设置即可:
CELERY_DEFAULT_QUEUE
,CELERY_DEFAULT_EXCHANGE
,CELERY_DEFAULT_EXCHANGE_TYPE
CELERY_DEFAULT_ROUTING_KEY
CELERY_QUEUES
并像这样声明您的任务:
@task(queue="important")
def important_task():
return "IMPORTANT"
然后从该队列开始消耗一个工作线程:
$ python manage.py celeryd -l info -Q important
或从默认(
celery
)队列和important
队列中使用:$ python manage.py celeryd -l info -Q celery,important
另一个好的做法是不要将队列名称硬编码到
任务,并使用
CELERY_ROUTES
代替:@task
def important_task():
return "DEFAULT"
然后在您的设置中:
CELERY_ROUTES = {"myapp.tasks.important_task": {"queue": "important"}}
如果您仍然坚持使用主题交流,那么您可以
添加此路由器以在第一时间自动声明所有队列
发送任务:
class PredeclareRouter(object):
setup = False
def route_for_task(self, *args, **kwargs):
if self.setup:
return
self.setup = True
from celery import current_app, VERSION as celery_version
# will not connect anywhere when using the Django transport
# because declarations happen in memory.
with current_app.broker_connection() as conn:
queues = current_app.amqp.queues
channel = conn.default_channel
if celery_version >= (2, 6):
for queue in queues.itervalues():
queue(channel).declare()
else:
from kombu.common import entry_to_queue
for name, opts in queues.iteritems():
entry_to_queue(name, **opts)(channel).declare()
CELERY_ROUTES = (PredeclareRouter(), )
关于Django&Celery-路由问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10707287/