python - 如何并行化 scipy 稀疏矩阵乘法

标签 python parallel-processing scipy sparse-matrix

我有一个 scipy.sparse.csr_matrix 格式的大稀疏矩阵 X,我想利用并行性将其乘以一个 numpy 数组 W。经过一些研究,我发现我需要在多处理中使用 Array 以避免在进程之间复制 X 和 W(来自例如:How to combine Pool.map with Array (shared memory) in Python multiprocessing?Is shared readonly data copied to different processes for Python multiprocessing?)。这是我最近的尝试

import multiprocessing 
import numpy 
import scipy.sparse 
import time 

def initProcess(data, indices, indptr, shape, Warr, Wshp):
    global XData 
    global XIndices 
    global XIntptr 
    global Xshape 

    XData = data 
    XIndices = indices 
    XIntptr = indptr 
    Xshape = shape 

    global WArray
    global WShape 

    WArray = Warr     
    WShape = Wshp 

def dot2(args):
    rowInds, i = args     

    global XData 
    global XIndices
    global XIntptr 
    global Xshape 

    data = numpy.frombuffer(XData, dtype=numpy.float)
    indices = numpy.frombuffer(XIndices, dtype=numpy.int32)
    indptr = numpy.frombuffer(XIntptr, dtype=numpy.int32)
    Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape)

    global WArray
    global WShape 
    W = numpy.frombuffer(WArray, dtype=numpy.float).reshape(WShape)

    return Xr[rowInds[i]:rowInds[i+1], :].dot(W)

def getMatmat(X): 
    numJobs = multiprocessing.cpu_count()
    rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs+1), numpy.int)

    #Store the data in X as RawArray objects so we can share it amoung processes
    XData = multiprocessing.RawArray("d", X.data)
    XIndices = multiprocessing.RawArray("i", X.indices)
    XIndptr = multiprocessing.RawArray("i", X.indptr)

    def matmat(W): 
        WArray = multiprocessing.RawArray("d", W.flatten())
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(), initializer=initProcess, initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape)) 
        params = [] 

        for i in range(numJobs): 
            params.append((rowInds, i))

        iterator = pool.map(dot2, params)
        P = numpy.zeros((X.shape[0], W.shape[1])) 

        for i in range(numJobs): 
            P[rowInds[i]:rowInds[i+1], :] = iterator[i]

        return P   

    return matmat 

if __name__ == '__main__':
    #Create a random sparse matrix X and a random dense one W     
    X = scipy.sparse.rand(10000, 8000, 0.1)
    X = X.tocsr()
    W = numpy.random.rand(8000, 20)

    startTime = time.time()
    A = getMatmat(X)(W)
    parallelTime = time.time()-startTime 

    startTime = time.time()
    B = X.dot(W)
    nonParallelTime = time.time()-startTime 

    print(parallelTime, nonParallelTime)

但是输出类似于:(4.431, 0.165) 表明并行版本比非并行乘法慢得多。

我相信在将大数据复制到进程时类似的情况下可能会导致减速,但这里不是这种情况,因为我使用 Array 来存储共享变量(除非它发生在 numpy.frombuffer 中或创建时一个 csr_matrix,但后来我找不到直接共享 csr_matrix 的方法)。速度慢的另一个可能原因是为每个进程返回每个矩阵乘法的大结果但是我不确定解决这个问题的方法。

有人能看出我哪里错了吗? 谢谢你的帮助!

更新:我不能确定,但​​我认为在进程之间共享大量数据并不是那么有效,理想情况下我应该使用多线程(尽管全局解释器锁 (GIL) 使这变得非常困难)。解决此问题的一种方法是使用 Cython 发布 GIL(请参阅 http://docs.cython.org/src/userguide/parallelism.html ),尽管许多 numpy 函数需要通过 GIL。

最佳答案

最好的选择是使用 Cython 降到 C。这样你就可以击败 GIL 并使用 OpenMP。我对多处理速度变慢并不感到惊讶——那里有很多开销。

这是 CSparse 稀疏矩阵的简单包装器 OpenMP 包装器 - python 中的矢量乘积代码。

在我的笔记本电脑上,它的运行速度比 scipy 快一点。但是我没有那么多核心。代码,包括 setup.py 脚本和 C 头文件和东西在这个要点中:https://gist.github.com/rmcgibbo/6019670

我怀疑如果你真的希望并行代码很快(在我的笔记本电脑上,它只比单线程 scipy 快 20%,即使使用 4 个线程),你需要更仔细地考虑并行性在哪里发生的事情比我做的要多,注意缓存位置。

# psparse.pyx

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
cimport cython
cimport numpy as np
import numpy as np
import scipy.sparse
from libc.stddef cimport ptrdiff_t
from cython.parallel import parallel, prange

#-----------------------------------------------------------------------------
# Headers
#-----------------------------------------------------------------------------

ctypedef int csi

ctypedef struct cs:
    # matrix in compressed-column or triplet form
    csi nzmax       # maximum number of entries
    csi m           # number of rows
    csi n           # number of columns
    csi *p          # column pointers (size n+1) or col indices (size nzmax)
    csi *i          # row indices, size nzmax
    double *x       # numerical values, size nzmax
    csi nz          # # of entries in triplet matrix, -1 for compressed-col

cdef extern csi cs_gaxpy (cs *A, double *x, double *y) nogil
cdef extern csi cs_print (cs *A, csi brief) nogil

assert sizeof(csi) == 4

#-----------------------------------------------------------------------------
# Functions
#-----------------------------------------------------------------------------

@cython.boundscheck(False)
def pmultiply(X not None, np.ndarray[ndim=2, mode='fortran', dtype=np.float64_t] W not None):
    """Multiply a sparse CSC matrix by a dense matrix

    Parameters
    ----------
    X : scipy.sparse.csc_matrix
        A sparse matrix, of size N x M
    W : np.ndarray[dtype=float564, ndim=2, mode='fortran']
        A dense matrix, of size M x P. Note, W must be contiguous and in
        fortran (column-major) order. You can ensure this using
        numpy's `asfortranarray` function.

    Returns
    -------
    A : np.ndarray[dtype=float64, ndim=2, mode='fortran']
        A dense matrix, of size N x P, the result of multiplying X by W.

    Notes
    -----
    This function is parallelized over the columns of W using OpenMP. You
    can control the number of threads at runtime using the OMP_NUM_THREADS
    environment variable. The internal sparse matrix code is from CSPARSE, 
    a Concise Sparse matrix package. Copyright (c) 2006, Timothy A. Davis.
    http://www.cise.ufl.edu/research/sparse/CSparse, licensed under the
    GNU LGPL v2.1+.

    References
    ----------
    .. [1] Davis, Timothy A., "Direct Methods for Sparse Linear Systems
    (Fundamentals of Algorithms 2)," SIAM Press, 2006. ISBN: 0898716136
    """
    if X.shape[1] != W.shape[0]:
        raise ValueError('matrices are not aligned')

    cdef int i
    cdef cs csX
    cdef np.ndarray[double, ndim=2, mode='fortran'] result
    cdef np.ndarray[csi, ndim=1, mode = 'c'] indptr  = X.indptr
    cdef np.ndarray[csi, ndim=1, mode = 'c'] indices = X.indices
    cdef np.ndarray[double, ndim=1, mode = 'c']    data = X.data

    # Pack the scipy data into the CSparse struct. This is just copying some
    # pointers.
    csX.nzmax = X.data.shape[0]
    csX.m = X.shape[0]
    csX.n = X.shape[1]
    csX.p = &indptr[0]
    csX.i = &indices[0]
    csX.x = &data[0]
    csX.nz = -1  # to indicate CSC format

    result = np.zeros((X.shape[0], W.shape[1]), order='F', dtype=np.double)
    for i in prange(W.shape[1], nogil=True):
        # X is in fortran format, so we can get quick access to each of its
        # columns
        cs_gaxpy(&csX, &W[0, i], &result[0, i])

    return result

它从 CSparse 调用一些 C。

// src/cs_gaxpy.c

#include "cs.h"
/* y = A*x+y */
csi cs_gaxpy (const cs *A, const double *x, double *y)
{
  csi p, j, n, *Ap, *Ai ;
  double *Ax ;
  if (!CS_CSC (A) || !x || !y) return (0) ;       /* check inputs */
  n = A->n ; Ap = A->p ; Ai = A->i ; Ax = A->x ;
  for (j = 0 ; j < n ; j++)
    {
      for (p = Ap [j] ; p < Ap [j+1] ; p++)
        {
      y [Ai [p]] += Ax [p] * x [j] ;
        }
    }
  return (1) ;
}

关于python - 如何并行化 scipy 稀疏矩阵乘法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16814273/

相关文章:

c# - Parallel.ForEach 与元组返回 c#

java - Java除了并行流之外如何实现隐式并发

python - 从 tcpdump 中剥离有效负载?

python - 改变 Macbook(Pro) 键盘背光的亮度

matlab - 使用 Parallel Computing Toolbox 窃取工作

python - 使用 py2exe 创建的 exe 不工作并返回有错误的日志文件

python - 如何在 python 中应用 2d 和 3d 过滤器

python - 在给定概率函数的情况下绘制贝叶斯决策边界的内置函数

python - Tkinter.OptionMenu 未按预期工作

python - 如何在列出所有 VM 时使用 Azure Python SDK 中的 nextLink 属性