python - 回调函数在实例中看不到正确的值

标签 python python-2.7 function callback zeromq

我在 Python 中的回调函数和处理程序中遇到了一个奇怪的现象。 我使用 ZMQ 来处理通信并使用套接字流。我有基类:

import multiprocessing    
import zmq
from concurrent.futures import ThreadPoolExecutor
from zmq.eventloop import ioloop, zmqstream
from zmq.utils import jsonapi as json

# Types of messages
TYPE_A = 'type_a'
TYPE_B = 'type_b'


class ZmqProcess(multiprocessing.Process):
    def __init__(self):
        super(ZmqProcess, self).__init__()
        self.context = None
        self.loop = None
        self.handle_stream = None

    def setup(self):
        self.context = zmq.Context()
        self.loop = ioloop.IOLoop.instance()

    def send(self, msg_type, msg, host, port):
        sock = zmq.Context().socket(zmq.PAIR)
        sock.connect('tcp://%s:%s' % (host, port))
        sock.send_json([msg_type, msg])

    def stream(self, sock_type, addr):
        sock = self.context.socket(sock_type)
            if isinstance(addr, str):
            addr = addr.split(':')
        host, port = addr if len(addr) == 2 else (addr[0], None)
            if port:
            sock.bind('tcp://%s:%s' % (host, port))
        else:
            port = sock.bind_to_random_port('tcp://%s' % host)
        stream = zmqstream.ZMQStream(sock, self.loop)    
        return stream, int(port)

class MessageHandler(object):
    def __init__(self, json_load=-1):
        self._json_load = json_load
        self.pool = ThreadPoolExecutor(max_workers=10)

    def __call__(self, msg):
        i = self._json_load
        msg_type, data = json.loads(msg[i])
        msg[i] = data
        if msg_type.startswith('_'):
            raise AttributeError('%s starts with an "_"' % msg_type)
        getattr(self, msg_type)(*msg)

我有一个继承它的类:

import zmq    
import zmq_base    

class ZmqServerMeta(zmq_base.ZmqProcess):
    def __init__(self, bind_addr, handlers):
        super(ZmqServerMeta, self).__init__()
        self.bind_addr = bind_addr
        self.handlers = handlers

    def setup(self):
        super(ZmqServerMeta, self).setup()
        self.handle_stream, _ = self.stream(zmq.PAIR, self.bind_addr)
        self.handle_stream.on_recv(StreamHandler(self.handle_stream, self.stop,
                                                 self.handlers))

    def run(self):
        self.setup()
        self.loop.start()

    def stop(self):
        self.loop.stop()

class StreamHandler(zmq_base.MessageHandler):
    def __init__(self, handle_stream, stop, handlers):
        super(StreamHandler, self).__init__()
        self._handle_stream = handle_stream
        self._stop = stop
        self._handlers = handlers

    def type_a(self, data):
        if zmq_base.TYPE_A in self._handlers:
            if self._handlers[zmq_base.TYPE_A]:
                for handle in self._handlers[zmq_base.TYPE_A]:
                    self.pool.submit(handle, data)
            else:
                pass
        else:
            pass

    def type_b(self, data):
        if zmq_base.TYPE_B in self._handlers:
            if self._handlers[zmq_base.TYPE_B]:
                for handle in self._handlers[zmq_base.TYPE_B]:
                    self.pool.submit(handle, data)
            else:
                pass
        else:
            pass

    def endit(self):
        self._stop()

此外,我有一个类想要用作存储。这就是麻烦开始的地方:

import threading
import zmq_server_meta as server
import zmq_base as base


class Storage:
    def __init__(self):
        self.list = []

        self.list_lock = threading.RLock()

        self.zmq_server = None
        self.host = '127.0.0.1'
        self.port = 5432
        self.bind_addr = (self.host, self.port)

    def setup(self):
        handlers = {base.TYPE_A: [self. remove]}
        self.zmq_server = server.ZmqServerMeta(handlers=handlers, bind_addr=self.bind_addr)
        self.zmq_server.start()

    def add(self, data):
        with self.list_lock:
            try:
                self.list.append(data)
            except:
                print "Didn't work"

    def remove(self, msg):
        with self.list_lock:
            try:
                self.list.remove(msg)
            except:
                print "Didn't work"

这个想法是该类存储它接收到的一些全局信息。 一切都在一个文件中开始进行测试:

import sys
import time
import storage
import zmq_base as base
import zmq_server_meta as server



def printMsg(msg):
    print msg

store = storage.Storage()

store.setup()
handlers = {base.TYPE_B: [printMsg]}
client = server.ZmqServerMeta(handlers=handlers, bind_addr=('127.0.0.1', 5431))
client.start()

message = "Test"

store.add(message)
client.send(base.TYPE_A, message, '127.0.0.1', 5432)

我简化了它以减少困惑。通常不是简单地添加它,而是发送它,然后返回响应。客户端发送的响应应该由正确的回调函数remove() 处理,并且应该从列表中删除一些内容。出现的问题是,remove() 函数看到一个空列表,尽管列表中应该有一个元素。如果我从测试文件中检查,我可以看到添加后的元素,如果我从那里调用remove(),我会看到一个非空列表并可以将其删除。我的问题是,为什么回调会看到一个空列表以及如何确保它确实看到列表中的正确元素?

亲切的问候 帕特里克

最佳答案

我认为问题在于 ZmqProcess 类继承自 multiprocessing.Process。多处理不允许在不同进程之间共享对象,除非使用使用值或数组的共享内存映射(如文档中所示:https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes)

如果您想使用自定义对象,可以使用服务器进程/代理对象,可以在文档的同一页面上找到该对象。

例如,您可以在 Storage 类的 init 函数中定义一个管理器,例如:self.manager = Manager()然后你输入 self.list = self.manager.list() 。这应该可以解决问题。

关于python - 回调函数在实例中看不到正确的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43607434/

相关文章:

python - 如何使用正则表达式在非数字之前拆分句点?

linux - 如何在 Python 中检测我的 RAM 可用空间和总空间?

php - 测试参数是否存在于函数中

python - 在 Facebook 图形 API 上使用 twisted.web.client.Agent 时如何处理 OpenSSL.SSL.Error?

python - np_utils.to_categorical 反向

python - 如何将 Python 数组转换为 SymPy 矩阵来计算行列式?

c++ - 为什么堆栈溢出会导致总线错误 10 而不是段错误

python - Tkinter 按钮命令函数不执行其任务

类似于 MATLAB 中的 Python 交互式选择工具

python - 否则缩进错误: unexpected unindent in PyCharm