"""
Module for data pre-processing, including transformations between different data formats.
"""
import numpy as np
import pandas as pd
from scipy.sparse import coo_matrix
from scipy.spatial.distance import squareform, pdist
[docs]
def diss2sim(diss_mat, transformation='inverse', eps=1e-3):
    """
    Transform a dissimilarity matrix to a similarity matrix.

    Parameters
    ----------
    diss_mat : array-like of shape (n_samples, n_samples)
        Matrix of pairwise dissimilarities. The input is not modified.
    transformation : str, optional
        Transformation function, either 'inverse' or 'mirror', by default 'inverse'.
        'inverse' - similarity = 1 / (1 + dissimilarity).
        'mirror' - similarity = 1 - dissimilarity / (max(dissimilarity) + eps).
    eps : float, optional
        Constant added to the maximum dissimilarity in the 'mirror'
        transformation so that the divisor is non-zero even for an all-zero
        matrix. It is not used by the 'inverse' transformation.
        By default 1e-3.

    Returns
    -------
    ndarray of shape (n_samples, n_samples)
        Matrix of pairwise similarities, with the diagonal set to 1.

    Raises
    ------
    ValueError
        If `diss_mat` is not a square two-dimensional matrix or if
        `transformation` is neither 'inverse' nor 'mirror'.
    """
    # Accept any array-like and make sure the shape test below cannot fail
    # with an IndexError/AttributeError on 1-D or list input.
    diss_mat = np.asarray(diss_mat)
    if diss_mat.ndim != 2 or diss_mat.shape[0] != diss_mat.shape[1]:
        raise ValueError("Dissimilarity matrix must be square.")

    if transformation == "inverse":
        sim_mat = 1 / (1 + diss_mat)
    elif transformation == "mirror":
        # eps keeps the divisor strictly positive when all dissimilarities are 0.
        max_val = np.max(diss_mat) + eps
        sim_mat = 1 - diss_mat / max_val
    else:
        raise ValueError(f'Unknown transformation type "{transformation}"')

    # Both branches allocate a new array, so this never touches the input.
    np.fill_diagonal(sim_mat, 1)
    return sim_mat
[docs]
def sim2diss(sim_mat, transformation='inverse', eps=1e-4):
    """
    Transform a similarity matrix to a dissimilarity matrix.

    Parameters
    ----------
    sim_mat : array-like of shape (n_samples, n_samples)
        Matrix of pairwise similarities. The input is not modified; all
        work is done on a floating-point copy.
    transformation : str, optional
        Transformation function, either 'inverse' or 'mirror', by default 'inverse'.
        'inverse' - Transforms by taking the reciprocal of the similarity scores
        (scores below `eps` are raised to `eps` first).
        'mirror' - Transforms by computing 1 - similarity. If any similarity
        exceeds 1, the matrix is first divided by (max(similarity) + eps).
    eps : float, optional
        Lower bound applied to similarities in the 'inverse' transformation
        to avoid division by zero, and increment added to the maximum when
        'mirror' has to rescale. By default 1e-4.

    Returns
    -------
    ndarray of shape (n_samples, n_samples)
        Matrix of pairwise dissimilarities, with the diagonal set to 0.

    Raises
    ------
    ValueError
        If `sim_mat` is not a square two-dimensional matrix or if
        `transformation` is neither 'inverse' nor 'mirror'.
    """
    # Work on a private float copy: the diagonal is rewritten below and the
    # caller's matrix must not change as a side effect.
    sim_mat = np.array(sim_mat, dtype=float)
    if sim_mat.ndim != 2 or sim_mat.shape[0] != sim_mat.shape[1]:
        raise ValueError("Similarity matrix must be square.")
    if transformation not in ('inverse', 'mirror'):
        raise ValueError(f'Unknown transformation type "{transformation}". Valid options are "inverse" or "mirror".')

    # Raise self-similarities to at least 1. The diagonal is zeroed in the
    # result, so this only matters for the maximum used by 'mirror' rescaling.
    np.fill_diagonal(sim_mat, np.maximum(1.0, np.diagonal(sim_mat)))

    if transformation == 'inverse':
        # Clip from below so that zero (or negative) similarities cannot
        # cause a division by zero.
        diss_mat = 1 / np.maximum(sim_mat, eps)
    else:
        # Rescale only when values fall outside the [0, 1] range.
        if np.any(sim_mat > 1):
            sim_mat = sim_mat / (np.max(sim_mat) + eps)
        diss_mat = 1 - sim_mat

    np.fill_diagonal(diss_mat, 0)
    return diss_mat
[docs]
def coocc2sim(coocc_mat):
    """
    Transform a matrix with co-occurrence counts to a similarity matrix.

    Parameters
    ----------
    coocc_mat : array-like of shape (n_samples, n_samples)
        Matrix of co-occurrence counts. Assumes a non-negative, symmetric matrix where
        the diagonal can be ignored (usually representing self-co-occurrence).
        The input is not modified.

    Returns
    -------
    ndarray of shape (n_samples, n_samples)
        Matrix of pairwise similarities. Each off-diagonal count is divided by
        its row total (diagonal excluded), and the result is averaged with its
        transpose so that the output is symmetric. The diagonal is zero.

    Raises
    ------
    ValueError
        If `coocc_mat` is not a square two-dimensional matrix.

    Notes
    -----
    The diagonal is set to zero before normalizing so that self-co-occurrence
    does not enter the row totals. Rows whose total co-occurrence is zero are
    divided by machine epsilon instead of zero, which leaves them at zero.
    """
    # Float copy: the diagonal is overwritten below and the caller's counts
    # must stay intact.
    counts = np.array(coocc_mat, dtype=float)
    if counts.ndim != 2 or counts.shape[0] != counts.shape[1]:
        raise ValueError("Co-occurrence matrix must be square.")

    np.fill_diagonal(counts, 0)
    row_sums = counts.sum(axis=1)
    # An all-zero row has a zero numerator everywhere, so any non-zero
    # divisor yields the desired zero similarities.
    safe_row_sums = np.where(row_sums == 0, np.finfo(float).eps, row_sums)
    sim_mat = counts / safe_row_sums[:, np.newaxis]

    # Row normalization is asymmetric; average with the transpose.
    return (sim_mat + sim_mat.T) / 2
[docs]
def edgelist2matrix(df, score_var, id_var_i, id_var_j, time_var=None, time_selected=None):
    """
    Transform an edgelist to a relationship matrix.

    Parameters
    ----------
    df : DataFrame
        Data containing the edgelist. Each row should include a pair. Must contain
        two id variables and a score variable. Can optionally include a time variable.
    score_var : string
        The score variable.
    id_var_i : string
        The first id variable.
    id_var_j : string
        The second id variable.
    time_var : string, optional
        The time variable, by default None.
    time_selected : int, optional
        The selected time, by default None. Only rows whose `time_var` equals
        this value are used. Ignored when `time_var` is None.

    Returns
    -------
    S: ndarray of shape (n_samples, n_samples)
        A symmetric matrix of pairwise relationships, computed as the average
        of the raw score matrix and its transpose.
    ids: ndarray
        Sorted unique identifiers; position k labels row and column k of `S`.

    Raises
    ------
    ValueError:
        If required columns are missing in the DataFrame, or if no rows
        remain after filtering on `time_selected`.

    Notes
    -----
    Scores of rows that repeat the same (i, j) pair are summed when the
    sparse matrix is converted to a dense array. Because of the averaging
    with the transpose, a pair that is listed in one direction only
    contributes half of its score to each of the two symmetric cells.
    """
    required_columns = {score_var, id_var_i, id_var_j}
    # Compare against None rather than relying on truthiness, so that valid
    # but falsy column labels (e.g. the integer 0) are honoured.
    if time_var is not None:
        required_columns.add(time_var)

    missing_cols = required_columns - set(df.columns)
    if missing_cols:
        raise ValueError(f"DataFrame is missing required columns: {missing_cols}")

    # Filter data if a specific time is selected
    if time_var is not None and time_selected is not None:
        df = df[df[time_var] == time_selected]
        if df.empty:
            raise ValueError(f"No data found for selected time: {time_selected}")

    # Sorted unique identifiers over both id columns, and their positions
    unique_ids = np.unique(np.concatenate([df[id_var_i], df[id_var_j]], axis=0))
    id_index = {id_val: idx for idx, id_val in enumerate(unique_ids)}

    n = len(unique_ids)
    row_indices = df[id_var_i].map(id_index).values
    col_indices = df[id_var_j].map(id_index).values
    scores = df[score_var].values

    # Build sparsely (duplicate pairs are summed), then densify
    S = coo_matrix((scores, (row_indices, col_indices)), shape=(n, n), dtype=np.float64).toarray()

    # Symmetrize the matrix
    S = (S + S.T) / 2
    return S, unique_ids
[docs]
def edgelist2matrices(df, score_var, id_var_i, id_var_j, time_var):
    """
    Build one relationship matrix per time period from a time-indexed edgelist.

    Parameters
    ----------
    df : DataFrame
        Edgelist with one pair per row. Must contain the two id columns,
        the score column and the time column.
    score_var : string
        Name of the column holding the scores written into the matrices.
    id_var_i : string
        Name of the first id column.
    id_var_j : string
        Name of the second id column.
    time_var : string
        Name of the column whose distinct values define the periods.

    Returns
    -------
    S_t : list of ndarray
        One relationship matrix per period, ordered by ascending period.
    ids_t : list of ndarray
        For each period, the identifiers labelling the rows and columns of
        the corresponding matrix in `S_t`.

    Raises
    ------
    ValueError:
        If a required column is missing, or if selecting the rows of a period
        yields no data.
    """
    needed = {score_var, id_var_i, id_var_j, time_var}
    available = set(df.columns)
    if not needed <= available:
        missing_cols = needed - available
        raise ValueError(f"DataFrame is missing required columns: {missing_cols}")

    # Sorting first makes the distinct periods come out in ascending order.
    ordered = df.sort_values(by=time_var)

    matrices = []
    id_arrays = []
    for period in ordered[time_var].unique():
        rows = ordered.loc[ordered[time_var] == period]
        if rows.empty:
            raise ValueError(f"No data found for time period: {period}")
        matrix, ids = edgelist2matrix(rows, score_var, id_var_i, id_var_j)
        matrices.append(matrix)
        id_arrays.append(ids)

    return matrices, id_arrays
[docs]
def normalize_diss_mat(D):
    """
    Rescale a dissimilarity matrix by dividing it by its largest entry.

    Parameters
    ----------
    D : ndarray of shape (n_samples, n_samples)
        A dissimilarity matrix.

    Returns
    -------
    ndarray of shape (n_samples, n_samples)
        `D` divided by its maximum value.

    Raises
    ------
    ValueError
        If the input matrix is not square or if its maximum value is zero.
    """
    n_rows = D.shape[0]
    n_cols = D.shape[1]
    if n_rows != n_cols:
        raise ValueError("Dissimilarity matrix must be square.")

    scale = np.max(D)
    # A zero maximum would turn the division below into 0 / 0.
    if scale == 0:
        raise ValueError("Maximum dissimilarity in the matrix is zero, normalization cannot be performed.")

    return D / scale
[docs]
def normalize_diss_mats(D_ts):
    """
    Normalize a sequence of dissimilarity matrices by a common factor
    (the maximum dissimilarity within the sequence).

    Parameters
    ----------
    D_ts : iterable of ndarray, each of shape (n_samples, n_samples)
        Sequence of dissimilarity matrices. May be a list, a tuple, any other
        iterable, or a three-dimensional array whose first axis indexes the
        matrices.

    Returns
    -------
    list of ndarray
        Sequence of dissimilarity matrices, normalized by the maximum dissimilarity within
        the input sequence.

    Raises
    ------
    ValueError
        If the sequence is empty, if any matrix is not a square
        two-dimensional matrix, or if the maximum dissimilarity is zero.
    """
    # Materialize once: `not D_ts` is ambiguous for ndarray input and
    # meaningless for generators, and the matrices are traversed three times.
    mats = list(D_ts)
    if not mats:
        raise ValueError("Input list of dissimilarity matrices is empty.")

    # Verify that all matrices are square
    if any(mat.ndim != 2 or mat.shape[0] != mat.shape[1] for mat in mats):
        raise ValueError("All dissimilarity matrices must be square.")

    # Find the global maximum dissimilarity across all matrices
    max_diss = max(np.max(mat) for mat in mats)
    if max_diss == 0:
        raise ValueError("Maximum dissimilarity across all matrices is zero, cannot normalize.")

    # Normalize all matrices by the global maximum dissimilarity
    return [mat / max_diss for mat in mats]
[docs]
def expand_matrices(X_ts, labels_ts):
    """
    Expand a list of similarity matrices to equal shape and calculate inclusion vectors.

    Every matrix is embedded into a square matrix indexed by the union of all
    labels, in order of first appearance across `labels_ts`.

    Parameters
    ----------
    X_ts : list of ndarray
        List of similarity matrices, one per time point. Matrix k must be
        square with one row/column per entry of `labels_ts[k]`.
    labels_ts : list of list
        List of labels corresponding to each matrix in `X_ts`.

    Returns
    -------
    expanded_matrices : list of ndarray
        One matrix per time point, each of shape (n_labels, n_labels), where
        n_labels is the number of distinct labels. Cells involving a label
        that is absent at that time point are 0.0.
    inclusion_vectors : list of ndarray
        One 0/1 vector per time point, with a 1 where the label is present
        at that time point.
    all_labels : list
        Distinct labels in order of first appearance.

    Notes
    -----
    Values are copied with `DataFrame.update`, which aligns on labels and
    only transfers non-NA values; NaN entries of an input matrix therefore
    remain 0.0 in the expanded matrix.
    """
    # dict.fromkeys de-duplicates while preserving first-seen order.
    all_labels = list(dict.fromkeys(
        label for labels in labels_ts for label in labels
    ))

    expanded_matrices = []
    inclusion_vectors = []
    for X, labels in zip(X_ts, labels_ts):
        # Initialize with floating-point zeros to avoid dtype conflicts
        full_matrix = pd.DataFrame(0.0, index=all_labels, columns=all_labels)
        full_matrix.update(pd.DataFrame(X, index=labels, columns=labels))

        # Set lookup keeps the membership test O(1) per label.
        present = set(labels)
        inclusion_vector = np.array([int(label in present) for label in all_labels])

        expanded_matrices.append(full_matrix.to_numpy())
        inclusion_vectors.append(inclusion_vector)

    return expanded_matrices, inclusion_vectors, all_labels
[docs]
def calc_distances(X, metric='euclidean'):
    """
    Calculate matrix of pairwise distances among the rows of an input matrix.

    Parameters
    ----------
    X : ndarray of shape (n_samples, n_dims)
        Input matrix containing samples for which pairwise distances will be calculated.
    metric : str, optional
        The distance metric to use. Can be any of those supported by `scipy.spatial.distance.pdist`,
        such as 'euclidean', 'cityblock', 'cosine', etc. Defaults to 'euclidean'.

    Returns
    -------
    ndarray of shape (n_samples, n_samples)
        A matrix of pairwise distances, where each element (i, j) is the distance
        between the i-th and j-th rows of the input matrix X according to the specified metric.

    Raises
    ------
    ValueError
        If `X` is not two-dimensional, or if `pdist` rejects the request
        (for instance because the metric is not supported). In the latter
        case the original exception is attached as ``__cause__``.
    """
    # Check the shape up front so that a malformed input is not reported
    # as an unsupported metric by the handler below.
    n_dims = np.ndim(X)
    if n_dims != 2:
        raise ValueError(f"Input matrix must be two-dimensional, got {n_dims} dimension(s).")

    try:
        condensed = pdist(X, metric=metric)
    except ValueError as e:
        raise ValueError(f"The specified metric '{metric}' is not supported. Error: {str(e)}") from e

    # pdist returns the condensed upper triangle; expand it to a square matrix.
    return squareform(condensed)