"""Convenience functions for filtering and smoothing."""
from __future__ import annotations
import numpy as np
from probnum import problems, randprocs, randvars
from probnum.filtsmooth import gaussian
from probnum.typing import ArrayLike
__all__ = ["filter_kalman", "smooth_rts"]
[docs]def filter_kalman(
observations: ArrayLike,
locations: ArrayLike,
F: ArrayLike,
L: ArrayLike,
H: ArrayLike,
R: ArrayLike,
m0: ArrayLike,
C0: ArrayLike,
prior_model: str = "continuous",
):
r"""Estimate a trajectory with a Kalman filter.
A Kalman filter estimates the unknown trajectory :math:`X`
from a set of observations `Y`.
There is a continuous-discrete and a discrete-discrete version
(describing whether the prior model and measurement model are continuous/discrete).
In a continuous-discrete model, the prior distribution is described by the SDE
.. math:: \text{d}X(t) = F X(t) \text{d}t + L \text{d}W(t)
driven by Wiener process :math:`W` and subject to initial condition
.. math:: X(t_0) \sim N(m_0, C_0).
By default, :math:`t_0` is set to the location of the first observation.
In a discrete-discrete model, the prior distribution
is described by the transition
.. math:: X_{n+1} \,|\, X_n \sim N(F X_n, L)
subject to the same initial condition.
In both cases, the measurement model is
(write :math:`X(t_n)=X_n` in the continuous case)
.. math:: Y_n \,|\, X_n \sim N(H X_n, R)
and the Kalman filter estimates :math:`X` given
:math:`Y_n=y_n`, :math:`Y=[y_1, ..., y_N]`.
Parameters
----------
observations
*(shape=(N, m))* -- A list of noisy observations of the hidden trajectory.
locations
*(shape=(N, ))* -- Time-locations of the observations.
F
*(shape=(n, n))* -- State transition matrix.
Either the drift matrix in an SDE model,
or the transition matrix in a discrete model
(depending on the value of `prior_model`).
L
*(shape=(n, n))* or *(shape=(n, s))* -- Diffusion/dispersion matrix.
Either the dispersion matrix in an SDE model,
or the diffusion matrix in a discrete model
(depending on the value of `prior_model`).
In a continuous model, the matrix has shape (n, s)
for s-dimensional driving Wiener process.
In a discrete model, the matrix has shape (n, n).
H
*(shape=(m, n))* -- Transition matrix of the (discrete) observation model.
R
*(shape=(m, m))* -- Covariance matrix of the observation noise.
m0
*(shape=(n,))* -- Initial mean of the prior model.
C0
*(shape=(n, n))* -- Initial covariance of the prior model.
prior_model
Either discrete (``discrete``) or continuous (``continuous``).
This affects the role of `F` and `L`.
Optional. Default is `continuous`.
Raises
------
ValueError
If `prior_model` is neither ``discrete`` nor ``continuous``.
Returns
-------
gaussian.FilteringPosterior
Filtering distribution as returned by the Kalman filter.
"""
regression_problem = _setup_regression_problem(
H=H, R=R, observations=observations, locations=locations
)
prior_process = _setup_prior_process(
F=F, L=L, m0=m0, C0=C0, t0=locations[0], prior_model=prior_model
)
kalman = gaussian.Kalman(prior_process)
return kalman.filter(regression_problem)[0]
[docs]def smooth_rts(
observations: ArrayLike,
locations: ArrayLike,
F: ArrayLike,
L: ArrayLike,
H: ArrayLike,
R: ArrayLike,
m0: ArrayLike,
C0: ArrayLike,
prior_model: str = "continuous",
):
r"""Estimate a trajectory with a Rauch-Tung-Striebel smoother.
A Rauch-Tung-Striebel smoother estimates the unknown trajectory
:math:`X` from a set of observations `Y`.
There is a continuous-discrete and a discrete-discrete version
(describing whether the prior model and measurement model are continuous/discrete).
In a continuous-discrete model, the prior distribution is described by the SDE
.. math:: \text{d}X(t) = F X(t) \text{d}t + L \text{d}W(t)
driven by Wiener process :math:`W` and subject to initial condition
.. math:: X(t_0) \sim N(m_0, C_0).
By default, :math:`t_0` is set to the location of the first observation.
In a discrete-discrete model, the prior distribution
is described by the transition
.. math:: X_{n+1} \,|\, X_n \sim N(F X_n, L)
subject to the same initial condition.
In both cases, the measurement model is
(write :math:`X(t_n)=X_n` in the continuous case)
.. math:: Y_n \,|\, X_n \sim N(H X_n, R)
and the Rauch-Tung-Striebel smoother estimates
:math:`X` given :math:`Y_n=y_n`, :math:`Y=[y_1, ..., y_N]`.
Parameters
----------
observations
*(shape=(N, m))* -- A list of noisy observations of the hidden trajectory.
locations
*(shape=(N, ))* -- Time-locations of the observations.
F
*(shape=(n, n))* -- State transition matrix.
Either the drift matrix in an SDE model,
or the transition matrix in a discrete model
(depending on the value of `prior_model`).
L
*(shape=(n, n))* or *(shape=(n, s))* -- Diffusion/dispersion matrix.
Either the dispersion matrix in an SDE model,
or the diffusion matrix in a discrete model
(depending on the value of `prior_model`).
In a continuous model, the matrix has shape (n, s)
for s-dimensional driving Wiener process.
In a discrete model, the matrix has shape (n, n).
H
*(shape=(m, n))* -- Transition matrix of the (discrete) observation model.
R
*(shape=(m, m))* -- Covariance matrix of the observation noise.
m0
*(shape=(n,))* -- Initial mean of the prior model.
C0
*(shape=(n, n))* -- Initial covariance of the prior model.
prior_model
Either discrete (``discrete``) or continuous (``continuous``).
This affects the role of `F` and `L`.
Optional. Default is `continuous`.
Raises
------
ValueError
If `prior_model` is neither ``discrete`` nor ``continuous``.
Returns
-------
gaussian.SmoothingPosterior
Smoothing distribution as returned by the Rauch-Tung-Striebel smoother.
"""
regression_problem = _setup_regression_problem(
H=H, R=R, observations=observations, locations=locations
)
prior_process = _setup_prior_process(
F=F, L=L, m0=m0, C0=C0, t0=locations[0], prior_model=prior_model
)
kalman = gaussian.Kalman(prior_process)
return kalman.filtsmooth(regression_problem)[0]
def _setup_prior_process(F, L, m0, C0, t0, prior_model):
zero_shift_prior = np.zeros(F.shape[0])
initrv = randvars.Normal(m0, C0)
initarg = t0
if prior_model == "discrete":
prior = randprocs.markov.discrete.LTIGaussian(
transition_matrix=F,
noise=randvars.Normal(mean=zero_shift_prior, cov=L),
)
return randprocs.markov.MarkovSequence(
transition=prior, initrv=initrv, initarg=initarg
)
if prior_model == "continuous":
prior = randprocs.markov.continuous.LTISDE(
drift_matrix=F, force_vector=zero_shift_prior, dispersion_matrix=L
)
return randprocs.markov.MarkovProcess(
transition=prior, initrv=initrv, initarg=initarg
)
raise ValueError
def _setup_regression_problem(H, R, observations, locations):
zero_shift_mm = np.zeros(H.shape[0])
measmod = randprocs.markov.discrete.LTIGaussian(
transition_matrix=H, noise=randvars.Normal(mean=zero_shift_mm, cov=R)
)
measurement_models = [measmod] * len(locations)
regression_problem = problems.TimeSeriesRegressionProblem(
observations=observations,
locations=locations,
measurement_models=measurement_models,
)
return regression_problem