Source code for probnum.filtsmooth.gaussian.approx._extendedkalman

"""Gaussian filtering and smoothing based on making intractable quantities tractable
through Taylor-method approximations, e.g. linearization."""

import abc
from typing import Dict, Tuple

from probnum import randprocs, randvars


class EKFComponent(abc.ABC):
    """Interface for extended Kalman filtering components."""

    def __init__(
        self,
        non_linear_model,
    ) -> None:

        self.non_linear_model = non_linear_model

        # Will be constructed later
        self.linearized_model = None

[docs] def forward_realization( self, realization, t, dt=None, compute_gain=False, _diffusion=1.0, _linearise_at=None, ) -> Tuple[randvars.Normal, Dict]: return self._forward_realization_via_forward_rv( realization, t=t, dt=dt, compute_gain=compute_gain, _diffusion=_diffusion, _linearise_at=_linearise_at, )
[docs] def forward_rv( self, rv, t, dt=None, compute_gain=False, _diffusion=1.0, _linearise_at=None, ) -> Tuple[randvars.Normal, Dict]: compute_jacobian_at = _linearise_at if _linearise_at is not None else rv self.linearized_model = self.linearize(at_this_rv=compute_jacobian_at) return self.linearized_model.forward_rv( rv=rv, t=t, dt=dt, compute_gain=compute_gain, _diffusion=_diffusion, )
[docs] def backward_realization( self, realization_obtained, rv, rv_forwarded=None, gain=None, t=None, dt=None, _diffusion=1.0, _linearise_at=None, ): return self._backward_realization_via_backward_rv( realization_obtained, rv=rv, rv_forwarded=rv_forwarded, gain=gain, t=t, dt=dt, _diffusion=_diffusion, _linearise_at=_linearise_at, )
[docs] def backward_rv( self, rv_obtained, rv, rv_forwarded=None, gain=None, t=None, dt=None, _diffusion=1.0, _linearise_at=None, ): compute_jacobian_at = _linearise_at if _linearise_at is not None else rv self.linearized_model = self.linearize(at_this_rv=compute_jacobian_at) return self.linearized_model.backward_rv( rv_obtained=rv_obtained, rv=rv, rv_forwarded=rv_forwarded, gain=gain, t=t, dt=dt, _diffusion=_diffusion, )
[docs] @abc.abstractmethod def linearize( self, at_this_rv: randvars.RandomVariable ) -> randprocs.markov.Transition: """Linearize the transition and make it tractable.""" raise NotImplementedError
# Order of inheritance matters, because forward and backward # are defined in EKFComponent, and must not be inherited from SDE. class ContinuousEKFComponent(EKFComponent, randprocs.markov.continuous.SDE): """Continuous-time extended Kalman filter transition. Parameters ---------- non_linear_model Non-linear continuous-time model (:class:`SDE`) that is approximated with the EKF. mde_atol Absolute tolerance passed to the solver of the moment differential equations (MDEs). Optional. Default is 1e-6. mde_rtol Relative tolerance passed to the solver of the moment differential equations (MDEs). Optional. Default is 1e-6. mde_solver Method that is chosen in `scipy.integrate.solve_ivp`. Any string that is compatible with ``solve_ivp(..., method=mde_solve,...)`` works here. Usual candidates are ``[RK45, LSODA, Radau, BDF, RK23, DOP853]``. Optional. Default is LSODA. """ def __init__( self, non_linear_model, mde_atol=1e-5, mde_rtol=1e-5, mde_solver="RK45", forward_implementation="classic", ) -> None: randprocs.markov.continuous.SDE.__init__( self, state_dimension=non_linear_model.state_dimension, wiener_process_dimension=non_linear_model.wiener_process_dimension, drift_function=non_linear_model.drift_function, dispersion_function=non_linear_model.dispersion_function, drift_jacobian=non_linear_model.drift_jacobian, ) EKFComponent.__init__(self, non_linear_model=non_linear_model) self.mde_atol = mde_atol self.mde_rtol = mde_rtol self.mde_solver = mde_solver self.forward_implementation = forward_implementation
[docs] def linearize(self, at_this_rv: randvars.Normal): """Linearize the drift function with a first order Taylor expansion.""" g = self.non_linear_model.drift_function dg = self.non_linear_model.drift_jacobian l = self.non_linear_model.dispersion_function x0 = at_this_rv.mean def force_vector_function(t): return g(t, x0) - dg(t, x0) @ x0 def drift_matrix_function(t): return dg(t, x0) def dispersion_matrix_function(t): return l(t, x0) return randprocs.markov.continuous.LinearSDE( state_dimension=self.non_linear_model.state_dimension, wiener_process_dimension=self.non_linear_model.wiener_process_dimension, drift_matrix_function=drift_matrix_function, force_vector_function=force_vector_function, dispersion_matrix_function=dispersion_matrix_function, mde_atol=self.mde_atol, mde_rtol=self.mde_rtol, mde_solver=self.mde_solver, forward_implementation=self.forward_implementation, )
class DiscreteEKFComponent(EKFComponent, randprocs.markov.discrete.NonlinearGaussian): """Discrete extended Kalman filter transition.""" def __init__( self, non_linear_model, forward_implementation="classic", backward_implementation="classic", ) -> None: randprocs.markov.discrete.NonlinearGaussian.__init__( self, non_linear_model.input_dim, non_linear_model.output_dim, non_linear_model.state_trans_fun, non_linear_model.proc_noise_cov_mat_fun, non_linear_model.jacob_state_trans_fun, non_linear_model.proc_noise_cov_cholesky_fun, ) EKFComponent.__init__(self, non_linear_model=non_linear_model) self.forward_implementation = forward_implementation self.backward_implementation = backward_implementation
[docs] def linearize(self, at_this_rv: randvars.Normal): """Linearize the dynamics function with a first order Taylor expansion.""" g = self.non_linear_model.state_trans_fun dg = self.non_linear_model.jacob_state_trans_fun x0 = at_this_rv.mean def force_vector_function(t): return g(t, x0) - dg(t, x0) @ x0 def dynamicsmatfun(t): return dg(t, x0) return randprocs.markov.discrete.LinearGaussian( input_dim=self.non_linear_model.input_dim, output_dim=self.non_linear_model.output_dim, state_trans_mat_fun=dynamicsmatfun, shift_vec_fun=force_vector_function, proc_noise_cov_mat_fun=self.non_linear_model.proc_noise_cov_mat_fun, proc_noise_cov_cholesky_fun=self.non_linear_model.proc_noise_cov_cholesky_fun, forward_implementation=self.forward_implementation, backward_implementation=self.backward_implementation, )