python - ZMQ 轮询器不在类实例中工作

标签 python zeromq pyzmq

您好,我在把一些 ZMQ 拉取(PULL)客户端包装成 Python 类时遇到了问题。这些类通过 multiprocessing 模块在子进程中实例化和调用。当客户端是函数时,一切正常;但当它们是类时,poller.poll() 会挂起。

下面的代码有两个版本:一个有效,另一个无效。为什么?

import zmq
import time
import sys
import random
from  multiprocessing import Process

def server_push(port="5556"):
    """Bind a PUSH socket on *port* and send a short series of control messages.

    Sends "Continue" on each of the first six iterations, sleeping one
    second after each, then sends "Exit" and stops.
    """
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://*:%s" % port)
    print "Running server on port: ", port
    # Iterations 0-5 send "Continue"; iteration 6 sends "Exit" and breaks, so
    # the remaining values of range(10) are never reached.
    for reqnum in range(10):
        if reqnum < 6:
            socket.send("Continue")
        else:
            socket.send("Exit")
            break
        time.sleep (1) 

def server_pub(port="5558"):
    """Bind a PUB socket on *port* and publish ten messages, one per second.

    Each message is "<topic> server#<publisher_id>". The topic is drawn with
    random.randrange(8, 10) for every message; the publisher id is chosen
    once per call.
    """
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:%s" % port)
    publisher_id = random.randrange(0,9999)
    print "Running server on port: ", port
    # Publishes ten messages and then returns.
    for reqnum in range(10):
        # Pick the topic for this message; nothing is received on this socket.
        topic = random.randrange(8,10)
        messagedata = "server#%s" % publisher_id
        print "%s %s" % (topic, messagedata)
        socket.send("%d %s" % (topic, messagedata))
        time.sleep(1)    


class Client:
    """Callable client that reads from a PULL socket and a SUB socket.

    The ZMQ context and both sockets are created in __init__, i.e. in the
    process that constructs the instance; polling happens in __call__, i.e.
    in the process that invokes the instance.
    """

    def __init__(self,port_push, port_sub):
        """Connect the PULL socket to *port_push* and the SUB socket to *port_sub*."""
        context = zmq.Context()
        self.socket_pull = context.socket(zmq.PULL)
        self.socket_pull.connect ("tcp://localhost:%s" % port_push)
        print "Connected to server with port %s" % port_push
        self.socket_sub = context.socket(zmq.SUB)
        self.socket_sub.connect ("tcp://localhost:%s" % port_sub)
        # Only messages whose payload starts with "9" are delivered.
        self.socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
        print "Connected to publisher with port %s" % port_sub
        # The poll set itself is built in __call__, not here.


    def __call__(self):
        """Poll both sockets until an "Exit" control command is received."""
        poller = zmq.Poller()
        poller.register(self.socket_pull, zmq.POLLIN)
        poller.register(self.socket_sub, zmq.POLLIN)
        # Work on requests from both server and publisher
        should_continue = True
        print "listening"
        while should_continue:
            print "hello"
            # No timeout is passed, so this call waits until a socket is readable.
            socks = dict(poller.poll())
            # Debug output: prints the Poller object itself.
            print poller
            if self.socket_pull in socks and socks[self.socket_pull] == zmq.POLLIN:
                message = self.socket_pull.recv()
                print "Recieved control command: %s" % message
                if message == "Exit": 
                    print "Recieved exit command, client will stop recieving messages"
                    should_continue = False

                # NOTE: this check is nested inside the socket_pull branch
                # above, so the SUB socket is only read on iterations where
                # a control command was also received.
                if self.socket_sub in socks and socks[self.socket_sub] == zmq.POLLIN:
                    string = self.socket_sub.recv()
                    topic, messagedata = string.split()
                    print "Processing ... ", topic, messagedata

def client(port_push, port_sub):
    """Function-based client: poll a PULL and a SUB socket until "Exit" arrives.

    The context, both sockets and the poller are all created inside this
    function, so they live in whichever process calls it.
    """
    context = zmq.Context()
    socket_pull = context.socket(zmq.PULL)
    socket_pull.connect ("tcp://localhost:%s" % port_push)
    print "Connected to server with port %s" % port_push
    socket_sub = context.socket(zmq.SUB)
    socket_sub.connect ("tcp://localhost:%s" % port_sub)
    # Only messages whose payload starts with "9" are delivered.
    socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
    print "Connected to publisher with port %s" % port_sub
    # Initialize poll set
    poller = zmq.Poller()
    poller.register(socket_pull, zmq.POLLIN)
    poller.register(socket_sub, zmq.POLLIN)
    # Work on requests from both server and publisher
    should_continue = True
    while should_continue:
        # No timeout is passed, so this call waits until a socket is readable.
        socks = dict(poller.poll())
        if socket_pull in socks and socks[socket_pull] == zmq.POLLIN:
            message = socket_pull.recv()
            print "Recieved control command: %s" % message
            if message == "Exit": 
                print "Recieved exit command, client will stop recieving messages"
                should_continue = False

        # Checked independently of the control socket on every iteration.
        if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
            string = socket_sub.recv()
            # Payload is "<topic> <data>" with no further whitespace.
            topic, messagedata = string.split()
            print "Processing ... ", topic, messagedata

if __name__ == "__main__":
    # Now we can run a few servers 
    server_push_port = "5556"
    server_pub_port = "5558"
    # Each server runs in its own child process.
    Process(target=server_push, args=(server_push_port,)).start()
    Process(target=server_pub, args=(server_pub_port,)).start()
    # Function-based client, kept for comparison with the class-based one:
    #~ Process(target=client,args=(server_push_port,server_pub_port)).start()
    # Class-based client: Client(...) is constructed here in the parent
    # process, and the resulting instance is what the child process calls.
    Process(target=Client(server_push_port,server_pub_port)).start()

最佳答案

编辑1:这不太正确...请给我一些时间来使其正确...

我认为您可能以错误的方式调用 Client 类。我不是这方面的专家,但我认为您的客户端应该从 Process 子类化,然后使用 .start() 函数运行。因此,像这样定义您的 Client 类:

class Client(Process):
    """Client that runs as its own process (sketch; fill in the real setup)."""

    def __init__(self, port_push, port_sub):
        # A Process subclass that overrides __init__ must call the base
        # constructor first; without it start() and the other Process
        # attributes fail because the internal state was never set up.
        Process.__init__(self)
        (...) # your class init code here...make sure indentation is correct

然后在运行服务器的最后,创建 Client 类的实例并像这样启动它:

# Build the client instance, then launch it through its start() method.
client_class = Client(port_push, port_sub)
client_class.start()

Edit2:这是 fccoelho 代码的编辑版本,对我有用。

最大的问题似乎是 ZMQ 的初始化工作需要在 __call__ 方法中完成,而不是在 __init__ 中完成。我怀疑这与 multiprocessing 的内存分配方式有关:__init__ 函数在父进程中执行,而 __call__ 函数在拥有独立内存空间的子进程中执行。显然 ZMQ 不喜欢这样。我还添加了一些等待时间,以防止客户端在服务器准备就绪之前连接到服务器,并防止服务器在客户端订阅之前发送消息。另外使用 127.0.0.1 而不是 localhost(由于某种原因,我的计算机不喜欢 localhost)。还删除了客户端中 poll 调用周围烦人的打印消息,并修复了客户端在 pubsub 套接字上检查 poll 结果时的缩进问题。

import zmq
import time
import sys
import random
from  multiprocessing import Process

def server_push(port="5556"):
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://127.0.0.1:%s" % port)
    print "Running server on port: ", port
    time.sleep(1.0)
    # serves only 5 request and dies
    for reqnum in range(10):
        if reqnum < 6:
            socket.send("Continue")
        else:
            socket.send("Exit")
            print 'Push server sent "Exit" signal'
            break
        time.sleep(0.4) 

def server_pub(port="5558"):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://127.0.0.1:%s" % port)
    socket.setsockopt(zmq.HWM, 1000)
    publisher_id = random.randrange(0,9999)
    print "Running server on port: ", port
    time.sleep(1.0)
    # serves only 5 request and dies
    for reqnum in range(10):
        # Wait for next request from client
        topic = random.randrange(8,10)
        messagedata = "server#%s" % publisher_id
        print "%s %s" % (topic, messagedata)
        socket.send("%d %s" % (topic, messagedata))
        time.sleep(0.4)    


class Client:
    def __init__(self,port_push, port_sub):
        self.port_push = port_push
        self.port_sub = port_sub
        # Initialize poll set

    def __call__(self):
        time.sleep(0.5)
        print 'hello from class client!'
        context = zmq.Context()
        self.socket_pull = context.socket(zmq.PULL)
        self.socket_pull.connect ("tcp://127.0.0.1:%s" % self.port_push)
        print "Connected to server with port %s" % self.port_push
        self.socket_sub = context.socket(zmq.SUB)
        self.socket_sub.connect ("tcp://127.0.0.1:%s" % self.port_sub)
        self.socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
        print "Connected to publisher with port %s" % self.port_sub

        poller = zmq.Poller()
        poller.register(self.socket_pull, zmq.POLLIN)
        poller.register(self.socket_sub, zmq.POLLIN)
        # Work on requests from both server and publisher
        should_continue = True
        print "listening"
        while should_continue:
            # print "hello"
            socks = dict(poller.poll())
            # print poller
            if self.socket_pull in socks and socks[self.socket_pull] == zmq.POLLIN:
                message = self.socket_pull.recv()
                print "Recieved control command: %s" % message
                if message == "Exit": 
                    print "Recieved exit command, client will stop recieving messages"
                    should_continue = False

            if self.socket_sub in socks and socks[self.socket_sub] == zmq.POLLIN:
                string = self.socket_sub.recv()
                topic, messagedata = string.split()
                print "Processing ... ", topic, messagedata

def client(port_push, port_sub):
    print 'hello from function client!'
    context = zmq.Context()
    socket_pull = context.socket(zmq.PULL)
    socket_pull.connect ("tcp://127.0.0.1:%s" % port_push)
    print "Connected to server with port %s" % port_push
    socket_sub = context.socket(zmq.SUB)
    socket_sub.connect ("tcp://127.0.0.1:%s" % port_sub)
    socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
    print "Connected to publisher with port %s" % port_sub
    # Initialize poll set
    poller = zmq.Poller()
    poller.register(socket_pull, zmq.POLLIN)
    poller.register(socket_sub, zmq.POLLIN)
    # Work on requests from both server and publisher
    should_continue = True
    while should_continue:
        socks = dict(poller.poll(1000))
        if socket_pull in socks and socks[socket_pull] == zmq.POLLIN:
            message = socket_pull.recv()
            print "Recieved control command: %s" % message
            if message == "Exit": 
                print "Recieved exit command, client will stop recieving messages"
                should_continue = False

        if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
            string = socket_sub.recv()
            topic, messagedata = string.split()
            print "Processing ... ", topic, messagedata

if __name__ == "__main__":
    # Ports shared by the servers and the client.
    server_push_port = "5556"
    server_pub_port = "5558"
    # Launch the two servers first, each in its own child process.
    for entry_point, srv_port in ((server_push, server_push_port),
                                  (server_pub, server_pub_port)):
        Process(target=entry_point, args=(srv_port,)).start()
    # Function-based alternative to the class-based client below:
    # Process(target=client,args=(server_push_port,server_pub_port)).start()
    Process(target=Client(server_push_port, server_pub_port)).start()

最后,这是一个更简洁的多进程 pubsub 实现,它非常简单,但演示得更清楚:

import zmq
from multiprocessing import Process
import time

class ServerPubSub(Process):
    def __init__(self, port, n):
        Process.__init__(self)
        self.port = port
        self.n = n

    def run(self):
        self.context = zmq.Context()
        self.pub = self.context.socket(zmq.PUB)
        self.pub.bind('tcp://127.0.0.1:%d' % self.port)
        self.pub.setsockopt(zmq.HWM, 1000)

        time.sleep(1)

        end = False
        for i in range(self.n):
            print 'SRV: sending message %d' % i
            self.pub.send('Message %d' % i)
            print 'SRV: message %d sent' % i
            time.sleep(0.2)

        self.pub.close()

class ClientPubSub(Process):
    def __init__(self, port, n):
        Process.__init__(self)
        self.port = port
        self.n = n

    def run(self):
        self.context = zmq.Context()
        self.sub = self.context.socket(zmq.SUB)
        self.sub.connect('tcp://127.0.0.1:%d' % self.port)
        self.sub.setsockopt(zmq.SUBSCRIBE, '')
        self.poller = zmq.Poller()
        self.poller.register(self.sub, zmq.POLLIN)

        end = False
        count = 0
        while count < self.n:
            ready = dict(self.poller.poll(0))
            if self.sub in ready and ready[self.sub] == zmq.POLLIN:
                msg = self.sub.recv()
                print 'CLI: received message "%s"' % msg
                count += 1

        self.sub.close()

if __name__ == "__main__":
    # One publisher and one subscriber share the port and message count.
    port, n = 5000, 10
    server = ServerPubSub(port, n)
    client = ClientPubSub(port, n)

    # Start the publisher first, then the subscriber.
    for proc in (server, client):
        proc.start()

    # Wait for both child processes to finish, in the same order.
    for proc in (server, client):
        proc.join()

关于python - ZMQ 轮询器不在类实例中工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9770589/

相关文章:

python - 为什么在我给出位置参数时 lambda 返回 "<lambda>() takes 0 positional arguments but 1 was given"?

python - 如何从运行 pyzmq 的进程捕获 SIGERM、SIGINT?

python - 如何在 Alpine Linux 容器上安装 pyzmq?

python - Python 中的简单客户端/服务器 ZMQ,每个请求发送多行

Python在继承自字典的类中添加反向映射字典属性

python - 从 Access 创建 sqlite 数据库

node.js - 如何为 Electron 版本 4.1.4 重建 zeromq.js 绑定(bind)?

python - 从并发请求的分布式工作中发回正确的信息

python - multiprocessing.Pool 中的 up.close 和 p.join

c++ - ZeroMQ:带有大消息的 REQ/REP