python - 如何用aioredis pub/sub实现单生产者多消费者

标签 python redis python-asyncio aiohttp

我有网络应用程序。该应用程序具有将一些对象数据推送到 redis channel 的端点。
另一个端点处理 websocket 连接,其中数据从 channel 中获取并通过 ws 发送到客户端。

当我通过 ws 连接时,消息仅获取第一个连接的客户端。

如何使用多个客户端从 redis channel 读取消息而不创建新订阅?

Websocket 处理程序。
在这里,我订阅了 channel ,将其保存到应用程序 (init_tram_channel)。然后在我收听 channel 和发送消息的地方运行作业(run_tram_listening)。

@routes.get('/tram-state-ws/{tram_id}')
async def tram_ws(request: web.Request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    tram_id = int(request.match_info['tram_id'])
    channel_name = f'tram_{tram_id}'

    await init_tram_channel(channel_name, request.app)
    tram_job = await run_tram_listening(
        request=request,
        ws=ws,
        channel=request.app['tram_producers'][channel_name]
    )

    request.app['websockets'].add(ws)
    try:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                if msg.data == 'close':
                    await ws.close()
                    break
            if msg.type == aiohttp.WSMsgType.ERROR:
                logging.error(f'ws connection was closed with exception {ws.exception()}')
            else:
                await asyncio.sleep(0.005)
    except asyncio.CancelledError:
        pass
    finally:
        await tram_job.close()
        request.app['websockets'].discard(ws)

    return ws

订阅并保存 channel 。
每个 channel 都与唯一对象相关,为了不创建与同一对象相关的多个 channel ,我只将一个 channel 保存到应用程序。 app['tram_producers'] 是字典。

async def init_tram_channel(
        channel_name: str,
        app: web.Application
):
    if channel_name not in app['tram_producers']:
        channel, = await app['redis'].subscribe(channel_name)
        app['tram_producers'][channel_name] = channel

运行 coro 以收听 channel 。 我通过 aiojobs 运行它:

async def run_tram_listening(
        request: web.Request,
        ws: web.WebSocketResponse,
        channel: Channel
):
    """
    :return: aiojobs._job.Job object
    """
    listen_redis_job = await spawn(
        request,
        _read_tram_subscription(
            ws,
            channel
        )
    )
    return listen_redis_job

我在 Coro 收听和发送消息:

async def _read_tram_subscription(
        ws: web.WebSocketResponse,
        channel: Channel
):
    try:
        async for msg in channel.iter():
            tram_data = msg.decode()
            await ws.send_json(tram_data)
    except asyncio.CancelledError:
        pass
    except Exception as e:
        logging.error(msg=e, exc_info=e)

最佳答案

在一些 aioredis github issue 中发现了以下代码(我已将其用于我的任务)。

class TramProducer:
    def __init__(self, channel: aioredis.Channel):
        self._future = None
        self._channel = channel

    def __aiter__(self):
        return self

    def __anext__(self):
        return asyncio.shield(self._get_message())

    async def _get_message(self):
        if self._future:
            return await self._future

        self._future = asyncio.get_event_loop().create_future()
        message = await self._channel.get_json()
        future, self._future = self._future, None
        future.set_result(message)
        return message

那么,它是如何工作的呢? TramProducer 包装了我们获取消息的方式。
正如@Messa所说

message is received from one Redis subscription only once.

因此只有 TramProducer 的一个客户端正在从 Redis 中检索消息,而其他客户端正在等待从 channel 接收消息后设置的 future 结果。

如果 self._future 被初始化,这意味着有人正在等待来自 redis 的消息,所以我们将等待 self._future 结果。

TramProducer 用法(我从我的问题中举了一个例子):

async def _read_tram_subscription(
        ws: web.WebSocketResponse,
        tram_producer: TramProducer
):
    try:
        async for msg in tram_producer:
            await ws.send_json(msg)
    except asyncio.CancelledError:
        pass
    except Exception as e:
        logging.error(msg=e, exc_info=e)

TramProducer 初始化:

async def init_tram_channel(
        channel_name: str,
        app: web.Application
):
    if channel_name not in app['tram_producers']:
        channel, = await app['redis'].subscribe(channel_name)
        app['tram_producers'][channel_name] = TramProducer(channel)

我认为这可能对某些人有帮助。
完整项目在这里 https://gitlab.com/tram-emulator/tram-server

关于python - 如何用aioredis pub/sub实现单生产者多消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54159292/

相关文章:

python - 使用 Python : concise non-pycurl examples? 的 HTTP POST 二进制文件

java - ReactiveRedisTemplate opsForHash put 不会覆盖值

database - 为什么 Redis 被认为是 CP?

windows - 是否有适用于Windows的Redis数据库查看器?

python - 使用 asyncio 创建两个并发的异步任务

python - 从 async.subprocess.PIPE 读取

python - 如何在不等待 python 完成的情况下执行异步任务?

python - 使用 tensorflow 逃避局部最小值

python - 将路径附加到路径

python - 如何使用 zip 将数据分成相等大小的组?