python - 在 Celery 中,我怎样才能防止长期延迟的任务阻塞更新的任务?

标签 python rabbitmq celery scheduling

我有两种任务。任务 A 由 celerybeat 每小时生成一次。它会立即运行,并生成一千(或几千)个任务 B 实例,每个实例的 ETA 都是 future 一天。

启动时,任务 A 的一个实例运行并生成一千个 B。从那时起,什么也没有发生。我应该看到另一个 A 每小时运行一次,还有一千个 B。但实际上我什么也没看到。

在卡住时,rabbitmqctl 显示 1000 条消息,其中 968 条准备就绪,32 条未确认。一小时后,有 1001 条消息,其中 969 条已准备就绪,32 条未确认。依此类推,每小时一条新消息被归类为就绪。据推测,正在发生的事情是工作人员正在预取 32 条消息,但无法对它们采取行动,因为它们的 ETA 仍在未来。与此同时,本应立即运行的较新任务无法运行。

处理这个问题的正确方法是什么?我猜我需要多个 worker ,也许还有多个队列(但我不确定后一点)。有更简单的方法吗?我试过摆弄 CELERYD_PREFETCH_MULTIPLIER 和 -Ofail(如此处讨论:http://celery.readthedocs.org/en/latest/userguide/optimizing.html),但无法实现。我的问题和这个问题一样吗:[[Django Celery]] Celery blocked doing IO tasks

无论如何:我可以解决这个问题只是因为我对任务的性质及其时间安排有很多了解。 future ETA 的足够任务会锁定整个系统,这似乎不是设计缺陷吗?如果我等几个小时,然后终止并重新启动 worker,它会再次获取前 32 个任务并卡住,即使此时队列中有任务准备好立即运行。某些组件不应该足够聪明以查看 ETA 并忽略不可运行的任务吗?

附录:我现在认为这个问题是 RabbitMQ 3.3 与 Celery 3.1.0 一起使用时的一个已知错误。更多信息在这里: https://groups.google.com/forum/#!searchin/celery-users/countdown|sort:date/celery-users/FiAAESOzezA/499OH-pylacJ

更新到 Celery 3.1.1 后,情况似乎好多了。任务 A 每小时运行一次(好吧,它有几个小时)并安排其任务 B 的副本。那些似乎正在填满工作人员:未确认消息的数量继续增长。我得看看它能不能无限生长。

最佳答案

这似乎是一个可以通过路由解决的问题: http://celery.readthedocs.org/en/latest/userguide/routing.html

使用路由时,您可以有多个队列,其中填充了不同类型的任务。如果您希望任务 B 不阻塞更多任务 A,您可以将它们放入具有不同优先级的单独工作队列,这样您的工作人员将在充满任务 B 的大队列中工作,但是当任务 A 到达时,它会被下一个可用的任务拉走 worker 。

这样做的额外好处是,您还可以将更多工作人员分配给已满员的队列,而这些工作人员只会从指定的高容量队列中拉取数据。

关于python - 在 Celery 中,我怎样才能防止长期延迟的任务阻塞更新的任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23635707/

相关文章:

python - 使用 chunksize 的缺失记录 - Pandas 和 Google Analytics API 集成

java - 我们能否拥有类似于 RabbitMq 的 Apache Kafka 强大的路由能力?

rabbitmq - 使用 Ruby-Amqp 时如何保持 AMQP 连接打开?

python - 属性错误 : 'Flask' object has no attribute 'user_options'

python - 仅当所有元素都是 pandas 的 groupby 中的 NA 时,如何删除 NA

python - 将列表项转换为 int 并将它们相加的最有效方法

django - Docker/Kubernetes + Gunicorn/Celery - 多个 worker 与副本?

python - 从连续运行的 flask 应用程序中异步调用函数

Python: Pandas 中的数据框操作和聚合

model - AsyncApi 和 RabbitMq