我正在使用 Flask,设置了一个简单的应用程序,并且正在尝试学习如何有效地利用 celery 来完成工作,并在查询时显示结果。
在最基本的例子中,我创建了一个任务,通过 task.delay(args)
.然后,这个对象让我为作业提取一个 ID,稍后我可以通过点击不同的端点来查询它。简单的。
我的目标是模仿这一点,尽管利用群体。阅读文档,我发现组原语是懒惰的,所以我必须在保存它之前实际调用它。
我的问题肯定来自缺乏理解,但基本上是:
如果我的目标是能够通过 Flask Pipeline 异步并行地运行多组后台任务,那么我该如何检索 .join()
给定以下限制的组的结果
这是正确的方法吗?还是我应该考虑不同的心态?
伪代码:
# From my apps init
celery_instance = Celery("module.modulename", backend = app.config['CELERY_RESULT_BACKEND'], broker = app.config['CELERY_BROKER_URL'])
celery_instance.conf.update(app.config)
<snip>
from celery import group
from app import celery_instance
@app.route("/status/domain/<id>", methods=['GET'])
def query(id):
# Works for single job, not job group
result = celery_instance.AsyncResult(id)
...
@app.route("/query/domain/<domain>", methods=['GET'])
def query_by_domain(domain):
...
job = group([task1.delay(domain), task2.delay(domain)])
return redirect(url_for('app.query', id=job.id), code=302)
最佳答案
您需要将组结果保存为通过 GroupResult.save()
方法:
@app.route("/query/domain/<domain>", methods=['GET'])
def query_by_domain(domain):
...
job = group([task1.delay(domain), task2.delay(domain)])
<b>job.save()</b>
return redirect(url_for('app.query', id=job.id), code=302)
然后你可以得到
GroupResult
如通过 GroupResult.restore()
方法。@app.route("/status/domain/<id>", methods=['GET'])
def query(id):
# Works for single job, not job group
<b>result = celery_instance.GroupResult.restore(group_id)</b>
...
关于python - 在 Flask 应用程序中获取 Celery Group 结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33611235/