python - Celery + Gevent 池在执行 1000 多个任务后挂起

标签 python celery gevent celery-task

我们有 8 核,16 GB 内存,运行 celery 的 Linux 服务器, 它正在运行一个 celery 工作队列 myQueue,并在 gevent 池下以 1000 并发运行。

执行任务大约 1 小时后,worker 突然死机,它不接受来自 celery beat 的新任务 这是我们的 celery 配置

App =  Celery('tasks')
class Conf:
   BROKER_URL   = 'amqp://<user>:<pass>@<host>:<port>/<vhost>'
   CELERY_IGNORE_RESULT = True
   CELERY_IMPORTS = ("worker_class",)
   CELERYBEAT_SCHEDULE = {
       'RunTask':{
           'task': 'tasks.worker.MyWorker',
           'schedule' : timedelta(minutes=5)
       }
   }

App.config_from_object(Conf)

我们像下面这样运行 celery

celery worker --workdir=tasks/ -A worker -P gevent -c 1000 -Q myQueue --loglevel=INFO

还有人可以解释一下如何使用 celery multi 来使用 gevent 池

最佳答案

使用 celery multi 指定池类型:

celery -A myApp multi start 4 -l INFO -P gevent -c 1000 -Q myQueue

上面的命令启动了 4 个 gevent worker,每个 worker 的并发级别为 1000,并且都从 myQueue 消费。

但这不是乐趣结束的地方,因为您甚至可以指定每个 worker 的并发性,还可以指定每个 worker 使用哪个队列。例如:

celery -A myApp multi start 4 -l INFO -P gevent -c:1-3 1000 -c:4 200 -Q:1-2 myQueue1 -Q:3 myQueue2 -Q:4 myQueue3

就像之前我们启动了 4 个 gevent worker,但是现在 worker 1 到 3 的并发为 1000,而最后一个 worker 的并发为 200。另外,worker 1 和 2 从 myQueue1 消费, worker 3 从 myQueue2 消费,worker 4 从 myQueue4 消费。

注意:celery worker 选项适用于 celery multi

关于python - Celery + Gevent 池在执行 1000 多个任务后挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28690211/

相关文章:

django - 当达到并发值时,Celery inspect 显示 worker 离线

python - 格林莱特VS。线程

flask - Gevent 和 Flask - 本地线程

python - Celery 使用 SNS 发布消息

asynchronous - python : Prioritizing tasks and Running asynchronous tasks without a lock

python - 在Python中没有lambda的函数列表上的映射函数应用

Python上下文管理器

python正则表达式用于转发

python - 根据条件用不同的替换字典替换 Pandas 数据框列中的值

python - 如何将 celery 作业的参数输入到要查询的数据库中;使用 mysql