python - 使用循环(周期性)边界条件计算 numpy 数组中点之间距离的更快代码

标签 python performance numpy scipy distance

我知道如何使用计算数组中点之间的欧几里得距离
scipy.spatial.distance.cdist
类似于这个问题的答案:
Calculate Distances Between One Point in Matrix From All Other Points
但是,我想在假设循环边界条件的情况下进行计算,例如因此,在这种情况下,点 [0,0] 与点 [0,n-1] 的距离为 1,而不是 n-1 的距离。 (然后,我将为目标单元格阈值距离内的所有点制作一个蒙版,但这不是问题的核心)。
我能想到的唯一方法是重复计算 9 次,域索引在 x、y 和 x&y 方向上添加/减去 n,然后堆叠结果并在 9 个切片中找到最小值。为了说明需要 9 次重复,我将一个简单的示意图放在一起,其中只有 1 个 J 点,用圆圈标记,并显示了一个示例,在这种情况下,三角形标记的单元格在域中的最近邻居反射(reflect)为左上角。
enter image description here
这是我使用 cdist 为此开发的代码:

import numpy as np
from scipy import spatial
    
n=5 # size of 2D box (n X n points)
np.random.seed(1) # to make reproducible
a=np.random.uniform(size=(n,n)) 
i=np.argwhere(a>-1)  # all points, for each loc we want distance to nearest J 
j=np.argwhere(a>0.85) # set of J locations to find distance to.

# this will be used in the KDtree soln 
global maxdist
maxdist=2.0

def dist_v1(i,j):
    dist=[]
    # 3x3 search required for periodic boundaries.
    for xoff in [-n,0,n]:
        for yoff in [-n,0,n]:
            jo=j.copy()
            jo[:,0]-=xoff
            jo[:,1]-=yoff
            dist.append(np.amin(spatial.distance.cdist(i,jo,metric='euclidean'),1)) 
    dist=np.amin(np.stack(dist),0).reshape([n,n])
    return(dist)
这有效,并产生例如:
print(dist_v1(i,j))


[[1.41421356 1.         1.41421356 1.41421356 1.        ]
 [2.23606798 2.         1.41421356 1.         1.41421356]
 [2.         2.         1.         0.         1.        ]
 [1.41421356 1.         1.41421356 1.         1.        ]
 [1.         0.         1.         1.         0.        ]]
零显然标记了 J 点,距离是正确的(这个编辑纠正了我之前不正确的尝试)。
请注意,如果您更改最后两行以堆叠原始距离,然后仅使用一个最小值,如下所示:
def dist_v2(i,j):
    dist=[]
    # 3x3 search required for periodic boundaries.
    for xoff in [-n,0,n]:
        for yoff in [-n,0,n]:
            jo=j.copy()
            jo[:,0]-=xoff
            jo[:,1]-=yoff
            dist.append(spatial.distance.cdist(i,jo,metric='euclidean')) 
    dist=np.amin(np.dstack(dist),(1,2)).reshape([n,n])
    return(dist)
对于小 n (<10),它更快,但对于较大的阵列 (n>10) 速度要慢得多
...但无论哪种方式,对于我的大型数组(N = 500 和 J 点数约为 70),它是 ,此搜索占用了大约 99% 的计算时间,(并且使用它也有点难看循环) - 有更好/更快的方法吗?
我想到的其他选择是:
  • scipy.spatial.KDTree.query_ball_point

  • 通过进一步搜索,我发现有一个功能
    scipy.spatial.KDTree.query_ball_point 直接计算我的 J 点半径内的坐标,但它似乎没有任何使用周期性边界的设施,所以我认为仍然需要以某种方式使用 3x3 循环,堆叠然后使用 amin 作为我在上面做,所以我不确定这是否会更快。
    我使用这个函数编写了一个解决方案,而不用担心周期性边界条件(即这不能回答我的问题)
    def dist_v3(n,j):
        x, y = np.mgrid[0:n, 0:n]
        points = np.c_[x.ravel(), y.ravel()]
        tree=spatial.KDTree(points)
        mask=np.zeros([n,n])
        for results in tree.query_ball_point((j), maxdist):
            mask[points[results][:,0],points[results][:,1]]=1
        return(mask)
    
    也许我没有以最有效的方式使用它,但是即使没有周期性边界,这已经和我基于 cdist 的解决方案一样慢。在两个 cdist 解决方案中包括掩码函数,即在这些函数中用 return(dist) 替换 return(np.where(dist<=maxdist,1,0)),然后使用 timeit,我得到以下 n=100 的时序:
    from timeit import timeit
    
    print("cdist v1:",timeit(lambda: dist_v1(i,j), number=3)*100)
    print("cdist v2:",timeit(lambda: dist_v2(i,j), number=3)*100)
    print("KDtree:", timeit(lambda: dist_v3(n,j), number=3)*100)
    
    cdist v1: 181.80927299981704
    cdist v2: 554.8205785999016
    KDtree: 605.119637199823
    
  • 为设置距离 [0,0] 内的点创建一个相对坐标数组,然后手动循环 J 点,使用此相对点列表设置掩码 - 这具有“相对距离”计算的优点只执行一次(我的 J 点每个时间步都会改变),但我怀疑循环会很慢。
  • 为 2D 域中的每个点预先计算一组掩码,因此在模型集成的每个时间步长中,我只需选择 J 点的掩码并应用。这将使用大量内存(与 n^4 成比例)并且可能仍然很慢,因为您需要循环 J 个点以组合掩码。
  • 最佳答案

    [编辑] - 我发现代码跟踪工作完成点的方式有误,用 mask_kernel 修复了它。较新代码的纯 python 版本慢约 1.5 倍,但 numba 版本稍快(由于一些其他优化)。
    [当前最佳:~100x 到 120x 原始速度]
    首先,感谢您提交此问题,我在优化它时获得了很多乐趣!
    我目前的最佳解决方案依赖于网格是规则的并且“源”点(我们需要计算距离的点)大致均匀分布的假设。
    这里的想法是,所有的距离都将是 1、sqrt(2)sqrt(3),...所以我们可以预先进行数值计算。然后我们简单地将这些值放在一个矩阵中,并在每个源点周围复制该矩阵(并确保保留在每个点找到的最小值)。这涵盖了绝大多数点(> 99%)。然后我们对剩下的 1% 应用另一种更“经典”的方法。
    这是代码:

    import numpy as np
    
    def sq_distance(x1, y1, x2, y2, n): 
        # computes the pairwise squared distance between 2 sets of points (with periodicity)
        # x1, y1 : coordinates of the first set of points (source)
        # x2, y2 : same
        dx = np.abs((np.subtract.outer(x1, x2) + n//2)%(n) - n//2)
        dy = np.abs((np.subtract.outer(y1, y2) + n//2)%(n) - n//2)
        d  = (dx*dx + dy*dy)
        return d
    
    def apply_kernel(sources, sqdist, kern_size, n, mask):
        ker_i, ker_j = np.meshgrid(np.arange(-kern_size, kern_size+1), np.arange(-kern_size, kern_size+1), indexing="ij")
        kernel = np.add.outer(np.arange(-kern_size, kern_size+1)**2, np.arange(-kern_size, kern_size+1)**2)
        mask_kernel = kernel > kern_size**2
    
        for pi, pj in sources:
            ind_i = (pi+ker_i)%n
            ind_j = (pj+ker_j)%n
            sqdist[ind_i,ind_j] = np.minimum(kernel, sqdist[ind_i,ind_j])
            mask[ind_i,ind_j] *= mask_kernel
    
    def dist_vf(sources, n, kernel_size):
        sources = np.asfortranarray(sources) #for memory contiguity
    
        kernel_size = min(kernel_size, n//2)
        kernel_size = max(kernel_size, 1)
    
        sqdist = np.full((n,n), 10*n**2, dtype=np.int32) #preallocate with a huge distance (>max**2)
        mask   = np.ones((n,n), dtype=bool)              #which points have not been reached?
    
        #main code
        apply_kernel(sources, sqdist, kernel_size, n, mask) 
    
        #remaining points
        rem_i, rem_j = np.nonzero(mask)
        if len(rem_i) > 0:
            sq_d = sq_distance(sources[:,0], sources[:,1], rem_i, rem_j, n).min(axis=0)
            sqdist[rem_i, rem_j] = sq_d
    
        #eff = 1-rem_i.size/n**2
        #print("covered by kernel :", 100*eff, "%")
        #print("overlap :", sources.shape[0]*(1+2*kernel_size)**2/n**2)
        #print()
    
        return np.sqrt(sqdist)
    
    
    测试这个版本
    n=500  # size of 2D box (n X n points)
    np.random.seed(1) # to make reproducible
    a=np.random.uniform(size=(n,n)) 
    all_points=np.argwhere(a>-1)  # all points, for each loc we want distance to nearest J 
    source_points=np.argwhere(a>1-70/n**2) # set of J locations to find distance to.
    
    #
    # code for dist_v1 and dist_vf
    #
    
    overlap=5.2
    kernel_size = int(np.sqrt(overlap*n**2/source_points.shape[0])/2)
    
    print("cdist v1      :", timeit(lambda: dist_v1(all_points,source_points), number=1)*1000, "ms")
    print("kernel version:", timeit(lambda: dist_vf(source_points, n, kernel_size), number=10)*100, "ms")
    
    
    cdist v1      : 1148.6694 ms
    kernel version: 69.21876999999998 ms
    
    这已经是大约 17 倍的加速!我还实现了 sq_distanceapply_kernel 的 numba 版本:[这是新的正确版本]
    @njit(cache=True)
    def sq_distance(x1, y1, x2, y2, n):
        m1 = x1.size
        m2 = x2.size
        n2 = n//2
        d = np.empty((m1,m2), dtype=np.int32)
        for i in range(m1):
            for j in range(m2):
                dx = np.abs(x1[i] - x2[j] + n2)%n - n2
                dy = np.abs(y1[i] - y2[j] + n2)%n - n2
                d[i,j]  = (dx*dx + dy*dy)
        return d
    
    @njit(cache=True)
    def apply_kernel(sources, sqdist, kern_size, n, mask):
        # creating the kernel
        kernel = np.empty((2*kern_size+1, 2*kern_size+1))
        vals = np.arange(-kern_size, kern_size+1)**2
        for i in range(2*kern_size+1):
            for j in range(2*kern_size+1):
                kernel[i,j] = vals[i] + vals[j]
        mask_kernel = kernel > kern_size**2
    
        I = sources[:,0]
        J = sources[:,1]
    
        # applying the kernel for each point
        for l in range(sources.shape[0]):
            pi = I[l]
            pj = J[l]
    
            if pj - kern_size >= 0 and pj + kern_size<n: #if we are in the middle, no need to do the modulo for j
                for i in range(2*kern_size+1):
                    ind_i = np.mod((pi+i-kern_size), n)
                    for j in range(2*kern_size+1):
                        ind_j = (pj+j-kern_size)
                        sqdist[ind_i,ind_j] = np.minimum(kernel[i,j], sqdist[ind_i,ind_j])
                        mask[ind_i,ind_j] = mask_kernel[i,j] and mask[ind_i,ind_j]
    
            else:
                for i in range(2*kern_size+1):
                    ind_i = np.mod((pi+i-kern_size), n)
                    for j in range(2*kern_size+1):
                        ind_j = np.mod((pj+j-kern_size), n)
                        sqdist[ind_i,ind_j] = np.minimum(kernel[i,j], sqdist[ind_i,ind_j])
                        mask[ind_i,ind_j] = mask_kernel[i,j] and mask[ind_i,ind_j]
        return
    
    
    并测试
    overlap=5.2
    kernel_size = int(np.sqrt(overlap*n**2/source_points.shape[0])/2)
    
    print("cdist v1                :", timeit(lambda: dist_v1(all_points,source_points), number=1)*1000, "ms")
    print("kernel numba (first run):", timeit(lambda: dist_vf(source_points, n, kernel_size), number=1)*1000, "ms") #first run = cimpilation = long
    print("kernel numba            :", timeit(lambda: dist_vf(source_points, n, kernel_size), number=10)*100, "ms")
    
    这给出了以下结果
    cdist v1                : 1163.0742 ms
    kernel numba (first run): 2060.0802 ms
    kernel numba            : 8.80377000000001 ms
    
    由于 JIT 编译,第一次运行很慢,但除此之外,它的改进是 120 倍!
    通过调整 kernel_size 参数(或 overlap ),可以从这个算法中获得更多的好处。目前选择的kernel_size只对少量源点有效。例如,这个选择在使用 source_points=np.argwhere(a>0.85) (13s) 时惨遭失败,而手动设置 kernel_size=5 在 22ms 内给出答案。
    我希望我的帖子不要(不必要地)太复杂,我真的不知道如何更好地组织它。
    [编辑 2] :
    我对代码的非 numba 部分给予了更多关注,并设法获得了非常显着的加速,非常接近 numba 可以实现的目标:这是函数 apply_kernel 的新版本:
    def apply_kernel(sources, sqdist, kern_size, n, mask):
        ker_i = np.arange(-kern_size, kern_size+1).reshape((2*kern_size+1,1))
        ker_j = np.arange(-kern_size, kern_size+1).reshape((1,2*kern_size+1))
    
        kernel = np.add.outer(np.arange(-kern_size, kern_size+1)**2, np.arange(-kern_size, kern_size+1)**2)
        mask_kernel = kernel > kern_size**2
    
        for pi, pj in sources:
    
            imin = pi-kern_size
            jmin = pj-kern_size
            imax = pi+kern_size+1
            jmax = pj+kern_size+1
            if imax < n and jmax < n and imin >=0 and jmin >=0: # we are inside
                sqdist[imin:imax,jmin:jmax] = np.minimum(kernel, sqdist[imin:imax,jmin:jmax])
                mask[imin:imax,jmin:jmax] *= mask_kernel
    
            elif imax < n and imin >=0:
                ind_j = (pj+ker_j.ravel())%n
                sqdist[imin:imax,ind_j] = np.minimum(kernel, sqdist[imin:imax,ind_j])
                mask[imin:imax,ind_j] *= mask_kernel
    
            elif jmax < n and jmin >=0:
                ind_i = (pi+ker_i.ravel())%n
                sqdist[ind_i,jmin:jmax] = np.minimum(kernel, sqdist[ind_i,jmin:jmax])
                mask[ind_i,jmin:jmax] *= mask_kernel
    
            else :
                ind_i = (pi+ker_i)%n
                ind_j = (pj+ker_j)%n
                sqdist[ind_i,ind_j] = np.minimum(kernel, sqdist[ind_i,ind_j])
                mask[ind_i,ind_j] *= mask_kernel
    
    主要的优化是
  • 切片索引(而不是密集数组)
  • 使用稀疏索引(我之前怎么没想到)

  • 测试
    overlap=5.4
    kernel_size = int(np.sqrt(overlap*n**2/source_points.shape[0])/2)
    
    print("cdist v1  :", timeit(lambda: dist_v1(all_points,source_points), number=1)*1000, "ms")
    print("kernel v2 :", timeit(lambda: dist_vf(source_points, n, kernel_size), number=10)*100, "ms")
    
    cdist v1  : 1209.8163000000002 ms
    kernel v2 : 11.319049999999997 ms
    
    这比 cdist 提高了 100 倍,比之前仅使用 numpy 的版本提高了约 5.5 倍,仅比我使用 numba 所能达到的速度慢约 25%。

    关于python - 使用循环(周期性)边界条件计算 numpy 数组中点之间距离的更快代码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52649815/

    相关文章:

    python - 写入新的 Python 缓冲区接口(interface)

    python - 在 Pandas 中按行而不是按列舍入小数

    Python:如何解包循环数据以消除不连续性?

    python - 在python中更改数组的数据类型

    python - 有效删除不同行之间包含重复元素的行

    python - 是否有不取消引用符号链接(symbolic link)的 os.getcwd() 版本?

    python - 我需要一个使用 python 面板重叠 curses 窗口的例子

    mysql 调整变量 - 当前和默认值

    python - numpy python 中的乘法()

    javascript - 优化慢速搜索算法 - javascript、JSON 和 localstorage