"""Matrix-based probabilistic linear solvers.
Implementations of matrix-based linear solvers which perform inference
on the matrix or its inverse given linear observations.
"""
import abc
import warnings
import numpy as np
from probnum import linops, randvars
# pylint: disable="too-many-branches,too-many-lines,too-complex,too-many-statements,redefined-builtin,arguments-differ,abstract-method,unused-argument"
class MatrixBasedSolver(abc.ABC):
"""Abstract class for matrix-based probabilistic linear solvers.
Parameters
----------
A : array-like or LinearOperator or RandomVariable, shape=(n,n)
A square matrix or linear operator. A prior distribution can be provided as a
:class:`~randvars.RandomVariable`. If an array or linear operator is given,
a prior distribution is
chosen automatically.
    b : array_like or RandomVariable, shape=(n,) or (n, nrhs)
Right-hand side vector, matrix or RandomVariable of :math:`A x = b`.
x0 : array-like, shape=(n,) or (n, nrhs)
Optional. Guess for the solution of the linear system.
"""
def __init__(self, A, b, x0=None):
self.x0 = x0
self.A = A
self.b = b
self.n = A.shape[1]
def _get_prior_params(self, A0, Ainv0, x0, b):
"""Find the parameters of the prior distribution.
Parameters
----------
A0 : array-like or LinearOperator or RandomVariable, shape=(n,n), optional
A square matrix, linear operator or random variable representing the prior
belief over the linear operator :math:`A`. If an array or linear operator is
given, a prior distribution is chosen automatically.
Ainv0 : array-like or LinearOperator or RandomVariable, shape=(n,n), optional
A square matrix, linear operator or random variable representing the prior
belief over the inverse :math:`H=A^{-1}`. This can be viewed as taking the
form of a pre-conditioner. If an array or linear operator is given, a prior
distribution is chosen automatically.
        x0 : array-like or RandomVariable, shape=(n,) or (n, nrhs)
Optional. Prior belief for the solution of the linear system. Will be
ignored if ``A0`` or ``Ainv0`` is given.
b : array_like, shape=(n,) or (n, nrhs)
Right-hand side vector or matrix in :math:`A x = b`.
"""
raise NotImplementedError
def _construct_symmetric_matrix_prior_means(self, A, x0, b):
"""Create matrix prior means from an initial guess for the solution of the
linear system.
Constructs a matrix-variate prior mean for H from ``x0`` and ``b`` such that
:math:`H_0b = x_0`, :math:`H_0` symmetric positive definite and
:math:`A_0 = H_0^{-1}`.
Parameters
----------
A : array-like or LinearOperator, shape=(n,n)
System matrix assumed to be square.
x0 : array-like, shape=(n,) or (n, nrhs)
Optional. Guess for the solution of the linear system.
b : array_like, shape=(n,) or (n, nrhs)
Right-hand side vector or matrix in :math:`A x = b`.
Returns
-------
A0_mean : linops.LinearOperator
Mean of the matrix-variate prior distribution on the system matrix
:math:`A`.
Ainv0_mean : linops.LinearOperator
Mean of the matrix-variate prior distribution on the inverse of the system
matrix :math:`H = A^{-1}`.
"""
# Check inner product between x0 and b; if negative or zero, choose better
# initialization
bx0 = np.squeeze(b.T @ x0)
bb = np.linalg.norm(b) ** 2
if bx0 < 0:
x0 = -x0
bx0 = -bx0
print("Better initialization found, setting x0 = - x0.")
elif bx0 == 0:
if np.all(b == np.zeros_like(b)):
print("Right-hand-side is zero. Initializing with solution x0 = 0.")
x0 = b
else:
print("Better initialization found, setting x0 = (b'b/b'Ab) * b.")
bAb = np.squeeze(b.T @ (A @ b))
x0 = bb / bAb * b
bx0 = bb ** 2 / bAb
# Construct prior mean of A and H
alpha = 0.5 * bx0 / bb
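        # With alpha = 0.5 * x0'b / b'b the prior mean constructed below is
        #   H0 = alpha * I + 2 / (x0'b) * (x0 - alpha*b)(x0 - alpha*b)',
        # which satisfies H0 @ b = x0 since (x0 - alpha*b)'b = 0.5 * x0'b, and is
        # symmetric positive definite by construction (alpha > 0 and the rank-one
        # term has a positive coefficient).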
def _matmul(M):
return (x0 - alpha * b) @ (x0 - alpha * b).T @ M
Ainv0_mean = linops.Scaling(
alpha, shape=(self.n, self.n)
) + 2 / bx0 * linops.LinearOperator(
shape=(self.n, self.n),
dtype=np.result_type(x0.dtype, alpha.dtype, b.dtype),
matmul=_matmul,
)
A0_mean = linops.Scaling(1 / alpha, shape=(self.n, self.n)) - 1 / (
alpha * np.squeeze((x0 - alpha * b).T @ x0)
) * linops.LinearOperator(
shape=(self.n, self.n),
dtype=np.result_type(x0.dtype, alpha.dtype, b.dtype),
matmul=_matmul,
)
return A0_mean, Ainv0_mean
    def has_converged(self, iter, maxiter, **kwargs):
"""Check convergence of a linear solver.
Evaluates a set of convergence criteria based on its input arguments to decide
whether the iteration has converged.
Parameters
----------
iter : int
Current iteration of solver.
maxiter : int
Maximum number of iterations
Returns
-------
has_converged : bool
True if the method has converged.
convergence_criterion : str
Convergence criterion which caused termination.
"""
# maximum iterations
if iter >= maxiter:
warnings.warn(
"Iteration terminated. Solver reached the maximum number of iterations."
)
return True, "maxiter"
else:
return False, ""
    def solve(self, callback=None, **kwargs):
"""Solve the linear system :math:`Ax=b`.
Parameters
----------
callback : function, optional
User-supplied function called after each iteration of the linear solver. It
is called as ``callback(xk, Ak, Ainvk, sk, yk, alphak, resid, **kwargs)``
and can be used to return quantities from the iteration. Note that depending
on the function supplied, this can slow down the solver.
kwargs
Key-word arguments adjusting the behaviour of the ``solve`` iteration. These
are usually convergence criteria.
Returns
-------
x : RandomVariable, shape=(n,) or (n, nrhs)
Approximate solution :math:`x` to the linear system. Shape of the return
matches the shape of ``b``.
A : RandomVariable, shape=(n,n)
Posterior belief over the linear operator.
Ainv : RandomVariable, shape=(n,n)
Posterior belief over the linear operator inverse :math:`H=A^{-1}`.
info : dict
Information on convergence of the solver.
"""
raise NotImplementedError
class SymmetricMatrixBasedSolver(MatrixBasedSolver):
"""Symmetric matrix-based probabilistic linear solver.
Implements the solve iteration of the symmetric matrix-based probabilistic linear
solver described in [1]_ and [2]_.
Parameters
----------
A : array-like or LinearOperator or RandomVariable, shape=(n,n)
The square matrix or linear operator of the linear system.
b : array_like, shape=(n,) or (n, nrhs)
Right-hand side vector or matrix in :math:`A x = b`.
A0 : array-like or LinearOperator or RandomVariable, shape=(n, n), optional
A square matrix, linear operator or random variable representing the prior
belief over the linear operator :math:`A`. If an array or linear operator is
given, a prior distribution is chosen automatically.
Ainv0 : array-like or LinearOperator or RandomVariable, shape=(n,n), optional
A square matrix, linear operator or random variable representing the prior
belief over the inverse :math:`H=A^{-1}`. This can be viewed as taking the form
of a pre-conditioner. If an array or linear operator is given, a prior
distribution is chosen automatically.
    x0 : array-like or RandomVariable, shape=(n,) or (n, nrhs)
Optional. Prior belief for the solution of the linear system. Will be ignored if
``Ainv0`` is given.
Returns
-------
A : RandomVariable
Posterior belief over the linear operator.
Ainv : RandomVariable
Posterior belief over the inverse linear operator.
x : RandomVariable
Posterior belief over the solution of the linear system.
info : dict
Information about convergence and the solution.
References
----------
.. [1] Wenger, J. and Hennig, P., Probabilistic Linear Solvers for Machine Learning,
*Advances in Neural Information Processing Systems (NeurIPS)*, 2020
.. [2] Hennig, P., Probabilistic Interpretation of Linear Solvers, *SIAM Journal on
Optimization*, 2015, 25, 234-260
See Also
--------
NoisySymmetricMatrixBasedSolver :
Class implementing the noisy symmetric probabilistic linear solver.
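
    Examples
    --------
    A minimal usage sketch on a small symmetric positive definite toy system.
    The example assumes ``SymmetricMatrixBasedSolver`` has already been imported
    (import path omitted here); outputs are not shown.

    >>> import numpy as np
    >>> rng = np.random.default_rng(42)
    >>> M = rng.standard_normal(size=(5, 5))
    >>> A = M @ M.T + 5 * np.eye(5)  # symmetric positive definite system matrix
    >>> b = rng.standard_normal(size=(5, 1))
    >>> solver = SymmetricMatrixBasedSolver(A=A, b=b)
    >>> x, A_post, Ainv_post, info = solver.solve(maxiter=10, atol=1e-6, rtol=1e-6)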
"""
def __init__(self, A, b, A0=None, Ainv0=None, x0=None):
# Assume constant right hand side
if isinstance(b, randvars.RandomVariable):
_b = b.sample(size=1)
else:
_b = b
super().__init__(A=A, b=_b, x0=x0)
# Get or construct prior parameters
A_mean0, A_covfactor0, Ainv_mean0, Ainv_covfactor0 = self._get_prior_params(
A0=A0, Ainv0=Ainv0, x0=self.x0, b=self.b
)
# Initialize prior parameters
self.A_mean = A_mean0
self.A_mean0 = A_mean0
self.A_covfactor = A_covfactor0
self.A_covfactor0 = A_covfactor0
self.Ainv_mean = Ainv_mean0
self.Ainv_mean0 = Ainv_mean0
self.Ainv_covfactor = Ainv_covfactor0
self.Ainv_covfactor0 = Ainv_covfactor0
if isinstance(x0, np.ndarray):
self.x_mean = x0
elif x0 is None:
self.x_mean = Ainv_mean0 @ self.b
else:
raise NotImplementedError
self.x0 = self.x_mean
# Computed search directions and observations
self.search_dir_list = []
self.obs_list = []
self.sy = []
def _get_prior_params(self, A0, Ainv0, x0, b):
"""Get the parameters of the matrix priors on A and H.
Retrieves and / or initializes prior parameters of ``A0`` and ``Ainv0``.
Parameters
----------
A0 : array-like or LinearOperator or RandomVariable, shape=(n,n), optional
A square matrix, linear operator or random variable representing the prior
belief over the linear operator :math:`A`. If an array or linear operator is
            given, a prior distribution is chosen automatically.
        Ainv0 : array-like or LinearOperator or RandomVariable, shape=(n,n), optional
            A square matrix, linear operator or random variable representing the prior
            belief over the inverse :math:`H=A^{-1}`. This can be viewed as taking the
            form of a pre-conditioner. If an array or linear operator is given, a prior
            distribution is chosen automatically.
        x0 : array-like or RandomVariable, shape=(n,) or (n, nrhs)
Optional. Prior belief for the solution of the linear system. Will be
ignored if ``A0`` or ``Ainv0`` is given.
b : array_like, shape=(n,) or (n, nrhs)
Right-hand side vector or matrix in :math:`A x = b`.
Returns
-------
A0_mean : array-like or LinearOperator, shape=(n,n)
Prior mean of the linear operator :math:`A`.
A0_covfactor : array-like or LinearOperator, shape=(n,n)
Factor :math:`W^A` of the symmetric Kronecker product prior covariance
:math:`W^A \\otimes_s W^A` of :math:`A`.
Ainv0_mean : array-like or LinearOperator, shape=(n,n)
Prior mean of the linear operator :math:`H`.
Ainv0_covfactor : array-like or LinearOperator, shape=(n,n)
Factor :math:`W^H` of the symmetric Kronecker product prior covariance
:math:`W^H \\otimes_s W^H` of :math:`H`.
"""
self.is_calib_covclass = False
# No matrix priors specified
if A0 is None and Ainv0 is None:
self.is_calib_covclass = True
# No prior information given
if x0 is None:
Ainv0_mean = linops.Identity(shape=self.n)
Ainv0_covfactor = linops.Identity(shape=self.n)
# Symmetric posterior correspondence
A0_mean = linops.Identity(shape=self.n)
A0_covfactor = self.A
return A0_mean, A0_covfactor, Ainv0_mean, Ainv0_covfactor
# Construct matrix priors from initial guess x0
elif isinstance(x0, np.ndarray):
A0_mean, Ainv0_mean = self._construct_symmetric_matrix_prior_means(
A=self.A, x0=x0, b=b
)
Ainv0_covfactor = Ainv0_mean
# Symmetric posterior correspondence
A0_covfactor = self.A
return A0_mean, A0_covfactor, Ainv0_mean, Ainv0_covfactor
elif isinstance(x0, randvars.RandomVariable):
raise NotImplementedError
# Prior on Ainv specified
if not isinstance(A0, randvars.RandomVariable) and Ainv0 is not None:
if isinstance(Ainv0, randvars.RandomVariable):
Ainv0_mean = Ainv0.mean
Ainv0_covfactor = Ainv0.cov.A
else:
self.is_calib_covclass = True
Ainv0_mean = Ainv0
Ainv0_covfactor = Ainv0 # Symmetric posterior correspondence
try:
if A0 is not None:
A0_mean = A0
elif isinstance(Ainv0, randvars.RandomVariable):
A0_mean = Ainv0.mean.inv()
else:
A0_mean = Ainv0.inv()
except AttributeError:
warnings.warn(
"Prior specified only for Ainv. Inverting prior mean naively. "
"This operation is computationally costly! Specify an inverse "
"prior (mean) instead."
)
A0_mean = np.linalg.inv(Ainv0.mean)
except NotImplementedError:
A0_mean = linops.Identity(self.n)
warnings.warn(
"Prior specified only for Ainv. Automatic prior mean inversion "
"not implemented, falling back to standard normal prior."
)
# Symmetric posterior correspondence
A0_covfactor = self.A
return A0_mean, A0_covfactor, Ainv0_mean, Ainv0_covfactor
# Prior on A specified
elif A0 is not None and not isinstance(Ainv0, randvars.RandomVariable):
if isinstance(A0, randvars.RandomVariable):
A0_mean = A0.mean
A0_covfactor = A0.cov.A
else:
self.is_calib_covclass = True
A0_mean = A0
A0_covfactor = A0 # Symmetric posterior correspondence
try:
if Ainv0 is not None:
Ainv0_mean = Ainv0
elif isinstance(A0, randvars.RandomVariable):
Ainv0_mean = A0.mean.inv()
else:
Ainv0_mean = A0.inv()
except AttributeError:
warnings.warn(
"Prior specified only for A. Inverting prior mean naively. "
"This operation is computationally costly! "
"Specify an inverse prior (mean)."
)
Ainv0_mean = np.linalg.inv(A0.mean)
except NotImplementedError:
Ainv0_mean = linops.Identity(self.n)
warnings.warn(
"Prior specified only for A. Automatic prior mean inversion "
"failed, falling back to standard normal prior."
)
# Symmetric posterior correspondence
Ainv0_covfactor = Ainv0_mean
return A0_mean, A0_covfactor, Ainv0_mean, Ainv0_covfactor
# Both matrix priors on A and H specified via random variables
elif isinstance(A0, randvars.RandomVariable) and isinstance(
Ainv0, randvars.RandomVariable
):
A0_mean = A0.mean
A0_covfactor = A0.cov.A
Ainv0_mean = Ainv0.mean
Ainv0_covfactor = Ainv0.cov.A
return A0_mean, A0_covfactor, Ainv0_mean, Ainv0_covfactor
else:
raise NotImplementedError
def _compute_trace_Ainv_covfactor0(self, Y, unc_scale):
"""Computes the trace of the prior covariance factor for the inverse view.
Parameters
----------
Y : np.ndarray, shape=(n,k)
Observations.
unc_scale : float
Uncertainty scale :math:`\\psi` of the inverse view.
Returns
-------
trace_Ainv_covfactor0 : float
Trace of prior covariance factor.
"""
# Initialization
if Y is not None:
k = Y.shape[1]
else:
k = 0
if unc_scale is None:
unc_scale = 0
if (
isinstance(self.Ainv_covfactor0, linops.Scaling)
and self.Ainv_covfactor0.is_isotropic
):
# Scalar prior mean
if self.is_calib_covclass and k > 0 and unc_scale != 0:
_trace = self.Ainv_covfactor0.scalar * k
else:
_trace = self.Ainv_covfactor0.trace()
else:
# General prior mean
if self.is_calib_covclass and k > 0 and unc_scale != 0:
# General prior mean with calibration covariance class
_trace = np.trace(
np.linalg.solve(
Y.T @ self.Ainv_mean0 @ Y,
Y.T @ self.Ainv_mean0 @ self.Ainv_mean0 @ Y,
)
)
else:
_trace = self.Ainv_covfactor0.trace()
if self.is_calib_covclass:
# Additive term from uncertainty calibration
_trace += unc_scale * (self.n - k)
return _trace
def _compute_trace_solution_covariance(self, bWb, Wb):
"""Computes the trace of the solution covariance :math:`\\tr(\\operatorname{
Cov}[x])`
Parameters
----------
bWb : float
Inner product of right hand side and the inverse covariance factor.
Wb : np.ndarray
Matrix-vector product between the inverse covariance factor and the right
hand side.
Returns
-------
trace_x_cov : float
Trace of solution covariance.
"""
        # From Cov(x) = 0.5 * (b'Wb * W + (Wb)(Wb)') it follows that
        # tr(Cov(x)) = 0.5 * (b'Wb * tr(W) + ||Wb||^2), where W is the inverse
        # covariance factor after k iterations (trace: self.trace_Ainv_covfactor).
return 0.5 * (bWb * self.trace_Ainv_covfactor + np.linalg.norm(Wb, ord=2) ** 2)
    def has_converged(self, iter, maxiter, resid=None, atol=None, rtol=None):
"""Check convergence of a linear solver.
Evaluates a set of convergence criteria based on its input arguments to decide
whether the iteration has converged.
Parameters
----------
iter : int
Current iteration of solver.
maxiter : int
Maximum number of iterations
resid : array-like
Residual vector :math:`\\lVert r_i \\rVert = \\lVert Ax_i - b \\rVert` of
the current iteration.
atol : float
Absolute residual tolerance. Stops if
:math:`\\min(\\lVert r_i \\rVert, \\sqrt{\\operatorname{tr}(\\operatorname{Cov}(x))}) \\leq \\text{atol}`.
rtol : float
Relative residual tolerance. Stops if
:math:`\\min(\\lVert r_i \\rVert, \\sqrt{\\operatorname{tr}(\\operatorname{Cov}(x))}) \\leq \\text{rtol} \\lVert b \\rVert`.
Returns
-------
has_converged : bool
True if the method has converged.
convergence_criterion : str
Convergence criterion which caused termination.
""" # pylint: disable=line-too-long
# maximum iterations
if iter >= maxiter:
warnings.warn(
"Iteration terminated. Solver reached the maximum number of iterations."
)
return True, "maxiter"
# residual below error tolerance
resid_norm = np.linalg.norm(resid)
b_norm = np.linalg.norm(self.b)
if resid_norm <= atol:
return True, "resid_atol"
elif resid_norm <= rtol * b_norm:
return True, "resid_rtol"
# uncertainty-based
if np.sqrt(self.trace_sol_cov) <= atol:
return True, "tracecov_atol"
elif np.sqrt(self.trace_sol_cov) <= rtol * b_norm:
return True, "tracecov_rtol"
else:
return False, ""
def _calibrate_uncertainty(self, S, sy, method):
"""Calibrate uncertainty based on the Rayleigh coefficients.
A regression model for the log-Rayleigh coefficient is built based on the
collected observations. The degrees of freedom in the kernels of A and H are set
according to the predicted log-Rayleigh coefficient for the remaining unexplored
dimensions.
Parameters
----------
S : np.ndarray, shape=(n, k)
Array of search directions
sy : np.ndarray
Array of inner products ``s_i'As_i``
method : str
Type of calibration method to use based on the Rayleigh quotient. Available
calibration procedures are

            ====================================   ==================
            Most recent Rayleigh quotient          ``adhoc``
            Running (weighted) mean                ``weightedmean``
            GP regression for kernel matrices      ``gpkern``
            ====================================   ==================
Returns
-------
phi : float
Uncertainty scale of the null space of span(S) for the A view
psi : float
Uncertainty scale of the null space of span(Y) for the Ainv view
"""
        # Log-Rayleigh quotients ln(s_i'A s_i / s_i's_i) of the search directions
iters = np.arange(self.iter_ + 1)
logR = np.log(sy) - np.log(np.einsum("nk,nk->k", S, S))
# only calibrate if enough iterations for a regression model have been performed
if self.iter_ > 1:
if method == "adhoc":
logR_pred = logR[-1]
elif method == "weightedmean":
deprecation_rate = 0.9
logR_pred = logR * np.repeat(
deprecation_rate, self.iter_ + 1
) ** np.arange(self.iter_ + 1)
elif method == "gpkern":
try:
import GPy # pylint: disable=import-outside-toplevel
# GP mean function via Weyl's result on spectra of Gram matrices for
# differentiable kernels
# ln(sigma(n)) ~= theta_0 - theta_1 ln(n)
lnmap = GPy.core.Mapping(1, 1)
lnmap.f = lambda n: np.log(n + 10 ** -16)
lnmap.update_gradients = lambda a, b: None
mf = GPy.mappings.Additive(
GPy.mappings.Constant(1, 1, value=0),
GPy.mappings.Compound(lnmap, GPy.mappings.Linear(1, 1)),
)
k = GPy.kern.RBF(input_dim=1, lengthscale=1, variance=1)
m = GPy.models.GPRegression(
iters[:, None] + 1, logR[:, None], kernel=k, mean_function=mf
)
m.optimize(messages=False)
# Predict Rayleigh quotient
remaining_dims = np.arange(self.iter_, self.A.shape[0])[:, None]
logR_pred = m.predict(remaining_dims + 1)[0].ravel()
except ImportError as err:
raise ImportError(
"Cannot perform GP-based calibration without optional "
"dependency GPy. Try installing GPy via `pip install GPy`."
) from err
else:
raise ValueError("Calibration method not recognized.")
# Set uncertainty scale (degrees of freedom in calibration covariance class)
Phi = (np.exp(np.mean(logR_pred))).item()
Psi = (np.exp(-np.mean(logR_pred))).item()
else:
# For too few iterations take the most recent Rayleigh quotient
Phi = np.exp(logR[-1])
Psi = 1 / Phi
return Phi, Psi
def _get_calibration_covariance_update_terms(self, phi=None, psi=None):
"""For the calibration covariance class set the calibration update terms of the
covariance in the null spaces of span(S) and span(Y) based on the degrees of
freedom."""
# Search directions and observations as arrays
S = np.hstack(self.search_dir_list)
Y = np.hstack(self.obs_list)
def get_null_space_map(V, unc_scale):
"""Returns a function mapping to the null space of span(V), scaling with a
single degree of freedom and mapping back."""
def null_space_proj(x):
try:
VVinvVx = np.linalg.solve(V.T @ V, V.T @ x)
return x - V @ VVinvVx
except np.linalg.LinAlgError:
return np.zeros_like(x)
# For a scalar uncertainty scale projecting to the null space twice is
# equivalent to projecting once
return lambda y: unc_scale * null_space_proj(y)
# Compute calibration term in the A view as a linear operator with scaling from
# degrees of freedom
calibration_term_A = linops.LinearOperator(
shape=(self.n, self.n),
dtype=S.dtype,
matmul=linops.LinearOperator.broadcast_matvec(
get_null_space_map(V=S, unc_scale=phi)
),
)
# Compute calibration term in the Ainv view as a linear operator with scaling
# from degrees of freedom
calibration_term_Ainv = linops.LinearOperator(
shape=(self.n, self.n),
dtype=S.dtype,
matmul=linops.LinearOperator.broadcast_matvec(
get_null_space_map(V=Y, unc_scale=psi)
),
)
return calibration_term_A, calibration_term_Ainv
def _get_output_randvars(self, Y_list, sy_list, phi=None, psi=None):
"""Return output random variables x, A, Ainv from their means and
covariances."""
if self.iter_ > 0:
# Observations and inner products in A-space between actions
Y = np.hstack(Y_list)
sy = np.vstack(sy_list).ravel()
# Posterior covariance factors
if self.is_calib_covclass and (not phi is None) and (not psi is None):
# Ensure prior covariance class only acts in span(S) like A
@linops.LinearOperator.broadcast_matvec
def _matmul(x):
# First term of calibration covariance class: AS(S'AS)^{-1}S'A
return (Y * sy ** -1) @ (Y.T @ x.ravel())
_A_covfactor0 = linops.LinearOperator(
shape=(self.n, self.n),
dtype=np.result_type(Y, sy),
matmul=_matmul,
)
@linops.LinearOperator.broadcast_matvec
def _matmul(x):
# Term in covariance class: A_0^{-1}Y(Y'A_0^{-1}Y)^{-1}Y'A_0^{-1}
                    # TODO: for efficiency ensure that we don't have to compute
# (Y.T Y)^{-1} two times! For a scalar mean this is the same as in
# the null space projection
YAinv0Y_inv_YAinv0x = np.linalg.solve(
Y.T @ (self.Ainv_mean0 @ Y), Y.T @ (self.Ainv_mean0 @ x)
)
return self.Ainv_mean0 @ (Y @ YAinv0Y_inv_YAinv0x)
_Ainv_covfactor0 = linops.LinearOperator(
shape=(self.n, self.n),
dtype=np.result_type(Y, self.Ainv_mean0),
matmul=_matmul,
)
# Set degrees of freedom based on uncertainty calibration in unexplored
# space
(
calibration_term_A,
calibration_term_Ainv,
) = self._get_calibration_covariance_update_terms(phi=phi, psi=psi)
_A_covfactor = (
_A_covfactor0 - self._A_covfactor_update_term + calibration_term_A
)
_Ainv_covfactor = (
_Ainv_covfactor0
- self._Ainv_covfactor_update_term
+ calibration_term_Ainv
)
else:
# No calibration
_A_covfactor = self.A_covfactor
_Ainv_covfactor = self.Ainv_covfactor
else:
# Converged before making any observations
_A_covfactor = self.A_covfactor0
_Ainv_covfactor = self.Ainv_covfactor0
# Create output random variables
A = randvars.Normal(
mean=self.A_mean, cov=linops.SymmetricKronecker(A=_A_covfactor)
)
Ainv = randvars.Normal(
mean=self.Ainv_mean,
cov=linops.SymmetricKronecker(A=_Ainv_covfactor),
)
# Induced distribution on x via Ainv
# Exp(x) = Ainv b, Cov(x) = 1/2 (W b'Wb + Wbb'W)
Wb = _Ainv_covfactor @ self.b
bWb = np.squeeze(Wb.T @ self.b)
def _matmul(x):
return 0.5 * (bWb * _Ainv_covfactor @ x + Wb @ (Wb.T @ x))
cov_op = linops.LinearOperator(
shape=(self.n, self.n),
dtype=np.result_type(Wb.dtype, bWb.dtype),
matmul=_matmul,
)
x = randvars.Normal(mean=self.x_mean.ravel(), cov=cov_op)
# Compute trace of solution covariance: tr(Cov(x))
self.trace_sol_cov = np.real_if_close(
self._compute_trace_solution_covariance(bWb=bWb, Wb=Wb)
).item()
return x, A, Ainv
def _mean_update(self, u, v):
"""Linear operator implementing the symmetric rank 2 mean update (+= uv' +
vu')."""
def _matmul(x):
return u @ (v.T @ x) + v @ (u.T @ x)
return linops.LinearOperator(
shape=(self.n, self.n),
dtype=np.result_type(u.dtype, v.dtype),
matmul=_matmul,
)
def _covariance_update(self, u, Ws):
"""Linear operator implementing the symmetric rank 2 kernels update (-= Ws
u^T)."""
def _matmul(x):
return Ws @ (u.T @ x)
return linops.LinearOperator(
shape=(self.n, self.n),
dtype=np.result_type(u.dtype, Ws.dtype),
matmul=_matmul,
)
    def solve(
self, callback=None, maxiter=None, atol=None, rtol=None, calibration=None
):
"""Solve the linear system :math:`Ax=b`.
Parameters
----------
callback : function, optional
User-supplied function called after each iteration of the linear solver. It
is called as ``callback(xk, Ak, Ainvk, sk, yk, alphak, resid)`` and can be
used to return quantities from the iteration. Note that depending on the
function supplied, this can slow down the solver.
maxiter : int
Maximum number of iterations
atol : float
Absolute residual tolerance. Stops if
:math:`\\min(\\lVert r_i \\rVert, \\sqrt{\\operatorname{tr}(\\operatorname{Cov}(x))}) \\leq \\text{atol}`.
rtol : float
Relative residual tolerance. Stops if
:math:`\\min(\\lVert r_i \\rVert, \\sqrt{\\operatorname{tr}(\\operatorname{Cov}(x))}) \\leq \\text{rtol} \\lVert b \\rVert`.
        calibration : str or float, default=None
            If supplied, calibrates the output via the given procedure or uncertainty
            scale. Available calibration procedures / choices are

            ====================================   ================
            No calibration                         None
            Provided scale                         float
            Most recent Rayleigh quotient          ``adhoc``
            Running (weighted) mean                ``weightedmean``
            GP regression for kernel matrices      ``gpkern``
            ====================================   ================
Returns
-------
x : RandomVariable, shape=(n,) or (n, nrhs)
Approximate solution :math:`x` to the linear system. Shape of the return
matches the shape of ``b``.
A : RandomVariable, shape=(n,n)
            Posterior belief over the linear operator.
Ainv : RandomVariable, shape=(n,n)
Posterior belief over the linear operator inverse :math:`H=A^{-1}`.
info : dict
Information on convergence of the solver.
""" # pylint: disable=line-too-long
# Initialization
self.iter_ = 0
resid = self.A @ self.x_mean - self.b
# Initialize uncertainty calibration
phi = None
psi = None
if calibration is None:
pass
        elif not self.is_calib_covclass:
warnings.warn(
message="Cannot use calibration without a compatible covariance class."
)
elif isinstance(calibration, str) and self.is_calib_covclass:
pass
elif self.is_calib_covclass:
if calibration < 0:
raise ValueError("Calibration scale must be non-negative.")
elif calibration == 0.0:
pass
else:
phi = calibration
psi = 1 / calibration
# Trace of solution covariance
_trace_Ainv_covfactor_update = 0
self.trace_Ainv_covfactor = self._compute_trace_Ainv_covfactor0(
Y=None, unc_scale=psi
)
# Create output random variables
x, A, Ainv = self._get_output_randvars(
Y_list=self.obs_list, sy_list=self.sy, phi=phi, psi=psi
)
# Iteration with stopping criteria
while True:
# Check convergence
_has_converged, _conv_crit = self.has_converged(
iter=self.iter_, maxiter=maxiter, resid=resid, atol=atol, rtol=rtol
)
if _has_converged:
break
# Compute search direction (with implicit reorthogonalization) via policy
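            # Myopic policy: act along the current estimate of the error,
            # s_k = -H_{k-1} r_{k-1} (cf. references [1] and [2] in the class
            # docstring).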
search_dir = -self.Ainv_mean @ resid
self.search_dir_list.append(search_dir)
# Perform action and observe
obs = self.A @ search_dir
self.obs_list.append(obs)
# Compute step size
sy = search_dir.T @ obs
step_size = -np.squeeze((search_dir.T @ resid) / sy)
self.sy.append(sy)
# Step and residual update
self.x_mean = self.x_mean + step_size * search_dir
resid = resid + step_size * obs
# (Symmetric) mean and covariance updates
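            # These implement the symmetric posterior updates (cf. [1], [2]):
            #   A_{k+1}   = A_k + u_A v_A' + v_A u_A',  u_A = W^A s / (s'W^A s),
            #   v_A       = delta_A - 0.5 (s'delta_A) u_A,  delta_A = y - A_k s,
            #   W^A_{k+1} = W^A_k - (W^A s)(W^A s)' / (s'W^A s),
            # and analogously for the inverse view with the roles of s and y swapped.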
Vs = self.A_covfactor @ search_dir
delta_A = obs - self.A_mean @ search_dir
u_A = Vs / (search_dir.T @ Vs)
v_A = delta_A - 0.5 * (search_dir.T @ delta_A) * u_A
Wy = self.Ainv_covfactor @ obs
delta_Ainv = search_dir - self.Ainv_mean @ obs
yWy = np.squeeze(obs.T @ Wy)
u_Ainv = Wy / yWy
v_Ainv = delta_Ainv - 0.5 * (obs.T @ delta_Ainv) * u_Ainv
# Rank 2 mean updates (+= uv' + vu')
self.A_mean = linops.aslinop(self.A_mean) + self._mean_update(u=u_A, v=v_A)
self.Ainv_mean = linops.aslinop(self.Ainv_mean) + self._mean_update(
u=u_Ainv, v=v_Ainv
)
# Rank 1 covariance Kronecker factor update (-= u_A(Vs)' and -= u_Ainv(Wy)')
if self.iter_ == 0:
self._A_covfactor_update_term = self._covariance_update(u=u_A, Ws=Vs)
self._Ainv_covfactor_update_term = self._covariance_update(
u=u_Ainv, Ws=Wy
)
else:
self._A_covfactor_update_term = (
self._A_covfactor_update_term
+ self._covariance_update(u=u_A, Ws=Vs)
)
self._Ainv_covfactor_update_term = (
self._Ainv_covfactor_update_term
+ self._covariance_update(u=u_Ainv, Ws=Wy)
)
self.A_covfactor = (
linops.aslinop(self.A_covfactor0) - self._A_covfactor_update_term
)
self.Ainv_covfactor = (
linops.aslinop(self.Ainv_covfactor0) - self._Ainv_covfactor_update_term
)
# Calibrate uncertainty based on Rayleigh quotient
if isinstance(calibration, str) and self.is_calib_covclass:
phi, psi = self._calibrate_uncertainty(
S=np.hstack(self.search_dir_list),
sy=np.vstack(self.sy).ravel(),
method=calibration,
)
# Update trace of solution covariance: tr(Cov(Hb))
_trace_Ainv_covfactor_update += 1 / yWy * np.squeeze(Wy.T @ Wy)
self.trace_Ainv_covfactor = np.real_if_close(
self._compute_trace_Ainv_covfactor0(
Y=np.hstack(self.obs_list), unc_scale=psi
)
- _trace_Ainv_covfactor_update
).item()
# Create output random variables
x, A, Ainv = self._get_output_randvars(
Y_list=self.obs_list, sy_list=self.sy, phi=phi, psi=psi
)
# Callback function used to extract quantities from iteration
if callback is not None:
callback(
xk=x,
Ak=A,
Ainvk=Ainv,
sk=search_dir,
yk=obs,
alphak=step_size,
resid=resid,
)
# Iteration increment
self.iter_ += 1
# Log information on solution
info = {
"iter": self.iter_,
"maxiter": maxiter,
"resid_l2norm": np.linalg.norm(resid, ord=2),
"trace_sol_cov": self.trace_sol_cov,
"conv_crit": _conv_crit,
"rel_cond": None, # TODO: matrix condition from solver (see scipy solvers)
}
return x, A, Ainv, info