# Source code for probnum.linalg.linearsolvers.matrixbased

"""Matrix-based probabilistic linear solvers.

Implementations of matrix-based linear solvers which perform inference
on the matrix or its inverse given linear observations.
"""
import abc
import warnings

import numpy as np

import probnum
from probnum import linops, randvars


class ProbabilisticLinearSolver(abc.ABC):
    """Abstract base class for probabilistic linear solvers.

    Subclasses implement a ``.solve()`` method performing probabilistic
    inference on the linear system :math:`Ax = b`. Objects of this type are
    instantiated in wrapper functions such as :meth:`problinsolve`.

    Parameters
    ----------
    A : array-like or LinearOperator or RandomVariable, shape=(n,n)
        A square matrix or linear operator. A prior distribution can be
        provided as a :class:`~probnum.RandomVariable`. If an array or linear
        operator is given, a prior distribution is chosen automatically.
    b : RandomVariable, shape=(n,) or (n, nrhs)
        Right-hand side vector, matrix or RandomVariable of :math:`A x = b`.
    """

    def __init__(self, A, b):
        # System matrix (or belief over it) and right-hand side.
        self.A = A
        self.b = b
        # Dimension of the solution, taken from the number of columns of A.
        self.n = A.shape[1]

    def has_converged(self, iter, maxiter, **kwargs):
        """Check convergence of a linear solver.

        Evaluates a set of convergence criteria based on its input arguments
        to decide whether the iteration has converged.

        Parameters
        ----------
        iter : int
            Current iteration of solver.
        maxiter : int
            Maximum number of iterations.

        Returns
        -------
        has_converged : bool
            True if the method has converged.
        convergence_criterion : str
            Convergence criterion which caused termination.
        """
        # Guard clause: still below the iteration budget.
        if iter < maxiter:
            return False, ""
        warnings.warn(
            "Iteration terminated. Solver reached the maximum number of iterations."
        )
        return True, "maxiter"

    def solve(self, callback=None, **kwargs):
        """Solve the linear system :math:`Ax=b`.

        Parameters
        ----------
        callback : function, optional
            User-supplied function called after each iteration of the linear
            solver. It is called as
            ``callback(xk, Ak, Ainvk, sk, yk, alphak, resid, **kwargs)`` and
            can be used to return quantities from the iteration. Note that
            depending on the function supplied, this can slow down the solver.
        kwargs
            Key-word arguments adjusting the behaviour of the ``solve``
            iteration. These are usually convergence criteria.

        Returns
        -------
        x : RandomVariable, shape=(n,) or (n, nrhs)
            Approximate solution :math:`x` to the linear system. Shape of the
            return matches the shape of ``b``.
        A : RandomVariable, shape=(n,n)
            Posterior belief over the linear operator.
        Ainv : RandomVariable, shape=(n,n)
            Posterior belief over the linear operator inverse :math:`H=A^{-1}`.
        info : dict
            Information on convergence of the solver.
        """
        raise NotImplementedError
class MatrixBasedSolver(ProbabilisticLinearSolver, abc.ABC):
    """Abstract class for matrix-based probabilistic linear solvers.

    Parameters
    ----------
    A : array-like or LinearOperator or RandomVariable, shape=(n,n)
        A square matrix or linear operator. A prior distribution can be
        provided as a :class:`~probnum.RandomVariable`. If an array or linear
        operator is given, a prior distribution is chosen automatically.
    b : RandomVariable, shape=(n,) or (n, nrhs)
        Right-hand side vector, matrix or RandomVariable of :math:`A x = b`.
    x0 : array-like, shape=(n,) or (n, nrhs)
        Optional. Guess for the solution of the linear system.
    """

    def __init__(self, A, b, x0=None):
        self.x0 = x0
        super().__init__(A=A, b=b)

    def _get_prior_params(self, A0, Ainv0, x0, b):
        """Find the parameters of the prior distribution.

        Parameters
        ----------
        A0 : array-like or LinearOperator or RandomVariable, shape=(n,n), optional
            A square matrix, linear operator or random variable representing
            the prior belief over the linear operator :math:`A`. If an array
            or linear operator is given, a prior distribution is chosen
            automatically.
        Ainv0 : array-like or LinearOperator or RandomVariable, shape=(n,n), optional
            A square matrix, linear operator or random variable representing
            the prior belief over the inverse :math:`H=A^{-1}`. This can be
            viewed as taking the form of a pre-conditioner. If an array or
            linear operator is given, a prior distribution is chosen
            automatically.
        x0 : array-like, or RandomVariable, shape=(n,) or (n, nrhs)
            Optional. Prior belief for the solution of the linear system. Will
            be ignored if ``A0`` or ``Ainv0`` is given.
        b : array_like, shape=(n,) or (n, nrhs)
            Right-hand side vector or matrix in :math:`A x = b`.
        """
        raise NotImplementedError

    def _construct_symmetric_matrix_prior_means(self, A, x0, b):
        """Create matrix prior means from an initial guess for the solution of
        the linear system.

        Constructs a matrix-variate prior mean for H from ``x0`` and ``b``
        such that :math:`H_0b = x_0`, :math:`H_0` symmetric positive definite
        and :math:`A_0 = H_0^{-1}`.

        Parameters
        ----------
        A : array-like or LinearOperator, shape=(n,n)
            System matrix assumed to be square.
        x0 : array-like, shape=(n,) or (n, nrhs)
            Optional. Guess for the solution of the linear system.
        b : array_like, shape=(n,) or (n, nrhs)
            Right-hand side vector or matrix in :math:`A x = b`.

        Returns
        -------
        A0_mean : linops.LinearOperator
            Mean of the matrix-variate prior distribution on the system matrix
            :math:`A`.
        Ainv0_mean : linops.LinearOperator
            Mean of the matrix-variate prior distribution on the inverse of
            the system matrix :math:`H = A^{-1}`.
        """
        # A non-positive inner product b'x0 indicates a poor initial guess;
        # replace it with a better-conditioned one before building the priors.
        bx0 = np.squeeze(b.T @ x0)
        bb = np.linalg.norm(b) ** 2
        if bx0 < 0:
            # Flipping the sign of x0 makes b'x0 positive.
            x0, bx0 = -x0, -bx0
            print("Better initialization found, setting x0 = - x0.")
        elif bx0 == 0:
            if np.all(b == np.zeros_like(b)):
                print("Right-hand-side is zero. Initializing with solution x0 = 0.")
                x0 = b
            else:
                print("Better initialization found, setting x0 = (b'b/b'Ab) * b.")
                bAb = np.squeeze(b.T @ (A @ b))
                x0 = bb / bAb * b
                bx0 = bb ** 2 / bAb

        # Scalar component of both prior means; the rank-one correction is
        # built from the direction x0 - alpha * b.
        alpha = 0.5 * bx0 / bb

        def _matvec(v):
            return (x0 - alpha * b) * (x0 - alpha * b).T @ v

        def _matmat(M):
            return (x0 - alpha * b) @ (x0 - alpha * b).T @ M

        rank_one_term = linops.LinearOperator(
            matvec=_matvec, matmat=_matmat, shape=(self.n, self.n)
        )
        Ainv0_mean = (
            linops.ScalarMult(scalar=alpha, shape=(self.n, self.n))
            + 2 / bx0 * rank_one_term
        )
        A0_mean = (
            linops.ScalarMult(scalar=1 / alpha, shape=(self.n, self.n))
            - 1 / (alpha * np.squeeze((x0 - alpha * b).T @ x0)) * rank_one_term
        )
        return A0_mean, Ainv0_mean

    def has_converged(self, iter, maxiter, **kwargs):
        raise NotImplementedError

    def solve(self, callback=None, maxiter=None, atol=None):
        raise NotImplementedError
class AsymmetricMatrixBasedSolver(ProbabilisticLinearSolver):
    """Asymmetric matrix-based probabilistic linear solver.

    Parameters
    ----------
    A : array-like or LinearOperator or RandomVariable, shape=(n,n)
        The square matrix or linear operator of the linear system.
    b : array_like, shape=(n,) or (n, nrhs)
        Right-hand side vector or matrix in :math:`A x = b`.
    x0 : array-like, shape=(n,) or (n, nrhs)
        Guess for the solution of the linear system.
    """

    def __init__(self, A, b, x0):
        # Record the initial guess before delegating to the base class.
        self.x0 = x0
        super().__init__(A=A, b=b)

    def has_converged(self, iter, maxiter, **kwargs):
        """Check convergence. Not implemented for this solver."""
        raise NotImplementedError

    def solve(self, callback=None, maxiter=None, atol=None):
        """Solve the linear system. Not implemented for this solver."""
        raise NotImplementedError
class SymmetricMatrixBasedSolver(MatrixBasedSolver):
    """Symmetric matrix-based probabilistic linear solver.

    Implements the solve iteration of the symmetric matrix-based probabilistic
    linear solver described in [1]_ and [2]_.

    Parameters
    ----------
    A : array-like or LinearOperator or RandomVariable, shape=(n,n)
        The square matrix or linear operator of the linear system.
    b : array_like, shape=(n,) or (n, nrhs)
        Right-hand side vector or matrix in :math:`A x = b`.
    A0 : array-like or LinearOperator or RandomVariable, shape=(n, n), optional
        A square matrix, linear operator or random variable representing the prior
        belief over the linear operator :math:`A`. If an array or linear operator is
        given, a prior distribution is chosen automatically.
    Ainv0 : array-like or LinearOperator or RandomVariable, shape=(n,n), optional
        A square matrix, linear operator or random variable representing the prior
        belief over the inverse :math:`H=A^{-1}`. This can be viewed as taking the
        form of a pre-conditioner. If an array or linear operator is given, a prior
        distribution is chosen automatically.
    x0 : array-like, or RandomVariable, shape=(n,) or (n, nrhs)
        Optional. Prior belief for the solution of the linear system. Will be
        ignored if ``Ainv0`` is given.

    Returns
    -------
    A : RandomVariable
        Posterior belief over the linear operator.
    Ainv : RandomVariable
        Posterior belief over the inverse linear operator.
    x : RandomVariable
        Posterior belief over the solution of the linear system.
    info : dict
        Information about convergence and the solution.

    References
    ----------
    .. [1] Wenger, J. and Hennig, P., Probabilistic Linear Solvers for Machine
       Learning, *Advances in Neural Information Processing Systems (NeurIPS)*, 2020
    .. [2] Hennig, P., Probabilistic Interpretation of Linear Solvers, *SIAM Journal
       on Optimization*, 2015, 25, 234-260

    See Also
    --------
    NoisySymmetricMatrixBasedSolver : Class implementing the noisy symmetric
        probabilistic linear solver.
    """

    def __init__(self, A, b, A0=None, Ainv0=None, x0=None):
        # Assume constant right hand side: replace a random-variable b by a
        # single sample drawn from it.
        if isinstance(b, probnum.RandomVariable):
            _b = b.sample(size=1)
        else:
            _b = b
        super().__init__(A=A, b=_b, x0=x0)

        # Get or construct prior parameters
        A_mean0, A_covfactor0, Ainv_mean0, Ainv_covfactor0 = self._get_prior_params(
            A0=A0, Ainv0=Ainv0, x0=self.x0, b=self.b
        )

        # Initialize prior parameters; the ``*0`` attributes keep the priors
        # while the un-suffixed attributes are updated during iteration.
        self.A_mean = A_mean0
        self.A_mean0 = A_mean0
        self.A_covfactor = A_covfactor0
        self.A_covfactor0 = A_covfactor0
        self.Ainv_mean = Ainv_mean0
        self.Ainv_mean0 = Ainv_mean0
        self.Ainv_covfactor = Ainv_covfactor0
        self.Ainv_covfactor0 = Ainv_covfactor0
        if isinstance(x0, np.ndarray):
            self.x_mean = x0
        elif x0 is None:
            # Induced solution mean from the prior belief over the inverse.
            self.x_mean = Ainv_mean0 @ self.b
        else:
            raise NotImplementedError
        self.x0 = self.x_mean

        # Computed search directions and observations
        self.search_dir_list = []
        self.obs_list = []
        self.sy = []

    def _get_prior_params(self, A0, Ainv0, x0, b):
        """Get the parameters of the matrix priors on A and H.

        Retrieves and / or initializes prior parameters of ``A0`` and ``Ainv0``.

        Parameters
        ----------
        A0 : array-like or LinearOperator or RandomVariable, shape=(n,n), optional
            A square matrix, linear operator or random variable representing the
            prior belief over the linear operator :math:`A`. If an array or linear
            operator is given, a prior distribution is chosen automatically.
        Ainv0 : array-like or LinearOperator or RandomVariable, shape=(n,n), optional
            A square matrix, linear operator or random variable representing the
            prior belief over the inverse :math:`H=A^{-1}`. This can be viewed as
            taking the form of a pre-conditioner. If an array or linear operator is
            given, a prior distribution is chosen automatically.
        x0 : array-like, or RandomVariable, shape=(n,) or (n, nrhs)
            Optional. Prior belief for the solution of the linear system. Will be
            ignored if ``A0`` or ``Ainv0`` is given.
        b : array_like, shape=(n,) or (n, nrhs)
            Right-hand side vector or matrix in :math:`A x = b`.

        Returns
        -------
        A0_mean : array-like or LinearOperator, shape=(n,n)
            Prior mean of the linear operator :math:`A`.
        A0_covfactor : array-like or LinearOperator, shape=(n,n)
            Factor :math:`W^A` of the symmetric Kronecker product prior covariance
            :math:`W^A \\otimes_s W^A` of :math:`A`.
        Ainv0_mean : array-like or LinearOperator, shape=(n,n)
            Prior mean of the linear operator :math:`H`.
        Ainv0_covfactor : array-like or LinearOperator, shape=(n,n)
            Factor :math:`W^H` of the symmetric Kronecker product prior covariance
            :math:`W^H \\otimes_s W^H` of :math:`H`.
        """
        # Whether the chosen prior admits the calibration covariance class.
        self.is_calib_covclass = False
        # No matrix priors specified
        if A0 is None and Ainv0 is None:
            self.is_calib_covclass = True
            # No prior information given
            if x0 is None:
                Ainv0_mean = linops.Identity(shape=self.n)
                Ainv0_covfactor = linops.Identity(shape=self.n)
                # Symmetric posterior correspondence
                A0_mean = linops.Identity(shape=self.n)
                A0_covfactor = self.A
                return A0_mean, A0_covfactor, Ainv0_mean, Ainv0_covfactor
            # Construct matrix priors from initial guess x0
            elif isinstance(x0, np.ndarray):
                A0_mean, Ainv0_mean = self._construct_symmetric_matrix_prior_means(
                    A=self.A, x0=x0, b=b
                )
                Ainv0_covfactor = Ainv0_mean
                # Symmetric posterior correspondence
                A0_covfactor = self.A
                return A0_mean, A0_covfactor, Ainv0_mean, Ainv0_covfactor
            elif isinstance(x0, probnum.RandomVariable):
                raise NotImplementedError

        # Prior on Ainv specified
        if not isinstance(A0, probnum.RandomVariable) and Ainv0 is not None:
            if isinstance(Ainv0, probnum.RandomVariable):
                Ainv0_mean = Ainv0.mean
                Ainv0_covfactor = Ainv0.cov.A
            else:
                self.is_calib_covclass = True
                Ainv0_mean = Ainv0
                Ainv0_covfactor = Ainv0
            # Symmetric posterior correspondence
            try:
                if A0 is not None:
                    A0_mean = A0
                elif isinstance(Ainv0, probnum.RandomVariable):
                    A0_mean = Ainv0.mean.inv()
                else:
                    A0_mean = Ainv0.inv()
            except AttributeError:
                warnings.warn(
                    "Prior specified only for Ainv. Inverting prior mean naively. "
                    "This operation is computationally costly! Specify an inverse "
                    "prior (mean) instead."
                )
                A0_mean = np.linalg.inv(Ainv0.mean)
            except NotImplementedError:
                A0_mean = linops.Identity(self.n)
                warnings.warn(
                    "Prior specified only for Ainv. Automatic prior mean inversion "
                    "not implemented, falling back to standard normal prior."
                )
            # Symmetric posterior correspondence
            A0_covfactor = self.A
            return A0_mean, A0_covfactor, Ainv0_mean, Ainv0_covfactor
        # Prior on A specified
        elif A0 is not None and not isinstance(Ainv0, probnum.RandomVariable):
            if isinstance(A0, probnum.RandomVariable):
                A0_mean = A0.mean
                A0_covfactor = A0.cov.A
            else:
                self.is_calib_covclass = True
                A0_mean = A0
                A0_covfactor = A0
            # Symmetric posterior correspondence
            try:
                if Ainv0 is not None:
                    Ainv0_mean = Ainv0
                elif isinstance(A0, probnum.RandomVariable):
                    Ainv0_mean = A0.mean.inv()
                else:
                    Ainv0_mean = A0.inv()
            except AttributeError:
                warnings.warn(
                    "Prior specified only for A. Inverting prior mean naively. "
                    "This operation is computationally costly! "
                    "Specify an inverse prior (mean)."
                )
                Ainv0_mean = np.linalg.inv(A0.mean)
            except NotImplementedError:
                Ainv0_mean = linops.Identity(self.n)
                warnings.warn(
                    "Prior specified only for A. Automatic prior mean inversion "
                    "failed, falling back to standard normal prior."
                )
            # Symmetric posterior correspondence
            Ainv0_covfactor = Ainv0_mean
            return A0_mean, A0_covfactor, Ainv0_mean, Ainv0_covfactor
        # Both matrix priors on A and H specified via random variables
        elif isinstance(A0, probnum.RandomVariable) and isinstance(
            Ainv0, probnum.RandomVariable
        ):
            A0_mean = A0.mean
            A0_covfactor = A0.cov.A
            Ainv0_mean = Ainv0.mean
            Ainv0_covfactor = Ainv0.cov.A
            return A0_mean, A0_covfactor, Ainv0_mean, Ainv0_covfactor
        else:
            raise NotImplementedError

    def _compute_trace_Ainv_covfactor0(self, Y, unc_scale):
        """Computes the trace of the prior covariance factor for the inverse view.

        Parameters
        ----------
        Y : np.ndarray, shape=(n,k)
            Observations.
        unc_scale : float
            Uncertainty scale :math:`\\psi` of the inverse view.

        Returns
        -------
        trace_Ainv_covfactor0 : float
            Trace of prior covariance factor.
        """
        # Initialization
        if Y is not None:
            k = Y.shape[1]
        else:
            k = 0
        if unc_scale is None:
            unc_scale = 0

        if isinstance(self.Ainv_covfactor0, linops.ScalarMult):
            # Scalar prior mean
            if self.is_calib_covclass and k > 0 and unc_scale != 0:
                _trace = self.Ainv_covfactor0.scalar * k
            else:
                _trace = self.Ainv_covfactor0.trace()
        else:
            # General prior mean
            if self.is_calib_covclass and k > 0 and unc_scale != 0:
                # General prior mean with calibration covariance class
                _trace = np.trace(
                    np.linalg.solve(
                        Y.T @ self.Ainv_mean0 @ Y,
                        Y.T @ self.Ainv_mean0 @ self.Ainv_mean0 @ Y,
                    )
                )
            else:
                _trace = self.Ainv_covfactor0.trace()
        if self.is_calib_covclass:
            # Additive term from uncertainty calibration
            _trace += unc_scale * (self.n - k)
        return _trace

    def _compute_trace_solution_covariance(self, bWb, Wb):
        """Computes the trace of the solution covariance
        :math:`\\tr(\\operatorname{Cov}[x])`.

        Parameters
        ----------
        bWb : float
            Inner product of right hand side and the inverse covariance factor.
        Wb : np.ndarray
            Matrix-vector product between the inverse covariance factor and the
            right hand side.

        Returns
        -------
        trace_x_cov : float
            Trace of solution covariance.
        """
        # Trace of inverse covariance factor after k iterations
        return 0.5 * (bWb * self.trace_Ainv_covfactor + np.linalg.norm(Wb, ord=2) ** 2)

    def has_converged(self, iter, maxiter, resid=None, atol=None, rtol=None):
        """Check convergence of a linear solver.

        Evaluates a set of convergence criteria based on its input arguments to
        decide whether the iteration has converged.

        Parameters
        ----------
        iter : int
            Current iteration of solver.
        maxiter : int
            Maximum number of iterations
        resid : array-like
            Residual vector :math:`\\lVert r_i \\rVert = \\lVert Ax_i - b \\rVert`
            of the current iteration.
        atol : float
            Absolute residual tolerance. Stops if
            :math:`\\min(\\lVert r_i \\rVert, \\sqrt{\\operatorname{tr}(\\operatorname{Cov}(x))}) \\leq \\text{atol}`.
        rtol : float
            Relative residual tolerance. Stops if
            :math:`\\min(\\lVert r_i \\rVert, \\sqrt{\\operatorname{tr}(\\operatorname{Cov}(x))}) \\leq \\text{rtol} \\lVert b \\rVert`.

        Returns
        -------
        has_converged : bool
            True if the method has converged.
        convergence_criterion : str
            Convergence criterion which caused termination.
        """
        # pylint: disable=line-too-long
        # maximum iterations
        if iter >= maxiter:
            warnings.warn(
                "Iteration terminated. Solver reached the maximum number of iterations."
            )
            return True, "maxiter"
        # residual below error tolerance
        resid_norm = np.linalg.norm(resid)
        b_norm = np.linalg.norm(self.b)
        if resid_norm <= atol:
            return True, "resid_atol"
        elif resid_norm <= rtol * b_norm:
            return True, "resid_rtol"
        # uncertainty-based
        if np.sqrt(self.trace_sol_cov) <= atol:
            return True, "tracecov_atol"
        elif np.sqrt(self.trace_sol_cov) <= rtol * b_norm:
            return True, "tracecov_rtol"
        else:
            return False, ""

    def _calibrate_uncertainty(self, S, sy, method):
        """Calibrate uncertainty based on the Rayleigh coefficients.

        A regression model for the log-Rayleigh coefficient is built based on the
        collected observations. The degrees of freedom in the kernels of A and H
        are set according to the predicted log-Rayleigh coefficient for the
        remaining unexplored dimensions.

        Parameters
        ----------
        S : np.ndarray, shape=(n, k)
            Array of search directions
        sy : np.ndarray
            Array of inner products ``s_i'As_i``
        method : str
            Type of calibration method to use based on the Rayleigh quotient.
            Available calibration procedures are

            ====================================  ==================
            Most recent Rayleigh quotient         ``adhoc``
            Running (weighted) mean               ``weightedmean``
            GP regression for kernel matrices     ``gpkern``
            ====================================  ==================

        Returns
        -------
        phi : float
            Uncertainty scale of the null space of span(S) for the A view
        psi : float
            Uncertainty scale of the null space of span(Y) for the Ainv view
        """
        # Rayleigh quotient
        iters = np.arange(self.iter_ + 1)
        logR = np.log(sy) - np.log(np.einsum("nk,nk->k", S, S))

        # only calibrate if enough iterations for a regression model have been
        # performed
        if self.iter_ > 1:
            if method == "adhoc":
                logR_pred = logR[-1]
            elif method == "weightedmean":
                deprecation_rate = 0.9
                logR_pred = logR * np.repeat(
                    deprecation_rate, self.iter_ + 1
                ) ** np.arange(self.iter_ + 1)
            elif method == "gpkern":
                try:
                    import GPy  # pylint: disable=import-outside-toplevel

                    # GP mean function via Weyl's result on spectra of Gram
                    # matrices for differentiable kernels
                    # ln(sigma(n)) ~= theta_0 - theta_1 ln(n)
                    lnmap = GPy.core.Mapping(1, 1)
                    lnmap.f = lambda n: np.log(n + 10 ** -16)
                    lnmap.update_gradients = lambda a, b: None
                    mf = GPy.mappings.Additive(
                        GPy.mappings.Constant(1, 1, value=0),
                        GPy.mappings.Compound(lnmap, GPy.mappings.Linear(1, 1)),
                    )
                    k = GPy.kern.RBF(input_dim=1, lengthscale=1, variance=1)
                    m = GPy.models.GPRegression(
                        iters[:, None] + 1,
                        logR[:, None],
                        kernel=k,
                        mean_function=mf,
                    )
                    m.optimize(messages=False)

                    # Predict Rayleigh quotient
                    remaining_dims = np.arange(self.iter_, self.A.shape[0])[:, None]
                    logR_pred = m.predict(remaining_dims + 1)[0].ravel()
                except ImportError as err:
                    raise ImportError(
                        "Cannot perform GP-based calibration without optional "
                        "dependency GPy. Try installing GPy via `pip install GPy`."
                    ) from err
            else:
                raise ValueError("Calibration method not recognized.")

            # Set uncertainty scale (degrees of freedom in calibration covariance
            # class)
            Phi = (np.exp(np.mean(logR_pred))).item()
            Psi = (np.exp(-np.mean(logR_pred))).item()
        else:
            # For too few iterations take the most recent Rayleigh quotient
            Phi = np.exp(logR[-1])
            Psi = 1 / Phi

        return Phi, Psi

    def _get_calibration_covariance_update_terms(self, phi=None, psi=None):
        """For the calibration covariance class set the calibration update terms of
        the covariance in the null spaces of span(S) and span(Y) based on the
        degrees of freedom."""
        # Search directions and observations as arrays
        S = np.hstack(self.search_dir_list)
        Y = np.hstack(self.obs_list)

        def get_null_space_map(V, unc_scale):
            """Returns a function mapping to the null space of span(V), scaling
            with a single degree of freedom and mapping back."""

            def null_space_proj(x):
                try:
                    VVinvVx = np.linalg.solve(V.T @ V, V.T @ x)
                    return x - V @ VVinvVx
                except np.linalg.LinAlgError:
                    # Singular Gram matrix: fall back to the zero map.
                    return np.zeros_like(x)

            # For a scalar uncertainty scale projecting to the null space twice is
            # equivalent to projecting once
            return lambda y: unc_scale * null_space_proj(y)

        # Compute calibration term in the A view as a linear operator with scaling
        # from degrees of freedom
        calibration_term_A = linops.LinearOperator(
            shape=(self.n, self.n), matvec=get_null_space_map(V=S, unc_scale=phi)
        )

        # Compute calibration term in the Ainv view as a linear operator with
        # scaling from degrees of freedom
        calibration_term_Ainv = linops.LinearOperator(
            shape=(self.n, self.n), matvec=get_null_space_map(V=Y, unc_scale=psi)
        )

        return calibration_term_A, calibration_term_Ainv

    def _get_output_randvars(self, Y_list, sy_list, phi=None, psi=None):
        """Return output random variables x, A, Ainv from their means and
        covariances."""
        if self.iter_ > 0:
            # Observations and inner products in A-space between actions
            Y = np.hstack(Y_list)
            sy = np.vstack(sy_list).ravel()

            # Posterior covariance factors
            if self.is_calib_covclass and (not phi is None) and (not psi is None):
                # Ensure prior covariance class only acts in span(S) like A
                def _matvec(x):
                    # First term of calibration covariance class: AS(S'AS)^{-1}S'A
                    return (Y * sy ** -1) @ (Y.T @ x.ravel())

                _A_covfactor0 = linops.LinearOperator(
                    shape=(self.n, self.n), matvec=_matvec
                )

                def _matvec(x):
                    # Term in covariance class: A_0^{-1}Y(Y'A_0^{-1}Y)^{-1}Y'A_0^{-1}
                    # TODO: for efficiency ensure that we dont have to compute
                    # (Y.T Y)^{-1} two times! For a scalar mean this is the same as
                    # in the null space projection
                    YAinv0Y_inv_YAinv0x = np.linalg.solve(
                        Y.T @ (self.Ainv_mean0 @ Y), Y.T @ (self.Ainv_mean0 @ x)
                    )
                    return self.Ainv_mean0 @ (Y @ YAinv0Y_inv_YAinv0x)

                _Ainv_covfactor0 = linops.LinearOperator(
                    shape=(self.n, self.n), matvec=_matvec
                )

                # Set degrees of freedom based on uncertainty calibration in
                # unexplored space
                (
                    calibration_term_A,
                    calibration_term_Ainv,
                ) = self._get_calibration_covariance_update_terms(phi=phi, psi=psi)

                _A_covfactor = (
                    _A_covfactor0 - self._A_covfactor_update_term + calibration_term_A
                )
                _Ainv_covfactor = (
                    _Ainv_covfactor0
                    - self._Ainv_covfactor_update_term
                    + calibration_term_Ainv
                )
            else:
                # No calibration
                _A_covfactor = self.A_covfactor
                _Ainv_covfactor = self.Ainv_covfactor
        else:
            # Converged before making any observations
            _A_covfactor = self.A_covfactor0
            _Ainv_covfactor = self.Ainv_covfactor0

        # Create output random variables
        A = randvars.Normal(
            mean=self.A_mean, cov=linops.SymmetricKronecker(A=_A_covfactor)
        )
        Ainv = randvars.Normal(
            mean=self.Ainv_mean,
            cov=linops.SymmetricKronecker(A=_Ainv_covfactor),
        )
        # Induced distribution on x via Ainv
        # Exp(x) = Ainv b, Cov(x) = 1/2 (W b'Wb + Wbb'W)
        Wb = _Ainv_covfactor @ self.b
        bWb = np.squeeze(Wb.T @ self.b)

        def _mv(x):
            return 0.5 * (bWb * _Ainv_covfactor @ x + Wb @ (Wb.T @ x))

        cov_op = linops.LinearOperator(
            shape=(self.n, self.n), dtype=float, matvec=_mv, matmat=_mv
        )

        x = randvars.Normal(mean=self.x_mean.ravel(), cov=cov_op)

        # Compute trace of solution covariance: tr(Cov(x))
        self.trace_sol_cov = np.real_if_close(
            self._compute_trace_solution_covariance(bWb=bWb, Wb=Wb)
        ).item()

        return x, A, Ainv

    def _mean_update(self, u, v):
        """Linear operator implementing the symmetric rank 2 mean update
        (+= uv' + vu')."""

        def mv(x):
            return u @ (v.T @ x) + v @ (u.T @ x)

        return linops.LinearOperator(shape=(self.n, self.n), matvec=mv, matmat=mv)

    def _covariance_update(self, u, Ws):
        """Linear operator implementing the symmetric rank 2 kernels update
        (-= Ws u^T)."""

        def mv(x):
            return Ws @ (u.T @ x)

        return linops.LinearOperator(shape=(self.n, self.n), matvec=mv, matmat=mv)

    def solve(
        self, callback=None, maxiter=None, atol=None, rtol=None, calibration=None
    ):
        """Solve the linear system :math:`Ax=b`.

        Parameters
        ----------
        callback : function, optional
            User-supplied function called after each iteration of the linear
            solver. It is called as
            ``callback(xk, Ak, Ainvk, sk, yk, alphak, resid)`` and can be used to
            return quantities from the iteration. Note that depending on the
            function supplied, this can slow down the solver.
        maxiter : int
            Maximum number of iterations
        atol : float
            Absolute residual tolerance. Stops if
            :math:`\\min(\\lVert r_i \\rVert, \\sqrt{\\operatorname{tr}(\\operatorname{Cov}(x))}) \\leq \\text{atol}`.
        rtol : float
            Relative residual tolerance. Stops if
            :math:`\\min(\\lVert r_i \\rVert, \\sqrt{\\operatorname{tr}(\\operatorname{Cov}(x))}) \\leq \\text{rtol} \\lVert b \\rVert`.
        calibration : str or float, default=False
            If supplied calibrates the output via the given procedure or
            uncertainty scale. Available calibration procedures / choices are

            ====================================  ================
            No calibration                        None
            Provided scale                        float
            Most recent Rayleigh quotient         ``adhoc``
            Running (weighted) mean               ``weightedmean``
            GP regression for kernel matrices     ``gpkern``
            ====================================  ================

        Returns
        -------
        x : RandomVariable, shape=(n,) or (n, nrhs)
            Approximate solution :math:`x` to the linear system. Shape of the
            return matches the shape of ``b``.
        A : RandomVariable, shape=(n,n)
            Posterior belief over the linear operator.
        Ainv : RandomVariable, shape=(n,n)
            Posterior belief over the linear operator inverse :math:`H=A^{-1}`.
        info : dict
            Information on convergence of the solver.
        """
        # pylint: disable=line-too-long
        # Initialization
        self.iter_ = 0
        resid = self.A @ self.x_mean - self.b

        # Initialize uncertainty calibration
        phi = None
        psi = None
        if calibration is None:
            pass
        elif (
            # NOTE(review): this parenthesized condition is always True once the
            # branch above has excluded None; the effective check here is only
            # "not self.is_calib_covclass".
            calibration is not None
            or calibration is not False
        ) and not self.is_calib_covclass:
            warnings.warn(
                message="Cannot use calibration without a compatible covariance class."
            )
        elif isinstance(calibration, str) and self.is_calib_covclass:
            # Procedure-based calibration: phi/psi are computed per iteration.
            pass
        elif self.is_calib_covclass:
            if calibration < 0:
                raise ValueError("Calibration scale must be non-negative.")
            elif calibration == 0.0:
                pass
            else:
                phi = calibration
                psi = 1 / calibration

        # Trace of solution covariance
        _trace_Ainv_covfactor_update = 0
        self.trace_Ainv_covfactor = self._compute_trace_Ainv_covfactor0(
            Y=None, unc_scale=psi
        )

        # Create output random variables
        x, A, Ainv = self._get_output_randvars(
            Y_list=self.obs_list, sy_list=self.sy, phi=phi, psi=psi
        )

        # Iteration with stopping criteria
        while True:
            # Check convergence
            _has_converged, _conv_crit = self.has_converged(
                iter=self.iter_, maxiter=maxiter, resid=resid, atol=atol, rtol=rtol
            )
            if _has_converged:
                break

            # Compute search direction (with implicit reorthogonalization) via
            # policy
            search_dir = -self.Ainv_mean @ resid
            self.search_dir_list.append(search_dir)

            # Perform action and observe
            obs = self.A @ search_dir
            self.obs_list.append(obs)

            # Compute step size
            sy = search_dir.T @ obs
            step_size = -np.squeeze((search_dir.T @ resid) / sy)
            self.sy.append(sy)

            # Step and residual update
            self.x_mean = self.x_mean + step_size * search_dir
            resid = resid + step_size * obs

            # (Symmetric) mean and covariance updates
            Vs = self.A_covfactor @ search_dir
            delta_A = obs - self.A_mean @ search_dir
            u_A = Vs / (search_dir.T @ Vs)
            v_A = delta_A - 0.5 * (search_dir.T @ delta_A) * u_A

            Wy = self.Ainv_covfactor @ obs
            delta_Ainv = search_dir - self.Ainv_mean @ obs
            yWy = np.squeeze(obs.T @ Wy)
            u_Ainv = Wy / yWy
            v_Ainv = delta_Ainv - 0.5 * (obs.T @ delta_Ainv) * u_Ainv

            # Rank 2 mean updates (+= uv' + vu')
            self.A_mean = linops.aslinop(self.A_mean) + self._mean_update(u=u_A, v=v_A)
            self.Ainv_mean = linops.aslinop(self.Ainv_mean) + self._mean_update(
                u=u_Ainv, v=v_Ainv
            )

            # Rank 1 covariance Kronecker factor update (-= u_A(Vs)' and
            # -= u_Ainv(Wy)')
            if self.iter_ == 0:
                self._A_covfactor_update_term = self._covariance_update(u=u_A, Ws=Vs)
                self._Ainv_covfactor_update_term = self._covariance_update(
                    u=u_Ainv, Ws=Wy
                )
            else:
                self._A_covfactor_update_term = (
                    self._A_covfactor_update_term
                    + self._covariance_update(u=u_A, Ws=Vs)
                )
                self._Ainv_covfactor_update_term = (
                    self._Ainv_covfactor_update_term
                    + self._covariance_update(u=u_Ainv, Ws=Wy)
                )
            self.A_covfactor = (
                linops.aslinop(self.A_covfactor0) - self._A_covfactor_update_term
            )
            self.Ainv_covfactor = (
                linops.aslinop(self.Ainv_covfactor0) - self._Ainv_covfactor_update_term
            )

            # Calibrate uncertainty based on Rayleigh quotient
            if isinstance(calibration, str) and self.is_calib_covclass:
                phi, psi = self._calibrate_uncertainty(
                    S=np.hstack(self.search_dir_list),
                    sy=np.vstack(self.sy).ravel(),
                    method=calibration,
                )

            # Update trace of solution covariance: tr(Cov(Hb))
            _trace_Ainv_covfactor_update += 1 / yWy * np.squeeze(Wy.T @ Wy)
            self.trace_Ainv_covfactor = np.real_if_close(
                self._compute_trace_Ainv_covfactor0(
                    Y=np.hstack(self.obs_list), unc_scale=psi
                )
                - _trace_Ainv_covfactor_update
            ).item()

            # Create output random variables
            x, A, Ainv = self._get_output_randvars(
                Y_list=self.obs_list, sy_list=self.sy, phi=phi, psi=psi
            )

            # Callback function used to extract quantities from iteration
            if callback is not None:
                callback(
                    xk=x,
                    Ak=A,
                    Ainvk=Ainv,
                    sk=search_dir,
                    yk=obs,
                    alphak=step_size,
                    resid=resid,
                )

            # Iteration increment
            self.iter_ += 1

        # Log information on solution
        info = {
            "iter": self.iter_,
            "maxiter": maxiter,
            "resid_l2norm": np.linalg.norm(resid, ord=2),
            "trace_sol_cov": self.trace_sol_cov,
            "conv_crit": _conv_crit,
            "rel_cond": None,  # TODO: matrix condition from solver (see scipy solvers)
        }

        return x, A, Ainv, info
class NoisySymmetricMatrixBasedSolver(MatrixBasedSolver):
    """Solver iteration of the noisy symmetric probabilistic linear solver.

    Implements the solve iteration of the symmetric matrix-based probabilistic
    linear solver taking into account noisy matrix-vector products
    :math:`y_k = (A + E_k)s_k` as described in [1]_ and [2]_.

    Parameters
    ----------
    A : LinearOperator or RandomVariable, shape=(n,n)
        The square matrix or linear operator of the linear system.
    b : array_like, shape=(n,) or (n, nrhs)
        Right-hand side vector or matrix in :math:`A x = b`.
    A0 : array-like or LinearOperator or RandomVariable, shape=(n, n), optional
        A square matrix, linear operator or random variable representing the
        prior belief over the linear operator :math:`A`. If an array or linear
        operator is given, a prior distribution is chosen automatically.
    Ainv0 : array-like or LinearOperator or RandomVariable, shape=(n,n), optional
        A square matrix, linear operator or random variable representing the
        prior belief over the inverse :math:`H=A^{-1}`. This can be viewed as
        taking the form of a pre-conditioner. If an array or linear operator is
        given, a prior distribution is chosen automatically.
    x0 : array-like, or RandomVariable, shape=(n,) or (n, nrhs)
        Optional. Prior belief for the solution of the linear system. Will be
        ignored if ``Ainv0`` is given.

    Returns
    -------
    A : RandomVariable
        Posterior belief over the linear operator.
    Ainv : RandomVariable
        Posterior belief over the inverse linear operator.
    x : RandomVariable
        Posterior belief over the solution of the linear system.
    info : dict
        Information about convergence and the solution.

    References
    ----------
    .. [1] Wenger, J., de Roos, F. and Hennig, P., Probabilistic Solution of
       Noisy Linear Systems, 2020
    .. [2] Hennig, P., Probabilistic Interpretation of Linear Solvers, *SIAM
       Journal on Optimization*, 2015, 25, 234-260

    See Also
    --------
    SymmetricMatrixBasedSolver : Class implementing the symmetric probabilistic
        linear solver.
    """

    def __init__(self, A, b, A0=None, Ainv0=None, x0=None):
        """Construct the solver state from the system and (optional) priors.

        Wraps ``b`` as a random variable, delegates to the base class, then
        derives matrix prior parameters and the induced belief over ``x``.
        """
        # Transform right hand side to random variable
        if not isinstance(b, probnum.RandomVariable):
            _b = probnum.asrandvar(b)
        else:
            _b = b
        super().__init__(A=A, b=_b, x0=x0)

        # Get or initialize prior parameters
        (
            A0_mean,
            A0_covfactor,
            Ainv0_mean,
            Ainv0_covfactor,
            b_mean,
        ) = self._get_prior_params(A0=A0, Ainv0=Ainv0, x0=x0, b=_b)

        # Matrix prior parameters. ``*_mean`` attributes are updated during the
        # iteration, while ``*0_*`` attributes keep the prior for reference.
        self.A0_mean = linops.aslinop(A0_mean)
        self.A_mean = linops.aslinop(A0_mean)
        self.A0_covfactor = A0_covfactor
        self.Ainv0_mean = linops.aslinop(Ainv0_mean)
        self.Ainv_mean = linops.aslinop(Ainv0_mean)
        self.Ainv0_covfactor = Ainv0_covfactor
        self.b_mean = b_mean

        # Induced distribution on x via Ainv
        # Exp = x = A^-1 b, Cov = 1/2 (W b'Wb + Wbb'W)
        Wb = Ainv0_covfactor @ self.b_mean
        bWb = np.squeeze(Wb.T @ self.b_mean)

        def _mv(x):
            # Matrix-vector product with the induced solution covariance.
            return 0.5 * (bWb * Ainv0_covfactor @ x + Wb @ (Wb.T @ x))

        # Lazy (matrix-free) representation of the solution covariance; the
        # same callable serves matvec and matmat.
        self.x_cov = linops.LinearOperator(
            shape=(self.n, self.n), dtype=float, matvec=_mv, matmat=_mv
        )
        if isinstance(x0, np.ndarray):
            self.x_mean = x0
        elif x0 is None:
            # No initial guess: use the prior inverse applied to the rhs mean.
            self.x_mean = Ainv0_mean @ self.b_mean
        else:
            # Random-variable initial guesses are not supported yet.
            raise NotImplementedError
        self.x0 = self.x_mean

    def _get_prior_params(self, A0, Ainv0, x0, b):
        """Get the parameters of the matrix priors on A and H.

        Retrieves and / or initializes prior parameters of ``A0`` and
        ``Ainv0``.

        Parameters
        ----------
        A0 : array-like or LinearOperator or RandomVariable, shape=(n,n), optional
            A square matrix, linear operator or random variable representing
            the prior belief over the linear operator :math:`A`. If an array or
            linear operator is given, a prior distribution is chosen
            automatically.
        Ainv0 : array-like or LinearOperator or RandomVariable, shape=(n,n), optional
            A square matrix, linear operator or random variable representing
            the prior belief over the inverse :math:`H=A^{-1}`. This can be
            viewed as taking the form of a pre-conditioner. If an array or
            linear operator is given, a prior distribution is chosen
            automatically.
        x0 : array-like, or RandomVariable, shape=(n,)
            Optional. Prior belief for the solution of the linear system. Will
            be ignored if ``A0`` or ``Ainv0`` is given.
        b : RandomVariable, shape=(n,) or (n, nrhs)
            Right-hand side random variable `b` in :math:`A x = b`.

        Returns
        -------
        A0_mean : array-like or LinearOperator, shape=(n,n)
            Prior mean of the linear operator :math:`A`.
        A0_covfactor : array-like or LinearOperator, shape=(n,n)
            Factor :math:`W^A` of the symmetric Kronecker product prior
            covariance :math:`W^A \\otimes_s W^A` of :math:`A`.
        Ainv0_mean : array-like or LinearOperator, shape=(n,n)
            Prior mean of the linear operator :math:`H`.
        Ainv0_covfactor : array-like or LinearOperator, shape=(n,n)
            Factor :math:`W^H` of the symmetric Kronecker product prior
            covariance :math:`W^H \\otimes_s W^H` of :math:`H`.
        b_mean : array-like, shape=(n,nrhs)
            Prior mean of the right hand side :math:`b`.
        """
        # Right hand side mean
        b_mean = b.sample(1)  # TODO: build prior model for rhs and change to b.mean

        # No matrix priors specified
        if A0 is None and Ainv0 is None:
            # No prior information given
            if x0 is None:
                Ainv0_mean = linops.Identity(shape=self.n)
                Ainv0_covfactor = linops.Identity(shape=self.n)
                # Standard normal covariance
                A0_mean = linops.Identity(shape=self.n)
                A0_covfactor = linops.Identity(shape=self.n)
                # TODO: should this be a sample from A to achieve symm. posterior
                # correspondence?
                return A0_mean, A0_covfactor, Ainv0_mean, Ainv0_covfactor, b_mean
            # Construct matrix priors from initial guess x0
            elif isinstance(x0, np.ndarray):
                # Sample from linear operator for prior construction
                if isinstance(self.A, probnum.RandomVariable):
                    _A = self.A.sample([1])[0]
                else:
                    _A = self.A
                A0_mean, Ainv0_mean = self._construct_symmetric_matrix_prior_means(
                    A=_A, x0=x0, b=b_mean
                )
                Ainv0_covfactor = Ainv0_mean
                # Standard normal covariance
                A0_covfactor = linops.Identity(shape=self.n)
                # TODO: should this be a sample from A to achieve symm. posterior
                # correspondence?
                return A0_mean, A0_covfactor, Ainv0_mean, Ainv0_covfactor, b_mean
            elif isinstance(x0, probnum.RandomVariable):
                raise NotImplementedError

        # Prior on Ainv specified
        if not isinstance(A0, probnum.RandomVariable) and Ainv0 is not None:
            if isinstance(Ainv0, probnum.RandomVariable):
                Ainv0_mean = Ainv0.mean
                Ainv0_covfactor = Ainv0.cov.A
            else:
                Ainv0_mean = Ainv0
                Ainv0_covfactor = Ainv0
            # Symmetric posterior correspondence: derive the mean of A from the
            # prior on H, falling back to naive or standard-normal choices when
            # inversion is costly or unavailable.
            try:
                if A0 is not None:
                    A0_mean = A0
                elif isinstance(Ainv0, probnum.RandomVariable):
                    A0_mean = Ainv0.mean.inv()
                else:
                    A0_mean = Ainv0.inv()
            except AttributeError:
                warnings.warn(
                    "Prior specified only for Ainv. Inverting prior mean naively. "
                    "This operation is computationally costly! Specify an inverse "
                    "prior (mean) instead."
                )
                A0_mean = np.linalg.inv(Ainv0.mean)
            except NotImplementedError:
                A0_mean = linops.Identity(self.n)
                warnings.warn(
                    "Prior specified only for Ainv. Automatic prior mean inversion "
                    "not implemented, falling back to standard normal prior."
                )
            # Standard normal covariance
            A0_covfactor = linops.Identity(shape=self.n)
            # TODO: should this be a sample from A to achieve symm. posterior
            # correspondence?
            return A0_mean, A0_covfactor, Ainv0_mean, Ainv0_covfactor, b_mean
        # Prior on A specified
        elif A0 is not None and not isinstance(Ainv0, probnum.RandomVariable):
            if isinstance(A0, probnum.RandomVariable):
                A0_mean = A0.mean
                A0_covfactor = A0.cov.A
            else:
                A0_mean = A0
                A0_covfactor = A0
            # Symmetric posterior correspondence: derive the mean of H from the
            # prior on A (mirror image of the branch above).
            try:
                if Ainv0 is not None:
                    Ainv0_mean = Ainv0
                elif isinstance(A0, probnum.RandomVariable):
                    Ainv0_mean = A0.mean.inv()
                else:
                    Ainv0_mean = A0.inv()
            except AttributeError:
                warnings.warn(
                    "Prior specified only for A. Inverting prior mean naively. "
                    "This operation is computationally costly! Specify an inverse "
                    "prior (mean) instead."
                )
                Ainv0_mean = np.linalg.inv(A0.mean)
            except NotImplementedError:
                Ainv0_mean = linops.Identity(self.n)
                warnings.warn(
                    "Prior specified only for A. Automatic prior mean inversion "
                    "failed, falling back to standard normal prior."
                )
            # Symmetric posterior correspondence
            Ainv0_covfactor = Ainv0_mean
            return A0_mean, A0_covfactor, Ainv0_mean, Ainv0_covfactor, b_mean
        # Both matrix priors on A and H specified via random variables
        elif isinstance(A0, probnum.RandomVariable) and isinstance(
            Ainv0, probnum.RandomVariable
        ):
            A0_mean = A0.mean
            A0_covfactor = A0.cov.A
            Ainv0_mean = Ainv0.mean
            Ainv0_covfactor = Ainv0.cov.A
            return A0_mean, A0_covfactor, Ainv0_mean, Ainv0_covfactor, b_mean
        else:
            raise NotImplementedError

    def has_converged(self, iter, maxiter, atol=None, rtol=None):
        """Check convergence of a linear solver.

        Evaluates a set of convergence criteria based on its input arguments to
        decide whether the iteration has converged.

        Parameters
        ----------
        iter : int
            Current iteration of solver.
        maxiter : int
            Maximum number of iterations
        atol : float
            Absolute tolerance for the uncertainty about the solution estimate.
            Stops if :math:`\\sqrt{\\text{tr}(\\Sigma)} \\leq \\text{atol}`,
            where :math:`\\Sigma` is the covariance of the solution :math:`x`.
        rtol : float
            Relative tolerance for the uncertainty about the solution estimate.
            Stops if :math:`\\sqrt{\\text{tr}(\\Sigma)} \\leq \\text{rtol}
            \\lVert x_i \\rVert`, where :math:`\\Sigma` is the covariance of
            the solution :math:`x` and :math:`x_i` its mean.

        Returns
        -------
        has_converged : bool
            True if the method has converged.
        convergence_criterion : str
            Convergence criterion which caused termination.
        """
        # NOTE(review): ``atol`` and ``rtol`` default to None; the comparisons
        # below raise TypeError unless numeric tolerances are passed — callers
        # appear expected to always supply them. TODO confirm.
        # maximum iterations
        if iter >= maxiter:
            warnings.warn(
                "Iteration terminated. Solver reached the maximum number of iterations."
            )
            return True, "maxiter"
        # uncertainty-based
        if isinstance(self.x_cov, linops.LinearOperator):
            # Lazy operator: use its trace method instead of materializing.
            sqrttracecov = np.sqrt(self.x_cov.trace())
        else:
            sqrttracecov = np.sqrt(np.trace(self.x_cov))
        if sqrttracecov <= atol:
            return True, "covar_atol"
        elif sqrttracecov <= rtol * np.linalg.norm(self.x_mean):
            return True, "covar_rtol"
        else:
            return False, ""

    def solve(
        self,
        callback=None,
        maxiter=None,
        atol=10 ** -6,
        rtol=10 ** -6,
        noise_scale=None,
        **kwargs
    ):
        """Solve the linear system :math:`Ax=b`.

        Parameters
        ----------
        callback : function, optional
            User-supplied function called after each iteration of the linear
            solver. It is called as ``callback(xk, Ak, Ainvk, sk, yk, alphak,
            resid, noise_scale)`` and can be used to return quantities from the
            iteration. Note that depending on the function supplied, this can
            slow down the solver.
        maxiter : int
            Maximum number of iterations
        atol : float
            Absolute tolerance for the uncertainty about the solution estimate.
            Stops if :math:`\\sqrt{\\text{tr}(\\Sigma)} \\leq \\text{atol}`,
            where :math:`\\Sigma` is the covariance of the solution :math:`x`.
        rtol : float
            Relative tolerance for the uncertainty about the solution estimate.
            Stops if :math:`\\sqrt{\\text{tr}(\\Sigma)} \\leq \\text{rtol}
            \\lVert x_i \\rVert`, where :math:`\\Sigma` is the covariance of
            the solution :math:`x` and :math:`x_i` its mean.
        noise_scale : float
            Assumed (initial) noise scale :math:`\\varepsilon^2`.

        Returns
        -------
        x : RandomVariable, shape=(n,) or (n, nrhs)
            Approximate solution :math:`x` to the linear system. Shape of the
            return matches the shape of ``b``.
        A : RandomVariable, shape=(n,n)
            Posterior belief over the linear operator.
        Ainv : RandomVariable, shape=(n,n)
            Posterior belief over the linear operator inverse :math:`H=A^{-1}`.
        info : dict
            Information on convergence of the solver.
        """
        # Abstract in this (rendered) version; concrete iteration supplied
        # elsewhere.
        raise NotImplementedError