"""Core evaluation workflow for offline and online scenarios.""" from __future__ import annotations import asyncio import logging import time from typing import Any from rag_eval.adapters.base import AppAdapter from rag_eval.datasets.loader import load_dataset_records from rag_eval.datasets.normalizers import normalize_records from rag_eval.execution.concurrency import gather_with_limit from rag_eval.metrics.pipeline import MetricPipeline from rag_eval.shared.models import EvaluationResult, InvalidSample, NormalizedSample, Scenario from rag_eval.shared.utils import utc_now_iso logger = logging.getLogger("rag_eval.execution.evaluator") class Evaluator: """Coordinate dataset loading, optional app execution, and metric scoring.""" def __init__( self, scenario: Scenario, metric_pipeline: MetricPipeline, app_adapter: AppAdapter | None = None, ): """Create an evaluator for one resolved scenario.""" self.scenario = scenario self.metric_pipeline = metric_pipeline self.app_adapter = app_adapter def evaluate(self) -> EvaluationResult: """Execute the full evaluation flow and return the collected results.""" started_at = utc_now_iso() scenario_name = self.scenario.scenario_name mode = self.scenario.mode logger.info("=" * 60) logger.info("[eval] START scenario=%s mode=%s", scenario_name, mode) logger.info("[eval] dataset=%s", self.scenario.dataset.path) logger.info("[eval] metrics=%s", list(self.scenario.metrics)) logger.info("[eval] judge=%s embed=%s", self.scenario.judge_model, self.scenario.embedding_model) raw_records = load_dataset_records(self.scenario.dataset.path) logger.info("[eval] raw_records=%d", len(raw_records)) samples, invalid_samples = normalize_records( raw_records, mode=self.scenario.mode, max_samples=self.scenario.runtime.max_samples, ) logger.info("[eval] normalized: valid=%d invalid=%d", len(samples), len(invalid_samples)) if self.scenario.mode == "online": logger.info("[eval] online mode: calling app adapter for %d samples ...", len(samples)) t0 = time.monotonic() samples, online_invalids = asyncio.run(self._enrich_online_samples(samples)) elapsed = time.monotonic() - t0 invalid_samples.extend(online_invalids) logger.info( "[eval] adapter done: enriched=%d adapter_invalids=%d elapsed=%.1fs", len(samples), len(online_invalids), elapsed, ) logger.info("[eval] scoring %d samples with metric pipeline ...", len(samples)) t0 = time.monotonic() metric_scores = asyncio.run( self.metric_pipeline.score_samples( samples, max_concurrency=self.scenario.runtime.metric_limit(), ) ) elapsed = time.monotonic() - t0 logger.info("[eval] metric scoring done elapsed=%.1fs", elapsed) finished_at = utc_now_iso() score_rows = [self._merge_score(sample, score) for sample, score in zip(samples, metric_scores)] # Summary of NaN rates per metric import math for metric_name in self.scenario.metrics: nan_count = sum(1 for row in score_rows if math.isnan(float(row.get(metric_name, float("nan")) or float("nan")))) logger.info("[eval] %-22s NaN=%d/%d (%.0f%%)", metric_name, nan_count, len(score_rows), 100 * nan_count / len(score_rows) if score_rows else 0) run_id = finished_at.replace(":", "-") logger.info("[eval] DONE run_id=%s total_valid=%d total_invalid=%d", run_id, len(samples), len(invalid_samples)) logger.info("=" * 60) return EvaluationResult( scenario=self.scenario, run_id=run_id, started_at=started_at, finished_at=finished_at, valid_samples=samples, invalid_samples=invalid_samples, score_rows=score_rows, ) async def _enrich_online_samples( self, samples: list[NormalizedSample], ) -> tuple[list[NormalizedSample], list[InvalidSample]]: """Populate answers and contexts by calling the configured application adapter.""" if self.app_adapter is None: raise ValueError("online mode requires an app adapter.") valid: list[NormalizedSample] = [] invalid: list[InvalidSample] = [] total = len(samples) async def enrich_with_capture(idx: int, sample: NormalizedSample) -> NormalizedSample | InvalidSample: """Convert adapter exceptions into invalid samples instead of aborting the run.""" sid = sample.sample_id[:12] logger.debug("[adapter] [%d/%d] calling adapter sample=%s question=%r", idx + 1, total, sid, (sample.question or "")[:60]) t0 = time.monotonic() try: result = await self.app_adapter.enrich_sample(sample) elapsed = time.monotonic() - t0 ans_len = len(result.answer or "") ctx_count = len(result.contexts or []) logger.info("[adapter] [%d/%d] OK sample=%-12s ans_len=%d ctx_count=%d elapsed=%.1fs", idx + 1, total, sid, ans_len, ctx_count, elapsed) return result except Exception as exc: elapsed = time.monotonic() - t0 error_type = type(exc).__name__ logger.warning("[adapter] [%d/%d] FAIL sample=%-12s %s: %s (elapsed=%.1fs)", idx + 1, total, sid, error_type, exc, elapsed) return InvalidSample( sample_id=sample.sample_id, error=f"adapter failed [{error_type}]: {exc}", raw=sample.raw, ) factories = [ (lambda _idx=i, _sample=sample: enrich_with_capture(_idx, _sample)) for i, sample in enumerate(samples) ] results = await gather_with_limit(factories, self.scenario.runtime.app_limit()) for sample in results: if isinstance(sample, InvalidSample): invalid.append(sample) continue # Treat incomplete adapter payloads as invalid so reporting stays explicit. errors: list[str] = [] if not sample.answer: errors.append("adapter returned empty answer") if not sample.contexts: errors.append("adapter returned empty contexts") if errors: logger.warning("[adapter] incomplete payload sample=%s errors=%s", sample.sample_id[:12], errors) invalid.append( InvalidSample( sample_id=sample.sample_id, error="; ".join(errors), raw=sample.raw, ) ) continue valid.append(sample) logger.info("[adapter] enrichment summary: valid=%d invalid=%d of total=%d", len(valid), len(invalid), total) return valid, invalid def _merge_score(self, sample: NormalizedSample, score: Any) -> dict[str, Any]: """Combine sample data, metric results, and run metadata into one output row.""" record = sample.to_record() record["contexts"] = sample.contexts record.update(score.metrics) record["error"] = score.error record["judge_model"] = self.scenario.judge_model record["embedding_model"] = self.scenario.embedding_model record["run_id"] = self.scenario.scenario_name return record