python - 如何使用 python async 同时运行阻塞操作循环?

标签 python websocket redis fastapi starlette

我有两个不相关的阻塞操作,它们监听不同的事件。当它们中的任何一个返回时,我需要对它们引发的底层事件进行适当的处​​理。

出于某种原因,无论我如何使用 AsyncIO 安排它们,我都无法同时运行它们。显然,receive_json()每当另一个循环运行时,似乎就会无限期地阻塞;这就是为什么我怀疑 websocket 上存在并发问题。或异步循环,但无法真正查明问题是什么或如何解决问题。

下面这个简化示例中说明了我当前的代码,但我也尝试了其他异步接口(interface),例如在单个循环中运行它们、使用超时或使用 asycio.wait()没有再取得任何成功。

使用的技术包括 uvicorn 作为 ASGI 服务器、FastApi 用于 Web 界面、Redis pubsub(redis-py 连接器)作为等待项,starlette Websocket 作为另一个。它们在 Docker 容器中运行,如果您感兴趣的话,托管在 Windows 计算机上。


async def await_redis(p):
    return str(p.get_message(timeout=None))

@router.websocket('/'):
def ws_endpoint(websocket Websocket):
    async def ws_loop():
        while True:
            data = await websocket.receive_json() # Blocks here whenever rd_loop runs
            messages = await handler(data)
            r.publish('some-channel', messages)

    async def rd_loop():
        r = Redis('host')
        p = r.pubsub('some-channel')
        while True:
            mess = await await_redis(p)
            if mess:
                await websocket.send_json([mess])
    # The strange thing is if rd_loop exits because of exception,
    # ws_loop starts to receive and handle messages.
    await asyncio.gather(ws_loop(), rd_loop()) 

最佳答案

await_redis 函数正在阻塞 event 循环,而 redis-py 库中的 get_message 方法则不会阻塞异步,因此它会阻止事件循环。让我们尝试使用 aioredis 库而不是 redis-py

首先我们安装它pip install aioredis然后这是您修改后的代码:

import aioredis

async def await_redis(p):
    return str(await p.get_message())

@router.websocket('/')
async def ws_endpoint(websocket: WebSocket):
    async def ws_loop():
        while True:
            data = await websocket.receive_json()
            messages = await handler(data)
            await r.publish('some-channel', messages)

    async def rd_loop():
        r = await aioredis.create_redis('redis://host')
        p = await r.pubsub()
        await p.subscribe('some-channel')
        while True:
            mess = await await_redis(p)
            if mess:
                await websocket.send_json([mess])

    # The strange thing is if rd_loop exits because of exception,
    # ws_loop starts to receive and handle messages.
    await asyncio.gather(ws_loop(), rd_loop())

关于python - 如何使用 python async 同时运行阻塞操作循环?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76029974/

相关文章:

python - 用元组替换程序中的列表?

python - 是否可以使用 matplotlib 生成附图所示的图?

python - 属性错误 : 'list' object has no attribute 'keys' when attempting to create DataFrame from list of dicts

java - 尝试重启时 OkHttp WebSocket RejectedExecutionException

python - 使用 Flask 无法正确显示图像

go - 我需要使用 go lang 连接到现有的 websocket 服务器

ios - Websockets 与 iOS 推送通知

redis - Redis 是否在保存或 bgsave 时清除过期 key ?

caching - 使用 HashTable 将对象作为值保存在 Redis 中

redis - 如何在redis中实现键的前缀匹配?