我有一个 scipy.sparse.csr_matrix 格式的大稀疏矩阵 X,我想利用并行性将其乘以一个 numpy 数组 W。经过一些研究,我发现我需要在多处理中使用 Array 以避免在进程之间复制 X 和 W(来自例如:How to combine Pool.map with Array (shared memory) in Python multiprocessing? 和 Is shared readonly data copied to different processes for Python multiprocessing?)。这是我最近的尝试
import multiprocessing
import numpy
import scipy.sparse
import time
def initProcess(data, indices, indptr, shape, Warr, Wshp):
global XData
global XIndices
global XIntptr
global Xshape
XData = data
XIndices = indices
XIntptr = indptr
Xshape = shape
global WArray
global WShape
WArray = Warr
WShape = Wshp
def dot2(args):
rowInds, i = args
global XData
global XIndices
global XIntptr
global Xshape
data = numpy.frombuffer(XData, dtype=numpy.float)
indices = numpy.frombuffer(XIndices, dtype=numpy.int32)
indptr = numpy.frombuffer(XIntptr, dtype=numpy.int32)
Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape)
global WArray
global WShape
W = numpy.frombuffer(WArray, dtype=numpy.float).reshape(WShape)
return Xr[rowInds[i]:rowInds[i+1], :].dot(W)
def getMatmat(X):
numJobs = multiprocessing.cpu_count()
rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs+1), numpy.int)
#Store the data in X as RawArray objects so we can share it amoung processes
XData = multiprocessing.RawArray("d", X.data)
XIndices = multiprocessing.RawArray("i", X.indices)
XIndptr = multiprocessing.RawArray("i", X.indptr)
def matmat(W):
WArray = multiprocessing.RawArray("d", W.flatten())
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(), initializer=initProcess, initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape))
params = []
for i in range(numJobs):
params.append((rowInds, i))
iterator = pool.map(dot2, params)
P = numpy.zeros((X.shape[0], W.shape[1]))
for i in range(numJobs):
P[rowInds[i]:rowInds[i+1], :] = iterator[i]
return P
return matmat
if __name__ == '__main__':
#Create a random sparse matrix X and a random dense one W
X = scipy.sparse.rand(10000, 8000, 0.1)
X = X.tocsr()
W = numpy.random.rand(8000, 20)
startTime = time.time()
A = getMatmat(X)(W)
parallelTime = time.time()-startTime
startTime = time.time()
B = X.dot(W)
nonParallelTime = time.time()-startTime
print(parallelTime, nonParallelTime)
但是输出类似于:(4.431, 0.165) 表明并行版本比非并行乘法慢得多。
我相信在将大数据复制到进程时类似的情况下可能会导致减速,但这里不是这种情况,因为我使用 Array 来存储共享变量(除非它发生在 numpy.frombuffer 中或创建时一个 csr_matrix,但后来我找不到直接共享 csr_matrix 的方法)。速度慢的另一个可能原因是为每个进程返回每个矩阵乘法的大结果但是我不确定解决这个问题的方法。
有人能看出我哪里错了吗? 谢谢你的帮助!
更新:我不能确定,但我认为在进程之间共享大量数据并不是那么有效,理想情况下我应该使用多线程(尽管全局解释器锁 (GIL) 使这变得非常困难)。解决此问题的一种方法是使用 Cython 发布 GIL(请参阅 http://docs.cython.org/src/userguide/parallelism.html ),尽管许多 numpy 函数需要通过 GIL。
最佳答案
最好的选择是使用 Cython 降到 C。这样你就可以击败 GIL 并使用 OpenMP。我对多处理速度变慢并不感到惊讶——那里有很多开销。
这是 CSparse 稀疏矩阵的简单包装器 OpenMP 包装器 - python 中的矢量乘积代码。
在我的笔记本电脑上,它的运行速度比 scipy 快一点。但是我没有那么多核心。代码,包括 setup.py 脚本和 C 头文件和东西在这个要点中:https://gist.github.com/rmcgibbo/6019670
我怀疑如果你真的希望并行代码很快(在我的笔记本电脑上,它只比单线程 scipy 快 20%,即使使用 4 个线程),你需要更仔细地考虑并行性在哪里发生的事情比我做的要多,注意缓存位置。
# psparse.pyx
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
cimport cython
cimport numpy as np
import numpy as np
import scipy.sparse
from libc.stddef cimport ptrdiff_t
from cython.parallel import parallel, prange
#-----------------------------------------------------------------------------
# Headers
#-----------------------------------------------------------------------------
ctypedef int csi
ctypedef struct cs:
# matrix in compressed-column or triplet form
csi nzmax # maximum number of entries
csi m # number of rows
csi n # number of columns
csi *p # column pointers (size n+1) or col indices (size nzmax)
csi *i # row indices, size nzmax
double *x # numerical values, size nzmax
csi nz # # of entries in triplet matrix, -1 for compressed-col
cdef extern csi cs_gaxpy (cs *A, double *x, double *y) nogil
cdef extern csi cs_print (cs *A, csi brief) nogil
assert sizeof(csi) == 4
#-----------------------------------------------------------------------------
# Functions
#-----------------------------------------------------------------------------
@cython.boundscheck(False)
def pmultiply(X not None, np.ndarray[ndim=2, mode='fortran', dtype=np.float64_t] W not None):
"""Multiply a sparse CSC matrix by a dense matrix
Parameters
----------
X : scipy.sparse.csc_matrix
A sparse matrix, of size N x M
W : np.ndarray[dtype=float564, ndim=2, mode='fortran']
A dense matrix, of size M x P. Note, W must be contiguous and in
fortran (column-major) order. You can ensure this using
numpy's `asfortranarray` function.
Returns
-------
A : np.ndarray[dtype=float64, ndim=2, mode='fortran']
A dense matrix, of size N x P, the result of multiplying X by W.
Notes
-----
This function is parallelized over the columns of W using OpenMP. You
can control the number of threads at runtime using the OMP_NUM_THREADS
environment variable. The internal sparse matrix code is from CSPARSE,
a Concise Sparse matrix package. Copyright (c) 2006, Timothy A. Davis.
http://www.cise.ufl.edu/research/sparse/CSparse, licensed under the
GNU LGPL v2.1+.
References
----------
.. [1] Davis, Timothy A., "Direct Methods for Sparse Linear Systems
(Fundamentals of Algorithms 2)," SIAM Press, 2006. ISBN: 0898716136
"""
if X.shape[1] != W.shape[0]:
raise ValueError('matrices are not aligned')
cdef int i
cdef cs csX
cdef np.ndarray[double, ndim=2, mode='fortran'] result
cdef np.ndarray[csi, ndim=1, mode = 'c'] indptr = X.indptr
cdef np.ndarray[csi, ndim=1, mode = 'c'] indices = X.indices
cdef np.ndarray[double, ndim=1, mode = 'c'] data = X.data
# Pack the scipy data into the CSparse struct. This is just copying some
# pointers.
csX.nzmax = X.data.shape[0]
csX.m = X.shape[0]
csX.n = X.shape[1]
csX.p = &indptr[0]
csX.i = &indices[0]
csX.x = &data[0]
csX.nz = -1 # to indicate CSC format
result = np.zeros((X.shape[0], W.shape[1]), order='F', dtype=np.double)
for i in prange(W.shape[1], nogil=True):
# X is in fortran format, so we can get quick access to each of its
# columns
cs_gaxpy(&csX, &W[0, i], &result[0, i])
return result
它从 CSparse 调用一些 C。
// src/cs_gaxpy.c
#include "cs.h"
/* y = A*x+y */
csi cs_gaxpy (const cs *A, const double *x, double *y)
{
csi p, j, n, *Ap, *Ai ;
double *Ax ;
if (!CS_CSC (A) || !x || !y) return (0) ; /* check inputs */
n = A->n ; Ap = A->p ; Ai = A->i ; Ax = A->x ;
for (j = 0 ; j < n ; j++)
{
for (p = Ap [j] ; p < Ap [j+1] ; p++)
{
y [Ai [p]] += Ax [p] * x [j] ;
}
}
return (1) ;
}
关于python - 如何并行化 scipy 稀疏矩阵乘法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16814273/