python - 用于多处理的共享内存中的大型 numpy 数组 : Is something wrong with this approach?

标签 python numpy multiprocessing

多处理是一个很棒的工具,但使用大内存块并不是那么简单。您可以在每个进程中加载​​ block 并将结果转储到磁盘上,但有时您需要将结果存储在内存中。最重要的是,使用花哨的 numpy 功能。

我阅读/搜索了很多内容并得出了一些答案:

Use numpy array in shared memory for multiprocessing

Share Large, Read-Only Numpy Array Between Multiprocessing Processes

Python multiprocessing global numpy arrays

How do I pass large numpy arrays between python subprocesses without saving to disk?

等等等等

它们都有缺点:不太主流的库(sharedmem);全局存储变量;不太容易阅读代码、管道等。

我的目标是在我的工作人员中无缝地使用 numpy,而不必担心转换等问题。

经过多次试验,我想出了 this .它适用于我的 ubuntu 16、python 3.6、16GB、8 核机器。与以前的方法相比,我做了很多“捷径”。没有全局共享状态,没有需要在 worker 内部转换为 numpy 的纯内存指针,作为进程参数传递的大型 numpy 数组等。

Pastebin link above , 但我会在这里放一些片段。

一些导入:

import numpy as np
import multiprocessing as mp
import multiprocessing.sharedctypes
import ctypes

分配一些共享内存并将其包装到一个 numpy 数组中:

def create_np_shared_array(shape, dtype, ctype)
     . . . . 
    shared_mem_chunck = mp.sharedctypes.RawArray(ctype, size)
    numpy_array_view = np.frombuffer(shared_mem_chunck, dtype).reshape(shape)
    return numpy_array_view

创建共享数组并在里面放一些东西

src = np.random.rand(*SHAPE).astype(np.float32)
src_shared = create_np_shared_array(SHAPE,np.float32,ctypes.c_float)
dst_shared = create_np_shared_array(SHAPE,np.float32,ctypes.c_float)
src_shared[:] = src[:]  # Some numpy ops accept an 'out' array where to store the results

生成进程:

p = mp.Process(target=lengthly_operation,args=(src_shared, dst_shared, k, k + STEP))
p.start()
p.join()

以下是一些结果(完整引用见 pastebin 代码):

Serial version: allocate mem 2.3741257190704346 exec: 17.092209577560425 total: 19.46633529663086 Succes: True
Parallel with trivial np: allocate mem 2.4535582065582275 spawn  process: 0.00015354156494140625 exec: 3.4581971168518066 total: 5.911908864974976 Succes: False
Parallel with shared mem np: allocate mem 4.535916328430176 (pure alloc:4.014216661453247 copy: 0.5216996669769287) spawn process: 0.00015664100646972656 exec: 3.6783478260040283 total: 8.214420795440674 Succes: True

我还做了一个 cProfile(为什么在分配共享内存时多了 2 秒?)并意识到有一些对 tempfile.py 的调用,{ “_io.BufferedWriter”对象的“写入”方法

问题

  • 我做错了什么吗?
  • (大型)阵列是否来回腌制而我没有获得任何速度?请注意,第二次运行(使用常规 np 数组未通过正确性测试)
  • 有没有办法进一步改进时间安排、代码清晰度等? (关于多处理范例)

注意事项

  • 我无法使用进程池,因为 mem 必须在 fork 处继承,而不是作为参数发送。

最佳答案

共享数组的分配很慢,因为它显然是先写入磁盘,所以可以通过 mmap 共享。有关引用,请参阅 heap.pysharedctypes.py . 这就是 tempfile.py 出现在探查器中的原因。我认为这种方法的优点是在发生崩溃时清理共享内存,而 POSIX 共享内存不能保证这一点。

感谢 fork,您的代码不会发生 pickling,正如您所说,内存是继承的。第二次运行不起作用的原因是子进程不允许写入父进程的内存。相反,私有(private)页面是动态分配的,只有在子进程结束时才会被丢弃。

我只有一个建议:您不必自己指定 ctype,可以通过 np.ctypeslib._typecodes 从 numpy dtype 中找出正确的类型。或者只是对所有内容使用 c_byte 并使用 dtype itemsize 来计算缓冲区的大小,无论如何它都会被 numpy 转换。

关于python - 用于多处理的共享内存中的大型 numpy 数组 : Is something wrong with this approach?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46811709/

相关文章:

python - Numpy 向量化算法找到第一个大于当前元素的 future 元素

python - 定义具有非标准域的多维字段

python - python 中类似 matlab 的复杂数据结构(numpy/scipy)

python - numpy.polyfit 与 scipy.odr

linux - 操作系统内 2 个进程之间的双向消息传递

python - py.test : how to automatically detect an exception in a child process?

python - 拆分数据帧的数据帧并插入一列

python - 无法使用 selenium python 访问网站上的 Google Alert 登录

python - 定义一个函数来生成字典

python - 如果我 fork 一个有守护线程的进程会发生什么?