我有一个 256x256x256
Numpy 数组,其中每个元素都是一个矩阵。我需要对这些矩阵中的每一个进行一些计算,并且我想使用 multiprocessing
模块来加快速度。
这些计算的结果必须像原来的那样存储在256x256x256
数组中,这样元素[i,j,k]
处的矩阵结果必须将原始数组中的元素放入新数组的 [i,j,k]
元素中。
为了做到这一点,我想制作一个列表,它可以用伪方式编写为 [array[i,j,k], (i, j, k)]
和将其传递给要“多处理”的函数。
假设 matrices
是从原始数组中提取的所有矩阵的列表,myfunc
是执行计算的函数,代码看起来有点像这样:
import multiprocessing
import numpy as np
from itertools import izip
def myfunc(finput):
# Do some calculations...
...
# ... and return the result and the index:
return (result, finput[1])
# Make indices:
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3)
# Make function input from the matrices and the indices:
finput = izip(matrices, inds)
pool = multiprocessing.Pool()
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999))
但是,似乎 map_async
实际上首先创建了这个巨大的 finput
-list:我的 CPU 没有做太多,但内存和交换在几秒钟的事,这显然不是我想要的。
有没有一种方法可以将这个巨大的列表传递给多处理函数,而无需先显式创建它? 或者您知道解决此问题的另一种方法吗?
非常感谢! :-)
最佳答案
所有multiprocessing.Pool.map*
方法都完全使用迭代器 (demo code) 一旦函数被调用。要一次一个 block 地向迭代器的映射函数 block 提供数据,请使用 grouper_nofill
:
def grouper_nofill(n, iterable):
'''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
'''
it=iter(iterable)
def take():
while 1: yield list(itertools.islice(it,n))
return iter(take().next,[])
chunksize=256
async_results=[]
for finput in grouper_nofill(chunksize,itertools.izip(matrices, inds)):
async_results.extend(pool.map_async(myfunc, finput).get())
async_results=np.array(async_results)
附言。 pool.map_async
的 chunksize
参数做了一些不同的事情:它将 iterable 分成 block ,然后将每个 block 交给一个调用 map(func,chunk )
。如果 func(item)
完成得太快,这可以为工作进程提供更多数据来咀嚼,但这对您的情况没有帮助,因为迭代器仍然在 map_async< 之后立即被完全消耗
调用已发出。
关于python - 结合 itertools 和多处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7306522/