"""Normally distributed / Gaussian random variables."""
from typing import Callable, Optional, Union
import numpy as np
import scipy.linalg
import scipy.stats
from probnum import linops
from probnum import utils as _utils
from probnum.type import (
ArrayLikeGetitemArgType,
FloatArgType,
RandomStateArgType,
ShapeArgType,
ShapeType,
)
from . import _random_variable
try:
# functools.cached_property is only available in Python >=3.8
from functools import cached_property
except ImportError:
from cached_property import cached_property
COV_CHOLESKY_DAMPING = 10 ** -12
_ValueType = Union[np.floating, np.ndarray, linops.LinearOperator]
class Normal(_random_variable.ContinuousRandomVariable[_ValueType]):
"""Random variable with a normal distribution.
Gaussian random variables are ubiquitous in probability theory, since the
Gaussian is the equilibrium distribution to which other distributions gravitate
under a wide variety of smooth operations, e.g., convolutions and stochastic
transformations. One example of this is the central limit theorem. Gaussian
random variables are also attractive from a numerical point of view as they
maintain their distribution family through many transformations (e.g. they are
stable). In particular, they allow for efficient closed-form Bayesian inference
given linear observations.
Parameters
----------
mean :
Mean of the random variable.
cov :
(Co-)variance of the random variable.
random_state :
Random state of the random variable. If None (or np.random), the global
:mod:`numpy.random` state is used. If integer, it is used to seed the local
:class:`~numpy.random.RandomState` instance.
See Also
--------
RandomVariable : Class representing random variables.
Examples
--------
>>> from probnum import random_variables as rvs
>>> x = rvs.Normal(mean=0.5, cov=1.0, random_state=42)
>>> x.sample(size=(2, 2))
array([[0.99671415, 0.3617357 ],
[1.14768854, 2.02302986]])
"""
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
def __init__(
self,
mean: Union[float, np.floating, np.ndarray, linops.LinearOperator],
cov: Union[float, np.floating, np.ndarray, linops.LinearOperator],
cov_cholesky: Optional[Union[np.ndarray, linops.LinearOperator]] = None,
random_state: RandomStateArgType = None,
):
# Type normalization
if np.isscalar(mean):
mean = _utils.as_numpy_scalar(mean)
if np.isscalar(cov):
cov = _utils.as_numpy_scalar(cov)
# Data type normalization
is_mean_floating = mean.dtype is not None and np.issubdtype(
mean.dtype, np.floating
)
is_cov_floating = cov.dtype is not None and np.issubdtype(
cov.dtype, np.floating
)
if is_mean_floating and is_cov_floating:
dtype = np.promote_types(mean.dtype, cov.dtype)
elif is_mean_floating:
dtype = mean.dtype
elif is_cov_floating:
dtype = cov.dtype
else:
dtype = np.dtype(np.float_)
if not isinstance(mean, linops.LinearOperator):
mean = mean.astype(dtype, order="C", casting="safe", subok=True, copy=False)
else:
# TODO: Implement casting for linear operators
if mean.dtype != dtype:
raise ValueError(
f"The mean must have type `{dtype.name}` not `{mean.dtype.name}`, "
f"but a linear operator does not implement type casting."
)
if not isinstance(cov, linops.LinearOperator):
cov = cov.astype(dtype, order="C", casting="safe", subok=True, copy=False)
else:
# TODO: Implement casting for linear operators
if cov.dtype != dtype:
raise ValueError(
f"The covariance must have type `{dtype.name}` not "
f"`{cov.dtype.name}`, but a linear operator does not implement "
f"type casting."
)
# Shape checking
if len(mean.shape) not in [0, 1, 2]:
raise ValueError(
f"Gaussian random variables must either be scalars, vectors, or "
f"matrices (or linear operators), but the given mean is a {mean.ndim}-"
f"dimensional tensor."
)
expected_cov_shape = (np.prod(mean.shape),) * 2 if len(mean.shape) > 0 else ()
if len(cov.shape) != len(expected_cov_shape) or cov.shape != expected_cov_shape:
raise ValueError(
f"The covariance matrix must be of shape {expected_cov_shape}, but "
f"shape {cov.shape} was given."
)
self._mean = mean
self._cov = cov
self._compute_cov_cholesky: Callable[[], _ValueType] = None
# Method selection
univariate = len(mean.shape) == 0
dense = isinstance(mean, np.ndarray) and isinstance(cov, np.ndarray)
cov_operator = isinstance(cov, linops.LinearOperator)
if univariate:
# Univariate Gaussian
sample = self._univariate_sample
in_support = Normal._univariate_in_support
pdf = self._univariate_pdf
logpdf = self._univariate_logpdf
cdf = self._univariate_cdf
logcdf = self._univariate_logcdf
quantile = self._univariate_quantile
median = lambda: self._mean
var = lambda: self._cov
entropy = self._univariate_entropy
self._compute_cov_cholesky = self._univariate_cov_cholesky
elif dense or cov_operator:
# Multi- and matrixvariate Gaussians
sample = self._dense_sample
in_support = Normal._dense_in_support
pdf = self._dense_pdf
logpdf = self._dense_logpdf
cdf = self._dense_cdf
logcdf = self._dense_logcdf
quantile = None
median = None
var = self._dense_var
entropy = self._dense_entropy
if cov_cholesky is None:
self._compute_cov_cholesky = self.dense_cov_cholesky
else:
if not isinstance(cov_cholesky, type(self._cov)):
raise ValueError(
f"The covariance matrix is of type `{type(self._cov)}`, so its "
f"Cholesky decomposition must be of the same type, but an "
f"object of type `{type(cov_cholesky)}` was given."
)
if cov_cholesky.shape != self._cov.shape:
raise ValueError(
f"The cholesky decomposition of the covariance matrix must "
f"have the same shape as the covariance matrix, i.e. "
f"{self._cov.shape}, but shape {cov_cholesky.shape} was given"
)
if cov_cholesky.dtype != self._cov.dtype:
# TODO: Implement casting for linear operators
if not isinstance(cov_cholesky, linops.LinearOperator):
cov_cholesky = cov_cholesky.astype(self._cov.dtype)
self._compute_cov_cholesky = lambda: cov_cholesky
if isinstance(cov, linops.SymmetricKronecker):
m, n = mean.shape
if m != n or n != cov.A.shape[0] or n != cov.B.shape[1]:
raise ValueError(
"Normal distributions with symmetric Kronecker structured "
"kernels must have square mean and square kernels factors with "
"matching dimensions."
)
if cov._ABequal:
sample = self._symmetric_kronecker_identical_factors_sample
# pylint: disable=redefined-variable-type
self._compute_cov_cholesky = (
self._symmetric_kronecker_identical_factors_cov_cholesky
)
elif isinstance(cov, linops.Kronecker):
m, n = mean.shape
if (
m != cov.A.shape[0]
or m != cov.A.shape[1]
or n != cov.B.shape[0]
or n != cov.B.shape[1]
):
raise ValueError(
"Kronecker structured kernels must have factors with the same "
"shape as the mean."
)
self._compute_cov_cholesky = self._kronecker_cov_cholesky
else:
raise ValueError(
f"Cannot instantiate normal distribution with mean of type "
f"{mean.__class__.__name__} and kernels of type "
f"{cov.__class__.__name__}."
)
super().__init__(
shape=mean.shape,
dtype=mean.dtype,
random_state=random_state,
parameters={"mean": self._mean, "cov": self._cov},
sample=sample,
in_support=in_support,
pdf=pdf,
logpdf=logpdf,
cdf=cdf,
logcdf=logcdf,
quantile=quantile,
mode=lambda: self._mean,
median=median,
mean=lambda: self._mean,
cov=lambda: self._cov,
var=var,
entropy=entropy,
)
@cached_property
def cov_cholesky(self) -> _ValueType:
"""Cholesky factor :math:`L` of the covariance
:math:`\\operatorname{Cov}(X) =LL^\\top`."""
if self._compute_cov_cholesky is None:
raise NotImplementedError
return self._compute_cov_cholesky()
@cached_property
def dense_mean(self) -> Union[np.floating, np.ndarray]:
"""Dense representation of the mean."""
if isinstance(self._mean, linops.LinearOperator):
return self._mean.todense()
else:
return self._mean
@cached_property
def dense_cov(self) -> Union[np.floating, np.ndarray]:
"""Dense representation of the covariance."""
if isinstance(self._cov, linops.LinearOperator):
return self._cov.todense()
else:
return self._cov
def __getitem__(self, key: ArrayLikeGetitemArgType) -> "Normal":
"""Marginalization in multi- and matrixvariate normal random variables,
expressed as (advanced) indexing, masking and slicing.
We support all modes of array indexing presented in
https://numpy.org/doc/1.19/reference/arrays.indexing.html.
Note that, currently, this method only works for multi- and matrixvariate
normal distributions.
Parameters
----------
key : int or slice or ndarray or tuple of None, int, slice, or ndarray
Indices, slice objects and/or boolean masks specifying which entries to keep
while marginalizing over all other entries.
"""
if not isinstance(key, tuple):
key = (key,)
# Select entries from mean
mean = self.dense_mean[key]
# Select submatrix from covariance matrix
cov = self.dense_cov.reshape(self.shape + self.shape)
cov = cov[key][tuple([slice(None)] * mean.ndim) + key]
if mean.ndim > 0:
cov = cov.reshape(mean.size, mean.size)
return Normal(
mean=mean,
cov=cov,
random_state=_utils.derive_random_seed(self.random_state),
)
[docs] def reshape(self, newshape: ShapeArgType) -> "Normal":
try:
reshaped_mean = self.dense_mean.reshape(newshape)
except ValueError as exc:
raise ValueError(
f"Cannot reshape this normal random variable to the given shape: "
f"{newshape}"
) from exc
reshaped_cov = self.dense_cov
if reshaped_mean.ndim > 0 and reshaped_cov.ndim == 0:
reshaped_cov = reshaped_cov.reshape(1, 1)
return Normal(
mean=reshaped_mean,
cov=reshaped_cov,
random_state=_utils.derive_random_seed(self.random_state),
)
[docs] def transpose(self, *axes: int) -> "Normal":
if len(axes) == 1 and isinstance(axes[0], tuple):
axes = axes[0]
elif (len(axes) == 1 and axes[0] is None) or len(axes) == 0:
axes = tuple(reversed(range(self.ndim)))
mean_t = self.dense_mean.transpose(*axes).copy()
# Transpose covariance
cov_axes = axes + tuple(mean_t.ndim + axis for axis in axes)
cov_t = self.dense_cov.reshape(self.shape + self.shape)
cov_t = cov_t.transpose(*cov_axes).copy()
if mean_t.ndim > 0:
cov_t = cov_t.reshape(mean_t.size, mean_t.size)
return Normal(
mean=mean_t,
cov=cov_t,
random_state=_utils.derive_random_seed(self.random_state),
)
# Unary arithmetic operations
def __neg__(self) -> "Normal":
return Normal(
mean=-self._mean,
cov=self._cov,
random_state=_utils.derive_random_seed(self.random_state),
)
def __pos__(self) -> "Normal":
return Normal(
mean=+self._mean,
cov=self._cov,
random_state=_utils.derive_random_seed(self.random_state),
)
# TODO: Overwrite __abs__ and add absolute moments of normal
# TODO: (https://arxiv.org/pdf/1209.4340.pdf)
# Binary arithmetic operations
def _add_normal(self, other: "Normal") -> "Normal":
if other.shape != self.shape:
raise ValueError(
"Addition of two normally distributed random variables is only "
"possible if both operands have the same shape."
)
return Normal(
mean=self._mean + other._mean,
cov=self._cov + other._cov,
random_state=_utils.derive_random_seed(
self.random_state, other.random_state
),
)
def _sub_normal(self, other: "Normal") -> "Normal":
if other.shape != self.shape:
raise ValueError(
"Subtraction of two normally distributed random variables is only "
"possible if both operands have the same shape."
)
return Normal(
mean=self._mean - other._mean,
cov=self._cov + other._cov,
random_state=_utils.derive_random_seed(
self.random_state, other.random_state
),
)
# Univariate Gaussians
def _univariate_cov_cholesky(self) -> np.floating:
return np.sqrt(self._cov)
def _univariate_sample(
self, size: ShapeType = ()
) -> Union[np.floating, np.ndarray]:
sample = scipy.stats.norm.rvs(
loc=self._mean, scale=self.std, size=size, random_state=self.random_state
)
if np.isscalar(sample):
sample = _utils.as_numpy_scalar(sample, dtype=self.dtype)
else:
sample = sample.astype(self.dtype)
assert sample.shape == size
return sample
@staticmethod
def _univariate_in_support(x: _ValueType) -> bool:
return np.isfinite(x)
def _univariate_pdf(self, x: _ValueType) -> np.float_:
return scipy.stats.norm.pdf(x, loc=self._mean, scale=self.std)
def _univariate_logpdf(self, x: _ValueType) -> np.float_:
return scipy.stats.norm.logpdf(x, loc=self._mean, scale=self.std)
def _univariate_cdf(self, x: _ValueType) -> np.float_:
return scipy.stats.norm.cdf(x, loc=self._mean, scale=self.std)
def _univariate_logcdf(self, x: _ValueType) -> np.float_:
return scipy.stats.norm.logcdf(x, loc=self._mean, scale=self.std)
def _univariate_quantile(self, p: FloatArgType) -> np.floating:
return scipy.stats.norm.ppf(p, loc=self._mean, scale=self.std)
def _univariate_entropy(self: _ValueType) -> np.float_:
return _utils.as_numpy_scalar(
scipy.stats.norm.entropy(loc=self._mean, scale=self.std),
dtype=np.float_,
)
# Multi- and matrixvariate Gaussians
[docs] def dense_cov_cholesky(self) -> np.ndarray:
"""Compute the Cholesky factorization of the covariance from its dense
representation."""
dense_cov = self.dense_cov
return scipy.linalg.cholesky(
dense_cov + COV_CHOLESKY_DAMPING * np.eye(self.size, dtype=self.dtype),
lower=True,
)
def _dense_sample(self, size: ShapeType = ()) -> np.ndarray:
sample = scipy.stats.multivariate_normal.rvs(
mean=self.dense_mean.ravel(),
cov=self.dense_cov,
size=size,
random_state=self.random_state,
)
return sample.reshape(sample.shape[:-1] + self.shape)
@staticmethod
def _arg_todense(x: Union[np.ndarray, linops.LinearOperator]) -> np.ndarray:
if isinstance(x, linops.LinearOperator):
return x.todense()
elif isinstance(x, np.ndarray):
return x
else:
raise ValueError(f"Unsupported argument type {type(x)}")
@staticmethod
def _dense_in_support(x: _ValueType) -> bool:
return np.all(np.isfinite(Normal._arg_todense(x)))
def _dense_pdf(self, x: _ValueType) -> np.float_:
return scipy.stats.multivariate_normal.pdf(
Normal._arg_todense(x).reshape(x.shape[: -self.ndim] + (-1,)),
mean=self.dense_mean.ravel(),
cov=self.dense_cov,
)
def _dense_logpdf(self, x: _ValueType) -> np.float_:
return scipy.stats.multivariate_normal.logpdf(
Normal._arg_todense(x).reshape(x.shape[: -self.ndim] + (-1,)),
mean=self.dense_mean.ravel(),
cov=self.dense_cov,
)
def _dense_cdf(self, x: _ValueType) -> np.float_:
return scipy.stats.multivariate_normal.cdf(
Normal._arg_todense(x).reshape(x.shape[: -self.ndim] + (-1,)),
mean=self.dense_mean.ravel(),
cov=self.dense_cov,
)
def _dense_logcdf(self, x: _ValueType) -> np.float_:
return scipy.stats.multivariate_normal.logcdf(
Normal._arg_todense(x).reshape(x.shape[: -self.ndim] + (-1,)),
mean=self.dense_mean.ravel(),
cov=self.dense_cov,
)
def _dense_var(self) -> np.ndarray:
return np.diag(self.dense_cov).reshape(self.shape)
def _dense_entropy(self) -> np.float_:
return _utils.as_numpy_scalar(
scipy.stats.multivariate_normal.entropy(
mean=self.dense_mean.ravel(),
cov=self.dense_cov,
),
dtype=np.float_,
)
# Matrixvariate Gaussian with Kronecker covariance
def _kronecker_cov_cholesky(self) -> linops.Kronecker:
assert isinstance(self._cov, linops.Kronecker)
A = self._cov.A.todense()
B = self._cov.B.todense()
return linops.Kronecker(
A=scipy.linalg.cholesky(
A + COV_CHOLESKY_DAMPING * np.eye(A.shape[0], dtype=self.dtype),
lower=True,
),
B=scipy.linalg.cholesky(
B + COV_CHOLESKY_DAMPING * np.eye(B.shape[0], dtype=self.dtype),
lower=True,
),
dtype=self.dtype,
)
# Matrixvariate Gaussian with symmetric Kronecker covariance from identical
# factors
def _symmetric_kronecker_identical_factors_cov_cholesky(
self,
) -> linops.SymmetricKronecker:
assert isinstance(self._cov, linops.SymmetricKronecker) and self._cov._ABequal
A = self._cov.A.todense()
return linops.SymmetricKronecker(
A=scipy.linalg.cholesky(
A + COV_CHOLESKY_DAMPING * np.eye(A.shape[0], dtype=self.dtype),
lower=True,
),
dtype=self.dtype,
)
def _symmetric_kronecker_identical_factors_sample(
self, size: ShapeType = ()
) -> np.ndarray:
assert isinstance(self._cov, linops.SymmetricKronecker) and self._cov._ABequal
n = self._mean.shape[1]
# Draw standard normal samples
size_sample = (n * n,) + size
stdnormal_samples = scipy.stats.norm.rvs(
size=size_sample, random_state=self.random_state
)
# Appendix E: Bartels, S., Probabilistic Linear Algebra, PhD Thesis 2019
samples_scaled = linops.Symmetrize(dim=n) @ (
self.cov_cholesky @ stdnormal_samples
)
# TODO: can we avoid todense here and just return operator samples?
return self.dense_mean[None, :, :] + samples_scaled.T.reshape(-1, n, n)