Source code for probnum.filtsmooth.statespace.transition

"""Markov transition rules: continuous and discrete."""

import abc
from typing import Dict, Optional

import numpy as np

from probnum.random_variables import RandomVariable

__all__ = ["Transition", "generate"]


[docs]class Transition(abc.ABC): """Markov transition rules in discrete or continuous time. In continuous time, this is a Markov process and described by a stochastic differential equation (SDE) .. math:: d x_t = f(t, x_t) d t + d w_t driven by a Wiener process :math:`w`. In discrete time, it is defined by a transformation .. math:: x_{t + \\Delta t} \\sim p(x_{t + \\Delta t} | x_t). Sometimes, these can be equivalent. For example: mild solutions to linear, time-invariant SDEs have an equivalent, discretised form that can be written as a transformation. See Also -------- :class:`ContinuousModel` Continuously indexed transitions (SDEs) :class:`DiscreteModel` Discretely indexed transitions (transformations) """ def __init__(self): self.precon = None
[docs] @abc.abstractmethod def transition_realization( self, real: np.ndarray, start: float, stop: Optional[float] = None, step: Optional[float] = None, linearise_at: Optional[RandomVariable] = None, ) -> (RandomVariable, Dict): """Transition a realization of a random variable from time :math:`t` to time :math:`t+\\Delta t`. For random variable :math:`x_t`, it returns the random variable defined by .. math:: x_{t + \\Delta t} \\sim p(x_{t + \\Delta t} | x_t = r) . This is different to :meth:`transition_rv` which computes the parametrization of :math:`x_{t + \\Delta t}` based on the parametrization of :math:`x_t`. Nb: Think of transition as a verb, i.e. this method "transitions" a realization of a random variable. Parameters ---------- real : Realization of the random variable. start : Starting point :math:`t`. stop : End point :math:`t + \\Delta t`. step : Intermediate step-size. Optional, default is None. linearise_at : For approximate transitions , for instance ContinuousEKFComponent, this argument overloads the state at which the Jacobian is computed. Returns ------- RandomVariable Random variable, describing the state at time :math:`t + \\Delta t` based on realization at time :math:`t`. dict Additional information in form of a dictionary, for instance the cross-covariance in the prediction step, access to which is useful in smoothing. See Also -------- :meth:`transition_rv` Apply transition to a random variable. """ raise NotImplementedError
[docs] def transition_realization_preconditioned( self, real: np.ndarray, start: float, stop: Optional[float] = None, step: Optional[float] = None, linearise_at: Optional[RandomVariable] = None, ) -> (RandomVariable, Dict): """Applies the transition, assuming that the state is already preconditioned. This is useful for numerically stable implementation of Kalman smoothing steps and Kalman updates. """ if self.precon is None: errormsg = ( "There is no preconditioned associated with this transition. " "Did you mean 'transition_realization'?" ) raise NotImplementedError(errormsg) raise NotImplementedError( "'transition_realization_preconditioned' is not implemented." )
[docs] @abc.abstractmethod def transition_rv( self, rv: "RandomVariable", start: float, stop: Optional[float] = None, step: Optional[float] = None, linearise_at: Optional[RandomVariable] = None, ) -> (RandomVariable, Dict): """Transition a random variable from time :math:`t` to time :math:`t+\\Delta t`. For random variable :math:`x_t`, it returns the random variable defined by .. math:: x_{t + \\Delta t} \\sim p(x_{t + \\Delta t} | x_t) . This returns a random variable where the parametrization depends on the paramtrization of :math:`x_t`. This is different to :meth:`transition_rv` which computes the parametrization of :math:`x_{t + \\Delta t}` based on a realization of :math:`x_t`. Nb: Think of transition as a verb, i.e. this method "transitions" a random variable. Parameters ---------- rv : Realization of the random variable. start : Starting point :math:`t`. stop : End point :math:`t + \\Delta t`. step : Intermediate step-size. Optional, default is None. linearise_at : For approximate transitions , for instance ContinuousEKFComponent, this argument overloads the state at which the Jacobian is computed. Returns ------- RandomVariable Random variable, describing the state at time :math:`t + \\Delta t` based on realization at time :math:`t`. dict Additional information in form of a dictionary, for instance the cross-covariance in the prediction step, access to which is useful in smoothing. See Also -------- :meth:`transition_realization` Apply transition to a realization of a random variable. """ raise NotImplementedError
[docs] def transition_rv_preconditioned( self, rv: "RandomVariable", start: float, stop: Optional[float] = None, step: Optional[float] = None, linearise_at: Optional[RandomVariable] = None, ) -> (RandomVariable, Dict): """Applies the transition, assuming that the state is already preconditioned. This is useful for numerically stable implementation of Kalman smoothing steps and Kalman updates. """ if self.precon is None: errormsg = ( "There is no preconditioned associated with this transition. " "Did you mean 'transition_rv'?" ) raise NotImplementedError(errormsg) raise NotImplementedError("'transition_rv_preconditioned' is not implemented.")
@property def dimension(self) -> int: """Dimension of the transition model. Not all transition models have a unique dimension. Some turn a state (x, y) into a scalar z and it is not clear whether the dimension should be 2 or 1. """ raise NotImplementedError
[docs]def generate(dynmod, measmod, initrv, times, num_steps=5): """Samples true states and observations at pre-determined timesteps "times" for a state space model. Parameters ---------- dynmod : statespace.Transition Transition model describing the prior dynamics. measmod : statespace.Transition Transition model describing the measurement model. initrv : probnum.RandomVariable object Random variable according to initial distribution times : np.ndarray, shape (n,) Timesteps on which the states are to be sampled. num_steps : int Number of steps to be taken for numerical integration of the continuous prior model. Optional. Default is 5. Irrelevant for time-invariant or discrete models. Returns ------- states : np.ndarray; shape (len(times), dynmod.dimension) True states according to dynamic model. obs : np.ndarray; shape (len(times)-1, measmod.dimension) Observations according to measurement model. """ states = np.zeros((len(times), _read_dimension(dynmod, initrv))) obs = np.zeros((len(times) - 1, _read_dimension(measmod, initrv))) states[0] = initrv.sample() for idx in range(1, len(times)): start, stop = times[idx - 1], times[idx] step = (stop - start) / num_steps next_state_rv, _ = dynmod.transition_realization( real=states[idx - 1], start=start, stop=stop, step=step ) states[idx] = next_state_rv.sample() next_obs_rv, _ = measmod.transition_realization(real=states[idx], start=stop) obs[idx - 1] = next_obs_rv.sample() return states, obs
def _read_dimension(transition, initrv): """Extracts dimension of a transition without calling .dimension(), which is not implemented everywhere.""" # relies on evaluating at zero, which is a dangerous endeavour and therefore, # this method is not used in Transition.dimension transitioned, _ = transition.transition_realization( real=initrv.mean, start=0.0, stop=1.0 ) return len(transitioned.sample())