我正在尝试用 celery 构建一个分布式作业执行系统。
当我在一台机器(本地主机)上启动 2 个工作线程时,其中一个用于添加任务 add
另一个用于减法任务 sub
,然后使用add.delay()
要启动多个加法任务,减法工作人员的终端出现错误:
[2013-03-05 15:51:18,898: ERROR/MainProcess] Received unregistered task of type 'add_tasks.add'.
在这个测试中,我启动了2个加法任务:一个被加法worker捕获,另一个被减法worker捕获,这导致了上面的错误。我怎样才能更改配置,以便第二个加法任务不会被减法工作人员捕获?谢谢。
代码如下:
add_tasks.py:
celery = Celery('add_tasks', backend='amqp', broker='amqp://guest@localhost//')
@celery.task
def add(x, y):
sleep(20)
return x + y
sub_tasks.py:
celery = Celery('sub_tasks', backend='amqp', broker='amqp://guest@localhost//')
@celery.task
def sub(x, y):
sleep(10)
return x - y
我通过 celery -A add_tasks worker --loglevel=info -n worker1
启动了 worker 和celery -A sub_tasks worker --loglevel=info -n worker2
在本地主机的两个终端中。
最佳答案
最后我发现ROUTER
功能可以解决我的问题。我把我的解决方案放在这里,希望对遇到同样问题的其他人有用。
启动worker时,我们可以使用-Q队列
选项来限制worker仅接受队列
中的任务。在我的情况下,我使用了celery -A add_tasksworker --loglevel=info -nworker1 -Qaddition
。
另一方面,当开始新任务时,我们应该使用队列参数显式指示,例如 add.apply_async(queue='addition',priority=0,args=[1,4] )
和 sub.apply_async(queue='subtraction',priority=0,args=[1,4])
。那么减法worker就不会接受加法任务。
关于python - 一台机器上的多个 worker 执行不同的任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15218543/