# Source code for probnum.statespace.discrete_transition

"""Discrete transitions."""
import typing
from typing import Callable, Optional

import numpy as np
import scipy.linalg

from probnum import randvars
from probnum.type import FloatArgType, IntArgType
from probnum.utils.linalg import cholesky_update, tril_to_positive_tril

from . import transition as trans
from .discrete_transition_utils import condition_state_on_rv

try:
    # functools.cached_property is only available in Python >=3.8;
    # on older interpreters, fall back to the third-party
    # `cached_property` package (lru_cache exists everywhere we support).
    from functools import cached_property, lru_cache
except ImportError:
    from functools import lru_cache

    from cached_property import cached_property


class DiscreteGaussian(trans.Transition):
    r"""Discrete transitions with additive Gaussian noise.

    .. math:: x_{i+1} \sim \mathcal{N}(g(t_i, x_i), S(t_i))

    for some (potentially non-linear) dynamics :math:`g: \mathbb{R}^m \rightarrow \mathbb{R}^n` and process noise covariance matrix :math:`S`.

    Parameters
    ----------
    input_dim
        Dimension of the support of :math:`g` (in terms of :math:`x`), i.e. the input dimension.
    output_dim
        Dimension of the image of :math:`g`, i.e. the output dimension.
    state_trans_fun :
        State transition function :math:`g=g(t, x)`. Signature: ``state_trans_fun(t, x)``.
    proc_noise_cov_mat_fun :
        Process noise covariance matrix function :math:`S=S(t)`. Signature: ``proc_noise_cov_mat_fun(t)``.
    jacob_state_trans_fun :
        Jacobian of the state transition function :math:`g` (with respect to :math:`x`), :math:`Jg=Jg(t, x)`.
        Signature: ``jacob_state_trans_fun(t, x)``.
    proc_noise_cov_cholesky_fun :
        Cholesky factor of the process noise covariance matrix function :math:`\sqrt{S}=\sqrt{S}(t)`. Signature: ``proc_noise_cov_cholesky_fun(t)``.


    See Also
    --------
    :class:`DiscreteModel`
    :class:`DiscreteGaussianLinearModel`
    """

    def __init__(
        self,
        input_dim: IntArgType,
        output_dim: IntArgType,
        state_trans_fun: Callable[[FloatArgType, np.ndarray], np.ndarray],
        proc_noise_cov_mat_fun: Callable[[FloatArgType], np.ndarray],
        jacob_state_trans_fun: Optional[
            Callable[[FloatArgType, np.ndarray], np.ndarray]
        ] = None,
        proc_noise_cov_cholesky_fun: Optional[
            Callable[[FloatArgType], np.ndarray]
        ] = None,
    ):
        self.state_trans_fun = state_trans_fun
        self.proc_noise_cov_mat_fun = proc_noise_cov_mat_fun

        # "Private", bc. if None, overwritten by the property with the same name
        self._proc_noise_cov_cholesky_fun = proc_noise_cov_cholesky_fun

        def dummy_if_no_jacobian(t, x):
            # Raised only if a Jacobian is requested but none was supplied.
            raise NotImplementedError

        self.jacob_state_trans_fun = (
            jacob_state_trans_fun
            if jacob_state_trans_fun is not None
            else dummy_if_no_jacobian
        )
        super().__init__(input_dim=input_dim, output_dim=output_dim)

    def forward_realization(
        self, realization, t, compute_gain=False, _diffusion=1.0, **kwargs
    ):
        """Propagate a point `realization` through the dynamics.

        Returns the Gaussian ``N(g(t, x), diffusion * S(t))`` together with an
        (empty) info dict. ``compute_gain`` is accepted for interface
        compatibility but ignored here.
        """
        newmean = self.state_trans_fun(t, realization)
        newcov = _diffusion * self.proc_noise_cov_mat_fun(t)
        return randvars.Normal(newmean, newcov), {}

    def forward_rv(self, rv, t, compute_gain=False, _diffusion=1.0, **kwargs):
        # Propagating a full random variable through non-linear dynamics is
        # intractable in general; linearized subclasses implement this.
        raise NotImplementedError("Not available")

    def backward_realization(
        self,
        realization_obtained,
        rv,
        rv_forwarded=None,
        gain=None,
        t=None,
        _diffusion=1.0,
        **kwargs,
    ):
        # Intractable for general non-linear dynamics; see subclasses.
        raise NotImplementedError("Not available")

    def backward_rv(
        self,
        rv_obtained,
        rv,
        rv_forwarded=None,
        gain=None,
        t=None,
        _diffusion=1.0,
        **kwargs,
    ):
        # Should we use the _backward_rv_classic here?
        # It is only intractable bc. forward_rv is intractable
        # and assuming its forward formula would yield a valid
        # gain, the backward formula would be valid.
        # This is the case for the UKF, for instance.
        raise NotImplementedError("Not available")

    # Implementations that are the same for all sorts of
    # discrete Gaussian transitions, in particular shared
    # by LinearDiscreteGaussian and e.g. DiscreteUKFComponent.
    def _backward_rv_classic(
        self,
        rv_obtained,
        rv,
        rv_forwarded=None,
        gain=None,
        t=None,
        _diffusion=1.0,
        _linearise_at=None,
    ):
        """Condition `rv` on `rv_obtained` via the classic (Kalman) update.

        If the forwarded random variable or the gain is missing, an extra
        prediction step is made to compute them.
        """
        # _diffusion previously defaulted to None, which would crash inside
        # forward_rv (None * cov); 1.0 matches every sibling method.
        if rv_forwarded is None or gain is None:
            rv_forwarded, info = self.forward_rv(
                rv,
                t=t,
                compute_gain=True,
                _diffusion=_diffusion,
                _linearise_at=_linearise_at,
            )
            gain = info["gain"]
        return condition_state_on_rv(rv_obtained, rv_forwarded, rv, gain), {}

    # NOTE(review): lru_cache on an instance method keeps every instance alive
    # for the lifetime of the cache (flake8-bugbear B019). Kept for behavioral
    # compatibility; consider a per-instance cache instead.
    @lru_cache(maxsize=None)
    def proc_noise_cov_cholesky_fun(self, t):
        """Cholesky factor of the process noise covariance at time `t`.

        Uses the user-supplied Cholesky function if available, otherwise
        factorizes the covariance matrix (cached per `t`).
        """
        if self._proc_noise_cov_cholesky_fun is not None:
            return self._proc_noise_cov_cholesky_fun(t)
        covmat = self.proc_noise_cov_mat_fun(t)
        return np.linalg.cholesky(covmat)

    @classmethod
    def from_ode(
        cls,
        ode,
        prior,
        evlvar=0.0,
    ):
        """Construct a measurement model for an ODE information operator.

        The dynamics measure the residual ``h1 x - f(t, h0 x)`` between the
        first derivative coordinate of the prior state and the ODE right-hand
        side; `evlvar` scales an isotropic measurement covariance.
        """
        spatialdim = prior.spatialdim
        h0 = prior.proj2coord(coord=0)
        h1 = prior.proj2coord(coord=1)

        def dyna(t, x):
            return h1 @ x - ode.rhs(t, h0 @ x)

        def diff(t):
            return evlvar * np.eye(spatialdim)

        def diff_cholesky(t):
            return np.sqrt(evlvar) * np.eye(spatialdim)

        def jacobian(t, x):
            return h1 - ode.jacobian(t, h0 @ x) @ h0

        return cls(
            input_dim=prior.dimension,
            output_dim=prior.spatialdim,
            state_trans_fun=dyna,
            jacob_state_trans_fun=jacobian,
            proc_noise_cov_mat_fun=diff,
            proc_noise_cov_cholesky_fun=diff_cholesky,
        )
class DiscreteLinearGaussian(DiscreteGaussian):
    r"""Discrete, linear Gaussian transition models of the form.

    .. math:: x_{i+1} \sim \mathcal{N}(G(t_i) x_i + v(t_i), S(t_i))

    for some dynamics matrix :math:`G=G(t)`, force vector :math:`v=v(t)`, and
    diffusion matrix :math:`S=S(t)`.

    Parameters
    ----------
    state_trans_mat_fun : callable
        State transition matrix function :math:`G=G(t)`. Signature: ``state_trans_mat_fun(t)``.
    shift_vec_fun : callable
        Shift vector function :math:`v=v(t)`. Signature: ``shift_vec_fun(t)``.
    proc_noise_cov_mat_fun : callable
        Process noise covariance matrix function :math:`S=S(t)`. Signature: ``proc_noise_cov_mat_fun(t)``.

    See Also
    --------
    :class:`DiscreteModel`
    :class:`DiscreteGaussianLinearModel`
    """

    def __init__(
        self,
        input_dim: IntArgType,
        output_dim: IntArgType,
        state_trans_mat_fun: Callable[[FloatArgType], np.ndarray],
        shift_vec_fun: Callable[[FloatArgType], np.ndarray],
        proc_noise_cov_mat_fun: Callable[[FloatArgType], np.ndarray],
        proc_noise_cov_cholesky_fun: Optional[
            Callable[[FloatArgType], np.ndarray]
        ] = None,
        forward_implementation="classic",
        backward_implementation="classic",
    ):
        # Choose implementation for forward and backward transitions
        choose_forward_implementation = {
            "classic": self._forward_rv_classic,
            "sqrt": self._forward_rv_sqrt,
        }
        choose_backward_implementation = {
            "classic": self._backward_rv_classic,
            "sqrt": self._backward_rv_sqrt,
            "joseph": self._backward_rv_joseph,
        }
        self._forward_implementation = choose_forward_implementation[
            forward_implementation
        ]
        self._backward_implementation = choose_backward_implementation[
            backward_implementation
        ]

        self.state_trans_mat_fun = state_trans_mat_fun
        self.shift_vec_fun = shift_vec_fun
        super().__init__(
            input_dim=input_dim,
            output_dim=output_dim,
            state_trans_fun=lambda t, x: (
                self.state_trans_mat_fun(t) @ x + self.shift_vec_fun(t)
            ),
            proc_noise_cov_mat_fun=proc_noise_cov_mat_fun,
            proc_noise_cov_cholesky_fun=proc_noise_cov_cholesky_fun,
            jacob_state_trans_fun=lambda t, x: state_trans_mat_fun(t),
        )

    def forward_rv(self, rv, t, compute_gain=False, _diffusion=1.0, **kwargs):
        """Propagate a random variable, delegating to the chosen implementation."""
        return self._forward_implementation(
            rv=rv,
            t=t,
            compute_gain=compute_gain,
            _diffusion=_diffusion,
        )

    def forward_realization(self, realization, t, _diffusion=1.0, **kwargs):
        """Propagate a point realization by wrapping it and forwarding the RV."""
        return self._forward_realization_via_forward_rv(
            realization, t=t, compute_gain=False, _diffusion=_diffusion
        )

    def backward_rv(
        self,
        rv_obtained,
        rv,
        rv_forwarded=None,
        gain=None,
        t=None,
        _diffusion=1.0,
        **kwargs,
    ):
        """Condition `rv` on `rv_obtained`, delegating to the chosen implementation."""
        return self._backward_implementation(
            rv_obtained=rv_obtained,
            rv=rv,
            rv_forwarded=rv_forwarded,
            gain=gain,
            t=t,
            _diffusion=_diffusion,
        )

    def backward_realization(
        self,
        realization_obtained,
        rv,
        rv_forwarded=None,
        gain=None,
        t=None,
        _diffusion=1.0,
        **kwargs,
    ):
        """Condition `rv` on a point observation via the backward RV formula."""
        return self._backward_realization_via_backward_rv(
            realization_obtained,
            rv,
            rv_forwarded=rv_forwarded,
            gain=gain,
            t=t,
            _diffusion=_diffusion,
        )

    # Forward and backward implementations
    # _backward_rv_classic is inherited from DiscreteGaussian

    def _forward_rv_classic(
        self, rv, t, compute_gain=False, _diffusion=1.0
    ) -> typing.Tuple[randvars.RandomVariable, typing.Dict]:
        """Standard (covariance-based) Kalman prediction step."""
        H = self.state_trans_mat_fun(t)
        R = self.proc_noise_cov_mat_fun(t)
        shift = self.shift_vec_fun(t)

        new_mean = H @ rv.mean + shift
        crosscov = rv.cov @ H.T
        new_cov = H @ crosscov + _diffusion * R
        info = {"crosscov": crosscov}
        if compute_gain:
            gain = scipy.linalg.solve(new_cov.T, crosscov.T, assume_a="sym").T
            info["gain"] = gain
        return randvars.Normal(new_mean, cov=new_cov), info

    def _forward_rv_sqrt(
        self, rv, t, compute_gain=False, _diffusion=1.0
    ) -> typing.Tuple[randvars.RandomVariable, typing.Dict]:
        """Square-root Kalman prediction step (operates on Cholesky factors)."""
        H = self.state_trans_mat_fun(t)
        SR = self.proc_noise_cov_cholesky_fun(t)
        shift = self.shift_vec_fun(t)

        new_mean = H @ rv.mean + shift
        new_cov_cholesky = cholesky_update(
            H @ rv.cov_cholesky, np.sqrt(_diffusion) * SR
        )
        new_cov = new_cov_cholesky @ new_cov_cholesky.T
        crosscov = rv.cov @ H.T
        info = {"crosscov": crosscov}
        if compute_gain:
            info["gain"] = scipy.linalg.cho_solve(
                (new_cov_cholesky, True), crosscov.T
            ).T
        return (
            randvars.Normal(new_mean, cov=new_cov, cov_cholesky=new_cov_cholesky),
            info,
        )

    def _backward_rv_sqrt(
        self,
        rv_obtained,
        rv,
        rv_forwarded=None,
        gain=None,
        t=None,
        _diffusion=1.0,
    ):
        """See Section 4.1f of:

        ``https://www.sciencedirect.com/science/article/abs/pii/S0005109805001810``.
        """
        # forwarded_rv is ignored in square-root smoothing.
        # Smoothing updates need the gain, but
        # filtering updates "compute their own".
        # Thus, if we are doing smoothing (|cov_obtained|>0) and the gain is
        # not provided, make an extra prediction to compute the gain.
        if gain is None:
            if np.linalg.norm(rv_obtained.cov) > 0:
                _, info = self.forward_rv(
                    rv, t=t, compute_gain=True, _diffusion=_diffusion
                )
                gain = info["gain"]
            else:
                gain = np.zeros((len(rv.mean), len(rv_obtained.mean)))

        state_trans = self.state_trans_mat_fun(t)
        proc_noise_chol = np.sqrt(_diffusion) * self.proc_noise_cov_cholesky_fun(t)
        shift = self.shift_vec_fun(t)

        chol_past = rv.cov_cholesky
        chol_obtained = rv_obtained.cov_cholesky

        output_dim = self.output_dim
        input_dim = self.input_dim

        zeros_bottomleft = np.zeros((output_dim, output_dim))
        zeros_middleright = np.zeros((output_dim, input_dim))

        blockmat = np.block(
            [
                [chol_past.T @ state_trans.T, chol_past.T],
                [proc_noise_chol.T, zeros_middleright],
                [zeros_bottomleft, chol_obtained.T @ gain.T],
            ]
        )
        big_triu = np.linalg.qr(blockmat, mode="r")
        new_chol_triu = big_triu[
            output_dim : (output_dim + input_dim), output_dim : (output_dim + input_dim)
        ]

        # If no initial gain was provided, compute it from the QR-results
        # This is required in the Kalman update, where, other than in the
        # smoothing update, no initial gain was provided.
        # Recall that above, gain was set to zero in this setting.
        if np.linalg.norm(gain) == 0.0:
            R1 = big_triu[:output_dim, :output_dim]
            R12 = big_triu[:output_dim, output_dim:]
            gain = scipy.linalg.solve_triangular(R1, R12, lower=False).T

        new_mean = rv.mean + gain @ (rv_obtained.mean - state_trans @ rv.mean - shift)
        new_cov_cholesky = tril_to_positive_tril(new_chol_triu.T)
        new_cov = new_cov_cholesky @ new_cov_cholesky.T
        return randvars.Normal(new_mean, new_cov, cov_cholesky=new_cov_cholesky), {}

    def _backward_rv_joseph(
        self,
        rv_obtained,
        rv,
        rv_forwarded=None,
        gain=None,
        t=None,
        _diffusion=1.0,
    ):
        """Joseph-form (numerically symmetric) backward update."""
        # forwarded_rv is ignored in Joseph updates.
        # _diffusion previously defaulted to None, which would crash inside
        # the covariance computation; 1.0 matches the sibling methods.
        if gain is None:
            _, info = self.forward_rv(rv, t=t, compute_gain=True, _diffusion=_diffusion)
            gain = info["gain"]
        H = self.state_trans_mat_fun(t)
        R = _diffusion * self.proc_noise_cov_mat_fun(t)
        shift = self.shift_vec_fun(t)

        new_mean = rv.mean + gain @ (rv_obtained.mean - H @ rv.mean - shift)
        joseph_factor = np.eye(len(rv.mean)) - gain @ H
        new_cov = (
            joseph_factor @ rv.cov @ joseph_factor.T
            + gain @ R @ gain.T
            + gain @ rv_obtained.cov @ gain.T
        )
        return randvars.Normal(new_mean, new_cov), {}


class DiscreteLTIGaussian(DiscreteLinearGaussian):
    r"""Discrete, linear, time-invariant Gaussian transition models of the form.

    .. math:: x_{i+1} \sim \mathcal{N}(G x_i + v, S)

    for some dynamics matrix :math:`G`, force vector :math:`v`, and diffusion
    matrix :math:`S`.

    Parameters
    ----------
    state_trans_mat :
        State transition matrix :math:`G`.
    shift_vec :
        Shift vector :math:`v`.
    proc_noise_cov_mat :
        Process noise covariance matrix :math:`S`.

    Raises
    ------
    TypeError
        If state_trans_mat, shift_vec and proc_noise_cov_mat have incompatible
        shapes.

    See Also
    --------
    :class:`DiscreteModel`
    :class:`DiscreteGaussianLinearModel`
    """

    def __init__(
        self,
        state_trans_mat: np.ndarray,
        shift_vec: np.ndarray,
        proc_noise_cov_mat: np.ndarray,
        proc_noise_cov_cholesky: Optional[np.ndarray] = None,
        forward_implementation="classic",
        backward_implementation="classic",
    ):
        _check_dimensions(state_trans_mat, shift_vec, proc_noise_cov_mat)
        output_dim, input_dim = state_trans_mat.shape

        super().__init__(
            input_dim,
            output_dim,
            state_trans_mat_fun=lambda t: state_trans_mat,
            shift_vec_fun=lambda t: shift_vec,
            proc_noise_cov_mat_fun=lambda t: proc_noise_cov_mat,
            proc_noise_cov_cholesky_fun=lambda t: proc_noise_cov_cholesky,
            forward_implementation=forward_implementation,
            backward_implementation=backward_implementation,
        )

        self.state_trans_mat = state_trans_mat
        self.shift_vec = shift_vec
        self.proc_noise_cov_mat = proc_noise_cov_mat
        self._proc_noise_cov_cholesky = proc_noise_cov_cholesky

    def proc_noise_cov_cholesky_fun(self, t):
        # Time-invariant model: ignore `t`, return the cached factor.
        # Overrides the lru_cache-decorated parent implementation.
        return self.proc_noise_cov_cholesky

    @cached_property
    def proc_noise_cov_cholesky(self):
        # Prefer the user-supplied factor; otherwise factorize once and cache.
        if self._proc_noise_cov_cholesky is not None:
            return self._proc_noise_cov_cholesky
        return np.linalg.cholesky(self.proc_noise_cov_mat)


def _check_dimensions(state_trans_mat, shift_vec, proc_noise_cov_mat):
    """LTI SDE model needs matrices which are compatible with each other in
    size.

    Raises TypeError if any array has the wrong rank or the shapes do not
    align as (N, *), (N,), (N, N).
    """
    if state_trans_mat.ndim != 2:
        raise TypeError(
            f"dynamat.ndim=2 expected. dynamat.ndim={state_trans_mat.ndim} received."
        )
    if shift_vec.ndim != 1:
        raise TypeError(
            f"shift_vec.ndim=1 expected. shift_vec.ndim={shift_vec.ndim} received."
        )
    if proc_noise_cov_mat.ndim != 2:
        raise TypeError(
            f"proc_noise_cov_mat.ndim=2 expected. proc_noise_cov_mat.ndim={proc_noise_cov_mat.ndim} received."
        )
    if (
        state_trans_mat.shape[0] != shift_vec.shape[0]
        or shift_vec.shape[0] != proc_noise_cov_mat.shape[0]
        or proc_noise_cov_mat.shape[0] != proc_noise_cov_mat.shape[1]
    ):
        raise TypeError(
            f"Dimension of dynamat, forcevec and diffmat do not align. "
            f"Expected: dynamat.shape=(N,*), forcevec.shape=(N,), diffmat.shape=(N, N). "
            f"Received: dynamat.shape={state_trans_mat.shape}, forcevec.shape={shift_vec.shape}, "
            f"proc_noise_cov_mat.shape={proc_noise_cov_mat.shape}."
        )