Source code for kaczmarz._variants

"""A module providing selection strategies for the Kaczmarz algorithm."""

from collections import deque

import numpy as np

import kaczmarz


[docs]class Cyclic(kaczmarz.Base): """Cycle through the equations of the system in order, repeatedly. References ---------- 1. S. Kaczmarz. "Angenäherte Auflösung von Systemen linearer Gleichungen." *Bulletin International de l’Académie Polonaise des Sciences et des Lettres. Classe des Sciences Mathématiques et Naturelles. Série A, Sciences Mathématiques*, 35, 335–357, 1937 """ def __init__(self, *base_args, **base_kwargs): super().__init__(*base_args, **base_kwargs) self._row_index = -1 def _select_row_index(self, xk): self._row_index = (1 + self._row_index) % self._n_rows return self._row_index
[docs]class MaxDistance(kaczmarz.Base): """Choose equations which leads to the most progress. This selection strategy is also known as `Motzkin's method`. References ---------- 1. T. S. Motzkin and I. J. Schoenberg. "The relaxation method for linear inequalities." *Canadian Journal of Mathematics*, 6:393–404, 1954. """ def _select_row_index(self, xk): # TODO: use auxiliary update for the residual. residual = self._b - self._A @ self._xk return np.argmax(np.abs(residual))
class Random(kaczmarz.Base): """Sample equations according to a `fixed` probability distribution. Parameters ---------- p : (m,) array_like, optional Sampling probability for each equation. Uniform by default. """ def __init__(self, *base_args, p=None, **base_kwargs): super().__init__(*base_args, **base_kwargs) self._p = p # p=None corresponds to uniform. def _select_row_index(self, xk): return np.random.choice(self._n_rows, p=self._p) class SVRandom(Random): """Sample equations with probability proportional to the squared row norms. References ---------- 1. T. Strohmer and R. Vershynin, "A Randomized Kaczmarz Algorithm with Exponential Convergence." Journal of Fourier Analysis and Applications 15, 262 2009. """ def __init__(self, *base_args, **base_kwargs): super().__init__(*base_args, **base_kwargs) squared_row_norms = self._row_norms ** 2 self._p = squared_row_norms / squared_row_norms.sum() class UniformRandom(Random): """Sample equations uniformly at random.""" # Nothing to do since uniform sampling is the default behavior of Random. class Quantile(Random): """Reject equations whose normalized residual is above a quantile. This algorithm is intended for use in solving corrupted systems of equations. That is, systems where a subset of the equations are consistent, while a minority of the equations are not. Such systems are almost always overdetermined. Parameters ---------- quantile : float, optional Quantile of normalized residual above which to reject. References ---------- 1. There will be a reference soon. Keep an eye out for that. """ def __init__(self, *args, quantile=1.0, **kwargs): super().__init__(*args, **kwargs) self._quantile = quantile def _distance(self, xk, ik): return np.abs(self._b[ik] - self._A[ik] @ xk) def _threshold_distances(self, xk): return np.abs(self._b - self._A @ xk) def _threshold(self, xk): distances = self._threshold_distances(xk) return np.quantile(distances, self._quantile) def _select_row_index(self, xk): ik = super()._select_row_index(xk) distance = self._distance(xk, ik) threshold = self._threshold(xk) if distance < threshold or np.isclose(distance, threshold): return ik return -1 # No projection please class SampledQuantile(Quantile): """Reject equations whose normalized residual is above a quantile of a random subset of residual entries. Parameters ---------- n_samples: int, optional Number of normalized residual samples used to compute the threshold quantile. References ---------- 1. There will be a reference soon. Keep an eye out for that. """ def __init__(self, *args, n_samples=None, **kwargs): super().__init__(*args, **kwargs) if n_samples is None: n_samples = self._n_rows self._n_samples = n_samples def _threshold_distances(self, xk): idxs = np.random.choice(self._n_rows, self._n_samples, replace=False) return np.abs(self._b[idxs] - self._A[idxs] @ xk) class WindowedQuantile(Quantile): """Reject equations whose normalized residual is above a quantile of the most recent normalized residual values. Parameters ---------- window_size : int, optional Number of recent normalized residual values used to compute the threshold quantile. Note ---- ``WindowedQuantile`` also accepts the parameters of ``Quantile``. References ---------- 1. There will be a reference soon. Keep an eye out for that. """ def __init__(self, *args, window_size=None, **kwargs): super().__init__(*args, **kwargs) if window_size is None: window_size = self._n_rows self._window = deque([], maxlen=window_size) def _distance(self, xk, ik): distance = super()._distance(xk, ik) self._window.append(distance) return distance def _threshold_distances(self, xk): return self._window class RandomOrthoGraph(kaczmarz.Base): """Try to only sample equations which are not already satisfied. Use the orthogonality graph defined in [1] to decide which rows should be considered "selectable" at each iteration. Parameters ---------- p : (m,) array_like, optional Sampling probability for each equation. Uniform by default. These probabilities will be re-normalized based on the selectable rows at each iteration. References ---------- 1. Nutini, Julie, et al. "Convergence rates for greedy Kaczmarz algorithms, and faster randomized Kaczmarz rules using the orthogonality graph." arXiv preprint arXiv:1612.07838 2016. """ def __init__(self, *args, p=None, **kwargs): super().__init__(*args, **kwargs) self._ortho_graph = self._A @ self._A.T self._i_to_newly_selectable = { i: self._newly_selectable(i) for i in range(self._n_rows) } self._selectable = np.argwhere(self._A @ self._x0 - self._b).flatten() if p is None: p = np.ones((self._n_rows,)) self._p = p def _newly_selectable(self, i): return np.argwhere(self._ortho_graph[i, :]) def _update_selectable(self, ik): # Every time a row is selected, all of its neighbors become selectable, and itself becomes unselectable. newly_selectable = self._i_to_newly_selectable[ik] selectable_with_ik = np.union1d(self._selectable, newly_selectable) self._selectable = np.setdiff1d(selectable_with_ik, [ik], assume_unique=True) def _select_row_index(self, xk): unnormalized_p = self._p[self._selectable] p = unnormalized_p / unnormalized_p.sum() ik = np.random.choice(self._selectable, p=p) self._update_selectable(ik) return ik