python - 同步 celery 队列

标签 python django celery

我有一个应用程序,每个用户都可以在其中创建任务,并且用户创建的每个任务都会添加到特定用户的动态队列中。所以来自 User1 的所有任务都被添加到 User1_queue,User2 到 User2_queue,等等。

我需要做的是,当 User1 将 Task1、Task2 和 Task3 添加到他们的队列时,Task1 被执行,Celery 等到它完成后再执行 Task2,等等。

让它们从多个队列中彼此并排执行是很好的,因此来自 User1_queue 的 Task1 和来自 User2_queue 的 Task1。它只是限制 Celery 按照添加的顺序在队列中同步执行任务。

是否可以让 Celery 每个队列的并发数为 1,这样任务就不会在同一个队列中并排执行?


如果它能帮助访问此问题的任何人,我通过设置多个并发为 1 的工作人员解决了我的问题,每个工作人员都在一个唯一的队列中。然后我在我的 Django 应用程序中使用了一些逻辑来为事件用户存储每个 session 的队列名称。

接下来我将添加额外的逻辑来选择“最不忙”的工作人员,并尝试将用户平均分布在活跃的工作人员中。但目前它运行良好。

Flower ,Celery 的监控工具,在尝试解决这个问题时也提供了巨大的帮助。

最佳答案

您可以选择一个队列-Q option每个工作人员使用 --concurrency=1

 celery -A proj worker --concurrency=1 -n user1@%h -Q User1_queue

@JamesFoley 尽管可以在 celery 端实现以下功能,但目前还不是 ( https://github.com/celery/celery/issues/1599 )

一些想法:

  • 动态生成/控制 celery worker
  • 单一节拍任务来决定哪些任务可以被派生来运行(锁/互斥或数据库表监控任务)

关于python - 同步 celery 队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49737366/

相关文章:

python - 使用 Pandas DateOffset 移动到给定时间的最近过去的日期时间

没有 fill_diagonal 的 inf 的 Python 矩阵对角线

python - ValueError:使用套接字和 ssl 模块时 check_hostname 需要 server_hostname

python - 获取标签文章数

Django 正则表达式验证程序消息无效

python - celery "Received unregistered task of type"

python - Celery 任务和子任务有什么区别?

python - for/in循环帮助,帮助理解

python - 如何添加身份验证中间件 JWT django?

celery - 在 Django 应用程序中将 celery 4.x 升级到 5.x.x - execute_from_command_line() 替换