python - 与 Flask 或其他 Python webframework 并行生成和处理 Websocket 输出数据的后台线程

标签 python multithreading websocket flask

好吧,我知道这有点啰嗦。我不知道如何处理这个问题。我想运行与 Flask 并行处理数据的线程,但我还没有看到很多人这样做。这是一个个人网络应用程序,所以我不想走 Celery 和 RabbitMQ 的道路。我已经创建了一个连接到股票经纪人的 API 并流式传输股票数据的模块。为了简单起见,我们只说它以每秒 1 个数字的速率生成一个随机数。在 Number Generator 线程创建的新数据的每个滴答声中,我希望其他几个线程处理相同的数字。让我们称它们为数学线程。一旦他们完成了对最新数字的处理,我希望将结果合并(到 JSON 中)并通过 Websocket 发送。我已经独立地能够使用 Flask-Sockets 通过 Websocket 发送数据。这是我想要完成的示例,其中每个框都可以看作是一个线程。

                     -------------
                     |  Number   |
                     | Generator | (rate of 1 number / second)
                     -------------
                           |  (same number sent to all 3 math threads)
                      -----+----- 
                     /     |     \
                    v      v      v
            ---------  ---------  ---------
            | math1 |  | math2 |  | math3 |
            ---------  ---------  ---------
                |          |          |  (results combined and sent over Websocket)
                v          v          v
    ---------------------------------------
    | Flask |       WebSocket Handler     |
    ---------------------------------------

这是简单的 websocket 代码。

app = Flask(__name__)
sockets = Sockets(app)

@sockets.route('/stream')
def stream_socket(ws):
    while True: # for now just sending endless stream of time + random number every second
        message = {"time": int(time.time()*1000), "data": randrange(100)}
        ws.send(json.dumps(message))
        time.sleep(1)

@app.route('/')
def test():
    return render_template('main.html')

if __name__ == '__main__':
    app.run(host='0.0.0.0',debug=True,threaded=True)

所以我有独立工作的“数字生成器”模块(实际上是股票报价流媒体)和 websocket 连接。我只需要将它们与线程连接在一起,这是我正在努力的地方。如果那个数字生成器是一个简单的随机数生成器,并且数学线程也很简单(例如 2*x、sinc(x) 等),那么我很好奇是否有人可以让我朝着正确的方向前进线程与 Flask 并行完成。也许是一些骨架代码。谢谢。

更新:我可以让单独的线程与 Flask 并行运行,如下所示。这在我运行“python test.py”时有效,但为了让 Websockets 正常工作,我使用像这样的“gunicorn -k flask_sockets.worker test:app”这样的 gunicorn。然而,这似乎阻止了多线程工作。

更新 2:我能够使用 gevent 而不是 gunicorn 让 Websockets 和多线程工作。更新了下面的代码。

from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler

class myThread(threading.Thread):

    def __init__(self, wait, msg):
        super(myThread, self).__init__()
        self.wait = wait
        self.msg = msg

    def run(self):
        for i in range(5):
            time.sleep(self.wait)
            print self.msg

app = Flask(__name__)
sockets = Sockets(app)

@sockets.route('/stream')
def stream_socket(ws):
    while True: # for now just sending endless stream of time + random number every second
        message = {"time": int(time.time()*1000), "data": randrange(100)}
        ws.send(json.dumps(message))
        time.sleep(1)

@app.route('/')
def test():
    return render_template('main.html')

if __name__ == '__main__':
    thread1 = myThread(2, "thread1")
    thread2 = myThread(3, "thread2")
    thread1.start()
    thread2.start()
    server = pywsgi.WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
    server.serve_forever()
    #app.run(host='0.0.0.0',debug=True,threaded=True)

既然我已经能够在 Flask 通过 Websocket 独立发送数据的同时运行并行线程,我想我现在最大的问题是如何与 Websocket 装饰器共享并行工作线程的结果这样它就可以发送由这些单独的线程处理过的数据。有没有办法将 @sockets.route 装饰器放在一个线程中?

最佳答案

好的,这样的事情似乎可行。单独的线程以每秒一个的速度生成数字并将它们放入队列中。 Websocket 处理程序在可用时从队列中获取数据,并在 Websocket 上将其发送出去。显然这只是一个简单的例子,但它让我朝着正确的方向前进。如果有人有任何建议,仍然会很好奇。

app = Flask(__name__)
sockets = Sockets(app)

myQueue = Queue.Queue(10)

class myThread(threading.Thread):

    def __init__(self, length):
        super(myThread, self).__init__()
        self.length = length

    def run(self):
        for i in range(self.length):
            time.sleep(1)
            myQueue.put({"time": int(time.time()*1000), "data": randrange(100)})


@sockets.route('/stream')
def stream_socket(ws):
    while True:
        message = myQueue.get()
        ws.send(json.dumps(message))


@app.route('/')
def test():
    return render_template('main.djhtml')

if __name__ == '__main__':

    thread1 = myThread(30)
    thread1.start()
    server = pywsgi.WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
    server.serve_forever()

关于python - 与 Flask 或其他 Python webframework 并行生成和处理 Websocket 输出数据的后台线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22240944/

相关文章:

python : Finding hidden element in html source code

c++ - Python PyGILState_{Ensure/Release} 在从 Python 代码返回到 C++ 时导致段错误

python - ValueError : When changing to a larger dtype, 其大小必须是数组最后一个轴的总大小(以字节为单位)的除数

java - Eclipse Jetty 服务器单实例应用程序

javascript - 使用 TLS 时的 WebSocket 错误(但并非没有)

amazon-web-services - 尝试将响应发布到Websocket时,AWS API Gateway“x509:证书由未知授权机构签名”

python - 如何遍历类似于 PHP 的 foreach 函数的 python 数组/对象

ios - Unity3D WWW 在 iOS 的主线程上调用

java - 提前/快速启动 ScheduledThreadPoolExecutor 中的任务

java - 一会儿,两个服务器线程互相阻塞