I have a question about computing distances in numpy as fast as possible:
import time
import numpy

def getR1(VVm, VVs, HHm, HHs):
    t0 = time.time()
    # broadcast (1, Ns) against (Nm, 1) to get all pairwise differences
    R = VVs.flatten()[numpy.newaxis, :] - VVm.flatten()[:, numpy.newaxis]
    R *= R
    R1 = HHs.flatten()[numpy.newaxis, :] - HHm.flatten()[:, numpy.newaxis]
    R1 *= R1
    R += R1
    del R1
    print("R1\t", time.time() - t0, R.shape, end=" ")  # 11.7576191425 (108225, 10500)
    print(numpy.max(R))  # 4176.26290975
    # uses 17.5 Gb ram
    return R
def getR2(VVm, VVs, HHm, HHs):
    t0 = time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    deltas = precomputed_flat[None, :, :] - measured_flat[:, None, :]
    #print(time.time() - t0, deltas.shape)  # 5.861109972 (108225, 10500, 2)
    R = numpy.einsum('ijk,ijk->ij', deltas, deltas)
    print("R2\t", time.time() - t0, R.shape, end=" ")  # 14.5291359425 (108225, 10500)
    print(numpy.max(R))  # 4176.26290975
    # uses 26 Gb ram
    return R
def getR3(VVm, VVs, HHm, HHs):
    from numpy.core.umath_tests import inner1d
    t0 = time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    deltas = precomputed_flat[None, :, :] - measured_flat[:, None, :]
    #print(time.time() - t0, deltas.shape)  # 5.861109972 (108225, 10500, 2)
    R = inner1d(deltas, deltas)
    print("R3\t", time.time() - t0, R.shape, end=" ")  # 12.6972110271 (108225, 10500)
    print(numpy.max(R))  # 4176.26290975
    # uses 26 Gb ram
    return R
def getR4(VVm, VVs, HHm, HHs):
    from scipy.spatial.distance import cdist
    t0 = time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    R = cdist(precomputed_flat, measured_flat, 'sqeuclidean')  # .T
    print("R4\t", time.time() - t0, R.shape, end=" ")  # 17.7022118568 (108225, 10500)
    print(numpy.max(R))  # 4176.26290975
    # uses 9 Gb ram
    return R
def getR5(VVm, VVs, HHm, HHs):
    from scipy.spatial.distance import cdist
    t0 = time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    R = cdist(precomputed_flat, measured_flat, 'euclidean')  # .T
    print("R5\t", time.time() - t0, R.shape, end=" ")  # 15.6070930958 (108225, 10500)
    print(numpy.max(R))  # 64.6240118667
    # uses only 9 Gb ram
    return R
def getR6(VVm, VVs, HHm, HHs):
    from scipy.weave import blitz  # scipy.weave is Python 2 only; removed in SciPy 0.19
    t0 = time.time()
    R = VVs.flatten()[numpy.newaxis, :] - VVm.flatten()[:, numpy.newaxis]
    blitz("R=R*R")  # R *= R
    R1 = HHs.flatten()[numpy.newaxis, :] - HHm.flatten()[:, numpy.newaxis]
    blitz("R1=R1*R1")  # R1 *= R1
    blitz("R=R+R1")  # R += R1
    del R1
    print("R6\t", time.time() - t0, R.shape, end=" ")
    print(numpy.max(R))  # 4176.26290975
    return R
The results are:
R1 11.7737319469 (108225, 10500) 4909.66881791
R2 15.1279799938 (108225, 10500) 4909.66881791
R3 12.7408981323 (108225, 10500) 4909.66881791
R4 17.3336868286 (10500, 108225) 4909.66881791
R5 15.7530870438 (10500, 108225) 70.0690289494
R6 11.670968771 (108225, 10500) 4909.66881791
The euclidean method (R5) gives sqrt((VVm-VVs)^2+(HHm-HHs)^2) while the others give (VVm-VVs)^2+(HHm-HHs)^2, but that doesn't really matter: in my code I take the minimum of R[i,:] for each i, and sqrt does not change which element is the minimum. If I am interested in the actual distance, I just take sqrt(value) of that single minimum instead of applying sqrt to the whole array, so in practice there is no time difference.
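The claim above, that sqrt does not affect which element is the minimum, can be verified on a toy array (hypothetical values, not the real data):

```python
import numpy as np

# sqrt is monotonically increasing, so the row-wise argmin of squared
# distances equals the argmin of the true distances.
R = np.array([[4.0, 1.0, 9.0],
              [16.0, 25.0, 0.25]])

idx_sq = R.argmin(axis=1)            # argmin on squared distances
idx_eu = np.sqrt(R).argmin(axis=1)   # argmin on true distances
assert (idx_sq == idx_eu).all()

# the actual minimum distance is then sqrt of that single value
min_dists = np.sqrt(R[np.arange(R.shape[0]), idx_sq])
```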
The questions remain: why is the first solution the best? (The second and third are slower because the deltas=... line alone takes 5.8 s, which is also why those two methods need 26 Gb.) And why is sqeuclidean slower than euclidean?
sqeuclidean should just compute (VVm-VVs)^2+(HHm-HHs)^2, yet I suspect it does something different. Does anyone know how to find the source code (C, or whatever is underneath) of that method? I suspect it computes sqrt((VVm-VVs)^2+(HHm-HHs)^2)^2, which is the only reason I can think of why it would be slower than (VVm-VVs)^2+(HHm-HHs)^2. I know that is a silly reason; does anyone have a more logical one?
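Whatever cdist does internally, the two metrics should agree numerically; a quick sanity check on small hypothetical arrays (not the sizes above):

```python
import numpy as np
from scipy.spatial.distance import cdist

# 'sqeuclidean' should equal 'euclidean' squared, element-wise.
rng = np.random.default_rng(0)
a = rng.random((50, 2))
b = rng.random((30, 2))

sq = cdist(a, b, 'sqeuclidean')
eu = cdist(a, b, 'euclidean')
assert np.allclose(sq, eu ** 2)
```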
Since I know nothing about C: how would I inline this with scipy.weave? And does that code compile normally, the way Python does, or do I need to install something special for it?
Edit: OK, I tried it with scipy.weave.blitz (the R6 method), and it is slightly faster, but I assume someone who knows more C than I do can still speed this up. I just took the lines of the form a+=b or a*=b, looked at how they would look in C, and put them into blitz statements; I suppose that if I also put the lines with flatten and newaxis into C, it should run faster too, but I don't know how to do that (someone who knows C could perhaps explain?). Right now, I guess the difference between blitz and my first method is not big enough to really be caused by C vs numpy?
I suppose the other methods, like deltas=..., could also run faster if I wrote them in C?
Best answer
Whenever you have multiplications and sums, try to use one of the dot-product functions or np.einsum. Since you are preallocating your arrays, instead of using separate arrays for the horizontal and vertical coordinates, stack them both together:
precomputed_flat = np.column_stack((svf.flatten(), shf.flatten()))
measured_flat = np.column_stack((VVmeasured.flatten(), HHmeasured.flatten()))
deltas = precomputed_flat - measured_flat[:, None, :]
From here on, the simplest is:
dist = np.einsum('ijk,ijk->ij', deltas, deltas)
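What this einsum expression computes can be checked on a small hypothetical array: for each (i, j) pair it sums deltas[i, j, k] * deltas[i, j, k] over k, i.e. the squared Euclidean distance, without materializing an intermediate deltas**2 array:

```python
import numpy as np

# toy deltas array: 4 measured points, 5 precomputed points, 2 coordinates
rng = np.random.default_rng(1)
deltas = rng.random((4, 5, 2))

# sum deltas*deltas over the last axis k, keeping axes i and j
dist = np.einsum('ijk,ijk->ij', deltas, deltas)
assert np.allclose(dist, (deltas ** 2).sum(axis=-1))
```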
You could also try something like:
from numpy.core.umath_tests import inner1d
dist = inner1d(deltas, deltas)
And of course there is cdist from SciPy's spatial module:
from scipy.spatial.distance import cdist
dist = cdist(precomputed_flat, measured_flat, 'euclidean')
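On small hypothetical arrays one can confirm that the cdist route and the broadcast-plus-einsum route above agree (using 'sqeuclidean' here so both sides compute squared distances):

```python
import numpy as np
from scipy.spatial.distance import cdist

rng = np.random.default_rng(2)
precomputed_flat = rng.random((6, 2))
measured_flat = rng.random((4, 2))

# broadcast to (4, 6, 2), then contract the coordinate axis
deltas = precomputed_flat[None, :, :] - measured_flat[:, None, :]
by_einsum = np.einsum('ijk,ijk->ij', deltas, deltas)

by_cdist = cdist(measured_flat, precomputed_flat, 'sqeuclidean')
assert np.allclose(by_einsum, by_cdist)
```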
Edit: I cannot run tests on such a large dataset, but these timings are quite enlightening:
len_a, len_b = 10000, 1000
a = np.random.rand(2, len_a)
b = np.random.rand(2, len_b)
c = np.random.rand(len_a, 2)
d = np.random.rand(len_b, 2)
In [3]: %timeit a[:, None, :] - b[..., None]
10 loops, best of 3: 76.7 ms per loop
In [4]: %timeit c[:, None, :] - d
1 loops, best of 3: 221 ms per loop
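The two timed broadcasts produce the same numbers in different memory layouts, which a small shape check makes explicit (toy sizes, not the ones timed above):

```python
import numpy as np

len_a, len_b = 100, 40
a = np.random.rand(2, len_a)   # coordinate-major: one row per coordinate
b = np.random.rand(2, len_b)
c = a.T                        # point-major: one row per point
d = b.T

e1 = a[:, None, :] - b[..., None]   # shape (2, len_b, len_a)
e2 = c[:, None, :] - d              # shape (len_a, len_b, 2)

# same values, transposed layout: e2[i, j, k] == e1[k, j, i]
assert np.allclose(e1.transpose(2, 1, 0), e2)
```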
On the smaller dataset above, I can get a slight speed-up over your method with scipy.spatial.distance.cdist, and match it with inner1d, by arranging the data differently in memory:
precomputed_flat = np.vstack((svf.flatten(), shf.flatten()))
measured_flat = np.vstack((VVmeasured.flatten(), HHmeasured.flatten()))
deltas = precomputed_flat[:, None, :] - measured_flat
import scipy.spatial.distance as spdist
from numpy.core.umath_tests import inner1d
In [13]: %timeit r0 = a[0, None, :] - b[0, :, None]; r1 = a[1, None, :] - b[1, :, None]; r0 *= r0; r1 *= r1; r0 += r1
10 loops, best of 3: 146 ms per loop
In [14]: %timeit deltas = (a[:, None, :] - b[..., None]).T; inner1d(deltas, deltas)
10 loops, best of 3: 145 ms per loop
In [15]: %timeit spdist.cdist(a.T, b.T)
10 loops, best of 3: 124 ms per loop
In [16]: %timeit deltas = a[:, None, :] - b[..., None]; np.einsum('ijk,ijk->jk', deltas, deltas)
10 loops, best of 3: 163 ms per loop
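The layout trick only changes how the coordinates sit in memory, not the result; a quick equivalence check with hypothetical data:

```python
import numpy as np
from scipy.spatial.distance import cdist

rng = np.random.default_rng(3)
vv = rng.random(100)
hh = rng.random(100)

points_cols = np.column_stack((vv, hh))  # (100, 2), point-major rows
points_rows = np.vstack((vv, hh))        # (2, 100), coordinate-major rows

# transposing the vstack layout at the call site gives identical distances
d1 = cdist(points_cols, points_cols)
d2 = cdist(points_rows.T, points_rows.T)
assert np.allclose(d1, d2)
```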
Regarding python - More efficient way to calculate distances in numpy?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/17527340/