python - 确保工作进程总是在 zeroMQ 中终止

标签 python zeromq

我正在使用 python 绑定(bind)通过 zeroMQ 实现管道模式。

任务以扇形分布给工作人员,工作人员使用这样的无限循环监听新任务:

    while True:
        socks = dict(self.poller.poll())
        if self.receiver in socks and socks[self.receiver] == zmq.POLLIN:
            msg = self.receiver.recv_unicode(encoding='utf-8')
            self.process(msg)
        if self.hear in socks and socks[self.hear] == zmq.POLLIN:
            msg = self.hear.recv()
            print self.pid,":",  msg
            sys.exit(0)

当他们收到来自汇节点的消息时退出,确认已收到所有预期的结果。

然而,工作人员可能会错过这样的消息而无法完成。当工作人员无法知道(除了通过已经提到的消息,没有进一步的任务要处理)时,让工作人员始终完成的最佳方式是什么。

这是我为检查工作人员状态而编写的测试代码:

#-*- coding:utf-8 -*-
"""
Test module containing tests for all modules of pypln 

"""
import unittest
from servers.ventilator import Ventilator
from subprocess import Popen, PIPE
import time
class testWorkerModules(unittest.TestCase):
    def setUp(self):
        self.nw = 4
        #spawn 4 workers
        self.ws = [Popen(['python', 'workers/dummy_worker.py'], stdout=None) for i in range(self.nw)]
        #spawn a sink
        self.sink = Popen(['python', 'sinks/dummy_sink.py'], stdout=None)
        #start a ventilator
        self.V = Ventilator()
        # wait for workers and sinks to connect
        time.sleep(1)

    def test_send_unicode(self):
        '''
        Pushing unicode strings through workers to sinks.
        '''

        self.V.push_load([u'são joão' for i in xrange(80)])
        time.sleep(1)
        #[p.wait() for p in self.ws]#wait for the workers to terminate
        wsr = [p.poll() for p in self.ws]
        while None in wsr:
            print wsr, [p.pid for p in self.ws if p.poll() == None] #these are the unfinished workers
            time.sleep(0.5)
            wsr = [p.poll() for p in self.ws]
        self.sink.wait()
        self.sink = self.sink.returncode
        self.assertEqual([0]*self.nw, wsr)
        self.assertEqual(0, self.sink)

if __name__ == '__main__':
    unittest.main()

最佳答案

所有消息传递最终都以心跳结束。如果您(作为 worker 或水槽或其他任何人)发现您需要使用的组件已死,您基本上可以尝试连接到其他地方或自杀。因此,如果您作为一名 worker 发现水槽不再存在,请离开。这也意味着即使接收器仍然存在但连接已断开,您也可以退出。但我不确定你能做更多,也许更合理地设置所有超时......

关于python - 确保工作进程总是在 zeroMQ 中终止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6831235/

相关文章:

python - 从字符串中间提取文本

python - 虚拟环境中的 0MQ

zeromq - 如何检查zeromq发送数据是否成功

php - 使用 zmq 和 composer 安装 Ratchet 库

python - 如何更新我的矩形的位置坐标?

python - 在循环行时 append 到 Pandas 数据框?

PythonKit 找不到 Python3 的 PYTHON_LIBRARY

python - 根据出现对数组元素进行分组,保持顺序,获取第一个和最后一个索引

python - zeromq (zmq) 缺少带有 c++ 发布者和 python 订阅者的消息

c++ - zGuide for zeroMQ 编译失败