python - 订阅者接收消息缓慢

标签 python python-asyncio python-multithreading pyzmq

我有一个 pyzmq Publisher,每秒发送大约 1000 条消息。我正在尝试在 asyncio event_loop 中启动大约 10 个订阅者。

它可以工作,但比唯一一个订阅者的速度慢大约 2.5 倍。

代码可能有什么问题?

import asyncio
import zmq
import json
from zmq.backend.cython.constants import NOBLOCK
from zmq.asyncio import Context, Poller
from loop_ import Loop


class Client:
    REQUEST_TIMEOUT = 35000
    SERVER_ENDPOINT = "tcp://localhost:6666"

    def __init__(self, id_):
        self.id = id_

    def get_task(self):
        return asyncio.create_task(self.client_coroutine())

    async def client_coroutine(self):
        context = Context.instance()

        socket = context.socket(zmq.SUB)
        socket.connect(self.SERVER_ENDPOINT)
        socket.setsockopt(zmq.SUBSCRIBE, b'4')
        poller = Poller()
        poller.register(socket, zmq.POLLIN)

        while True:
            event = dict(await poller.poll(self.REQUEST_TIMEOUT))
            if event.get(socket) == zmq.POLLIN:
                reply = await socket.recv_multipart(flags=NOBLOCK)
                if not reply:
                    break
                else:
                    print(eval(json.loads(reply[1].decode('utf-8'))))
            else:
                print("No response from server, retrying...")
                socket.setsockopt(zmq.LINGER, 0)
                socket.close()
                poller.unregister(socket)

async def tasks():
    _tasks = [Client(id_).get_task() for id_ in range(10)]
    done, pending = await asyncio.wait(_tasks, return_when=asyncio.FIRST_EXCEPTION)


loop = asyncio.get_event_loop()
loop.run_until_complete(tasks())

最佳答案

Q : What could possibly be wrong with the code?

鉴于代码使用相同的 localhost (从使用地址可以看出),第一个嫌疑人是,有 10 倍以上的工作需要处理,这样的工作量总是会给 localhost 带来压力。的操作系统和CPU,不是吗?

接下来是传输级别的选择。鉴于所有 SUB -s 位于同一个 localhost PUB ,所有基于 L3 堆栈的 TCP/IP 协议(protocol)工作都被浪费了。要比较相对成本(使用 tcp:// 传输类对此硬件单一消息传递的附加效果),请使用 inproc:// 进行相同的测试。 传输类,其中不会发生任何与协议(protocol)相关的 TCP/IP 堆栈附加处理。

最后但并非最不重要的一点是,我的代码永远不会混合不同的事件循环(从 v2.11 开始使用 ZeroMQ,所以有人可能会认为我在避免依赖最近 py3 中提供的 async 装饰功能方面有点过时) .6+ )

我的代码将使用显式的、非阻塞的、零等待的测试来检查消息是否存在 - aSocketINSTANCE ,如 aSocketINSTANCE.poll( zmq.POLLIN, 0 ) 而不是使用任何“外部”添加的装饰,这可能会报告相同的情况,但通过一些额外的(昂贵且超出我的代码控制域)事件处理。所有实时、低延迟的用例都力求承受尽可能小的延迟/开销,因此在我的项目中,对于任何“现代”语法糖甜技巧来说,使用显式控制总是会获胜。

无论如何,享受零之禅

关于python - 订阅者接收消息缓慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58113764/

相关文章:

python - 合并列表中的重叠项目

python - 中断当前正在执行的所有 asyncio.sleep

Python - 在单独的子进程或线程中运行 Autobahn|Python asyncio websocket 服务器

python - Python 中的多线程 : Getting stuck at last thread

python - 如何在 Python 中使用线程?

python - 使用 [ :] or copy() in python is shallow? 复制列表

python - 根据匹配前两个字符查找两列之间的分钟差异

python - 更改毒物工作目录

python - 异步生成器不是迭代器?

python - 将 CLIPS 专家系统公开为 Web 应用程序的体系结构选择