Source code for probnum.diffeq.odefiltsmooth.ivpfiltsmooth

"""Gaussian IVP filtering and smoothing."""

from typing import Callable, Optional

import numpy as np

from probnum import filtsmooth, randvars, statespace

from ..ode import IVP
from ..odesolver import ODESolver
from .initialize import (
    initialize_odefilter_with_rk,
    initialize_odefilter_with_taylormode,
)
from .kalman_odesolution import KalmanODESolution


[docs]class GaussianIVPFilter(ODESolver): """ODE solver that uses a Gaussian filter. This is based on continuous-discrete Gaussian filtering. Note: this is specific for IVPs and does not apply without further considerations to, e.g., BVPs. Parameters ---------- ivp Initial value problem to be solved. prior Prior distribution. measurement_model ODE measurement model. with_smoothing To smooth after the solve or not to smooth after the solve. init_implementation Initialization algorithm. Either via Scipy (``initialize_odefilter_with_rk``) or via Taylor-mode AD (``initialize_odefilter_with_taylormode``). For more convenient construction, consider :func:`GaussianIVPFilter.construct_with_rk_init` and :func:`GaussianIVPFilter.construct_with_taylormode_init`. initrv Initial random variable to be used by the filter. Optional. Default is `None`, which amounts to choosing a standard-normal initial RV. As part of `GaussianIVPFilter.initialise()` (called in `GaussianIVPFilter.solve()`), this variable is improved upon with the help of the initialisation algorithm. The influence of this choice on the posterior may vary depending on the initialization strategy, but is almost always weak. """ def __init__( self, ivp: IVP, prior: statespace.Integrator, measurement_model: statespace.DiscreteGaussian, with_smoothing: bool, init_implementation: Callable[ [ Callable, np.ndarray, float, statespace.Integrator, randvars.Normal, Optional[Callable], ], randvars.Normal, ], initrv: Optional[randvars.Normal] = None, ): if initrv is None: initrv = randvars.Normal( np.zeros(prior.dimension), np.eye(prior.dimension), cov_cholesky=np.eye(prior.dimension), ) self.gfilt = filtsmooth.Kalman( dynamics_model=prior, measurement_model=measurement_model, initrv=initrv ) if not isinstance(prior, statespace.Integrator): raise ValueError( "Please initialise a Gaussian filter with an Integrator (see `probnum.statespace`)" ) self.sigma_squared_mle = 1.0 self.with_smoothing = with_smoothing self.init_implementation = init_implementation super().__init__(ivp=ivp, order=prior.ordint)
[docs] @staticmethod def string_to_measurement_model( measurement_model_string, ivp, prior, measurement_noise_covariance=0.0 ): """Construct a measurement model :math:`\\mathcal{N}(g(m), R)` for an ODE. Return a :class:`DiscreteGaussian` (either a :class:`DiscreteEKFComponent` or a `DiscreteUKFComponent`) that provides a tractable approximation of the transition densities based on the local defect of the ODE .. math:: g(m) = H_1 m(t) - f(t, H_0 m(t)) and user-specified measurement noise covariance :math:`R`. Almost always, the measurement noise covariance is zero. Compute either type filter, each with a different interpretation of the Jacobian :math:`J_g`: - EKF0 thinks :math:`J_g(m) = H_1` - EKF1 thinks :math:`J_g(m) = H_1 - J_f(t, H_0 m(t)) H_0^\\top` - UKF thinks: ''What is a Jacobian?'' and uses the unscented transform to compute a tractable approximation of the transition densities. """ measurement_model_string = measurement_model_string.upper() # While "UK" is not available in probsolve_ivp (because it is not recommended) # It is an option in this function here, because there is no obvious reason to restrict # the options in this lower level function. choose_meas_model = { "EK0": filtsmooth.DiscreteEKFComponent.from_ode( ivp, prior=prior, ek0_or_ek1=0, evlvar=measurement_noise_covariance, forward_implementation="sqrt", backward_implementation="sqrt", ), "EK1": filtsmooth.DiscreteEKFComponent.from_ode( ivp, prior=prior, ek0_or_ek1=1, evlvar=measurement_noise_covariance, forward_implementation="sqrt", backward_implementation="sqrt", ), "UK": filtsmooth.DiscreteUKFComponent.from_ode( ivp, prior, evlvar=measurement_noise_covariance, ), } if measurement_model_string not in choose_meas_model.keys(): raise ValueError("Type of measurement model not supported.") return choose_meas_model[measurement_model_string]
# Construct an ODE solver from different initialisation methods. # The reason for implementing these via classmethods is that different # initialisation methods require different parameters.
[docs] @classmethod def construct_with_rk_init( cls, ivp, prior, measurement_model, with_smoothing, initrv=None, init_h0=0.01, init_method="DOP853", ): """Create a Gaussian IVP filter that is initialised via :func:`initialize_odefilter_with_rk`.""" def init_implementation(f, y0, t0, prior, initrv, df=None): return initialize_odefilter_with_rk( f=f, y0=y0, t0=t0, prior=prior, initrv=initrv, df=df, h0=init_h0, method=init_method, ) return cls( ivp, prior, measurement_model, with_smoothing, init_implementation=init_implementation, initrv=initrv, )
[docs] @classmethod def construct_with_taylormode_init( cls, ivp, prior, measurement_model, with_smoothing, initrv=None ): """Create a Gaussian IVP filter that is initialised via :func:`initialize_odefilter_with_taylormode`.""" return cls( ivp, prior, measurement_model, with_smoothing, init_implementation=initialize_odefilter_with_taylormode, initrv=initrv, )
[docs] def initialise(self): initrv = self.init_implementation( self.ivp.rhs, self.ivp.initrv.mean, self.ivp.t0, self.gfilt.dynamics_model, self.gfilt.initrv, self.ivp._jac, ) return self.ivp.t0, initrv
[docs] def step(self, t, t_new, current_rv): """Gaussian IVP filter step as nonlinear Kalman filtering with zero data.""" # Read the diffusion matrix; required for calibration / error estimation discrete_dynamics = self.gfilt.dynamics_model.discretise(t_new - t) proc_noise_cov = discrete_dynamics.proc_noise_cov_mat proc_noise_cov_cholesky = discrete_dynamics.proc_noise_cov_cholesky # 1. Predict pred_rv, _ = self.gfilt.dynamics_model.forward_rv( rv=current_rv, t=t, dt=t_new - t ) # 2. Measure meas_rv, info = self.gfilt.measurement_model.forward_rv( rv=pred_rv, t=t_new, compute_gain=False ) # 3. Estimate the diffusion (sigma squared) self.sigma_squared_mle = self._estimate_diffusion(meas_rv) # 3.1. Adjust the prediction covariance to include the diffusion pred_rv, _ = self.gfilt.dynamics_model.forward_rv( rv=current_rv, t=t, dt=t_new - t, _diffusion=self.sigma_squared_mle ) # 3.2 Update the measurement covariance (measure again) meas_rv, info = self.gfilt.measurement_model.forward_rv( rv=pred_rv, t=t_new, compute_gain=True ) # 4. Update zero_data = np.zeros(meas_rv.mean.shape) filt_rv, _ = self.gfilt.measurement_model.backward_realization( zero_data, pred_rv, rv_forwarded=meas_rv, gain=info["gain"] ) # 5. Error estimate local_errors = self._estimate_local_error( pred_rv, t_new, self.sigma_squared_mle * proc_noise_cov, np.sqrt(self.sigma_squared_mle) * proc_noise_cov_cholesky, ) err = np.linalg.norm(local_errors) return filt_rv, err
[docs] def rvlist_to_odesol(self, times, rvs): """Create an ODESolution object.""" kalman_posterior = filtsmooth.FilteringPosterior( times, rvs, self.gfilt.dynamics_model ) return KalmanODESolution(kalman_posterior)
[docs] def postprocess(self, odesol): """If specified (at initialisation), smooth the filter output.""" if False: # pylint: disable=using-constant-test # will become useful again for time-fixed diffusion models rvs = self._rescale(rvs) if self.with_smoothing is True: odesol = self._odesmooth(ode_solution=odesol) return odesol
def _rescale(self, rvs): """Rescales covariances according to estimate sigma squared value.""" rvs = [randvars.Normal(rv.mean, self.sigma_squared_mle * rv.cov) for rv in rvs] return rvs def _odesmooth(self, ode_solution, **kwargs): """Smooth out the ODE-Filter output. Be careful about the preconditioning: the GaussFiltSmooth object only knows the state space with changed coordinates! Parameters ---------- filter_solution: ODESolution Returns ------- smoothed_solution: ODESolution """ smoothing_posterior = self.gfilt.smooth(ode_solution.kalman_posterior) return KalmanODESolution(smoothing_posterior) def _estimate_local_error( self, pred_rv, t_new, calibrated_proc_noise_cov, calibrated_proc_noise_cov_cholesky, **kwargs ): """Estimate the local errors. This corresponds to the approach in [1], implemented such that it is compatible with the EKF1 and UKF. References ---------- .. [1] Schober, M., Särkkä, S. and Hennig, P.. A probabilistic model for the numerical solution of initial value problems. Statistics and Computing, 2019. """ local_pred_rv = randvars.Normal( pred_rv.mean, calibrated_proc_noise_cov, cov_cholesky=calibrated_proc_noise_cov_cholesky, ) local_meas_rv, _ = self.gfilt.measure(local_pred_rv, t_new) error = local_meas_rv.cov.diagonal() return np.sqrt(np.abs(error)) def _estimate_diffusion(self, meas_rv): """Estimate the dynamic diffusion parameter sigma_squared. This corresponds to the approach in [1], implemented such that it is compatible with the EKF1 and UKF. References ---------- .. [1] Schober, M., Särkkä, S. and Hennig, P.. A probabilistic model for the numerical solution of initial value problems. Statistics and Computing, 2019. """ std_like = meas_rv.cov_cholesky whitened_res = np.linalg.solve(std_like, meas_rv.mean) ssq = whitened_res @ whitened_res / meas_rv.size return ssq @property def prior(self): return self.gfilt.dynamics_model