Source code for probnum.filtsmooth.statespace.continuous.linearsdemodel

"""
Time continuous Gauss-Markov models implicitly defined
through being a solution to the SDE
dx(t) = F(t) x(t) dt + L(t) dB(t).

If initial condition is Gaussian RV, the solution
is a Gauss-Markov process.
"""

import numpy as np
import scipy.linalg
from probnum.filtsmooth.statespace.continuous import continuousmodel
from probnum.random_variables import Normal

__all__ = ["LinearSDEModel", "LTISDEModel"]


class LinearSDEModel(continuousmodel.ContinuousModel):
    """
    Linear time-continuous Markov models given by the solution of the
    stochastic differential equation
    :math:`dx = [F(t) x(t) + u(t)] dt + L(t) dB(t)`.


    Parameters
    ----------
    driftmatrixfct : callable, signature=(t, \\**kwargs)
        This is F = F(t). The evaluations of this function are called
        the drift(matrix) of the SDE.
        Returns np.ndarray with shape=(n, n)
    forcfct : callable, signature=(t, \\**kwargs)
        This is u = u(t). Evaluations of this function are called
        the force(vector) of the SDE.
        Returns np.ndarray with shape=(n,)
    dispmatrixfct : callable, signature=(t, \\**kwargs)
        This is L = L(t). Evaluations of this function are called
        the dispersion(matrix) of the SDE.
        Returns np.ndarray with shape=(n, s)
    diffmatrix : np.ndarray, shape=(s, s)
        This is the diffusion matrix Q of the Brownian motion.
        It is always a square matrix and the size of this matrix matches
        the number of columns of the dispersionmatrix.

    Notes
    -----
    If initial conditions are Gaussian, the solution is a Gauss-Markov process.
    We assume Gaussianity for :meth:`chapmankolmogorov`.
    """

    def __init__(self, driftmatrixfct, forcfct, dispmatrixfct, diffmatrix):
        self._driftmatrixfct = driftmatrixfct
        self._forcefct = forcfct
        self._dispmatrixfct = dispmatrixfct
        self._diffmatrix = diffmatrix

[docs] def drift(self, time, state, **kwargs): """ Evaluates f(t, x(t)) = F(t) x(t) + u(t). """ driftmatrix = self._driftmatrixfct(time, **kwargs) force = self._forcefct(time, **kwargs) return driftmatrix @ state + force
[docs] def dispersion(self, time, state, **kwargs): """ Evaluates l(t, x(t)) = L(t). """ return self._dispmatrixfct(time, **kwargs)
[docs] def jacobian(self, time, state, **kwargs): """ maps t -> F(t) """ return self._driftmatrixfct(time, **kwargs)
@property def diffusionmatrix(self): """ Evaluates Q. """ return self._diffmatrix @property def ndim(self): """ Spatial dimension (utility attribute). """ return len(self._driftmatrixfct(0.0))
[docs] def chapmankolmogorov(self, start, stop, step, randvar, **kwargs): """ Solves differential equations for mean and kernels of the SDE solution (Eq. 5.50 and 5.51 or Eq. 10.73 in Applied SDEs). By default, we assume that ``randvar`` is Gaussian. """ if not issubclass(type(randvar), Normal): errormsg = ( "Closed form solution for Chapman-Kolmogorov " "equations in linear SDE models is only " "available for Gaussian initial conditions." ) raise ValueError(errormsg) mean, covar = randvar.mean, randvar.cov time = start while time < stop: meanincr, covarincr = self._increment(time, mean, covar, **kwargs) mean, covar = mean + step * meanincr, covar + step * covarincr time = time + step return Normal(mean, covar), None
def _increment(self, time, mean, covar, **kwargs): """ Euler step for closed form solutions of ODE defining mean and kernels of the solution of the Chapman-Kolmogoro equations (via Fokker-Planck equations, but that is not crucial here). See RHS of Eq. 10.82 in Applied SDEs. """ disped = self.dispersion(time, mean, **kwargs) jacob = self.jacobian(time, mean, **kwargs) diff = self.diffusionmatrix newmean = self.drift(time, mean, **kwargs) newcovar = covar @ jacob.T + jacob @ covar.T + disped @ diff @ disped.T return newmean, newcovar
[docs]class LTISDEModel(LinearSDEModel): """ Linear time-invariant continuous Markov models of the form dx = [F x(t) + u] dt + L dBt. In the language of dynamic models, x(t) : state process F : drift matrix u : forcing term L : dispersion matrix. Bt : Brownian motion with constant diffusion matrix Q. Parameters ---------- driftmatrix : np.ndarray, shape=(n, n) This is F. It is the drift matrix of the SDE. force : np.ndarray, shape=(n,) This is U. It is the force vector of the SDE. dispmatrix : np.ndarray, shape(n, s) This is L. It is the dispersion matrix of the SDE. diffmatrix : np.ndarray, shape=(s, s) This is the diffusion matrix Q of the Brownian motion driving the SDE. Notes ----- It assumes Gaussian initial conditions (otherwise it is no Gauss-Markov process). """ def __init__(self, driftmatrix, force, dispmatrix, diffmatrix): """ Parameters ---------- driftmatrix : ndarray (F) force : ndarray (u) dispmatrix : ndarray (L) diffmatrix : ndarray (Q) """ _check_initial_state_dimensions(driftmatrix, force, dispmatrix, diffmatrix) super().__init__( (lambda t, **kwargs: driftmatrix), (lambda t, **kwargs: force), (lambda t, **kwargs: dispmatrix), diffmatrix, ) self._driftmatrix = driftmatrix self._force = force self._dispmatrix = dispmatrix self._diffmatrix = diffmatrix @property def driftmatrix(self): return self._driftmatrix @property def force(self): return self._force @property def dispersionmatrix(self): return self._dispmatrix
[docs] def chapmankolmogorov(self, start, stop, step, randvar, **kwargs): """ Solves Chapman-Kolmogorov equation from start to stop via step. For LTISDEs, there is a closed form solutions to the ODE for mean and kernels (see super().chapmankolmogorov(...)). We exploit this for [(stop - start)/step] steps. References ---------- Eq. (8) in http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.390.380&rep=rep1&type=pdf and Eq. 6.41 and Eq. 6.42 in Applied SDEs. """ mean, cov = randvar.mean, randvar.cov if np.isscalar(mean) and np.isscalar(cov): mean, cov = mean * np.ones(1), cov * np.eye(1) increment = stop - start newmean = self._predict_mean(increment, mean, **kwargs) newcov, crosscov = self._predict_covar(increment, cov, **kwargs) return Normal(newmean, newcov), crosscov
def _predict_mean(self, h, mean, **kwargs): """ Predicts mean via closed-form solution to Chapman-Kolmogorov equation for Gauss-Markov processes according to Eq. (8) in http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.390.380&rep=rep1&type=pdf This function involves a lot of concatenation of matrices, hence readibility is hard to guarantee. If you know better how to make this readable, feedback is welcome! """ drift = self.driftmatrix force = self.force extended_state = np.hstack((mean, force)) firstrowblock = np.hstack((drift, np.eye(*drift.shape))) blockmat = np.hstack((firstrowblock.T, 0.0 * firstrowblock.T)).T proj = np.eye(*firstrowblock.shape) return proj @ scipy.linalg.expm(h * blockmat) @ extended_state def _predict_covar(self, increment, cov, **kwargs): """ Predicts kernels via closed-form solution to Chapman-Kolmogorov equation for Gauss-Markov processes according to Eq. 6.41 and Eq. 6.42 in Applied SDEs. This function involves a lot of concatenation of matrices, hence readibility is hard to guarantee. If you know better how to make this readable, feedback is welcome! """ drift = self.driftmatrix disp = self.dispersionmatrix diff = self.diffusionmatrix firstrowblock = np.hstack((drift, disp @ diff @ disp.T)) secondrowblock = np.hstack((0 * drift.T, -1.0 * drift.T)) blockmat = np.hstack((firstrowblock.T, secondrowblock.T)).T proj = np.eye(*firstrowblock.shape) initstate = np.flip(proj).T transformed_sol = scipy.linalg.expm(increment * blockmat) @ initstate trans = scipy.linalg.expm(increment * drift) transdiff = proj @ transformed_sol @ trans.T crosscov = cov @ trans.T newcov = trans @ crosscov + transdiff return newcov, crosscov
def _check_initial_state_dimensions(drift, force, disp, diff): """ Checks that the matrices all align and are of proper shape. If all the bugs are removed and the tests run, these asserts are turned into Exception-catchers. Parameters ---------- drift : np.ndarray, shape=(n, n) force : np.ndarray, shape=(n,) disp : np.ndarray, shape=(n, s) diff : np.ndarray, shape=(s, s) """ if drift.ndim != 2 or drift.shape[0] != drift.shape[1]: raise ValueError("driftmatrix not of shape (n, n)") if force.ndim != 1: raise ValueError("force not of shape (n,)") if force.shape[0] != drift.shape[1]: raise ValueError("force not of shape (n,)" "or driftmatrix not of shape (n, n)") if disp.ndim != 2: raise ValueError("dispersion not of shape (n, s)") if diff.ndim != 2 or diff.shape[0] != diff.shape[1]: raise ValueError("diffusion not of shape (s, s)") if disp.shape[1] != diff.shape[0]: raise ValueError( "dispersion not of shape (n, s)" "or diffusion not of shape (s, s)" )