Files
siemens_ragas/webapp/services/run_reader.py

234 lines
7.6 KiB
Python
Raw Normal View History

"""Read evaluation run artifacts from disk into API-friendly structures.
A "run" is any directory under the configured output roots that contains a
metadata.json file. This service stays decoupled from rag_eval internals: it
only reads the standard artifact files (metadata.json, scores.csv, summary.md,
scenario.snapshot.yaml) that the reporting layer writes.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import pandas as pd
import yaml
from webapp.models import RunSummary
# Directory names that commonly hold run outputs, relative to the repo root.
# _candidate_roots() joins each name onto the repo root and keeps only the
# ones that exist as directories.
DEFAULT_OUTPUT_ROOTS = ("outputs", "runs")
def _repo_root() -> Path:
"""Return the siemens_ragas repository root (parent of the webapp package)."""
return Path(__file__).resolve().parents[2]
def _candidate_roots(extra_roots: list[Path] | None = None) -> list[Path]:
    """Return the output directories that exist and may hold run artifacts.

    The default locations (DEFAULT_OUTPUT_ROOTS under the repo root) come
    first, followed by any caller-supplied ``extra_roots``; entries that are
    not existing directories are dropped.
    """
    base = _repo_root()
    defaults = [base / name for name in DEFAULT_OUTPUT_ROOTS]
    extras = list(extra_roots or [])
    return [folder for folder in (*defaults, *extras) if folder.is_dir()]
def _read_json(path: Path) -> dict[str, Any]:
"""Load a JSON file, returning an empty dict on any failure."""
try:
return json.loads(path.read_text(encoding="utf-8"))
except (OSError, ValueError):
return {}
def _read_metrics_from_snapshot(run_dir: Path) -> list[str]:
"""Read the configured metric list from a scenario snapshot if present."""
snapshot = run_dir / "scenario.snapshot.yaml"
if not snapshot.is_file():
return []
try:
payload = yaml.safe_load(snapshot.read_text(encoding="utf-8")) or {}
except (OSError, yaml.YAMLError):
return []
metrics = payload.get("metrics")
if isinstance(metrics, list):
return [str(item) for item in metrics]
return []
def discover_run_dirs(extra_roots: list[Path] | None = None) -> list[Path]:
    """Collect every evaluation-run directory found beneath the candidate roots.

    Each root is searched recursively for ``metadata.json``. A directory is
    reported once, in discovery order, even when roots overlap.
    """
    found: list[Path] = []
    visited: set[Path] = set()
    for base in _candidate_roots(extra_roots):
        for marker in base.rglob("metadata.json"):
            folder = marker.parent
            if folder in visited:
                continue
            # Dataset builds also write a metadata.json; only metadata that
            # carries a "scenario_name" key is treated as an evaluation run.
            if "scenario_name" not in _read_json(marker):
                continue
            visited.add(folder)
            found.append(folder)
    return found
def _metric_means(run_dir: Path, metrics: list[str]) -> dict[str, float | None]:
"""Compute per-metric mean scores from a run's scores.csv."""
scores_path = run_dir / "scores.csv"
if not scores_path.is_file():
return {}
try:
frame = pd.read_csv(scores_path)
except (OSError, ValueError, pd.errors.ParserError):
return {}
means: dict[str, float | None] = {}
for metric in metrics:
if metric in frame.columns:
mean_value = frame[metric].mean(numeric_only=True)
means[metric] = None if pd.isna(mean_value) else round(float(mean_value), 4)
else:
means[metric] = None
return means
def _metadata_text(value: Any) -> str:
    """Render a metadata value as text, mapping a missing or null value to ''."""
    return "" if value is None else str(value)


def _metadata_count(value: Any) -> int:
    """Coerce a metadata sample count to int, treating unusable values as 0."""
    try:
        return int(value or 0)
    except (TypeError, ValueError, OverflowError):
        return 0


def build_run_summary(run_dir: Path) -> RunSummary | None:
    """Assemble a RunSummary from one run directory's artifacts.

    Args:
        run_dir: Directory expected to contain ``metadata.json`` and,
            optionally, ``scenario.snapshot.yaml`` and ``scores.csv``.

    Returns:
        The populated summary, or ``None`` when the metadata is missing,
        unreadable, or lacks a ``scenario_name`` key.
    """
    metadata = _read_json(run_dir / "metadata.json")
    if "scenario_name" not in metadata:
        return None
    metrics = _read_metrics_from_snapshot(run_dir)
    if not metrics:
        # Fall back to numeric score columns inferred from the scores file.
        metrics = _infer_metrics_from_scores(run_dir)
    # A malformed count must not prevent the run from being listed.
    valid = _metadata_count(metadata.get("valid_samples"))
    invalid = _metadata_count(metadata.get("invalid_samples"))
    run_id = str(metadata.get("run_id") or run_dir.name)
    # JSON null must become "" rather than the literal "None": list_run_summaries
    # sorts on `finished_at or started_at`, which relies on empty strings being
    # falsy.
    return RunSummary(
        run_id=run_id,
        scenario_name=_metadata_text(metadata.get("scenario_name")),
        mode=_metadata_text(metadata.get("mode")),
        judge_model=_metadata_text(metadata.get("judge_model")),
        embedding_model=_metadata_text(metadata.get("embedding_model")),
        started_at=_metadata_text(metadata.get("started_at")),
        finished_at=_metadata_text(metadata.get("finished_at")),
        dataset=_metadata_text(metadata.get("dataset")),
        total_samples=valid + invalid,
        valid_samples=valid,
        invalid_samples=invalid,
        metrics=metrics,
        metric_means=_metric_means(run_dir, metrics),
        output_path=run_dir.as_posix(),
    )
# Columns in scores.csv that are sample fields rather than metric scores.
# _infer_metrics_from_scores() skips these names when it guesses which
# columns are metrics.
NON_METRIC_COLUMNS = {
    "sample_id",
    "question",
    "contexts",
    "answer",
    "ground_truth",
    "scenario",
    "language",
    "retrieval_config",
    "error",
    "judge_model",
    "embedding_model",
    "run_id",
    "difficulty",
    "question_type",
    "doc_id",
    "doc_name",
    "section_path",
    "page_start",
    "page_end",
    "source_chunk_ids",
    "review_status",
    "review_notes",
}
def _infer_metrics_from_scores(run_dir: Path) -> list[str]:
"""Infer metric column names from a scores.csv when no snapshot is available."""
scores_path = run_dir / "scores.csv"
if not scores_path.is_file():
return []
try:
frame = pd.read_csv(scores_path, nrows=1)
except (OSError, ValueError, pd.errors.ParserError):
return []
metrics: list[str] = []
for column in frame.columns:
if column in NON_METRIC_COLUMNS:
continue
if pd.api.types.is_numeric_dtype(frame[column]):
metrics.append(str(column))
return metrics
def list_run_summaries(extra_roots: list[Path] | None = None) -> list[RunSummary]:
    """Build a summary for every discovered run, newest first.

    Runs are ordered by ``finished_at`` (falling back to ``started_at`` when
    the finish time is empty), in descending order. Directories for which no
    summary can be built are skipped.
    """
    built = (build_run_summary(folder) for folder in discover_run_dirs(extra_roots))
    return sorted(
        (entry for entry in built if entry is not None),
        key=lambda entry: entry.finished_at or entry.started_at,
        reverse=True,
    )
def find_run_dir(run_id: str, extra_roots: list[Path] | None = None) -> Path | None:
    """Return the first discovered run directory identified by ``run_id``.

    A run's identifier is the ``run_id`` from its metadata.json or, when that
    is absent or empty, the directory's own name. Returns ``None`` if no run
    matches.
    """

    def _identifier(folder: Path) -> str:
        details = _read_json(folder / "metadata.json")
        return str(details.get("run_id") or folder.name)

    candidates = discover_run_dirs(extra_roots)
    return next(
        (folder for folder in candidates if _identifier(folder) == run_id),
        None,
    )
def read_scores_frame(run_dir: Path) -> pd.DataFrame:
    """Return a run's scores.csv as a dataframe.

    An empty dataframe is returned when the file does not exist or cannot be
    read or parsed.
    """
    csv_path = run_dir / "scores.csv"
    if csv_path.is_file():
        try:
            return pd.read_csv(csv_path)
        except (OSError, ValueError, pd.errors.ParserError):
            # Fall through to the empty-frame result below.
            pass
    return pd.DataFrame()
def read_summary_markdown(run_dir: Path) -> str:
    """Return the human-readable summary.md for a run, or an empty string.

    The file is decoded as UTF-8. Bytes that are not valid UTF-8 are replaced
    with U+FFFD instead of raising, so a report written in another encoding
    is still shown. An empty string is returned when the file is missing or
    cannot be read.
    """
    summary_path = run_dir / "summary.md"
    try:
        return summary_path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        # Covers a missing file, a directory at that path, and permission errors.
        return ""
def read_advice_markdown(run_dir: Path) -> str:
    """Return the optimization_advice.md for a run, or an empty string if not generated.

    The file is decoded as UTF-8. Bytes that are not valid UTF-8 are replaced
    with U+FFFD instead of raising, so advice written in another encoding is
    still shown. An empty string is returned when the file is missing or
    cannot be read.
    """
    advice_path = run_dir / "optimization_advice.md"
    try:
        return advice_path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        # Covers a missing file, a directory at that path, and permission errors.
        return ""