Source code for probnum.filtsmooth.statespace.sde

"""SDE models as transitions."""
import functools

import numpy as np
import scipy.linalg

import probnum.random_variables as pnrv

from . import discrete_transition, transition


class SDE(transition.Transition):
    """Stochastic differential equation.

    .. math:: d x_t = g(t, x_t) d t + L(t) d w_t,

    driven by a Wiener process with unit diffusion.
    """

    def __init__(self, driftfun, dispmatrixfun, jacobfun):
        self._driftfun = driftfun
        self._dispmatrixfun = dispmatrixfun
        self._jacobfun = jacobfun

[docs] def transition_realization(self, real, start, stop, **kwargs): raise NotImplementedError
[docs] def transition_rv(self, rv, start, stop, **kwargs): raise NotImplementedError
[docs] def drift(self, time, state, **kwargs): return self._driftfun(time, state, **kwargs)
[docs] def dispersionmatrix(self, time, **kwargs): return self._dispmatrixfun(time, **kwargs)
[docs] def jacobian(self, time, state, **kwargs): return self._jacobfun(time, state, **kwargs)
@property def dimension(self): raise NotImplementedError class LinearSDE(SDE): """Linear stochastic differential equation (SDE), .. math:: d x_t = [G(t) x_t + v(t)] d t + L(t) x_t d w_t. For Gaussian initial conditions, this solution is a Gaussian process. Parameters ---------- driftmatrixfun : callable, signature=(t, \\**kwargs) This is F = F(t). The evaluations of this function are called the drift(matrix) of the SDE. Returns np.ndarray with shape=(n, n) forcevecfun : callable, signature=(t, \\**kwargs) This is u = u(t). Evaluations of this function are called the force(vector) of the SDE. Returns np.ndarray with shape=(n,) dispmatrixfun : callable, signature=(t, \\**kwargs) This is L = L(t). Evaluations of this function are called the dispersion(matrix) of the SDE. Returns np.ndarray with shape=(n, s) Notes ----- If initial conditions are Gaussian, the solution is a Gauss-Markov process. """ def __init__(self, driftmatrixfun, forcevecfun, dispmatrixfun): self._driftmatrixfun = driftmatrixfun self._forcevecfun = forcevecfun super().__init__( driftfun=(lambda t, x: driftmatrixfun(t) @ x + forcevecfun(t)), dispmatrixfun=dispmatrixfun, jacobfun=(lambda t, x: dispmatrixfun(t)), )
[docs] def transition_realization(self, real, start, stop, step, **kwargs): rv = pnrv.Normal(real, 0 * np.eye(len(real))) return linear_sde_statistics( rv, start, stop, step, self._driftfun, self._driftmatrixfun, self._dispmatrixfun, )
[docs] def transition_rv(self, rv, start, stop, step, **kwargs): if not isinstance(rv, pnrv.Normal): errormsg = ( "Closed form transitions in linear SDE models is only " "available for Gaussian initial conditions." ) raise TypeError(errormsg) return linear_sde_statistics( rv, start, stop, step, self._driftfun, self._driftmatrixfun, self._dispmatrixfun, )
@property def dimension(self): """Spatial dimension (utility attribute).""" return len(self._driftmatrixfun(0.0))
[docs]class LTISDE(LinearSDE): """Linear time-invariant continuous Markov models of the form dx = [F x(t) + u] dt + L dBt. In the language of dynamic models, x(t) : state process F : drift matrix u : forcing term L : dispersion matrix. Bt : Brownian motion with constant diffusion matrix Q. Parameters ---------- driftmatrix : np.ndarray, shape=(n, n) This is F. It is the drift matrix of the SDE. forcevec : np.ndarray, shape=(n,) This is U. It is the force vector of the SDE. dispmatrix : np.ndarray, shape(n, s) This is L. It is the dispersion matrix of the SDE. Notes ----- It assumes Gaussian initial conditions (otherwise it is no Gauss-Markov process). """ def __init__(self, driftmatrix, forcevec, dispmatrix): _check_initial_state_dimensions(driftmatrix, forcevec, dispmatrix) super().__init__( (lambda t, **kwargs: driftmatrix), (lambda t, **kwargs: forcevec), (lambda t, **kwargs: dispmatrix), ) self._driftmatrix = driftmatrix self._forcevec = forcevec self._dispmatrix = dispmatrix @property def driftmatrix(self): return self._driftmatrix @property def forcevec(self): return self._forcevec @property def dispersionmatrix(self): # pylint: disable=invalid-overridden-method return self._dispmatrix
[docs] def transition_realization(self, real, start, stop, **kwargs): if not isinstance(real, np.ndarray): raise TypeError(f"Numpy array expected, {type(real)} received.") discretised_model = self.discretise(step=stop - start) return discretised_model.transition_realization(real, start, stop)
[docs] def transition_rv(self, rv, start, stop, **kwargs): if not isinstance(rv, pnrv.Normal): errormsg = ( "Closed form transitions in LTI SDE models is only " "available for Gaussian initial conditions." ) raise TypeError(errormsg) discretised_model = self.discretise(step=stop - start) return discretised_model.transition_rv(rv, start, stop)
[docs] def discretise(self, step): """Returns a discrete transition model (i.e. mild solution to SDE) using matrix fraction decomposition. That is, matrices A(h) and Q(h) and vector s(h) such that the transition is .. math:: x | x_\\text{old} \\sim \\mathcal{N}(A(h) x_\\text{old} + s(h), Q(h)) , which is the transition of the mild solution to the LTI SDE. """ if np.linalg.norm(self._forcevec) > 0: raise NotImplementedError("MFD does not work for force>0 (yet).") ah, qh, _ = matrix_fraction_decomposition( self.driftmatrix, self.dispersionmatrix, step ) sh = np.zeros(len(ah)) return discrete_transition.DiscreteLTIGaussian(ah, sh, qh)
def _check_initial_state_dimensions(drift, force, disp): """Checks that the matrices all align and are of proper shape. If all the bugs are removed and the tests run, these asserts are turned into Exception-catchers. Parameters ---------- drift : np.ndarray, shape=(n, n) force : np.ndarray, shape=(n,) disp : np.ndarray, shape=(n, s) """ if drift.ndim != 2 or drift.shape[0] != drift.shape[1]: raise ValueError("driftmatrix not of shape (n, n)") if force.ndim != 1: raise ValueError("force not of shape (n,)") if force.shape[0] != drift.shape[1]: raise ValueError("force not of shape (n,) or driftmatrix not of shape (n, n)") if disp.ndim != 2: raise ValueError("dispersion not of shape (n, s)")
[docs]def linear_sde_statistics(rv, start, stop, step, driftfun, jacobfun, dispmatfun): """Computes mean and covariance of SDE solution. For a linear(ised) SDE .. math:: d x_t = [G(t) x_t + v(t)] d t + L(t) x_t d w_t. mean and covariance of the solution are computed by solving .. math:: \\frac{dm}{dt}(t) = G(t) m(t) + v(t), \\frac{dC}{dt}(t) = G(t) C(t) + C(t) G(t)^\\top + L(t) L(t)^\\top, which is done here with a few steps of the RK4 method. This function is also called by the continuous-time extended Kalman filter, which is why the drift can be any function. Parameters ---------- rv : Normal random variable. Distribution of mean and covariance at the starting point. start : Start of the time-interval stop : End of the time-interval step : Step-size used in RK4. driftfun : Drift of the (non)linear SDE jacobfun : Jacobian of the drift function dispmatfun : Dispersion matrix function Returns ------- Normal random variable Mean and covariance are the solution of the differential equation dict Empty dict, may be extended in the future to contain information about the solution process, e.g. number of function evaluations. """ if step <= 0.0: raise ValueError("Step-size must be positive.") mean, cov = rv.mean, rv.cov time = start # Set default arguments for frequently used functions. increment_fun = functools.partial( _increment_fun, driftfun=driftfun, jacobfun=jacobfun, dispmatfun=dispmatfun, ) rk4_step = functools.partial(_rk4_step, step=step, fun=increment_fun) while time < stop: mean, cov, time = rk4_step(mean, cov, time) return pnrv.Normal(mean, cov), {}
def _rk4_step(mean, cov, time, step, fun): """Do a single RK4 step to compute the solution.""" m1, c1 = fun(time, mean, cov) m2, c2 = fun(time, mean + step * m1 / 2.0, cov + step * c1 / 2.0) m3, c3 = fun(time, mean + step * m2 / 2.0, cov + step * c2 / 2.0) m4, c4 = fun(time, mean + step * m3, cov + step * c3) mean = mean + step * (m1 + 2 * m2 + 2 * m3 + m4) / 6.0 cov = cov + step * (c1 + 2 * c2 + 2 * c3 + c4) / 6.0 time = time + step return mean, cov, time def _increment_fun(time, mean, cov, driftfun, jacobfun, dispmatfun): """Euler step for closed form solutions of ODE defining mean and covariance of the closed-form transition. Maybe make this into a different solver (euler sucks). See RHS of Eq. 10.82 in Applied SDEs. """ dispersion_matrix = dispmatfun(time) jacobian = jacobfun(time) mean_increment = driftfun(time, mean) cov_increment = ( cov @ jacobian.T + jacobian @ cov.T + dispersion_matrix @ dispersion_matrix.T ) return mean_increment, cov_increment
[docs]def matrix_fraction_decomposition(F, L, h): """Matrix fraction decomposition.""" if F.ndim != 2 or L.ndim != 2: raise TypeError("F and L must be matrices.") if not np.isscalar(h): raise TypeError("h must be a float/scalar") topleft = F topright = L @ L.T bottomright = -F.T bottomleft = np.zeros(F.shape) toprow = np.hstack((topleft, topright)) bottomrow = np.hstack((bottomleft, bottomright)) bigmat = np.vstack((toprow, bottomrow)) Phi = scipy.linalg.expm(bigmat * h) projmat1 = np.eye(*toprow.shape) projmat2 = np.flip(projmat1) Ah = projmat1 @ Phi @ projmat1.T C, D = projmat1 @ Phi @ projmat2.T, Ah.T Qh = C @ D return Ah, Qh, bigmat