我正在使用 SciPy 的 2D 插值函数 (scipy.interpolate.interp2d
)。我有存储为 Z = [x, y, p1, p2]
的字段。我想使用二维矩阵 [x, y, :, :]
在每个 x
和 y
点进行插值。 Z
的形状是 (40, 40, 6, 6)
。
我能做到这一点的唯一方法是使用 for 循环并遍历每个点。下面是我正在使用的代码。 a
和 b
是形状为 (6,)
和 (6,)
的数组。
for i in range(40):
for j in range(40):
ff = interpolate.interp2d(a, b, z[i,j,:,:])
zi[i,j] = ff(atest,btest)
有没有一种方法可以不使用 for 循环来做到这一点?
最佳答案
由于您实际上是在相同的 2d 点上插入一堆不同的函数,因此没有内置的方法来执行此操作是合理的。尽管可以省去一些计算,因为对于每个函数,您都有相同的二维网格作为插值的基础,因此期望也不是不可能的。在任何情况下,我都找不到不循环执行此操作的内置方法。
幸运的是,我们可以通过一些努力来实现 bilinear interpolator (就像循环的一样)使用矢量化一次处理所有点。这是一个实现这个的类:
import numpy as np
class VectorizedBilinearInterpolator:
def __init__(self, x0, y0, z0):
"""Create a bilinear interpolator for gridded input data
Inputs:
x0: shape (ngridx,)
y0: shape (ngridy,)
z0: shape batch_shape + (ngridy, ngridx) (viewed as batches)
"""
if z0.shape[-2:] != y0.shape + x0.shape:
raise ValueError("The last two dimensions of z0 must match that of y0 and x0, respectively!")
ind_x = np.argsort(x0)
self.x0 = x0[ind_x]
ind_y = np.argsort(y0)
self.y0 = y0[ind_y]
self.batch_shape = z0.shape[:-2]
indexer = np.ix_(ind_y, ind_x)
self.z0 = z0[..., indexer[0], indexer[1]].reshape(-1, y0.size, x0.size) # shape (nbatch, ngridy, ngridx)
# compute auxiliary coefficients for interpolation
# we have ngridx-1 boxes along x and ngridy-1 boxes along y
# for each box we need 4 coefficients for bilinear interpolation
# see e.g. https://en.wikipedia.org/wiki/Bilinear_interpolation#Alternative_algorithm
# construct a batch of matrices with size (ngridy-1, ngridx-1, 4, 4) to invert
x1 = self.x0[:-1]
x2 = self.x0[1:]
y1 = self.y0[:-1, None]
y2 = self.y0[1:, None]
x1,x2,y1,y2,one = np.broadcast_arrays(x1, x2, y1, y2, 1) # all shaped (ngridy-1, ngridx-1)
M = np.array([[one, x1, y1, x1*y1], [one, x1, y2, x1*y2],
[one, x2, y1, x2*y1], [one, x2, y2, x2*y2]]).transpose(2, 3, 0, 1) # shape (ngridy-1, ngridx-1, 4, 4)
zvec = np.array([self.z0[:, :-1, :-1], self.z0[:, 1:, :-1], self.z0[:, :-1, 1:], self.z0[:, 1:, 1:]]) # shape (4, nbatch, ngridy-1, ngridx-1)
self.coeffs = np.einsum('yxab,bnyx -> nyxa', np.linalg.inv(M), zvec) # shape (nbatch, ngridy-1, ngridx-1, 4) for "a0,a1,a2,a3" coefficients
# for a given box (i,j) the interpolated value is given by self.coeffs[:,i,j,:] @ [1, x, y, x*y]
def __call__(self, x, y):
"""Evaluate the interpolator at the given coordinates
Inputs:
x: shape (noutx,)
y: shape (nouty,)
Output:
z: shape batch_shape + (nouty, noutx) (see __init__)
"""
# identify outliers (and mask at the end)
out_x = (x < self.x0[0]) | (self.x0[-1] < x)
out_y = (y < self.y0[0]) | (self.y0[-1] < y)
# clip outliers, mask later
xbox = (self.x0.searchsorted(x) - 1).clip(0, self.x0.size - 2) # shape (noutx,) indices
ybox = (self.y0.searchsorted(y) - 1).clip(0, self.y0.size - 2) # shape (nouty,) indices
indexer = np.ix_(ybox, xbox)
xgrid,ygrid = np.meshgrid(x, y) # both shape (nouty, noutx)
coeffs_now = self.coeffs[:, indexer[0], indexer[1], :] # shape (nbatch, nouty, noutx, 4)
poly = np.array([np.ones_like(xgrid), xgrid, ygrid, xgrid*ygrid]) # shape (4, nouty, noutx)
values = np.einsum('nyxa,ayx -> nyx', coeffs_now, poly) # shape (nbatch, nouty, noutx)
# reshape final result and mask outliers
z = values.reshape(self.batch_shape + xgrid.shape)
z[..., out_y, :] = np.nan
z[..., :, out_x] = np.nan
return z
这里有一些测试:
from scipy import interpolate # only needed for the loopy comparison
# input points
x0 = np.linspace(0, 1, 6)
y0 = np.linspace(0, 1, 7)
z0 = np.random.rand(40, 41, y0.size, x0.size)
# output points
xtest = np.linspace(0, 1, 20)
ytest = np.linspace(0, 1, 21)
# the original loopy version
def loopy():
zi = np.empty(z0.shape[:2] + (ytest.size, xtest.size))
for i in range(z0.shape[0]):
for j in range(z0.shape[1]):
ff = interpolate.interp2d(x0, y0, z0[i,j,:,:])
zi[i,j] = ff(xtest, ytest)
return zi
# the new, vectorized version
def vector():
f = VectorizedBilinearInterpolator(x0, y0, z0)
zi = f(xtest, ytest)
return zi
首先,我们应该确保两个插值器做同样的事情:
>>> np.allclose(loopy(), vector())
True
其次,由于懒惰,我们可以使用 ipython 的 %timeit
魔法为这个虚拟示例计时:
>>> %timeit loopy()
... %timeit vector()
216 ms ± 2.57 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
25.9 ms ± 1.08 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
所以至少在这个小例子中,向量化版本快了 8 倍。实际加速在很大程度上取决于实际示例的大小。尽管我已尝试正确实现离群值(即将离群值设置为 np.nan
而不是无意义的外推),但我并未彻底检查所有边缘情况,因此请确保针对真实数据测试实现(通过几个典型案例的循环与新插值器的比较)。
如果您的问题包含真正的问题,即形状为 (40,40,6,6)
的最终数组,那么这些工作都不值得您花时间(考虑到时间非常短运行时),你应该只使用循环。
关于python - 矢量化二维插值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58386891/