# Source code for probnum.randprocs.markov.continuous._sde

"""SDE models as transitions."""

from typing import Callable, Optional

import numpy as np

from probnum.randprocs.markov import _transition
from probnum.typing import FloatLike, IntLike


class SDE(_transition.Transition):
    r"""Stochastic differential equation.

    .. math:: d x(t) = g(t, x(t)) d t + l(t, x(t)) d w(t),

    driven by a Wiener process :math:`w` with isotropic diffusion :math:`\Gamma(t) = \gamma(t) I_d`.

    Parameters
    ----------
    state_dimension
        Dimension of the state :math:`x(t)`. It is also handed to the base
        class as both the input and the output dimension of the transition.
    wiener_process_dimension
        Dimension of the driving Wiener process :math:`w`.
    drift_function
        Callable invoked as ``drift_function(t, x)``; the drift :math:`g`.
    dispersion_function
        Callable invoked as ``dispersion_function(t, x)``; the dispersion
        :math:`l`.
    drift_jacobian
        Optional callable invoked as ``drift_jacobian(t, x)``. May be omitted
        (or ``None``), in which case :meth:`drift_jacobian` raises
        ``NotImplementedError``.
    """

    def __init__(
        self,
        state_dimension: IntLike,
        wiener_process_dimension: IntLike,
        drift_function: Callable[[FloatLike, np.ndarray], np.ndarray],
        dispersion_function: Callable[[FloatLike, np.ndarray], np.ndarray],
        drift_jacobian: Optional[
            Callable[[FloatLike, np.ndarray], np.ndarray]
        ] = None,
    ):
        # The transition maps the state space onto itself.
        super().__init__(input_dim=state_dimension, output_dim=state_dimension)

        # Mandatory arguments
        self._state_dimension = state_dimension
        self._wiener_process_dimension = wiener_process_dimension
        self._drift_function = drift_function
        self._dispersion_function = dispersion_function

        # Optional arguments (None means "no Jacobian available")
        self._drift_jacobian = drift_jacobian

    @property
    def state_dimension(self):
        """State dimension, exactly as passed to the constructor."""
        return self._state_dimension

    @property
    def wiener_process_dimension(self):
        """Wiener process dimension, exactly as passed to the constructor."""
        return self._wiener_process_dimension

    def drift_function(self, t, x):
        """Evaluate the user-supplied drift callable at ``(t, x)``."""
        return self._drift_function(t, x)

    def dispersion_function(self, t, x):
        """Evaluate the user-supplied dispersion callable at ``(t, x)``."""
        return self._dispersion_function(t, x)

    def drift_jacobian(self, t, x):
        """Evaluate the user-supplied drift Jacobian callable at ``(t, x)``.

        Raises
        ------
        NotImplementedError
            If no Jacobian callable was provided at construction time.
        """
        if self._drift_jacobian is None:
            raise NotImplementedError("Jacobian not provided.")
        return self._drift_jacobian(t, x)

    def forward_realization(
        self,
        realization,
        t,
        dt=None,
        compute_gain=False,
        _diffusion=1.0,
        **kwargs,
    ):
        """Forward-propagate a realization.

        Delegates to ``self._forward_realization_via_forward_rv`` (not defined
        in this class; presumably inherited from the ``Transition`` base
        class), forwarding all arguments unchanged.
        """
        return self._forward_realization_via_forward_rv(
            realization,
            t=t,
            dt=dt,
            compute_gain=compute_gain,
            _diffusion=_diffusion,
            **kwargs,
        )

    def forward_rv(
        self,
        rv,
        t,
        dt=None,
        compute_gain=False,
        _diffusion=1.0,
        **kwargs,
    ):
        """Forward-propagate a random variable.

        Not implemented by this class.

        Raises
        ------
        NotImplementedError
            Always.
        """
        raise NotImplementedError("Not available.")

    def backward_realization(
        self,
        realization_obtained,
        rv,
        rv_forwarded=None,
        gain=None,
        t=None,
        dt=None,
        _diffusion=1.0,
        **kwargs,
    ):
        """Backward-propagate a realization.

        Delegates to ``self._backward_realization_via_backward_rv`` (not
        defined in this class; presumably inherited from the ``Transition``
        base class), forwarding all arguments unchanged.
        """
        return self._backward_realization_via_backward_rv(
            realization_obtained,
            rv=rv,
            rv_forwarded=rv_forwarded,
            gain=gain,
            t=t,
            dt=dt,
            _diffusion=_diffusion,
            **kwargs,
        )

    def backward_rv(
        self,
        real_obtained,
        rv,
        rv_forwarded=None,
        gain=None,
        t=None,
        dt=None,
        _diffusion=1.0,
        **kwargs,
    ):
        """Backward-propagate a random variable.

        Not implemented by this class.

        Raises
        ------
        NotImplementedError
            Always.
        """
        raise NotImplementedError("Not available.")