python - 在 Flask 应用程序中获取 Celery Group 结果

标签 python flask celery

我正在使用 Flask,设置了一个简单的应用程序,并且正在尝试学习如何有效地利用 celery 来完成工作,并在查询时显示结果。

在最基本的例子中,我创建了一个任务,通过 task.delay(args) .然后,这个对象让我为作业提取一个 ID,稍后我可以通过点击不同的端点来查询它。简单的。

我的目标是模仿这一点,尽管利用群体。阅读文档,我发现组原语是懒惰的,所以我必须在保存它之前实际调用它。

我的问题肯定来自缺乏理解,但基本上是:
如果我的目标是能够通过 Flask Pipeline 异步并行地运行多组后台任务,那么我该如何检索 .join()给定以下限制的组的结果

  • 请求 Endpoint1(可能返回一个 ID)
  • 请求 Endpotin2,传递 ID 以返回完成工作的结果

  • 这是正确的方法吗?还是我应该考虑不同的心态?

    伪代码:
    # From my apps init
    celery_instance = Celery("module.modulename", backend = app.config['CELERY_RESULT_BACKEND'], broker = app.config['CELERY_BROKER_URL'])
    celery_instance.conf.update(app.config)
    
    <snip>
    
    from celery import group
    from app import celery_instance
    
    @app.route("/status/domain/<id>", methods=['GET'])
    def query(id):
      # Works for single job, not job group
      result = celery_instance.AsyncResult(id)
      ...
    
    @app.route("/query/domain/<domain>", methods=['GET'])
    def query_by_domain(domain):
      ...
      job = group([task1.delay(domain), task2.delay(domain)])
      return redirect(url_for('app.query', id=job.id), code=302)
    

    最佳答案

    您需要将组结果保存为通过 GroupResult.save()方法:

    @app.route("/query/domain/<domain>", methods=['GET'])
    def query_by_domain(domain):
        ...
        job = group([task1.delay(domain), task2.delay(domain)])
        <b>job.save()</b>
        return redirect(url_for('app.query', id=job.id), code=302)
    

    然后你可以得到GroupResult如通过 GroupResult.restore()方法。
    @app.route("/status/domain/<id>", methods=['GET'])
    def query(id):
        # Works for single job, not job group
        <b>result = celery_instance.GroupResult.restore(group_id)</b>
        ...
    

    关于python - 在 Flask 应用程序中获取 Celery Group 结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33611235/

    相关文章:

    python - 如何从数组中仅返回一次重复项?

    python - 如何通过Python从Azure订阅获取所有资源的详细信息

    python - Cloud9 Python : getting Flask module not found on Run action

    django - 是否可以显示 django celery 任务的进度条?

    python - 如何在 Python 3.6 中读取/转换包含用 Python 2.7 编写的 pandas 数据帧的 HDF 文件?

    python - 给定 rpm 包名称,查询 yum 数据库以获取更新

    python - 通过 Python Flask 从一个 HTML 输入中获取多个值

    python - 在 flask 中嵌入 Bokeh 应用程序

    python - 创建一个支持 json 序列化的类以用于 Celery

    python - Celery Worker 中的多线程