python - 在线程中使用 Popen 会阻塞每个传入的 Flask-SocketIO 请求

标签 python multithreading flask pipe popen

我有以下情况: 我在 socketio 服务器上收到一个请求。我回答它 (socket.emit(..)),然后然后在另一个线程中开始计算负载繁重的东西。

如果繁重的计算是由 subprocess.Popen(使用 subprocess.PIPE)引起的,只要它正在执行,它就会完全阻止每个传入的请求,尽管它发生在一个单独的线程。

没问题 - 在 this thread建议异步读取缓冲区大小为 1 的子进程的结果,以便在这些读取之间其他线程有机会做某事。不幸的是,这对我没有帮助。

我也已经monkeypatched eventlet 并且工作正常 - 只要我不在线程中使用 subprocess.Popensubprocess.PIPE

在此代码示例中,您可以看到它仅在使用 subprocess.Popensubprocess.PIPE 时发生。当取消注释 #functionWithSimulatedHeavyLoad() 并改为注释 functionWithHeavyLoad() 时,一切都像魅力一样。

from flask import Flask
from flask.ext.socketio import SocketIO, emit
import eventlet

eventlet.monkey_patch()
app = Flask(__name__)
socketio = SocketIO(app)

import time
from threading  import Thread

@socketio.on('client command')
def response(data, type = None, nonce = None):
    socketio.emit('client response', ['foo'])
    thread = Thread(target = testThreadFunction)
    thread.daemon = True
    thread.start()

def testThreadFunction():
    #functionWithSimulatedHeavyLoad()
    functionWithHeavyLoad()

def functionWithSimulatedHeavyLoad():
    time.sleep(5)

def functionWithHeavyLoad():
    from datetime import datetime
    import subprocess
    import sys
    from queue import Queue, Empty

    ON_POSIX = 'posix' in sys.builtin_module_names

    def enqueueOutput(out, queue):
        for line in iter(out.readline, b''):
            if line == '':
                break
            queue.put(line)
        out.close()

    # just anything that takes long to be computed
    shellCommand = 'find / test'

    p = subprocess.Popen(shellCommand, universal_newlines=True, shell=True, stdout=subprocess.PIPE, bufsize=1, close_fds=ON_POSIX)
    q = Queue()
    t = Thread(target = enqueueOutput, args = (p.stdout, q))
    t.daemon = True
    t.start()
    t.join()

    text = ''

    while True:
        try:
            line = q.get_nowait()
            text += line
            print(line)
        except Empty:
            break

    socketio.emit('client response', {'text': text})

socketio.run(app)

在functionWithHeavyLoad()函数中的阻塞工作完成后,客户端收到消息'foo'。不过,它应该更早收到消息。

此示例可以复制并粘贴到 .py 文件中,并且可以立即重现该行为。

我正在使用 Python 3.4.3、Flask 0.10.1、flask-socketio1.2、eventlet 0.17.4

更新

如果我将它放入 functionWithHeavyLoad 函数中,它实际上可以工作并且一切都很好:

import shlex
shellCommand = shlex.split('find / test')

popen = subprocess.Popen(shellCommand, stdout=subprocess.PIPE)

lines_iterator = iter(popen.stdout.readline, b"")
for line in lines_iterator:
    print(line)
    eventlet.sleep()

问题是:我使用 find 来处理繁重的负载,以使您的示例更容易重现。但是,在我的代码中,我实际上使用 tesseract "{0}"stdout -l deu 作为销售命令。这(与 find 不同)仍然会阻止所有内容。这是一个 tesseract 问题而不是 eventlet 吗?但仍然:如果它发生在一个单独的线程中,当 find 不阻塞时,它会逐行读取上下文切换,如何阻塞?

最佳答案

感谢这个问题,我今天学到了一些新东西。 Eventlet 确实提供了对子进程及其功能的 greenlet 友好版本,但出于某种奇怪的原因,它没有在标准库中对这个模块进行猴子补丁。

子流程的eventlet实现链接:https://github.com/eventlet/eventlet/blob/master/eventlet/green/subprocess.py

看小事件patcher ,打补丁的模块有os、select、socket、thread、time、MySQLdb、builtins和psycopg2。补丁程序中绝对没有对子进程的引用。

好消息是,在我替换后,我能够在与您的非常相似的应用程序中使用 Popen():

import subprocess

与:

from eventlet.green import subprocess

但请注意,当前发布的eventlet版本(0.17.4)不支持Popen中的universal_newlines选项,使用会报错。对此选项的支持在 master 中(这里是添加该选项的 commit)。您要么必须从调用中删除该选项,要么直接从 github 安装 eventlet 的主分支。

关于python - 在线程中使用 Popen 会阻塞每个传入的 Flask-SocketIO 请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34599578/

相关文章:

python - 无法使用flask-moment在网页上显示时间

python - 使用 Django + HTML 表单上传文件

Python:提取字符串内标签之间的所有子字符串

python - 无法在 python virtualenv 中安装 mysqlclient

objective-c - 我的 iPhone 应用程序的这个 Objective-C 线程代码有什么问题?

c++ - 将执行从一个线程转移到另一个线程以实现任务并行性和 future 调用

android - Picasso Android 图片加载——线程模型

python - “功能”对象在注册蓝图时没有属性 'name'

python - matplotlib 中的两个表

python - 在 Jinja 2 中使用变量作为字典键