"""
Continuous Markov models implicitly defined through SDEs,
dx(t) = f(t, x(t)) dt + l(t, x(t)) dB(t).
Possible input for StateSpace.dynmod or StateSpace.measmod.
Note
----
ContinuousModel only implements
StateSpaceComponent.sample(). It passes on the
responsibility of implementing chapmankolmogorov() to its
subclasses and creates further abstract methods drift(),
dispersion() as well as a hook for jacobian().
When implementing continuous models we think of
Gauss-Markov models (see linearsde.py)
but do our very best to not enforce Gaussianity.
In the future, this might be more general than Gauss-Markov
models.
"""
from abc import ABC, abstractmethod
import numpy as np
# Public API: only the abstract interface is exported from this module.
__all__ = ["ContinuousModel"]
class ContinuousModel(ABC):
    """
    Interface for time-continuous Markov models of the form

        dx(t) = f(t, x(t)) dt + l(t, x(t)) dB(t).

    In the language of dynamic models:

        x(t)       : state process
        f(t, x(t)) : drift function
        l(t, x(t)) : dispersion matrix
        B(t)       : Brownian motion with const. diffusion matrix Q

    Subclasses have to implement ``drift()``, ``dispersion()``,
    ``diffusionmatrix`` and ``ndim``. ``jacobian()`` and
    ``chapmankolmogorov()`` are optional hooks which raise
    ``NotImplementedError`` unless they are overridden.
    """

    def sample(self, start, stop, step, initstate, **kwargs):
        """
        Samples from ``initstate`` at ``start`` towards ``stop`` with
        stepsize ``step`` using Euler-Maruyama steps.

        Start, stop and step lead to a np.arange-like interface: the
        time grid is ``np.arange(start, stop, step)`` and one step is
        taken between each pair of consecutive grid points. The
        returned state therefore belongs to the last grid point, which
        np.arange places before ``stop``.

        Returns a single element at the end of the time grid, not the
        entire array!

        Parameters
        ----------
        start, stop, step :
            Passed on to ``np.arange`` to build the time grid.
        initstate : np.ndarray
            State at time ``start``.
        **kwargs :
            Forwarded unchanged to ``drift()`` and ``dispersion()``.

        Returns
        -------
        np.ndarray
            State after the last step. If the grid has fewer than two
            points no step is taken and ``initstate`` itself is
            returned.

        Raises
        ------
        ValueError
            If ``initstate`` is not a ``np.ndarray``.

        Notes
        -----
        Increments are drawn with ``np.random.multivariate_normal``,
        i.e. from NumPy's global random state.
        """
        if not isinstance(initstate, np.ndarray):
            raise ValueError("Init state is not array!")
        times = np.arange(start, stop, step)

        # The Brownian increments over one step are N(0, Q * step).
        # Neither Q nor step changes inside the loop, so mean and
        # covariance are built once up front.
        diffmat = self.diffusionmatrix
        incr_mean = np.zeros(len(diffmat))
        incr_cov = diffmat * step

        currstate = initstate
        # Each step starts at a grid point and ends at the next one,
        # hence the last grid point is never a starting point.
        for time in times[:-1]:
            bmsamp = np.random.multivariate_normal(incr_mean, incr_cov)
            driftvl = self.drift(time, currstate, **kwargs)
            dispvl = self.dispersion(time, currstate, **kwargs)
            currstate = currstate + step * driftvl + dispvl @ bmsamp
        return currstate

    @abstractmethod
    def drift(self, time, state, **kwargs):
        """
        Evaluates f = f(t, x(t)).
        """
        raise NotImplementedError

    @abstractmethod
    def dispersion(self, time, state, **kwargs):
        """
        Evaluates l = l(t, x(t)).
        """
        raise NotImplementedError

    def jacobian(self, time, state, **kwargs):
        """
        Jacobian of drift w.r.t. state: d_x f(t, x(t))

        Optional hook: raises ``NotImplementedError`` unless a
        subclass overrides it.
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def diffusionmatrix(self):
        """
        Evaluates Q.
        In 1D, this is \\sigma^2.
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def ndim(self):
        """
        Spatial dimension (utility attribute).
        """
        raise NotImplementedError

    def chapmankolmogorov(self, start, stop, step, randvar, **kwargs):
        """
        If available, this returns the closed form solution to the
        Chapman-Kolmogorov equations (CKEs).

        Solutions to the CKEs are important in filtering.
        Available for instance for linear SDEs.

        This base implementation always raises
        ``NotImplementedError``; subclasses override it where a closed
        form exists.
        """
        raise NotImplementedError("Chap.-Kolg. not implemented.")