Background: I am building a WebSocket interface that controls some CPU-bound processes running in a ProcessPoolExecutor. These processes periodically send the client updates through a queue and only terminate when they receive a "stop" message (they are long-running).
Problem: After reading the docs I cannot get the ProcessPoolExecutor to work so that a) the socket stays unblocked (needed for parallel calls to update_cicada()), and b) the process can be terminated with a websocket message.
What am I missing here? The default thread executor works, but in this case MP parallelism is faster (minimal I/O). Useful reading (without websockets): How to do multiprocessing in FastAPI. Threading rather than multiprocessing. Make an CPU-bound task asynchronous for FastAPI WebSockets.
Example:
@app.websocket_route("/ws")
async def websocket_endpoint(websocket: WebSocket):
    pool = ProcessPoolExecutor()  # used instead of default/threadpool
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    s = CICADA(clip_class, queue)
    await websocket.accept()
    while True:
        data = await websocket.receive_json()
        # should be non-blocking and terminate on "stop" message
        loop.run_in_executor(pool, update_cicada(data, s, queue))
        # update_cicada adds to queue thus updating client
        result = await queue.get()
        websocket.send_json(result)  # so process can update client without terminating
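One detail worth flagging in the snippet above: run_in_executor is handed the return value of update_cicada(data, s, queue), because the call is executed immediately in the current process; the executor expects the callable and its arguments passed separately. A minimal sketch of the correct invocation (work and main are illustrative names, not part of the original code):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def work(x, y):
    # Stand-in for a CPU-bound function such as update_cicada.
    return x + y

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Pass the callable and its arguments separately; do not call it here.
        return await loop.run_in_executor(pool, work, 2, 3)

if __name__ == "__main__":
    print(asyncio.run(main()))  # 5
```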
Best answer
I read this question two days ago and couldn't shake it. There are multiple concepts at play here, and some (if not all) of them are quite complex. To fully understand what is going on, I built a prototype for myself; below is a working example. I added plenty of comments to explain what is happening and why.
A few pointers right off the bat:
- asyncio.Queue is not thread- or process-safe. That means you can (and probably will) end up with corrupted state when sharing such an object across processes. A queue like that is fine for sharing state between tasks, because those all run on the same thread in the event loop.
- multiprocessing.Queue is thread- and process-safe, but you need a Manager() to handle the specifics. It essentially creates another subprocess that handles all communication with the queue (from the other processes).
- Make sure your code does not block other requests. In the example below I use asyncio.sleep() to yield control back to the event loop, allowing other tasks in the event loop to continue processing. Without it, the infinite while loop would block the current task.
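The first two pointers can be sketched in a few lines: a Manager() queue proxy can be pickled into a ProcessPoolExecutor worker, whereas an asyncio.Queue cannot cross the process boundary (worker and main are illustrative names, not from the original code):

```python
import asyncio
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def worker(q):
    # Runs in a separate process; the Manager queue proxy survives pickling.
    q.put("hello from child")
    return "done"

async def main():
    loop = asyncio.get_running_loop()
    with mp.Manager() as manager, ProcessPoolExecutor() as pool:
        q = manager.Queue()
        result = await loop.run_in_executor(pool, worker, q)
        return result, q.get()

if __name__ == "__main__":
    print(asyncio.run(main()))
```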
I tested the following with 4 concurrent requests (I used wscat from the command line). Note that I am by no means an expert on asyncio or multiprocessing, so I don't claim these are best practices.
import asyncio
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp
from queue import Empty

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import time

app = FastAPI()

# Do not re-create the pool with every request, only create it once.
pool = ProcessPoolExecutor()


def long_running_task(q: mp.Queue) -> str:
    # This would be your update_cicada function.
    for i in range(5):
        # This represents some blocking IO.
        time.sleep(3)
        q.put(f"'result': 'Iteration {i}'")
    return "done!"


@app.websocket_route("/ws")
async def websocket_endpoint(websocket: WebSocket):
    loop = asyncio.get_event_loop()
    # To use Queues across processes, you need to use the mp.Manager().
    m = mp.Manager()
    q = m.Queue()
    await websocket.accept()
    # run_in_executor will return a Future object. Normally, you would await
    # such a method, but we want a bit more control over it.
    result = loop.run_in_executor(pool, long_running_task, q)
    while True:
        # None of the coroutines called in this block (e.g. send_json())
        # will yield back control. asyncio.sleep() does, and so it will allow
        # the event loop to switch context and serve multiple requests
        # concurrently.
        await asyncio.sleep(0)
        try:
            # See if our long running task has some intermediate result.
            # q.get() throws Empty if nothing is available (yet!).
            q_result = q.get(block=False)
        except Empty:
            q_result = None
        # If there is an intermediate result, let's send it to the client.
        if q_result:
            try:
                await websocket.send_json(q_result)
            except WebSocketDisconnect:
                # This happens if the client has moved on; we should stop
                # the long running task.
                result.cancel()
                # Break out of the while loop.
                break
        # We want to stop the connection when the long running task is done.
        if result.done():
            try:
                await websocket.send_json(result.result())
                await websocket.close()
            except WebSocketDisconnect:
                # This happens if the client has moved on; we should stop
                # the long running task.
                result.cancel()
            finally:
                # Make sure we break out of the infinite while loop.
                break


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
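One caveat about result.cancel() in the example above: a concurrent.futures Future can only be cancelled while it is still waiting for a worker; once the task is executing, cancel() returns False and the worker process keeps running. So a real update_cicada should also watch for a "stop" sentinel (e.g. on a second queue) if it must be stoppable mid-run. A minimal demonstration (slow and demo are illustrative names):

```python
import time
from concurrent.futures import ProcessPoolExecutor

def slow():
    time.sleep(2)
    return "finished"

def demo():
    with ProcessPoolExecutor(max_workers=1) as pool:
        fut = pool.submit(slow)
        time.sleep(1)             # give the worker time to pick up the task
        cancelled = fut.cancel()  # False: the task is already executing
        return cancelled, fut.result()

if __name__ == "__main__":
    print(demo())  # (False, 'finished')
```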
This answer to "How to do multiprocessing with non-blocking fastapi websockets" was found in a similar question on Stack Overflow: https://stackoverflow.com/questions/72703446/