Source code for probnum.quad._integration_measures

"""Contains integration measures."""
from __future__ import annotations

import abc
from typing import Optional, Union

import numpy as np
import scipy.stats

from probnum.randvars import Normal
from probnum.typing import FloatLike, IntLike

from ._quad_typing import DomainLike
from ._utils import as_domain


class IntegrationMeasure(abc.ABC):
    """An abstract class for a measure against which a target function is integrated.

    Child classes implement specific integration measures and, if available, make use
    of random variables for sampling and evaluation of the density function.

    Parameters
    ----------
    domain
        Domain of integration. Contains lower and upper bound as a scalar or
        ``np.ndarray``.
    input_dim
        Dimension of the integration domain.
    """

    def __init__(
        self,
        domain: DomainLike,
        input_dim: Optional[IntLike],
    ) -> None:

        self.domain, self.input_dim = as_domain(domain, input_dim)

[docs] def __call__(self, points: Union[FloatLike, np.ndarray]) -> np.ndarray: """Evaluate the density function of the integration measure. Parameters ---------- points *shape=(n_points, input_dim)* -- Input locations. Returns ------- density_evals : *shape=(n_points,)* -- Density evaluated at given locations. """ # pylint: disable=no-member return self.random_variable.pdf(points).reshape(-1)
[docs] def sample( self, n_sample: IntLike, rng: Optional[np.random.Generator] = np.random.default_rng(), ) -> np.ndarray: """Sample ``n_sample`` points from the integration measure. Parameters ---------- n_sample Number of points to be sampled rng Random number generator. Optional. Default is `np.random.default_rng()`. Returns ------- points : *shape=(n_sample,input_dim)* -- Sampled points """ # pylint: disable=no-member return np.reshape( self.random_variable.sample(size=n_sample, rng=rng), newshape=(n_sample, self.input_dim), )
class LebesgueMeasure(IntegrationMeasure): """Lebesgue measure on a hyper-rectangle. Parameters ---------- domain Domain of integration. Contains lower and upper bound as a scalar or ``np.ndarray``. input_dim Dimension of the integration domain. If not given, inferred from ``domain``. normalized Boolean which controls whether or not the measure is normalized (i.e., integral over the domain is one). Defaults to ``False``. """ def __init__( self, domain: DomainLike, input_dim: Optional[IntLike] = None, normalized: bool = False, ) -> None: super().__init__(input_dim=input_dim, domain=domain) # Set normalization constant if normalized: normalization_constant = 1.0 / np.prod(self.domain[1] - self.domain[0]) else: normalization_constant = 1.0 if normalization_constant in [0, np.Inf, -np.Inf]: raise ValueError( "Normalization constant is too small or too large. " "Consider setting normalized = False." ) self.normalized = normalized self.normalization_constant = normalization_constant # Use scipy's uniform random variable since uniform random variables are not # yet implemented in probnum self.random_variable = scipy.stats.uniform( loc=self.domain[0], scale=self.domain[1] - self.domain[0] )
[docs] def __call__(self, points: np.ndarray) -> np.ndarray: num_dat = points.shape[0] return np.full((num_dat,), self.normalization_constant)
[docs] def sample( self, n_sample: IntLike, rng: Optional[np.random.Generator] = np.random.default_rng(), ) -> np.ndarray: return self.random_variable.rvs( size=(n_sample, self.input_dim), random_state=rng )
# pylint: disable=too-few-public-methods class GaussianMeasure(IntegrationMeasure): """Gaussian measure on Euclidean space with given mean and covariance. If ``mean`` and ``cov`` are scalars but ``input_dim`` is larger than one, ``mean`` and ``cov`` are extended to a constant vector and diagonal matrix, respectively, of appropriate dimensions. Parameters ---------- mean *shape=(input_dim,)* -- Mean of the Gaussian measure. cov *shape=(input_dim, input_dim)* -- Covariance matrix of the Gaussian measure. input_dim Dimension of the integration domain. """ def __init__( self, mean: Union[float, np.floating, np.ndarray], cov: Union[float, np.floating, np.ndarray], input_dim: Optional[IntLike] = None, ) -> None: # Extend scalar mean and covariance to higher dimensions if input_dim has been # supplied by the user if ( (np.isscalar(mean) or mean.size == 1) and (np.isscalar(cov) or cov.size == 1) and input_dim is not None ): mean = np.full((input_dim,), mean) cov = cov * np.eye(input_dim) # Set dimension based on the mean vector if np.isscalar(mean): input_dim = 1 else: input_dim = mean.size # Set domain as whole R^n domain = (np.full((input_dim,), -np.Inf), np.full((input_dim,), np.Inf)) super().__init__(input_dim=input_dim, domain=domain) # Exploit random variables to carry out mean and covariance checks # squeezes are needed due to the way random variables are currently implemented # pylint: disable=no-member self.random_variable = Normal(mean=np.squeeze(mean), cov=np.squeeze(cov)) self.mean = np.reshape(self.random_variable.mean, (self.input_dim,)) self.cov = np.reshape( self.random_variable.cov, (self.input_dim, self.input_dim) ) # Set diagonal_covariance flag if input_dim == 1: self.diagonal_covariance = True else: self.diagonal_covariance = ( np.count_nonzero(self.cov - np.diag(np.diagonal(self.cov))) == 0 )