"""General Gaussian filters based on approximating intractable quantities with numerical
quadrature.
Examples include the unscented Kalman filter / RTS smoother which is
based on a third degree fully symmetric rule.
"""
from typing import Dict, Optional, Tuple
import numpy as np
from probnum import randvars, statespace
from probnum.type import FloatArgType
from .unscentedtransform import UnscentedTransform
[docs]class UKFComponent:
"""Interface for unscented Kalman filtering components."""
def __init__(
self,
non_linear_model,
spread: Optional[FloatArgType] = 1e-4,
priorpar: Optional[FloatArgType] = 2.0,
special_scale: Optional[FloatArgType] = 0.0,
) -> None:
self.non_linear_model = non_linear_model
self.ut = UnscentedTransform(
non_linear_model.input_dim, spread, priorpar, special_scale
)
# Determine the linearization.
# Will be constructed later.
self.sigma_points = None
[docs] def assemble_sigma_points(self, at_this_rv: randvars.Normal) -> np.ndarray:
"""Assemble the sigma-points."""
return self.ut.sigma_points(at_this_rv)
[docs]class ContinuousUKFComponent(UKFComponent, statespace.SDE):
"""Continuous-time unscented Kalman filter transition.
Parameters
----------
non_linear_model
Non-linear continuous-time model (:class:`SDE`) that is approximated with the UKF.
mde_atol
Absolute tolerance passed to the solver of the moment differential equations (MDEs). Optional. Default is 1e-6.
mde_rtol
Relative tolerance passed to the solver of the moment differential equations (MDEs). Optional. Default is 1e-6.
mde_solver
Method that is chosen in `scipy.integrate.solve_ivp`. Any string that is compatible with ``solve_ivp(..., method=mde_solve,...)`` works here.
Usual candidates are ``[RK45, LSODA, Radau, BDF, RK23, DOP853]``. Optional. Default is LSODA.
"""
def __init__(
self,
non_linear_model,
spread: Optional[FloatArgType] = 1e-4,
priorpar: Optional[FloatArgType] = 2.0,
special_scale: Optional[FloatArgType] = 0.0,
mde_atol: Optional[FloatArgType] = 1e-6,
mde_rtol: Optional[FloatArgType] = 1e-6,
mde_solver: Optional[str] = "LSODA",
) -> None:
UKFComponent.__init__(
self,
non_linear_model,
spread=spread,
priorpar=priorpar,
special_scale=special_scale,
)
statespace.SDE.__init__(
self,
non_linear_model.dimension,
non_linear_model.driftfun,
non_linear_model.dispmatfun,
non_linear_model.jacobfun,
)
self.mde_atol = mde_atol
self.mde_rtol = mde_rtol
self.mde_solver = mde_solver
raise NotImplementedError(
"Implementation of the continuous UKF is incomplete. It cannot be used."
)
[docs] def forward_realization(
self,
realization,
t,
dt=None,
compute_gain=False,
_diffusion=1.0,
_linearise_at=None,
) -> Tuple[randvars.Normal, Dict]:
return self._forward_realization_as_rv(
realization,
t=t,
dt=dt,
compute_gain=compute_gain,
_diffusion=_diffusion,
_linearise_at=_linearise_at,
)
[docs] def forward_rv(
self, rv, t, dt=None, compute_gain=False, _diffusion=1.0, _linearise_at=None
) -> Tuple[randvars.Normal, Dict]:
raise NotImplementedError("TODO") # Issue #234
[docs] def backward_realization(
self,
realization_obtained,
rv,
rv_forwarded=None,
gain=None,
t=None,
dt=None,
_diffusion=1.0,
_linearise_at=None,
):
return self._backward_realization_as_rv(
realization_obtained,
rv=rv,
rv_forwarded=rv_forwarded,
gain=gain,
t=t,
dt=dt,
_diffusion=_diffusion,
_linearise_at=_linearise_at,
)
[docs] def backward_rv(
self,
rv_obtained,
rv,
rv_forwarded=None,
gain=None,
t=None,
dt=None,
_diffusion=1.0,
_linearise_at=None,
):
raise NotImplementedError("Not available (yet).")
[docs]class DiscreteUKFComponent(UKFComponent, statespace.DiscreteGaussian):
"""Discrete unscented Kalman filter transition."""
def __init__(
self,
non_linear_model,
spread: Optional[FloatArgType] = 1e-4,
priorpar: Optional[FloatArgType] = 2.0,
special_scale: Optional[FloatArgType] = 0.0,
) -> None:
UKFComponent.__init__(
self,
non_linear_model,
spread=spread,
priorpar=priorpar,
special_scale=special_scale,
)
statespace.DiscreteGaussian.__init__(
self,
non_linear_model.input_dim,
non_linear_model.output_dim,
non_linear_model.state_trans_fun,
non_linear_model.proc_noise_cov_mat_fun,
non_linear_model.jacob_state_trans_fun,
non_linear_model.proc_noise_cov_cholesky_fun,
)
[docs] def forward_rv(
self, rv, t, compute_gain=False, _diffusion=1.0, _linearise_at=None, **kwargs
) -> Tuple[randvars.Normal, Dict]:
compute_sigmapts_at = _linearise_at if _linearise_at is not None else rv
self.sigma_points = self.assemble_sigma_points(at_this_rv=compute_sigmapts_at)
proppts = self.ut.propagate(
t, self.sigma_points, self.non_linear_model.state_trans_fun
)
meascov = _diffusion * self.non_linear_model.proc_noise_cov_mat_fun(t)
mean, cov, crosscov = self.ut.estimate_statistics(
proppts, self.sigma_points, meascov, rv.mean
)
info = {"crosscov": crosscov}
if compute_gain:
gain = crosscov @ np.linalg.inv(cov)
info["gain"] = gain
return randvars.Normal(mean, cov), info
[docs] def forward_realization(
self, realization, t, _diffusion=1.0, _linearise_at=None, **kwargs
):
return self._forward_realization_via_forward_rv(
realization,
t=t,
compute_gain=False,
_diffusion=_diffusion,
_linearise_at=_linearise_at,
)
[docs] def backward_rv(
self,
rv_obtained,
rv,
rv_forwarded=None,
gain=None,
t=None,
_diffusion=1.0,
_linearise_at=None,
**kwargs
):
# this method is inherited from DiscreteGaussian.
return self._backward_rv_classic(
rv_obtained,
rv,
rv_forwarded,
gain=gain,
t=t,
_diffusion=_diffusion,
_linearise_at=None,
)
[docs] def backward_realization(
self,
realization_obtained,
rv,
rv_forwarded=None,
gain=None,
t=None,
_diffusion=1.0,
_linearise_at=None,
**kwargs
):
# this method is inherited from DiscreteGaussian.
return self._backward_realization_via_backward_rv(
realization_obtained,
rv,
rv_forwarded,
gain=gain,
t=t,
_diffusion=_diffusion,
_linearise_at=_linearise_at,
)
@property
def dimension(self) -> int:
return self.ut.dimension
[docs] @classmethod
def from_ode(
cls,
ode,
prior,
evlvar=0.0,
):
discrete_model = statespace.DiscreteGaussian.from_ode(
ode=ode, prior=prior, evlvar=evlvar
)
return cls(discrete_model)