python - Flask-Socketio 不从外部 RQ 进程发出

标签 python multithreading flask task-queue flask-socketio

我正在运行一个 Flask 服务器,它通过 Flask-Socketio 连接到 iOS 客户端。服务器必须处理一些复杂的数据,由于这需要一段时间才能解决,所以我使用 Redis 队列在后台作业中完成。

通信正常工作正常,但我需要发送给客户端,并在作业完成后写入数据库,并且我正在尝试从作业功能中执行此操作(如果有一种方法可以让应用程序知道何时工作完成后,应用程序可以在一个地方处理所有通信)。

为此,我在作业中启动了一个新的 Socketio 实例,并将其连接到 redis 队列,但我认为我的做法是错误的。

它没有崩溃,但客户端没有收到任何东西。

这是我的代码:

tasks.py

# This is the job
def engine(path, id):
    result = process(path)
    print(result)
    socket = SocketIO(message_queue = os.environ.get('REDIS_URL'))
    socket.emit('info', result)

events.py

def launch_task(name, description, *args, **kwargs):
    rq_job = current_app.task_queue.enqueue('app.tasks.' + name,
                                        *args, **kwargs)
    return rq_job.get_id()

@socketio.on('File')
def got_file(file):
    print("GOT FILE")
    print(file[0])
    name = file[0] + ".csv"
    path = queue_dir + name
    data = file[1]
    csv = open(path, "w")
    csv.write(data)
    csv.close()
    print(path)
    launch_task("engine", "test", path, request.sid)

__init__.py

socketio = SocketIO()

def create_app(debug=False, config_class=Config):
    app = Flask(__name__)
    app.debug = debug
    app.config.from_object(config_class)

    app.redis = Redis.from_url(app.config['REDIS_URL'])
    app.task_queue = rq.Queue('alg-tasks', connection=app.redis)

    from .main import main as main_blueprint
    app.register_blueprint(main_blueprint)

    socketio.init_app(app)
    return app

events.py 处理所有通信并启动工作线程。

我认为在实例化 Socketio 时我的论点是错误的,但我不知道......关于 Socketio 和后台作业,我还有很多不明白的地方。

提前致谢!

最佳答案

在应用程序上,您必须使用 app 和消息队列初始化 SocketIO 对象:

socketio.init_app(app, message_queue=os.environ.get('REDIS_URL'))

在您的 RQ 工作线程上,您做得正确,只是使用了消息队列:

socket = SocketIO(message_queue=os.environ.get('REDIS_URL'))

但是每次发出时创建一个新的 SocketIO 实例会浪费资源,您应该创建一个可以在工作线程处理的多个任务中重用的全局实例。

关于python - Flask-Socketio 不从外部 RQ 进程发出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51635019/

相关文章:

java - ReentrantReadWriteLock 与同步

python - 为什么 Flask 使用了我所有的内存?

python - 链接对象的设计模式

python - 我应该用编码信息启动一个 py 文件吗?

c# - 为什么 C# 不允许锁定空值?

c# - 新线程()和线程池?

Flask-登录密码重置

python - flask:异常后停止服务器

python - Pandas - 不同值的滚动累积计数

python - 一种使用 Selenium 检查复选框的有效方法