# Non-linear Gaussian filtering and smoothing¶

Provided are two examples of nonlinear state-space models on which one can perform Bayesian filtering and smoothing in order to obtain a posterior distribution over a latent state trajectory based on noisy observations. In order to understand the theory behind these methods in detail we refer to [1] and [2].

We provide examples for two different types of state-space model: 1. Nonlinear, Discrete State-Space Model: Pendulum 2. Nonlinear, Continuous-Discrete State-Space Model: Benes-Daum Filter

In order to perform Bayesian Filtering and Smoothing on non-linear models, we show how to use functionalities provided by ProbNum to either linearise the respective model (Extended Kalman Filter, EKF) or to use an Unscented Transform (Unscented Kalman Filter, UKF) which does not require a Jacobian of the nonlinear model.

References: > [1] Särkkä, Simo, and Solin, Arno. Applied Stochastic Differential Equations. Cambridge University Press, 2019.
> > [2] Särkkä, Simo. Bayesian Filtering and Smoothing. Cambridge University Press, 2013.
[1]:

import numpy as np

import probnum as pn
from probnum import filtsmooth, randvars, randprocs
from probnum.problems import TimeSeriesRegressionProblem

[2]:

rng = np.random.default_rng(seed=123)

[3]:

# Make inline plots vector graphics instead of raster graphics
%matplotlib inline
from IPython.display import set_matplotlib_formats

set_matplotlib_formats("pdf", "svg")

# Plotting
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec

plt.style.use("../../probnum.mplstyle")


## 1. Non-Linear Discrete State-Space Model: Pendulum¶

For nonlinear models, we assume that at least one of the components of our state-space model (i.e. either the dynamics, the measurement model, or both) is a nonlinear function of the latent state.

Consider nonlinear functions $$f: \mathbb{R}^d \rightarrow \mathbb{R}^d$$ and $$h: \mathbb{R}^d \rightarrow \mathbb{R}^m$$ where $$d$$ is the dimension of the latent state and $$m$$ is the dimension of the measurements.

For $$k = 1, \dots, K$$ and $$x_0 \sim \mathcal{N}(\mu_0, \Sigma_0)$$:

\begin{split}\begin{align} \boldsymbol{x}_k &\sim \mathcal{N}(f(\boldsymbol{x}_{k-1}), \boldsymbol{Q}) \\ \boldsymbol{y}_k &\sim \mathcal{N}(h(\boldsymbol{x}_k), \boldsymbol{R}) \end{align}\end{split}
This defines a dynamics model that assumes a state $$x_k$$ in a discrete sequence of states arising from a nonlinear map of the previous state $$x_{k-1}$$ corrupted with additive Gaussian noise under a process noise covariance matrix $$Q$$.
Similarly, the measurements $$y_k$$ are assumed to be nonlinear transformations of the latent state under additive Gaussian noise according to a measurement noise covariance $$R$$. In the described case, we cannot use the Kalman Filtering equations since these assume linear relationships. One method is to linearise the nonlinear parts of the model by a first-order Taylor Expansion (Extended Kalman Filter, EKF) or employing the Unscented Kalman Filter (UKF) which is based on the unscented transform and does not require the computation of a Jacobian. The probnum packages provides both methods by wrapping linearizing “Components” around nonlinear dynamcis and/or measurement models, as we will see below.

Note that this can be generalized to a nonlinear time-varying state-space model, as well. Then we would have $$f: \mathbb{T} \times \mathbb{R}^d \rightarrow \mathbb{R}^d$$ and $$h: \mathbb{T} \times \mathbb{R}^d \rightarrow \mathbb{R}^m$$ where $$\mathbb{T}$$ is the “time dimension”.

### Define State-Space Model¶

#### I. Discrete Dynamics Model: Nonlinear Gaussian Transitions¶

[4]:

state_dim = 2
observation_dim = 1

[5]:

# approx. gravitational constant
g = 9.81
# dt
delta_t = 0.0075

def pendulum_rhs(state):
"""Right-hand side of an ODE that defines pendulum dynamics"""
x1, x2 = state
y1 = x1 + x2 * delta_t
y2 = x2 - g * np.sin(x1) * delta_t
return np.array([y1, y2])

def pendulum_jacobian(state):
"""Jacobian of the pendulum ODE"""
x1, x2 = state
dy1_dx = [1.0, delta_t]
dy2_dx = [-g * np.cos(x1) * delta_t, 1.0]
return np.array([dy1_dx, dy2_dx])

dynamics_transition_function = lambda t, state: pendulum_rhs(state)
dynamics_transition_jacobian_function = lambda t, state: pendulum_jacobian(state)

dynamics_diffusion_matrix = 1.0 * (
np.diag(np.array([delta_t ** 3 / 3, delta_t]))
+ np.diag(np.array([delta_t ** 2 / 2]), 1)
+ np.diag(np.array([delta_t ** 2 / 2]), -1)
)


We here define the non-linear dynamics of a pendulum. Note (in the cell directly above) that probnum expects dynamicsfun to be a function of time and state. Even though we here consider a nonlinear but time-invariant model, we still have to fulfill the contract that the interface requires.

dynamics_transition_function = lambda t, state: pendulum_rhs(state)


In other words, we simply ignore the time variable t in case of time-invariant models.

To create a discrete, (non-linear) Gaussian dynamics model, probnum provides the NonlinearGaussian class that takes - input_dim and output_dim - state_trans_fun : the function that returns the next state given the current one - proc_noise_cov_mat_fun : the function that returns the covariance matrix of the Gaussian process noise - jacob_state_trans_fun : the function that returns the Jacobian matrix of the state transition function

[6]:

# Create discrete, non-linear Gaussian dynamics model
dynamics_model = randprocs.markov.discrete.NonlinearGaussian(
input_dim=state_dim,
output_dim=state_dim,
state_trans_fun=dynamics_transition_function,
proc_noise_cov_mat_fun=lambda t: dynamics_diffusion_matrix,
jacob_state_trans_fun=dynamics_transition_jacobian_function,
)


#### II. Discrete Measurement Model: Nonlinear Gaussian Measurements¶

[7]:

def pendulum_measurement(state):
x1, x2 = state
return np.array([np.sin(x1)])

def pendulum_measurement_jacobian(state):
x1, x2 = state
return np.array([[np.cos(x1), 0.0]])

measurement_function = lambda t, state: pendulum_measurement(state)
measurement_jacobian_function = lambda t, state: pendulum_measurement_jacobian(state)

measurement_variance = 0.32 ** 2
measurement_covariance = measurement_variance * np.eye(observation_dim)

[8]:

# Create discrete, non-linear Gaussian measurement model
measurement_model = randprocs.markov.discrete.NonlinearGaussian(
input_dim=state_dim,
output_dim=observation_dim,
state_trans_fun=measurement_function,
proc_noise_cov_mat_fun=lambda t: measurement_covariance,
jacob_state_trans_fun=measurement_jacobian_function,
)


#### III. Initial State Random Variable¶

[9]:

mu_0 = np.ones(state_dim)
sigma_0 = measurement_variance * np.eye(state_dim)
initial_state_rv = randvars.Normal(mean=mu_0, cov=sigma_0)


### Kalman Filtering¶

#### I. Linearize model (Extended Kalman Filter)¶

Since we could easily derive the Jacobian of our nonlinear model components above, we can use the Extended Kalman Filter (EKF) to obtain a filtering posterior over the states. Therefore, in probnum we just wrap the dynamics model and the measurement model in a DiscreteEKFComponent object. In cases in which the Jacobian is not as easy to write down, probnum also provides the DiscreteUKFComponent class that implements the linearization by means of an unscented Kalman filter that does not require a Jacobian. Alternatively, the Jacobian can be calculated via automatic differentiation.

[10]:

linearised_dynamics_model = filtsmooth.gaussian.approx.DiscreteEKFComponent(dynamics_model)

[11]:

prior_process = randprocs.markov.MarkovProcess(
transition=linearised_dynamics_model, initrv=initial_state_rv, initarg=0.0
)


#### II. Kalman Filter¶

Then, we simply pass these linearized models to the Kalman Filter (exactly the same interface as in the linear case) and proceed.

[12]:

kalman_filter = filtsmooth.gaussian.Kalman(prior_process)


#### III. Generate Data for the State-Space Model¶

statespace.generate_artificial_measurements() is used to sample both latent states and noisy observations from the specified state space model.

[13]:

time_grid = np.arange(0.0, 5.0, step=delta_t)

[14]:

latent_states, observations = randprocs.markov.utils.generate_artificial_measurements(
rng=rng,
prior_process=prior_process,
measmod=measurement_model,
times=time_grid,
)

regression_problem = TimeSeriesRegressionProblem(
observations=observations,
locations=time_grid,
measurement_models=[filtsmooth.gaussian.approx.DiscreteEKFComponent(measurement_model)]
* len(time_grid),
)


#### IV. Perform Kalman Filtering + Rauch-Tung-Striebel Smoothing¶

[15]:

state_posterior, _ = kalman_filter.filtsmooth(regression_problem)

The method filtsmooth returns a KalmanPosterior object which provides convenience functions for e.g. sampling and interpolation. We can also extract the just computed posterior smoothing state variables by querying the .state_rvs property.
This yields a list of Gaussian Random Variables from which we can extract the statistics in order to visualize them.
[16]:

posterior_state_rvs = (
state_posterior.states
)  # List of <num_time_points> Normal Random Variables
posterior_state_means = posterior_state_rvs.mean  # Shape: (num_time_points, state_dim)
posterior_state_covs = (
posterior_state_rvs.cov
)  # Shape: (num_time_points, state_dim, state_dim)


### Visualize Results¶

[17]:

state_fig = plt.figure()
state_fig_gs = gridspec.GridSpec(ncols=2, nrows=1, figure=state_fig)

# Plot means
mu_x_1, mu_x_2 = [posterior_state_means[:, i] for i in range(state_dim)]

ax_00.plot(time_grid, mu_x_1, label="posterior mean")
ax_01.plot(time_grid, mu_x_2)

# Plot marginal standard deviations
std_x_1, std_x_2 = [np.sqrt(posterior_state_covs[:, i, i]) for i in range(state_dim)]

ax_00.fill_between(
time_grid,
mu_x_1 - 1.96 * std_x_1,
mu_x_1 + 1.96 * std_x_1,
alpha=0.2,
label="1.96 marginal stddev",
)
ax_01.fill_between(
time_grid, mu_x_2 - 1.96 * std_x_2, mu_x_2 + 1.96 * std_x_2, alpha=0.2
)

# Plot 5 samples on a subsampled grid
samples = state_posterior.sample(rng=rng, t=state_posterior.locations[::5], size=5)
for smp in samples:
ax_00.plot(
state_posterior.locations[::5],
smp[:, 0],
color="gray",
alpha=0.75,
linewidth=1,
linestyle="dashed",
label="sample",
)
ax_01.plot(
state_posterior.locations[::5],
smp[:, 1],
color="gray",
alpha=0.75,
linewidth=1,
linestyle="dashed",
label="sample",
)

# Plot groundtruth
ax_00.scatter(time_grid, observations, marker=".", label="measurements")

ax_00.set_xlabel("t")
ax_01.set_xlabel("t")

ax_00.set_title(r"$x_1$")
ax_01.set_title(r"$x_2$")

# The following two lines are just to remove duplicate labels (caused by samples) from the legend
handles, labels = ax_00.get_legend_handles_labels()
by_label = dict(zip(labels, handles))

state_fig.legend(
by_label.values(), by_label.keys(), loc="center left", bbox_to_anchor=(1, 0.5)
)

state_fig.tight_layout()


## 2. Non-Linear Continuous-Discrete State-Space Model: Benes-Daum Filter¶

Now, consider we assume continuous dynamics. We assume that there is a continuous process that defines the nonlinear dynamics of our latent space from which we collect discrete nonlinear-Gaussian measurements (as above). Only the dynamics model changes. In particular, we formulate the dynamics as a stochastic process in terms of a Nonlinear Time-Invariant Stochastic Differential Equation. We refer to [1] for more details. Again, consider non-linear functions $$f: \mathbb{R}^d \rightarrow \mathbb{R}^d$$ and $$h: \mathbb{R}^d \rightarrow \mathbb{R}^m$$ where $$d$$ is the dimension of the latent state and $$m$$ is the dimension of the measurements. We define the following nonlinear continuous-discrete state-space model:

Let $$x(t_0) \sim \mathcal{N}(\mu_0, \Sigma_0)$$.

\begin{split}\begin{align} d\boldsymbol{x} &= f(\boldsymbol{x}) \, dt + \boldsymbol{L} \, d \boldsymbol{\omega} \\ \boldsymbol{y}_k &\sim \mathcal{N}(h(\boldsymbol{x}(t_k)), \boldsymbol{R}), \qquad k = 1, \dots, K \end{align}\end{split}

where $$\boldsymbol{\omega}$$ denotes a vector of driving forces (often Brownian Motion).

Note that this can be generalized to a nonlinear time-varying state-space model, as well. Then we would have $$f: \mathbb{T} \times \mathbb{R}^d \rightarrow \mathbb{R}^d$$ and $$h: \mathbb{T} \times \mathbb{R}^d \rightarrow \mathbb{R}^m$$ where $$\mathbb{T}$$ is the “time dimension”.

### Define State-Space Model¶

#### I. Continuous Dynamics Model: Nonlinear Stochastic Differential Equation¶

[18]:

state_dim = 1
observation_dim = 1

[19]:

drift_function = lambda t, state: np.tanh(state)
drift_function_jacobian = lambda t, state: 1.0 - np.tanh(state) ** 2

dispersion_function = lambda t, x: np.ones(state_dim).reshape(1, 1)


For continous, non-linear dynamics, probnum provides a general stochastic differential equation (SDE) interface that takes - driftfun: a (possibly non-linear) drift function, - dispersion_matrixfun: a (possibly non-linear) dispersion function, - jacobfun: the Jacobian of the drift function.

[20]:

# Create continuous, non-linear SDE dynamics model
dynamics_model = randprocs.markov.continuous.SDE(
state_dimension=state_dim,
wiener_process_dimension=1,
drift_function=drift_function,
dispersion_function=dispersion_function,
drift_jacobian=drift_function_jacobian,
)


#### II. Discrete Measurement Model: Linear, Time-Invariant, Gaussian Measurements¶

[21]:

measurement_marginal_variance = 1.0
measurement_matrix = np.eye(observation_dim, state_dim)
measurement_noise_matrix = measurement_marginal_variance * np.eye(observation_dim)

[22]:

measurement_model = randprocs.markov.discrete.LTIGaussian(
state_trans_mat=measurement_matrix,
shift_vec=np.zeros(observation_dim),
proc_noise_cov_mat=measurement_noise_matrix,
)


#### III. Initial State Random Variable¶

[23]:

mu_0 = np.zeros(state_dim)
sigma_0 = 3.0 * np.eye(state_dim)
initial_state_rv = randvars.Normal(mean=mu_0, cov=sigma_0)


### Kalman Filtering¶

#### I. Linearize model (Extended Kalman Filter)¶

[24]:

linearised_dynamics_model = filtsmooth.gaussian.approx.ContinuousEKFComponent(dynamics_model)


#### II. Kalman Filter¶

[25]:

prior_process = randprocs.markov.MarkovProcess(
transition=linearised_dynamics_model, initrv=initial_state_rv, initarg=0.0
)
kalman_filter = filtsmooth.gaussian.Kalman(prior_process)


#### III. Generate Data for the State-Space Model¶

statespace.generate_artificial_measurements() is used to sample both latent states and noisy observations from the specified state space model.

[26]:

time_grid = np.arange(0.0, 10.0, step=10 * delta_t)

[27]:

latent_states, observations = randprocs.markov.utils.generate_artificial_measurements(
rng=rng,
prior_process=prior_process,
measmod=measurement_model,
times=time_grid,
)

[28]:

regression_problem = TimeSeriesRegressionProblem(
observations=observations,
locations=time_grid,
measurement_models=[measurement_model] * len(time_grid),
)


#### IV. Perform Kalman Filtering + Rauch-Tung-Striebel Smoothing¶

[29]:

state_posterior, _ = kalman_filter.filter(regression_problem)

[30]:

smoothed_state_posterior = kalman_filter.smooth(filter_posterior=state_posterior)

The method filtsmooth returns a KalmanPosterior object which provides convenience functions for e.g. sampling and prediction. We can also extract the just computed posterior smoothing state variables by querying the .state_rvs property.
This yields a list of Gaussian Random Variables from which we can extract the statistics in order to visualize them.
[31]:

grid = np.linspace(0, 11, 500)

posterior_state_rvs = state_posterior(
grid
)  # List of <num_time_points> Normal Random Variables
posterior_state_means = (
posterior_state_rvs.mean.squeeze()
)  # Shape: (num_time_points, state_dim)
posterior_state_covs = (
posterior_state_rvs.cov
)  # Shape: (num_time_points, state_dim, state_dim)

[32]:

smoothed_posterior_state_rvs = (
smoothed_state_posterior.states
)  # List of <num_time_points> Normal Random Variables
smoothed_posterior_state_means = (
smoothed_posterior_state_rvs.mean.squeeze()
)  # Shape: (num_time_points, state_dim)
smoothed_posterior_state_covs = (
smoothed_posterior_state_rvs.cov
)  # Shape: (num_time_points, state_dim, state_dim)


### Visualize Results¶

[33]:

state_fig = plt.figure()

# Plot means
ax.plot(grid, posterior_state_means, label="posterior mean")

# Plot marginal standard deviations
std_x = np.sqrt(np.abs(posterior_state_covs)).squeeze()
ax.fill_between(
grid,
posterior_state_means - 1.96 * std_x,
posterior_state_means + 1.96 * std_x,
alpha=0.2,
label="1.96 marginal stddev for filter",
)

# Plot smoothed means
ax.plot(time_grid, smoothed_posterior_state_means, label="smoothed posterior mean")

# Plot marginal standard deviations
std_x = np.sqrt(smoothed_posterior_state_covs).squeeze()
ax.fill_between(
time_grid,
smoothed_posterior_state_means - 1.96 * std_x,
smoothed_posterior_state_means + 1.96 * std_x,
alpha=0.2,
label="1.96 marginal stddev for smoother",
)

ax.scatter(time_grid, observations, marker=".", label="measurements")

ax.set_xlabel("t")
ax.set_title(r"$x$")
ax.legend()

state_fig.tight_layout()


To get a closer look at filter and smoother, we inspect the interval $$[0, 1]$$ in greater detail.

[34]:

idx = time_grid < 1.0

ax.set_xlim([0, 1])
ax.set_ylim(np.min(observations[idx]), np.max(observations[idx]))

state_fig.tight_layout()
state_fig

[34]:

[ ]: