"""Gaussian filtering and smoothing."""
import numpy as np
from probnum._randomvariablelist import _RandomVariableList
from probnum.filtsmooth.bayesfiltsmooth import BayesFiltSmooth
from probnum.filtsmooth.gaussfiltsmooth.kalmanposterior import KalmanPosterior
from probnum.random_variables import Normal
[docs]class Kalman(BayesFiltSmooth):
"""Gaussian filtering and smoothing, i.e. Kalman-like filters and smoothers."""
def __init__(self, dynamics_model, measurement_model, initrv):
"""Check that the initial distribution is Gaussian."""
if not issubclass(type(initrv), Normal):
raise ValueError(
"Gaussian filters/smoothers need initial "
"random variables with Normal distribution."
)
super().__init__(dynamics_model, measurement_model, initrv)
[docs] def filtsmooth(self, dataset, times, intermediate_step=None):
"""Apply Gaussian filtering and smoothing to a data set.
Parameters
----------
dataset : array_like, shape (N, M)
Data set that is filtered.
times : array_like, shape (N,)
Temporal locations of the data points.
Returns
-------
KalmanPosterior
Posterior distribution of the smoothed output
"""
dataset, times = np.asarray(dataset), np.asarray(times)
filter_posterior = self.filter(
dataset, times, intermediate_step=intermediate_step
)
smooth_posterior = self.smooth(
filter_posterior, intermediate_step=intermediate_step
)
return smooth_posterior
[docs] def filter(self, dataset, times, intermediate_step=None, linearise_at=None):
"""Apply Gaussian filtering (no smoothing!) to a data set.
Parameters
----------
dataset : array_like, shape (N, M)
Data set that is filtered.
times : array_like, shape (N,)
Temporal locations of the data points.
Returns
-------
KalmanPosterior
Posterior distribution of the filtered output
"""
# linearise_at is not used here, only in IteratedKalman.filter_step
# which is overwritten by IteratedKalman
dataset, times = np.asarray(dataset), np.asarray(times)
filtrv = self.initrv
rvs = [filtrv]
for idx in range(1, len(times)):
filtrv, _ = self.filter_step(
start=times[idx - 1],
stop=times[idx],
current_rv=filtrv,
data=dataset[idx - 1],
intermediate_step=intermediate_step,
)
rvs.append(filtrv)
return KalmanPosterior(times, rvs, self, with_smoothing=False)
[docs] def filter_step(self, start, stop, current_rv, data, intermediate_step=None):
"""A single filter step.
Consists of a prediction step (t -> t+1) and an update step (at t+1).
Parameters
----------
start : float
Predict FROM this time point.
stop : float
Predict TO this time point.
current_rv : RandomVariable
Predict based on this random variable. For instance, this can be the result
of a previous call to filter_step.
data : array_like
Compute the update based on this data.
Returns
-------
RandomVariable
Resulting filter estimate after the single step.
dict
Additional information provided by predict() and update().
Contains keys `pred_rv`, `info_pred`, `meas_rv`, `info_upd`.
"""
data = np.asarray(data)
info = {}
info["pred_rv"], info["info_pred"] = self.predict(
start, stop, current_rv, intermediate_step=intermediate_step
)
filtrv, info["meas_rv"], info["info_upd"] = self.update(
stop, info["pred_rv"], data
)
return filtrv, info
[docs] def predict(self, start, stop, randvar, intermediate_step=None):
return self.dynamics_model.transition_rv(
randvar,
start,
stop=stop,
step=intermediate_step,
)
[docs] def measure(self, time, randvar):
"""Propagate the state through the measurement model.
Parameters
----------
time : float
Time of the measurement.
randvar : Normal
Random variable to be propagated through the measurement model.
Returns
-------
meas_rv : Normal
Measured random variable, as returned by the measurement model.
info : dict
Additional info. Contains at leas the key `crosscov` which is the cross
covariance between the input random variable and the measured random
variable.
"""
return self.measurement_model.transition_rv(randvar, time)
[docs] def condition_state_on_measurement(self, randvar, meas_rv, data, crosscov):
"""Condition the state on the observed data.
Parameters
----------
randvar : Normal
Random variable to be updated with the measurement and data.
meas_rv : Normal
Measured random variable, as returned by the measurement model.
data : np.ndarray
Data to update on.
crosscov : np.ndarray
Cross-covariance between the state random variable `randvar` and the
measurement random variable `meas_rv`.
Returns
-------
Normal
Updated Normal random variable (new filter estimate)
"""
new_mean = randvar.mean + crosscov @ np.linalg.solve(
meas_rv.cov, data - meas_rv.mean
)
new_cov = randvar.cov - crosscov @ np.linalg.solve(meas_rv.cov, crosscov.T)
filt_rv = Normal(new_mean, new_cov)
return filt_rv
[docs] def update(self, time, randvar, data):
"""Gaussian filter update step. Consists of a measurement step and a
conditioning step.
Parameters
----------
time : float
Time of the update.
randvar : RandomVariable
Random variable to be updated. Result of :meth:`predict()`.
data : np.ndarray
Data to update on.
Returns
-------
filt_rv : Normal
Updated Normal RV (new filter estimate).
meas_rv : Normal
Measured random variable, as returned by the measurement model.
info : dict
Additional info. Contains at least the key `crosscov`,
which is the crosscov between input RV and measured RV.
The crosscov does not relate to the updated RV!
"""
meas_rv, info = self.measure(time, randvar)
filt_rv = self.condition_state_on_measurement(
randvar, meas_rv, data, info["crosscov"]
)
return filt_rv, meas_rv, info
[docs] def smooth(self, filter_posterior, intermediate_step=None):
"""Apply Gaussian smoothing to the filtering outcome (i.e. a KalmanPosterior).
Parameters
----------
filter_posterior : KalmanPosterior
Posterior distribution obtained after filtering
intermediate_step :
Step-size to be taken by approximate transition methods.
Returns
-------
KalmanPosterior
Posterior distribution of the smoothed output
"""
rv_list = self.smooth_list(
filter_posterior,
filter_posterior.locations,
intermediate_step=intermediate_step,
)
return KalmanPosterior(
filter_posterior.locations, rv_list, self, with_smoothing=True
)
[docs] def smooth_list(self, rv_list, locations, intermediate_step=None):
"""Apply smoothing to a list of RVs with desired final random variable.
Specification of a final RV is useful to compute joint samples from a KalmanPosterior object,
because in this case, the final RV is a Dirac (over a sample from the final Normal RV)
and not a Normal RV.
Parameters
----------
rv_list : _RandomVariableList or array_like
List of random variables to be smoothed.
locations : array_like
Locations of the random variables in rv_list.
intermediate_step :
Step-size to be taken by approximate transition methods.
Returns
-------
_RandomVariableList
List of smoothed random variables.
"""
final_rv = rv_list[-1]
curr_rv = final_rv
out_rvs = [curr_rv]
for idx in reversed(range(1, len(locations))):
unsmoothed_rv = rv_list[idx - 1]
curr_rv = self.smooth_step(
unsmoothed_rv,
curr_rv,
start=locations[idx - 1],
stop=locations[idx],
intermediate_step=intermediate_step,
)
out_rvs.append(curr_rv)
out_rvs.reverse()
return _RandomVariableList(out_rvs)
[docs] def smooth_step(
self, unsmoothed_rv, smoothed_rv, start, stop, intermediate_step=None
):
"""A single smoother step.
Consists of predicting from the filtering distribution at time t
to time t+1 and then updating based on the discrepancy to the
smoothing solution at time t+1.
If preconditioning is available in the dynamic model, this is leveraged here.
If not, a classic smoothing step estimate is taken.
Parameters
----------
unsmoothed_rv : RandomVariable
Filtering distribution at time t.
smoothed_rv : RandomVariable
Prediction at time t+1 of the filtering distribution at time t.
start : float
Time-point of the to-be-smoothed RV.
stop : float
Time-point of the already-smoothed RV.
intermediate_step :
Step-size to be taken by approximate transition methods.
"""
if self.dynamics_model.precon is None:
return self._smooth_step_classic(
unsmoothed_rv,
smoothed_rv,
start,
stop,
intermediate_step=intermediate_step,
)
else:
return self._smooth_step_with_preconditioning(
unsmoothed_rv,
smoothed_rv,
start,
stop,
intermediate_step=intermediate_step,
)
def _smooth_step_classic(
self, unsmoothed_rv, smoothed_rv, start, stop, intermediate_step=None
):
"""A single smoother step.
Consists of predicting from the filtering distribution at time t
to time t+1 and then updating based on the discrepancy to the
smoothing solution at time t+1.
Parameters
----------
unsmoothed_rv : RandomVariable
Filtering distribution at time t.
smoothed_rv : RandomVariable
Prediction at time t+1 of the filtering distribution at time t.
start : float
Time-point of the to-be-smoothed RV.
stop : float
Time-point of the already-smoothed RV.
"""
predicted_rv, info = self.dynamics_model.transition_rv(
unsmoothed_rv, start, stop=stop, step=intermediate_step
)
crosscov = info["crosscov"]
smoothing_gain = np.linalg.solve(predicted_rv.cov.T, crosscov.T).T
new_mean = unsmoothed_rv.mean + smoothing_gain @ (
smoothed_rv.mean - predicted_rv.mean
)
new_cov = (
unsmoothed_rv.cov
+ smoothing_gain @ (smoothed_rv.cov - predicted_rv.cov) @ smoothing_gain.T
)
return Normal(new_mean, new_cov)
def _smooth_step_with_preconditioning(
self, unsmoothed_rv, smoothed_rv, start, stop, intermediate_step=None
):
"""A single smoother step.
Consists of predicting from the filtering distribution at time t
to time t+1 and then updating based on the discrepancy to the
smoothing solution at time t+1.
Parameters
----------
unsmoothed_rv : RandomVariable
Filtering distribution at time t.
smoothed_rv : RandomVariable
Prediction at time t+1 of the filtering distribution at time t.
start : float
Time-point of the to-be-smoothed RV.
stop : float
Time-point of the already-smoothed RV.
"""
# It is not clear to me how to best test this, except running IBM smoothing for high-ish order. (N)
precon_inv = self.dynamics_model.precon.inverse(stop - start)
unsmoothed_rv = precon_inv @ unsmoothed_rv
smoothed_rv = precon_inv @ smoothed_rv
predicted_rv, info = self.dynamics_model.transition_rv_preconditioned(
unsmoothed_rv, start, stop=stop, step=intermediate_step
)
crosscov = info["crosscov"]
smoothing_gain = np.linalg.solve(predicted_rv.cov.T, crosscov.T).T
new_mean = unsmoothed_rv.mean + smoothing_gain @ (
smoothed_rv.mean - predicted_rv.mean
)
new_cov = (
unsmoothed_rv.cov
+ smoothing_gain @ (smoothed_rv.cov - predicted_rv.cov) @ smoothing_gain.T
)
return self.dynamics_model.precon(stop - start) @ Normal(new_mean, new_cov)