python - 矢量化二维插值

标签 python numpy scipy vectorization interpolation

我正在使用 SciPy 的 2D 插值函数 (scipy.interpolate.interp2d)。我有存储为 Z = [x, y, p1, p2] 的字段。我想使用二维矩阵 [x, y, :, :] 在每个 xy 点进行插值。 Z 的形状是 (40, 40, 6, 6)

我能做到这一点的唯一方法是使用 for 循环并遍历每个点。下面是我正在使用的代码。 ab 是形状为 (6,)(6,) 的数组。

for i in range(40):
    for j in range(40):
        ff = interpolate.interp2d(a, b, z[i,j,:,:]) 
        zi[i,j] = ff(atest,btest)

有没有一种方法可以不使用 for 循环来做到这一点?

最佳答案

由于您实际上是在相同的 2d 点上插入一堆不同的函数,因此没有内置的方法来执行此操作是合理的。尽管可以省去一些计算,因为对于每个函数,您都有相同的二维网格作为插值的基础,因此期望也不是不可能的。在任何情况下,我都找不到不循环执行此操作的内置方法。

幸运的是,我们可以通过一些努力来实现 bilinear interpolator (就像循环的一样)使用矢量化一次处理所有点。这是一个实现这个的类:

import numpy as np 

class VectorizedBilinearInterpolator: 
    def __init__(self, x0, y0, z0): 
        """Create a bilinear interpolator for gridded input data 

        Inputs: 
            x0: shape (ngridx,)
            y0: shape (ngridy,)
            z0: shape batch_shape + (ngridy, ngridx) (viewed as batches) 
        """

        if z0.shape[-2:] != y0.shape + x0.shape: 
            raise ValueError("The last two dimensions of z0 must match that of y0 and x0, respectively!") 

        ind_x = np.argsort(x0) 
        self.x0 = x0[ind_x] 

        ind_y = np.argsort(y0) 
        self.y0 = y0[ind_y] 

        self.batch_shape = z0.shape[:-2] 
        indexer = np.ix_(ind_y, ind_x) 
        self.z0 = z0[..., indexer[0], indexer[1]].reshape(-1, y0.size, x0.size)  # shape (nbatch, ngridy, ngridx)

        # compute auxiliary coefficients for interpolation 
        # we have ngridx-1 boxes along x and ngridy-1 boxes along y
        # for each box we need 4 coefficients for bilinear interpolation
        # see e.g. https://en.wikipedia.org/wiki/Bilinear_interpolation#Alternative_algorithm
        # construct a batch of matrices with size (ngridy-1, ngridx-1, 4, 4) to invert
        x1 = self.x0[:-1]
        x2 = self.x0[1:]
        y1 = self.y0[:-1, None]
        y2 = self.y0[1:, None]
        x1,x2,y1,y2,one = np.broadcast_arrays(x1, x2, y1, y2, 1)  # all shaped (ngridy-1, ngridx-1)

        M = np.array([[one, x1, y1, x1*y1], [one, x1, y2, x1*y2],
                      [one, x2, y1, x2*y1], [one, x2, y2, x2*y2]]).transpose(2, 3, 0, 1)  # shape (ngridy-1, ngridx-1, 4, 4)
        zvec = np.array([self.z0[:, :-1, :-1], self.z0[:, 1:, :-1], self.z0[:, :-1, 1:], self.z0[:, 1:, 1:]])  # shape (4, nbatch, ngridy-1, ngridx-1)

        self.coeffs = np.einsum('yxab,bnyx -> nyxa', np.linalg.inv(M), zvec)  # shape (nbatch, ngridy-1, ngridx-1, 4) for "a0,a1,a2,a3" coefficients
        # for a given box (i,j) the interpolated value is given by self.coeffs[:,i,j,:] @ [1, x, y, x*y]

    def __call__(self, x, y): 
        """Evaluate the interpolator at the given coordinates 

        Inputs: 
            x: shape (noutx,) 
            y: shape (nouty,) 

        Output: 
            z: shape batch_shape + (nouty, noutx) (see __init__) 
        """

        # identify outliers (and mask at the end)
        out_x = (x < self.x0[0]) | (self.x0[-1] < x)
        out_y = (y < self.y0[0]) | (self.y0[-1] < y)

        # clip outliers, mask later
        xbox = (self.x0.searchsorted(x) - 1).clip(0, self.x0.size - 2)  # shape (noutx,) indices
        ybox = (self.y0.searchsorted(y) - 1).clip(0, self.y0.size - 2)  # shape (nouty,) indices
        indexer = np.ix_(ybox, xbox)

        xgrid,ygrid = np.meshgrid(x, y)  # both shape (nouty, noutx)

        coeffs_now = self.coeffs[:, indexer[0], indexer[1], :]  # shape (nbatch, nouty, noutx, 4)
        poly = np.array([np.ones_like(xgrid), xgrid, ygrid, xgrid*ygrid])  # shape (4, nouty, noutx)
        values =  np.einsum('nyxa,ayx -> nyx', coeffs_now, poly)  # shape (nbatch, nouty, noutx)

        # reshape final result and mask outliers
        z = values.reshape(self.batch_shape + xgrid.shape)
        z[..., out_y, :] = np.nan
        z[..., :, out_x] = np.nan

        return z

这里有一些测试:

from scipy import interpolate  # only needed for the loopy comparison

# input points
x0 = np.linspace(0, 1, 6)
y0 = np.linspace(0, 1, 7)
z0 = np.random.rand(40, 41, y0.size, x0.size)

# output points
xtest = np.linspace(0, 1, 20)
ytest = np.linspace(0, 1, 21)

# the original loopy version
def loopy():
    zi = np.empty(z0.shape[:2] + (ytest.size, xtest.size))
    for i in range(z0.shape[0]):
        for j in range(z0.shape[1]):
            ff = interpolate.interp2d(x0, y0, z0[i,j,:,:])
            zi[i,j] = ff(xtest, ytest)
    return zi

# the new, vectorized version
def vector():
    f = VectorizedBilinearInterpolator(x0, y0, z0)
    zi = f(xtest, ytest)
    return zi

首先,我们应该确保两个插值器做同样的事情:

>>> np.allclose(loopy(), vector())
True

其次,由于懒惰,我们可以使用 ipython 的 %timeit 魔法为这个虚拟示例计时:

>>> %timeit loopy()
... %timeit vector()
216 ms ± 2.57 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
25.9 ms ± 1.08 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

所以至少在这个小例子中,向量化版本快了 8 倍。实际加速在很大程度上取决于实际示例的大小。尽管我已尝试正确实现离群值(即将离群值设置为 np.nan 而不是无意义的外推),但我并未彻底检查所有边缘情况,因此请确保针对真实数据测试实现(通过几个典型案例的循环与新插值器的比较)。


如果您的问题包含真正的问题,即形状为 (40,40,6,6) 的最终数组,那么这些工作都不值得您花时间(考虑到时间非常短运行时),你应该只使用循环。

关于python - 矢量化二维插值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58386891/

相关文章:

python - POSTGRES - 具有多个连接的高效 SELECT 或 INSERT

python-3.x - Python 中的缺失值插补

python - Numpy:如何查询多个多维数组?

python - 如何从 Py_CompileString 打印出错误(包括语法)?

python - scipy,对数正态分布 - 参数

python - 如何在缺失值 NaN 的条件下将两个数据帧(相同索引)中的值平均到一个 df 中?

python - 了解 scipy.optimize.basinhopping 的输出

Python:获取分割图中每个簇的边界框坐标(2D numpy 数组)

python - 如何绘制堆叠和归一化直方图?

python - Scipy 稀疏行/列点积