我有一个应用程序,它使用 PUB/SUB 设置从 ZeroMQ 发布者获取消息。阅读器有时很慢,所以我在发送者和接收者上都设置了 HWM。我希望接收方会填充缓冲区并在从处理速度减慢中恢复时跳转以 catch 进度。但是我观察到的行为是它永远不会掉线! ZeroMQ 似乎忽略了 HWM。我做错了什么吗?
这是一个最小的例子:
发布者.py
import zmq
import time
ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)
sock.setsockopt(zmq.SNDHWM, 1)
sock.bind("tcp://*:5556")
i = 0
while True:
sock.send(str(i))
print i
time.sleep(0.1)
i += 1
订阅者.py
import zmq
import time
ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
sock.setsockopt(zmq.SUBSCRIBE, "")
sock.setsockopt(zmq.RCVHWM, 1)
sock.connect("tcp://localhost:5556")
while True:
print sock.recv()
time.sleep(0.5)
最佳答案
我相信这里有几件事在起作用:
- High Water Marks are not exact (请参阅链接部分的最后一段)- 通常这意味着实际队列大小将小于您列出的数字,我不知道这在
1
时会如何表现>. - 您的
PUB
HWM
永远不会丢弃消息...由于PUB
套接字的工作方式,它总是会立即处理消息是否是否有可用的订阅者。因此,除非 ZMQ 实际需要 0.1 秒来处理队列中的消息,否则您的HWM
将永远不会在PUB
端发挥作用。
应该发生的事情如下所示(我假设一个操作顺序可以让您实际收到第一条发布的消息):
- 启动 subscriber.py 并等待一段合适的时间以确保它完全启动(基本上立即启动)
- 启动publisher.py
PUB
处理并发送第一条消息,SUB
接收并处理第一条消息PUB
休眠 0.1 秒并处理并发送第二条消息SUB
休眠 0.5 秒,套接字收到第二条消息,但在队列中等待下一次调用sock.recv()
处理它PUB
休眠 0.1 秒并处理并发送第三条消息SUB
仍然休眠了 0.3 秒,因此第三条消息应该在第二条消息之后进入队列,这将使队列中有 2 条消息,而第三条消息应该丢弃,因为HWM
...等等等等
我建议进行以下更改以帮助解决问题:
- 删除发布商上的
HWM
...它只会添加一个我们不需要在您的测试用例中处理的变量,因为我们从不期望它会改变任何东西。如果您的生产环境需要它,请将其重新添加并稍后在高容量场景中进行测试。 - 将订阅者上的
HWM
更改为 50。这会使测试花费更长的时间,但您不会处于极端情况,因为 ZMQ 文档指出 HWM 是确切地说,极端情况可能会导致意外行为。请注意,我相信你的测试(小数字)不会那样做,但我没有看过实现队列的代码所以我不能肯定地说,你的数据可能足够小您的有效HWM
实际上更大。 - 将您的订阅者 sleep 时间更改为 3 整秒...理论上,如果您的队列恰好容纳 50 条消息,您将在两个循环内使它饱和(就像您现在所做的那样),然后您将拥有等待 2.5 分钟来处理这些消息,看看您是否开始跳过,在前 50 条消息之后应该开始跳过大量数字。但我至少要等 5-10 分钟。如果您发现您在 100 或 200 条消息后开始跳过,那么您就会被数据量小所困扰。
这当然没有解决如果您仍然不跳过任何消息会发生什么......如果您这样做但仍然遇到同样的问题,那么我们可能需要深入研究高水位标记的实际工作原理,我们可能遗漏了什么。
关于python - 为什么 ZMQ 不会丢弃消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23800442/