Source code for probnum.diffeq.odefilter.init_routines._non_probabilistic_fit

"""Initialization routines based on fitting the prior to (a few steps of a) non-
probabilistc solver."""


from typing import Optional

import numpy as np
import scipy.integrate as sci

from probnum import filtsmooth, problems, randprocs, randvars
from probnum.typing import FloatLike

from ._interface import InitializationRoutine


class _NonProbabilisticFitBase(InitializationRoutine):
    """Fit the prior process to a few steps of a non-probabilistic solver."""

    def __init__(
        self, *, dt: FloatLike = 1e-2, observation_noise_std: FloatLike = 1e-7
    ):
        super().__init__(is_exact=False, requires_jax=False)
        self._dt = dt
        self._observation_noise_std = observation_noise_std

    def __call__(
        self,
        *,
        ivp: problems.InitialValueProblem,
        prior_process: randprocs.markov.MarkovProcess,
    ) -> randvars.RandomVariable:

        num_steps = prior_process.transition.num_derivatives + 1
        t_eval = np.linspace(
            start=ivp.t0,
            stop=ivp.t0 + num_steps * self._dt,
            num=num_steps,
            endpoint=True,
        )

        data = self._data(ivp=ivp, t_eval=t_eval)
        rv = self._improve(data=data, prior_process=prior_process)
        return rv

    def _data(self, *, ivp, t_eval):
        raise NotImplementedError

    def _improve(self, *, data, prior_process):

        # Measurement model for SciPy observations
        ode_dim = prior_process.transition.wiener_process_dimension
        proj_to_y = prior_process.transition.proj2coord(coord=0)
        observation_noise_std = self._observation_noise_std * np.ones(ode_dim)
        process_noise = randvars.Normal(
            mean=np.zeros(ode_dim),
            cov=np.diag(observation_noise_std**2),
            cov_cholesky=np.diag(observation_noise_std),
        )
        measmod_scipy = randprocs.markov.discrete.LTIGaussian(
            transition_matrix=proj_to_y,
            noise=process_noise,
            forward_implementation="sqrt",
            backward_implementation="sqrt",
        )

        # Regression problem
        ts, ys = data
        regression_problem = problems.TimeSeriesRegressionProblem(
            observations=ys, locations=ts, measurement_models=[measmod_scipy] * len(ts)
        )

        # Infer the solution
        kalman = filtsmooth.gaussian.Kalman(prior_process)
        out, _ = kalman.filtsmooth(regression_problem)
        estimated_initrv = out.states[0]
        return estimated_initrv


[docs]class NonProbabilisticFit(_NonProbabilisticFitBase): """Fit the prior process to a few steps of a non-probabilistic solver.""" def __init__( self, *, dt: Optional[FloatLike] = 1e-2, observation_noise_std=1e-7, method: str = "DOP853", ): super().__init__(dt=dt, observation_noise_std=observation_noise_std) self._method = method def _data(self, *, ivp, t_eval): sol = sci.solve_ivp( fun=ivp.f, t_span=(np.amin(t_eval), np.amax(t_eval)), y0=ivp.y0, atol=1e-12, rtol=1e-12, t_eval=t_eval, method=self._method, ) ts = sol.t ys = sol.y.T return ts, ys
[docs]class NonProbabilisticFitWithJacobian(_NonProbabilisticFitBase): """Fit the prior process to a few steps of a non-probabilistic solver and use Jacobians.""" def __init__( self, *, dt: Optional[FloatLike] = 1e-2, observation_noise_std=1e-7, method: str = "Radau", ): super().__init__(dt=dt, observation_noise_std=observation_noise_std) self._method = method def _data(self, *, ivp, t_eval): sol = sci.solve_ivp( fun=ivp.f, jac=ivp.df, t_span=(np.amin(t_eval), np.amax(t_eval)), y0=ivp.y0, atol=1e-12, rtol=1e-12, t_eval=t_eval, method=self._method, ) ts = sol.t ys = sol.y.T return ts, ys