feat(logging): add structured evaluation logs for metric-level debugging

- pipeline.py: log each metric score/timeout/error with sample_id,
  elapsed time, and score value; log NaN list per sample; progress
  counter N/total after each sample completes
- evaluator.py: log eval start, dataset counts, adapter enrichment
  progress (per-sample OK/FAIL with elapsed), metric scoring summary,
  and per-metric NaN rate at end of run
- runner.py: _setup_logging() helper writes to stderr + optional file;
  ragas/httpx/openai noisy loggers throttled to WARNING
- main.py: add --log-file and --log-level CLI flags

Usage:
  python main.py --scenario scenarios/online/... --log-file logs/eval.log --log-level DEBUG

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2026-06-16 10:48:41 +08:00
parent 1ff4a3943a
commit 629304aa6d
4 changed files with 164 additions and 10 deletions

19
main.py
View File

@@ -1,6 +1,8 @@
from __future__ import annotations from __future__ import annotations
import argparse import argparse
import logging
from pathlib import Path
from rag_eval.dataset_builder.runner import run_dataset_build from rag_eval.dataset_builder.runner import run_dataset_build
from rag_eval.execution.runner import run_scenario from rag_eval.execution.runner import run_scenario
@@ -18,18 +20,33 @@ def parse_args() -> argparse.Namespace:
"--dataset-build-config", "--dataset-build-config",
help="Path to a YAML dataset build config file.", help="Path to a YAML dataset build config file.",
) )
parser.add_argument(
"--log-file",
default=None,
help="Write evaluation logs to this file (in addition to stderr). "
"Example: logs/eval.log",
)
parser.add_argument(
"--log-level",
default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"],
help="Logging verbosity level (default: INFO). Use DEBUG for per-metric detail.",
)
return parser.parse_args() return parser.parse_args()
def main() -> None: def main() -> None:
"""Dispatch the CLI call to the requested workflow.""" """Dispatch the CLI call to the requested workflow."""
args = parse_args() args = parse_args()
log_level = getattr(logging, args.log_level.upper(), logging.INFO)
log_file = Path(args.log_file) if args.log_file else None
if args.dataset_build_config: if args.dataset_build_config:
result = run_dataset_build(args.dataset_build_config) result = run_dataset_build(args.dataset_build_config)
print(f"Completed dataset build: {result.artifact_paths.root_dir}") print(f"Completed dataset build: {result.artifact_paths.root_dir}")
return return
result = run_scenario(args.scenario) result = run_scenario(args.scenario, log_file=log_file, log_level=log_level)
print(f"Completed run: {result.scenario.output_dir}") print(f"Completed run: {result.scenario.output_dir}")

View File

@@ -3,6 +3,8 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import logging
import time
from typing import Any from typing import Any
from rag_eval.adapters.base import AppAdapter from rag_eval.adapters.base import AppAdapter
@@ -13,6 +15,8 @@ from rag_eval.metrics.pipeline import MetricPipeline
from rag_eval.shared.models import EvaluationResult, InvalidSample, NormalizedSample, Scenario from rag_eval.shared.models import EvaluationResult, InvalidSample, NormalizedSample, Scenario
from rag_eval.shared.utils import utc_now_iso from rag_eval.shared.utils import utc_now_iso
logger = logging.getLogger("rag_eval.execution.evaluator")
class Evaluator: class Evaluator:
"""Coordinate dataset loading, optional app execution, and metric scoring.""" """Coordinate dataset loading, optional app execution, and metric scoring."""
@@ -31,27 +35,61 @@ class Evaluator:
def evaluate(self) -> EvaluationResult: def evaluate(self) -> EvaluationResult:
"""Execute the full evaluation flow and return the collected results.""" """Execute the full evaluation flow and return the collected results."""
started_at = utc_now_iso() started_at = utc_now_iso()
scenario_name = self.scenario.scenario_name
mode = self.scenario.mode
logger.info("=" * 60)
logger.info("[eval] START scenario=%s mode=%s", scenario_name, mode)
logger.info("[eval] dataset=%s", self.scenario.dataset.path)
logger.info("[eval] metrics=%s", list(self.scenario.metrics))
logger.info("[eval] judge=%s embed=%s", self.scenario.judge_model, self.scenario.embedding_model)
raw_records = load_dataset_records(self.scenario.dataset.path) raw_records = load_dataset_records(self.scenario.dataset.path)
logger.info("[eval] raw_records=%d", len(raw_records))
samples, invalid_samples = normalize_records( samples, invalid_samples = normalize_records(
raw_records, raw_records,
mode=self.scenario.mode, mode=self.scenario.mode,
max_samples=self.scenario.runtime.max_samples, max_samples=self.scenario.runtime.max_samples,
) )
logger.info("[eval] normalized: valid=%d invalid=%d", len(samples), len(invalid_samples))
if self.scenario.mode == "online": if self.scenario.mode == "online":
# Online mode enriches each sample by calling the target application first. logger.info("[eval] online mode: calling app adapter for %d samples ...", len(samples))
t0 = time.monotonic()
samples, online_invalids = asyncio.run(self._enrich_online_samples(samples)) samples, online_invalids = asyncio.run(self._enrich_online_samples(samples))
elapsed = time.monotonic() - t0
invalid_samples.extend(online_invalids) invalid_samples.extend(online_invalids)
logger.info(
"[eval] adapter done: enriched=%d adapter_invalids=%d elapsed=%.1fs",
len(samples), len(online_invalids), elapsed,
)
logger.info("[eval] scoring %d samples with metric pipeline ...", len(samples))
t0 = time.monotonic()
metric_scores = asyncio.run( metric_scores = asyncio.run(
self.metric_pipeline.score_samples( self.metric_pipeline.score_samples(
samples, samples,
max_concurrency=self.scenario.runtime.metric_limit(), max_concurrency=self.scenario.runtime.metric_limit(),
) )
) )
elapsed = time.monotonic() - t0
logger.info("[eval] metric scoring done elapsed=%.1fs", elapsed)
finished_at = utc_now_iso() finished_at = utc_now_iso()
score_rows = [self._merge_score(sample, score) for sample, score in zip(samples, metric_scores)] score_rows = [self._merge_score(sample, score) for sample, score in zip(samples, metric_scores)]
# Summary of NaN rates per metric
import math
for metric_name in self.scenario.metrics:
nan_count = sum(1 for row in score_rows if math.isnan(float(row.get(metric_name, float("nan")) or float("nan"))))
logger.info("[eval] %-22s NaN=%d/%d (%.0f%%)",
metric_name, nan_count, len(score_rows),
100 * nan_count / len(score_rows) if score_rows else 0)
run_id = finished_at.replace(":", "-") run_id = finished_at.replace(":", "-")
logger.info("[eval] DONE run_id=%s total_valid=%d total_invalid=%d",
run_id, len(samples), len(invalid_samples))
logger.info("=" * 60)
return EvaluationResult( return EvaluationResult(
scenario=self.scenario, scenario=self.scenario,
run_id=run_id, run_id=run_id,
@@ -72,13 +110,27 @@ class Evaluator:
valid: list[NormalizedSample] = [] valid: list[NormalizedSample] = []
invalid: list[InvalidSample] = [] invalid: list[InvalidSample] = []
total = len(samples)
async def enrich_with_capture(sample: NormalizedSample) -> NormalizedSample | InvalidSample: async def enrich_with_capture(idx: int, sample: NormalizedSample) -> NormalizedSample | InvalidSample:
"""Convert adapter exceptions into invalid samples instead of aborting the run.""" """Convert adapter exceptions into invalid samples instead of aborting the run."""
sid = sample.sample_id[:12]
logger.debug("[adapter] [%d/%d] calling adapter sample=%s question=%r",
idx + 1, total, sid, (sample.question or "")[:60])
t0 = time.monotonic()
try: try:
return await self.app_adapter.enrich_sample(sample) result = await self.app_adapter.enrich_sample(sample)
elapsed = time.monotonic() - t0
ans_len = len(result.answer or "")
ctx_count = len(result.contexts or [])
logger.info("[adapter] [%d/%d] OK sample=%-12s ans_len=%d ctx_count=%d elapsed=%.1fs",
idx + 1, total, sid, ans_len, ctx_count, elapsed)
return result
except Exception as exc: except Exception as exc:
elapsed = time.monotonic() - t0
error_type = type(exc).__name__ error_type = type(exc).__name__
logger.warning("[adapter] [%d/%d] FAIL sample=%-12s %s: %s (elapsed=%.1fs)",
idx + 1, total, sid, error_type, exc, elapsed)
return InvalidSample( return InvalidSample(
sample_id=sample.sample_id, sample_id=sample.sample_id,
error=f"adapter failed [{error_type}]: {exc}", error=f"adapter failed [{error_type}]: {exc}",
@@ -86,8 +138,8 @@ class Evaluator:
) )
factories = [ factories = [
(lambda sample=sample: enrich_with_capture(sample)) (lambda _idx=i, _sample=sample: enrich_with_capture(_idx, _sample))
for sample in samples for i, sample in enumerate(samples)
] ]
results = await gather_with_limit(factories, self.scenario.runtime.app_limit()) results = await gather_with_limit(factories, self.scenario.runtime.app_limit())
@@ -102,6 +154,8 @@ class Evaluator:
if not sample.contexts: if not sample.contexts:
errors.append("adapter returned empty contexts") errors.append("adapter returned empty contexts")
if errors: if errors:
logger.warning("[adapter] incomplete payload sample=%s errors=%s",
sample.sample_id[:12], errors)
invalid.append( invalid.append(
InvalidSample( InvalidSample(
sample_id=sample.sample_id, sample_id=sample.sample_id,
@@ -111,6 +165,9 @@ class Evaluator:
) )
continue continue
valid.append(sample) valid.append(sample)
logger.info("[adapter] enrichment summary: valid=%d invalid=%d of total=%d",
len(valid), len(invalid), total)
return valid, invalid return valid, invalid
def _merge_score(self, sample: NormalizedSample, score: Any) -> dict[str, Any]: def _merge_score(self, sample: NormalizedSample, score: Any) -> dict[str, Any]:

View File

@@ -2,6 +2,10 @@
from __future__ import annotations from __future__ import annotations
import logging
import sys
from pathlib import Path
from rag_eval.adapters.http import HttpAppAdapter from rag_eval.adapters.http import HttpAppAdapter
from rag_eval.adapters.python import PythonFunctionAdapter from rag_eval.adapters.python import PythonFunctionAdapter
from rag_eval.config.loader import load_scenario from rag_eval.config.loader import load_scenario
@@ -12,6 +16,27 @@ from rag_eval.shared.models import Scenario
from .evaluator import Evaluator from .evaluator import Evaluator
logger = logging.getLogger("rag_eval.execution.runner")
def _setup_logging(log_file: Path | None = None, level: int = logging.INFO) -> None:
"""Configure root logger: always write to stderr, optionally also to a file."""
fmt = "%(asctime)s %(levelname)-8s %(name)s %(message)s"
datefmt = "%H:%M:%S"
handlers: list[logging.Handler] = [logging.StreamHandler(sys.stderr)]
if log_file is not None:
log_file.parent.mkdir(parents=True, exist_ok=True)
fh = logging.FileHandler(log_file, encoding="utf-8")
fh.setFormatter(logging.Formatter(fmt, datefmt=datefmt))
handlers.append(fh)
logging.basicConfig(level=level, format=fmt, datefmt=datefmt, handlers=handlers, force=True)
# Also show ragas internal logs at WARNING so we can see LLM errors
logging.getLogger("ragas").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("openai").setLevel(logging.WARNING)
def build_adapter(scenario: Scenario): def build_adapter(scenario: Scenario):
"""Instantiate the adapter required by the resolved scenario, if any.""" """Instantiate the adapter required by the resolved scenario, if any."""
@@ -27,16 +52,25 @@ def build_adapter(scenario: Scenario):
def run_scenario( def run_scenario(
scenario_path: str, scenario_path: str,
settings: EvaluationSettings | None = None, settings: EvaluationSettings | None = None,
log_file: Path | None = None,
log_level: int = logging.INFO,
): ):
"""Run one scenario end to end and persist its reporting artifacts.""" """Run one scenario end to end and persist its reporting artifacts."""
_setup_logging(log_file=log_file, level=log_level)
logger.info("[runner] run_scenario path=%s", scenario_path)
settings = settings or EvaluationSettings() settings = settings or EvaluationSettings()
if not settings.openai_api_key: if not settings.openai_api_key:
raise EnvironmentError("OPENAI_API_KEY must be set before running the evaluator.") raise EnvironmentError("OPENAI_API_KEY must be set before running the evaluator.")
scenario = load_scenario(scenario_path) scenario = load_scenario(scenario_path)
logger.info("[runner] scenario loaded: name=%s mode=%s max_samples=%s",
scenario.scenario_name, scenario.mode, scenario.runtime.max_samples)
adapter = build_adapter(scenario) adapter = build_adapter(scenario)
pipeline = build_metric_pipeline(scenario, settings) pipeline = build_metric_pipeline(scenario, settings)
evaluator = Evaluator(scenario=scenario, metric_pipeline=pipeline, app_adapter=adapter) evaluator = Evaluator(scenario=scenario, metric_pipeline=pipeline, app_adapter=adapter)
result = evaluator.evaluate() result = evaluator.evaluate()
write_run_artifacts(result) write_run_artifacts(result)
logger.info("[runner] artifacts written for run_id=%s", result.run_id)
return result return result

View File

@@ -3,12 +3,16 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import logging
import math import math
import time
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any from typing import Any
from rag_eval.shared.models import MetricScore, NormalizedSample from rag_eval.shared.models import MetricScore, NormalizedSample
logger = logging.getLogger("rag_eval.metrics.pipeline")
@dataclass(slots=True) @dataclass(slots=True)
class MetricPipeline: class MetricPipeline:
@@ -22,12 +26,43 @@ class MetricPipeline:
results = {name: math.nan for name in self.metrics} results = {name: math.nan for name in self.metrics}
errors: list[str] = [] errors: list[str] = []
sid = sample.sample_id[:12]
ans_len = len(sample.answer or "")
ctx_count = len(sample.contexts or [])
logger.debug(
"[score] sample=%s ans_len=%d ctx_count=%d question=%r",
sid, ans_len, ctx_count,
(sample.question or "")[:80],
)
for name, metric in self.metrics.items(): for name, metric in self.metrics.items():
t0 = time.monotonic()
try: try:
result = await self._run_metric(name, metric, sample) result = await self._run_metric(name, metric, sample)
results[name] = float(result.value) score_val = float(result.value)
results[name] = score_val
elapsed = time.monotonic() - t0
logger.info(
"[metric OK ] sample=%-12s %-20s score=%.4f elapsed=%.1fs",
sid, name, score_val, elapsed,
)
except asyncio.TimeoutError:
elapsed = time.monotonic() - t0
msg = f"timeout after {self.metric_timeout_seconds}s"
errors.append(f"{name}: {msg}")
logger.warning(
"[metric TMO] sample=%-12s %-20s TIMEOUT after %.1fs",
sid, name, elapsed,
)
except Exception as exc: except Exception as exc:
elapsed = time.monotonic() - t0
exc_type = type(exc).__name__
errors.append(f"{name}: {exc}") errors.append(f"{name}: {exc}")
logger.warning(
"[metric ERR] sample=%-12s %-20s %s: %s (elapsed=%.1fs)",
sid, name, exc_type, exc, elapsed,
)
return MetricScore(metrics=results, error=" | ".join(errors)) return MetricScore(metrics=results, error=" | ".join(errors))
async def _run_metric(self, name: str, metric: Any, sample: NormalizedSample) -> Any: async def _run_metric(self, name: str, metric: Any, sample: NormalizedSample) -> Any:
@@ -72,11 +107,22 @@ class MetricPipeline:
max_concurrency: int, max_concurrency: int,
) -> list[MetricScore]: ) -> list[MetricScore]:
"""Score all samples while respecting the configured concurrency limit.""" """Score all samples while respecting the configured concurrency limit."""
total = len(samples)
logger.info("[pipeline] scoring %d samples concurrency=%d timeout=%ss",
total, max_concurrency, self.metric_timeout_seconds)
semaphore = asyncio.Semaphore(max(1, max_concurrency)) semaphore = asyncio.Semaphore(max(1, max_concurrency))
completed = 0
async def guarded(sample: NormalizedSample) -> MetricScore: async def guarded(idx: int, sample: NormalizedSample) -> MetricScore:
"""Throttle a single sample-scoring coroutine with the shared semaphore.""" """Throttle a single sample-scoring coroutine with the shared semaphore."""
nonlocal completed
async with semaphore: async with semaphore:
return await self.score_sample(sample) result = await self.score_sample(sample)
completed += 1
nan_metrics = [k for k, v in result.metrics.items() if math.isnan(v)]
status = f"NaN={nan_metrics}" if nan_metrics else "all OK"
logger.info("[pipeline] progress %d/%d sample=%-12s %s",
completed, total, sample.sample_id[:12], status)
return result
return await asyncio.gather(*(guarded(sample) for sample in samples)) return await asyncio.gather(*(guarded(i, s) for i, s in enumerate(samples)))