python - Celery+RabbitMQ 上任务进度未更新最新状态

标签 python rabbitmq celery

我用 custom states 实现了长任务的进度反馈在 Celery + RabbitMQ 结果后端。

但是调用者无法按照我的预期检索最新的进度状态。在下面的代码中,result.info['step']总是返回0,然后任务将以“result=42”结束.

# tasks.py -- celery worker
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')

@app.task
def long_task():
  for i in range(0, 10):
    timer.sleep(10)  # some work
    self.update_state(state='PROGRESS', meta={'step': i})
  return 42


# caller.py
from tasks import long_task
result = long_task.delay()

while not (result.successful() or result.failed()):
  try:
    result.get(timeout=1)
  except celery.exceptions.TimeoutError:
    if result.state == 'PROGRESS':
      print("progress={}".format(result.info['step']))
print("result={}".format(result.get()))

Python 3.4.1/Celery 3.1.17/RabbitMQ 3.4.4

最佳答案

我认为这是一个微妙的时间问题,再加上 RabbitMQ result backend将任务结果作为消息发送,并且只能检索一次。

预先简短回答:避免调用 result.get() 直到您确实需要最终结果:

while not result.ready():
  if result.state == "PROGRESS":
    print("progress={}".format(result.info['step']))
  time.sleep(1)
print("result={}".format(result.get()))
# +additional cleanup: see comments below

更长的答案是,这里实际上有两种与 AMQP 后端对话的方法(和属性):

  • AsyncResult.get()

    调用 AMQPBackend.wait_for() ,它会消耗任务队列中的所有结果,直到出现具有 celery.states.READY_STATES 状态的结果。

  • AsyncResult.successful()AsyncResult.failed()AsyncResult.info

    调用 AMQPBackend.get_task_meta() ,它消耗任务队列中的所有结果,然后缓存并返回最新的结果。如果未检索到消息,后端将返回缓存结果或 PENDING 结果。注:最新消息为requeued通过后端,如果是 the final result ,它将由 AsyncResult 实例1缓存。

调用 result.get() 将消耗所有状态更新,从而使 result.info 没有机会提供最新的进度报告;相反,它很可能是一个过时的缓存,对 AsyncResult.get_task_meta() 的调用之一在某个时刻成功地获取了该缓存。

因此,根据时间的不同,在下一个最坏的情况下,step 可能会停留在 0,其中最坏的情况是 PROGRESS 状态永远不会联系调用者。

1由于通过调用 get_task_meta() 获取最终结果时会重新排队并缓存,因此您需要手动清空队列,如请在下面评论。

关于python - Celery+RabbitMQ 上任务进度未更新最新状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28870679/

相关文章:

python - Pandas:一列中的一系列True或False,在另一列中选择当有True时的值

python - 如何在 Docker、Ubuntu 中运行我的脚本 python openCV

java - 从一个队列铲到另一个队列后如何拒绝消息?

python - Django celery 连接错误 : Too many heartbeats missed

python - 如何使用 redis 代理从 celery 中删除任务?

python - 使用 Pillow 保存 JPEG 评论

python - 从另一个包导入类

django - RabbitMQ 没有关闭与 Celery 的旧连接

java - 如何使用 muleClient.request 从 Java 实现异步 AMQP-RabbitMQ?

python - Django 1.9 + Celery 未注册任务