Source code for probnum.randprocs.markov.continuous._lti_sde

"""LTI SDE models as transitions."""
import functools

import numpy as np

from probnum import randvars
from probnum.randprocs.markov import discrete
from probnum.randprocs.markov.continuous import _linear_sde, _mfd


class LTISDE(_linear_sde.LinearSDE):
    """Linear time-invariant continuous Markov models of the form.

    .. math:: d x(t) = [G x(t) + v] d t + L d w(t).

    In the language of dynamic models,

    x(t) : state process
    G : drift matrix
    v : force term/vector
    L : dispersion matrix.
    w(t) : Wiener process with unit diffusion.

    Parameters
    ----------
    drift_matrix :
        This is F. It is the drift matrix of the SDE.
    force_vector :
        This is U. It is the force vector of the SDE.
    dispersion_matrix :
        This is L. It is the dispersion matrix of the SDE.
    """

    def __init__(
        self,
        drift_matrix: np.ndarray,
        force_vector: np.ndarray,
        dispersion_matrix: np.ndarray,
        forward_implementation="classic",
        backward_implementation="classic",
    ):

        # Convert input into super() compatible format and initialize super()
        state_dimension = drift_matrix.shape[0]
        wiener_process_dimension = dispersion_matrix.shape[1]

        # Point to attributes, to make sure that changes to self.drift_matrix
        # are reflected in this function.
        def drift_matrix_function(t):
            return self.drift_matrix

        def force_vector_function(t):
            return self.force_vector

        def dispersion_matrix_function(t):
            return self.dispersion_matrix

        super().__init__(
            state_dimension=state_dimension,
            wiener_process_dimension=wiener_process_dimension,
            drift_matrix_function=drift_matrix_function,
            dispersion_matrix_function=dispersion_matrix_function,
            force_vector_function=force_vector_function,
        )

        # Assert all shapes match and store matrices.
        _check_initial_state_dimensions(drift_matrix, force_vector, dispersion_matrix)
        self._drift_matrix = drift_matrix
        self._force_vector = force_vector
        self._dispersion_matrix = dispersion_matrix
        self._forward_implementation_string = forward_implementation
        self._backward_implementation_string = backward_implementation

    @property
    def drift_matrix(self):
        return self._drift_matrix

    @property
    def force_vector(self):
        return self._force_vector

    @property
    def dispersion_matrix(self):
        return self._dispersion_matrix

    @property
    def forward_implementation(self):
        return self._forward_implementation_string

    @property
    def backward_implementation(self):
        return self._backward_implementation_string

[docs] def forward_rv( self, rv, t, dt=None, compute_gain=False, _diffusion=1.0, **kwargs, ): if dt is None: raise ValueError( "Continuous-time transitions require a time-increment ``dt``." ) discretised_model = self.discretise(dt=dt) return discretised_model.forward_rv( rv, t, compute_gain=compute_gain, _diffusion=_diffusion )
[docs] def backward_rv( self, rv_obtained, rv, rv_forwarded=None, gain=None, t=None, dt=None, _diffusion=1.0, **kwargs, ): if dt is None: raise ValueError( "Continuous-time transitions require a time-increment ``dt``." ) discretised_model = self.discretise(dt=dt) return discretised_model.backward_rv( rv_obtained=rv_obtained, rv=rv, rv_forwarded=rv_forwarded, gain=gain, t=t, _diffusion=_diffusion, )
[docs] @functools.lru_cache(maxsize=None) def discretise(self, dt): """Return a discrete transition model (i.e. mild solution to SDE) using matrix fraction decomposition. That is, matrices A(h) and Q(h) and vector s(h) such that the transition is .. math:: x | x_\\text{old} \\sim \\mathcal{N}(A(h) x_\\text{old} + s(h), Q(h)) , which is the transition of the mild solution to the LTI SDE. """ if np.linalg.norm(self.force_vector) > 0: zeros = np.zeros((self.state_dimension, self.state_dimension)) eye = np.eye(self.state_dimension) drift_matrix = np.block([[self.drift_matrix, eye], [zeros, zeros]]) dispersion_matrix = np.concatenate( (self.dispersion_matrix, np.zeros(self.dispersion_matrix.shape)) ) ah_stack, qh_stack, _ = _mfd.matrix_fraction_decomposition( drift_matrix, dispersion_matrix, dt ) proj = np.eye(self.state_dimension, 2 * self.state_dimension) proj_rev = np.flip(proj, axis=1) ah = proj @ ah_stack @ proj.T sh = proj @ ah_stack @ proj_rev.T @ self.force_vector qh = proj @ qh_stack @ proj.T else: ah, qh, _ = _mfd.matrix_fraction_decomposition( self.drift_matrix, self.dispersion_matrix, dt ) sh = np.zeros(len(ah)) return discrete.LTIGaussian( transition_matrix=ah, noise=randvars.Normal(mean=sh, cov=qh), forward_implementation=self._forward_implementation_string, backward_implementation=self._backward_implementation_string, )
def _check_initial_state_dimensions(drift_matrix, force_vector, dispersion_matrix): """Checks that the matrices all align and are of proper shape. Parameters ---------- drift_matrix : np.ndarray, shape=(n, n) force_vector : np.ndarray, shape=(n,) dispersion_matrix : np.ndarray, shape=(n, s) """ if drift_matrix.ndim != 2 or drift_matrix.shape[0] != drift_matrix.shape[1]: raise ValueError("drift_matrixrix not of shape (n, n)") if force_vector.ndim != 1: raise ValueError("force not of shape (n,)") if force_vector.shape[0] != drift_matrix.shape[1]: raise ValueError( "force not of shape (n,) or drift_matrixrix not of shape (n, n)" ) if dispersion_matrix.ndim != 2: raise ValueError("dispersion not of shape (n, s)")