Source code for tensorly.solvers.nnls

import tensorly as tl
from math import sqrt


def hals_nnls(
    UtM,
    UtU,
    V=None,
    n_iter_max=500,
    tol=1e-8,
    sparsity_coefficient=None,
    ridge_coefficient=None,
    nonzero_rows=False,
    exact=False,
    epsilon=0.0,
    callback=None,
):
    """
    Non Negative Least Squares (NNLS)

    Computes an approximate solution of a nonnegative least squares problem
    (NNLS) with an exact block-coordinate descent scheme.
    M is m by n, U is m by r, V is r by n.
    All matrices are nonnegative componentwise.

    This algorithm is a simplified implementation of the accelerated HALS
    defined in [1]. It features an early stopping criterion. It is simplified
    to ensure reproducibility and to expose a simple API for controlling the
    number of inner iterations.

    This function is meant to be called repeatedly inside an outer-loop
    alternating algorithm, for instance when computing a nonnegative matrix
    or tensor factorization. To use it as a stand-alone solver, set the
    exact flag to True.

    Parameters
    ----------
    UtM: r-by-n array
        Pre-computed product of the transposed of U and M, used in the update rule
    UtU: r-by-r array
        Pre-computed product of the transposed of U and U, used in the update rule
    V: r-by-n initialization matrix (mutable)
        Initialized V array.
        By default, V is initialized by solving the unconstrained least squares
        problem, clipping negative entries to zero and rescaling the result.
    n_iter_max: Positive integer
        Upper bound on the number of iterations
        Default: 500
    tol : float in [0,1]
        Early stopping criterion: the iterations run while err_k > tol * err_0.
        Set it small for an almost exact NNLS solution, or larger (e.g. 1e-2)
        for the inner loops of a PARAFAC computation.
        Default: 1e-8
    sparsity_coefficient: float or None
        The coefficient controlling the sparsity level in the objective function.
        If set to None, the problem is solved unconstrained.
        Default: None
    ridge_coefficient: float or None
        The coefficient controlling the ridge (l2) penalty in the objective function.
        If set to None, no ridge penalty is imposed.
        Default: None
    nonzero_rows: boolean
        True if the rows of the V matrix can't be zero,
        False if they can be zero.
        Default: False
    exact: boolean
        If True, the algorithm returns a high-precision solution at a higher
        computational cost. If False, the algorithm returns an approximate solution.
        Default: False
    epsilon: float
        Small constant such that V>=epsilon instead of V>=0.
        Required to ensure convergence, avoids division by zero and reset.
        Default: 0
    callback: callable, optional
        A callable called after each iteration. The supported signature is

            callback(V: tensor, error: float)

        where V is the last estimated nonnegative least squares solution, and
        error is the squared Euclidean norm of the difference between V at the
        current iteration k and V at iteration k-1 (therefore error is not the
        loss function, which is costly to compute).
        Moreover, the algorithm will also terminate if the callback returns True.
        Default: None

    Returns
    -------
    V: array
        an r-by-n nonnegative matrix, see notes.

    Notes
    -----
    We solve the following problem

    .. math::

        \\min_{V >= \\epsilon} ||M-UV||_F^2

    The matrix V is updated linewise. The update rule for this resolution is

    .. math::

        \\begin{equation}
            V[k,:]_{(j+1)} = V[k,:]_{(j)} + (UtM[k,:] - UtU[k,:]\\times V_{(j)})/UtU[k,k]
        \\end{equation}

    with j the update iteration index. V is then thresholded to be larger than epsilon.

    This problem can also be defined by adding a sparsity coefficient and a
    ridge coefficient

    .. math:: \\lambda_s, \\lambda_r

    enhancing sparsity or smoothness in the solution [2]. In this sparse/ridge
    version, the update rule becomes

    .. math::

        \\begin{equation}
            V[k,:]_{(j+1)} = V[k,:]_{(j)} + (UtM[k,:] - UtU[k,:]\\times V_{(j)} - \\lambda_s)/(UtU[k,k]+2\\lambda_r)
        \\end{equation}

    Note that the data fitting term is halved but the ridge penalization is not.

    References
    ----------
    .. [1] N. Gillis and F. Glineur, Accelerated Multiplicative Updates and
       Hierarchical ALS Algorithms for Nonnegative Matrix Factorization,
       Neural Computation 24 (4): 1085-1105, 2012.

    .. [2] J. Eggert, and E. Korner. "Sparse coding and NMF."
       2004 IEEE International Joint Conference on Neural Networks
       (IEEE Cat. No. 04CH37541). Vol. 4. IEEE, 2004.
    """
    rank, _ = tl.shape(UtM)
    if V is None:
        V = tl.solve(UtU, UtM)
        V = tl.clip(V, a_min=0, a_max=None)
        # Scaling
        scale = tl.sum(UtM * V) / tl.sum(UtU * tl.dot(V, tl.transpose(V)))
        V = V * scale

    if exact:
        n_iter_max = 50000
        tol = 1e-16

    for iteration in range(n_iter_max):
        rec_error = 0
        for k in range(rank):
            if UtU[k, k]:
                num = UtM[k, :] - tl.dot(UtU[k, :], V) + UtU[k, k] * V[k, :]
                den = UtU[k, k]

                if sparsity_coefficient is not None:
                    num -= sparsity_coefficient
                if ridge_coefficient is not None:
                    den += 2 * ridge_coefficient

                newV = tl.clip(num / den, a_min=epsilon)
                rec_error += tl.norm(V - newV) ** 2
                V = tl.index_update(V, tl.index[k, :], newV)

                # Safety procedure, if columns aren't allowed to be zero
                if nonzero_rows and tl.all(V[k, :] == 0):
                    V[k, :] = tl.eps(V.dtype) * tl.max(V)

            elif nonzero_rows:
                raise ValueError(
                    "Column " + str(k) + " of U is zero with nonzero condition"
                )

        if callback is not None:
            retVal = callback(V, rec_error)
            if retVal is True:
                print("Received True from callback function. Exiting.")
                break

        if iteration == 0:
            rec_error0 = rec_error
        if rec_error < tol * rec_error0:
            break

    return V
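
# Illustrative usage sketch (not part of the original tensorly module).
# A minimal example of calling hals_nnls on random data to solve
# min_{V >= 0} ||M - U V||_F^2, assuming the default NumPy backend.
# The helper name `_hals_nnls_example` and the random data are illustrative only.
def _hals_nnls_example():
    import numpy as np

    rng = np.random.default_rng(0)
    U = tl.tensor(rng.random((20, 4)))   # m-by-r nonnegative factor
    M = tl.tensor(rng.random((20, 30)))  # m-by-n nonnegative data
    UtM = tl.dot(tl.transpose(U), M)     # precomputed U^T M, shape (r, n)
    UtU = tl.dot(tl.transpose(U), U)     # precomputed U^T U, shape (r, r)
    # exact=True turns the routine into a stand-alone high-precision solver;
    # inside an ALS outer loop one would keep exact=False and a looser tol.
    return hals_nnls(UtM, UtU, exact=True)
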
def fista(
    UtM,
    UtU,
    x=None,
    n_iter_max=100,
    non_negative=True,
    sparsity_coef=0,
    ridge_coef=0,
    lr=None,
    tol=1e-8,
    epsilon=1e-8,
):
    """
    Fast Iterative Shrinkage Thresholding Algorithm (FISTA), see [1]_

    Computes an approximate (nonnegative) solution for the Ux=M linear system.

    Parameters
    ----------
    UtM : ndarray
        Pre-computed product of the transposed of U and M
    UtU : ndarray
        Pre-computed product of the transposed of U and U
    x : ndarray, optional
        Initial value of the solution
        Default: None
    n_iter_max : int
        Maximum number of iterations
        Default: 100
    non_negative : bool, default is True
        if True, the result will be non-negative
    lr : float
        learning rate; if None, it is set to the inverse of the largest
        singular value of UtU plus 2 * ridge_coef
        Default : None
    sparsity_coef : float or None
        l1 penalty coefficient
    ridge_coef : float or None
        l2 penalty coefficient
    tol : float
        stopping criterion for the l1 error decrease relative to the first
        iteration error
    epsilon : float
        Small constant such that the solution is greater than epsilon instead of zero.
        Required to ensure convergence, avoids division by zero and reset.
        Default: 1e-8

    Returns
    -------
    x : approximate solution such that Ux = M

    Notes
    -----
    We solve the following problem

    .. math::

        \\frac{1}{2} \\|m - Ux \\|_2^2 + \\lambda_1 \\|x\\|_1 + \\lambda_2 \\|x\\|_2^2

    References
    ----------
    .. [1] Beck, A., & Teboulle, M. (2009). A fast iterative
       shrinkage-thresholding algorithm for linear inverse problems.
       SIAM Journal on Imaging Sciences, 2(1), 183-202.
    """
    if sparsity_coef is None:
        sparsity_coef = 0

    if x is None:
        x = tl.zeros(tl.shape(UtM), **tl.context(UtM))
    if lr is None:
        lr = 1 / (tl.truncated_svd(UtU)[1][0] + 2 * ridge_coef)
    # Parameters
    momentum_old = 1.0  # tl.tensor(1.0)
    norm_0 = 0.0
    x_update = tl.copy(x)

    for iteration in range(n_iter_max):
        if isinstance(UtU, list):
            x_gradient = (
                -UtM
                + tl.tenalg.multi_mode_dot(x_update, UtU, transpose=False)
                + sparsity_coef
                + 2 * ridge_coef * x_update
            )
        else:
            x_gradient = (
                -UtM
                + tl.dot(UtU, x_update)
                + sparsity_coef
                + 2 * ridge_coef * x_update
            )

        x_new = x_update - lr * x_gradient
        if non_negative:
            x_new = tl.where(x_new < epsilon, epsilon, x_new)
        momentum = (1 + sqrt(1 + 4 * momentum_old**2)) / 2
        x_update = x_new + ((momentum_old - 1) / momentum) * (x_new - x)
        momentum_old = momentum
        # for tracking loss decrease, l2 has square overflow issues
        norm = tl.abs(tl.sum(x - x_new))
        x = tl.copy(x_new)
        if iteration == 0:
            norm_0 = norm
        if norm < tol * norm_0:
            break
    return x
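
# Illustrative usage sketch (not part of the original tensorly module).
# The same nonnegative system solved with FISTA, here with a small l1
# (sparsity) penalty; the helper name `_fista_example`, the penalty value
# and the random data are assumptions for illustration only.
def _fista_example():
    import numpy as np

    rng = np.random.default_rng(1)
    U = tl.tensor(rng.random((20, 4)))
    M = tl.tensor(rng.random((20, 30)))
    UtM = tl.dot(tl.transpose(U), M)  # precomputed U^T M
    UtU = tl.dot(tl.transpose(U), U)  # precomputed U^T U
    # With lr=None the step size defaults to 1 / (sigma_max(UtU) + 2 * ridge_coef)
    return fista(UtM, UtU, non_negative=True, sparsity_coef=0.01, n_iter_max=200)
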
def active_set_nnls(Utm, UtU, x=None, n_iter_max=100, tol=10e-8):
    """
    Active set algorithm for the non-negative least squares solution, see [1]_

    Computes an approximate non-negative solution for the Ux=m linear system.

    Parameters
    ----------
    Utm : vectorized ndarray
        Pre-computed product of the transposed of U and m
    UtU : ndarray
        Pre-computed Kronecker product of the transposed of U and U
    x : ndarray, optional
        Initial value of the solution
        Default: None
    n_iter_max : int
        Maximum number of iterations
        Default: 100
    tol : float
        Early stopping criterion

    Returns
    -------
    x : ndarray

    Notes
    -----
    This function solves the following problem:

    .. math::

        \\begin{equation}
            \\min_{x} \\|Ux - m\\|^2
        \\end{equation}

    According to [1], the non-negativity-constrained least squares estimation
    problem becomes:

    .. math::

        \\begin{equation}
            x' = Utm - UtU x
        \\end{equation}

    References
    ----------
    .. [1] Bro, R., & De Jong, S. (1997). A fast non-negativity-constrained
       least squares algorithm. Journal of Chemometrics: A Journal of the
       Chemometrics Society, 11(5), 393-401.
    """
    if tl.get_backend() == "tensorflow":
        raise ValueError(
            "Active set is not supported with the tensorflow backend. Consider using the fista method with tensorflow."
        )
    if x is None:
        x_vec = tl.zeros(tl.shape(UtU)[1], **tl.context(UtU))
    else:
        x_vec = tl.base.tensor_to_vec(x)

    x_gradient = Utm - tl.dot(UtU, x_vec)
    passive_set = x_vec > 0
    active_set = x_vec <= 0
    support_vec = tl.zeros(tl.shape(x_vec), **tl.context(x_vec))

    for iteration in range(n_iter_max):
        if iteration > 0 or tl.all(x_vec == 0):
            indice = tl.argmax(x_gradient)
            passive_set = tl.index_update(passive_set, tl.index[indice], True)
            active_set = tl.index_update(active_set, tl.index[indice], False)
        # To avoid singularity error when initial x exists
        try:
            passive_solution = tl.solve(
                UtU[passive_set, :][:, passive_set], Utm[passive_set]
            )
            indice_list = []
            for i in range(tl.shape(support_vec)[0]):
                if passive_set[i]:
                    indice_list.append(i)
                    support_vec = tl.index_update(
                        support_vec,
                        tl.index[int(i)],
                        passive_solution[len(indice_list) - 1],
                    )
                else:
                    support_vec = tl.index_update(support_vec, tl.index[int(i)], 0)

        # Start from zeros if solve is not achieved
        except:
            x_vec = tl.zeros(tl.shape(UtU)[1])
            support_vec = tl.zeros(tl.shape(x_vec), **tl.context(x_vec))
            passive_set = x_vec > 0
            active_set = x_vec <= 0
            if tl.any(active_set):
                indice = tl.argmax(x_gradient)
                passive_set = tl.index_update(passive_set, tl.index[indice], True)
                active_set = tl.index_update(active_set, tl.index[indice], False)
            passive_solution = tl.solve(
                UtU[passive_set, :][:, passive_set], Utm[passive_set]
            )
            indice_list = []
            for i in range(tl.shape(support_vec)[0]):
                if passive_set[i]:
                    indice_list.append(i)
                    support_vec = tl.index_update(
                        support_vec,
                        tl.index[int(i)],
                        passive_solution[len(indice_list) - 1],
                    )
                else:
                    support_vec = tl.index_update(support_vec, tl.index[int(i)], 0)

        # update the support vector if necessary
        if tl.min(support_vec[passive_set]) <= 0:
            for i in range(len(passive_set)):
                alpha = tl.min(
                    x_vec[passive_set][support_vec[passive_set] <= 0]
                    / (
                        x_vec[passive_set][support_vec[passive_set] <= 0]
                        - support_vec[passive_set][support_vec[passive_set] <= 0]
                    )
                )
                update = alpha * (support_vec - x_vec)
                x_vec = x_vec + update
                passive_set = x_vec > 0
                active_set = x_vec <= 0
                passive_solution = tl.solve(
                    UtU[passive_set, :][:, passive_set], Utm[passive_set]
                )
                indice_list = []
                for i in range(tl.shape(support_vec)[0]):
                    if passive_set[i]:
                        indice_list.append(i)
                        support_vec = tl.index_update(
                            support_vec,
                            tl.index[int(i)],
                            passive_solution[len(indice_list) - 1],
                        )
                    else:
                        support_vec = tl.index_update(
                            support_vec, tl.index[int(i)], 0
                        )

                if tl.any(passive_set) != True or tl.min(support_vec[passive_set]) > 0:
                    break
        # set x to s
        x_vec = tl.clip(support_vec, 0, tl.max(support_vec))

        # gradient update
        x_gradient = Utm - tl.dot(UtU, x_vec)

        if tl.any(active_set) != True or tl.max(x_gradient[active_set]) <= tol:
            break

    return x_vec
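
# Illustrative usage sketch (not part of the original tensorly module).
# The active-set solver applied to a single right-hand side, i.e.
# min_{x >= 0} ||U x - m||^2 with m a vector, assuming the NumPy backend
# (the tensorflow backend is rejected by the function above). The helper
# name `_active_set_example` and the random data are illustrative only; for
# a single vector m, the plain Gram matrix U^T U plays the role of the
# Kronecker-product UtU mentioned in the docstring.
def _active_set_example():
    import numpy as np

    rng = np.random.default_rng(2)
    U = tl.tensor(rng.random((20, 4)))
    m = tl.tensor(rng.random(20))
    Utm = tl.dot(tl.transpose(U), m)  # vectorized U^T m, shape (r,)
    UtU = tl.dot(tl.transpose(U), U)  # U^T U, shape (r, r)
    return active_set_nnls(Utm, UtU, n_iter_max=100)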