Source code for probnum.statespace.sde

"""SDE models as transitions."""
import functools
from typing import Callable, Optional

import numpy as np
import scipy.integrate
import scipy.linalg

from probnum import randvars
from probnum.type import FloatArgType, IntArgType
from probnum.utils.linalg import tril_to_positive_tril

from . import discrete_transition, transition
from .sde_utils import matrix_fraction_decomposition


class SDE(transition.Transition):
    """Stochastic differential equation.

    .. math:: d x(t) = g(t, x(t)) d t + L(t) d w(t),

    driven by a Wiener process with unit diffusion.
    """

    def __init__(
        self,
        dimension: IntArgType,
        driftfun: Callable[[FloatArgType, np.ndarray], np.ndarray],
        dispmatfun: Callable[[FloatArgType, np.ndarray], np.ndarray],
        jacobfun: Callable[[FloatArgType, np.ndarray], np.ndarray],
    ):
        self.dimension = dimension
        self.driftfun = driftfun
        self.dispmatfun = dispmatfun
        self.jacobfun = jacobfun
        super().__init__(input_dim=dimension, output_dim=dimension)

[docs] def forward_realization( self, realization, t, dt=None, compute_gain=False, _diffusion=1.0, **kwargs, ): return self._forward_realization_via_forward_rv( realization, t=t, dt=dt, compute_gain=compute_gain, _diffusion=_diffusion, **kwargs, )
[docs] def forward_rv( self, rv, t, dt=None, compute_gain=False, _diffusion=1.0, **kwargs, ): raise NotImplementedError("Not available.")
[docs] def backward_realization( self, realization_obtained, rv, rv_forwarded=None, gain=None, t=None, dt=None, _diffusion=1.0, **kwargs, ): return self._backward_realization_via_backward_rv( realization_obtained, rv=rv, rv_forwarded=rv_forwarded, gain=gain, t=t, dt=dt, _diffusion=_diffusion, **kwargs, )
[docs] def backward_rv( self, real_obtained, rv, rv_forwarded=None, gain=None, t=None, dt=None, _diffusion=1.0, **kwargs, ): raise NotImplementedError("Not available.")
class LinearSDE(SDE): """Linear stochastic differential equation (SDE), .. math:: d x(t) = [G(t) x(t) + v(t)] d t + L(t) x(t) d w(t). For Gaussian initial conditions, this solution is a Gaussian process. Parameters ---------- driftmatfun : This is G = G(t). The evaluations of this function are called the driftmatrix of the SDE. Returns np.ndarray with shape=(n, n) forcevecfun : This is v = v(t). Evaluations of this function are called the force(vector) of the SDE. Returns np.ndarray with shape=(n,) dispmatfun : This is L = L(t). Evaluations of this function are called the dispersion(matrix) of the SDE. Returns np.ndarray with shape=(n, s) mde_atol Absolute tolerance passed to the solver of the moment differential equations (MDEs). Optional. Default is 1e-6. mde_rtol Relative tolerance passed to the solver of the moment differential equations (MDEs). Optional. Default is 1e-6. mde_solver Method that is chosen in `scipy.integrate.solve_ivp`. Any string that is compatible with ``solve_ivp(..., method=mde_solve,...)`` works here. Usual candidates are ``[RK45, LSODA, Radau, BDF, RK23, DOP853]``. Optional. Default is LSODA. """ def __init__( self, dimension: IntArgType, driftmatfun: Callable[[FloatArgType], np.ndarray], forcevecfun: Callable[[FloatArgType], np.ndarray], dispmatfun: Callable[[FloatArgType], np.ndarray], mde_atol: Optional[FloatArgType] = 1e-6, mde_rtol: Optional[FloatArgType] = 1e-6, mde_solver: Optional[str] = "RK45", forward_implementation: Optional[str] = "classic", ): # Choose implementation for forward transitions choose_mde_forward_implementation = { "classic": self._solve_mde_forward_classic, "sqrt": self._solve_mde_forward_sqrt, } self._mde_forward_implementation = choose_mde_forward_implementation[ forward_implementation ] # Once different smoothing algorithms are available, # replicate the scheme from DiscreteGaussian here, in which # the initialisation decides between, e.g., classic and sqrt implementations. self.driftmatfun = driftmatfun self.forcevecfun = forcevecfun super().__init__( dimension=dimension, driftfun=(lambda t, x: self.driftmatfun(t) @ x + self.forcevecfun(t)), dispmatfun=dispmatfun, jacobfun=(lambda t, x: self.driftmatfun(t)), ) self.mde_atol = mde_atol self.mde_rtol = mde_rtol self.mde_solver = mde_solver
[docs] def forward_rv( self, rv, t, dt=None, _compute_gain=False, _diffusion=1.0, **kwargs, ): if dt is None: raise ValueError( "Continuous-time transitions require a time-increment ``dt``." ) return self._mde_forward_implementation(rv, t, dt, _diffusion=_diffusion)
[docs] def backward_rv( self, rv_obtained, rv, rv_forwarded=None, gain=None, t=None, dt=None, _diffusion=1.0, **kwargs, ): if dt is None: raise ValueError( "Continuous-time transitions require a time-increment ``dt``." ) # Ignore rv_forwarded return self._solve_mde_backward( rv_obtained=rv_obtained, rv=rv, t=t, dt=dt, _diffusion=_diffusion, )
# Forward and backward implementation(s) def _solve_mde_forward_classic(self, rv, t, dt, _diffusion=1.0): """Solve forward moment differential equations (MDEs).""" dim = rv.mean.shape[0] mde, y0 = self._setup_vectorized_mde_forward_classic( rv, _diffusion=_diffusion, ) sol, new_mean, new_cov = self._solve_mde_forward(mde, y0, t, dt, dim) # Useful for backward transitions # Aka continuous time smoothing. sol_mean = lambda t: sol.sol(t)[:dim] sol_cov = lambda t: sol.sol(t)[dim:].reshape((dim, dim)) return randvars.Normal(mean=new_mean, cov=new_cov), { "sol": sol, "sol_mean": sol_mean, "sol_cov": sol_cov, } def _solve_mde_forward_sqrt(self, rv, t, dt, _diffusion=1.0): """Solve forward moment differential equations (MDEs) using a square-root implementation.""" dim = rv.mean.shape[0] mde, y0 = self._setup_vectorized_mde_forward_sqrt( rv, _diffusion=_diffusion, ) sol, new_mean, new_cov_cholesky = self._solve_mde_forward(mde, y0, t, dt, dim) new_cov = new_cov_cholesky @ new_cov_cholesky.T # Useful for backward transitions # Aka continuous time smoothing. sol_mean = lambda t: sol.sol(t)[:dim] sol_cov_cholesky = lambda t: sol.sol(t)[dim:].reshape((dim, dim)) sol_cov = ( lambda t: sol.sol(t)[dim:].reshape((dim, dim)) @ sol.sol(t)[dim:].reshape((dim, dim)).T ) return randvars.Normal( mean=new_mean, cov=new_cov, cov_cholesky=new_cov_cholesky ), { "sol": sol, "sol_mean": sol_mean, "sol_cov_cholesky": sol_cov_cholesky, "sol_cov": sol_cov, } def _solve_mde_forward(self, mde, y0, t, dt, dim): """Solve forward moment differential equations (MDEs).""" # Dense output for lambda-expression sol = scipy.integrate.solve_ivp( mde, (t, t + dt), y0, method=self.mde_solver, atol=self.mde_atol, rtol=self.mde_rtol, dense_output=True, ) y_end = sol.y[:, -1] new_mean = y_end[:dim] # If forward_sqrt is used, new_cov_or_cov_cholesky is a Cholesky factor of the covariance # If forward_classic is used, new_cov_or_cov_cholesky is the covariance new_cov_or_cov_cholesky = y_end[dim:].reshape((dim, dim)) return sol, new_mean, new_cov_or_cov_cholesky def _solve_mde_backward(self, rv_obtained, rv, t, dt, _diffusion=1.0): """Solve backward moment differential equations (MDEs).""" _, mde_forward_info = self._mde_forward_implementation( rv, t, dt, _diffusion=_diffusion ) mde_forward_sol_mean = mde_forward_info["sol_mean"] mde_forward_sol_cov = mde_forward_info["sol_cov"] mde, y0 = self._setup_vectorized_mde_backward( rv_obtained, _diffusion=_diffusion, ) # Use forward solution for mean and covariance in scipy's ivp # Dense output for lambda-expression sol = scipy.integrate.solve_ivp( mde, (t + dt, t), y0, method=self.mde_solver, atol=self.mde_atol, rtol=self.mde_rtol, args=(mde_forward_sol_mean, mde_forward_sol_cov), dense_output=True, ) dim = rv.mean.shape[0] y_end = sol.y[:, -1] new_mean = y_end[:dim] new_cov = y_end[dim:].reshape((dim, dim)) # Useful for backward transitions # Aka continuous time smoothing. sol_mean = lambda t: sol.sol(t)[:dim] sol_cov = lambda t: sol.sol(t)[dim:].reshape((dim, dim)) return randvars.Normal(mean=new_mean, cov=new_cov), { "sol": sol, "sol_mean": sol_mean, "sol_cov": sol_cov, } def _setup_vectorized_mde_forward_classic(self, initrv, _diffusion=1.0): """Set up forward moment differential equations (MDEs). Compute an ODE vector field that represents the MDEs and is compatible with scipy.solve_ivp. """ dim = len(initrv.mean) def f(t, y): # Undo vectorization mean, cov_flat = y[:dim], y[dim:] cov = cov_flat.reshape((dim, dim)) # Apply iteration G = self.driftmatfun(t) u = self.forcevecfun(t) L = self.dispmatfun(t) new_mean = G @ mean + u new_cov = G @ cov + cov @ G.T + _diffusion * L @ L.T # Vectorize outcome new_cov_flat = new_cov.flatten() y_new = np.hstack((new_mean, new_cov_flat)) return y_new initcov_flat = initrv.cov.flatten() y0 = np.hstack((initrv.mean, initcov_flat)) return f, y0 def _setup_vectorized_mde_forward_sqrt(self, initrv, _diffusion=1.0): r"""Set up forward moment differential equations (MDEs) using a square-root implementation. (https://ieeexplore.ieee.org/document/4045974) The covariance :math:`P(t)` obeys the Riccati equation .. math:: \dot P(t) = G(t)P(t) + P(t)G^\top(t) + L(t)L^\top(t). Let :math:`S(t)` be a square-root of :math:`P(t)`, :math:`P(t)` positive definite, then .. math:: P(t) = S(t)S^\top(t) and we get the Riccati-Equation .. math:: \dot P(t) = G(t)S(t)S^\top(t) + 1/2 \cdot L(t)L^\top(t)S^{-\top}S^\top + S(t)S^\top(t)G^\top(t) + 1/2 \cdot S(t)S^{-1}(t)L(t)L^\top(t). One solution can be found by the square-root :math:`\dot S(t)` .. math:: \dot S(t) = G(t)S(t) + (A + 1/2 \cdot L(t)L^\top(t))S^{-\top} where :math:`A` is an arbitrary symmetric matrix. :math:`A` can be chosen to make S lower-triangular which can be achieved by .. math:: M(t) = S^{-1}(t)\dot S(t) + \dot S(t)^\top S^{-\top} and .. math:: M(t) = \bar G(t) + \bar G^\top(t) + \bar L(t) \bar L^\top(t) and .. math:: \bar G(t) = S^{-1}(t)G(t)S(t), \bar L(t) = S^{-1}L(t) and .. math:: \dot S(t) = S(t)[M(t)]_{\mathrm{lt}} where :math:`\mathrm{lt}` denotes the lower-triangular operator defined by .. math:: [M(t)]{_{\mathrm{lt}}}_{ij} = \begin{cases} 0 & i < j\\ 1/2 m(t)_{ij} & i=j\\ m(t)_{ij} & i > j \end{cases}. Compute an ODE vector field that represents the MDEs and is compatible with scipy.solve_ivp. """ dim = len(initrv.mean) def f(t, y): # Undo vectorization mean, cov_cholesky_flat = y[:dim], y[dim:] cov_cholesky = cov_cholesky_flat.reshape((dim, dim)) # Apply iteration G = self.driftmatfun(t) u = self.forcevecfun(t) L = self.dispmatfun(t) new_mean = G @ mean + u G_bar = scipy.linalg.solve_triangular( cov_cholesky, G @ cov_cholesky, lower=True ) L_bar = np.sqrt(_diffusion) * scipy.linalg.solve_triangular( cov_cholesky, L, lower=True ) M = G_bar + G_bar.T + L_bar @ L_bar.T new_cov_cholesky = tril_to_positive_tril( cov_cholesky @ (np.tril(M, -1) + 1 / 2 * np.diag(np.diag(M))) ) # Vectorize outcome new_cov_cholesky_flat = new_cov_cholesky.flatten() y_new = np.hstack((new_mean, new_cov_cholesky_flat)) return y_new initcov_cholesky_flat = initrv.cov_cholesky.flatten() y0 = np.hstack((initrv.mean, initcov_cholesky_flat)) return f, y0 def _setup_vectorized_mde_backward(self, finalrv_obtained, _diffusion=1.0): """Set up backward moment differential equations (MDEs). Compute an ODE vector field that represents the MDEs and is compatible with scipy.solve_ivp. """ dim = len(finalrv_obtained.mean) def f(t, y, mde_forward_sol_mean, mde_forward_sol_cov): # Undo vectorization mean, cov_flat = y[:dim], y[dim:] cov = cov_flat.reshape((dim, dim)) # Apply iteration G = self.driftmatfun(t) u = self.forcevecfun(t) L = self.dispmatfun(t) mde_forward_sol_cov_mat = mde_forward_sol_cov(t) mde_forward_sol_mean_vec = mde_forward_sol_mean(t) LL = _diffusion * L @ L.T LL_inv_cov = np.linalg.solve(mde_forward_sol_cov_mat, LL.T).T new_mean = G @ mean + LL_inv_cov @ (mean - mde_forward_sol_mean_vec) + u new_cov = (G + LL_inv_cov) @ cov + cov @ (G + LL_inv_cov).T - LL new_cov_flat = new_cov.flatten() y_new = np.hstack((new_mean, new_cov_flat)) return y_new finalcov_flat = finalrv_obtained.cov.flatten() y0 = np.hstack((finalrv_obtained.mean, finalcov_flat)) return f, y0 class LTISDE(LinearSDE): """Linear time-invariant continuous Markov models of the form. .. math:: d x(t) = [G x(t) + v] d t + L d w(t). In the language of dynamic models, x(t) : state process G : drift matrix v : force term/vector L : dispersion matrix. w(t) : Wiener process with unit diffusion. Parameters ---------- driftmat : This is F. It is the drift matrix of the SDE. forcevec : This is U. It is the force vector of the SDE. dispmat : This is L. It is the dispersion matrix of the SDE. """ def __init__( self, driftmat: np.ndarray, forcevec: np.ndarray, dispmat: np.ndarray, forward_implementation="classic", backward_implementation="classic", ): _check_initial_state_dimensions(driftmat, forcevec, dispmat) dimension = len(driftmat) self.driftmat = driftmat self.forcevec = forcevec self.dispmat = dispmat super().__init__( dimension, (lambda t: self.driftmat), (lambda t: self.forcevec), (lambda t: self.dispmat), ) self.forward_implementation = forward_implementation self.backward_implementation = backward_implementation
[docs] def forward_rv( self, rv, t, dt=None, compute_gain=False, _diffusion=1.0, **kwargs, ): if dt is None: raise ValueError( "Continuous-time transitions require a time-increment ``dt``." ) discretised_model = self.discretise(dt=dt) return discretised_model.forward_rv( rv, t, compute_gain=compute_gain, _diffusion=_diffusion )
[docs] def backward_rv( self, rv_obtained, rv, rv_forwarded=None, gain=None, t=None, dt=None, _diffusion=1.0, **kwargs, ): if dt is None: raise ValueError( "Continuous-time transitions require a time-increment ``dt``." ) discretised_model = self.discretise(dt=dt) return discretised_model.backward_rv( rv_obtained=rv_obtained, rv=rv, rv_forwarded=rv_forwarded, gain=gain, t=t, _diffusion=_diffusion, )
[docs] @functools.lru_cache(maxsize=None) def discretise(self, dt): """Return a discrete transition model (i.e. mild solution to SDE) using matrix fraction decomposition. That is, matrices A(h) and Q(h) and vector s(h) such that the transition is .. math:: x | x_\\text{old} \\sim \\mathcal{N}(A(h) x_\\text{old} + s(h), Q(h)) , which is the transition of the mild solution to the LTI SDE. """ if np.linalg.norm(self.forcevec) > 0: zeros = np.zeros((self.dimension, self.dimension)) eye = np.eye(self.dimension) driftmat = np.block([[self.driftmat, eye], [zeros, zeros]]) dispmat = np.concatenate((self.dispmat, np.zeros(self.dispmat.shape))) ah_stack, qh_stack, _ = matrix_fraction_decomposition(driftmat, dispmat, dt) proj = np.eye(self.dimension, 2 * self.dimension) proj_rev = np.flip(proj, axis=1) ah = proj @ ah_stack @ proj.T sh = proj @ ah_stack @ proj_rev.T @ self.forcevec qh = proj @ qh_stack @ proj.T else: ah, qh, _ = matrix_fraction_decomposition(self.driftmat, self.dispmat, dt) sh = np.zeros(len(ah)) return discrete_transition.DiscreteLTIGaussian( ah, sh, qh, forward_implementation=self.forward_implementation, backward_implementation=self.backward_implementation, )
def _check_initial_state_dimensions(driftmat, forcevec, dispmat): """Checks that the matrices all align and are of proper shape. Parameters ---------- driftmat : np.ndarray, shape=(n, n) forcevec : np.ndarray, shape=(n,) dispmat : np.ndarray, shape=(n, s) """ if driftmat.ndim != 2 or driftmat.shape[0] != driftmat.shape[1]: raise ValueError("driftmatrix not of shape (n, n)") if forcevec.ndim != 1: raise ValueError("force not of shape (n,)") if forcevec.shape[0] != driftmat.shape[1]: raise ValueError("force not of shape (n,) or driftmatrix not of shape (n, n)") if dispmat.ndim != 2: raise ValueError("dispersion not of shape (n, s)")