Source code for probnum.filtsmooth.statespace.discrete.discretegaussianmodel

"""
Discrete Gauss-Markov models of the form
x_{i+1} = N(g(i, x_i), S(i))
"""

from probnum.filtsmooth.statespace.discrete import discretemodel
from probnum.random_variables import Normal

__all__ = [
    "DiscreteGaussianModel",
    "DiscreteGaussianLinearModel",
    "DiscreteGaussianLTIModel",
]


class DiscreteGaussianModel(discretemodel.DiscreteModel):
    """
    Discrete Gaussian transition models of the form

    .. math:: x_{i+1} \\sim \\mathcal{N}(g(t_i, x_i), S(t_i))

    for some (potentially non-linear) dynamics :math:`g` and diffusion matrix :math:`S`.


    Parameters
    ----------
    dynafct : callable
        Dynamics function :math:`g=g(t, x)`. Signature: ``dynafct(t, x)``.
    diffmatfct : callable
        Diffusion matrix function :math:`S=S(t)`. Signature: ``diffmatfct(t)``.
    jacfct : callable, optional.
        Jacobian of the dynamics function :math:`g`, :math:`Jg=Jg(t, x)`.
        Signature: ``jacfct(t, x)``.

    See Also
    --------
    :class:`DiscreteModel`
    :class:`DiscreteGaussianLinearModel`
    """

    def __init__(self, dynafct, diffmatfct, jacfct=None):
        self._dynafct = dynafct
        self._diffmatfct = diffmatfct
        self._jacfct = jacfct

[docs] def transition_realization(self, real, start, stop=None): newmean = self._dynafct(start, real) newcov = self._diffmatfct(start) return Normal(newmean, newcov), {}
[docs] def transition_rv(self, rv, start, stop=None, **kwargs): raise NotImplementedError
@property def dimension(self): return len(self.diffusionmatrix(0.0))
[docs] def diffusionmatrix(self, time, **kwargs): """ Compute diffusion matrix :math:`S=S(t)` at time :math:`t`. Parameters ---------- time : float Time :math:`t`. Returns ------- :class:`np.ndarray` Diffusion matrix :math:`S=S(t)`. """ return self._diffmatfct(time, **kwargs)
[docs] def dynamics(self, time, state, **kwargs): """ Compute dynamics :math:`g=g(t, x)` at time :math:`t` and state :math:`x`. Parameters ---------- time : float Time :math:`t`. state : array_like State :math:`x`. For instance, realization of a random variable. Returns ------- :class:`np.ndarray` Evaluation of :math:`g=g(t, x)`. """ return self._dynafct(time, state)
[docs] def jacobian(self, time, state, **kwargs): """ Compute diffusion matrix :math:`S=S(t)` at time :math:`t`. Parameters ---------- time : float Time :math:`t`. state : array_like State :math:`x`. For instance, realization of a random variable. Raises ------ NotImplementedError If the Jacobian is not implemented. This is the case if :meth:`jacfct` is not specified at initialization. Returns ------- :class:`np.ndarray` Evaluation of the Jacobian :math:`J g=Jg(t, x)`. """ if self._jacfct is None: raise NotImplementedError return self._jacfct(time, state)
class DiscreteGaussianLinearModel(DiscreteGaussianModel): """ Discrete, linear Gaussian transition models of the form .. math:: x_{i+1} \\sim \\mathcal{N}(G(t_i) x_i + v(t_i), S(t_i)) for some dynamics matrix :math:`G=G(t)`, force vector :math:`v=v(t)`, and diffusion matrix :math:`S=S(t)`. Parameters ---------- dynamatfct : callable Dynamics function :math:`G=G(t)`. Signature: ``dynamatfct(t)``. forcefct : callable Force function :math:`v=v(t)`. Signature: ``forcefct(t)``. diffmatfct : callable Diffusion matrix function :math:`S=S(t)`. Signature: ``diffmatfct(t)``. See Also -------- :class:`DiscreteModel` :class:`DiscreteGaussianLinearModel` """ def __init__(self, dynamatfct, forcefct, diffmatfct): def dynafct(t, x, **kwargs): return dynamatfct(t, **kwargs) @ x + forcefct(t, **kwargs) def jacfct(t, x, **kwargs): return dynamatfct(t, **kwargs) super().__init__(dynafct, diffmatfct, jacfct) self._forcefct = forcefct
[docs] def transition_rv(self, rv, start, stop=None, **kwargs): if not isinstance(rv, Normal): raise TypeError(f"Normal RV expected, but {type(rv)} received.") dynamat = self.dynamicsmatrix(time=start) diffmat = self.diffusionmatrix(time=start) force = self.forcevector(time=start) new_mean = dynamat @ rv.mean + force new_crosscov = rv.cov @ dynamat.T new_cov = dynamat @ new_crosscov + diffmat return Normal(mean=new_mean, cov=new_cov), {"crosscov": new_crosscov}
[docs] def dynamicsmatrix(self, time, **kwargs): """ Compute dynamics matrix :math:`G=G(t)` at time :math:`t`. The output is equivalent to :meth:`jacobian`. Parameters ---------- time : float Time :math:`t`. Returns ------- :class:`np.ndarray` Evaluation of the dynamics matrix :math:`G=G(t)`. """ return self._jacfct(time, None, **kwargs)
[docs] def forcevector(self, time, **kwargs): """ Compute force vector :math:`v=v(t)` at time :math:`t`. Parameters ---------- time : float Time :math:`t`. Returns ------- :class:`np.ndarray` Evaluation of the force :math:`v=v(t)`. """ return self._forcefct(time, **kwargs)
[docs]class DiscreteGaussianLTIModel(DiscreteGaussianLinearModel): """ Discrete, linear, time-invariant Gaussian transition models of the form .. math:: x_{i+1} \\sim \\mathcal{N}(G x_i + v, S) for some dynamics matrix :math:`G`, force vector :math:`v`, and diffusion matrix :math:`S`. Parameters ---------- dynamat : np.ndarray Dynamics matrix :math:`G`. forcevec : np.ndarray Force vector :math:`v`. diffmat : np.ndarray Diffusion matrix :math:`S`. Raises ------ TypeError If dynamat, forcevec and diffmat have incompatible shapes. See Also -------- :class:`DiscreteModel` :class:`DiscreteGaussianLinearModel` """ def __init__(self, dynamat, forcevec, diffmat): if dynamat.ndim != 2: raise TypeError( f"dynamat.ndim=2 expected. dynamat.ndim={dynamat.ndim} received." ) if forcevec.ndim != 1: raise TypeError( f"forcevec.ndim=1 expected. forcevec.ndim={dynamat.ndim} received." ) if diffmat.ndim != 2: raise TypeError( f"diffmat.ndim=2 expected. diffmat.ndim={dynamat.ndim} received." ) if ( not dynamat.shape[0] == forcevec.shape[0] == diffmat.shape[0] == diffmat.shape[1] ): raise TypeError( f"Dimension of dynamat, forcevec and diffmat do not align. " f"Expected: dynamat.shape=(N,*), forcevec.shape=(N,), diffmat.shape=(N, N). " f"Received: dynamat.shape={dynamat.shape}, forcevec.shape={forcevec.shape}, " f"diffmat.shape={diffmat.shape}." ) super().__init__( lambda t, **kwargs: dynamat, lambda t, **kwargs: forcevec, lambda t, **kwargs: diffmat, )
[docs] def transition_realization(self, real, start=None, stop=None): return super().transition_realization( real=real, start=None, stop=None ) # no more 'start' necessary
[docs] def transition_rv(self, rv, start=None, stop=None, **kwargs): return super().transition_rv( rv=rv, start=None, stop=None ) # no more 'start' necessary