python - celery 从子任务发出任务

标签 python asynchronous celery

我的“大”任务是分步进行的:它可以终止或产生更多任务。在我的示例中,我数到 5。

from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
from time import sleep

@app.task
def slow_add(x):
    """Slowly counts to 5"""
    sleep(1)
    print (x)
    if x == 5:
        return x
    else:
        return slow_add.s(x+1)()

当我安排任务时,我只得到一次调用:

In [48]: asks.slow_add.run(1)
1
2
3
4
5
Out[48]: 5
  1. 如何异步调用?我尝试了 apply_async、delay 的不同变体,但没有效果。
  2. 我在 celery 监视器中没有看到任何任务。为什么?
  3. 当任务异步执行时,如何获得中间状态(在本例中为 1 到 5 之间的数字)?

最佳答案

在我的例子中,

@app.task
def test(x):
    import time
    time.sleep(1)
    print x
    if x == 5:
        return x
    else:
        return test.delay(x+1)
  1. 当我使用res = test.delay(1)时,它可以正常异步工作
  2. 在 Celery 监视器中我可以看到收到的任务

    Received task: services.tasker.test[67f22e02-7c39-4c1d-a646-acabeb72d208]
    1
    Received task: services.tasker.test[9eed6d45-4931-4790-8477-3bbe75e213e4]
    Task services.tasker.test[67f22e02-7c39-4c1d-a646-acabeb72d208] succeeded in 3.021544273s: None
    2
    ...

  3. 您可以使用res.ready()获取任务状态,当任务完成时返回True,否则返回False >

在我的测试用例中,结果如下

>>> res = test.delay(1)
>>> res.ready()  # before finishing task
False
>>> res.ready()  # after finishing task
True

def final_result(r):
    if isinstance(r.result, celery.result.AsyncResult):
         return final_result(r.result)
    else:
        return r.result

并像上面一样使用

>>> print final_result(res)
5

关于python - celery 从子任务发出任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21574624/

相关文章:

python - 在用 celery beat 安排任务时防止口是心非

python - 您可以在另一个 eventlet 池中使用一个 eventlet 池吗?

python - 理解 Michael Nielsen 的反向传播代码

Python Linux - Is-Alive Midori 看门狗

java - 如何知道所有异步 HTTP 调用已完成

php - 如何使用 jquery、ajax、php 和 mysql 异步更新表,从表中删除条目?

python - 与 Flask 服务器同时运行 while 循环

python - 将 NumPy 数组转换为 PIL 图像

python - Django对相关一对多对象的聚合查询

c# - 如何异步初始化静态类