Files
siemens_ragas/rag_eval/execution/runner.py

85 lines
3.4 KiB
Python
Raw Normal View History

2026-06-12 14:02:15 +08:00
"""High-level scenario runner used by the package and CLI entrypoints."""
from __future__ import annotations
import logging
import sys
from pathlib import Path
2026-06-12 14:02:15 +08:00
from rag_eval.adapters.http import HttpAppAdapter
from rag_eval.adapters.python import PythonFunctionAdapter
from rag_eval.advisor import run_advisor
2026-06-12 14:02:15 +08:00
from rag_eval.config.loader import load_scenario
from rag_eval.metrics.factory import build_models, build_metric_pipeline
2026-06-12 14:02:15 +08:00
from rag_eval.reporting.writers import write_run_artifacts
from rag_eval.settings import EvaluationSettings
from rag_eval.shared.models import Scenario
from .evaluator import Evaluator
logger = logging.getLogger("rag_eval.execution.runner")
def _setup_logging(log_file: Path | None = None, level: int = logging.INFO) -> None:
"""Configure root logger: always write to stderr, optionally also to a file."""
fmt = "%(asctime)s %(levelname)-8s %(name)s %(message)s"
datefmt = "%H:%M:%S"
handlers: list[logging.Handler] = [logging.StreamHandler(sys.stderr)]
if log_file is not None:
log_file.parent.mkdir(parents=True, exist_ok=True)
fh = logging.FileHandler(log_file, encoding="utf-8")
fh.setFormatter(logging.Formatter(fmt, datefmt=datefmt))
handlers.append(fh)
logging.basicConfig(level=level, format=fmt, datefmt=datefmt, handlers=handlers, force=True)
# Also show ragas internal logs at WARNING so we can see LLM errors
logging.getLogger("ragas").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("openai").setLevel(logging.WARNING)
2026-06-12 14:02:15 +08:00
def build_adapter(scenario: Scenario):
"""Instantiate the adapter required by the resolved scenario, if any."""
if scenario.app_adapter is None:
return None
if scenario.app_adapter.type == "http":
return HttpAppAdapter(scenario.app_adapter)
if scenario.app_adapter.type == "python":
return PythonFunctionAdapter(scenario.app_adapter)
raise ValueError(f"Unsupported adapter type: {scenario.app_adapter.type}")
def run_scenario(
scenario_path: str,
settings: EvaluationSettings | None = None,
log_file: Path | None = None,
log_level: int = logging.INFO,
2026-06-12 14:02:15 +08:00
):
"""Run one scenario end to end and persist its reporting artifacts."""
_setup_logging(log_file=log_file, level=log_level)
logger.info("[runner] run_scenario path=%s", scenario_path)
2026-06-12 14:02:15 +08:00
settings = settings or EvaluationSettings()
if not settings.openai_api_key:
raise EnvironmentError("OPENAI_API_KEY must be set before running the evaluator.")
scenario = load_scenario(scenario_path)
logger.info("[runner] scenario loaded: name=%s mode=%s max_samples=%s",
scenario.scenario_name, scenario.mode, scenario.runtime.max_samples)
# Build models once; reuse llm in both MetricPipeline and advisor.
llm, embeddings = build_models(scenario.judge_model, scenario.embedding_model, settings)
2026-06-12 14:02:15 +08:00
adapter = build_adapter(scenario)
pipeline = build_metric_pipeline(scenario, settings, llm=llm, embeddings=embeddings)
2026-06-12 14:02:15 +08:00
evaluator = Evaluator(scenario=scenario, metric_pipeline=pipeline, app_adapter=adapter)
result = evaluator.evaluate()
write_run_artifacts(result)
logger.info("[runner] artifacts written for run_id=%s", result.run_id)
# Optimization advisor — runs only if scenario.optimization_advisor is True.
run_advisor(result, scenario, llm)
2026-06-12 14:02:15 +08:00
return result