import math
from collections.abc import Callable
from typing import Literal
import numpy as np
import torch
from seqme.core.base import Metric, MetricResult
class FKEA(Metric):
    """
    Fourier-based Kernel Entropy Approximation (FKEA).

    Approximates the VENDI-score and RKE-score using random Fourier features.

    This is a reference-free method to estimate diversity in a set of
    generated sequences. It is positively correlated with the number of
    distinct modes or clusters in the embedding space, without requiring
    access to real/reference data.

    The method works by projecting embeddings into a randomized Fourier
    feature space, approximating the Gaussian kernel, and computing the
    α-norm of the normalized kernel eigenvalues.

    - If alpha=2, this corresponds to the RKE-score.
    - If alpha≠2, this corresponds to the VENDI-α score.

    References:
        [1] Friedman et al., The Vendi Score: A Diversity Evaluation Metric for Machine Learning, 2023
            (https://arxiv.org/abs/2210.02410)

        [2] Ospanov, Zhang, Jalali et al., "Towards a Scalable Reference-Free Evaluation of Generative Models", 2024
            (https://arxiv.org/pdf/2407.02961)
    """

    def __init__(
        self,
        embedder: Callable[[list[str]], np.ndarray],
        bandwidth: float,
        *,
        alpha: float | int = 2,
        n_random_fourier_features: int | None = 2048,
        batch_size: int = 256,
        device: str = "cpu",
        seed: int = 0,
        strict: bool = True,
        name: str = "FKEA",
    ):
        """Initialize the metric with an embedding function and kernel bandwidth.

        Args:
            embedder: A function that maps a list of sequences to a 2D NumPy array of embeddings.
            bandwidth: Bandwidth parameter for the Gaussian kernel.
            alpha: alpha-norm of the normalized kernels eigenvalues. If ``alpha=2`` then it corresponds to the RKE-score otherwise VENDI-alpha.
            n_random_fourier_features: Number of random Fourier features. Used to approximate the kernel function. Consider increasing this to get a better approximation. If ``None``, use the exact kernel covariance matrix.
            batch_size: Number of samples per batch when computing the kernel.
            device: Compute device, e.g., ``"cpu"`` or ``"cuda"``.
            seed: Seed for deterministic sampling of Fourier features.
            strict: Enforce equal number of samples for computation.
            name: Metric name.

        Raises:
            ValueError: If ``n_random_fourier_features``, ``bandwidth``, ``alpha``
                or ``batch_size`` is not strictly positive.
        """
        self.embedder = embedder
        self.n_random_fourier_features = n_random_fourier_features
        self.alpha = alpha
        self.bandwidth = bandwidth
        self.batch_size = batch_size
        self.device = device
        self.seed = seed
        self.strict = strict
        self._name = name

        # Number of sequences seen on the first call; only tracked in strict mode.
        self._n_sequences: int | None = None

        if (self.n_random_fourier_features is not None) and (self.n_random_fourier_features <= 0):
            raise ValueError("Expected n_random_fourier_features > 0.")
        if self.bandwidth <= 0:
            raise ValueError("Expected bandwidth > 0.")
        if self.alpha <= 0:
            raise ValueError("Expected alpha > 0.")
        # A non-positive batch size would make the batched kernel loops either
        # fail deep inside ``range`` or silently iterate zero times.
        if self.batch_size <= 0:
            raise ValueError("Expected batch_size > 0.")

    def __call__(self, sequences: list[str]) -> MetricResult:
        """Computes FKEA of the input sequences.

        Args:
            sequences: Sequences to evaluate.

        Returns:
            MetricResult: FKEA score.

        Raises:
            ValueError: If ``strict`` is enabled and the number of sequences
                differs from the number seen on the first call.
        """
        if self.strict:
            if self._n_sequences is None:
                # First call fixes the expected sample count.
                self._n_sequences = len(sequences)
            elif self._n_sequences != len(sequences):
                raise ValueError("Computed the metric using different number of sequences.")

        seq_embeddings = torch.from_numpy(self.embedder(sequences)).to(device=self.device)

        if self.n_random_fourier_features is None:
            # Exact path: eigenvalues of the full normalized kernel matrix.
            score = calculate_vendi(seq_embeddings, self.bandwidth, self.batch_size, self.alpha)
        else:
            # Approximate path: eigenvalues of the random-Fourier-feature covariance.
            score = calculate_fourier_vendi(
                seq_embeddings, self.n_random_fourier_features, self.bandwidth, self.batch_size, self.alpha, self.seed
            )
        return MetricResult(score)

    @property
    def name(self) -> str:
        """Metric name as passed to the constructor."""
        return self._name

    @property
    def objective(self) -> Literal["minimize", "maximize"]:
        """Optimization direction; this metric is always maximized."""
        return "maximize"
def _calculate_score(eigenvalues: torch.Tensor, alpha: float = 2, eps: float = 1e-8) -> float:
    """Convert a kernel spectrum into an order-``alpha`` diversity score.

    Args:
        eigenvalues: Eigenvalues of the normalized kernel (or feature covariance) matrix.
        alpha: Order of the score. ``math.inf`` uses the largest eigenvalue,
            ``1`` uses the Shannon-style entropy, any other value uses the
            log of the sum of ``eigenvalues**alpha``.
        eps: Lower bound applied to every eigenvalue before the score is computed.

    Returns:
        The score as a Python float.
    """
    # Floor the spectrum so the logarithms below never see zero or negative values.
    spectrum = torch.clamp(eigenvalues, min=eps)

    if alpha == math.inf:
        return (1 / torch.max(spectrum)).item()

    if alpha == 1:
        shannon_entropy = -torch.sum(spectrum * torch.log(spectrum))
        return torch.exp(shannon_entropy).item()

    renyi_entropy = (1 / (1 - alpha)) * torch.log(torch.sum(spectrum**alpha))
    return torch.exp(renyi_entropy).item()
def calculate_fourier_vendi(
    xs: torch.Tensor,
    random_fourier_feature_dim: int,
    bandwidth: float,
    batch_size: int,
    alpha: float = 2.0,
    seed: int = 0,
) -> float:
    """Approximate the order-``alpha`` kernel diversity score with random Fourier features.

    Args:
        xs: Embeddings, one row per sample.
        random_fourier_feature_dim: Number of random frequencies to sample.
        bandwidth: Gaussian kernel bandwidth; the frequency scale is derived
            from ``sqrt(bandwidth / 2)``.
        batch_size: Number of samples processed per batch when accumulating the covariance.
        alpha: Order of the score.
        seed: Seed for sampling the random frequencies.

    Returns:
        The approximate score as a Python float.
    """
    std = math.sqrt(bandwidth / 2.0)
    x_cov = _cov_random_fourier_features(xs, random_fourier_feature_dim, std, batch_size, seed)
    # Only the spectrum is needed, so skip the eigenvector computation;
    # eigvalsh always returns real eigenvalues for a symmetric matrix.
    eigenvalues = torch.linalg.eigvalsh(x_cov)
    return _calculate_score(eigenvalues, alpha)
def _cov_random_fourier_features(
    xs: torch.Tensor,
    n_features: int,
    std: float,
    batch_size: int,
    seed: int,
) -> torch.Tensor:
    """Compute the covariance matrix of random Fourier features of ``xs``.

    Random frequencies are drawn from a normal distribution scaled by ``1 / std``.
    Each sample is mapped to ``[cos(x @ omegas), sin(x @ omegas)] / sqrt(n_features)``
    and the uncentered second-moment matrix of these features is averaged over
    all samples.

    Args:
        xs: Embeddings of shape ``[n_samples, dim]``.
        n_features: Number of random frequencies.
        std: Scale of the Gaussian kernel; frequencies are divided by it.
        batch_size: Number of samples whose features are materialized at once.
        seed: Seed for the frequency sampler.

    Returns:
        Tensor of shape ``[2 * n_features, 2 * n_features]`` with the dtype of
        ``xs`` (or the default float dtype if ``xs`` is not floating point).

    Raises:
        ValueError: If ``xs`` is not two-dimensional.
    """
    if xs.dim() != 2:
        raise ValueError(f"Expected xs of shape [n_samples, dim], got {tuple(xs.shape)}.")

    # Follow the input precision: NumPy embedders commonly return float64, and
    # mixing that with float32 frequencies makes matmul raise a dtype error.
    dtype = xs.dtype if xs.is_floating_point() else torch.get_default_dtype()
    xs = xs.to(dtype)

    n_samples, dim = xs.shape
    generator = torch.Generator(device=xs.device).manual_seed(seed)
    omegas = torch.randn((dim, n_features), device=xs.device, dtype=dtype, generator=generator) / std

    cov = torch.zeros((2 * n_features, 2 * n_features), device=xs.device, dtype=dtype)
    for start in range(0, n_samples, batch_size):
        # Build the features per batch so peak memory is bounded by batch_size
        # rather than by the total number of samples.
        projection = torch.matmul(xs[start : start + batch_size], omegas)
        features = torch.cat([torch.cos(projection), torch.sin(projection)], dim=1)
        cov += torch.matmul(features.T, features)

    # Apply the 1/sqrt(n_features) feature scaling (squared) and the sample
    # average in a single pass over the accumulated matrix.
    cov /= n_features * n_samples
    return cov
def calculate_vendi(xs: torch.Tensor, bandwidth: float, batch_size: int, alpha: float = 2) -> float:
    """Compute the order-``alpha`` kernel diversity score from the exact Gaussian kernel.

    Args:
        xs: Embeddings, one row per sample.
        bandwidth: Gaussian kernel bandwidth; the kernel scale is ``sqrt(bandwidth / 2)``.
        batch_size: Number of columns of the kernel matrix computed per batch.
        alpha: Order of the score.

    Returns:
        The score as a Python float.
    """
    std = math.sqrt(bandwidth / 2.0)
    kernel = _normalized_gaussian_kernel(xs, xs, std, batch_size)
    # Only the spectrum is needed, so skip the eigenvector computation.
    eigenvalues = torch.linalg.eigvalsh(kernel)
    return _calculate_score(eigenvalues, alpha)
def _normalized_gaussian_kernel(xs: torch.Tensor, ys: torch.Tensor, std: float, batch_size: int) -> torch.Tensor:
    """Evaluate the Gaussian kernel between every row of ``xs`` and every row of ``ys``.

    Each entry is ``exp(-||x - y||^2 / (2 * std^2))`` and the whole matrix is
    divided by ``sqrt(len(xs) * len(ys))``. Columns are produced ``batch_size``
    rows of ``ys`` at a time.

    Args:
        xs: Tensor of shape ``[N, dim]``.
        ys: Tensor of shape ``[M, dim]``.
        std: Kernel scale.
        batch_size: Number of rows of ``ys`` handled per batch.

    Returns:
        Tensor of shape ``[N, M]`` holding the scaled kernel values.
    """
    assert xs.shape[1:] == ys.shape[1:]

    exponent_scale = -1 / (2 * std * std)

    def _kernel_columns(y_batch: torch.Tensor) -> torch.Tensor:
        # Broadcast to [N, batch, dim], then reduce to squared distances.
        delta = xs[:, None, :] - y_batch[None, :, :]
        squared_distance = torch.sum(delta * delta, dim=2)
        return torch.exp(exponent_scale * squared_distance)

    column_blocks = [_kernel_columns(ys[lo : lo + batch_size]) for lo in range(0, ys.shape[0], batch_size)]
    kernel = torch.hstack(column_blocks)
    return kernel / np.sqrt(xs.shape[0] * ys.shape[0])