Source code for probnum.statespace.sde

"""SDE models as transitions."""
import functools
from typing import Callable, Optional

import numpy as np
import scipy.integrate
import scipy.linalg

from probnum import randvars
from probnum.type import FloatArgType, IntArgType

from . import discrete_transition, transition
from .sde_utils import matrix_fraction_decomposition


[docs]class SDE(transition.Transition): """Stochastic differential equation. .. math:: d x(t) = g(t, x(t)) d t + L(t) d w(t), driven by a Wiener process with unit diffusion. """ def __init__( self, dimension: IntArgType, driftfun: Callable[[FloatArgType, np.ndarray], np.ndarray], dispmatfun: Callable[[FloatArgType, np.ndarray], np.ndarray], jacobfun: Callable[[FloatArgType, np.ndarray], np.ndarray], ): self.dimension = dimension self.driftfun = driftfun self.dispmatfun = dispmatfun self.jacobfun = jacobfun super().__init__(input_dim=dimension, output_dim=dimension)
[docs] def forward_realization( self, realization, t, dt=None, compute_gain=False, _diffusion=1.0, **kwargs, ): return self._forward_realization_via_forward_rv( realization, t=t, dt=dt, compute_gain=compute_gain, _diffusion=_diffusion, **kwargs, )
[docs] def forward_rv( self, rv, t, dt=None, compute_gain=False, _diffusion=1.0, **kwargs, ): raise NotImplementedError("Not available.")
[docs] def backward_realization( self, realization_obtained, rv, rv_forwarded=None, gain=None, t=None, dt=None, _diffusion=1.0, **kwargs, ): return self._backward_realization_via_backward_rv( realization_obtained, rv=rv, rv_forwarded=rv_forwarded, gain=gain, t=t, dt=dt, _diffusion=_diffusion, **kwargs, )
[docs] def backward_rv( self, real_obtained, rv, rv_forwarded=None, gain=None, t=None, dt=None, _diffusion=1.0, **kwargs, ): raise NotImplementedError("Not available.")
[docs]class LinearSDE(SDE): """Linear stochastic differential equation (SDE), .. math:: d x(t) = [G(t) x(t) + v(t)] d t + L(t) x(t) d w(t). For Gaussian initial conditions, this solution is a Gaussian process. Parameters ---------- driftmatfun : This is G = G(t). The evaluations of this function are called the driftmatrix of the SDE. Returns np.ndarray with shape=(n, n) forcevecfun : This is v = v(t). Evaluations of this function are called the force(vector) of the SDE. Returns np.ndarray with shape=(n,) dispmatfun : This is L = L(t). Evaluations of this function are called the dispersion(matrix) of the SDE. Returns np.ndarray with shape=(n, s) mde_atol Absolute tolerance passed to the solver of the moment differential equations (MDEs). Optional. Default is 1e-6. mde_rtol Relative tolerance passed to the solver of the moment differential equations (MDEs). Optional. Default is 1e-6. mde_solver Method that is chosen in `scipy.integrate.solve_ivp`. Any string that is compatible with ``solve_ivp(..., method=mde_solve,...)`` works here. Usual candidates are ``[RK45, LSODA, Radau, BDF, RK23, DOP853]``. Optional. Default is LSODA. """ def __init__( self, dimension: IntArgType, driftmatfun: Callable[[FloatArgType], np.ndarray], forcevecfun: Callable[[FloatArgType], np.ndarray], dispmatfun: Callable[[FloatArgType], np.ndarray], mde_atol: Optional[FloatArgType] = 1e-6, mde_rtol: Optional[FloatArgType] = 1e-6, mde_solver: Optional[str] = "LSODA", ): # Once different filtering and smoothing algorithms are available, # replicate the scheme from DiscreteGaussian here, in which # the initialisation decides between, e.g., classic and sqrt implementations. self.driftmatfun = driftmatfun self.forcevecfun = forcevecfun super().__init__( dimension=dimension, driftfun=(lambda t, x: self.driftmatfun(t) @ x + self.forcevecfun(t)), dispmatfun=dispmatfun, jacobfun=(lambda t, x: self.driftmatfun(t)), ) self.mde_atol = mde_atol self.mde_rtol = mde_rtol self.mde_solver = mde_solver
[docs] def forward_rv( self, rv, t, dt=None, _compute_gain=False, _diffusion=1.0, **kwargs, ): if dt is None: raise ValueError( "Continuous-time transitions require a time-increment ``dt``." ) return self._solve_mde_forward(rv, t, dt, _diffusion=_diffusion)
[docs] def backward_rv( self, rv_obtained, rv, rv_forwarded=None, gain=None, t=None, dt=None, _diffusion=1.0, **kwargs, ): raise NotImplementedError("Not available (yet).")
# Forward (and soon, backward) implementation(s) def _solve_mde_forward(self, rv, t, dt, _diffusion=1.0): """Solve forward moment differential equations (MDEs).""" mde, y0 = self._setup_vectorized_mde_forward( rv, _diffusion=_diffusion, ) sol = scipy.integrate.solve_ivp( mde, (t, t + dt), y0, method=self.mde_solver, atol=self.mde_atol, rtol=self.mde_rtol, ) dim = rv.mean.shape[0] y_end = sol.y[:, -1] new_mean = y_end[:dim] new_cov = y_end[dim:].reshape((dim, dim)) # Will come in useful for backward transitions # Aka continuous time smoothing. sol_mean = lambda t: sol.sol(t)[:dim] sol_cov = lambda t: sol.sol(t)[dim:].reshape((dim, dim)) return randvars.Normal(mean=new_mean, cov=new_cov), { "sol": sol, "sol_mean": sol_mean, "sol_cov": sol_cov, } def _setup_vectorized_mde_forward(self, initrv, _diffusion=1.0): """Set up forward moment differential equations (MDEs). Compute an ODE vector field that represents the MDEs and is compatible with scipy.solve_ivp. """ dim = len(initrv.mean) def f(t, y): # Undo vectorization mean, cov_flat = y[:dim], y[dim:] cov = cov_flat.reshape((dim, dim)) # Apply iteration F = self.driftmatfun(t) u = self.forcevecfun(t) L = self.dispmatfun(t) new_mean = F @ mean + u new_cov = F @ cov + cov @ F.T + _diffusion * L @ L.T # Vectorize outcome new_cov_flat = new_cov.flatten() y_new = np.hstack((new_mean, new_cov_flat)) return y_new initcov_flat = initrv.cov.flatten() y0 = np.hstack((initrv.mean, initcov_flat)) return f, y0
[docs]class LTISDE(LinearSDE): """Linear time-invariant continuous Markov models of the form. .. math:: d x(t) = [G x(t) + v] d t + L d w(t). In the language of dynamic models, x(t) : state process G : drift matrix v : force term/vector L : dispersion matrix. w(t) : Wiener process with unit diffusion. Parameters ---------- driftmat : This is F. It is the drift matrix of the SDE. forcevec : This is U. It is the force vector of the SDE. dispmat : This is L. It is the dispersion matrix of the SDE. """ def __init__( self, driftmat: np.ndarray, forcevec: np.ndarray, dispmat: np.ndarray, forward_implementation="classic", backward_implementation="classic", ): _check_initial_state_dimensions(driftmat, forcevec, dispmat) dimension = len(driftmat) self.driftmat = driftmat self.forcevec = forcevec self.dispmat = dispmat super().__init__( dimension, (lambda t: self.driftmat), (lambda t: self.forcevec), (lambda t: self.dispmat), ) self.forward_implementation = forward_implementation self.backward_implementation = backward_implementation
[docs] def forward_rv( self, rv, t, dt=None, compute_gain=False, _diffusion=1.0, **kwargs, ): discretised_model = self.discretise(dt=dt) return discretised_model.forward_rv( rv, t, compute_gain=compute_gain, _diffusion=_diffusion )
[docs] def backward_rv( self, rv_obtained, rv, rv_forwarded=None, gain=None, t=None, dt=None, _diffusion=1.0, **kwargs, ): discretised_model = self.discretise(dt=dt) return discretised_model.backward_rv( rv_obtained=rv_obtained, rv=rv, rv_forwarded=rv_forwarded, gain=gain, t=t, _diffusion=_diffusion, )
[docs] @functools.lru_cache(maxsize=None) def discretise(self, dt): """Return a discrete transition model (i.e. mild solution to SDE) using matrix fraction decomposition. That is, matrices A(h) and Q(h) and vector s(h) such that the transition is .. math:: x | x_\\text{old} \\sim \\mathcal{N}(A(h) x_\\text{old} + s(h), Q(h)) , which is the transition of the mild solution to the LTI SDE. """ if np.linalg.norm(self.forcevec) > 0: zeros = np.zeros((self.dimension, self.dimension)) eye = np.eye(self.dimension) driftmat = np.block([[self.driftmat, eye], [zeros, zeros]]) dispmat = np.concatenate((self.dispmat, np.zeros(self.dispmat.shape))) ah_stack, qh_stack, _ = matrix_fraction_decomposition(driftmat, dispmat, dt) proj = np.eye(self.dimension, 2 * self.dimension) proj_rev = np.flip(proj, axis=1) ah = proj @ ah_stack @ proj.T sh = proj @ ah_stack @ proj_rev.T @ self.forcevec qh = proj @ qh_stack @ proj.T else: ah, qh, _ = matrix_fraction_decomposition(self.driftmat, self.dispmat, dt) sh = np.zeros(len(ah)) return discrete_transition.DiscreteLTIGaussian( ah, sh, qh, forward_implementation=self.forward_implementation, backward_implementation=self.backward_implementation, )
def _check_initial_state_dimensions(driftmat, forcevec, dispmat): """Checks that the matrices all align and are of proper shape. Parameters ---------- driftmat : np.ndarray, shape=(n, n) forcevec : np.ndarray, shape=(n,) dispmat : np.ndarray, shape=(n, s) """ if driftmat.ndim != 2 or driftmat.shape[0] != driftmat.shape[1]: raise ValueError("driftmatrix not of shape (n, n)") if forcevec.ndim != 1: raise ValueError("force not of shape (n,)") if forcevec.shape[0] != driftmat.shape[1]: raise ValueError("force not of shape (n,) or driftmatrix not of shape (n, n)") if dispmat.ndim != 2: raise ValueError("dispersion not of shape (n, s)")