numpy - pykalman:(默认)处理缺失值

标签 numpy missing-data smoothing kalman-filter pykalman

我正在使用 pykalman 模块中的 KalmanFilter,我想知道它如何处理缺失的观察结果。根据文档:

In real world systems, it is common to have sensors occasionally fail. The Kalman Filter, Kalman Smoother, and EM algorithm are all equipped to handle this scenario. To make use of it, one only need apply a NumPy mask to the measurement at the missing time step:

from numpy import ma X = ma.array([1,2,3]) X1 = ma.masked # hide measurement at time step 1 kf.em(X).smooth(X)

我们可以平滑输入时间序列。由于这是一个“附加”功能,我认为它不是自动完成的;那么在变量中包含 NaN 时的默认方法是什么?

这里解释了可能发生的理论方法;这也是 pykalman 所做的吗(在我看来这真的很棒):

Cross Validated - How to handle incomplete data in Kalman Filter?

最佳答案

我们看一下源码:

filter_update 函数中,pykalman 检查当前观察是否被屏蔽。

def filter_update(...)

        # Make a masked observation if necessary
        if observation is None:
            n_dim_obs = observation_covariance.shape[0]
            observation = np.ma.array(np.zeros(n_dim_obs))
            observation.mask = True
        else:
            observation = np.ma.asarray(observation) 

它不影响预测步骤。但校正步骤有两种选择。它发生在 _filter_correct 函数中。

def _filter_correct(...)

    if not np.any(np.ma.getmask(observation)):

         # the normal Kalman Filter math

    else:
        n_dim_state = predicted_state_covariance.shape[0]
        n_dim_obs = observation_matrix.shape[0]
        kalman_gain = np.zeros((n_dim_state, n_dim_obs))

        # !!!! the corrected state takes the result of the prediction !!!!

        corrected_state_mean = predicted_state_mean
        corrected_state_covariance = predicted_state_covariance

如您所见,这正是理论上的方法。

这是一个简短的示例和可供使用的工作数据。

假设您有一个 GPS 接收器,并且您想在行走时跟踪自己。接收机具有良好的精度。为简化起见,假设您只向前走。

pykalman estimation of the position using a gps receiver without masked observations

没有什么有趣的事情发生。由于 GPS 信号良好,过滤器可以很好地估计您的位置。有一段时间没有信号怎么办?

pykalman estimation using masked observations

过滤器只能根据现有状态和系统动力学知识进行预测(见矩阵Q)。随着每个预测步骤的不确定性增加。估计位置周围的 1-Sigma 范围变大。一旦再次出现新的观察,状态就会得到纠正。

这里是代码和 data :

from pykalman import KalmanFilter
import numpy as np
import matplotlib.pyplot as plt
from numpy import ma

# enable or disable missing observations
use_mask = 1

# reading data (quick and dirty)
Time=[]
X=[]

for line in open('data/dataset_01.csv'):
    f1, f2  = line.split(';')
    Time.append(float(f1))
    X.append(float(f2))

if (use_mask):
    X = ma.asarray(X)
    X[300:500] = ma.masked

# Filter Configuration

# time step
dt = Time[2] - Time[1]

# transition_matrix  
F = [[1,  dt,   0.5*dt*dt], 
     [0,   1,          dt],
     [0,   0,           1]]  

# observation_matrix   
H = [1, 0, 0]

# transition_covariance 
Q = [[   1,     0,     0], 
     [   0,  1e-4,     0],
     [   0,     0,  1e-6]] 

# observation_covariance 
R = [0.04] # max error = 0.6m

# initial_state_mean
X0 = [0,
      0,
      0]

# initial_state_covariance
P0 = [[ 10,    0,   0], 
      [  0,    1,   0],
      [  0,    0,   1]]

n_timesteps = len(Time)
n_dim_state = 3

filtered_state_means = np.zeros((n_timesteps, n_dim_state))
filtered_state_covariances = np.zeros((n_timesteps, n_dim_state, n_dim_state))

# Kalman-Filter initialization
kf = KalmanFilter(transition_matrices = F, 
                  observation_matrices = H, 
                  transition_covariance = Q, 
                  observation_covariance = R, 
                  initial_state_mean = X0, 
                  initial_state_covariance = P0)


# iterative estimation for each new measurement
for t in range(n_timesteps):
    if t == 0:
        filtered_state_means[t] = X0
        filtered_state_covariances[t] = P0
    else:
        filtered_state_means[t], filtered_state_covariances[t] = (
        kf.filter_update(
            filtered_state_means[t-1],
            filtered_state_covariances[t-1],
            observation = X[t])
        )

position_sigma = np.sqrt(filtered_state_covariances[:, 0, 0]);        

# plot of the resulted trajectory        
plt.plot(Time, filtered_state_means[:, 0], "g-", label="Filtered position", markersize=1)
plt.plot(Time, filtered_state_means[:, 0] + position_sigma, "r--", label="+ sigma", markersize=1)
plt.plot(Time, filtered_state_means[:, 0] - position_sigma, "r--", label="- sigma", markersize=1)
plt.grid()
plt.legend(loc="upper left")
plt.xlabel("Time (s)")
plt.ylabel("Position (m)")
plt.show()      

更新

如果你掩盖更长的时期(300:700),它看起来会更有趣。

filter position without gps signal

正如你所看到的,位置又回来了。这是因为转移矩阵 F 绑定(bind)了位置、速度和加速度的预测。

如果你看一下速度状态,它会解释下降的位置。

velocity estimation using pykalman and lost gps signal

在 300 秒的时间点,加速度卡住。速度以恒定斜率下降并穿过 0 值。在这个时间点之后,位置必须返回。

要绘制速度,请使用以下代码:

velocity_sigma = np.sqrt(filtered_state_covariances[:, 1, 1]);     

# plot of the estimated velocity        
plt.plot(Time, filtered_state_means[:, 1], "g-", label="Filtered velocity", markersize=1)
plt.plot(Time, filtered_state_means[:, 1] + velocity_sigma, "r--", label="+ sigma", markersize=1)
plt.plot(Time, filtered_state_means[:, 1] - velocity_sigma, "r--", label="- sigma", markersize=1)
plt.grid()
plt.legend(loc="upper left")
plt.xlabel("Time (s)")
plt.ylabel("Velocity (m/s)")
plt.show()   

关于numpy - pykalman:(默认)处理缺失值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49662567/

相关文章:

iphone - 苹果网站视差效果

ios - 着色/绘图代码。如何以较低的不透明度使其更光滑?去除刷点?

ios - 自从获取照片以来,用户界面有点卡住

python - 重新排列 numpy 数组

python - 按键用先前或后续值填充缺失值

python - 想要将彩色图像附加到列表中并使用 OpenCV 将该列表转换为灰度

julia - 参数错误: quantiles are undefined in presence of NaNs or missing values

python - groupby 内的条件前向填充

python - 从 2D numpy 点数组中获取最小 x 和 y

python - 将 Pandas DF 转换为 Numpy Array 在尝试预测时会出现 # of features 错误?