python - 使用 python 的多处理在 celery 工作人员和 Flask 之间共享列表

标签 python flask celery python-multiprocessing

我正在构建一个 Flask 应用程序,它依赖于 Celery 来处理一些长时间运行的任务。一旦完成处理,每个任务本质上都会将一个字典附加到共享列表中 - 该列表由 celery 工作人员和 Flask 应用程序的路由共享。 Flask 组件本质上由一组路由组成,用于检索共享列表的内容并修改元素的顺序。

我想我已经使用 Python 多处理模块中的管理器成功地在 Celery 工作人员之间共享了列表。但是,Flask 应用程序看不到对此列表所做的更改。这是一个说明问题的最小应用程序:

import os
import json

from flask import Flask
from multiprocessing import Manager
from celery import Celery

application = Flask(__name__)

redis_url = os.environ.get('REDIS_URL')
if redis_url is None:
    redis_url = 'redis://localhost:6379/0'

# Set the secret key to enable cookies
application.secret_key = 'some secret key'
application.config['SESSION_TYPE'] = 'filesystem'

# Redis and Celery configuration
application.config['BROKER_URL'] = redis_url
application.config['CELERY_RESULT_BACKEND'] = redis_url

celery = Celery(application.name, broker=redis_url)
celery.conf.update(BROKER_URL=redis_url,
                CELERY_RESULT_BACKEND=redis_url)

manager = Manager()
shared_queue = manager.list() # THIS IS THE SHARED LIST

@application.route("/submit", methods=['GET'])
def submit_song():
    add_song_to_queue.delay()
    return 'Added a song to the queue'

@application.route("/playlist", methods=['GET', 'POST'])
def get_playlist():
    playlist = []
    i = 0
    queue_size = len(shared_queue)
    while i < queue_size:
        print(shared_queue[i])
        playlist.append(shared_queue[i])
    return json.dumps(playlist)

@celery.task
def add_song_to_queue():
    shared_queue.append({'some':'data!'})
    print(len(shared_queue))

if __name__ == "__main__":
    application.run(host='0.0.0.0', debug=True)

在 celery 日志中,我可以清楚地看到字典被附加到列表中,并且列表的大小增加了。但是,当我在浏览器上访问/playlist 路由时,我总是得到一个空列表。

有谁知道如何让列表在所有工作人员和 Flask 应用程序之间共享?

最佳答案

我通过放弃 Celery 并使用 multiprocessing.Pool 作为任务队列并通过 Manager 共享内存找到了解决方案,如问题中的示例代码所示。此链接提供了一个很好的示例,说明了如何将此解决方案与 Flask 集成:http://gouthamanbalaraman.com/blog/python-multiprocessing-as-a-task-queue.html

from multiprocessing import Pool
from flask import Flask

app = Flask(__name__)
_pool = None

def expensive_function(x):
        # import packages that is used in this function
        # do your expensive time consuming process
        return x*x

@app.route('/expensive_calc/<int:x>')
def route_expcalc(x):
        f = _pool.apply_async(expensive_function,[x])
        r = f.get(timeout=2)
        return 'Result is %d'%r

if __name__=='__main__':
        _pool = Pool(processes=4)
        try:
                # insert production server deployment code
                app.run()
        except KeyboardInterrupt:
                _pool.close()
                _pool.join()

关于python - 使用 python 的多处理在 celery 工作人员和 Flask 之间共享列表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35797974/

相关文章:

具有两个条件的python groupby并计算平均值

python - 使用 Google Drive 获取 WebViewLinks

python - 连接 Flask Socket.IO Server 和 Flutter

python - Flask 中的嵌套蓝图?

python - AngularJS+Flask 无法实例化模块

django - celery 不运行任务

python - 优化问题的神经网络

python - "%"登录 matplotlib Python

python - celery.utils.log 和 logging 中的 get logger 函数有什么不同?

python - 如何使用Ajax实现Progressbar以完成长的 celery 任务