# Source code for evomap.metrics

"""
Module for evaluating maps.
"""

import numpy as np
import pandas as pd
from scipy.spatial.distance import squareform, pdist, cdist, cosine
from scipy.stats import pearsonr

def misalign_score(X_t, normalize=True):
    """
    Calculate misalignment of a sequence of maps.

    Misalignment is measured as the average Euclidean distance between
    objects' subsequent map positions, averaged over all objects and all
    consecutive map transitions.

    Parameters
    ----------
    X_t : list of ndarrays, each of shape (n_samples, n_dims)
        Map coordinates.
    normalize : bool, optional
        If True, misalignment is normalized by the average interobject
        distance on the preceding map. Useful for comparing maps across
        differently scaled coordinate systems, by default True.

    Returns
    -------
    float
        Misalignment score, bounded within [0, inf). Lower values
        indicate better alignment.
    """
    if len(X_t) < 2:
        raise ValueError("At least two maps are required to compute misalignment.")

    transition_means = []
    for prev_map, curr_map in zip(X_t[:-1], X_t[1:]):
        step_sizes = np.linalg.norm(curr_map - prev_map, axis=1)
        if normalize:
            # Scale by the mean pairwise distance on the previous map;
            # guard against a degenerate (all-identical) configuration.
            scale = np.mean(pdist(prev_map))
            step_sizes = step_sizes / (scale if scale > 0 else 1)
        transition_means.append(np.mean(step_sizes))

    return sum(transition_means) / len(transition_means)
def align_score(X_t):
    """
    Calculate alignment of a sequence of maps.

    Alignment is measured as the mean cosine similarity of objects'
    subsequent map positions, averaged over all objects and all
    consecutive map transitions. Cosine similarity measures the cosine
    of the angle between two position vectors and is therefore
    scale-invariant.

    Parameters
    ----------
    X_t : list of ndarray, each of shape (n_samples, n_dims)
        Sequence of map coordinates.

    Returns
    -------
    float
        Alignment score, bounded between [-1, 1]. Higher values indicate
        better alignment.
    """
    if len(X_t) < 2:
        raise ValueError("At least two maps are required to compute alignment.")

    n_samples = X_t[0].shape[0]
    per_transition = []
    for prev_map, curr_map in zip(X_t[:-1], X_t[1:]):
        # scipy's `cosine` returns a cosine *distance*; 1 - distance is
        # the cosine similarity.
        similarities = [
            1 - cosine(curr_map[i, :], prev_map[i, :]) for i in range(n_samples)
        ]
        per_transition.append(sum(similarities) / n_samples)

    return sum(per_transition) / len(per_transition)
def hitrate_score(X, D, n_neighbors=10, inc=None, input_format='dissimilarity'):
    """
    Calculate the Hitrate of nearest neighbor recovery for a single map.

    For each object, the `n_neighbors` nearest neighbors in the input
    data and in the map space are compared; the hitrate is the share of
    overlapping neighbors, averaged across all objects.

    Parameters
    ----------
    X : ndarray of shape (n_samples, d)
        Map coordinates.
    D : ndarray
        Input data, either a similarity/dissimilarity matrix of shape
        (n_samples, n_samples), or a matrix of feature vectors of shape
        (n_samples, d_input).
    n_neighbors : int, optional
        Number of neighbors considered when calculating the hitrate,
        by default 10.
    inc : ndarray of shape (n_samples,), optional
        Inclusion array, indicating if an object is present (via 0 and
        1s), by default None.
    input_format : str, optional
        One of 'vector', 'similarity', or 'dissimilarity', indicating
        the type of the input D, by default 'dissimilarity'.

    Returns
    -------
    float
        Hitrate of nearest neighbor recovery, bounded within [0,1].
        Higher values indicate better recovery.

    Raises
    ------
    ValueError
        If the input dimensions mismatch or unsupported input format is
        provided.
    """
    if n_neighbors < 1:
        raise ValueError('Number of neighbors must be at least 1.')
    if input_format not in ('similarity', 'dissimilarity', 'vector'):
        raise ValueError('Input format should be "vector", "similarity", or "dissimilarity".')

    # Work on a float copy: filling the diagonal with +/-inf below would
    # fail on integer input and must never mutate the caller's array.
    D = np.array(D, dtype=float)
    n_samples = X.shape[0]

    if inc is not None:
        inc = np.asarray(inc)
        if len(inc) != n_samples:
            raise ValueError('Inclusion array size must match the number of samples in X.')
        if np.any(~np.isin(inc, [0, 1])):
            raise ValueError('Inclusion array must contain only 0 or 1 values.')
        X = X[inc == 1, :]
        if input_format == 'vector':
            # Feature matrices are (n_samples, d_input): rows correspond to
            # objects but columns are features, so filter rows only.
            # Column-filtering by `inc` is only valid for square matrices.
            D = D[inc == 1, :]
        else:
            D = D[inc == 1, :][:, inc == 1]
        n_samples = X.shape[0]  # Update after applying inc

    if input_format == 'vector':
        D = cdist(D, D)  # Compute distance matrix from feature vectors

    # Ensure no point is its own neighbor: push the diagonal to the
    # "farthest" extreme for the given format.
    if input_format == 'similarity':
        np.fill_diagonal(D, -np.inf)
    else:  # 'dissimilarity' or 'vector' (now a distance matrix)
        np.fill_diagonal(D, np.inf)

    # Distances in the map space (squared Euclidean preserves ordering).
    Dist_map = squareform(pdist(X, 'sqeuclidean'))
    np.fill_diagonal(Dist_map, np.inf)

    hit_rate = 0
    for i in range(n_samples):
        # Indices of the n_neighbors closest neighbors in both spaces.
        if input_format == 'similarity':
            nearest_original = np.argsort(-D[i, :])[:n_neighbors]
        else:
            nearest_original = np.argsort(D[i, :])[:n_neighbors]
        nearest_map = np.argsort(Dist_map[i, :])[:n_neighbors]
        # Count the overlap between the two neighbor lists.
        hit_rate += len(np.intersect1d(nearest_original, nearest_map))

    return hit_rate / (n_neighbors * n_samples)
def adjusted_hitrate_score(X, D, n_neighbors=10, inc=None, input_format='dissimilarity'):
    """
    Calculate the Hitrate of nearest neighbor recovery for a single map,
    adjusted for random agreement.

    The raw hitrate is reduced by the hitrate expected under a purely
    random neighbor assignment. The score is averaged across all objects.

    Parameters
    ----------
    X : ndarray
        Map coordinates, shape (n_samples, d).
    D : ndarray
        Input data, either a similarity/dissimilarity matrix of shape
        (n_samples, n_samples), or a matrix of feature vectors of shape
        (n_samples, d_input).
    n_neighbors : int, optional
        Number of neighbors considered when calculating the hitrate,
        by default 10.
    inc : ndarray of shape (n_samples,), optional
        Inclusion array, indicating if an object is present (via 0 and
        1s), by default None.
    input_format : str, optional
        One of 'vector', 'similarity', or 'dissimilarity',
        by default 'dissimilarity'.

    Returns
    -------
    float
        Adjusted Hitrate of nearest neighbor recovery, bounded within
        [0,1]. Higher values indicate better recovery.

    Raises
    ------
    ValueError
        If parameters are out of expected range or input dimensions
        mismatch.
    """
    raw_hitrate = hitrate_score(X, D, n_neighbors, inc, input_format)

    if inc is None:
        n_samples = X.shape[0]
    else:
        # Count only the included samples.
        n_samples = len(X[inc == 1, :])

    # Chance level: the object itself can never be its own neighbor,
    # hence n_samples - 1 candidate neighbors.
    chance_level = n_neighbors / (n_samples - 1)
    return raw_hitrate - chance_level
def avg_hitrate_score(X_t, D_t, n_neighbors=10, inc_t=None, input_format='dissimilarity'):
    """
    Calculate the average Hitrate of nearest neighbor recovery for a
    sequence of maps.

    The score is averaged across all maps within the sequence.

    Parameters
    ----------
    X_t : list of ndarray
        List of map coordinates for each time period, each of shape
        (n_samples, d).
    D_t : list of ndarray
        List of input data matrices for each time period, each either a
        similarity/dissimilarity matrix of shape (n_samples, n_samples),
        or a matrix of feature vectors of shape (n_samples, d_input).
    n_neighbors : int, optional
        Number of neighbors considered when calculating the hitrate for
        each map, by default 10.
    inc_t : list of ndarray, optional
        List of inclusion arrays for each time period, each indicating
        if an object is present (via 0 and 1s), by default None.
    input_format : str, optional
        Specifies the input format of D_t, one of 'vector',
        'similarity', or 'dissimilarity', by default 'dissimilarity'.

    Returns
    -------
    float
        Average hitrate of nearest neighbor recovery, bounded between
        [0,1]. Higher values indicate better recovery.

    Raises
    ------
    ValueError
        If there are inconsistencies in array sizes or unsupported input
        format is specified.
    """
    if not X_t or not D_t or len(X_t) != len(D_t):
        raise ValueError("List of map coordinates and data matrices must be non-empty and of equal length.")
    if inc_t and len(inc_t) != len(X_t):
        raise ValueError("Inclusion arrays, if provided, must match the number of map coordinate arrays.")

    period_scores = []
    for t, (X, D) in enumerate(zip(X_t, D_t)):
        current_inc = inc_t[t] if inc_t else None
        period_scores.append(
            hitrate_score(X=X, D=D, n_neighbors=n_neighbors,
                          inc=current_inc, input_format=input_format)
        )

    return sum(period_scores) / len(period_scores)
def avg_adjusted_hitrate_score(X_t, D_t, n_neighbors=10, inc_t=None, input_format='dissimilarity'):
    """
    Calculate the average Adjusted Hitrate of nearest neighbor recovery
    for a sequence of maps, adjusted for random agreement.

    The score is averaged across all maps within the sequence.

    Parameters
    ----------
    X_t : list of ndarray
        List of map coordinates for each time period, each of shape
        (n_samples, d).
    D_t : list of ndarray
        List of input data matrices for each time period, each either a
        similarity/dissimilarity matrix of shape (n_samples, n_samples),
        or a matrix of feature vectors of shape (n_samples, d_input).
    n_neighbors : int, optional
        Number of neighbors considered when calculating the adjusted
        hitrate for each map, by default 10.
    inc_t : list of ndarray, optional
        List of inclusion arrays for each time period, each indicating
        if an object is present (via 0 and 1s), by default None.
    input_format : str, optional
        Specifies the input format of D_t, one of 'vector',
        'similarity', or 'dissimilarity', by default 'dissimilarity'.

    Returns
    -------
    float
        Average adjusted hitrate of nearest neighbor recovery, bounded
        between [0,1]. Higher values indicate better recovery.

    Raises
    ------
    ValueError
        If there are inconsistencies in array sizes or unsupported input
        format is specified.
    """
    if not X_t or not D_t or len(X_t) != len(D_t):
        raise ValueError("List of map coordinates and data matrices must be non-empty and of equal length.")
    if inc_t and len(inc_t) != len(X_t):
        raise ValueError("Inclusion arrays, if provided, must match the number of map coordinate arrays.")

    period_scores = []
    for t, (X, D) in enumerate(zip(X_t, D_t)):
        current_inc = inc_t[t] if inc_t else None
        period_scores.append(
            adjusted_hitrate_score(X=X, D=D, n_neighbors=n_neighbors,
                                   inc=current_inc, input_format=input_format)
        )

    return sum(period_scores) / len(period_scores)
def persistence_score(X_t):
    """
    Calculate persistence of a sequence of maps as the average Pearson
    correlation coefficient between objects' subsequent map movements
    (i.e., the first differences of their map positions).

    Movements are pooled across all objects; the correlation is computed
    per map dimension and then averaged across dimensions.

    Parameters
    ----------
    X_t : list of ndarrays, each of shape (n_samples, n_dims)
        Sequence of map coordinates. Each ndarray represents a map at a
        different time. All maps must have the same shape.

    Returns
    -------
    float
        Persistence score, bounded within (-1,1). Higher positive values
        indicate higher persistence of map movements across time
        periods.

    Raises
    ------
    ValueError
        If fewer than three maps are provided. Three maps yield two
        movements, the minimum needed to correlate consecutive
        movements.
    """
    # Two maps give only one movement; correlating consecutive movements
    # requires at least two, i.e. at least three maps.
    if len(X_t) < 3:
        raise ValueError("Persistence requires a sequence of at least three maps.")

    # First differences along time: shape (n_periods - 1, n_samples, n_dims).
    deltas = np.diff(np.asarray(X_t), axis=0)
    n_samples = X_t[0].shape[0]

    correlations = []
    for d in range(deltas.shape[2]):  # Iterate over each map dimension
        # Flatten period-major so entries n_samples apart refer to the
        # same object in consecutive periods.
        flat_deltas = deltas[:, :, d].flatten()
        if np.all(flat_deltas == 0):
            # No movement at all in this dimension: treat persistence as
            # perfect.
            correlations.append(1.0)
        else:
            # Correlate each movement with the same object's movement in
            # the following period.
            corr, _ = pearsonr(flat_deltas[:-n_samples], flat_deltas[n_samples:])
            correlations.append(corr)

    # Average across dimensions for the overall persistence score.
    return np.mean(correlations)