我正在使用具有pub/sub 模式的pyzmq
库。我有使用 .connect()
方法的一些快速 ZMQ 发布者和使用 .bind()
的较慢的 ZMQ 订阅者方法。
几分钟后,由于 ZMQ 缓冲区,我的订阅者收到发布者发布的旧数据。
我的问题:
是否有任何方法可以管理ZMQ 队列缓冲区大小?(设置有限的缓冲区)
[注意]:
- 我不想使用 ZMQ PUSH/PULL。
- 我读过这篇文章,但这种方法只清除缓冲区:clear ZMQ buffer
- 我也尝试了
high watermark
选项,但没有用:
socket.setsockopt(zmq.RCVHWM, 10) # not working socket.setsockopt(zmq.SNDHWM, 10) # not working
发布者:
import zmq
import time
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
socket.setsockopt(zmq.SNDHWM, 10) # not working
while True:
data = time.time()
print("%d" % data)
socket.send("%d" % data)
time.sleep(1)
订阅者:
import zmq
import time
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE, '')
socket.setsockopt(zmq.RCVHWM, 10) # not working
while 1:
time.sleep(2) # A speed reducer like.
data = socket.recv()
print(data)
尽管有这些选项(通过配置的发送/接收高水位线
),队列大小仍然超过 10。
最佳答案
我找到了一种使用 CONFLATE
选项在 ZMQ
订阅者中获取仅的方法。
请注意,您应该在连接之前设置 CONFLATE
选项:
import zmq
import time
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, '')
socket.setsockopt(zmq.CONFLATE, 1) # last msg only.
socket.connect("tcp://localhost:%s" % port) # must be placed after above options.
while 1:
time.sleep(2) # Dummy delay
data = socket.recv()
print(data)
换句话说,它会删除订阅者端的所有缓冲队列。
[注意]:
此外,通过使用 zmq.SNDBUF
和 zmq.RCVBUF
选项,我们可以设置 ZMQ 缓冲区大小的限制。 ( More information )
关于python - 如何在 python 中限制 ZMQ (ZeroMQ - PyZMQ) 队列缓冲区大小?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48278859/