python - ZeroMQ Pub/Sub 操作队列中的最后一个元素和其他元素

标签 python sockets zeromq publish-subscribe

我开始将 zeromq 与 python 一起使用 Publisher/Subscriber引用。但是,我没有找到任何有关如何处理队列中的消息的文档。我想将最后收到的消息与队列的其余元素不同地对待。

示例

publisher.py

import zmq
import random
import time

port = "5556"
topic = "1"

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)

while True:
    messagedata = random.randrange(1,215)
    print "%s %d" % (topic, messagedata)
    socket.send("%s %d" % (topic, messagedata))
    time.sleep(.2)

订阅者.py

import zmq

port = "5556"
topic = "1"

context = zmq.Context()
socket = context.socket(zmq.SUB)

print "Connecting..."
socket.connect ("tcp://localhost:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE,topic)

while True:
    if isLastMessage(): # probably based on socket.recv()
         analysis_function() # time consuming function
    else:
         simple_function()  # something simple like print and save in memory

我只想知道如何创建 subscriber.py 文件中描述的 isLastMessage() 函数。如果 zeromq 中有直接的东西或解决方法。

最佳答案

欢迎来到非阻塞消息传递/信号发送的世界

这是任何严肃的分布式系统设计的一个基本特征。

如果您通过管道中没有另一条消息来假设“最后”消息,那么 Poller() 实例可能会帮助您的主事件循环,您可以在其中控制时间量在考虑管道“空”之前先“等待”一下,而不是用零等待旋转循环破坏您的 IO 资源。

显式信令总是更好(如果您可以设计远程端行为)

接收方存在零知识,接收到的“最后”消息的上下文是什么(建议从消息发送方广播显式信令),但是有一个相反的功能为此,指示 ZeroMQ 原型(prototype)“在内部”丢弃所有此类消息(不是“最后”消息),从而减少接收方处理以正确处理可用的“最后”消息。

aQuoteStreamMESSAGE.setsockopt( zmq.CONFLATE, 1 )

如果您想阅读有关 ZeroMQ 模式和反模式的更多内容,请不要错过 Pieter HINTJENS 的精彩著作“Code Connected,第 1 卷”(也是 pdf 版本),并且可能希望对 有更广泛的了解。使用principally a non-blocking ZeroMQ approach

关于python - ZeroMQ Pub/Sub 操作队列中的最后一个元素和其他元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45630432/

相关文章:

linux - epoll_wait 似乎卡在了 EPOLLRDHUP

c++ - 套接字关闭时管道破裂

zeromq - 使用 zmqpp 进行零复制消息传递

python - 如何使用 PyGithub 在 repo 中创建一个新目录?

python - 此代码是否正确识别 python 字符串

string - 如何通过telnet或任何其他程序将字符串数据发送到套接字连接?

javascript - 在zeromq javascript中接收超时

c# - ZeroMQ,DEALER -> disconnected ROUTER 发送的消息不会被丢弃。如何改变呢?

python - 如何使用 python(不是按键)检测按键释放?

python - 使用色调选项从 pandas db 和seaborn 的 fiddle 图制作多个图