python - 如何使用 Celery 实现多处理池

标签 python multiprocessing celery

在 python 多处理中,我能够创建一个包含 30 个进程的多处理池来处理某些 ID 上的一些长时间运行的方程式。下面的代码在 8 核机器上生成 30 个进程,load_average 永远不会超过 2.0。事实上,30 个消费者是一个限制,因为托管 ID 的 postgresql 数据库所在的服务器有 32 个核心,所以我知道如果我的数据库可以处理它,我可以产生更多进程。

from multiprocessing import Pool
number_of_consumers = 30
pool = Pool(number_of_consumers)

我已经花时间设置 Celery,但无法重新创建 30 个进程。我想设置并发性,例如-c 30 会创建 30 个进程,但如果我没有错,那意味着我有 32 个我打算使用的处理器,这是错误的,因为我只有 8 个!此外,我看到 load_average 在 8 核机器上达到 10.0,这很糟糕..

[program:my_app]
command = /opt/apps/venv/my_app/bin/celery -A celery_conf.celeryapp worker -Q app_queue -n app_worker --concurrency=30 -l info

那么,在使用 Celery 时,如何在 8 核机器上重新创建 30 个进程?

编辑:限定困惑

我想附上一张图片来说明我在讨论 Celery 和 Python 多处理时对服务器负载的困惑。正在使用的服务器有 8 个内核。使用 Python Multiprocessing 并生成 30 个进程,如附图所示,平均负载为 0.22,这意味着 - 如果我的 linux 知识对我有用 - 我的脚本正在使用一个核心生成 30 个进程,因此非常低的 load_average。

load_average using python multiprocessing screenshot

我对 celery 中的 --concurrency=30 选项的理解是,它指示 Celery 它将使用多少个核心,而不是它需要生成多少个进程.我说得对吗?有没有办法指示 Celery 使用 2 个内核,并为每个内核生成 15 个进程,从而使我总共有 30 个并发进程,从而使我的服务器负载保持较低水平?

最佳答案

Celery worker 包括:

  1. 消息消费者
  2. 工作人员池

消息消费者从代理中获取任务并将它们发送给池中的工作人员。

--concurrency-c 参数指定该池中的进程数,因此如果您使用的是 prefork 池是默认值,那么您已经使用 --concurrency=30 在池中有 30 个进程,您可以通过查看 worker 启动时的输出来检查,它应该是这样的:

concurrency: 30 (prefork)

来自 docs on concurrency 的注释:

Number of processes (multiprocessing/prefork pool)

More pool processes are usually better, but there’s a cut-off point where adding more pool processes affects performance in negative ways. There is even some evidence to support that having multiple worker instances running, may perform better than having a single worker. For example 3 workers with 10 pool processes each. You need to experiment to find the numbers that works best for you, as this varies based on application, work load, task run times and other factors.

如果你想启动多个 worker 实例,你应该查看 celery multi ,或使用 celery worker 手动启动它们。

关于python - 如何使用 Celery 实现多处理池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27079464/

相关文章:

python - 如何在 python 2.7 中使用 pymongo 进行多处理池

python - 将父任务参数传递给所有 celery 子任务

Celery-检测空闲 worker 的信号

python - python中的分布式任务调度程序?

python - python 3.6 multiprocessing.Pool()开始使用Windows的速度非常慢

python - 如何返回给定特定值的字典中的先前值?

Python 3.5 - 创建填充有生成器的命名元组

Python 在 lambda 中尝试 Catch block

python - 尝试在此单行循环中增加 Y

python - TQDM 和多处理 - python