Source code for probnum.randprocs.markov.discrete._nonlinear_gaussian

"""Discrete transitions."""

from typing import Callable, Optional

import numpy as np

from probnum import randvars
from probnum.randprocs.markov import _transition
from probnum.randprocs.markov.discrete import _condition_state
from probnum.typing import ArrayLike, FloatLike, IntLike

class NonlinearGaussian(_transition.Transition):
    r"""Discrete transitions with additive Gaussian noise.

    .. math:: y \sim g(t, x) + v(t), \quad v \sim \mathcal{N}(m(t), S(t))

    for transition function :math:`g: \mathbb{R}^m \rightarrow \mathbb{R}^n`
    and Noise :math:`v`.

        Input dimension.
        Output dimension.
        Transition function :math:`g(t, x)`.
        Noise :math:`v(t)`.
        Jacobian of the transition function :math:`g(t, x)`.

    def __init__(
        input_dim: IntLike,
        output_dim: IntLike,
        transition_fun: Callable[[FloatLike, ArrayLike], ArrayLike],
        noise_fun: Callable[[FloatLike], randvars.RandomVariable],
        transition_fun_jacobian: Optional[
            Callable[[FloatLike, ArrayLike], ArrayLike]
        ] = None,
        super().__init__(input_dim=input_dim, output_dim=output_dim)
        self._transition_fun = transition_fun
        self._transition_fun_jacobian = transition_fun_jacobian
        self._noise_fun = noise_fun

    def transition_fun(self):
        return self._transition_fun

    def transition_fun_jacobian(self):
        if self._transition_fun_jacobian is None:
            raise NotImplementedError
        return self._transition_fun_jacobian

    def noise_fun(self):
        return self._noise_fun

[docs] def forward_realization( self, realization, t, compute_gain=False, _diffusion=1.0, **kwargs ): fun_eval = self.transition_fun(t, realization) noise = _diffusion * self.noise_fun(t) return fun_eval + noise, {}
[docs] def forward_rv(self, rv, t, compute_gain=False, _diffusion=1.0, **kwargs): raise NotImplementedError("Not available")
[docs] def backward_realization( self, realization_obtained, rv, rv_forwarded=None, gain=None, t=None, _diffusion=1.0, **kwargs, ): raise NotImplementedError("Not available")
[docs] def backward_rv( self, rv_obtained, rv, rv_forwarded=None, gain=None, t=None, _diffusion=1.0, **kwargs, ): # Should we use the _backward_rv_classic here? # It is only intractable bc. forward_rv is intractable # and assuming its forward formula would yield a valid # gain, the backward formula would be valid. # This is the case for the UKF, for instance. raise NotImplementedError("Not available")
# Implementations that are the same for all sorts of # discrete Gaussian transitions, in particular shared # by LinearNonlinearGaussian and e.g. DiscreteUKFComponent. def _backward_rv_classic( self, rv_obtained, rv, rv_forwarded=None, gain=None, t=None, _diffusion=None, _linearise_at=None, ): if rv_forwarded is None or gain is None: rv_forwarded, info_forwarded = self.forward_rv( rv, t=t, compute_gain=True, _diffusion=_diffusion, _linearise_at=_linearise_at, ) gain = info_forwarded["gain"] info = {"rv_forwarded": rv_forwarded} return ( _condition_state.condition_state_on_rv(rv_obtained, rv_forwarded, rv, gain), info, )
[docs] @classmethod def from_callable( cls, input_dim: IntLike, output_dim: IntLike, transition_fun: Callable[[FloatLike, ArrayLike], ArrayLike], transition_fun_jacobian: Callable[[FloatLike, ArrayLike], ArrayLike], ): """Turn a callable into a deterministic transition.""" return cls( input_dim=input_dim, output_dim=output_dim, transition_fun=transition_fun, transition_fun_jacobian=transition_fun_jacobian, noise_fun=lambda t: randvars.Constant(np.zeros(output_dim)), )