Source code for tensorly.contrib.decomposition.tt_TTOI

import numpy as np
import tensorly as tl
from tensorly.tt_tensor import tt_to_tensor, validate_tt_rank
from tensorly.decomposition._base_decomposition import DecompositionMixin


def sequential_prod(tensor_prod, multiplier_list, left_to_right=True):
    """Perform sequential multiplication and reshaping

    Parameters
    ----------
    tensor_prod: ndrray
        the tensor to be multiplied by a list of tensors
        tensor_prod is of dimension r_0*p_1*p_2*...*p_d*r_{d-1}
    multiplier_list: list
        a list of tensors to multiply the tensor
        len(multiplier_list) <= d-1
        If direction == "left", multiplier_list[i] is of shape (r_i, p_{i+1}, r_{i+1})
        If direction == "right", multiplier_list[i] is of shape (r_{d-i}, p_{d-i}, r_{d-i-1})
    left_to_right : bool, optional, default is True
        direction of the multiplication
    Returns
    -------
    tensor_prod : ndarray
        product tensor
        If direction == "left" and `multiplier_list` contains N tensors, then `tensor_prod` is of dimension r_N*p_{N+1}*...*p_d*r_{d-1}
        If direction == "right" and `multiplier_list` contains d-N tensors, then `tensor_prod` is of dimension r_0*p_1*...*p_N*r_N
    """

    if left_to_right == True:
        for i in range(len(multiplier_list)):
            tensor_prod = tl.tensordot(
                multiplier_list[i], tensor_prod, axes=([0, 1], [0, 1])
            )
    else:
        for i in range(len(multiplier_list)):
            tensor_prod = tl.tensordot(
                tensor_prod,
                multiplier_list[i],
                axes=([tl.ndim(tensor_prod) - 1, tl.ndim(tensor_prod) - 2], [0, 1]),
            )
    return tensor_prod


[docs] def tensor_train_OI(data_tensor, rank, n_iter=2, trajectory=False, return_errors=True): """Perform tensor-train orthogonal iteration (TTOI) [1]_ for tensor train decomposition Parameters ---------- data_tensor: tl.tensor observed tensor data rank : tuple rank of the TT decomposition must verify rank[0] == rank[-1] == 1 (boundary conditions) and len(rank) == len(tl.shape(data_tensor))+1 n_iter : int the number of iterations trajectory : bool, optional, default is False if True, the output of each iteration of TTOI is returned: 2*n_iter outputs otherwise, only the output of the last iteration is returned return_errors : bool, optional, default is True if True, the approximation/reconstruction error of each iteration of TTOI is returned: 2*n_iter outputs Returns ------- factors : list of n_iter lists of tensor-train factors or one list of tensor-train factors:: * n_iter lists of factors (if `trajectory` is True) : each list contains the returned list of tensor-train factors from each iteration * one list of factors (otherwise): returned list of tensor-train factors from the last iteration full_tensor : list of n_iter tensors or one tensor:: * n_iter tensors (if `trajectory` is True) : each list contains the returned full tensor from each iteration * one tensor (otherwise): returned full tensor from the last iteration References ---------- .. [1] Zhou, Y., Zhang, A. R., Zheng, L., & Wang, Y. (2022). Optimal high-order tensor svd via tensor-train orthogonal iteration. IEEE Transactions on Information Theory, 68(6), 3991-4019. """ context = tl.context(data_tensor) shape = tl.shape(data_tensor) n_dim = len(shape) rank = validate_tt_rank(shape, rank) # Make sure it's not a tuple but a list rank = list(rank) # Add two one-dimensional mode to data_tensor data_tensor_extended = tl.reshape(data_tensor, (1,) + shape + (1,)) if trajectory: factors = [] full_tensor = [] if return_errors: error_list = [] # perform TTOI for n_iter iterations for iteration in range(int(np.ceil(n_iter / 2))): # first perform forward update # left_singular_vectors will be a list including estimated left singular spaces at the current iteration left_singular_vectors = [] # initialize left_residuals (sequential unfolding of data_tensor multiplied by left_singular_vectors sequentially on the left, useful for backward update to obtain right_singular_vectors) left_residuals = [] # estimate the first left singular spaces # Here, R_tmp is the first sequential unfolding compressed on the right by previous updated right_singular_vectors (if exists) R_tmp_l = data_tensor_extended if iteration == 0: R_tmp = R_tmp_l else: R_tmp = sequential_prod( R_tmp_l, right_singular_vectors, left_to_right=False ) U_tmp, _, _ = tl.tenalg.svd_interface( matrix=tl.reshape(R_tmp, (shape[0], -1)), n_eigenvecs=rank[1] ) left_singular_vectors.append(tl.reshape(U_tmp, (rank[0], shape[0], rank[1]))) # estimate the 2nd to (d-1)th left singular spaces for mode in range(n_dim - 2): # compress the (mode+2)th sequential unfolding of data_tensor from the left R_tmp_l = sequential_prod( R_tmp_l, [left_singular_vectors[mode]], left_to_right=True ) # R_tmp_l will be useful for backward update left_residuals.append(R_tmp_l) # compress the (mode+2)th sequential unfolding of data_tensor from the right (if iteration>0) if iteration == 0: R_tmp = R_tmp_l else: R_tmp = sequential_prod( R_tmp_l, right_singular_vectors[0 : (n_dim - mode - 2)], left_to_right=False, ) U_tmp, _, _ = tl.tenalg.svd_interface( matrix=tl.reshape(R_tmp, (rank[mode + 1] * shape[mode + 1], -1)), n_eigenvecs=rank[mode + 2], ) left_singular_vectors.append( tl.reshape(U_tmp, (rank[mode + 1], shape[mode + 1], rank[mode + 2])) ) # forward update is done; output the final residual left_residuals.append( sequential_prod( R_tmp_l, [left_singular_vectors[n_dim - 2]], left_to_right=True ) ) if trajectory or return_errors: factors_list_tmp = [] for mode in range(n_dim - 1): factors_list_tmp.append(left_singular_vectors[mode]) factors_list_tmp.append(left_residuals[n_dim - 2]) full_tensor_tmp = tt_to_tensor(factors_list_tmp) if return_errors: error_list.append(tl.norm(full_tensor_tmp - data_tensor, 2)) if trajectory: factors.append(factors_list_tmp) full_tensor.append(full_tensor_tmp) elif ( iteration == np.ceil(n_iter / 2) - 1 and np.ceil(n_iter / 2) * 2 > n_iter ): factors = factors_list_tmp full_tensor = full_tensor_tmp if np.ceil(n_iter / 2) * 2 == n_iter: # perform backward update # initialize right_singular_vectors: right_singular_vectors will be a list of estimated right singular spaces at the current or previous iteration right_singular_vectors = [] _, _, V_tmp = tl.tenalg.svd_interface( matrix=tl.reshape( left_residuals[n_dim - 2], (rank[n_dim - 1], shape[n_dim - 1]) ), n_eigenvecs=rank[n_dim - 1], ) V_tmp = tl.transpose(V_tmp) right_singular_vectors.append( tl.reshape(V_tmp, (rank[n_dim], shape[n_dim - 1], rank[n_dim - 1])) ) # estimate the 2nd to (d-1)th right singular spaces for mode in range(n_dim - 2): # compress left_residuals from the right R_tmp_r = sequential_prod( left_residuals[n_dim - mode - 3], right_singular_vectors[0 : (mode + 1)], "right", ) _, _, V_tmp = tl.tenalg.svd_interface( matrix=tl.reshape( R_tmp_r, ( rank[n_dim - mode - 2], shape[n_dim - mode - 2] * rank[n_dim - mode - 1], ), ), n_eigenvecs=rank[n_dim - mode - 2], ) right_singular_vectors.append( tl.transpose( tl.reshape( V_tmp, ( rank[n_dim - mode - 2], shape[n_dim - mode - 2], rank[n_dim - mode - 1], ), ) ) ) Residual_right = sequential_prod( data_tensor_extended, right_singular_vectors, left_to_right=False ) if trajectory or return_errors or iteration == np.ceil(n_iter / 2) - 1: factors_list_tmp = [] factors_list_tmp.append(Residual_right) for mode in range(n_dim - 1): factors_list_tmp.append( tl.transpose(right_singular_vectors[n_dim - mode - 2]) ) full_tensor_tmp = tt_to_tensor(factors_list_tmp) if return_errors: error_list.append(tl.norm(full_tensor_tmp - data_tensor, 2)) if trajectory: factors.append(factors_list_tmp) full_tensor.append(full_tensor_tmp) elif iteration == n_iter - 1: factors = factors_list_tmp full_tensor = full_tensor_tmp # return final results if return_errors: return factors, full_tensor, error_list else: return factors, full_tensor
class TensorTrain_OI(DecompositionMixin): def __init__(self, rank, n_iter, trajectory, return_errors): self.rank = rank self.n_iter = n_iter self.trajectory = trajectory self.return_errors = return_errors def fit_transform(self, tensor): self.decomposition_ = tensor_train_OI( tensor, rank=self.rank, n_iter=self.n_iter, trajectory=self.trajectory, return_errors=self.return_errors, ) return self.decomposition_