python - 大对象和 `multiprocessing` 管道和 `send()`

标签 python multiprocessing pipe

我最近发现,如果我们使用 multiprocessing.Pipe 创建一对父子连接对象,并且如果对象 obj 我们是试图通过管道发送太大,我的程序挂起而没有抛出异常或根本没有做任何事情。请参阅下面的代码。 (下面的代码使用 numpy 包来生成大量 float 。)

import multiprocessing as mp
import numpy as np

def big_array(conn, size=1200):
    a = np.random.rand(size)
    print "Child process trying to send array of %d floats." %size
    conn.send(a)
    return a

if __name__ == "__main__":
    print "Main process started."
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=big_array, args=[child_conn, 1200])
    proc.start()
    print "Child process started."
    proc.join()
    print "Child process joined."
    a = parent_conn.recv()
    print "Received the following object."
    print "Type: %s. Size: %d." %(type(a), len(a))

输出如下。

Main process started.
Child process started.
Child process trying to send array of 1200 floats.

它会无限期地卡在这里。但是,如果我们尝试发送一个包含 1000 个 float 的数组而不是 1200,那么程序将成功执行,并按预期输出以下内容。

Main process started.
Child process started.
Child process trying to send array of 1000 floats.
Child process joined.
Received the following object.
Type: <type 'numpy.ndarray'>. Size: 1000.
Press any key to continue . . .

这对我来说像是一个错误。文档说明如下。

send(obj) Send an object to the other end of the connection which should be read using recv().

The object must be picklable. Very large pickles (approximately 32 MB+, though it depends on the OS) may raise a ValueError exception.

但是在我的运行中,甚至没有抛出 ValueError 异常,程序只是卡在那里。而且,1200 长的 numpy 数组有 9600 字节大,肯定不会超过 32MB!这看起来像一个错误。有谁知道如何解决这个问题?

顺便说一下,我使用的是 Windows 7,64 位。

最佳答案

尝试将 join() 移到 recv() 下方:

import multiprocessing as mp

def big_array(conn, size=1200):
    a = "a" * size
    print "Child process trying to send array of %d floats." %size
    conn.send(a)
    return a

if __name__ == "__main__":
    print "Main process started."
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=big_array, args=[child_conn, 120000])
    proc.start()
    print "Child process started."
    print "Child process joined."
    a = parent_conn.recv()
    proc.join()
    print "Received the following object."
    print "Type: %s. Size: %d." %(type(a), len(a))

但我不太明白为什么您的示例即使对于小尺寸也适用。我在想,写入管道然后在不首先从管道读取数据的情况下进行连接的过程将阻止连接。您应该首先从管道接收,然后加入。但显然它不会阻塞小尺寸......?

编辑:来自文档(http://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming):

“一个会死锁的例子如下:”

from multiprocessing import Process, Queue

def f(q):
q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

关于python - 大对象和 `multiprocessing` 管道和 `send()`,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15137292/

相关文章:

python - 在 python 上使用请求发布图像

python - 为什么我在 scrapy 上的所有项目都是一样的?

multithreading - 计算最小值的最短时间

python - 在python中使用多处理时全局列表不更新

Unix 管道问题

node.js - 如何使用nodeJS从stdin读取文件

python - 如何在 Windows 7 上运行 python 脚本,能够启用/禁用窗口和任务栏的外观?

python - 如何使用 HTTP Basic Auth 注释测试 Flask 路由方法

python - 如何将 pyautogui 连接到虚拟显示器?

c - C (Linux) 中的管道问题