python - ZMQ 延迟与 PUB-SUB(慢订阅者)

标签 python performance sockets zeromq

我发现了很多关于类似主题的问题,但它们并没有帮助我解决我的问题。

使用:

  • Linux Ubuntu 14.04
  • python 3.4
  • zmq : 4.0.4//pyZMQ 14.3.1

长话短说

即使在设置了 HWM 之后,ZMQ SUB 套接字中的接收方队列仍在无限增长。当订阅者比发布者慢时会发生这种情况。 我能做些什么来防止它?

背景

我在人机交互领域工作。我们有一个庞大的代码库来控制鼠标光标之类的东西。我想在几个模块中“打破它”,与 ZMQ 通信。 它必须有尽可能少的延迟,但丢弃(丢失)消息并不那么重要。

另一个有趣的方面是可以在节点之间添加“ spy ”。因此 PUB/SUB 套接字似乎是最合适的。

像这样:

+----------+                +-----------+                 +------------+
|          | PUB            |           |  PUB            |            |
|  Input   | +----+------>  |  Filter   |  +----+------>  |   Output   |
|          |      |     SUB |           |       |     SUB |            |
+----------+      v         +-----------+       v         +------------+
               +-----+                       +-----+                   
               |Spy 1|                       |Spy 2|                   
               +-----+                       +-----+       

问题

一切正常,除了我们添加 spy 的时候。 如果我们添加一个 spy 来做“繁重的事情”,比如使用 matplotlib 进行实时可视化,我们会注意到绘图中的延迟增加。 IE : 在上图中,过滤器和输出很快,没有看到延迟,但是在 Spy 2 上,运行 20 分钟后延迟可以达到 10 分钟 (!!)

看起来接收器上的队列无限增长。 我们调查了 ZMQ 的高水位线 (HWM) 功能,将其设置为低位以丢弃旧消息,但没有任何改变。

最少的代码

架构:

+------------+                +-------------+
|            |  PUB           |             |
|   sender   | -------------> |  receiver   |
|            |             SUB|             |
+------------+                +-------------+

接收者是一个慢速接收者(在第一张图中充当 spy )

代码:

发件人.py

import time
import zmq

ctx = zmq.Context()

sender = ctx.socket(zmq.PUB)
sender.setsockopt(zmq.SNDBUF, 256)
sender.set_hwm(10)
sender.bind('tcp://127.0.0.1:1500')

print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(sender.get_hwm()) ## 10

i = 0
while True:
    mess = "{} {}".format(i, time.time())
    sender.send_string(mess)
    print("Send : {}".format(mess))
    i+= 1

接收器.py:

import time
import zmq

ctx = zmq.Context()
front_end = ctx.socket(zmq.SUB)

front_end.set_hwm(1)
front_end.setsockopt(zmq.RCVBUF, 8)

front_end.setsockopt_string(zmq.SUBSCRIBE, '')
front_end.connect('tcp://127.0.0.1:1500')

print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(front_end.get_hwm()) ## 1

while True:
    mess = front_end.recv_string()
    i, t = mess.split(" ")
    mess = "{} {}".format(i, time.time() - float(t))
    print("received : {}".format(mess))
    time.sleep(1)  # slow

我认为这不是 ZMQ Pub/Sub 的正常行为。 我尝试在接收器、订阅者和两者中设置 HWM,但没有任何改变。

我错过了什么?

编辑:

我认为我在解释我的问题时没有说清楚。我做了一个移动鼠标光标的实现。输入是在 ZMQ 中以 200Hz 发送的鼠标光标位置(带有 .sleep( 1.0/200 ) ),完成了一些处理并更新了鼠标光标位置(我没有这个 sleep 在我的最小示例中)。

一切都很顺利,甚至在我启动 spy 时也是如此。然而, spy 的延迟时间越来越长(因为处理速度慢)。延迟不会出现在“管道”末尾的光标中。

我认为问题出在缓慢的订阅者排队消息。

在我的示例中,如果我们杀死发送者并让接收者存活,消息将继续显示,直到显示所有(?)提交的消息。

spy 正在绘制光标的位置以提供一些反馈,这样的延迟仍然很不方便......我只想获得发送的最后一条消息,这就是我尝试降低 HWM 的原因。

最佳答案

缺少更好的实时设计/验证

ZeroMQ 是一个强大的消息传递层。

就是说,检查它在原始 while True: killer-loop

中每秒真正发送了多少消息

测量它。根据事实而非感觉进行设计。

事实很重要。

start_CLK = time.time()                                    # .SET _CLK
time.sleep( 0.001)                                         # .NOP avoid DIV/0!
i = 0                                                      # .SET CTR
while True:                                                # .LOOP
    sender.send_string( "{} {}".format( i, time.time() ) ) # .SND ZMQ-PUB 
    print i / ( time.time() - start_CLK )                  # .GUI perf [msg/sec]
    i+= 1                                                  # .INC CTR

ZeroMQ 尽最大努力在方案中填充雪崩。

而且它非常擅长这一点。

你的 [Filter] + [Spy1] + [Output] + [Spy2] 流水线处理,端到端,都有

  • 更快,包括。 .send() + .recv_string() 开销都比 [Input]-sender

  • 是主要的阻塞元素,导致内部 PUB/SUB 队列增长、增长、增长

这个队列链问题可以通过另一种架构设计来解决。

重新思考的事情:

  1. sub-sample [Filter].send() cadency(interleave factor is dependent to stability issues of the real -时间过程在你的控制下——可以是 1 毫秒(顺便说一句,O/S 计时​​器分辨率,所以使用 COTS O/S 计时​​器控件不可能进行量子物理实验 :o)),双向语音流 10 毫秒,50电视/GUI 流传输毫秒,键盘事件流等 300 毫秒)

  2. 在线 v/s 离线后处理/可视化(您注意到繁重的matplotlib处理,您通常承担大约 800 - 1600 - 3600 毫秒的开销,即使是在简单的 2D 图形上——在决定 PUB/SUB 的更改之前测量-<proc1>-PUB/SUB-<proc2> 处理架构(您已经注意到,<spy2> 在增长 <proc2>-PUB 时会导致问题-馈送和发送开销)。

  3. 执行它们的线程数与本地主机内核数 -- 从localhost ip可以看出,所有进程都驻留在同一个localhost上。加上每个使用的 ZMQ.Context 添加一个线程,加上 查看 Python GIL 锁定开销,如果所有线程都被实例化 来自同一个 Python 解释器......阻塞增长。阻挡伤害。 更好的分布式架构可以提高这些性能 方面。不过先复习一下[1]和[2]

n.b. 调用 20 分钟的处理管道延迟(实时系统 TimeDOMAIN 偏差)延迟是委婉的说法

关于python - ZMQ 延迟与 PUB-SUB(慢订阅者),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25523231/

相关文章:

python - 我可以添加对媒体 django 媒体文件的权限吗?

python - 在 openpyxl 中定位合并的单元格范围

c++ - 通过网络发送数据的常用方法是什么?

python - 在Python中通过UDP发送CAN帧

c - Linux操作系统的udp中的sendto api?

python - flask 邮件 gmail : connection refused

performance - 显示初始图像几秒钟是空白的

performance - 如何在一秒内计算任意 n <= 600 的最短加法链?

java - 查找,然后高效地用java中的相反内容替换

python 在 emacs 上执行错误(ImportError : No module named site)