python - Numpy/Scipy "along a path"中的快速线性插值

标签 python numpy scipy interpolation

假设我有来自山上 3 个(已知)高度的气象站的数据。具体来说,每个站点每分钟都会记录其所在位置的温度测量值。我有两种想要执行的插值。而且我希望能够快速执行每个操作。

所以让我们设置一些数据:

import numpy as np
from scipy.interpolate import interp1d
import pandas as pd
import seaborn as sns

np.random.seed(0)
N, sigma = 1000., 5

basetemps = 70 + (np.random.randn(N) * sigma)
midtemps = 50 + (np.random.randn(N) * sigma)
toptemps = 40 + (np.random.randn(N) * sigma)
alltemps = np.array([basetemps, midtemps, toptemps]).T # note transpose!
trend = np.sin(4 / N * np.arange(N)) * 30
trend = trend[:, np.newaxis]

altitudes = np.array([500, 1500, 4000]).astype(float)

finaltemps = pd.DataFrame(alltemps + trend, columns=altitudes)
finaltemps.index.names, finaltemps.columns.names = ['Time'], ['Altitude']
finaltemps.plot()

太好了,所以我们的温度是这样的: Raw Temperature Data

所有时间都插值到相同的高度:

我认为这个很简单。假设我想每次获得海拔 1,000 的温度。我可以使用内置的 scipy 插值方法:

interping_function = interp1d(altitudes, finaltemps.values)
interped_to_1000 = interping_function(1000)

fig, ax = plt.subplots(1, 1, figsize=(8, 5))
finaltemps.plot(ax=ax, alpha=0.15)
ax.plot(interped_to_1000, label='Interped')
ax.legend(loc='best', title=finaltemps.columns.name)

Temperature with static interp

这很好用。让我们看看速度:

%%timeit
res = interp1d(altitudes, finaltemps.values)(1000)
#-> 1000 loops, best of 3: 207 µs per loop

“沿路径”插值:

所以现在我有第二个相关的问题。假设我知道远足派对的高度是时间的函数,并且我想通过随时间线性插值我的数据来计算他们(移动)位置的温度。 特别是,我知道远足聚会地点的时间与我知道气象站温度的时间相同我也可以做到这一点努力:

location = np.linspace(altitudes[0], altitudes[-1], N)
interped_along_path = np.array([interp1d(altitudes, finaltemps.values[i, :])(loc) 
                                             for i, loc in enumerate(location)])

fig, ax = plt.subplots(1, 1, figsize=(8, 5))
finaltemps.plot(ax=ax, alpha=0.15)
ax.plot(interped_along_path, label='Interped')
ax.legend(loc='best', title=finaltemps.columns.name)

Temperature with moving interp

所以这非常有效,但重要的是要注意上面的关键行是使用列表推导来隐藏大量工作。在前面的例子中,scipy 正在为我们创建一个插值函数,并在大量数据上对其进行一次评估。在这种情况下,scipy 实际上是在构造 N 个单独的插值函数,并在少量数据上对每个插值函数进行一次评估。这感觉本质上是低效的。这里(在列表理解中)潜伏着一个 for 循环,而且,这感觉很松散。

毫不奇怪,这比前一种情况要慢得多:

%%timeit
res = np.array([interp1d(altitudes, finaltemps.values[i, :])(loc) 
                            for i, loc in enumerate(location)])
#-> 10 loops, best of 3: 145 ms per loop

所以第二个例子的运行速度比第一个慢 1,000。 IE。与繁重的工作是“制作线性插值函数”步骤的想法一致……在第二个示例中发生了 1,000 次,但在第一个示例中仅发生了一次。

那么,问题是:有没有更好的方法来解决第二个问题?例如,有没有一种很好的方法来设置二维插值(也许可以处理这种情况哪里知道远足聚会地点的时间不是温度采样的时间)?或者有没有一种特别巧妙的方式来处理时间紧迫的事情?还是其他?

最佳答案

两个值 y1y2 在位置 x1x2 之间关于点的线性插值xi 很简单:

yi = y1 + (y2-y1) * (xi-x1) / (x2-x1)

通过一些向量化的 Numpy 表达式,我们可以从数据集中选择相关点并应用上述函数:

I = np.searchsorted(altitudes, location)

x1 = altitudes[I-1]
x2 = altitudes[I]

time = np.arange(len(alltemps))
y1 = alltemps[time,I-1]
y2 = alltemps[time,I]

xI = location

yI = y1 + (y2-y1) * (xI-x1) / (x2-x1)

问题是有些点位于已知范围的边界(甚至之外),应该考虑到这一点:

I = np.searchsorted(altitudes, location)
same = (location == altitudes.take(I, mode='clip'))
out_of_range = ~same & ((I == 0) | (I == altitudes.size))
I[out_of_range] = 1  # Prevent index-errors

x1 = altitudes[I-1]
x2 = altitudes[I]

time = np.arange(len(alltemps))
y1 = alltemps[time,I-1]
y2 = alltemps[time,I]

xI = location

yI = y1 + (y2-y1) * (xI-x1) / (x2-x1)
yI[out_of_range] = np.nan

幸运的是,Scipy 已经提供了 ND 插值,这也很容易处理不匹配时间,例如:

from scipy.interpolate import interpn

time = np.arange(len(alltemps))

M = 150
hiketime = np.linspace(time[0], time[-1], M)
location = np.linspace(altitudes[0], altitudes[-1], M)
xI = np.column_stack((hiketime, location))

yI = interpn((time, altitudes), alltemps, xI)

这是一个基准代码(实际上没有任何 pandas,我确实包含了另一个答案中的解决方案):

import numpy as np
from scipy.interpolate import interp1d, interpn

def original():
    return np.array([interp1d(altitudes, alltemps[i, :])(loc)
                                for i, loc in enumerate(location)])

def OP_self_answer():
    return np.diagonal(interp1d(altitudes, alltemps)(location))

def interp_checked():
    I = np.searchsorted(altitudes, location)
    same = (location == altitudes.take(I, mode='clip'))
    out_of_range = ~same & ((I == 0) | (I == altitudes.size))
    I[out_of_range] = 1  # Prevent index-errors

    x1 = altitudes[I-1]
    x2 = altitudes[I]

    time = np.arange(len(alltemps))
    y1 = alltemps[time,I-1]
    y2 = alltemps[time,I]

    xI = location

    yI = y1 + (y2-y1) * (xI-x1) / (x2-x1)
    yI[out_of_range] = np.nan

    return yI

def scipy_interpn():
    time = np.arange(len(alltemps))
    xI = np.column_stack((time, location))
    yI = interpn((time, altitudes), alltemps, xI)
    return yI

N, sigma = 1000., 5

basetemps = 70 + (np.random.randn(N) * sigma)
midtemps = 50 + (np.random.randn(N) * sigma)
toptemps = 40 + (np.random.randn(N) * sigma)
trend = np.sin(4 / N * np.arange(N)) * 30
trend = trend[:, np.newaxis]
alltemps = np.array([basetemps, midtemps, toptemps]).T + trend
altitudes = np.array([500, 1500, 4000], dtype=float)
location = np.linspace(altitudes[0], altitudes[-1], N)

funcs = [original, interp_checked, scipy_interpn]
for func in funcs:
    print(func.func_name)
    %timeit func()

from itertools import combinations
outs = [func() for func in funcs]
print('Output allclose:')
print([np.allclose(out1, out2) for out1, out2 in combinations(outs, 2)])

在我的系统上出现以下结果:

original
10 loops, best of 3: 184 ms per loop
OP_self_answer
10 loops, best of 3: 89.3 ms per loop
interp_checked
1000 loops, best of 3: 224 µs per loop
scipy_interpn
1000 loops, best of 3: 1.36 ms per loop
Output allclose:
[True, True, True, True, True, True]

Scipy 的 interpn 与最快的方法相比在速度方面有所下降,但由于它的通用性和易用性,它绝对是要走的路。

关于python - Numpy/Scipy "along a path"中的快速线性插值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33069366/

相关文章:

python - 将一个或多个初始位置提供给 scipy.optimize.differential_evolution

python - 使用序列创建的 numpy 数组

python - Python 中的二阶导数 - scipy/numpy/pandas

python - kafka-python消费者没有收到消息

python - Selenium : how to get element in shadow root of html page code?

python - 使用列表理解对多个数组执行 cumsum

python - 如何用另一列中的每个值减去一个列值( Pandas )

python - Odoo v10 @onchange(stage_id) 不适用于 <field name ="stage_id"widget ="statusbar"clickable ="True"/>

python - 将复杂的 str 更改为 pandas Dataframe 中的 float

python - 如何创建一个 MxN 的 scipy 矩阵,其中 M 和 N 由用户决定,并且元素依次输入?