我的“大”任务是分步进行的:它可以终止或产生更多任务。在我的示例中,我数到 5。
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
from time import sleep
@app.task
def slow_add(x):
"""Slowly counts to 5"""
sleep(1)
print (x)
if x == 5:
return x
else:
return slow_add.s(x+1)()
当我安排任务时,我只得到一次调用:
In [48]: asks.slow_add.run(1)
1
2
3
4
5
Out[48]: 5
- 如何异步调用?我尝试了 apply_async、delay 的不同变体,但没有效果。
- 我在 celery 监视器中没有看到任何任务。为什么?
- 当任务异步执行时,如何获得中间状态(在本例中为 1 到 5 之间的数字)?
最佳答案
在我的例子中,
@app.task
def test(x):
import time
time.sleep(1)
print x
if x == 5:
return x
else:
return test.delay(x+1)
- 当我使用
res = test.delay(1)
时,它可以正常异步工作 在 Celery 监视器中我可以看到收到的任务
Received task: services.tasker.test[67f22e02-7c39-4c1d-a646-acabeb72d208]
1
Received task: services.tasker.test[9eed6d45-4931-4790-8477-3bbe75e213e4]
Task services.tasker.test[67f22e02-7c39-4c1d-a646-acabeb72d208] succeeded in 3.021544273s: None
2
...您可以使用
res.ready()
获取任务状态,当任务完成时返回True
,否则返回False
>
在我的测试用例中,结果如下
>>> res = test.delay(1)
>>> res.ready() # before finishing task
False
>>> res.ready() # after finishing task
True
或
def final_result(r):
if isinstance(r.result, celery.result.AsyncResult):
return final_result(r.result)
else:
return r.result
并像上面一样使用
>>> print final_result(res)
5
关于python - celery 从子任务发出任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21574624/