Source code for probnum.filtsmooth.particlefiltsmooth._particle_filter

"""Particle filters."""

from typing import Optional, Union

import numpy as np

from probnum import randvars, statespace
from probnum.filtsmooth.bayesfiltsmooth import BayesFiltSmooth
from probnum.type import FloatArgType, IntArgType

from ._particle_filter_posterior import ParticleFilterPosterior


[docs]def effective_number_of_events(categ_rv: randvars.Categorical) -> float: """Approximate effective number of events in the support of a categorical random variable. In a particle filter, this is used as the effective number of particles which may indicate the need for resampling. """ return 1.0 / np.sum(categ_rv.probabilities ** 2)
[docs]class ParticleFilter(BayesFiltSmooth): r"""Particle filter (PF). Also known as sequential Monte Carlo method. A PF estimates the posterior distribution of a Markov process given noisy, non-linear observations, with a set of particles. The random state of the particle filter is inferred from the random state of the initial random variable. Parameters ---------- dynamics_model : Prior dynamics. Since the PF is essentially a discrete-time algorithm, the prior must be a discrete model (or at least one with an equivalent discretisation). This transition must support `forward_realization`. measurement_model : Measurement model. Must be a discrete model that supports `forward_realization`. initrv : Initial random variable. Can be any `RandomVariable` object that implements `sample()`. num_particles : Number of particles to use. linearized_measurement_model : Linearized measurement model that is used as an importance density. In principle, any discrete-time model that supports `backward_realization` is applicable. In practice, it will almost always be one out of `DiscreteEKFComponent`, `DiscreteUKFComponent`, or `IteratedDiscreteComponent`. Linear components are also possible, but would most often imply that a particle filter is not required, because the filtering problem can be used much faster with a Kalman filter. The exception to this rule is if the initial random variable is not Gaussian. Optional. Default is None, which implies the bootstrap PF. with_resampling : Whether after each step the effective number of particles shall be checked, and, if too low, the state should be resampled. Optional. Default is `True`. resampling_percentage_threshold : Percentage threshold for resampling. That is, it is the value :math:`p` such that resampling is performed if :math:`N_{\text{eff}} < p \, N_\text{particles}` holds. Optional. Default is 0.1. If this value is non-positive, resampling is never performed. If it is larger than 1, resampling is performed after each step. """ def __init__( self, dynamics_model: Union[statespace.LTISDE, statespace.DiscreteGaussian], measurement_model: statespace.DiscreteGaussian, initrv: randvars.RandomVariable, num_particles: IntArgType, linearized_measurement_model: Optional[statespace.DiscreteGaussian] = None, with_resampling: bool = True, resampling_percentage_threshold: FloatArgType = 0.1, ) -> None: super().__init__( dynamics_model=dynamics_model, measurement_model=measurement_model, initrv=initrv, ) self.num_particles = num_particles self.with_resampling = with_resampling self.resampling_percentage_threshold = resampling_percentage_threshold self.min_effective_num_of_particles = ( resampling_percentage_threshold * num_particles ) # If None, the dynamics model is used as a fallback option # which results in the bootstrap PF. # Any linearised measurement model that could be used in a # Gaussian filter can be used here and will likely be a better # choice than the bootstrap. self.linearized_measurement_model = linearized_measurement_model
[docs] def filter(self, dataset, times): """Apply Particle filtering to a data set. Parameters ---------- dataset : array_like, shape (N, M) Data set that is filtered. times : array_like, shape (N,) Temporal locations of the data points. The zeroth element in times and dataset is the location of the initial random variable. Returns ------- ParticleFilterPosterior Posterior distribution of the filtered output. """ # Initialize: condition on first data point using the initial random # variable as a dynamics rv (i.e. as the current prior). particles_and_weights = np.array( [ self.compute_new_particle(dataset[0], times[0], self.initrv) for _ in range(self.num_particles) ], dtype=object, ) particles = np.stack(particles_and_weights[:, 0], axis=0) weights = np.stack(particles_and_weights[:, 1], axis=0) weights = np.array(weights) / np.sum(weights) curr_rv = randvars.Categorical( support=particles, probabilities=weights, random_state=self.random_state ) rvs = [curr_rv] for idx in range(1, len(times)): curr_rv, _ = self.filter_step( start=times[idx - 1], stop=times[idx], randvar=curr_rv, data=dataset[idx], ) rvs.append(curr_rv) return ParticleFilterPosterior(states=rvs, locations=times)
[docs] def filter_step(self, start, stop, randvar, data): """Perform a particle filter step. This method implements sequential importance (re)sampling. It consists of the following steps: 1. Propagating the "past" particles through the dynamics model. 2. Computing a "proposal" random variable. This is either the prior dynamics model or the output of a filter step of an (approximate) Gaussian filter. 3. Sample from the proposal random variable. This is the "new" particle. 4. Propagate the particle through the measurement model. This is required in order to evaluate the PDF of the resulting RV at the data. If this is small, the weight of the particle will be small. 5. Compute weights ("event probabilities") of the new particle. This requires evaluating the PDFs of all three RVs (dynamics, proposal, measurement). After this is done for all particles, the weights are normalized in order to sum to 1. If the effective number of particles is low, the particles are resampled. """ new_weights = randvar.probabilities.copy() new_support = randvar.support.copy() for idx, (particle, weight) in enumerate(zip(new_support, new_weights)): dynamics_rv, _ = self.dynamics_model.forward_realization( particle, t=start, dt=(stop - start) ) proposal_state, proposal_weight = self.compute_new_particle( data, stop, dynamics_rv ) new_support[idx] = proposal_state new_weights[idx] = proposal_weight new_weights = new_weights / np.sum(new_weights) new_rv = randvars.Categorical( support=new_support, probabilities=new_weights, random_state=self.random_state, ) if self.with_resampling: if effective_number_of_events(new_rv) < self.min_effective_num_of_particles: new_rv = new_rv.resample() return new_rv, {}
[docs] def compute_new_particle(self, data, stop, dynamics_rv): """Compute a new particle. Turn the dynamics RV into a proposal RV, apply the measurement model and compute new weights via the respective PDFs. """ proposal_rv = self.dynamics_to_proposal_rv(dynamics_rv, data=data, t=stop) proposal_state = proposal_rv.sample() meas_rv, _ = self.measurement_model.forward_realization(proposal_state, t=stop) # For the bootstrap PF, the dynamics and proposal PDFs cancel out. # Therefore we make the following exception. if self.linearized_measurement_model is None: log_proposal_weight = meas_rv.logpdf(data) else: log_proposal_weight = ( meas_rv.logpdf(data) + dynamics_rv.logpdf(proposal_state) - proposal_rv.logpdf(proposal_state) ) proposal_weight = np.exp(log_proposal_weight) return proposal_state, proposal_weight
[docs] def dynamics_to_proposal_rv(self, dynamics_rv, data, t): """Turn a dynamics RV into a proposal RV. The output of this function depends on the choice of PF. For the bootstrap PF, nothing happens. For other PFs, the importance density is used to improve the proposal. Currently, only approximate Gaussian importance densities are provided. """ proposal_rv = dynamics_rv if self.linearized_measurement_model is not None: proposal_rv, _ = self.linearized_measurement_model.backward_realization( data, proposal_rv, t=t ) return proposal_rv
@property def random_state(self): """Random state of the particle filter. Inferred from the random state of the initial random variable. """ return self.initrv.random_state