websocket - 如何使用非阻塞 fastapi websocket 进行多处理

标签 websocket multiprocessing cpu fastapi

背景:我正在制作一个 Websocket 接口(interface),它控制 ProcessPoolExecutor 中的一些CPU 绑定(bind)进程。这些进程使用队列定期发送客户端更新,并且仅在收到“停止”消息时终止(长时间运行)。

问题:阅读文档后,我无法让 ProcessPoolExecutor 工作,以便 a) 套接字保持畅通(并行调用 update_cicada() 需要)。 b)、可以使用 websocket 消息终止该进程。

我在这里缺少什么?默认线程执行器可以工作,但在这种情况下,MP 并行性更快(I/O 最少)。有用信息(无网络套接字): How to do multiprocessing in FastAPI 。 线程而不是多处理。 Make an CPU-bound task asynchronous for FastAPI WebSockets

示例

@app.websocket_route("/ws")
async def websocket_endpoint(websocket: WebSocket):
    pool = ProcessPoolExecutor() #used instead of default/threadpool
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    s = CICADA(clip_class, queue)

    await websocket.accept()

    while True:
        data = await websocket.receive_json()

        #should be non-blocking and terminate on "stop" message
        loop.run_in_executor(pool, update_cicada(data, s, queue))

        #update_cicada adds to queue thus updating client
        result = await queue.get()
        websocket.send_json(result) #so process can update client whithout terminating

最佳答案

我两天前读过这个问题,但无法动摇。这里有多个概念,其中一些(如果不是全部)非常复杂。我为自己创建了一个原型(prototype),以充分了解发生的情况,这是一个有效的示例。我添加了许多评论来解释发生了什么或为什么发生某些事情。

球棒右侧的几个指针:

  • asyncio.Queue线程或进程安全的。这意味着,在跨进程共享此类对象时,您可能(并且可能会)获得损坏状态。这样的队列非常适合在任务之间共享状态,因为它们都在事件循环中的同一线程上运行。
  • multiprocessing.Queue 是线程和进程安全的,但您需要一个 Manager() 来处理具体细节。它本质上创建了另一个子进程来处理与队列的所有通信(来自其他进程)。
  • 确保您的代码不会阻止其他请求。在下面的示例中,我使用 asyncio.sleep() 将控制权交还给事件循环,以允许事件循环中的其他任务继续处理。如果我没有这样做,它就会在无限 while 循环中阻塞当前任务。

我使用 4 个并发请求测试了以下内容(我在命令行中使用了 wscat)。请注意,我绝不是 asynciomultiprocessing 方面的专家,因此我并不声称这些是最佳实践。

import asyncio
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp
from queue import Empty
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import time

app = FastAPI()

#do not re-create the pool with every request, only create it once
pool = ProcessPoolExecutor()


def long_running_task(q: mp.Queue) -> str:
    # This would be your update_cicada function
    for i in range(5):
        #this represents some blocking IO
        time.sleep(3)
        q.put(f"'result': 'Iteration {i}'")
    return "done!"


@app.websocket_route("/ws")
async def websocket_endpoint(websocket: WebSocket):
    loop = asyncio.get_event_loop()
    
    #To use Queue's across processes, you need to use the mp.Manager()
    m = mp.Manager()
    q = m.Queue()
    
    await websocket.accept()
    
    #run_in_executor will return a Future object. Normally, you would await
    #such an method but we want a bit more control over it. 
    result = loop.run_in_executor(pool, long_running_task, q)
    while True:
        
        #None of the coroutines called in this block (e.g. send_json()) 
        # will yield back control. asyncio.sleep() does, and so it will allow
        # the event loop to switch context and serve multiple requests 
        # concurrently.
        await asyncio.sleep(0)

        try:
            #see if our long running task has some intermediate result.
            # Will result None if there isn't any.
            q_result = q.get(block=False)
        except Empty:
            #if q.get() throws Empty exception, then nothing was 
            # available (yet!).
            q_result = None

        #If there is an intermediate result, let's send it to the client.
        if q_result:
            try:
                await websocket.send_json(q_result)
            except WebSocketDisconnect:
                #This happens if client has moved on, we should stop the long
                #  running task
                result.cancel()
                #break out of the while loop.
                break
        
        #We want to stop the connection when the long running task is done.
        if result.done():
            try:
                await websocket.send_json(result.result())
                await websocket.close()  
            except WebSocketDisconnect:
                #This happens if client has moved on, we should stop the long
                #  running task
                result.cancel()
            finally:
                #Make sure we break out of the infinte While loop.
                break
            
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000,  )

关于websocket - 如何使用非阻塞 fastapi websocket 进行多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72703446/

相关文章:

javascript - 如何在声明 WebSocket HTML5 时发送参数

javascript - 用于小型传输的 Ajax 或 Websockets

python - 如何异步执行多次函数并获得第一个结果

python - 将子进程输出重定向到名称为子进程 pid 的日志文件

c - linux 中的 sched_setaffinity cpu 亲和性

Azure CPU 性能基准和/或规范

javascript - WebSockets : Can multiple Events fired in a private channel arrive out of order in client

javascript - 使用 socket.io 跟踪连接的套接字

python - 如何限制并发worker数量?

c++ - OnPaint 更新太频繁