Source code for probnum.filtsmooth.gaussfiltsmooth.unscentedkalman

"""
General Gaussian filters based on approximating intractable
quantities with numerical quadrature.
Examples include the unscented Kalman filter / RTS smoother
which is based on a third degree fully symmetric rule.
"""

import numpy as np

from probnum.filtsmooth.gaussfiltsmooth.gaussfiltsmooth import (
    GaussFiltSmooth,
    linear_discrete_update,
)
from probnum.filtsmooth.gaussfiltsmooth.unscentedtransform import UnscentedTransform
from probnum.filtsmooth.statespace import (
    ContinuousModel,
    DiscreteGaussianLinearModel,
    DiscreteGaussianModel,
    DiscreteModel,
    LinearSDEModel,
)
from probnum.random_variables import Normal


[docs]class UnscentedKalman(GaussFiltSmooth): """ Factory method for Unscented Kalman filters. """ def __new__(cls, dynamod, measmod, initrv, alpha, beta, kappa, **kwargs): if cls is UnscentedKalman: if _cont_disc(dynamod, measmod): return _ContDiscUnscentedKalman( dynamod, measmod, initrv, alpha, beta, kappa, **kwargs ) if _disc_disc(dynamod, measmod): return _DiscDiscUnscentedKalman( dynamod, measmod, initrv, alpha, beta, kappa, **kwargs ) else: errmsg = ( "Cannot instantiate Unscented Kalman filter with " "given dynamic model and measurement model." ) raise ValueError(errmsg) else: return super().__new__(cls)
def _cont_disc(dynamod, measmod): dyna_is_cont = issubclass(type(dynamod), ContinuousModel) meas_is_disc = issubclass(type(measmod), DiscreteModel) return dyna_is_cont and meas_is_disc def _disc_disc(dynamod, measmod): dyna_is_disc = issubclass(type(dynamod), DiscreteModel) meas_is_disc = issubclass(type(measmod), DiscreteModel) return dyna_is_disc and meas_is_disc class _ContDiscUnscentedKalman(UnscentedKalman): def __init__(self, dynamod, measmod, initrv, alpha, beta, kappa, **kwargs): if not issubclass(type(dynamod), LinearSDEModel): raise ValueError( "This implementation of " "_ContDiscUnscentedKalman " "requires a linear dynamic model." ) if not issubclass(type(measmod), DiscreteGaussianModel): raise ValueError( "_DiscDiscUnscentedKalman requires " "a Gaussian measurement model." ) if "cke_nsteps" in kwargs.keys(): self.cke_nsteps = kwargs["cke_nsteps"] else: self.cke_nsteps = 1 super().__init__(dynamod, measmod, initrv) self.ut = UnscentedTransform(self.dynamod.dimension, alpha, beta, kappa) def predict(self, start, stop, randvar, **kwargs): step = (stop - start) / self.cke_nsteps return self.dynamicmodel.transition_rv( rv=randvar, start=start, stop=stop, step=step, **kwargs ) def update(self, time, randvar, data, **kwargs): return _discrete_unskalman_update( time, randvar, data, self.measmod, self.ut, **kwargs ) class _DiscDiscUnscentedKalman(UnscentedKalman): def __init__(self, dynamod, measmod, initrv, alpha, beta, kappa, **kwargs): """ Checks that dynamod and measmod are linear and moves on. """ if not issubclass(type(dynamod), DiscreteGaussianModel): raise ValueError( "_DiscDiscUnscentedKalman requires " "a Gaussian dynamic model." ) if not issubclass(type(measmod), DiscreteGaussianModel): raise ValueError( "_DiscDiscUnscentedKalman requires " "a Gaussian measurement model." ) super().__init__(dynamod, measmod, initrv) self.ut = UnscentedTransform(self.dynamod.dimension, alpha, beta, kappa) def predict(self, start, stop, randvar, **kwargs): if issubclass(type(self.dynamod), DiscreteGaussianLinearModel): return self._predict_linear(start, randvar, **kwargs) else: return self._predict_nonlinear(start, randvar, **kwargs) def _predict_linear(self, start, randvar, **kwargs): """ Basic Kalman update because model is linear. """ mean, covar = randvar.mean, randvar.cov if np.isscalar(mean) and np.isscalar(covar): mean, covar = mean * np.ones(1), covar * np.eye(1) dynamat = self.dynamod.dynamicsmatrix(start, **kwargs) forcevec = self.dynamod.forcevector(start, **kwargs) diffmat = self.dynamod.diffusionmatrix(start, **kwargs) mpred = dynamat @ mean + forcevec crosscov = covar @ dynamat.T cpred = dynamat @ crosscov + diffmat return Normal(mpred, cpred), {"crosscov": crosscov} def _predict_nonlinear(self, start, randvar, **kwargs): """ Executes unscented transform! """ mean, covar = randvar.mean, randvar.cov if np.isscalar(mean) and np.isscalar(covar): mean, covar = mean * np.ones(1), covar * np.eye(1) sigmapts = self.ut.sigma_points(mean, covar) proppts = self.ut.propagate(start, sigmapts, self.dynamod.dynamics) diffmat = self.dynamod.diffusionmatrix(start, **kwargs) mpred, cpred, crosscov = self.ut.estimate_statistics( proppts, sigmapts, diffmat, mean ) return Normal(mpred, cpred), {"crosscov": crosscov} def update(self, time, randvar, data, **kwargs): return _discrete_unskalman_update( time, randvar, data, self.measmod, self.ut, **kwargs ) def _discrete_unskalman_update(time, randvar, data, measmod, ut, **kwargs): if issubclass(type(measmod), DiscreteGaussianLinearModel): return _update_discrete_linear(time, randvar, data, measmod, **kwargs) else: return _update_discrete_nonlinear(time, randvar, data, measmod, ut, **kwargs) def _update_discrete_linear(time, randvar, data, measmod, **kwargs): mpred, cpred = randvar.mean, randvar.cov if np.isscalar(mpred) and np.isscalar(cpred): mpred, cpred = mpred * np.ones(1), cpred * np.eye(1) measmat = measmod.dynamicsmatrix(time, **kwargs) meascov = measmod.diffusionmatrix(time, **kwargs) meanest = measmat @ mpred return linear_discrete_update(meanest, cpred, data, meascov, measmat, mpred) def _update_discrete_nonlinear(time, randvar, data, measmod, ut, **kwargs): mpred, cpred = randvar.mean, randvar.cov if np.isscalar(mpred) and np.isscalar(cpred): mpred, cpred = mpred * np.ones(1), cpred * np.eye(1) sigmapts = ut.sigma_points(mpred, cpred) proppts = ut.propagate(time, sigmapts, measmod.dynamics) meascov = measmod.diffusionmatrix(time, **kwargs) meanest, covest, ccest = ut.estimate_statistics(proppts, sigmapts, meascov, mpred) mean = mpred + ccest @ np.linalg.solve(covest, data - meanest) cov = cpred - ccest @ np.linalg.solve(covest.T, ccest.T) return Normal(mean, cov), covest, ccest, meanest