python - 一台机器上的多个 worker 执行不同的任务

标签 python celery distributed

我正在尝试用 celery 构建一个分布式作业执行系统。

当我在一台机器(本地主机)上启动 2 个工作线程时,其中一个用于添加任务 add另一个用于减法任务 sub ,然后使用add.delay()要启动多个加法任务,减法工作人员的终端出现错误:

[2013-03-05 15:51:18,898: ERROR/MainProcess] Received unregistered task of type 'add_tasks.add'.

在这个测试中,我启动了2个加法任务:一个被加法worker捕获,另一个被减法worker捕获,这导致了上面的错误。我怎样才能更改配置,以便第二个加法任务不会被减法工作人员捕获?谢谢。

代码如下:

add_tasks.py:

celery = Celery('add_tasks', backend='amqp', broker='amqp://guest@localhost//')

@celery.task
def add(x, y):
    sleep(20)
    return x + y

sub_tasks.py:

celery = Celery('sub_tasks', backend='amqp', broker='amqp://guest@localhost//')

@celery.task
def sub(x, y):
    sleep(10)
    return x - y

我通过 celery -A add_tasks worker --loglevel=info -n worker1 启动了 worker 和celery -A sub_tasks worker --loglevel=info -n worker2在本地主机的两个终端中。

最佳答案

最后我发现ROUTER功能可以解决我的问题。我把我的解决方案放在这里,希望对遇到同样问题的其他人有用。

启动worker时,我们可以使用-Q队列选项来限制worker仅接受队列中的任务。在我的情况下,我使用了celery -A add_tasksworker --loglevel=info -nworker1 -Qaddition

另一方面,当开始新任务时,我们应该使用队列参数显式指示,例如 add.apply_async(queue='addition',priority=0,args=[1,4] )sub.apply_async(queue='subtraction',priority=0,args=[1,4])。那么减法worker就不会接受加法任务。

关于python - 一台机器上的多个 worker 执行不同的任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15218543/

相关文章:

python - 自动化多个安装程序

python - 将 pandas DataFrame 转换为正确的格式 : `DataError: No numeric types to aggregate`

python - 方法需要 1 个位置参数,但已给出 2 个

python - 检测 celery 任务和所有子任务何时完成

scala - 用于 Ocaml 和其他语言的基于 Actor 的分布式并发库

distributed - OrientDB 嵌入式和分布式

python - HTTP 错误 400 : Bad request to Firebase using Python

python - 找到最能解释数据的树状层次结构

flask - 如何使用 celery 任务访问 orm?

.net - .NET 的 BLOB 分布式存储?