Source code for probnum.filtsmooth.particle._particle_filter

"""Particle filters."""

from typing import Iterable, Union

import numpy as np

from probnum import problems, randprocs, randvars
from probnum.filtsmooth import _bayesfiltsmooth
from probnum.filtsmooth.particle import (
    _importance_distributions,
    _particle_filter_posterior,
)
from probnum.typing import FloatLike, IntLike

# Terribly long variable names, but internal only, so no worries.
ParticleFilterMeasurementModelArgType = Union[
    randprocs.markov.discrete.NonlinearGaussian,
    Iterable[randprocs.markov.discrete.NonlinearGaussian],
]
ParticleFilterLinearisedMeasurementModelArgType = Union[
    randprocs.markov.discrete.NonlinearGaussian,
    Iterable[randprocs.markov.discrete.NonlinearGaussian],
]


[docs]def effective_number_of_events(categ_rv: randvars.Categorical) -> float: """Approximate effective number of events in the support of a categorical random variable. In a particle filter, this is used as the effective number of particles which may indicate the need for resampling. """ return 1.0 / np.sum(categ_rv.probabilities**2)
class ParticleFilter(_bayesfiltsmooth.BayesFiltSmooth): r"""Particle filter (PF). Also known as sequential Monte Carlo method. A PF estimates the posterior distribution of a Markov process given noisy, non-linear observations, with a set of particles. The random state of the particle filter is inferred from the random state of the initial random variable. Parameters ---------- prior_process : Prior Gauss-Markov process. importance_distribution : Importance distribution. num_particles : Number of particles to use. rng Random number generator. with_resampling Whether after each step the effective number of particles shall be checked, and, if too low, the state should be resampled. Optional. Default is `True`. resampling_percentage_threshold Percentage threshold for resampling. That is, it is the value :math:`p` such that resampling is performed if :math:`N_{\text{eff}} < p \, N_\text{particles}` holds. Optional. Default is 0.1. If this value is non-positive, resampling is never performed. If it is larger than 1, resampling is performed after each step. """ def __init__( self, prior_process: randprocs.markov.MarkovProcess, importance_distribution: _importance_distributions.ImportanceDistribution, num_particles: IntLike, rng: np.random.Generator, with_resampling: bool = True, resampling_percentage_threshold: FloatLike = 0.1, ) -> None: super().__init__( prior_process=prior_process, ) self.num_particles = num_particles self.importance_distribution = importance_distribution self.rng = rng self.with_resampling = with_resampling self.resampling_percentage_threshold = resampling_percentage_threshold self.min_effective_num_of_particles = ( resampling_percentage_threshold * num_particles )
[docs] def filter( self, regression_problem: problems.TimeSeriesRegressionProblem, ): """Apply particle filtering to a data set. Parameters ---------- regression_problem : Regression problem. Returns ------- posterior Posterior distribution of the filtered output info_dicts list of dictionaries containing filtering information See Also -------- TimeSeriesRegressionProblem: a regression problem data class """ filtered_rvs = [] info_dicts = [] for rv, info in self.filter_generator(regression_problem): filtered_rvs.append(rv) info_dicts.append(info) posterior = _particle_filter_posterior.ParticleFilterPosterior( states=filtered_rvs, locations=regression_problem.locations, ) return posterior, info_dicts
[docs] def filter_generator( self, regression_problem: problems.TimeSeriesRegressionProblem, ): """Apply Particle filtering to a data set. Parameters ---------- regression_problem Regression problem. Raises ------ ValueError If repeating time-points are encountered. Yields ------ curr_rv Filtering random variable at each grid point. info_dict Dictionary containing filtering information See Also -------- TimeSeriesRegressionProblem: a regression problem data class """ # It is not clear at the moment how to handle this. if not np.all(np.diff(regression_problem.locations) > 0): raise ValueError( "Particle filtering cannot handle repeating time points currently." ) initarg = self.prior_process.initarg t_old = self.prior_process.initarg # If the initial time of the prior equals the location of the first data point, # the initial set of particles is overwritten. # Here, we set them to unimportant values. # If the initial time of the prior is NOT the location of the first data point, # we have to sample an initial set of particles. weights = np.ones(self.num_particles) / self.num_particles particle_set_shape = (self.num_particles,) + self.prior_process.initrv.shape if regression_problem.locations[0] == initarg: particles = np.nan * np.ones(particle_set_shape) else: particles = self.prior_process.initrv.sample( rng=self.rng, size=(self.num_particles,) ) for t, data, measmod in regression_problem: dt = t - t_old new_particles = particles.copy() new_weights = weights.copy() # Capture the inputs in a variable for more compact code layout inputs = measmod, particles, weights, data, t_old, dt, t if t == initarg: particle_generator = self._importance_rv_generator_initial_time(*inputs) else: particle_generator = self._importance_rv_generator(*inputs) for idx, (importance_rv, dynamics_rv, p, w) in enumerate( particle_generator ): # Importance sampling step new_particle = importance_rv.sample(rng=self.rng) meas_rv, _ = measmod.forward_realization(new_particle, t=t) loglikelihood = meas_rv.logpdf(data) log_correction_factor = ( self.importance_distribution.log_correction_factor( proposal_state=new_particle, importance_rv=importance_rv, dynamics_rv=dynamics_rv, old_weight=w, ) ) new_weight = np.exp(loglikelihood + log_correction_factor) new_particles[idx] = new_particle new_weights[idx] = new_weight weights = new_weights / np.sum(new_weights) particles = new_particles new_rv = randvars.Categorical(support=particles, probabilities=weights) if self.with_resampling: N = effective_number_of_events(new_rv) if N < self.min_effective_num_of_particles: new_rv = new_rv.resample(rng=self.rng) yield new_rv, {} t_old = t
def _importance_rv_generator_initial_time( self, measmod, particles, weights, data, t_old, dt, t, ): processed = self.importance_distribution.process_initrv_with_data( self.prior_process.initrv, data, t, measurement_model=measmod ) importance_rv, dynamics_rv, _ = processed for p, w in zip(particles, weights): yield importance_rv, dynamics_rv, p, w def _importance_rv_generator( self, measmod, particles, weights, data, t_old, dt, t, ): for p, w in zip(particles, weights): output = self.importance_distribution.generate_importance_rv( p, data, t_old, dt, measurement_model=measmod ) importance_rv, dynamics_rv, _ = output yield importance_rv, dynamics_rv, p, w