我有一个形状为 (k, n)
的大数组 arr1
,其中 k
和 n
都是1e7 阶。每行仅包含几百个非零元素并且是稀疏的。
对于每一行k
,我需要与形状(1, n)
的arr2
进行逐元素乘法。
目前,我使用 scipy.sparse.csc_matrix
的 multiply
方法执行此乘法。并且乘法是作为我正在最小化的函数的一部分执行的,这意味着它被评估数千次并导致大量的计算负载。更重要的是,我发现这个函数在单个核心上运行。
我尝试通过将数组拆分为 k
中的子数组来并行计算,从而找到并行计算的方法。令我沮丧的是,我发现并行版本运行得更慢。到目前为止,我已经尝试过 Dask 中的实现, Ray ,和multiprocessing 。下面是我在大约 500GB RAM 和 56 个 CPU 的机器上使用的实现。
我不明白为什么并行版本运行得这么慢。这是我第一次并行化自己的代码,因此非常感谢任何帮助。
设置数据(为了重现性)
import scipy.sparse as scisp
import numpy as np
import dask.array as da
import dask
import multiprocessing as mp
import ray
import psutil
rng = np.random.default_rng()
rows = np.zeros((5600, 1_000_000))
rows[:, rng.integers(low=0, high=1_000_000, size=110)] = 1
scisp_arr1 = scisp.coo_matrix(rows)
scisp_arr1 = scisp.csc_matrix(scisp_arr1)
arr2 = rng.uniform(size=(1, 1_000_000))
arr2 = scisp.csc_matrix(arr2)
arr1 = None
for i in range(1000):
big_box = scisp.vstack((arr1, scisp_arr))
arr1 = scisp.csc_matrix(arr1)
无与伦比
%%time arr1.multiply(arr2).sum()
CPU times: user 4.92 s, sys: 2.72 s, total: 7.64 s
Wall time: 7.64 s
达克
%%time
def f(arr1, arr2):
return arr1.multiply(arr2)
delayed_multiply = dask.delayed(f)
steps = arr1.shape[0]//56
total = []
for i in range(0, arr1.shape[0], steps):
total.append(delayed_multiply(arr1[i:i+steps], arr2).sum())
total = dask.delayed(sum)(total)
total.compute()
CPU times: user 1min 13s, sys: 49 s, total: 2min 2s
Wall time: 55.5 s
雷
ray.init(num_cpus=psutil.cpu_count())
%%time
@ray.remote
def f(arr1, arr2):
return arr1.multiply(arr2).sum()
steps = arr1.shape[0]//56
total = []
for i in range(0, arr1.shape[0], steps):
total.append(f.remote(arr1[i:i+steps], arr2))
sum(ray.get(total))
CPU times: user 52.4 s, sys: 9.39 s, total: 1min 1s
Wall time: 59.4 s
多处理
%%time
steps = arr1.shape[0]//56
chunks = [(arr1[i:i+steps], arr2) for i in range(0, arr1.shape[0], steps)]
def f(arr1, arr2):
return arr1.multiply(arr2).sum()
def main(args):
steps = arr1.shape[0]//56
pool = mp.Pool(mp.cpu_count())
result = pool.starmap(f, args)
return result
sum(main(chunks))
CPU times: user 49.8 s, sys: 41.9 s, total: 1min 31s
Wall time: 1min 39s
编辑 2021 年 11 月 18 日
遵循 @michael-delgado's suggestion我使用 Dask 进行了以下更新尝试:
def foo(arr):
arr[:, rng.integers(low=0, high=1_000_000, size=110)] = 1
return arr
chunk_len = 0.8*psutil.virtual_memory().available // 1e6 // psutil.cpu_count() // 8
arr1 = da.zeros((5_600_000, 1_000_000), chunks=(chunk_len, 1_000_000))
arr1 = foo(arr1)
arr1 = arr1.map_blocks(sparse.COO)
result = arr1.compute()
arr1 = da.from_array(result, chunks=(chunk_len, 1_000_000))
---
%%time
arr2 = da.random.uniform(size=(1, 1_000_000))
K = (arr1*arr2).sum(axis=1)
final_result = np.log(K.compute()).sum(axis=0)
CPU times: user 2min 5s, sys: 51 s, total: 2min 56s
Wall time: 5.71 s
在单核上使用 Scipy.sparse 进行相同的操作可以得到:
arr1_scipy = result.tocsc()
---
%%time
arr2 = scisp.csc_matrix(rng.uniform(size=(1, 1_000_000)))
K = arr1_scipy.multiply(arr2).sum(axis=1)
final_result = np.log(K).sum(axis=0)
CPU times: user 4.88 s, sys: 1.65 s, total: 6.53 s
Wall time: 6.53 s
令我惊讶的是没有更大的改进。这仅仅是由于并行化的开销吗? Dask 的实现可以进一步改进吗?
最佳答案
如果我正确理解您的实现,那么在任何这些情况下您实际上都没有对数组进行分区。因此,您所做的就是运行完全相同的工作流程,但在不同的线程上,因此“并行”执行时间是原始运行时加上设置分布式作业调度程序并将所有内容传递给第二个线程的开销。
如果您希望看到总时间的改进,则必须实际重写代码以对数据子集进行操作。
在 dask 情况下,使用 dask.array.from_numpy
将数组拆分为多个 block ,然后重写工作流程以使用 dask.array 操作而不是 numpy 操作。或者,您可以自己对数据进行分区,并使用 dask distribution 的 client.map
在数组的子集上运行您的函数。 (参见quickstart)。
这些方法都不容易,您需要认识到其中任何一种都存在开销(无论是在实际计算/网络使用/内存等方面还是在您的时间的实际投资方面),但是如果总运行时间很重要,那么这是值得的。请参阅dask best practices文档以获取更多背景信息。
更新:
在使用 dask.array 进行迭代之后,您的实现现在比单线程墙时间更快,是的,额外的 CPU 时间是开销。对于你第一次尝试这个,让它比 numpy/scipy 更快(顺便说一句,它们已经经过了深度优化,并且很可能在单线程方法中进行并行化)是一个巨大的胜利,所以拍拍自己背部。让它变得更快是一个合理的挑战,远远超出了这个问题的范围。欢迎来到并行性!
补充阅读:
关于python - 并行 scipy.sparse 逐元素乘法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70008521/