Source code for probnum.filtsmooth._timeseriesposterior

"""Abstract Base Class for posteriors over states after applying filtering/smoothing."""

from __future__ import annotations

import abc
from typing import Iterable, Optional, Union

import numpy as np

from probnum import randvars
from probnum.typing import ArrayIndicesLike, ArrayLike, FloatLike, IntLike, ShapeLike

DenseOutputValueType = Union[randvars.RandomVariable, randvars._RandomVariableList]
"""Output type of interpolation.

Dense evaluation of a TimeSeriesPosterior returns a RandomVariable
if evaluated at a single location,
and a _RandomVariableList if evaluated at an array of locations.

class TimeSeriesPosterior(abc.ABC):
    """Posterior Distribution over States after time-series algorithms such as
    filtering/smoothing or solving ODEs.

    locations :
        Locations of the posterior states (represented as random variables).
    states :
        Posterior random variables.

    def __init__(
        locations: Optional[Iterable[FloatLike]] = None,
        states: Optional[Iterable[randvars.RandomVariable]] = None,
    ) -> None:
        self._locations = list(locations) if locations is not None else []
        self._states = list(states) if states is not None else []
        self._frozen = False

    def _check_location(self, location: FloatLike) -> FloatLike:
        if len(self._locations) > 0 and location <= self._locations[-1]:
            _err_msg = "Locations have to be strictly ascending. "
            _err_msg += f"Received {location} <= {self._locations[-1]}."
            raise ValueError(_err_msg)
        return location

[docs] def append( self, location: FloatLike, state: randvars.RandomVariable, ) -> None: """Append a state to the posterior.""" if self.frozen: raise ValueError("Cannot append to frozen TimeSeriesPosterior object.") self._locations.append(self._check_location(location)) self._states.append(state)
[docs] def freeze(self) -> None: """Freeze the posterior.""" self._frozen = True
@property def frozen(self): """Whether the posterior is frozen.""" return self._frozen @property def locations(self): """Locations of the states of the posterior.""" return np.asarray(self._locations) @property def states(self): """States of the posterior.""" return randvars._RandomVariableList(self._states) def __len__(self) -> int: """Length of the discrete-time solution. Corresponds to the number of filtering/smoothing steps. """ return len(self.locations) def __getitem__(self, idx: ArrayIndicesLike) -> randvars.RandomVariable: return self.states[idx]
[docs] def __call__(self, t: ArrayLike) -> DenseOutputValueType: """Evaluate the time-continuous posterior at location `t` Algorithm: 1. Find closest t_prev and t_next, with t_prev < t < t_next 2. Predict from t_prev to t 3. (if `self._with_smoothing=True`) Predict from t to t_next 4. (if `self._with_smoothing=True`) Smooth from t_next to t 5. Return random variable for time t Parameters ---------- t : Location, or time, at which to evaluate the posterior. Raises ------ ValueError If time-points are not strictly increasing. Returns ------- randvars.RandomVariable or randvars._RandomVariableList Estimate of the states at time ``t``. """ # The variable "squeeze_eventually" indicates whether # the dimension of the time-array has been promoted # (which is there for a well-behaved loop). # If this has been the case, the final result needs to be # reshaped ("squeezed") accordingly. if np.isscalar(t): t = np.atleast_1d(t) t_has_been_promoted = True else: t_has_been_promoted = False if not np.all(np.diff(t) >= 0.0): raise ValueError("Time-points have to be sorted.") # Split left-extrapolation, interpolation, right_extrapolation t0, tmax = np.amin(self.locations), np.amax(self.locations) t_extra_left = t[t < t0] t_extra_right = t[t > tmax] t_inter = t[(t0 <= t) & (t <= tmax)] # Indices of t where they would be inserted # into self.locations # ("left": right-closest states -- this is the default in searchsorted) indices = np.searchsorted(self.locations, t_inter, side="left") interpolated_values = [ self.interpolate( t=ti, previous_index=previdx, next_index=nextidx, ) for ti, previdx, nextidx in zip( t_inter, indices - 1, indices, ) ] extrapolated_values_left = [ self.interpolate(t=ti, previous_index=None, next_index=0) for ti in t_extra_left ] extrapolated_values_right = [ self.interpolate(t=ti, previous_index=-1, next_index=None) for ti in t_extra_right ] dense_output_values = extrapolated_values_left dense_output_values.extend(interpolated_values) dense_output_values.extend(extrapolated_values_right) if t_has_been_promoted: return dense_output_values[0] return randvars._RandomVariableList(dense_output_values)
[docs] @abc.abstractmethod def interpolate( self, t: FloatLike, previous_index: Optional[IntLike] = None, next_index: Optional[IntLike] = None, ) -> randvars.RandomVariable: """Evaluate the posterior at a measurement-free point. Returns ------- randvars.RandomVariable or randvars._RandomVariableList Dense evaluation. """ raise NotImplementedError
[docs] @abc.abstractmethod def sample( self, rng: np.random.Generator, t: Optional[ArrayLike] = None, size: Optional[ShapeLike] = (), ) -> np.ndarray: """Draw samples from the filtering/smoothing posterior. If nothing is specified, a single sample is drawn (supported on self.locations). If locations are specified, a single sample is drawn on those locations. If size is specified, more than a single sample is drawn. Internally, samples from a base measure are drawn and transformed via self.transform_base_measure_realizations. Parameters ---------- rng Random number generator. t Locations on which the samples are wanted. Default is none, which implies that self.location is used. size Indicates how many samples are drawn. Default is an empty tuple, in which case a single sample is returned. Returns ------- np.ndarray Drawn samples. If size has shape (A1, ..., Z1), locations have shape (L,), and the state space model has shape (A2, ..., Z2), the output has shape (A1, ..., Z1, L, A2, ..., Z2). For example: size=4, len(locations)=4, dim=3 gives shape (4, 4, 3). """ raise NotImplementedError("Sampling is not implemented.")
[docs] @abc.abstractmethod def transform_base_measure_realizations( self, base_measure_realizations: np.ndarray, t: Optional[ArrayLike], ) -> np.ndarray: """Transform base-measure-realizations into posteriors samples. Parameters ---------- base_measure_realizations Base measure realizations. t Locations on which the transformed realizations shall represent realizations from the posterior. Returns ------- np.ndarray Transformed realizations. """ raise NotImplementedError( "Transforming base measure realizations is not implemented." )