python - 如何在 ZeroMQ 中以正确的方式中止 context.socket.recv()?

标签 python zeromq abort

我有一个小软件,其中有一个单独的线程正在等待 ZeroMQ 消息。我正在使用 ZeroMQ 的 PUB/SUB 通信协议(protocol)。

目前,我通过将变量“cont_loop”设置为 False 来中止该线程。

但我发现,当没有消息到达 ZeroMQ 订阅者时,我无法退出线程(不关闭整个程序)。

def __init__(self):
    Thread.__init__(self)
    self.cont_loop = True

def abort(self):
    self.continue_loop = False

def run(self):
    zmq_context = zmq.Context()
    zmq_socket = zmq_context.socket(zmq.SUB)
    zmq_socket.bind("tcp://*:%s" % *(5556))
    zmq_socket.setsockopt(zmq.SUBSCRIBE, "")
    while self.cont_loop:
        data = zmq_socket.recv()
        print "Message: " + data
    zmq_socket.close()
    zmq_context.term()
    print "exit"

我尝试将 socket.close()context.term() 移动到中止方法。所以它关闭了订阅者,但这杀死了整个程序。

关闭上述程序的正确方法是什么?

最佳答案

问:...的正确方法是什么?

答:实现既定目标的方法有很多种。让我只挑一个,作为关于如何处理分布式进程到进程消息传递的模型示例。

首先。假设,在典型的软件设计任务中有更多的优先级。有些更高,有些更低,有些甚至如此之低,以至于可以推迟执行这些低优先级的子任务,这样调度程序中就有更多的时间来执行那些无法处理等待的子任务。

这就是说,让我们查看您的代码。 SUB 侧指令 .recv() 正在被使用,导致两件事。一个可见的 - 它在具有 SUB 行为的 ZeroMQ 套接字上执行 RECEIVE 操作。第二个不太明显的是,它一直挂起,直到它获得与 SUB 行为的当前状态“兼容”的东西(稍后会详细介绍)。

这意味着,自从此类 .recv() 方法调用以来,它也一直BLOCKS UNTIL 一些未知的、本地无法控制的状态巧合/events 使其能够传递 ZeroMQ 消息,其内容与此(仍处于阻塞状态)SUB 行为实例的本地预设状态“兼容”

这可能需要很长时间。

这正是为什么 .recv() 更适合在控制循环内使用的原因,在控制循环中,外部处理既有机会也有责任去做你想做的事(包括与中止相关的操作和具有适当资源释放的公平/优雅终止。

接收过程变成 .recv( flags = zmq.NOBLOCK ) 而不是 try: except: 情节。这样一来,您的本地进程就不会失去对事件流的控制(包括 NOP 就是这样的一个)。

最好的下一步是什么?

花点时间阅读一本伟大的珍宝书“Code Connected,第 1 卷”,ZeroMQ 之父 Pieter HINTJENS 已出版(也作为 PDF )。

他与我们分享的许多需要避免的想法和错误确实值得您花时间。

享受 ZeroMQ 的强大功能。它非常强大,值得自上而下地掌握。

关于python - 如何在 ZeroMQ 中以正确的方式中止 context.socket.recv()?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30655431/

相关文章:

python - 如何获取SQL Server返回的消息?

Scalatest 中止的测试处理

c# - 中止正在运行长查询的线程

python - 对所有行中不包括空格的总字符求和 Python

python - 迭代数据框列: TypeError: 'float' object is not subscriptable

python - 如何在 python 上工作输出?

node.js - 如何终止已关闭的 ZeroMQ 套接字的进程?

c - 如何正确使用zmq_msg_more?

java - 接收数据导致 "too many files open"

c - 中止陷阱 : 6 error when working with array in C