"""Utility functions for weighted metric aggregation.

All functions are pure (no side effects, no I/O) and operate on plain
dicts/lists. Weights do not need to be pre-normalised — normalisation is
done internally.

Validity rules shared by every aggregation in this module:

* A score is used only if it can be converted to a finite float.
* A weight is used only if it is finite and non-negative; an entry whose
  weight is NaN, infinite or negative is skipped instead of corrupting
  the aggregate.
* A missing weight mapping (``None``), a missing key, or a key mapped to
  ``None`` all mean "use the default weight".
"""

from __future__ import annotations

import math


def resolve_weight(
    weights: dict[str, float] | None,
    key: str,
    default: float = 1.0,
) -> float:
    """Return the weight for *key*, or *default* when absent.

    Args:
        weights: mapping of name -> weight. ``None`` or an empty mapping
            means that no weights were configured.
        key: name to look up.
        default: value returned when *key* has no usable entry.

    Returns:
        The configured weight converted to ``float``. *default* is returned
        when *weights* is ``None``/empty, when *key* is missing, or when
        *key* is mapped to ``None``.

    Raises:
        TypeError, ValueError: if the stored weight cannot be converted by
            ``float()`` (for example a non-numeric string).
    """
    if not weights:
        return float(default)
    raw_weight = weights.get(key)
    if raw_weight is None:
        # An explicit null entry is treated like a missing key.
        return float(default)
    return float(raw_weight)


def _finite_or_none(raw_value: object) -> float | None:
    """Convert *raw_value* to a finite float, or return None if it is unusable."""
    if raw_value is None:
        return None
    try:
        number = float(raw_value)  # type: ignore[arg-type]
    except (TypeError, ValueError, OverflowError):
        return None
    return number if math.isfinite(number) else None


def _usable_weight(weights: dict[str, float] | None, key: str) -> float | None:
    """Return the weight for *key* (default 1.0), or None if it is not finite and >= 0."""
    weight = resolve_weight(weights, key, default=1.0)
    if math.isfinite(weight) and weight >= 0.0:
        return weight
    return None


def compute_weighted_score(
    scores: dict[str, float | None],
    metric_weights: dict[str, float] | None = None,
) -> float | None:
    """Return the weighted mean of valid (non-NaN, non-None) metric scores.

    Args:
        scores: mapping of metric_name -> raw score (may be NaN or None).
            Values that cannot be converted to a finite float are skipped.
        metric_weights: optional per-metric weights; absent keys default to
            1.0. Metrics whose weight is NaN, infinite or negative are
            skipped.

    Returns:
        Weighted mean as a float, or None when no valid score exists or the
        usable weights sum to zero.
    """
    total_weight = 0.0
    total_score = 0.0

    for metric, score in scores.items():
        value = _finite_or_none(score)
        if value is None:
            continue
        # The weight is only resolved for metrics that have a usable score.
        weight = _usable_weight(metric_weights, metric)
        if weight is None:
            continue
        total_weight += weight
        total_score += weight * value

    # Every accumulated weight is >= 0, so this also covers the empty case.
    if total_weight <= 0.0:
        return None
    return total_score / total_weight


def weighted_metric_means(
    score_rows: list[dict],
    metrics: list[str],
    doc_weights: dict[str, float] | None = None,
) -> dict[str, float | None]:
    """Compute per-metric weighted means across all score rows.

    Each row's contribution is scaled by the doc_weight for its ``doc_name``.
    Rows with NaN/None for a given metric are excluded from that metric's
    mean. Rows whose doc weight is NaN, infinite or negative are excluded
    from every metric.

    Args:
        score_rows: list of score record dicts (from scores.csv).
        metrics: ordered list of metric names to aggregate.
        doc_weights: optional mapping doc_name -> weight multiplier; absent
            keys default to 1.0.

    Returns:
        Dict mapping metric_name -> weighted mean (or None if no valid data).
    """
    totals: dict[str, float] = {metric: 0.0 for metric in metrics}
    weights_sum: dict[str, float] = {metric: 0.0 for metric in metrics}

    for row in score_rows:
        # A missing or None doc_name is looked up under the empty string.
        doc_name = str(row.get("doc_name", "") or "")
        sample_weight = _usable_weight(doc_weights, doc_name)
        if sample_weight is None:
            continue
        for metric in metrics:
            value = _finite_or_none(row.get(metric))
            if value is None:
                continue
            totals[metric] += sample_weight * value
            weights_sum[metric] += sample_weight

    return {
        metric: (
            totals[metric] / weights_sum[metric]
            if weights_sum[metric] > 0
            else None
        )
        for metric in metrics
    }


def compute_overall_weighted_score_mean(
    score_rows: list[dict],
    metric_weights: dict[str, float] | None = None,
    doc_weights: dict[str, float] | None = None,
) -> float | None:
    """Compute the overall weighted-score mean across all samples.

    For each sample:
      1. Compute per-sample weighted_score via compute_weighted_score, using
         every row key that is not listed in ``_META_COLUMNS`` as a metric.
      2. Scale by the doc weight for that sample's doc_name.

    Then return the weighted mean of all per-sample weighted_scores.

    Args:
        score_rows: list of score record dicts (from scores.csv).
        metric_weights: optional per-metric weights; absent keys default to
            1.0.
        doc_weights: optional mapping doc_name -> weight multiplier; absent
            keys default to 1.0.

    Returns:
        The doc-weighted mean of the per-sample scores, or None when no
        sample yields a score or the usable doc weights sum to zero.
    """
    total_weight = 0.0
    total_score = 0.0

    for row in score_rows:
        # _META_COLUMNS is a module-level constant defined further down the
        # file; it is looked up at call time, so the late definition is fine.
        metric_scores: dict[str, float | None] = {
            key: value for key, value in row.items() if key not in _META_COLUMNS
        }
        weighted_score = compute_weighted_score(metric_scores, metric_weights)
        if weighted_score is None:
            continue

        doc_name = str(row.get("doc_name", "") or "")
        sample_weight = _usable_weight(doc_weights, doc_name)
        if sample_weight is None:
            continue
        total_weight += sample_weight
        total_score += sample_weight * weighted_score

    return total_score / total_weight if total_weight > 0 else None


# Columns in scores.csv that are sample metadata, not metric scores.
_META_COLUMNS = frozenset(
    (
        # Listed alphabetically; only membership matters for this set.
        "answer",
        "contexts",
        "difficulty",
        "doc_id",
        "doc_name",
        "embedding_model",
        "error",
        "ground_truth",
        "judge_model",
        "language",
        "page_end",
        "page_start",
        "question",
        "question_type",
        "retrieval_config",
        "review_notes",
        "review_status",
        "run_id",
        "sample_id",
        "sample_weight",
        "scenario",
        "section_path",
        "source_chunk_ids",
        "weighted_score",
    )
)