"""Integrated processes such as the integrated Wiener process or the Matern process.
This is the sucessor of the former ODEPrior.
"""
try:
# cached_property is only available in Python >=3.8
from functools import cached_property
except ImportError:
from cached_property import cached_property
import numpy as np
import scipy.special
import probnum.type as pntype
from probnum import randvars
from . import discrete_transition, sde
from .preconditioner import NordsieckLikeCoordinates
[docs]class Integrator:
"""An integrator is a special kind of SDE, where the :math:`i` th coordinate models
the :math:`i` th derivative."""
def __init__(self, ordint, spatialdim):
self.ordint = ordint
self.spatialdim = spatialdim
self.precon = NordsieckLikeCoordinates.from_order(self.ordint, self.spatialdim)
[docs] def proj2coord(self, coord: int) -> np.ndarray:
"""Projection matrix to :math:`i` th coordinates.
Computes the matrix
.. math:: H_i = \\left[ I_d \\otimes e_i \\right] P^{-1},
where :math:`e_i` is the :math:`i` th unit vector,
that projects to the :math:`i` th coordinate of a vector.
If the ODE is multidimensional, it projects to **each** of the
:math:`i` th coordinates of each ODE dimension.
Parameters
----------
coord : int
Coordinate index :math:`i` which to project to.
Expected to be in range :math:`0 \\leq i \\leq q + 1`.
Returns
-------
np.ndarray, shape=(d, d*(q+1))
Projection matrix :math:`H_i`.
"""
projvec1d = np.eye(self.ordint + 1)[:, coord]
projmat1d = projvec1d.reshape((1, self.ordint + 1))
projmat = np.kron(np.eye(self.spatialdim), projmat1d)
return projmat
@property
def _derivwise2coordwise_projmat(self) -> np.ndarray:
r"""Projection matrix to change the ordering of the state representation in an :class:`Integrator` from coordinate-wise to derivative-wise representation.
A coordinate-wise ordering is
.. math:: (y_1, \dot y_1, \ddot y_1, y_2, \dot y_2, ..., y_d^{(\nu)})
and a derivative-wise ordering is
.. math:: (y_1, y_2, ..., y_d, \dot y_1, \dot y_2, ..., \dot y_d, \ddot y_1, ..., y_d^{(\nu)}).
Default representation in an :class:`Integrator` is coordinate-wise ordering, but sometimes, derivative-wise ordering is more convenient.
See Also
--------
:attr:`Integrator._convert_coordwise_to_derivwise`
:attr:`Integrator._convert_derivwise_to_coordwise`
"""
dim = (self.ordint + 1) * self.spatialdim
projmat = np.zeros((dim, dim))
E = np.eye(dim)
for q in range(self.ordint + 1):
projmat[q :: (self.ordint + 1)] = E[
q * self.spatialdim : (q + 1) * self.spatialdim
]
return projmat
@property
def _coordwise2derivwise_projmat(self) -> np.ndarray:
r"""Projection matrix to change the ordering of the state representation in an :class:`Integrator` from derivative-wise to coordinate-wise representation.
A coordinate-wise ordering is
.. math:: (y_1, \dot y_1, \ddot y_1, y_2, \dot y_2, ..., y_d^{(\nu)})
and a derivative-wise ordering is
.. math:: (y_1, y_2, ..., y_d, \dot y_1, \dot y_2, ..., \dot y_d, \ddot y_1, ..., y_d^{(\nu)}).
Default representation in an :class:`Integrator` is coordinate-wise ordering, but sometimes, derivative-wise ordering is more convenient.
See Also
--------
:attr:`Integrator._convert_coordwise_to_derivwise`
:attr:`Integrator._convert_derivwise_to_coordwise`
"""
return self._derivwise2coordwise_projmat.T
@staticmethod
def _convert_coordwise_to_derivwise(
state: np.ndarray, ordint: pntype.IntArgType, spatialdim: pntype.IntArgType
) -> np.ndarray:
"""Convert coordinate-wise representation to derivative-wise representation.
Lightweight call to the respective property in :class:`Integrator`.
Parameters
----------
state:
State to be converted. Assumed to be in coordinate-wise representation.
ordint:
Order of the integrator-state. Usually, this is the order of the highest derivative in the state.
spatialdim:
Spatial dimension of the integrator. Usually, this is the number of states associated with each derivative.
See Also
--------
:attr:`Integrator._coordwise2derivwise_projmat`
:attr:`Integrator._derivwise2coordwise_projmat`
"""
projmat = Integrator(ordint, spatialdim)._coordwise2derivwise_projmat
return projmat @ state
@staticmethod
def _convert_derivwise_to_coordwise(
state: np.ndarray, ordint: pntype.IntArgType, spatialdim: pntype.IntArgType
) -> np.ndarray:
"""Convert coordinate-wise representation to derivative-wise representation.
Lightweight call to the respective property in :class:`Integrator`.
Parameters
----------
state:
State to be converted. Assumed to be in derivative-wise representation.
ordint:
Order of the integrator-state. Usually, this is the order of the highest derivative in the state.
spatialdim:
Spatial dimension of the integrator. Usually, this is the number of states associated with each derivative.
See Also
--------
:attr:`Integrator._coordwise2derivwise_projmat`
:attr:`Integrator._derivwise2coordwise_projmat`
"""
projmat = Integrator(ordint, spatialdim)._derivwise2coordwise_projmat
return projmat @ state
[docs]class IBM(Integrator, sde.LTISDE):
"""Integrated Brownian motion in :math:`d` dimensions."""
def __init__(
self,
ordint,
spatialdim,
forward_implementation="classic",
backward_implementation="classic",
):
# initialise BOTH superclasses' inits.
# I don't like it either, but it does the job.
Integrator.__init__(self, ordint=ordint, spatialdim=spatialdim)
sde.LTISDE.__init__(
self,
driftmat=self._driftmat,
forcevec=self._forcevec,
dispmat=self._dispmat,
forward_implementation=forward_implementation,
backward_implementation=backward_implementation,
)
@cached_property
def _driftmat(self):
driftmat_1d = np.diag(np.ones(self.ordint), 1)
return np.kron(np.eye(self.spatialdim), driftmat_1d)
@cached_property
def _forcevec(self):
force_1d = np.zeros(self.ordint + 1)
return np.kron(np.ones(self.spatialdim), force_1d)
@cached_property
def _dispmat(self):
dispmat_1d = np.zeros(self.ordint + 1)
dispmat_1d[-1] = 1.0 # Unit diffusion
return np.kron(np.eye(self.spatialdim), dispmat_1d).T
@cached_property
def equivalent_discretisation_preconditioned(self):
"""Discretised IN THE PRECONDITIONED SPACE."""
empty_shift = np.zeros(self.spatialdim * (self.ordint + 1))
return discrete_transition.DiscreteLTIGaussian(
state_trans_mat=self._state_trans_mat,
shift_vec=empty_shift,
proc_noise_cov_mat=self._proc_noise_cov_mat,
proc_noise_cov_cholesky=np.linalg.cholesky(self._proc_noise_cov_mat),
forward_implementation=self.forward_implementation,
backward_implementation=self.backward_implementation,
)
@cached_property
def _state_trans_mat(self):
# Loop, but cached anyway
driftmat_1d = np.array(
[
[
scipy.special.binom(self.ordint - i, self.ordint - j)
for j in range(0, self.ordint + 1)
]
for i in range(0, self.ordint + 1)
]
)
return np.kron(np.eye(self.spatialdim), driftmat_1d)
@cached_property
def _proc_noise_cov_mat(self):
# Optimised with broadcasting
range = np.arange(0, self.ordint + 1)
denominators = 2.0 * self.ordint + 1.0 - range[:, None] - range[None, :]
proc_noise_cov_mat_1d = 1.0 / denominators
return np.kron(np.eye(self.spatialdim), proc_noise_cov_mat_1d)
[docs] def forward_rv(
self,
rv,
t,
dt=None,
compute_gain=False,
_diffusion=1.0,
**kwargs,
):
rv = _apply_precon(self.precon.inverse(dt), rv)
rv, info = self.equivalent_discretisation_preconditioned.forward_rv(
rv, t, compute_gain=compute_gain, _diffusion=_diffusion
)
info["crosscov"] = self.precon(dt) @ info["crosscov"] @ self.precon(dt).T
if "gain" in info:
info["gain"] = self.precon(dt) @ info["gain"] @ self.precon.inverse(dt).T
return _apply_precon(self.precon(dt), rv), info
[docs] def backward_rv(
self,
rv_obtained,
rv,
rv_forwarded=None,
gain=None,
t=None,
dt=None,
_diffusion=1.0,
**kwargs,
):
rv_obtained = _apply_precon(self.precon.inverse(dt), rv_obtained)
rv = _apply_precon(self.precon.inverse(dt), rv)
rv_forwarded = (
_apply_precon(self.precon.inverse(dt), rv_forwarded)
if rv_forwarded is not None
else None
)
gain = (
self.precon.inverse(dt) @ gain @ self.precon.inverse(dt).T
if gain is not None
else None
)
rv, info = self.equivalent_discretisation_preconditioned.backward_rv(
rv_obtained=rv_obtained,
rv=rv,
rv_forwarded=rv_forwarded,
gain=gain,
t=t,
_diffusion=_diffusion,
)
# assert info is empty. Otherwise, we need to change
# things in info in which case we want to be warned.
assert not info
return _apply_precon(self.precon(dt), rv), info
[docs] def discretise(self, dt):
"""Equivalent discretisation of the process.
Overwrites matrix-fraction decomposition in the super-class.
Only present for user's convenience and to maintain a clean
interface. Not used for forward_rv, etc..
"""
state_trans_mat = (
self.precon(dt)
[docs] @ self.equivalent_discretisation_preconditioned.state_trans_mat
@ self.precon.inverse(dt)
)
proc_noise_cov_mat = (
self.precon(dt)
@ self.equivalent_discretisation_preconditioned.proc_noise_cov_mat
@ self.precon(dt).T
)
zero_shift = np.zeros(len(state_trans_mat))
return discrete_transition.DiscreteLTIGaussian(
state_trans_mat=state_trans_mat,
shift_vec=zero_shift,
proc_noise_cov_mat=proc_noise_cov_mat,
forward_implementation=self.forward_implementation,
backward_implementation=self.forward_implementation,
)
class IOUP(Integrator, sde.LTISDE):
"""Integrated Ornstein-Uhlenbeck process in :math:`d` dimensions."""
def __init__(
self,
ordint: int,
spatialdim: int,
driftspeed: float,
forward_implementation="classic",
backward_implementation="classic",
):
self.driftspeed = driftspeed
Integrator.__init__(self, ordint=ordint, spatialdim=spatialdim)
sde.LTISDE.__init__(
self,
driftmat=self._driftmat,
forcevec=self._forcevec,
dispmat=self._dispmat,
forward_implementation=forward_implementation,
backward_implementation=backward_implementation,
)
@cached_property
def _driftmat(self):
driftmat_1d = np.diag(np.ones(self.ordint), 1)
driftmat_1d[-1, -1] = -self.driftspeed
return np.kron(np.eye(self.spatialdim), driftmat_1d)
@cached_property
def _forcevec(self):
force_1d = np.zeros(self.ordint + 1)
return np.kron(np.ones(self.spatialdim), force_1d)
@cached_property
def _dispmat(self):
dispmat_1d = np.zeros(self.ordint + 1)
dispmat_1d[-1] = 1.0 # Unit Diffusion
return np.kron(np.eye(self.spatialdim), dispmat_1d).T
[docs] def forward_rv(
self,
rv,
t,
dt=None,
compute_gain=False,
_diffusion=1.0,
**kwargs,
):
# Fetch things into preconditioned space
rv = _apply_precon(self.precon.inverse(dt), rv)
# Apply preconditioning to system matrices
self.driftmat = self.precon.inverse(dt) @ self.driftmat @ self.precon(dt)
self.forcevec = self.precon.inverse(dt) @ self.forcevec
self.dispmat = self.precon.inverse(dt) @ self.dispmat
# Discretise and propagate
discretised_model = self.discretise(dt=dt)
rv, info = discretised_model.forward_rv(
rv, t, compute_gain=compute_gain, _diffusion=_diffusion
)
# Undo preconditioning and return
rv = _apply_precon(self.precon(dt), rv)
info["crosscov"] = self.precon(dt) @ info["crosscov"] @ self.precon(dt).T
if "gain" in info:
info["gain"] = self.precon(dt) @ info["gain"] @ self.precon.inverse(dt).T
self.driftmat = self.precon(dt) @ self.driftmat @ self.precon.inverse(dt)
self.forcevec = self.precon(dt) @ self.forcevec
self.dispmat = self.precon(dt) @ self.dispmat
return rv, info
[docs] def backward_rv(
self,
rv_obtained,
rv,
rv_forwarded=None,
gain=None,
t=None,
dt=None,
_diffusion=1.0,
**kwargs,
):
# Fetch things into preconditioned space
rv_obtained = _apply_precon(self.precon.inverse(dt), rv_obtained)
rv = _apply_precon(self.precon.inverse(dt), rv)
rv_forwarded = (
_apply_precon(self.precon.inverse(dt), rv_forwarded)
if rv_forwarded is not None
else None
)
gain = (
self.precon.inverse(dt) @ gain @ self.precon.inverse(dt).T
if gain is not None
else None
)
# Apply preconditioning to system matrices
self.driftmat = self.precon.inverse(dt) @ self.driftmat @ self.precon(dt)
self.forcevec = self.precon.inverse(dt) @ self.forcevec
self.dispmat = self.precon.inverse(dt) @ self.dispmat
# Discretise and propagate
discretised_model = self.discretise(dt=dt)
rv, info = discretised_model.backward_rv(
rv_obtained=rv_obtained,
rv=rv,
rv_forwarded=rv_forwarded,
gain=gain,
t=t,
_diffusion=_diffusion,
)
# assert info is empty. Otherwise, we need to change
# things in info in which case we want to be warned.
assert not info
# Undo preconditioning and return
rv = _apply_precon(self.precon(dt), rv)
self.driftmat = self.precon(dt) @ self.driftmat @ self.precon.inverse(dt)
self.forcevec = self.precon(dt) @ self.forcevec
self.dispmat = self.precon(dt) @ self.dispmat
return rv, info
[docs]class Matern(Integrator, sde.LTISDE):
"""Matern process in :math:`d` dimensions."""
def __init__(
self,
ordint: int,
spatialdim: int,
lengthscale: float,
forward_implementation="classic",
backward_implementation="classic",
):
self.lengthscale = lengthscale
Integrator.__init__(self, ordint=ordint, spatialdim=spatialdim)
sde.LTISDE.__init__(
self,
driftmat=self._driftmat,
forcevec=self._forcevec,
dispmat=self._dispmat,
forward_implementation=forward_implementation,
backward_implementation=backward_implementation,
)
@property
def _driftmat(self):
driftmat = np.diag(np.ones(self.ordint), 1)
nu = self.ordint + 0.5
D, lam = self.ordint + 1, np.sqrt(2 * nu) / self.lengthscale
driftmat[-1, :] = np.array(
[-scipy.special.binom(D, i) * lam ** (D - i) for i in range(D)]
)
return np.kron(np.eye(self.spatialdim), driftmat)
@property
def _forcevec(self):
force_1d = np.zeros(self.ordint + 1)
return np.kron(np.ones(self.spatialdim), force_1d)
@property
def _dispmat(self):
dispmat_1d = np.zeros(self.ordint + 1)
dispmat_1d[-1] = 1.0 # Unit diffusion
return np.kron(np.eye(self.spatialdim), dispmat_1d).T
[docs] def forward_rv(
self,
rv,
t,
dt=None,
compute_gain=False,
_diffusion=1.0,
**kwargs,
):
# Fetch things into preconditioned space
rv = _apply_precon(self.precon.inverse(dt), rv)
# Apply preconditioning to system matrices
self.driftmat = self.precon.inverse(dt) @ self.driftmat @ self.precon(dt)
self.forcevec = self.precon.inverse(dt) @ self.forcevec
self.dispmat = self.precon.inverse(dt) @ self.dispmat
# Discretise and propagate
discretised_model = self.discretise(dt=dt)
rv, info = discretised_model.forward_rv(
rv, t, compute_gain=compute_gain, _diffusion=_diffusion
)
# Undo preconditioning and return
rv = _apply_precon(self.precon(dt), rv)
info["crosscov"] = self.precon(dt) @ info["crosscov"] @ self.precon(dt).T
if "gain" in info:
info["gain"] = self.precon(dt) @ info["gain"] @ self.precon.inverse(dt).T
self.driftmat = self.precon(dt) @ self.driftmat @ self.precon.inverse(dt)
self.forcevec = self.precon(dt) @ self.forcevec
self.dispmat = self.precon(dt) @ self.dispmat
return rv, info
[docs] def backward_rv(
self,
rv_obtained,
rv,
rv_forwarded=None,
gain=None,
t=None,
dt=None,
_diffusion=1.0,
**kwargs,
):
# Fetch things into preconditioned space
rv_obtained = _apply_precon(self.precon.inverse(dt), rv_obtained)
rv = _apply_precon(self.precon.inverse(dt), rv)
rv_forwarded = (
_apply_precon(self.precon.inverse(dt), rv_forwarded)
if rv_forwarded is not None
else None
)
gain = (
self.precon.inverse(dt) @ gain @ self.precon.inverse(dt).T
if gain is not None
else None
)
# Apply preconditioning to system matrices
self.driftmat = self.precon.inverse(dt) @ self.driftmat @ self.precon(dt)
self.forcevec = self.precon.inverse(dt) @ self.forcevec
self.dispmat = self.precon.inverse(dt) @ self.dispmat
# Discretise and propagate
discretised_model = self.discretise(dt=dt)
rv, info = discretised_model.backward_rv(
rv_obtained=rv_obtained,
rv=rv,
rv_forwarded=rv_forwarded,
gain=gain,
t=t,
_diffusion=_diffusion,
)
# assert info is empty. Otherwise, we need to change
# things in info in which case we want to be warned.
assert not info
# Undo preconditioning and return
rv = _apply_precon(self.precon(dt), rv)
self.driftmat = self.precon(dt) @ self.driftmat @ self.precon.inverse(dt)
self.forcevec = self.precon(dt) @ self.forcevec
self.dispmat = self.precon(dt) @ self.dispmat
return rv, info
def _apply_precon(precon, rv):
# There is no way of checking whether `rv` has its Cholesky factor computed already or not.
# Therefore, since we need to update the Cholesky factor for square-root filtering,
# we also update the Cholesky factor for non-square-root algorithms here,
# which implies additional cost.
# See Issues #319 and #329.
# When they are resolved, this function here will hopefully be superfluous.
new_mean = precon @ rv.mean
new_cov_cholesky = precon @ rv.cov_cholesky # precon is diagonal, so this is valid
new_cov = new_cov_cholesky @ new_cov_cholesky.T
return randvars.Normal(new_mean, new_cov, cov_cholesky=new_cov_cholesky)