我有两个不相关的阻塞操作,它们监听不同的事件。当它们中的任何一个返回时,我需要对它们引发的底层事件进行适当的处理。
出于某种原因,无论我如何使用 AsyncIO 安排它们,我都无法同时运行它们。显然,receive_json()
每当另一个循环运行时,似乎就会无限期地阻塞;这就是为什么我怀疑 websocket
上存在并发问题。或异步循环,但无法真正查明问题是什么或如何解决问题。
下面这个简化示例中说明了我当前的代码,但我也尝试了其他异步接口(interface),例如在单个循环中运行它们、使用超时或使用 asycio.wait()
没有再取得任何成功。
使用的技术包括 uvicorn 作为 ASGI 服务器、FastApi 用于 Web 界面、Redis pubsub(redis-py 连接器)作为等待项,starlette Websocket 作为另一个。它们在 Docker 容器中运行,如果您感兴趣的话,托管在 Windows 计算机上。
async def await_redis(p):
return str(p.get_message(timeout=None))
@router.websocket('/'):
def ws_endpoint(websocket Websocket):
async def ws_loop():
while True:
data = await websocket.receive_json() # Blocks here whenever rd_loop runs
messages = await handler(data)
r.publish('some-channel', messages)
async def rd_loop():
r = Redis('host')
p = r.pubsub('some-channel')
while True:
mess = await await_redis(p)
if mess:
await websocket.send_json([mess])
# The strange thing is if rd_loop exits because of exception,
# ws_loop starts to receive and handle messages.
await asyncio.gather(ws_loop(), rd_loop())
最佳答案
await_redis
函数正在阻塞 event
循环,而 redis-py
库中的 get_message
方法则不会阻塞异步,因此它会阻止事件循环。让我们尝试使用 aioredis
库而不是 redis-py
。
首先我们安装它pip install aioredis
然后这是您修改后的代码:
import aioredis
async def await_redis(p):
return str(await p.get_message())
@router.websocket('/')
async def ws_endpoint(websocket: WebSocket):
async def ws_loop():
while True:
data = await websocket.receive_json()
messages = await handler(data)
await r.publish('some-channel', messages)
async def rd_loop():
r = await aioredis.create_redis('redis://host')
p = await r.pubsub()
await p.subscribe('some-channel')
while True:
mess = await await_redis(p)
if mess:
await websocket.send_json([mess])
# The strange thing is if rd_loop exits because of exception,
# ws_loop starts to receive and handle messages.
await asyncio.gather(ws_loop(), rd_loop())
关于python - 如何使用 python async 同时运行阻塞操作循环?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76029974/