"""Read evaluation run artifacts from disk into API-friendly structures. A "run" is any directory under the configured output roots that contains a metadata.json file. This service stays decoupled from rag_eval internals: it only reads the standard artifact files (metadata.json, scores.csv, summary.md, scenario.snapshot.yaml) that the reporting layer writes. """ from __future__ import annotations import json from pathlib import Path from typing import Any import pandas as pd import yaml from webapp.models import RunSummary # Directory names that commonly hold run outputs, relative to the repo root. DEFAULT_OUTPUT_ROOTS = ("outputs", "runs") def _repo_root() -> Path: """Return the siemens_ragas repository root (parent of the webapp package).""" return Path(__file__).resolve().parents[2] def _candidate_roots(extra_roots: list[Path] | None = None) -> list[Path]: """Collect existing output directories that may contain run artifacts.""" root = _repo_root() roots: list[Path] = [] for name in DEFAULT_OUTPUT_ROOTS: candidate = root / name if candidate.is_dir(): roots.append(candidate) for extra in extra_roots or []: if extra.is_dir(): roots.append(extra) return roots def _read_json(path: Path) -> dict[str, Any]: """Load a JSON file, returning an empty dict on any failure.""" try: return json.loads(path.read_text(encoding="utf-8")) except (OSError, ValueError): return {} def _read_metrics_from_snapshot(run_dir: Path) -> list[str]: """Read the configured metric list from a scenario snapshot if present.""" snapshot = run_dir / "scenario.snapshot.yaml" if not snapshot.is_file(): return [] try: payload = yaml.safe_load(snapshot.read_text(encoding="utf-8")) or {} except (OSError, yaml.YAMLError): return [] metrics = payload.get("metrics") if isinstance(metrics, list): return [str(item) for item in metrics] return [] def discover_run_dirs(extra_roots: list[Path] | None = None) -> list[Path]: """Find every run directory (one that contains metadata.json) under the roots.""" run_dirs: list[Path] = [] seen: set[Path] = set() for root in _candidate_roots(extra_roots): for metadata_path in root.rglob("metadata.json"): run_dir = metadata_path.parent # A dataset-build metadata.json also exists; keep only evaluation runs # by requiring a scores.csv alongside, or a recognizable run metadata. metadata = _read_json(metadata_path) if "scenario_name" not in metadata: continue if run_dir in seen: continue seen.add(run_dir) run_dirs.append(run_dir) return run_dirs def _metric_means(run_dir: Path, metrics: list[str]) -> dict[str, float | None]: """Compute per-metric mean scores from a run's scores.csv.""" scores_path = run_dir / "scores.csv" if not scores_path.is_file(): return {} try: frame = pd.read_csv(scores_path) except (OSError, ValueError, pd.errors.ParserError): return {} means: dict[str, float | None] = {} for metric in metrics: if metric in frame.columns: mean_value = frame[metric].mean(numeric_only=True) means[metric] = None if pd.isna(mean_value) else round(float(mean_value), 4) else: means[metric] = None return means def build_run_summary(run_dir: Path) -> RunSummary | None: """Assemble a RunSummary from one run directory's artifacts.""" metadata = _read_json(run_dir / "metadata.json") if "scenario_name" not in metadata: return None metrics = _read_metrics_from_snapshot(run_dir) if not metrics: # Fall back to numeric score columns inferred from the scores file. metrics = _infer_metrics_from_scores(run_dir) valid = int(metadata.get("valid_samples", 0) or 0) invalid = int(metadata.get("invalid_samples", 0) or 0) run_id = str(metadata.get("run_id") or run_dir.name) return RunSummary( run_id=run_id, scenario_name=str(metadata.get("scenario_name", "")), mode=str(metadata.get("mode", "")), judge_model=str(metadata.get("judge_model", "")), embedding_model=str(metadata.get("embedding_model", "")), started_at=str(metadata.get("started_at", "")), finished_at=str(metadata.get("finished_at", "")), dataset=str(metadata.get("dataset", "")), total_samples=valid + invalid, valid_samples=valid, invalid_samples=invalid, metrics=metrics, metric_means=_metric_means(run_dir, metrics), output_path=run_dir.as_posix(), ) # Columns in scores.csv that are sample fields rather than metric scores. NON_METRIC_COLUMNS = { "sample_id", "question", "contexts", "answer", "ground_truth", "scenario", "language", "retrieval_config", "error", "judge_model", "embedding_model", "run_id", "difficulty", "question_type", "doc_id", "doc_name", "section_path", "page_start", "page_end", "source_chunk_ids", "review_status", "review_notes", } def _infer_metrics_from_scores(run_dir: Path) -> list[str]: """Infer metric column names from a scores.csv when no snapshot is available.""" scores_path = run_dir / "scores.csv" if not scores_path.is_file(): return [] try: frame = pd.read_csv(scores_path, nrows=1) except (OSError, ValueError, pd.errors.ParserError): return [] metrics: list[str] = [] for column in frame.columns: if column in NON_METRIC_COLUMNS: continue if pd.api.types.is_numeric_dtype(frame[column]): metrics.append(str(column)) return metrics def list_run_summaries(extra_roots: list[Path] | None = None) -> list[RunSummary]: """Return all run summaries sorted by finish time (most recent first).""" summaries: list[RunSummary] = [] for run_dir in discover_run_dirs(extra_roots): summary = build_run_summary(run_dir) if summary is not None: summaries.append(summary) summaries.sort(key=lambda item: item.finished_at or item.started_at, reverse=True) return summaries def find_run_dir(run_id: str, extra_roots: list[Path] | None = None) -> Path | None: """Locate the run directory whose metadata or folder name matches run_id.""" for run_dir in discover_run_dirs(extra_roots): metadata = _read_json(run_dir / "metadata.json") if str(metadata.get("run_id") or run_dir.name) == run_id: return run_dir return None def read_scores_frame(run_dir: Path) -> pd.DataFrame: """Load a run's scores.csv into a dataframe, or an empty frame if missing.""" scores_path = run_dir / "scores.csv" if not scores_path.is_file(): return pd.DataFrame() try: return pd.read_csv(scores_path) except (OSError, ValueError, pd.errors.ParserError): return pd.DataFrame() def read_summary_markdown(run_dir: Path) -> str: """Return the human-readable summary.md for a run, or an empty string.""" summary_path = run_dir / "summary.md" if not summary_path.is_file(): return "" try: return summary_path.read_text(encoding="utf-8") except OSError: return "" def read_advice_markdown(run_dir: Path) -> str: """Return the optimization_advice.md for a run, or an empty string if not generated.""" advice_path = run_dir / "optimization_advice.md" if not advice_path.is_file(): return "" try: return advice_path.read_text(encoding="utf-8") except OSError: return ""