diff --git a/main.py b/main.py index 70f6839..b770d28 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,8 @@ from __future__ import annotations import argparse +import logging +from pathlib import Path from rag_eval.dataset_builder.runner import run_dataset_build from rag_eval.execution.runner import run_scenario @@ -18,18 +20,33 @@ def parse_args() -> argparse.Namespace: "--dataset-build-config", help="Path to a YAML dataset build config file.", ) + parser.add_argument( + "--log-file", + default=None, + help="Write evaluation logs to this file (in addition to stderr). " + "Example: logs/eval.log", + ) + parser.add_argument( + "--log-level", + default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR"], + help="Logging verbosity level (default: INFO). Use DEBUG for per-metric detail.", + ) return parser.parse_args() def main() -> None: """Dispatch the CLI call to the requested workflow.""" args = parse_args() + log_level = getattr(logging, args.log_level.upper(), logging.INFO) + log_file = Path(args.log_file) if args.log_file else None + if args.dataset_build_config: result = run_dataset_build(args.dataset_build_config) print(f"Completed dataset build: {result.artifact_paths.root_dir}") return - result = run_scenario(args.scenario) + result = run_scenario(args.scenario, log_file=log_file, log_level=log_level) print(f"Completed run: {result.scenario.output_dir}") diff --git a/rag_eval/execution/evaluator.py b/rag_eval/execution/evaluator.py index 05a80c3..a69454f 100644 --- a/rag_eval/execution/evaluator.py +++ b/rag_eval/execution/evaluator.py @@ -3,6 +3,8 @@ from __future__ import annotations import asyncio +import logging +import time from typing import Any from rag_eval.adapters.base import AppAdapter @@ -13,6 +15,8 @@ from rag_eval.metrics.pipeline import MetricPipeline from rag_eval.shared.models import EvaluationResult, InvalidSample, NormalizedSample, Scenario from rag_eval.shared.utils import utc_now_iso +logger = logging.getLogger("rag_eval.execution.evaluator") + class Evaluator: """Coordinate dataset loading, optional app execution, and metric scoring.""" @@ -31,27 +35,61 @@ class Evaluator: def evaluate(self) -> EvaluationResult: """Execute the full evaluation flow and return the collected results.""" started_at = utc_now_iso() + scenario_name = self.scenario.scenario_name + mode = self.scenario.mode + logger.info("=" * 60) + logger.info("[eval] START scenario=%s mode=%s", scenario_name, mode) + logger.info("[eval] dataset=%s", self.scenario.dataset.path) + logger.info("[eval] metrics=%s", list(self.scenario.metrics)) + logger.info("[eval] judge=%s embed=%s", self.scenario.judge_model, self.scenario.embedding_model) + raw_records = load_dataset_records(self.scenario.dataset.path) + logger.info("[eval] raw_records=%d", len(raw_records)) + samples, invalid_samples = normalize_records( raw_records, mode=self.scenario.mode, max_samples=self.scenario.runtime.max_samples, ) + logger.info("[eval] normalized: valid=%d invalid=%d", len(samples), len(invalid_samples)) if self.scenario.mode == "online": - # Online mode enriches each sample by calling the target application first. + logger.info("[eval] online mode: calling app adapter for %d samples ...", len(samples)) + t0 = time.monotonic() samples, online_invalids = asyncio.run(self._enrich_online_samples(samples)) + elapsed = time.monotonic() - t0 invalid_samples.extend(online_invalids) + logger.info( + "[eval] adapter done: enriched=%d adapter_invalids=%d elapsed=%.1fs", + len(samples), len(online_invalids), elapsed, + ) + logger.info("[eval] scoring %d samples with metric pipeline ...", len(samples)) + t0 = time.monotonic() metric_scores = asyncio.run( self.metric_pipeline.score_samples( samples, max_concurrency=self.scenario.runtime.metric_limit(), ) ) + elapsed = time.monotonic() - t0 + logger.info("[eval] metric scoring done elapsed=%.1fs", elapsed) + finished_at = utc_now_iso() score_rows = [self._merge_score(sample, score) for sample, score in zip(samples, metric_scores)] + + # Summary of NaN rates per metric + import math + for metric_name in self.scenario.metrics: + nan_count = sum(1 for row in score_rows if math.isnan(float(row.get(metric_name, float("nan")) or float("nan")))) + logger.info("[eval] %-22s NaN=%d/%d (%.0f%%)", + metric_name, nan_count, len(score_rows), + 100 * nan_count / len(score_rows) if score_rows else 0) + run_id = finished_at.replace(":", "-") + logger.info("[eval] DONE run_id=%s total_valid=%d total_invalid=%d", + run_id, len(samples), len(invalid_samples)) + logger.info("=" * 60) return EvaluationResult( scenario=self.scenario, run_id=run_id, @@ -72,13 +110,27 @@ class Evaluator: valid: list[NormalizedSample] = [] invalid: list[InvalidSample] = [] + total = len(samples) - async def enrich_with_capture(sample: NormalizedSample) -> NormalizedSample | InvalidSample: + async def enrich_with_capture(idx: int, sample: NormalizedSample) -> NormalizedSample | InvalidSample: """Convert adapter exceptions into invalid samples instead of aborting the run.""" + sid = sample.sample_id[:12] + logger.debug("[adapter] [%d/%d] calling adapter sample=%s question=%r", + idx + 1, total, sid, (sample.question or "")[:60]) + t0 = time.monotonic() try: - return await self.app_adapter.enrich_sample(sample) + result = await self.app_adapter.enrich_sample(sample) + elapsed = time.monotonic() - t0 + ans_len = len(result.answer or "") + ctx_count = len(result.contexts or []) + logger.info("[adapter] [%d/%d] OK sample=%-12s ans_len=%d ctx_count=%d elapsed=%.1fs", + idx + 1, total, sid, ans_len, ctx_count, elapsed) + return result except Exception as exc: + elapsed = time.monotonic() - t0 error_type = type(exc).__name__ + logger.warning("[adapter] [%d/%d] FAIL sample=%-12s %s: %s (elapsed=%.1fs)", + idx + 1, total, sid, error_type, exc, elapsed) return InvalidSample( sample_id=sample.sample_id, error=f"adapter failed [{error_type}]: {exc}", @@ -86,8 +138,8 @@ class Evaluator: ) factories = [ - (lambda sample=sample: enrich_with_capture(sample)) - for sample in samples + (lambda _idx=i, _sample=sample: enrich_with_capture(_idx, _sample)) + for i, sample in enumerate(samples) ] results = await gather_with_limit(factories, self.scenario.runtime.app_limit()) @@ -102,6 +154,8 @@ class Evaluator: if not sample.contexts: errors.append("adapter returned empty contexts") if errors: + logger.warning("[adapter] incomplete payload sample=%s errors=%s", + sample.sample_id[:12], errors) invalid.append( InvalidSample( sample_id=sample.sample_id, @@ -111,6 +165,9 @@ class Evaluator: ) continue valid.append(sample) + + logger.info("[adapter] enrichment summary: valid=%d invalid=%d of total=%d", + len(valid), len(invalid), total) return valid, invalid def _merge_score(self, sample: NormalizedSample, score: Any) -> dict[str, Any]: diff --git a/rag_eval/execution/runner.py b/rag_eval/execution/runner.py index 7042aab..46a1824 100644 --- a/rag_eval/execution/runner.py +++ b/rag_eval/execution/runner.py @@ -2,6 +2,10 @@ from __future__ import annotations +import logging +import sys +from pathlib import Path + from rag_eval.adapters.http import HttpAppAdapter from rag_eval.adapters.python import PythonFunctionAdapter from rag_eval.config.loader import load_scenario @@ -12,6 +16,27 @@ from rag_eval.shared.models import Scenario from .evaluator import Evaluator +logger = logging.getLogger("rag_eval.execution.runner") + + +def _setup_logging(log_file: Path | None = None, level: int = logging.INFO) -> None: + """Configure root logger: always write to stderr, optionally also to a file.""" + fmt = "%(asctime)s %(levelname)-8s %(name)s %(message)s" + datefmt = "%H:%M:%S" + + handlers: list[logging.Handler] = [logging.StreamHandler(sys.stderr)] + if log_file is not None: + log_file.parent.mkdir(parents=True, exist_ok=True) + fh = logging.FileHandler(log_file, encoding="utf-8") + fh.setFormatter(logging.Formatter(fmt, datefmt=datefmt)) + handlers.append(fh) + + logging.basicConfig(level=level, format=fmt, datefmt=datefmt, handlers=handlers, force=True) + # Also show ragas internal logs at WARNING so we can see LLM errors + logging.getLogger("ragas").setLevel(logging.WARNING) + logging.getLogger("httpx").setLevel(logging.WARNING) + logging.getLogger("openai").setLevel(logging.WARNING) + def build_adapter(scenario: Scenario): """Instantiate the adapter required by the resolved scenario, if any.""" @@ -27,16 +52,25 @@ def build_adapter(scenario: Scenario): def run_scenario( scenario_path: str, settings: EvaluationSettings | None = None, + log_file: Path | None = None, + log_level: int = logging.INFO, ): """Run one scenario end to end and persist its reporting artifacts.""" + _setup_logging(log_file=log_file, level=log_level) + logger.info("[runner] run_scenario path=%s", scenario_path) + settings = settings or EvaluationSettings() if not settings.openai_api_key: raise EnvironmentError("OPENAI_API_KEY must be set before running the evaluator.") scenario = load_scenario(scenario_path) + logger.info("[runner] scenario loaded: name=%s mode=%s max_samples=%s", + scenario.scenario_name, scenario.mode, scenario.runtime.max_samples) + adapter = build_adapter(scenario) pipeline = build_metric_pipeline(scenario, settings) evaluator = Evaluator(scenario=scenario, metric_pipeline=pipeline, app_adapter=adapter) result = evaluator.evaluate() write_run_artifacts(result) + logger.info("[runner] artifacts written for run_id=%s", result.run_id) return result diff --git a/rag_eval/metrics/pipeline.py b/rag_eval/metrics/pipeline.py index 1f5f052..91865c2 100644 --- a/rag_eval/metrics/pipeline.py +++ b/rag_eval/metrics/pipeline.py @@ -3,12 +3,16 @@ from __future__ import annotations import asyncio +import logging import math +import time from dataclasses import dataclass from typing import Any from rag_eval.shared.models import MetricScore, NormalizedSample +logger = logging.getLogger("rag_eval.metrics.pipeline") + @dataclass(slots=True) class MetricPipeline: @@ -22,12 +26,43 @@ class MetricPipeline: results = {name: math.nan for name in self.metrics} errors: list[str] = [] + sid = sample.sample_id[:12] + ans_len = len(sample.answer or "") + ctx_count = len(sample.contexts or []) + logger.debug( + "[score] sample=%s ans_len=%d ctx_count=%d question=%r", + sid, ans_len, ctx_count, + (sample.question or "")[:80], + ) + for name, metric in self.metrics.items(): + t0 = time.monotonic() try: result = await self._run_metric(name, metric, sample) - results[name] = float(result.value) + score_val = float(result.value) + results[name] = score_val + elapsed = time.monotonic() - t0 + logger.info( + "[metric OK ] sample=%-12s %-20s score=%.4f elapsed=%.1fs", + sid, name, score_val, elapsed, + ) + except asyncio.TimeoutError: + elapsed = time.monotonic() - t0 + msg = f"timeout after {self.metric_timeout_seconds}s" + errors.append(f"{name}: {msg}") + logger.warning( + "[metric TMO] sample=%-12s %-20s TIMEOUT after %.1fs", + sid, name, elapsed, + ) except Exception as exc: + elapsed = time.monotonic() - t0 + exc_type = type(exc).__name__ errors.append(f"{name}: {exc}") + logger.warning( + "[metric ERR] sample=%-12s %-20s %s: %s (elapsed=%.1fs)", + sid, name, exc_type, exc, elapsed, + ) + return MetricScore(metrics=results, error=" | ".join(errors)) async def _run_metric(self, name: str, metric: Any, sample: NormalizedSample) -> Any: @@ -72,11 +107,22 @@ class MetricPipeline: max_concurrency: int, ) -> list[MetricScore]: """Score all samples while respecting the configured concurrency limit.""" + total = len(samples) + logger.info("[pipeline] scoring %d samples concurrency=%d timeout=%ss", + total, max_concurrency, self.metric_timeout_seconds) semaphore = asyncio.Semaphore(max(1, max_concurrency)) + completed = 0 - async def guarded(sample: NormalizedSample) -> MetricScore: + async def guarded(idx: int, sample: NormalizedSample) -> MetricScore: """Throttle a single sample-scoring coroutine with the shared semaphore.""" + nonlocal completed async with semaphore: - return await self.score_sample(sample) + result = await self.score_sample(sample) + completed += 1 + nan_metrics = [k for k, v in result.metrics.items() if math.isnan(v)] + status = f"NaN={nan_metrics}" if nan_metrics else "all OK" + logger.info("[pipeline] progress %d/%d sample=%-12s %s", + completed, total, sample.sample_id[:12], status) + return result - return await asyncio.gather(*(guarded(sample) for sample in samples)) + return await asyncio.gather(*(guarded(i, s) for i, s in enumerate(samples)))