python - 结合 itertools 和多处理?

标签 python multiprocessing python-itertools

我有一个 256x256x256 Numpy 数组,其中每个元素都是一个矩阵。我需要对这些矩阵中的每一个进行一些计算,并且我想使用 multiprocessing 模块来加快速度。

这些计算的结果必须像原来的那样存储在256x256x256数组中,这样元素[i,j,k]处的矩阵结果必须将原始数组中的元素放入新数组的 [i,j,k] 元素中。

为了做到这一点,我想制作一个列表,它可以用伪方式编写为 [array[i,j,k], (i, j, k)] 和将其传递给要“多处理”的函数。 假设 matrices 是从原始数组中提取的所有矩阵的列表,myfunc 是执行计算的函数,代码看起来有点像这样:

import multiprocessing
import numpy as np
from itertools import izip

def myfunc(finput):
    # Do some calculations...
    ...

    # ... and return the result and the index:
    return (result, finput[1])

# Make indices:
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3)

# Make function input from the matrices and the indices:
finput = izip(matrices, inds)

pool = multiprocessing.Pool()
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999))

但是,似乎 map_async 实际上首先创建了这个巨大的 finput-list:我的 CPU 没有做太多,但内存和交换在几秒钟的事,这显然不是我想要的。

有没有一种方法可以将这个巨大的列表传递给多处理函数,而无需先显式创建它? 或者您知道解决此问题的另一种方法吗?

非常感谢! :-)

最佳答案

所有multiprocessing.Pool.map* 方法都完全使用迭代器 (demo code) 一旦函数被调用。要一次一个 block 地向迭代器的映射函数 block 提供数据,请使用 grouper_nofill:

def grouper_nofill(n, iterable):
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
    '''
    it=iter(iterable)
    def take():
        while 1: yield list(itertools.islice(it,n))
    return iter(take().next,[])

chunksize=256
async_results=[]
for finput in grouper_nofill(chunksize,itertools.izip(matrices, inds)):
    async_results.extend(pool.map_async(myfunc, finput).get())
async_results=np.array(async_results)

附言。 pool.map_asyncchunksize 参数做了一些不同的事情:它将 iterable 分成 block ,然后将每个 block 交给一个调用 map(func,chunk )。如果 func(item) 完成得太快,这可以为工作进程提供更多数据来咀嚼,但这对您的情况没有帮助,因为迭代器仍然在 map_async< 之后立即被完全消耗 调用已发出。

关于python - 结合 itertools 和多处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7306522/

相关文章:

python - 使用 urlretrieve 后关闭 ftp 连接

python - 导入错误: No module named geographiclib.测地线

python-3.x - 让某种形式的 keras 多处理/线程在 Windows 上工作

python - 使用 Python 多重处理并行分割多个 ArcGIS 栅格

python - 以高效/pythonic 方式从列表中生成以下子集

python - itertools 的 chain.from_iterable 和 chain() 的更简化解释

python - 使用python查找不为空的最低值

python - 有没有一种方法可以在不使用 Python 中的 try/except 的情况下对引发到程序顶部的异常使用react?

Python 多处理文档示例

Python - 按月分组日期