I recently discovered that if we use multiprocessing.Pipe to create a pair of parent/child connection objects, and the object obj we try to send through the pipe is too large, my program hangs without raising an exception or doing anything at all. See the code below. (It uses the numpy package to generate a large array of floats.)
import multiprocessing as mp
import numpy as np

def big_array(conn, size=1200):
    a = np.random.rand(size)
    print "Child process trying to send array of %d floats." % size
    conn.send(a)
    return a

if __name__ == "__main__":
    print "Main process started."
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=big_array, args=[child_conn, 1200])
    proc.start()
    print "Child process started."
    proc.join()
    print "Child process joined."
    a = parent_conn.recv()
    print "Received the following object."
    print "Type: %s. Size: %d." % (type(a), len(a))
The output is as follows.
Main process started.
Child process started.
Child process trying to send array of 1200 floats.
It hangs here indefinitely. However, if we instead send an array of 1000 floats rather than 1200, the program runs to completion and prints the following, as expected.
Main process started.
Child process started.
Child process trying to send array of 1000 floats.
Child process joined.
Received the following object.
Type: <type 'numpy.ndarray'>. Size: 1000.
Press any key to continue . . .
This looks like a bug to me. The documentation states the following.
send(obj) Send an object to the other end of the connection which should be read using recv().
The object must be picklable. Very large pickles (approximately 32 MB+, though it depends on the OS) may raise a ValueError exception.
But in my runs, not even a ValueError exception is raised; the program simply hangs. Moreover, a numpy array of 1200 floats is 9600 bytes, nowhere near 32 MB! This looks like a bug. Does anyone know how to work around it?
By the way, I am on Windows 7, 64-bit.
Best answer
Try moving the join() call below recv():
import multiprocessing as mp

def big_array(conn, size=1200):
    a = "a" * size
    print "Child process trying to send array of %d floats." % size
    conn.send(a)
    return a

if __name__ == "__main__":
    print "Main process started."
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=big_array, args=[child_conn, 120000])
    proc.start()
    print "Child process started."
    a = parent_conn.recv()
    proc.join()
    print "Child process joined."
    print "Received the following object."
    print "Type: %s. Size: %d." % (type(a), len(a))
But I don't quite understand why your example works even for small sizes. I would have expected that a process which writes to the pipe and is then joined, without the data being read from the pipe first, would block on the join. You should receive from the pipe first and join afterwards. But apparently it doesn't block for small sizes...?
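The likely explanation is the operating system's pipe buffer: a small pickle fits entirely in the kernel buffer, so send() returns immediately and the child can exit before anyone reads, while a larger pickle blocks the writer until the other end drains it. A rough way to see that buffering at work is the sketch below. It is an assumption-laden probe, not part of the answer's fix: it uses a raw os.pipe and fcntl (which does not exist on Windows, so this is POSIX-only), and the measured size is OS-dependent (commonly 64 KiB on Linux).

```python
import os
import errno
import fcntl

# Create a raw OS pipe and make the write end non-blocking,
# so a write into a full buffer fails with EAGAIN instead of hanging.
r, w = os.pipe()
fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK)

total = 0
chunk = b"x" * 4096
while True:
    try:
        total += os.write(w, chunk)
    except OSError as e:
        if e.errno == errno.EAGAIN:
            break  # kernel buffer is full: this is its capacity
        raise

print("pipe buffer absorbed about %d bytes" % total)
os.close(r)
os.close(w)
```

Any payload smaller than this capacity is absorbed by the kernel, which is why conn.send() on a 1000-float array returns even though nobody has called recv() yet.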
EDIT: from the documentation (http://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming):
"An example which will deadlock is the following:"
from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()            # this deadlocks
    obj = queue.get()
Regarding python - Large objects with `multiprocessing` pipes and `send()`, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/15137292/