我的问题如下,我有一个迭代算法,因此在每次迭代时它需要执行几个矩阵-矩阵乘法 dot(A_i, B_i),对于我 = 1 ... k。由于这些乘法是使用 Numpy 的点执行的,我知道它们正在调用 BLAS-3 实现,这非常快。问题是调用次数巨大,结果成为我程序的瓶颈。我想通过制造更少的产品但使用更大的矩阵来最大程度地减少所有这些调用带来的开销。
为简单起见,假设所有矩阵都是 n x n(通常 n 不大,范围在 1 到 1000 之间)。解决我的问题的一种方法是考虑分块对角矩阵 diag(A_i) 并执行下面的乘积。
这只是对函数 dot 的一次调用,但现在程序浪费了很多时间来执行与零的乘法。这个想法似乎行不通,但它给出了结果 [A_1 B_1, ..., A_k B_k],即所有产品堆叠在一个大矩阵中.
我的问题是,有没有一种方法可以通过单个函数调用来计算 [A_1 B_1, ..., A_k B_k]?或者更重要的是,我怎样才能比制作一个 Numpy 点循环更快地计算这些乘积?
最佳答案
这取决于矩阵的大小
编辑
对于较大的 nxn 矩阵(大约大小 20),编译代码的 BLAS 调用速度更快,对于较小的矩阵,自定义 Numba 或 Cython 内核通常更快。
以下方法为给定的输入形状生成自定义点函数。通过这种方法,还可以受益于与编译器相关的优化,例如循环展开,这对于小矩阵尤为重要。
需要注意的是,生成和编译一个内核大约需要 2 分钟。 1s,因此请确保仅在确实需要时才调用生成器。
生成器函数
def gen_dot_nm(x,y,z):
#small kernels
@nb.njit(fastmath=True,parallel=True)
def dot_numba(A,B):
"""
calculate dot product for (x,y)x(y,z)
"""
assert A.shape[0]==B.shape[0]
assert A.shape[2]==B.shape[1]
assert A.shape[1]==x
assert B.shape[1]==y
assert B.shape[2]==z
res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
for ii in nb.prange(A.shape[0]):
for i in range(x):
for j in range(z):
acc=0.
for k in range(y):
acc+=A[ii,i,k]*B[ii,k,j]
res[ii,i,j]=acc
return res
#large kernels
@nb.njit(fastmath=True,parallel=True)
def dot_BLAS(A,B):
assert A.shape[0]==B.shape[0]
assert A.shape[2]==B.shape[1]
res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
for ii in nb.prange(A.shape[0]):
res[ii]=np.dot(A[ii],B[ii])
return res
#At square matices above size 20
#calling BLAS is faster
if x>=20 or y>=20 or z>=20:
return dot_BLAS
else:
return dot_numba
使用示例
A=np.random.rand(1000,2,2)
B=np.random.rand(1000,2,2)
dot22=gen_dot_nm(2,2,2)
X=dot22(A,B)
%timeit X3=dot22(A,B)
#5.94 µs ± 21.3 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
旧答案
另一种方法是使用一些特殊的 BLAS 实现,但需要做更多的工作,这会创建 custom kernels对于非常小的矩阵,而不是从 C 中调用这个内核。
示例
import numpy as np
import numba as nb
#Don't use this for larger submatrices
@nb.njit(fastmath=True,parallel=True)
def dot(A,B):
assert A.shape[0]==B.shape[0]
assert A.shape[2]==B.shape[1]
res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
for ii in nb.prange(A.shape[0]):
for i in range(A.shape[1]):
for j in range(B.shape[2]):
acc=0.
for k in range(B.shape[1]):
acc+=A[ii,i,k]*B[ii,k,j]
res[ii,i,j]=acc
return res
@nb.njit(fastmath=True,parallel=True)
def dot_22(A,B):
assert A.shape[0]==B.shape[0]
assert A.shape[1]==2
assert A.shape[2]==2
assert B.shape[1]==2
assert B.shape[2]==2
res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
for ii in nb.prange(A.shape[0]):
res[ii,0,0]=A[ii,0,0]*B[ii,0,0]+A[ii,0,1]*B[ii,1,0]
res[ii,0,1]=A[ii,0,0]*B[ii,0,1]+A[ii,0,1]*B[ii,1,1]
res[ii,1,0]=A[ii,1,0]*B[ii,0,0]+A[ii,1,1]*B[ii,1,0]
res[ii,1,1]=A[ii,1,0]*B[ii,0,1]+A[ii,1,1]*B[ii,1,1]
return res
时间
A=np.random.rand(1000,2,2)
B=np.random.rand(1000,2,2)
X=A@B
X2=np.einsum("xik,xkj->xij",A,B)
X3=dot_22(A,B) #avoid measurig compilation overhead
X4=dot(A,B) #avoid measurig compilation overhead
%timeit X=A@B
#262 µs ± 2.55 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit np.einsum("xik,xkj->xij",A,B,optimize=True)
#264 µs ± 3.22 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit X3=dot_22(A,B)
#5.68 µs ± 27.2 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
%timeit X4=dot(A,B)
#9.79 µs ± 61.5 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
关于performance - 最大限度地减少由于大量 Numpy 点调用而产生的开销,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59347796/