
"""Runge-Kutta initialisation."""


from typing import Optional

import numpy as np
import scipy.integrate as sci

from probnum import filtsmooth, problems, randprocs, randvars
from probnum.diffeq.odefilter.initialization_routines import _initialization_routine
from probnum.typing import FloatArgType


class RungeKuttaInitialization(_initialization_routine.InitializationRoutine):
    r"""Initialize a probabilistic ODE solver by fitting the prior process to a few steps of an approximate ODE solution computed with Scipy's Runge-Kutta methods.

    Parameters
    ----------
    dt
        Maximum step-size to use for computing the approximate ODE solution. Smaller values yield a more accurate solution, but are also less stable.
        The best value depends on the ODE problem and likely also on the chosen method. Optional. Default is ``1e-2``.
    method
        Which solver to use. This is communicated as a string that is compatible with ``scipy.integrate.solve_ivp(..., method=method)``.
        Optional. Default is ``DOP853``.

    Examples
    --------

    >>> import numpy as np
    >>> from probnum.randvars import Normal
    >>> from probnum.problems.zoo.diffeq import vanderpol
    >>> from probnum.randprocs.markov.integrator import IntegratedWienerProcess

    Compute the initial values of the van der Pol problem as follows.
    First, we set up the ODE problem and the prior process.

    >>> ivp = vanderpol()
    >>> print(ivp.y0)
    [2. 0.]
    >>> prior_process = IntegratedWienerProcess(initarg=ivp.t0, num_derivatives=3, wiener_process_dimension=2)

    Next, we call the initialization routine.

    >>> rk_init = RungeKuttaInitialization()
    >>> improved_initrv = rk_init(ivp=ivp, prior_process=prior_process)
    >>> print(prior_process.transition.proj2coord(0) @ improved_initrv.mean)
    [2. 0.]
    >>> print(np.round(improved_initrv.mean, 1))
    [    2.      0.     -2.     58.2     0.     -2.     60.  -1745.7]
    >>> print(np.round(np.log10(improved_initrv.std), 1))
    [-13.8 -11.3  -9.   -1.5 -13.8 -11.3  -9.   -1.5]
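
    The step size and the underlying SciPy solver can also be chosen explicitly.
    The values below are purely illustrative; ``"RK45"`` is one of the methods
    accepted by ``scipy.integrate.solve_ivp``.

    >>> rk_init_rk45 = RungeKuttaInitialization(dt=1e-3, method="RK45")
    >>> rk_init_rk45.method
    'RK45'
    >>> rk_init_rk45.dt
    0.001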
    """

    def __init__(
        self, dt: Optional[FloatArgType] = 1e-2, method: Optional[str] = "DOP853"
    ):
        self.dt = dt
        self.method = method
        super().__init__(is_exact=False, requires_jax=False)

    def __call__(
        self,
        ivp: problems.InitialValueProblem,
        prior_process: randprocs.markov.MarkovProcess,
    ) -> randvars.RandomVariable:
        """Compute the initial distribution.

        For Runge-Kutta initialization, it goes as follows:

        1. The ODE integration problem is set up on the interval
           ``[t0, t0 + (2*order+1)*h0]`` and solved with a call to
           ``scipy.integrate.solve_ivp``. The solver uses adaptive steps
           with ``atol=rtol=1e-12``, but is forced to pass through the
           events ``(t0, t0+h0, t0 + 2*h0, ..., t0 + (2*order+1)*h0)``.
           The result is a vector of time points and states with at least
           ``(2*order+1)`` entries. The adaptive solver may have selected
           many more steps, but because of the events, it cannot have
           taken fewer.

        2. A prescribed prior is fitted to the first ``(2*order+1)``
           (t, y) pairs of the solution. ``order`` is the order of the
           prior.

        3. The value of the resulting posterior at time ``t=t0`` is an
           estimate of the state and of all its derivatives. The
           resulting marginal standard deviations estimate the error.
           This random variable is returned.

        Parameters
        ----------
        ivp
            Initial value problem.
        prior_process
            Prior Gauss-Markov process.

        Returns
        -------
        Normal
            Estimated (improved) initial random variable.
            Compatible with the specified prior.
        """
        f, y0, t0, df = ivp.f, ivp.y0, ivp.t0, ivp.df
        y0 = np.asarray(y0)
        ode_dim = y0.shape[0] if y0.ndim > 0 else 1
        order = prior_process.transition.num_derivatives

        # order + 1 would suffice in theory, 2*order + 1 is for good measure
        # (the "+1" is a safety factor for order=1)
        num_steps = 2 * order + 1
        t_eval = np.arange(t0, t0 + (num_steps + 1) * self.dt, self.dt)
        sol = sci.solve_ivp(
            f,
            (t0, t0 + (num_steps + 1) * self.dt),
            y0=y0,
            atol=1e-12,
            rtol=1e-12,
            t_eval=t_eval,
            method=self.method,
        )

        # Measurement model for SciPy observations
        proj_to_y = prior_process.transition.proj2coord(coord=0)
        zeros_shift = np.zeros(ode_dim)
        zeros_cov = np.zeros((ode_dim, ode_dim))
        measmod_scipy = randprocs.markov.discrete.LTIGaussian(
            proj_to_y,
            zeros_shift,
            zeros_cov,
            proc_noise_cov_cholesky=zeros_cov,
            forward_implementation="sqrt",
            backward_implementation="sqrt",
        )

        # Measurement model for initial condition observations
        proj_to_dy = prior_process.transition.proj2coord(coord=1)
        if df is not None and order > 1:
            proj_to_ddy = prior_process.transition.proj2coord(coord=2)
            projmat_initial_conditions = np.vstack(
                (proj_to_y, proj_to_dy, proj_to_ddy)
            )
            initial_data = np.hstack((y0, f(t0, y0), df(t0, y0) @ f(t0, y0)))
        else:
            projmat_initial_conditions = np.vstack((proj_to_y, proj_to_dy))
            initial_data = np.hstack((y0, f(t0, y0)))
        zeros_shift = np.zeros(len(projmat_initial_conditions))
        zeros_cov = np.zeros(
            (len(projmat_initial_conditions), len(projmat_initial_conditions))
        )
        measmod_initcond = randprocs.markov.discrete.LTIGaussian(
            projmat_initial_conditions,
            zeros_shift,
            zeros_cov,
            proc_noise_cov_cholesky=zeros_cov,
            forward_implementation="sqrt",
            backward_implementation="sqrt",
        )

        # Create regression problem and measurement model list
        ts = sol.t[:num_steps]
        ys = list(sol.y[:, :num_steps].T)
        ys[0] = initial_data
        measmod_list = [measmod_initcond] + [measmod_scipy] * (len(ts) - 1)
        regression_problem = problems.TimeSeriesRegressionProblem(
            observations=ys, locations=ts, measurement_models=measmod_list
        )

        # Infer the solution
        kalman = filtsmooth.gaussian.Kalman(prior_process)
        out, _ = kalman.filtsmooth(regression_problem)
        estimated_initrv = out.states[0]

        return estimated_initrv