Source code for probnum.filtsmooth.statespace.integrator

"""Integrated processes such as the integrated Wiener process or the Matern process.

This is the sucessor of the former ODEPrior.
"""
try:
    # cached_property is only available in Python >=3.8
    from functools import cached_property
except ImportError:
    from cached_property import cached_property

import numpy as np
import scipy.special

import probnum.random_variables as pnrv

from . import discrete_transition, sde
from .preconditioner import NordsieckLikeCoordinates


[docs]class Integrator: """An integrator is a special kind of SDE, where the :math:`i` th coordinate models the :math:`i` th derivative.""" def __init__(self, ordint, spatialdim): self.ordint = ordint self.spatialdim = spatialdim
[docs] def proj2coord(self, coord: int) -> np.ndarray: """Projection matrix to :math:`i` th coordinates. Computes the matrix .. math:: H_i = \\left[ I_d \\otimes e_i \\right] P^{-1}, where :math:`e_i` is the :math:`i` th unit vector, that projects to the :math:`i` th coordinate of a vector. If the ODE is multidimensional, it projects to **each** of the :math:`i` th coordinates of each ODE dimension. Parameters ---------- coord : int Coordinate index :math:`i` which to project to. Expected to be in range :math:`0 \\leq i \\leq q + 1`. Returns ------- np.ndarray, shape=(d, d*(q+1)) Projection matrix :math:`H_i`. """ projvec1d = np.eye(self.ordint + 1)[:, coord] projmat1d = projvec1d.reshape((1, self.ordint + 1)) projmat = np.kron(np.eye(self.spatialdim), projmat1d) return projmat
[docs]class IBM(Integrator, sde.LTISDE): """Integrated Brownian motion in :math:`d` dimensions.""" def __init__(self, ordint, spatialdim, diffconst): self.diffconst = diffconst # initialise BOTH superclasses' inits. # I don't like it either, but it does the job. Integrator.__init__(self, ordint=ordint, spatialdim=spatialdim) sde.LTISDE.__init__( self, driftmat=self._driftmat, forcevec=self._forcevec, dispmat=self._dispmat, ) self.precon = NordsieckLikeCoordinates.from_order(ordint, spatialdim) @property def _driftmat(self): driftmat_1d = np.diag(np.ones(self.ordint), 1) return np.kron(np.eye(self.spatialdim), driftmat_1d) @property def _forcevec(self): force_1d = np.zeros(self.ordint + 1) return np.kron(np.ones(self.spatialdim), force_1d) @property def _dispmat(self): dispmat_1d = np.zeros(self.ordint + 1) dispmat_1d[-1] = self.diffconst return np.kron(np.eye(self.spatialdim), dispmat_1d).T @cached_property def equivalent_discretisation_preconditioned(self): """Discretised IN THE PRECONDITIONED SPACE.""" empty_force = np.zeros(self.spatialdim * (self.ordint + 1)) return discrete_transition.DiscreteLTIGaussian( dynamicsmat=self._dynamicsmat, forcevec=empty_force, diffmat=self._diffmat, ) @cached_property def _dynamicsmat(self): # Loop, but cached anyway driftmat_1d = np.array( [ [ scipy.special.binom(self.ordint - i, self.ordint - j) for j in range(0, self.ordint + 1) ] for i in range(0, self.ordint + 1) ] ) return np.kron(np.eye(self.spatialdim), driftmat_1d) @cached_property def _diffmat(self): # Optimised with broadcasting range = np.arange(0, self.ordint + 1) denominators = 2.0 * self.ordint + 1.0 - range[:, None] - range[None, :] diffmat_1d = 1.0 / denominators return np.kron(np.eye(self.spatialdim), self.diffconst ** 2 * diffmat_1d)
[docs] def transition_rv(self, rv, start, stop, **kwargs): if not isinstance(rv, pnrv.Normal): errormsg = ( "Closed form transitions in LTI SDE models is only " "available for Gaussian initial conditions." ) raise TypeError(errormsg) step = stop - start rv = self.precon.inverse(step) @ rv rv, info = self.transition_rv_preconditioned(rv, start) info["crosscov"] = self.precon(step) @ info["crosscov"] @ self.precon(step).T return self.precon(step) @ rv, info
[docs] def transition_rv_preconditioned(self, rv, start, **kwargs): return self.equivalent_discretisation_preconditioned.transition_rv(rv, start)
[docs] def transition_realization(self, real, start, stop, **kwargs): if not isinstance(real, np.ndarray): raise TypeError(f"Numpy array expected, {type(real)} received.") step = stop - start real = self.precon.inverse(step) @ real real, info = self.transition_realization_preconditioned(real, start) info["crosscov"] = self.precon(step) @ info["crosscov"] @ self.precon(step).T return self.precon(step) @ real, info
[docs] def transition_realization_preconditioned(self, real, start, **kwargs): return self.equivalent_discretisation_preconditioned.transition_realization( real, start )
[docs] def discretise(self, step): """Equivalent discretisation of the process. Overwrites matrix-fraction decomposition in the super-class. Only present for user's convenience and to maintain a clean interface. Not used for transition_rv, etc.. """ dynamicsmat = ( self.precon(step)
[docs] @ self.equivalent_discretisation_preconditioned.dynamicsmat @ self.precon.inverse(step) ) diffmat = ( self.precon(step) @ self.equivalent_discretisation_preconditioned.diffmat @ self.precon(step).T ) zero_force = np.zeros(len(dynamicsmat)) return discrete_transition.DiscreteLTIGaussian( dynamicsmat=dynamicsmat, forcevec=zero_force, diffmat=diffmat )
class IOUP(Integrator, sde.LTISDE): """Integrated Ornstein-Uhlenbeck process in :math:`d` dimensions.""" def __init__( self, ordint: int, spatialdim: int, driftspeed: float, diffconst: float, ): # Other than previously in ProbNum, we do not use preconditioning for IOUP by default. self.driftspeed = driftspeed self.diffconst = diffconst Integrator.__init__(self, ordint=ordint, spatialdim=spatialdim) sde.LTISDE.__init__( self, driftmat=self._driftmat, forcevec=self._forcevec, dispmat=self._dispmat, ) @property def _driftmat(self): driftmat_1d = np.diag(np.ones(self.ordint), 1) driftmat_1d[-1, -1] = -self.driftspeed return np.kron(np.eye(self.spatialdim), driftmat_1d) @property def _forcevec(self): force_1d = np.zeros(self.ordint + 1) return np.kron(np.ones(self.spatialdim), force_1d) @property def _dispmat(self): dispmat_1d = np.zeros(self.ordint + 1) dispmat_1d[-1] = self.diffconst return np.kron(np.eye(self.spatialdim), dispmat_1d).T
[docs]class Matern(Integrator, sde.LTISDE): """Matern process in :math:`d` dimensions.""" def __init__( self, ordint: int, spatialdim: int, lengthscale: float, diffconst: float, ): # Other than previously in ProbNum, we do not use preconditioning for Matern by default. self.lengthscale = lengthscale self.diffconst = diffconst Integrator.__init__(self, ordint=ordint, spatialdim=spatialdim) sde.LTISDE.__init__( self, driftmat=self._driftmat, forcevec=self._forcevec, dispmat=self._dispmat, ) @property def _driftmat(self): driftmat = np.diag(np.ones(self.ordint), 1) nu = self.ordint + 0.5 D, lam = self.ordint + 1, np.sqrt(2 * nu) / self.lengthscale driftmat[-1, :] = np.array( [-scipy.special.binom(D, i) * lam ** (D - i) for i in range(D)] ) return np.kron(np.eye(self.spatialdim), driftmat) @property def _forcevec(self): force_1d = np.zeros(self.ordint + 1) return np.kron(np.ones(self.spatialdim), force_1d) @property def _dispmat(self): dispmat_1d = np.zeros(self.ordint + 1) dispmat_1d[-1] = self.diffconst return np.kron(np.eye(self.spatialdim), dispmat_1d).T