"""Discrete transitions."""
import probnum.random_variables as pnrv
from . import transition as trans
class DiscreteGaussian(trans.Transition):
"""Random variable transitions with additive Gaussian noise.
.. math:: x_{i+1} \\sim \\mathcal{N}(g(t_i, x_i), S(t_i))
for some (potentially non-linear) dynamics :math:`g` and diffusion matrix :math:`S`.
This is used for, but not restricted to time-series.
Parameters
----------
dynamicsfun : callable
Dynamics function :math:`g=g(t, x)`. Signature: ``dynafct(t, x)``.
diffmatfun : callable
Diffusion matrix function :math:`S=S(t)`. Signature: ``diffmatfct(t)``.
jacobfun : callable, optional.
Jacobian of the dynamics function :math:`g`, :math:`Jg=Jg(t, x)`.
Signature: ``jacfct(t, x)``.
See Also
--------
:class:`DiscreteModel`
:class:`DiscreteGaussianLinearModel`
"""
def __init__(self, dynamicsfun, diffmatfun, jacobfun=None):
self._dynamicsfun = dynamicsfun
self._diffmatfun = diffmatfun
self._jacobfun = jacobfun
[docs] def transition_realization(self, real, start, stop=None, **kwargs):
newmean = self._dynamicsfun(start, real)
newcov = self._diffmatfun(start)
return pnrv.Normal(newmean, newcov), {}
[docs] def transition_rv(self, rv, start, stop=None, **kwargs):
raise NotImplementedError
[docs] def dynamics(self, time, state, **kwargs):
"""Compute dynamics :math:`g=g(t, x)` at time :math:`t` and state
:math:`x`.
Parameters
----------
time : float
Time :math:`t`.
state : array_like
State :math:`x`. For instance, realization of a random variable.
Returns
-------
:class:`np.ndarray`
Evaluation of :math:`g=g(t, x)`.
"""
return self._dynamicsfun(time, state)
[docs] def diffusionmatrix(self, time, **kwargs):
"""Compute diffusion matrix :math:`S=S(t)` at time :math:`t`.
Parameters
----------
time : float
Time :math:`t`.
Returns
-------
:class:`np.ndarray`
Diffusion matrix :math:`S=S(t)`.
"""
return self._diffmatfun(time, **kwargs)
[docs] def jacobian(self, time, state, **kwargs):
"""Compute diffusion matrix :math:`S=S(t)` at time :math:`t`.
Parameters
----------
time : float
Time :math:`t`.
state : array_like
State :math:`x`. For instance, realization of a random variable.
Raises
------
NotImplementedError
If the Jacobian is not implemented.
This is the case if :meth:`jacfct` is not specified at initialization.
Returns
-------
:class:`np.ndarray`
Evaluation of the Jacobian :math:`J g=Jg(t, x)`.
"""
if self._jacobfun is None:
raise NotImplementedError
return self._jacobfun(time, state)
class DiscreteLinearGaussian(DiscreteGaussian):
"""Discrete, linear Gaussian transition models of the form.
.. math:: x_{i+1} \\sim \\mathcal{N}(G(t_i) x_i + v(t_i), S(t_i))
for some dynamics matrix :math:`G=G(t)`, force vector :math:`v=v(t)`,
and diffusion matrix :math:`S=S(t)`.
Parameters
----------
dynamatfct : callable
Dynamics function :math:`G=G(t)`. Signature: ``dynamatfct(t)``.
forcefct : callable
Force function :math:`v=v(t)`. Signature: ``forcefct(t)``.
diffmatfct : callable
Diffusion matrix function :math:`S=S(t)`. Signature: ``diffmatfct(t)``.
See Also
--------
:class:`DiscreteModel`
:class:`DiscreteGaussianLinearModel`
"""
def __init__(self, dynamatfct, forcefct, diffmatfct):
def dynafct(t, x, **kwargs):
return dynamatfct(t, **kwargs) @ x + forcefct(t, **kwargs)
def jacfct(t, x, **kwargs):
return dynamatfct(t, **kwargs)
super().__init__(dynafct, diffmatfct, jacfct)
self._forcefct = forcefct
[docs] def transition_rv(self, rv, start, stop=None, **kwargs):
if not isinstance(rv, pnrv.Normal):
raise TypeError(f"Normal RV expected, but {type(rv)} received.")
dynamat = self.dynamicsmatrix(time=start)
diffmat = self.diffusionmatrix(time=start)
force = self.forcevector(time=start)
new_mean = dynamat @ rv.mean + force
new_crosscov = rv.cov @ dynamat.T
new_cov = dynamat @ new_crosscov + diffmat
return pnrv.Normal(mean=new_mean, cov=new_cov), {"crosscov": new_crosscov}
[docs] def dynamicsmatrix(self, time, **kwargs):
"""Compute dynamics matrix :math:`G=G(t)` at time :math:`t`. The output is
equivalent to :meth:`jacobian`.
Parameters
----------
time : float
Time :math:`t`.
Returns
-------
:class:`np.ndarray`
Evaluation of the dynamics matrix :math:`G=G(t)`.
"""
return self._jacobfun(time, None, **kwargs)
[docs] def forcevector(self, time, **kwargs):
"""Compute force vector :math:`v=v(t)` at time :math:`t`.
Parameters
----------
time : float
Time :math:`t`.
Returns
-------
:class:`np.ndarray`
Evaluation of the force :math:`v=v(t)`.
"""
return self._forcefct(time, **kwargs)
@property
def dimension(self):
return len(self.dynamicsmatrix(0.0).T)
[docs]class DiscreteLTIGaussian(DiscreteLinearGaussian):
"""Discrete, linear, time-invariant Gaussian transition models of the form.
.. math:: x_{i+1} \\sim \\mathcal{N}(G x_i + v, S)
for some dynamics matrix :math:`G`, force vector :math:`v`,
and diffusion matrix :math:`S`.
Parameters
----------
dynamat : np.ndarray
Dynamics matrix :math:`G`.
forcevec : np.ndarray
Force vector :math:`v`.
diffmat : np.ndarray
Diffusion matrix :math:`S`.
Raises
------
TypeError
If dynamat, forcevec and diffmat have incompatible shapes.
See Also
--------
:class:`DiscreteModel`
:class:`DiscreteGaussianLinearModel`
"""
def __init__(self, dynamat, forcevec, diffmat):
if dynamat.ndim != 2:
raise TypeError(
f"dynamat.ndim=2 expected. dynamat.ndim={dynamat.ndim} received."
)
if forcevec.ndim != 1:
raise TypeError(
f"forcevec.ndim=1 expected. forcevec.ndim={dynamat.ndim} received."
)
if diffmat.ndim != 2:
raise TypeError(
f"diffmat.ndim=2 expected. diffmat.ndim={dynamat.ndim} received."
)
if (
dynamat.shape[0] != forcevec.shape[0]
or forcevec.shape[0] != diffmat.shape[0]
or diffmat.shape[0] != diffmat.shape[1]
):
raise TypeError(
f"Dimension of dynamat, forcevec and diffmat do not align. "
f"Expected: dynamat.shape=(N,*), forcevec.shape=(N,), diffmat.shape=(N, N). "
f"Received: dynamat.shape={dynamat.shape}, forcevec.shape={forcevec.shape}, "
f"diffmat.shape={diffmat.shape}."
)
super().__init__(
lambda t, **kwargs: dynamat,
lambda t, **kwargs: forcevec,
lambda t, **kwargs: diffmat,
)