257 lines
8.5 KiB
Python
257 lines
8.5 KiB
Python
"""Read evaluation run artifacts from disk into API-friendly structures.
|
|
|
|
A "run" is any directory under the configured output roots that contains a
|
|
metadata.json file. This service stays decoupled from rag_eval internals: it
only reads the standard artifact files (metadata.json, scores.csv, summary.md,
scenario.snapshot.yaml) that the reporting layer writes, plus
optimization_advice.md when one has been generated.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import pandas as pd
|
|
import yaml
|
|
|
|
from webapp.models import RunSummary
|
|
|
|
|
|
# Names of the directories, relative to the repo root, that commonly hold run
# outputs. They are probed in this order when collecting candidate roots.
DEFAULT_OUTPUT_ROOTS = (
    "outputs",
    "runs",
)
|
|
|
|
|
|
def _repo_root() -> Path:
|
|
"""Return the siemens_ragas repository root (parent of the webapp package)."""
|
|
return Path(__file__).resolve().parents[2]
|
|
|
|
|
|
def _candidate_roots(extra_roots: list[Path] | None = None) -> list[Path]:
    """Gather the output directories that exist and may hold run artifacts.

    The default roots under the repository root come first, in
    DEFAULT_OUTPUT_ROOTS order, followed by any caller-supplied extra roots.
    Anything that is not an existing directory is left out.
    """
    base = _repo_root()
    defaults = [base / name for name in DEFAULT_OUTPUT_ROOTS]
    extras = list(extra_roots) if extra_roots else []
    return [path for path in (*defaults, *extras) if path.is_dir()]
|
|
|
|
|
|
def _read_json(path: Path) -> dict[str, Any]:
|
|
"""Load a JSON file, returning an empty dict on any failure."""
|
|
try:
|
|
return json.loads(path.read_text(encoding="utf-8"))
|
|
except (OSError, ValueError):
|
|
return {}
|
|
|
|
|
|
def _read_metrics_from_snapshot(run_dir: Path) -> list[str]:
|
|
"""Read the configured metric list from a scenario snapshot if present."""
|
|
snapshot = run_dir / "scenario.snapshot.yaml"
|
|
if not snapshot.is_file():
|
|
return []
|
|
try:
|
|
payload = yaml.safe_load(snapshot.read_text(encoding="utf-8")) or {}
|
|
except (OSError, yaml.YAMLError):
|
|
return []
|
|
metrics = payload.get("metrics")
|
|
if isinstance(metrics, list):
|
|
return [str(item) for item in metrics]
|
|
return []
|
|
|
|
|
|
def _read_weights_from_snapshot(run_dir: Path) -> tuple[dict[str, float], dict[str, float]]:
|
|
"""Read metric_weights and doc_weights from a scenario snapshot if present.
|
|
|
|
Returns a (metric_weights, doc_weights) tuple of plain dicts.
|
|
Both default to empty dicts when the snapshot is absent or lacks the fields.
|
|
"""
|
|
snapshot = run_dir / "scenario.snapshot.yaml"
|
|
if not snapshot.is_file():
|
|
return {}, {}
|
|
try:
|
|
payload = yaml.safe_load(snapshot.read_text(encoding="utf-8")) or {}
|
|
except (OSError, yaml.YAMLError):
|
|
return {}, {}
|
|
mw = payload.get("metric_weights") or {}
|
|
dw = payload.get("doc_weights") or {}
|
|
return (
|
|
{str(k): float(v) for k, v in mw.items() if isinstance(v, (int, float))},
|
|
{str(k): float(v) for k, v in dw.items() if isinstance(v, (int, float))},
|
|
)
|
|
|
|
|
|
def discover_run_dirs(extra_roots: list[Path] | None = None) -> list[Path]:
    """Find every evaluation run directory under the candidate roots.

    A run directory is one holding a ``metadata.json`` whose content has a
    ``scenario_name`` key. That key is what separates evaluation runs from
    other ``metadata.json`` files (such as dataset-build metadata) that live
    in the same trees.

    Each directory is reported once even if it is reachable through several
    roots or through differently spelled paths. Within a root, results follow
    sorted path order so repeated calls agree with each other.
    """
    run_dirs: list[Path] = []
    seen: set[Path] = set()
    for root in _candidate_roots(extra_roots):
        for metadata_path in sorted(root.rglob("metadata.json")):
            run_dir = metadata_path.parent
            # Compare resolved paths so a relative or symlinked root does not
            # list the same directory twice.
            try:
                identity = run_dir.resolve()
            except (OSError, RuntimeError):
                identity = run_dir
            if identity in seen:
                # Already accepted via another root; skip re-parsing its metadata.
                continue
            metadata = _read_json(metadata_path)
            if not isinstance(metadata, dict) or "scenario_name" not in metadata:
                continue
            seen.add(identity)
            run_dirs.append(run_dir)
    return run_dirs
|
|
|
|
|
|
def _metric_means(run_dir: Path, metrics: list[str]) -> dict[str, float | None]:
|
|
"""Compute per-metric mean scores from a run's scores.csv."""
|
|
scores_path = run_dir / "scores.csv"
|
|
if not scores_path.is_file():
|
|
return {}
|
|
try:
|
|
frame = pd.read_csv(scores_path)
|
|
except (OSError, ValueError, pd.errors.ParserError):
|
|
return {}
|
|
means: dict[str, float | None] = {}
|
|
for metric in metrics:
|
|
if metric in frame.columns:
|
|
mean_value = frame[metric].mean(numeric_only=True)
|
|
means[metric] = None if pd.isna(mean_value) else round(float(mean_value), 4)
|
|
else:
|
|
means[metric] = None
|
|
return means
|
|
|
|
|
|
def build_run_summary(run_dir: Path) -> RunSummary | None:
    """Assemble a RunSummary from one run directory's artifacts.

    Returns ``None`` when the directory's metadata.json is missing, unreadable,
    or has no ``scenario_name`` key. The metric list comes from the scenario
    snapshot when it provides one, otherwise from the numeric columns of
    scores.csv. ``run_id`` falls back to the directory name when the metadata
    does not supply one.
    """
    metadata = _read_json(run_dir / "metadata.json")
    if not isinstance(metadata, dict) or "scenario_name" not in metadata:
        return None

    def text(key: str) -> str:
        """Return a metadata field as text, treating a missing or null value as ""."""
        value = metadata.get(key)
        # str(None) would yield the literal "None", which is truthy and would
        # distort the timestamp-based ordering of summaries.
        return "" if value is None else str(value)

    def count(key: str) -> int:
        """Return a metadata field as an int, falling back to 0 when unusable."""
        try:
            return int(metadata.get(key) or 0)
        except (TypeError, ValueError, OverflowError):
            return 0

    metrics = _read_metrics_from_snapshot(run_dir)
    if not metrics:
        # Fall back to numeric score columns inferred from the scores file.
        metrics = _infer_metrics_from_scores(run_dir)

    valid = count("valid_samples")
    invalid = count("invalid_samples")
    run_id = str(metadata.get("run_id") or run_dir.name)

    return RunSummary(
        run_id=run_id,
        scenario_name=text("scenario_name"),
        mode=text("mode"),
        judge_model=text("judge_model"),
        embedding_model=text("embedding_model"),
        started_at=text("started_at"),
        finished_at=text("finished_at"),
        dataset=text("dataset"),
        total_samples=valid + invalid,
        valid_samples=valid,
        invalid_samples=invalid,
        metrics=metrics,
        metric_means=_metric_means(run_dir, metrics),
        output_path=run_dir.as_posix(),
    )
|
|
|
|
|
|
# scores.csv columns that hold sample fields rather than metric scores; they are
# skipped when metric names are inferred from the file. Kept in alphabetical
# order so additions are easy to place and duplicates easy to spot.
NON_METRIC_COLUMNS = {
    "answer",
    "contexts",
    "difficulty",
    "doc_id",
    "doc_name",
    "embedding_model",
    "error",
    "ground_truth",
    "judge_model",
    "language",
    "page_end",
    "page_start",
    "question",
    "question_type",
    "retrieval_config",
    "review_notes",
    "review_status",
    "run_id",
    "sample_id",
    "sample_weight",
    "scenario",
    "section_path",
    "source_chunk_ids",
    "weighted_score",
}
|
|
|
|
|
|
def _infer_metrics_from_scores(run_dir: Path) -> list[str]:
|
|
"""Infer metric column names from a scores.csv when no snapshot is available."""
|
|
scores_path = run_dir / "scores.csv"
|
|
if not scores_path.is_file():
|
|
return []
|
|
try:
|
|
frame = pd.read_csv(scores_path, nrows=1)
|
|
except (OSError, ValueError, pd.errors.ParserError):
|
|
return []
|
|
metrics: list[str] = []
|
|
for column in frame.columns:
|
|
if column in NON_METRIC_COLUMNS:
|
|
continue
|
|
if pd.api.types.is_numeric_dtype(frame[column]):
|
|
metrics.append(str(column))
|
|
return metrics
|
|
|
|
|
|
def list_run_summaries(extra_roots: list[Path] | None = None) -> list[RunSummary]:
    """Build a summary for every discovered run, newest first.

    Runs are ordered by ``finished_at``, or by ``started_at`` when
    ``finished_at`` is empty, in descending order. Directories for which no
    summary can be built are left out.
    """
    built = (build_run_summary(run_dir) for run_dir in discover_run_dirs(extra_roots))
    usable = [summary for summary in built if summary is not None]
    return sorted(
        usable,
        key=lambda summary: summary.finished_at or summary.started_at,
        reverse=True,
    )
|
|
|
|
|
|
def find_run_dir(run_id: str, extra_roots: list[Path] | None = None) -> Path | None:
    """Return the first discovered run directory identified by *run_id*.

    A directory's identifier is the ``run_id`` from its metadata.json, or the
    folder name when the metadata has no such value. ``None`` is returned when
    nothing matches.
    """

    def identifier(candidate: Path) -> str:
        declared = _read_json(candidate / "metadata.json").get("run_id")
        return str(declared or candidate.name)

    matches = (
        candidate
        for candidate in discover_run_dirs(extra_roots)
        if identifier(candidate) == run_id
    )
    return next(matches, None)
|
|
|
|
|
|
def read_scores_frame(run_dir: Path) -> pd.DataFrame:
    """Return the contents of a run's scores.csv as a dataframe.

    An empty dataframe is returned when the file is missing or cannot be parsed.
    """
    csv_file = run_dir / "scores.csv"
    loaded = None
    if csv_file.is_file():
        try:
            loaded = pd.read_csv(csv_file)
        except (OSError, ValueError, pd.errors.ParserError):
            loaded = None
    return pd.DataFrame() if loaded is None else loaded
|
|
|
|
|
|
def read_summary_markdown(run_dir: Path) -> str:
    """Return the human-readable summary.md for a run, or an empty string.

    The empty string is returned when the file is missing, cannot be read, or
    is not valid UTF-8.
    """
    summary_path = run_dir / "summary.md"
    if not summary_path.is_file():
        return ""
    try:
        return summary_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        # Decoding errors are not OSError; treat an undecodable file like an
        # unreadable one instead of letting the exception escape.
        return ""
|
|
|
|
|
|
def read_advice_markdown(run_dir: Path) -> str:
    """Return the optimization_advice.md for a run, or an empty string if not generated.

    The empty string is also returned when the file cannot be read or is not
    valid UTF-8.
    """
    advice_path = run_dir / "optimization_advice.md"
    if not advice_path.is_file():
        return ""
    try:
        return advice_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        # Decoding errors are not OSError; treat an undecodable file like an
        # unreadable one instead of letting the exception escape.
        return ""
|