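python - Why won't this pickle?

Tags: python pickle

I am trying to run this as a Process from multiprocessing, but when the thread is started the pickler crashes, and I can't work out what is stopping it from pickling. I have tried commenting out the socket code and the message obj code, but it still doesn't work - what am I doing wrong?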

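import socket
from multiprocessing import Process
from time import sleep

# Message and KILL_THREAD_COMMAND are defined elsewhere in the project (not shown).

class TransmitThread(Process):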

    def __init__(self, send_queue, reply_queue, control_pipe, recv_timeout=2, buffer_size=4096):
        """
            This init function is called when the thread is created. Function simply calls the Process class init
            function, and stores the class vars.
        """
        # Call process class init
        Process.__init__(self)

        # Store class vars
        self.send_queue     = send_queue
        self.reply_queue    = reply_queue
        self.control_pipe   = control_pipe
        self._recv_timeout  = recv_timeout
        self._buffer_size   = buffer_size

    def run(self):
        """
            This is the main function that is called when the thread is started.
            The function loops forever, waiting for a send message in the queue, and processes the message to send
            and fetches the response. The thread loops forever until it's terminated or the KILL THREAD command is
            passed through the control pipe.
        """
        # Start our forever running loop
        while True:

            # Check if there is anything in the pipe
            if self.control_pipe.poll():
                # Check if we received the kill thread command
                if self.control_pipe.recv() == KILL_THREAD_COMMAND:
                    # Kill the while loop and end the thread
                    break

            # Check if there is anything in message queue
            if not self.send_queue.empty():
                # Fetch message from the queue to send, and unpickle
                message_obj, message_pickle = self.send_queue.get()

                # Open socket and set timeout
                sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                sock.settimeout(self._recv_timeout)

                # Connect socket to the recipient
                sock.connect( message_obj.recipient_address )

                # Push the pickled message down the socket
                sock.sendall(message_pickle)

                # Check if the message we send is a request (should get a response)
                if str(message_obj.message_type) == str(Message.REQUEST):

                    print "fetching reply"

                    # Lets fetch the response, and push the pickled message onto the queue
                    self.reply_queue.put( sock.recv(self._buffer_size) )

                    print "got a reply"

                # All done, close the socket
                sock.close()

            # Add small delay to stop this thread consuming too much CPU time
            sleep(0.1)

The error message is:

File "C:/Users/oliver/OneDrive/GIT/pyke/pyke.py", line 127, in __init__
    self.thread.start()
  File "C:\Python27\lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "C:\Python27\lib\multiprocessing\forking.py", line 277, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Python27\lib\multiprocessing\forking.py", line 199, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Python27\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 725, in save_inst
    save(stuff)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 396, in save_reduce
    save(cls)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 748, in save_global
    (obj, module, name))
pickle.PicklingError: Can't pickle <type 'thread.lock'>: it's not found as thread.lock
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Python27\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Python27\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Python27\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Python27\lib\pickle.py", line 880, in load_eof
    raise EOFError
EOFError

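Best answer

The pickler error comes from multiprocessing.Process trying to pickle itself internally in order to send itself to the child process. I'm pretty sure one of your instance variables is not pickling correctly. Which one is not clear from your question: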

    # Store class vars
    self.send_queue     = send_queue
    self.reply_queue    = reply_queue
    self.control_pipe   = control_pipe
    self._recv_timeout  = recv_timeout
    self._buffer_size   = buffer_size
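One way to narrow down which of these values is the culprit is to try pickling each one on its own. The following is a minimal sketch, not part of the original answer: `find_unpicklable` is a hypothetical helper, the sample values are assumptions, and the import is written so that it works under both the Python 2 module name (`Queue`) and the Python 3 name (`queue`). Note that multiprocessing's own queues deliberately refuse to be pickled outside of process start-up, so this check is only meaningful for ordinary objects.

```python
import pickle

try:
    import Queue as queue  # Python 2 module name, as used in the question
except ImportError:
    import queue  # Python 3 renamed the module


def find_unpicklable(**candidates):
    """Return {name: error text} for every value that fails to pickle.

    Hypothetical helper for illustration only.
    """
    failures = {}
    for name, value in candidates.items():
        try:
            pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
        except Exception as exc:  # the exception type differs between Python versions
            failures[name] = "%s: %s" % (type(exc).__name__, exc)
    return failures


# Assumed sample values standing in for the constructor arguments.
failures = find_unpicklable(
    send_queue=queue.Queue(),  # thread-only queue; it holds thread locks internally
    recv_timeout=2,
    buffer_size=4096,
)
for name in sorted(failures):
    print("%s cannot be pickled (%s)" % (name, failures[name]))
```

Only the thread-only queue should be reported here; the plain integers pickle without trouble.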

[Edit based on the OP's comment]:

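The problem is that send_queue and reply_queue are Queue.Queue rather than multiprocessing.Queue. When spawning the child worker, Process tries to serialize itself and all of its instance variables to the child. But Queue.Queue is a non-serializable local object (it holds thread locks internally, which is why the traceback ends at thread.lock), hence the error.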
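The fix can be sketched as follows. This is an illustrative stand-in rather than the asker's actual class: `EchoProcess` is hypothetical, it upper-cases messages instead of talking to a socket, and it uses a sentinel value on the queue in place of the control pipe. The point it shows is that a Process subclass holding multiprocessing.Queue objects can be started and exchanged messages with.

```python
import multiprocessing
from multiprocessing import Process

KILL_THREAD_COMMAND = "KILL"  # assumed stand-in for the asker's constant


class EchoProcess(Process):
    """Hypothetical simplification of TransmitThread (no sockets)."""

    def __init__(self, send_queue, reply_queue):
        Process.__init__(self)
        # These must be multiprocessing.Queue objects, not Queue.Queue.
        self.send_queue = send_queue
        self.reply_queue = reply_queue

    def run(self):
        while True:
            message = self.send_queue.get()  # blocks until a message arrives
            if message == KILL_THREAD_COMMAND:
                break
            self.reply_queue.put(message.upper())


def main():
    send_queue = multiprocessing.Queue()
    reply_queue = multiprocessing.Queue()
    worker = EchoProcess(send_queue, reply_queue)
    worker.start()  # the process object is handed to the child here

    send_queue.put("ping")
    print(reply_queue.get(timeout=10))

    send_queue.put(KILL_THREAD_COMMAND)
    worker.join(timeout=10)


if __name__ == "__main__":  # required on Windows, where the child re-imports this module
    main()
```

Blocking on `get()` also removes the need for the `sleep(0.1)` polling loop, although that is a design choice rather than part of the fix.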

Another fact relevant to this issue is that multiprocessing.Queue re-uses the exceptions from Queue without re-exporting them. This is actually documented, although somewhat buried:

Note: multiprocessing uses the usual Queue.Empty and Queue.Full exceptions to signal a timeout. They are not available in the multiprocessing namespace so you need to import them from Queue.
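In practice that means the queue comes from multiprocessing while the exception class comes from the standard queue module. A small sketch, assuming a hypothetical helper named `get_or_none`; the import is written so that it works under both Python 2 (`Queue`) and Python 3 (`queue`):

```python
import multiprocessing

try:
    from Queue import Empty  # Python 2 module name
except ImportError:
    from queue import Empty  # Python 3 module name


def get_or_none(mp_queue, timeout):
    """Hypothetical helper: return the next item, or None if the timeout expires."""
    try:
        return mp_queue.get(timeout=timeout)
    except Empty:  # raised by multiprocessing.Queue, defined in the queue module
        return None


if __name__ == "__main__":
    q = multiprocessing.Queue()
    # Nothing has been put on the queue, so the timeout expires.
    print(get_or_none(q, 0.1))
```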

Source: "python - Why won't this pickle?", a similar question found on Stack Overflow: https://stackoverflow.com/questions/31177482/
