我目前正在探索测试我的 Zeromq 应用程序的可能性。我的印象是,我可以在同一个线程中拥有一个发布者/订阅者,让发布者发布而订阅者订阅它而不会丢失消息。然而,当我让发布者发送几条消息时,没有一条消息到达订阅者。
这是我使用的代码:
import zmq
def main():
ctx = zmq.Context.instance()
sender = ctx.socket(zmq.PUB)
sender.setsockopt(zmq.HWM, 1000)
sender.bind('tcp://*:10001')
rcvr = ctx.socket(zmq.SUB)
rcvr.setsockopt(zmq.HWM, 1000)
rcvr.connect('tcp://127.0.0.1:10001')
rcvr.setsockopt(zmq.SUBSCRIBE, "")
for i in range(100):
sender.send('%i' % i)
while True:
try:
print rcvr.recv(zmq.NOBLOCK)
except zmq.ZMQError:
break
if __name__ == '__main__':
main()
运行此程序时,我没有得到任何输出。
令我印象深刻的是,接收者在发送者发送之前就已连接,因此应该对这些消息进行排队。或者这是一个完全错误的假设,我应该使用 PUSH/PULL 来代替?
最佳答案
我认为这是慢加入问题的一个案例,如 ZeroMQ guide 中所述。 .
This "slow joiner" symptom hits enough people often enough that we're going to explain it in detail.
我认为主要问题是在订户套接字开始监听之前所有消息都已发送,并且消息飞过并被丢弃。在设置套接字和发送消息之间设置延迟是行不通的,因为最后一条消息在接收方开始监听之前已发送。
正如您所建议的,推/拉套接字会在内存中对作业进行排队。您可以像这样在单个进程中的套接字之间发送作业
# pushpull.py
import zmq
def main():
ctx = zmq.Context()
sender = ctx.socket(zmq.PUSH)
sender.bind('tcp://*:10001')
rcvr = ctx.socket(zmq.PULL)
rcvr.connect('tcp://127.0.0.1:10001')
for i in range(100):
sender.send_unicode('%i' % i)
while True:
msg = rcvr.recv()
print(msg)
if __name__ == '__main__':
main()
或者,如果您想使用 pub/sub 套接字,我们需要两个进程以及套接字设置和消息发送之间的 time.sleep(1)
:
首先启动接收器
# rcvr.py
import zmq
def main():
ctx = zmq.Context()
rcvr = ctx.socket(zmq.SUB)
rcvr.connect('tcp://127.0.0.1:10001')
rcvr.setsockopt_string(zmq.SUBSCRIBE, "")
while True:
msg = rcvr.recv()
print(msg)
if __name__ == '__main__':
main()
然后是发件人,
# sender.py
import zmq
import time
def main():
ctx = zmq.Context()
sender = ctx.socket(zmq.PUB)
sender.bind('tcp://*:10001')
time.sleep(1)
for i in range(100):
sender.send_unicode('%i' % i)
if __name__ == "__main__":
main()
接收:
b'0'
b'1'
b'2'
b'3' ...
我现在正在 Python 3.3 和 pyzmq 13.1.0 中工作,有很棒的 WinPython分布,因此 zmq 调用中的一些字符串处理有点不同,打印函数也是如此。 希望对您有所帮助。
关于python - 单线程的 ZeroMQ 进程间通信丢失消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15945915/