Source code for probnum.filtsmooth.gaussfiltsmooth.gaussfiltsmooth

"""
Gaussian filtering.
"""
from abc import ABC, abstractmethod

import numpy as np

from probnum._randomvariablelist import _RandomVariableList
from probnum.filtsmooth.bayesfiltsmooth import BayesFiltSmooth
from probnum.filtsmooth.gaussfiltsmooth.kalmanposterior import KalmanPosterior
from probnum.random_variables import Normal


class GaussFiltSmooth(BayesFiltSmooth, ABC):
    """
    Interface for Gaussian filtering and smoothing.
    """

    def __init__(self, dynamod, measmod, initrv):
        """Check that the initial distribution is Gaussian."""
        if not issubclass(type(initrv), Normal):
            raise ValueError(
                "Gaussian filters/smoothers need initial "
                "random variables with Normal distribution."
            )
        super().__init__(dynamod, measmod, initrv)

[docs] def filtsmooth(self, dataset, times, **kwargs): """ Apply Gaussian filtering and smoothing to a data set. Parameters ---------- dataset : array_like, shape (N, M) Data set that is filtered. times : array_like, shape (N,) Temporal locations of the data points. kwargs : ??? Returns ------- KalmanPosterior Posterior distribution of the smoothed output """ dataset, times = np.asarray(dataset), np.asarray(times) filter_posterior = self.filter(dataset, times, **kwargs) smooth_posterior = self.smooth(filter_posterior, **kwargs) return smooth_posterior
[docs] def filter(self, dataset, times, **kwargs): """ Apply Gaussian filtering (no smoothing!) to a data set. Parameters ---------- dataset : array_like, shape (N, M) Data set that is filtered. times : array_like, shape (N,) Temporal locations of the data points. kwargs : ??? Returns ------- KalmanPosterior Posterior distribution of the filtered output """ dataset, times = np.asarray(dataset), np.asarray(times) filtrv = self.initialrandomvariable rvs = [filtrv] for idx in range(1, len(times)): filtrv = self.filter_step( start=times[idx - 1], stop=times[idx], randvar=filtrv, data=dataset[idx - 1], ) rvs.append(filtrv) return KalmanPosterior(times, rvs, self, with_smoothing=False)
[docs] def smooth(self, filter_posterior, **kwargs): """ Apply Gaussian smoothing to a set of filtered means and covariances. Parameters ---------- filter_posterior : KalmanPosterior Posterior distribution obtained after filtering Returns ------- KalmanPosterior Posterior distribution of the smoothed output """ rv_list = self.smooth_list( filter_posterior, filter_posterior.locations, **kwargs ) return KalmanPosterior( filter_posterior.locations, rv_list, self, with_smoothing=True )
[docs] def smooth_list(self, rv_list, locations, final_rv=None, **kwargs): """ Apply smoothing to a list of RVs with desired final random variable. Specification of a final RV is useful to compute joint samples from a KalmanPosterior object, because in this case, the final RV is a Dirac (over a sample from the final Normal RV) and not a Normal RV. Parameters ---------- rv_list : _RandomVariableList or array_like List of random variables to be smoothed. locations : array_like Locations of the random variables in rv_list. final_rv : RandomVariable, optional. RandomVariable at the final point. Default is None, in which case standard smoothing is applied. If a random variable is specified, the smoothing iteration is based on this one, which is used for sampling (in which case the final random variable is a Dirac that represents a sample) Returns ------- _RandomVariableList List of smoothed random variables. """ if final_rv is None: final_rv = rv_list[-1] curr_rv = final_rv out_rvs = [curr_rv] for idx in reversed(range(1, len(locations))): unsmoothed_rv = rv_list[idx - 1] pred_rv, info = self.predict( start=locations[idx - 1], stop=locations[idx], randvar=unsmoothed_rv, **kwargs ) if "crosscov" not in info.keys(): raise TypeError("Cross-covariance required for smoothing.") curr_rv = self.smooth_step( unsmoothed_rv, pred_rv, curr_rv, info["crosscov"] ) out_rvs.append(curr_rv) out_rvs.reverse() return _RandomVariableList(out_rvs)
[docs] def filter_step(self, start, stop, randvar, data, **kwargs): """ A single filter step. Consists of a prediction step (t -> t+1) and an update step (at t+1). Parameters ---------- start : float Predict FROM this time point. stop : float Predict TO this time point. randvar : RandomVariable Predict based on this random variable. For instance, this can be the result of a previous call to filter_step. data : array_like Compute the update based on this data. Returns ------- RandomVariable Resulting filter estimate after the single step. """ data = np.asarray(data) predrv, _ = self.predict(start, stop, randvar) filtrv, _, _, _ = self.update(stop, predrv, data) return filtrv
[docs] def smooth_step(self, unsmoothed_rv, pred_rv, smoothed_rv, crosscov): """ A single smoother step. Consists of predicting from the filtering distribution at time t to time t+1 and then updating based on the discrepancy to the smoothing solution at time t+1. Parameters ---------- unsmoothed_rv : RandomVariable Filtering distribution at time t. pred_rv : RandomVariable Prediction at time t+1 of the filtering distribution at time t. smoothed_rv : RandomVariable Smoothing distribution at time t+1. crosscov : array_like Cross-covariance between unsmoothed_rv and pred_rv as returned by predict(). """ crosscov = np.asarray(crosscov) initmean, initcov = unsmoothed_rv.mean, unsmoothed_rv.cov predmean, predcov = pred_rv.mean, pred_rv.cov currmean, currcov = smoothed_rv.mean, smoothed_rv.cov if np.isscalar(predmean) and np.isscalar(predcov): predmean = predmean * np.ones(1) predcov = predcov * np.eye(1) newmean = initmean + crosscov @ np.linalg.solve(predcov, currmean - predmean) firstsolve = crosscov @ np.linalg.solve(predcov, currcov - predcov) secondsolve = crosscov @ np.linalg.solve(predcov, firstsolve.T) newcov = initcov + secondsolve.T return Normal(newmean, newcov)
[docs] @abstractmethod def predict(self, start, stop, randvar, **kwargs): raise NotImplementedError
[docs] @abstractmethod def update(self, time, randvar, data, **kwargs): raise NotImplementedError
def linear_discrete_update(meanest, cpred, data, meascov, measmat, mpred): """Kalman update, potentially after linearization.""" covest = measmat @ cpred @ measmat.T + meascov ccest = cpred @ measmat.T mean = mpred + ccest @ np.linalg.solve(covest, data - meanest) cov = cpred - ccest @ np.linalg.solve(covest.T, ccest.T) return Normal(mean, cov), covest, ccest, meanest