Files
siemens_ragas/webapp/services/report_builder.py

192 lines
6.7 KiB
Python
Raw Normal View History

"""Aggregate a run's per-sample scores into the report payload for the UI.
All aggregation reads only the standard scores.csv produced by the reporting
layer, plus the metric list resolved by run_reader. The output mirrors the
report detail page: metric means, per-metric distribution histograms, grouped
means by difficulty / question_type / language, and the lowest-scoring samples
for review.
"""
from __future__ import annotations
import math
from pathlib import Path
import pandas as pd
from webapp.services.text_utils import parse_contexts
from webapp.models import (
DistributionBin,
GroupStat,
ReportData,
SampleScore,
)
from webapp.services import run_reader
# Number of equal-width buckets used for metric score histograms; the buckets
# span [0, 1], so each one is 1 / DISTRIBUTION_BIN_COUNT wide.
DISTRIBUTION_BIN_COUNT: int = 5
# Metadata columns that we group samples by when present in the data; a column
# that is missing from the scores frame is skipped.
GROUPING_FIELDS: tuple[str, ...] = ("difficulty", "question_type", "language")
# How many lowest-scoring samples to surface for manual review (an upper
# bound: fewer are returned when the run has fewer samples).
LOWEST_SAMPLE_COUNT: int = 10
def _round_or_none(value: float | None) -> float | None:
"""Round a float to four places, mapping NaN/None to None for clean JSON."""
if value is None:
return None
if isinstance(value, float) and (math.isnan(value) or math.isinf(value)):
return None
return round(float(value), 4)
def _metric_means(frame: pd.DataFrame, metrics: list[str]) -> dict[str, float | None]:
    """Compute the mean of each metric column across all scored samples.

    Args:
        frame: Per-sample scores, one column per metric.
        metrics: Metric names to report, in display order.

    Returns:
        A mapping from every requested metric to its rounded mean. The value
        is ``None`` when the column is absent or holds no numeric score.
    """
    means: dict[str, float | None] = {}
    for metric in metrics:
        if metric not in frame.columns:
            means[metric] = None
            continue
        # Coerce instead of passing numeric_only=True to Series.mean: that
        # flag fails on non-numeric columns, whereas coercion simply turns
        # unparseable cells into NaN, which mean() skips.
        scores = pd.to_numeric(frame[metric], errors="coerce")
        means[metric] = _round_or_none(scores.mean())
    return means
def _distribution(frame: pd.DataFrame, metric: str) -> list[DistributionBin]:
    """Bucket one metric's scores into fixed-width [0,1] histogram bins.

    Every bin is half-open, ``[lower, upper)``, except the last one, which
    also includes its right edge so that a score of exactly 1.0 is counted.
    Scores outside [0, 1] and cells that are not numeric are not counted.

    Args:
        frame: Per-sample scores, one column per metric.
        metric: Name of the metric column to bucket.

    Returns:
        ``DISTRIBUTION_BIN_COUNT`` bins in ascending order, or an empty list
        when the metric column is absent.
    """
    bins: list[DistributionBin] = []
    if metric not in frame.columns:
        return bins
    series = pd.to_numeric(frame[metric], errors="coerce").dropna()
    last_index = DISTRIBUTION_BIN_COUNT - 1
    for index in range(DISTRIBUTION_BIN_COUNT):
        # Derive each edge by division rather than index * width: with a
        # width of 0.2, 3 * 0.2 evaluates to 0.6000000000000001, which would
        # push a score of exactly 0.6 into the bin below.
        lower = index / DISTRIBUTION_BIN_COUNT
        upper = (index + 1) / DISTRIBUTION_BIN_COUNT
        if index == last_index:
            mask = (series >= lower) & (series <= upper)
        else:
            mask = (series >= lower) & (series < upper)
        bins.append(
            DistributionBin(
                # Separate the two edges so the label reads "0.0–0.2".
                label=f"{lower:.1f}–{upper:.1f}",
                lower=round(lower, 2),
                upper=round(upper, 2),
                count=int(mask.sum()),
            )
        )
    return bins
def _groupings(frame: pd.DataFrame, metrics: list[str]) -> dict[str, list[GroupStat]]:
    """Compute per-group metric means for each available grouping field.

    Group labels are compared after stripping surrounding whitespace, so
    "easy" and " easy" land in the same group. Rows whose label is missing,
    blank, or the literal text "nan" are left out.

    Args:
        frame: Per-sample scores plus optional metadata columns.
        metrics: Metric names whose means are reported for every group.

    Returns:
        A mapping from grouping field to its groups, sorted by key. A field
        is omitted when its column is absent or has no usable label.
    """
    groupings: dict[str, list[GroupStat]] = {}
    for field in GROUPING_FIELDS:
        if field not in frame.columns:
            continue
        column = frame[field]
        labels = column.astype(str).str.strip()
        # Check notna() on the raw column as well: astype(str) turns None
        # into the text "None", which would otherwise become a bogus group.
        usable = column.notna() & ~labels.isin(["", "nan"])
        # Skip fields that are entirely empty so the UI does not render noise.
        if not usable.any():
            continue
        stats: list[GroupStat] = []
        # Group on the stripped labels so each emitted key is unique.
        for key, group in frame[usable].groupby(labels[usable].to_numpy()):
            means = {
                # Coerce rather than rely on mean(numeric_only=True), which
                # fails on non-numeric columns.
                metric: _round_or_none(
                    pd.to_numeric(group[metric], errors="coerce").mean()
                )
                for metric in metrics
                if metric in group.columns
            }
            stats.append(GroupStat(key=str(key), count=int(len(group)), means=means))
        stats.sort(key=lambda item: item.key)
        groupings[field] = stats
    return groupings
def _sample_mean(row: pd.Series, metrics: list[str]) -> float | None:
"""Average a single sample's available metric scores for ranking."""
values = [
float(row[metric])
for metric in metrics
if metric in row and pd.notna(row[metric])
]
if not values:
return None
return sum(values) / len(values)
def _cell_text(row: pd.Series, column: str) -> str:
"""Safely read a string cell, returning '' for missing or NaN values."""
if column not in row or pd.isna(row[column]):
return ""
return str(row[column]).strip()
def _lowest_samples(frame: pd.DataFrame, metrics: list[str]) -> list[SampleScore]:
    """Select and shape the lowest-scoring samples for the review table.

    Samples are ranked by the mean of their available metric scores, lowest
    first; ties keep their original row order. Samples without any score are
    treated as the worst case and come first.

    Args:
        frame: Per-sample scores plus optional text and metadata columns.
        metrics: Metric names used for ranking and reported per sample.

    Returns:
        At most ``LOWEST_SAMPLE_COUNT`` samples, worst first.
    """
    if frame.empty:
        return []

    # Rank every row first and only build SampleScore objects (which includes
    # parsing contexts) for the few rows that are actually returned.
    ranked: list[tuple[tuple[int, float], pd.Series, float | None]] = []
    for _, row in frame.iterrows():
        mean_score = _sample_mean(row, metrics)
        # Samples without any score sort first: they are the worst case and
        # must always be surfaced. A tuple key keeps them ahead of every real
        # score, unlike a numeric sentinel that a negative mean could undercut.
        sort_key = (0, 0.0) if mean_score is None else (1, mean_score)
        ranked.append((sort_key, row, mean_score))
    ranked.sort(key=lambda item: item[0])

    samples: list[SampleScore] = []
    for _, row, mean_score in ranked[:LOWEST_SAMPLE_COUNT]:
        scores: dict[str, float | None] = {}
        for metric in metrics:
            if metric not in row or pd.isna(row[metric]):
                continue
            # Coerce so a non-numeric cell is omitted instead of raising.
            score = pd.to_numeric(row[metric], errors="coerce")
            if pd.notna(score):
                scores[metric] = _round_or_none(float(score))
        samples.append(
            SampleScore(
                sample_id=_cell_text(row, "sample_id"),
                question=_cell_text(row, "question"),
                contexts=parse_contexts(row["contexts"]) if "contexts" in row else [],
                answer=_cell_text(row, "answer"),
                ground_truth=_cell_text(row, "ground_truth"),
                language=_cell_text(row, "language"),
                difficulty=_cell_text(row, "difficulty"),
                question_type=_cell_text(row, "question_type"),
                metrics=scores,
                mean_score=_round_or_none(mean_score),
                error=_cell_text(row, "error"),
            )
        )
    return samples
def build_report(run_dir: Path, metrics: list[str]) -> ReportData:
    """Assemble the aggregated report payload for a single run directory.

    Args:
        run_dir: Directory of the run whose scores and markdown are read.
        metrics: Metric names to aggregate, in display order.

    Returns:
        The report payload. When the scores frame is empty or no metrics are
        given, only the metric list, ``None`` means, and the two markdown
        documents are filled in.
    """
    scores = run_reader.read_scores_frame(run_dir)
    # Both markdown documents are attached to every payload, scored or not.
    markdown = {
        "summary_markdown": run_reader.read_summary_markdown(run_dir),
        "advice_markdown": run_reader.read_advice_markdown(run_dir),
    }

    if scores.empty or not metrics:
        # Nothing to aggregate: report every requested metric as unknown.
        return ReportData(
            metrics=metrics,
            metric_means=dict.fromkeys(metrics),
            **markdown,
        )

    # Histograms exist only for metrics that have a column in the frame.
    histograms = {}
    for name in metrics:
        if name in scores.columns:
            histograms[name] = _distribution(scores, name)

    return ReportData(
        metrics=metrics,
        metric_means=_metric_means(scores, metrics),
        distributions=histograms,
        groupings=_groupings(scores, metrics),
        lowest_samples=_lowest_samples(scores, metrics),
        **markdown,
    )