Files
siemens_ragas/webapp/services/report_builder.py
wangwei e89695e490 Add RAGAS evaluation web console (FastAPI + vanilla JS)
- webapp/: FastAPI backend with runs/scenarios/evaluations API routers;
  services for run_reader, report_builder, scenario_scanner, task_manager
  (lazy ragas import — server boots even without ragas); Pydantic models
- webapp/static/: single-page console (layout A: left-nav + main area);
  report detail with metric cards, Chart.js distribution histogram,
  grouping table, lowest-score sample review; trigger evaluation + log polling
- webmain.py: uvicorn entry point (alongside existing main.py CLI)
- start.bat: Windows one-click launcher with env checks and auto-browser open
- rag_eval/datasets/: implement missing loader + normalizer modules
  (load_dataset_records, normalize_records) required by evaluator
- scripts/seed_sample_run.py: generate realistic demo run artifacts
- .gitignore: exclude datasets/ data files but keep rag_eval/datasets/ source

Co-Authored-By: Claude Sonnet 4 <noreply@anthropic.com>
2026-06-15 15:53:57 +08:00

189 lines
6.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Aggregate a run's per-sample scores into the report payload for the UI.
All aggregation reads only the standard scores.csv produced by the reporting
layer, plus the metric list resolved by run_reader. The output mirrors the
report detail page: metric means, per-metric distribution histograms, grouped
means by difficulty / question_type / language, and the lowest-scoring samples
for review.
"""
from __future__ import annotations
import math
from pathlib import Path
import pandas as pd
from webapp.services.text_utils import parse_contexts
from webapp.models import (
DistributionBin,
GroupStat,
ReportData,
SampleScore,
)
from webapp.services import run_reader
# Number of equal-width buckets used for metric score histograms; the
# _distribution helper splits the [0, 1] score range into this many bins.
DISTRIBUTION_BIN_COUNT = 5
# Metadata columns that we group samples by when present in the data. The
# _groupings helper skips any of these that are missing from the scores frame
# or contain no non-empty values.
GROUPING_FIELDS = ("difficulty", "question_type", "language")
# How many lowest-scoring samples to surface for manual review (the cap applied
# by _lowest_samples after ranking).
LOWEST_SAMPLE_COUNT = 10
def _round_or_none(value: float | None) -> float | None:
"""Round a float to four places, mapping NaN/None to None for clean JSON."""
if value is None:
return None
if isinstance(value, float) and (math.isnan(value) or math.isinf(value)):
return None
return round(float(value), 4)
def _metric_means(frame: pd.DataFrame, metrics: list[str]) -> dict[str, float | None]:
    """Compute the mean of each metric column across all scored samples.

    Args:
        frame: Per-sample scores, one column per metric.
        metrics: Metric names to report, in display order.

    Returns:
        A mapping from every requested metric to its rounded mean. The value
        is None when the column is missing or holds no numeric scores.
    """
    means: dict[str, float | None] = {}
    for metric in metrics:
        if metric not in frame.columns:
            means[metric] = None
            continue
        # Coerce before averaging, the same way _distribution does: a column
        # containing any non-numeric cell is object-dtype, and calling
        # Series.mean(numeric_only=True) on it raises instead of skipping
        # the bad cells.
        scores = pd.to_numeric(frame[metric], errors="coerce")
        means[metric] = _round_or_none(scores.mean())
    return means
def _distribution(frame: pd.DataFrame, metric: str) -> list[DistributionBin]:
    """Bucket one metric's scores into fixed-width [0,1] histogram bins.

    Args:
        frame: Per-sample scores, one column per metric.
        metric: Name of the metric column to histogram.

    Returns:
        DISTRIBUTION_BIN_COUNT bins in ascending order, or an empty list when
        the column is absent. Non-numeric cells are ignored, and scores
        outside [0, 1] are not counted in any bin.
    """
    if metric not in frame.columns:
        return []
    scores = pd.to_numeric(frame[metric], errors="coerce").dropna()
    last_index = DISTRIBUTION_BIN_COUNT - 1
    bins: list[DistributionBin] = []
    for index in range(DISTRIBUTION_BIN_COUNT):
        # Derive each edge by division rather than multiplying a precomputed
        # width: 3 * 0.2 evaluates to 0.6000000000000001, which would drop a
        # score of exactly 0.6 into the bin below, whereas 3 / 5 is the same
        # double as the literal 0.6.
        lower = index / DISTRIBUTION_BIN_COUNT
        upper = (index + 1) / DISTRIBUTION_BIN_COUNT
        at_or_above = scores >= lower
        # Include the right edge in the final bin so 1.0 is counted.
        below = scores <= upper if index == last_index else scores < upper
        bins.append(
            DistributionBin(
                # En dash between the edges ("0.0–0.2"); written as an escape
                # so the source stays ASCII.
                label=f"{lower:.1f}\u2013{upper:.1f}",
                lower=round(lower, 2),
                upper=round(upper, 2),
                count=int((at_or_above & below).sum()),
            )
        )
    return bins
def _groupings(frame: pd.DataFrame, metrics: list[str]) -> dict[str, list[GroupStat]]:
    """Compute per-group metric means for each available grouping field.

    Args:
        frame: Per-sample scores plus optional metadata columns.
        metrics: Metric names to average within each group.

    Returns:
        A mapping from grouping field to its groups, sorted by group key.
        Fields that are missing from the frame, or that have no non-empty
        values, are omitted so the UI does not render noise.
    """
    # Coerce each metric once up front; averaging an object-dtype column with
    # Series.mean(numeric_only=True) raises instead of skipping bad cells.
    numeric = {
        metric: pd.to_numeric(frame[metric], errors="coerce")
        for metric in metrics
        if metric in frame.columns
    }
    groupings: dict[str, list[GroupStat]] = {}
    for field in GROUPING_FIELDS:
        if field not in frame.columns:
            continue
        # Group on the stripped text so "easy" and " easy" land in one group
        # rather than producing two entries with the same displayed key.
        keys = frame[field].astype(str).str.strip()
        # Missing cells stringify to "nan"; treat them like blanks.
        labelled = ~keys.isin(["", "nan"])
        if not labelled.any():
            continue
        stats: list[GroupStat] = []
        for key in sorted(keys[labelled].unique()):
            members = keys == key
            means = {
                metric: _round_or_none(scores[members].mean())
                for metric, scores in numeric.items()
            }
            stats.append(GroupStat(key=key, count=int(members.sum()), means=means))
        groupings[field] = stats
    return groupings
def _sample_mean(row: pd.Series, metrics: list[str]) -> float | None:
"""Average a single sample's available metric scores for ranking."""
values = [
float(row[metric])
for metric in metrics
if metric in row and pd.notna(row[metric])
]
if not values:
return None
return sum(values) / len(values)
def _cell_text(row: pd.Series, column: str) -> str:
"""Safely read a string cell, returning '' for missing or NaN values."""
if column not in row or pd.isna(row[column]):
return ""
return str(row[column]).strip()
def _lowest_samples(frame: pd.DataFrame, metrics: list[str]) -> list[SampleScore]:
    """Select and shape the lowest-scoring samples for the review table.

    Args:
        frame: Per-sample scores plus text and metadata columns.
        metrics: Metric names used both for ranking and for the per-sample
            metric breakdown.

    Returns:
        At most LOWEST_SAMPLE_COUNT samples ordered by ascending mean score.
        Samples with no usable score come first, ahead of every scored one.
    """
    if frame.empty:
        return []
    ranked: list[tuple[float, SampleScore]] = []
    for _, row in frame.iterrows():
        mean_score = _sample_mean(row, metrics)
        scores: dict[str, float | None] = {}
        for metric in metrics:
            if metric not in row:
                continue
            # Coerce instead of calling float() directly so a non-numeric
            # cell is left out of the breakdown rather than raising.
            value = pd.to_numeric(row[metric], errors="coerce")
            if pd.notna(value):
                scores[metric] = _round_or_none(float(value))
        sample = SampleScore(
            sample_id=_cell_text(row, "sample_id"),
            question=_cell_text(row, "question"),
            contexts=parse_contexts(row["contexts"]) if "contexts" in row else [],
            answer=_cell_text(row, "answer"),
            ground_truth=_cell_text(row, "ground_truth"),
            language=_cell_text(row, "language"),
            difficulty=_cell_text(row, "difficulty"),
            question_type=_cell_text(row, "question_type"),
            metrics=scores,
            mean_score=_round_or_none(mean_score),
            error=_cell_text(row, "error"),
        )
        # Samples without any score are treated as the worst case: the
        # ascending sort puts them first so they are always surfaced.
        sort_key = mean_score if mean_score is not None else float("-inf")
        ranked.append((sort_key, sample))
    # list.sort is stable, so ties keep their original row order.
    ranked.sort(key=lambda item: item[0])
    return [sample for _, sample in ranked[:LOWEST_SAMPLE_COUNT]]
def build_report(run_dir: Path, metrics: list[str]) -> ReportData:
    """Assemble the aggregated report payload for a single run directory.

    Args:
        run_dir: Directory of the run whose scores and summary are read
            through ``run_reader``.
        metrics: Metric names to aggregate.

    Returns:
        A ReportData carrying metric means, histograms, grouped means, the
        lowest-scoring samples, and the summary markdown. When there are no
        scores or no metrics, only the metric list, a None mean per metric,
        and the summary markdown are set.
    """
    scores = run_reader.read_scores_frame(run_dir)
    summary = run_reader.read_summary_markdown(run_dir)

    # Nothing to aggregate: return the bare payload.
    if scores.empty or not metrics:
        return ReportData(
            metrics=metrics,
            metric_means=dict.fromkeys(metrics),
            summary_markdown=summary,
        )

    # Only metrics that have a column in the scores frame get a histogram.
    histograms = {}
    for name in metrics:
        if name in scores.columns:
            histograms[name] = _distribution(scores, name)

    means = _metric_means(scores, metrics)
    grouped = _groupings(scores, metrics)
    lowest = _lowest_samples(scores, metrics)
    return ReportData(
        metrics=metrics,
        metric_means=means,
        distributions=histograms,
        groupings=grouped,
        lowest_samples=lowest,
        summary_markdown=summary,
    )