Files

153 lines
4.7 KiB
Python
Raw Permalink Normal View History

"""Utility functions for weighted metric aggregation.
All functions are pure (no side effects, no I/O) and operate on plain dicts/lists.
Weights do not need to be pre-normalised normalisation is done internally.
"""
from __future__ import annotations
import math
def resolve_weight(weights: dict[str, float], key: str, default: float = 1.0) -> float:
"""Return the weight for *key*, or *default* when absent."""
return float(weights.get(key, default))
def compute_weighted_score(
scores: dict[str, float | None],
metric_weights: dict[str, float],
) -> float | None:
"""Return the weighted mean of valid (non-NaN, non-None) metric scores.
Args:
scores: mapping of metric_name -> raw score (may be NaN or None).
metric_weights: optional per-metric weights; absent keys default to 1.0.
Returns:
Weighted mean as a float, or None when no valid score exists.
"""
total_weight = 0.0
total_score = 0.0
for metric, score in scores.items():
if score is None:
continue
try:
value = float(score)
except (TypeError, ValueError):
continue
if math.isnan(value) or math.isinf(value):
continue
weight = resolve_weight(metric_weights, metric, default=1.0)
total_weight += weight
total_score += weight * value
if total_weight == 0.0:
return None
return total_score / total_weight
def weighted_metric_means(
score_rows: list[dict],
metrics: list[str],
doc_weights: dict[str, float],
) -> dict[str, float | None]:
"""Compute per-metric weighted means across all score rows.
Each row's contribution is scaled by the doc_weight for its ``doc_name``.
Rows with NaN/None for a given metric are excluded from that metric's mean.
Args:
score_rows: list of score record dicts (from scores.csv).
metrics: ordered list of metric names to aggregate.
doc_weights: mapping doc_name -> weight multiplier; absent keys default to 1.0.
Returns:
Dict mapping metric_name -> weighted mean (or None if no valid data).
"""
totals: dict[str, float] = {metric: 0.0 for metric in metrics}
weights_sum: dict[str, float] = {metric: 0.0 for metric in metrics}
for row in score_rows:
doc_name = str(row.get("doc_name", "") or "")
sample_weight = resolve_weight(doc_weights, doc_name, default=1.0)
for metric in metrics:
raw_value = row.get(metric)
if raw_value is None:
continue
try:
value = float(raw_value)
except (TypeError, ValueError):
continue
if math.isnan(value) or math.isinf(value):
continue
totals[metric] += sample_weight * value
weights_sum[metric] += sample_weight
return {
metric: (totals[metric] / weights_sum[metric] if weights_sum[metric] > 0 else None)
for metric in metrics
}
def compute_overall_weighted_score_mean(
score_rows: list[dict],
metric_weights: dict[str, float],
doc_weights: dict[str, float],
) -> float | None:
"""Compute the overall weighted-score mean across all samples.
For each sample:
1. Compute per-sample weighted_score via compute_weighted_score.
2. Scale by the doc weight for that sample's doc_name.
Then return the weighted mean of all per-sample weighted_scores.
"""
total_weight = 0.0
total_score = 0.0
for row in score_rows:
metric_scores: dict[str, float | None] = {}
for key, value in row.items():
if key in _META_COLUMNS:
continue
metric_scores[key] = value # type: ignore[assignment]
weighted_score = compute_weighted_score(metric_scores, metric_weights)
if weighted_score is None:
continue
doc_name = str(row.get("doc_name", "") or "")
sample_weight = resolve_weight(doc_weights, doc_name, default=1.0)
total_weight += sample_weight
total_score += sample_weight * weighted_score
return total_score / total_weight if total_weight > 0 else None
# Columns in scores.csv that are sample metadata, not metric scores.
_META_COLUMNS = frozenset(
{
"sample_id",
"question",
"contexts",
"answer",
"ground_truth",
"scenario",
"language",
"retrieval_config",
"error",
"judge_model",
"embedding_model",
"run_id",
"difficulty",
"question_type",
"doc_id",
"doc_name",
"section_path",
"page_start",
"page_end",
"source_chunk_ids",
"review_status",
"review_notes",
"weighted_score",
"sample_weight",
}
)