我开始将 zeromq
与 python 一起使用 Publisher/Subscriber引用。但是,我没有找到任何有关如何处理队列中的消息的文档。我想将最后收到的消息与队列的其余元素不同地对待。
示例
publisher.py
import zmq
import random
import time
port = "5556"
topic = "1"
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
while True:
messagedata = random.randrange(1,215)
print "%s %d" % (topic, messagedata)
socket.send("%s %d" % (topic, messagedata))
time.sleep(.2)
订阅者.py
import zmq
port = "5556"
topic = "1"
context = zmq.Context()
socket = context.socket(zmq.SUB)
print "Connecting..."
socket.connect ("tcp://localhost:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE,topic)
while True:
if isLastMessage(): # probably based on socket.recv()
analysis_function() # time consuming function
else:
simple_function() # something simple like print and save in memory
我只想知道如何创建 subscriber.py
文件中描述的 isLastMessage()
函数。如果 zeromq
中有直接的东西或解决方法。
最佳答案
欢迎来到非阻塞消息传递/信号发送的世界
这是任何严肃的分布式系统设计的一个基本特征。
如果您通过管道中没有另一条消息来假设“最后”消息,那么 Poller() 实例可能会帮助您的主事件循环,您可以在其中控制时间量在考虑管道“空”之前先“等待”一下,而不是用零等待旋转循环破坏您的 IO 资源。
显式信令总是更好(如果您可以设计远程端行为)
接收方存在零知识,接收到的“最后”消息的上下文是什么(建议从消息发送方广播显式信令),但是有一个相反的功能为此,指示 ZeroMQ 原型(prototype)“在内部”丢弃所有此类消息(不是“最后”消息),从而减少接收方处理以正确处理可用的“最后”消息。
aQuoteStreamMESSAGE.setsockopt( zmq.CONFLATE, 1 )
如果您想阅读有关 ZeroMQ 模式和反模式的更多内容,请不要错过 Pieter HINTJENS 的精彩著作“Code Connected,第 1 卷”(也是 pdf 版本),并且可能希望对 distributed-computing 有更广泛的了解。使用principally a non-blocking ZeroMQ approach
关于python - ZeroMQ Pub/Sub 操作队列中的最后一个元素和其他元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45630432/