Source code for probnum.filtsmooth.gaussfiltsmooth.extendedkalman

"""Gaussian filtering and smoothing based on making intractable quantities tractable
through Taylor-method approximations, e.g. linearization."""
import functools

import numpy as np

import probnum.random_variables as pnrv
from probnum.filtsmooth import statespace


[docs]class ContinuousEKFComponent(statespace.Transition): """Continuous extended Kalman filter transition.""" def __init__(self, non_linear_sde, num_steps): if not isinstance(non_linear_sde, statespace.SDE): raise TypeError("Continuous EKF transition requires a (non-linear) SDE.") self.non_linear_sde = non_linear_sde self.num_steps = num_steps
[docs] def transition_realization(self, real, start, stop, linearise_at=None, **kwargs): compute_jacobian_at = linearise_at.mean if linearise_at else real jacobfun = functools.partial( self.non_linear_sde.jacobian, state=compute_jacobian_at ) step = (stop - start) / self.num_steps return statespace.linear_sde_statistics( rv=pnrv.Normal(mean=real, cov=np.zeros((len(real), len(real)))), start=start, stop=stop, step=step, driftfun=self.non_linear_sde.drift, jacobfun=jacobfun, dispmatfun=self.non_linear_sde.dispersionmatrix, )
[docs] def transition_rv(self, rv, start, stop, linearise_at=None, **kwargs): compute_jacobian_at = linearise_at.mean if linearise_at else rv.mean jacobfun = functools.partial( self.non_linear_sde.jacobian, state=compute_jacobian_at ) step = (stop - start) / self.num_steps return statespace.linear_sde_statistics( rv=rv, start=start, stop=stop, step=step, driftfun=self.non_linear_sde.drift, jacobfun=jacobfun, dispmatfun=self.non_linear_sde.dispersionmatrix, )
@property def dimension(self): raise NotImplementedError
[docs]class DiscreteEKFComponent(statespace.Transition): """Discrete extended Kalman filter transition.""" def __init__(self, disc_model): self.disc_model = disc_model
[docs] def transition_realization(self, real, start, **kwargs): return self.disc_model.transition_realization(real, start, **kwargs)
[docs] def transition_rv(self, rv, start, linearise_at=None, **kwargs): diffmat = self.disc_model.diffusionmatrix(start) if linearise_at is None: jacob = self.disc_model.jacobian(start, rv.mean) else: jacob = self.disc_model.jacobian(start, linearise_at.mean) mpred = self.disc_model.dynamics(start, rv.mean) crosscov = rv.cov @ jacob.T cpred = jacob @ crosscov + diffmat return pnrv.Normal(mpred, cpred), {"crosscov": crosscov}
@property def dimension(self): raise NotImplementedError
[docs] @classmethod def from_ode(cls, ode, prior, evlvar, ek0_or_ek1=0): spatialdim = prior.spatialdim h0 = prior.proj2coord(coord=0) h1 = prior.proj2coord(coord=1) def dyna(t, x, **kwargs): return h1 @ x - ode.rhs(t, h0 @ x) def diff(t, **kwargs): return evlvar * np.eye(spatialdim) def jaco_ek1(t, x, **kwargs): return h1 - ode.jacobian(t, h0 @ x) @ h0 def jaco_ek0(t, x, **kwargs): return h1 if ek0_or_ek1 == 0: jaco = jaco_ek0 elif ek0_or_ek1 == 1: jaco = jaco_ek1 else: raise TypeError("ek0_or_ek1 must be 0 or 1, resp.") discrete_model = statespace.DiscreteGaussian(dyna, diff, jaco) return cls(discrete_model)