python - Celery - 异步任务链

标签 python asynchronous flask redis celery

我正在尝试开发一个概念验证,以使用 celery 和 redis 作为代理进行异步任务链接。

该程序是带有/run 的 Flask API 和三个需要作为异步任务运行的函数,例如从 a() 返回的是 b() 的参数,返回 b() 是写入数据的 c() 的参数通过集合对象“collection”进入 mongodb 集合。

@celery.task
def a(param):
    print("Original: {0}".format(param))
    print("Inside Task 1")
    param.update({"timestamp_A":str(datetime.timestamp), "result_A":True})
    print(param)
    return param

@celery.task
def b(param):
    print("Inside Task 2")
    param.update({"timestamp_B":str(datetime.timestamp), "result_B":True})
    print(param)
    return param

@celery.task
def c(param):
    print("Inside Task 3")
    collection.insert(dict(param))
    print("Output Saved to DB")


@app.route('/run', methods = ['GET'])
def run():
    if request.method != 'GET':
        return "HTTP Method not allowed"

    if request.method == 'GET':
        T = 1000
        for num in range(0, T):
            ds = {"test": num}
            chain(a.s(ds) | b.s() | c.s()).apply_async()
        return "Process Complete"

if __name__ == '__main__':
    app.run(debug=True)

使用上面的代码,任务链可以工作,即 a() 使用其参数执行,但是要执行函数 b(),它首先等待整个数据在 a() 中排队,然后才执行b()。我需要在执行任何任务 a() 后立即将其交给 b() 等等。 有人对我可能出错的地方有任何指示吗?

最佳答案

我可能遗漏了一些东西,但看起来最简单的方法就是在上一个任务结束时调用下一个任务。

@celery.task
def a(arg):
  ret = calc(arg)
  b.apply_async(ret)

@celery.task
def b(arg):
  ret = calc(arg)
  c.apply_async(ret)

@celery.task
def c(arg):
  ret = calc(arg)
  mongo.store(ret)

这不允许您有时在循环中调用 a 有时不允许,但您可以将任务包装在同步运行内部部分的外部任务中。

关于python - Celery - 异步任务链,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48261760/

相关文章:

python - linux中如何控制后台进程

python - 尝试安装libgreader时出错

node.js - 在异步完成之前将 'Received post' 发送回请求者(NodeJS、ExpressJS)

java - 如何理解JMS中的 "synchronous"和 "asynchronouns"消息?

python - 使用 openshift 的 Flask Dropbox 快速启动示例

python - 网络抓取 Instagram 关注者数量 BeautifulSoup

python - 如何使用PyCharm进行GIMP插件开发?

c# - 同步 HTTP 处理程序和异步 HTTP 处理程序之间的性能差异

html - 将多个 Bokeh HTML 图嵌入 flask 中

python - 如何制作仅包含 jinja 模板的 python 包