python - 在 numpy block 上并行运行重循环

标签 python numpy multiprocessing

我需要迭代一个巨大的 numpy 数组来构建三个列表,具体取决于昂贵的 C 库调用的结果,它接受标量值并且无法矢量化(或者至少我不知道如何做到这一点)。 这个循环可能需要几个小时到几天,我可以看到性能随着时间的推移而下降(我记录了进度,我可以看到最后的速度要慢得多),可能是由于列表的大小不断增加(??)。

代码如下(我省略了与打印进度和一些微观优化相关的代码):

import numpy as np
import swig_c_lib


def build_indexes(large_numpy_array_1, large_numpy_array_2):
    xs = []
    ys = []
    idxs = []

    for (x, y), value in np.ndenumerate(large_numpy_array_1):

        if not (value <= -1.0e+10):
            try:
                index = swig_c_lib.slow_computation(np.asscalar(large_numpy_array_2[x, y]), np.asscalar(large_numpy_array_1[x, y]))
            except swig_lib.InternalError:
                pass
            else:
                xs.append(x)
                ys.append(y)
                idxs.append(index)
    return np.asarray(xs), np.asarray(ys), np.asarray(idxs)

可能,一种解决方案是将大型输入 numpy 数组拆分为 4 个子数组并使用多重处理(但我不确定如何合并结果)。有谁可以帮忙吗?

最佳答案

这是一个问题,其中dask模块可以提供帮助

我们首先创建两个数组 a1a2。它们可以具有任意形状,但在本例中,我们将使它们成为 nn,其中 n=30。我们将数组压平并将它们堆叠在一起,形成一个形状大的数组 (2,900)。沿 axis=1 维度的每一对都是 a1a2 上相同位置的一对项目:

In[1]:
import numpy as np
n = 30
a1 = np.random.rand(n, n)
a2 = np.random.rand(n, n)
a = np.stack((a1.flat, a2.flat))
a.shape

Out[1]:
(2, 900)

然后我们继续将数组分割成 block 。我们选择 250 个 block :

In[2]:
chunks = np.array_split(a, 250, axis=1)
len(chunks)

Out[2]:
250


In[3]:
chunks[0]

Out[3]:
array([[ 0.54631022,  0.8428954 ,  0.11835531,  0.59720379],
   [ 0.51184696,  0.64365038,  0.74471553,  0.67035977]])

我们现在将定义一个slow_function,它将发挥问题中描述的慢速计算的作用。我们还定义了一种使用 numpy 将慢速函数应用于其中一个 block 的方法。

In[4]:
def slow_function(pair):
    return np.asscalar(pair[0]) + np.asscalar(pair[1])

def apply_on_chunk(chunk):
    return np.apply_along_axis(slow_function, 0, chunk)

apply_on_chunk(chunks[0])

Out[4]:
array([ 1.05815717,  1.48654578,  0.86307085,  1.26756356])

在上面,请注意,无论 block 中 axis=1 的大小如何,apply_on_chunk 都会起作用。换句话说,我们也可以继续调用 apply_on_chunk(a) 来计算整个初始数组的结果。


dask.bag 并行化

我们现在展示如何使用 dask.bag 对象的 map 方法沿 block 并行计算:

In[5]:
import dask.bag as db
mybag = db.from_sequence(chunks)

In[6]:
%time myresult = mybag.map(apply_on_chunk)

Out[6]:
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 1.62 ms

此时我们还没有计算任何内容。我们已经向 dask 描述了我们希望如何计算结果。此步骤发生得相对较快,大约 1.6 毫秒


要继续并触发实际计算,我们对 myresult 调用 compute 方法:

In[7]:
%time myresult = myresult.compute()


Out[7]:
CPU times: user 256 ms, sys: 24 ms, total: 280 ms
Wall time: 362 ms

上面的运行时间略多于 1/3 秒。我们获得的是一个数组列表,对应于对 dask.bag 中每个元素调用 apply_on_chunk 的结果。我们检查其中的前五个:

In[8]:
myresult[:5]

Out[8]:
[array([ 1.05815717,  1.48654578,  0.86307085,  1.26756356]),
 array([ 1.48913909,  1.25028145,  1.36707112,  1.04826167]),
 array([ 0.90069768,  1.24921559,  1.23146726,  0.84963409]),
 array([ 0.72292347,  0.87069598,  1.35893143,  1.02451637]),
 array([ 1.16422966,  1.35559156,  0.9071381 ,  1.17786002])]

如果我们确实想要最终形式的结果,我们必须调用 np.concatenate 来将所有 block 的结果放在一起。我们在下面执行此操作,并显示计算的性能:

In[9]:
%time myresult = np.concatenate(\
    db.from_sequence(\
        np.array_split(np.stack((a1.flat, a2.flat)), 250, axis=1)\
    ).map(apply_on_chunk).compute())

Out[9]:
CPU times: user 232 ms, sys: 40 ms, total: 272 ms
Wall time: 342 ms

完整的计算为我们提供了一个带有结果的数组,运行时间略多于 1/3 秒。


如果我们直接在整个数组上进行计算而不使用任何并行化会怎样:

In[10]:
%time myresult_ = np.apply_along_axis(slow_function, 0, np.stack((a1.flat, a2.flat)))

Out[10]:
CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 12.9 ms

直接计算快很多。但原因是 slow_function 目前并没有那么慢。只是两个要素的相加而已,根本不需要花太多时间。我们在 dask.bag 计算中看到的缓慢是并行化产生的开销。


让我们继续再试一次,但这一次是一个非常慢的函数,每次调用大约需要 20 毫秒:

In[11]:
n = 30
a1 = np.random.rand(n, n)
a2 = np.random.rand(n, n)

import time

def slow_function(pair):
    time.sleep(0.02)
    return np.asscalar(pair[0]) + np.asscalar(pair[1])

def apply_on_chunk(chunk):
    return np.apply_along_axis(slow_function, 0, chunk)

让我们比较一下 dask 的功能与直接在整个数组上运行计算的功能:

In[12]:
%time myresult = np.concatenate(\
    db.from_sequence(\
        np.array_split(np.stack((a1.flat, a2.flat)), 250, axis=1)\
    ).map(apply_on_chunk).compute())

Out[12]:
CPU times: user 236 ms, sys: 20 ms, total: 256 ms
Wall time: 4.75 s


In[13]:
%time myresult_ = np.apply_along_axis(slow_function, 0, np.stack((a1.flat, a2.flat)))


Out[13]:
CPU times: user 72 ms, sys: 16 ms, total: 88 ms
Wall time: 18.2 s

可以看出,dask正在利用多处理来加速计算。我们的速度提高了大约 4 倍。

为了完整起见,我们表明 dask 的结果与直接计算的结果彼此一致:

In[14]:
np.testing.assert_array_equal(myresult, myresult_)
print("success")

Out[14]:
success


请注意,问题中的函数返回一个元组

np.asarray(xs), np.asarray(ys), np.asarray(idxs)

我们所描述的仅涉及np.asarray(idxs)的计算。如果知道原始 a1a2 的形状,则可以轻松获得返回元组中的前两项。

关于python - 在 numpy block 上并行运行重循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34240481/

相关文章:

python - 使用 TensorFlow 层的 `kernel_constraint` 实现权重归一化

Android Volley 似乎无法连接到 Flask 本地主机地址以获取 Flask 返回字符串的 GET 方法?

python - 如何在使用 mark.parametrize 装饰器的同时使用补丁装饰器?

multithreading - 多进程系统中与sigwait相关的混淆

python - 多处理:池和映射以及 sys.exit()

python - 限制大 RDD

python - 为什么 scipy.stats.nanmean 给出与 numpy.nansum 不同的结果?

python - 如何高效操作大型 numpy 数组

python - t 中缺少 ppf 函数

python多处理池重试