我用 custom states 实现了长任务的进度反馈在 Celery + RabbitMQ 结果后端。
但是调用者无法按照我的预期检索最新的进度状态。在下面的代码中,result.info['step']
总是返回0
,然后任务将以“result=42”结束.
# tasks.py -- celery worker
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
@app.task
def long_task():
for i in range(0, 10):
timer.sleep(10) # some work
self.update_state(state='PROGRESS', meta={'step': i})
return 42
# caller.py
from tasks import long_task
result = long_task.delay()
while not (result.successful() or result.failed()):
try:
result.get(timeout=1)
except celery.exceptions.TimeoutError:
if result.state == 'PROGRESS':
print("progress={}".format(result.info['step']))
print("result={}".format(result.get()))
Python 3.4.1/Celery 3.1.17/RabbitMQ 3.4.4
最佳答案
我认为这是一个微妙的时间问题,再加上 RabbitMQ result backend将任务结果作为消息发送,并且只能检索一次。
预先简短回答:避免调用 result.get()
直到您确实需要最终结果:
while not result.ready():
if result.state == "PROGRESS":
print("progress={}".format(result.info['step']))
time.sleep(1)
print("result={}".format(result.get()))
# +additional cleanup: see comments below
更长的答案是,这里实际上有两种与 AMQP 后端对话的方法(和属性):
-
调用
AMQPBackend.wait_for()
,它会消耗任务队列中的所有结果,直到出现具有celery.states.READY_STATES
状态的结果。 AsyncResult.successful()
、AsyncResult.failed()
、AsyncResult.info
调用
AMQPBackend.get_task_meta()
,它消耗任务队列中的所有结果,然后缓存并返回最新的结果。如果未检索到消息,后端将返回缓存结果或PENDING
结果。注:最新消息为requeued通过后端,如果是 the final result ,它将由AsyncResult
实例1缓存。
调用 result.get()
将消耗所有状态更新,从而使 result.info
没有机会提供最新的进度报告;相反,它很可能是一个过时的缓存,对 AsyncResult.get_task_meta()
的调用之一在某个时刻成功地获取了该缓存。
因此,根据时间的不同,在下一个最坏的情况下,step
可能会停留在 0,其中最坏的情况是 PROGRESS
状态永远不会联系调用者。
1由于通过调用 get_task_meta()
获取最终结果时会重新排队并缓存,因此您需要手动清空队列,如请在下面评论。
关于python - Celery+RabbitMQ 上任务进度未更新最新状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28870679/