Files
siemens_ragas/webapp/services/score_job_manager.py

272 lines
10 KiB
Python
Raw Permalink Normal View History

"""Background task manager for async RAGAS single-sample scoring.
Each job:
1. Runs InlineScorer.score() in a thread pool.
2. Constructs a minimal EvaluationResult + Scenario in the standard format.
3. Calls write_run_artifacts(), which produces metadata.json, scores.csv and
   summary.md.
4. Calls run_advisor(), which produces optimization_advice.md (best-effort).
The resulting run directory lands under outputs/score-async/<run_id>/ and is
automatically picked up by run_reader.list_run_summaries(), so it appears in
the existing run list (运行列表) and report detail (报告详情) pages without any
extra wiring.
"""
from __future__ import annotations

import json
import logging
import math
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from webapp.models import AsyncScoreJobStatus, ScoreRequest
# Repository root: the third ancestor of this file
# (services/ -> webapp/ -> repository root).
_REPO_ROOT = Path(__file__).resolve().parents[2]
# Run artifact directories for async score jobs (one sub-directory per run_id).
_DEFAULT_JOBS_DIR = _REPO_ROOT.joinpath("outputs", "score-async")
# Lightweight job index: one <job_id>.json status file per job.
_DEFAULT_INDEX_DIR = _REPO_ROOT.joinpath("outputs", "score-jobs")
def _now_iso() -> str:
    """Return the current time as a timezone-aware UTC ISO-8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
class ScoreJobManager:
    """Thread-pool manager for async scoring jobs.

    Results are written as standard run artifacts so the report detail page
    can render them with zero additional code. Each job's status is kept in
    an in-memory cache and mirrored to ``<index_dir>/<job_id>.json`` so it can
    be reloaded after a restart.
    """

    # One logger for every job handled by this class.
    _logger = logging.getLogger("webapp.services.score_job_manager")

    def __init__(
        self,
        output_dir: Path = _DEFAULT_JOBS_DIR,
        index_dir: Path = _DEFAULT_INDEX_DIR,
        max_workers: int = 4,
    ) -> None:
        """Create the directories, start the thread pool and reload the index.

        Args:
            output_dir: Root under which per-run artifact directories are written.
            index_dir: Directory holding one ``<job_id>.json`` status file per job.
            max_workers: Maximum number of jobs executed concurrently.
        """
        self._output_dir = Path(output_dir)
        self._index_dir = Path(index_dir)
        self._output_dir.mkdir(parents=True, exist_ok=True)
        self._index_dir.mkdir(parents=True, exist_ok=True)
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._cache: dict[str, AsyncScoreJobStatus] = {}
        # Guards _cache and serializes writes to the index files.
        self._lock = threading.Lock()
        self._load_existing()

    # ------------------------------------------------------------------ #
    # Public API
    # ------------------------------------------------------------------ #
    def submit(self, request: ScoreRequest) -> AsyncScoreJobStatus:
        """Queue one scoring job and return its initial ``queued`` status.

        The status is cached and persisted before the work is handed to the
        thread pool, so ``get``/``list_jobs`` see the job immediately.
        """
        job_id = uuid.uuid4().hex[:12]
        status = AsyncScoreJobStatus(
            job_id=job_id,
            status="queued",
            created_at=_now_iso(),
            request_summary={
                # Previews are truncated to 80 chars to keep index files small.
                "question": (request.question or "")[:80],
                "answer": (request.answer or "")[:80],
                "metrics": list(request.metrics),
                "judge_model": request.judge_model or "",
                "embedding_model": request.embedding_model or "",
                "has_contexts": bool(request.contexts),
                "has_ground_truth": bool(request.ground_truth),
            },
        )
        with self._lock:
            self._cache[job_id] = status
            self._persist_index(status)
        self._executor.submit(self._run, job_id, request)
        return status

    def get(self, job_id: str) -> AsyncScoreJobStatus | None:
        """Return the current status of ``job_id``, or None if it is unknown."""
        with self._lock:
            return self._cache.get(job_id)

    def list_jobs(self) -> list[AsyncScoreJobStatus]:
        """Return all known jobs, newest first (by ``created_at``)."""
        with self._lock:
            jobs = list(self._cache.values())
        jobs.sort(key=lambda j: j.created_at, reverse=True)
        return jobs

    # ------------------------------------------------------------------ #
    # Worker
    # ------------------------------------------------------------------ #
    def _run(self, job_id: str, request: ScoreRequest) -> None:
        """Execute one job on a pool thread and record its final status.

        Nothing may escape this method. ``submit`` discards the executor's
        Future, so an escaped exception would leave the job stuck in
        ``running``. Any exception, including an ImportError from the lazily
        imported scoring stack, marks the job ``failed``. On failure,
        ``latency_ms`` is measured from the start of the job. On success it
        covers scoring only (see ``_score_and_write``).
        """
        t0 = time.monotonic()
        try:
            self._update(job_id, status="running")
            completed_fields = self._score_and_write(job_id, request)
            self._update(job_id, status="completed", **completed_fields)
        except Exception as exc:  # noqa: BLE001 - top-level worker boundary
            latency_ms = int((time.monotonic() - t0) * 1000)
            # logger.exception keeps the traceback, which str(exc) alone loses.
            self._logger.exception("[score_job] failed job_id=%s err=%s", job_id, exc)
            self._update(
                job_id,
                status="failed",
                finished_at=_now_iso(),
                latency_ms=latency_ms,
                error=f"{type(exc).__name__}: {exc}",
            )

    def _score_and_write(self, job_id: str, request: ScoreRequest) -> dict[str, Any]:
        """Score the sample, write its run artifacts and (best-effort) advice.

        Returns:
            Status fields to merge into the job on success: ``finished_at``,
            ``run_id``, ``scores``, ``weighted_score``, ``latency_ms`` and
            ``skipped_metrics``.

        Raises:
            Exception: anything raised by the lazy imports, settings, the scorer
            or ``write_run_artifacts``. ``_run`` records it as a failure.
        """
        # Lazy imports keep the web server bootable if ragas is not installed.
        from rag_eval.reporting.writers import write_run_artifacts
        from rag_eval.settings import EvaluationSettings
        from rag_eval.shared.models import (
            DatasetConfig,
            EvaluationResult,
            NormalizedSample,
            Scenario,
        )
        from rag_eval.shared.utils import utc_now_iso
        from webapp.services.inline_scorer import inline_scorer

        settings = EvaluationSettings()
        judge_model = request.judge_model or settings.ragas_judge_model
        embedding_model = request.embedding_model or settings.ragas_embedding_model
        effective = request.effective_metrics()
        # Requested metrics absent from effective_metrics() are not scored;
        # they are reported back as skipped.
        skipped = sorted(set(request.metrics) - set(effective))

        t0 = time.monotonic()
        started_at = utc_now_iso()
        if effective:
            raw_scores = inline_scorer.score(
                question=request.question,
                answer=request.answer,
                contexts=request.contexts_as_list(),
                ground_truth=request.ground_truth,
                metrics=effective,
                judge_model=judge_model,
                embedding_model=embedding_model,
                settings=settings,
            )
        else:
            raw_scores = {}
        latency_ms = int((time.monotonic() - t0) * 1000)
        finished_at = utc_now_iso()

        # Every requested metric gets a column; skipped ones stay None.
        all_scores: dict[str, float | None] = {m: None for m in request.metrics}
        all_scores.update(raw_scores)

        # The weighted composite score is temporarily disabled. To re-enable,
        # pass the non-None raw scores to
        # rag_eval.metrics.weights.compute_weighted_score and round to 4 places.
        weighted = None

        # Score row in the shape report_builder expects.
        score_row: dict[str, Any] = {
            "sample_id": "async-score-1",
            "question": request.question,
            "answer": request.answer or "",
            "contexts": request.contexts or "",
            "ground_truth": request.ground_truth or "",
            "error": "",
        }
        score_row.update(all_scores)

        # Run id = finish timestamp with ':' replaced (not allowed in Windows
        # file names). NOTE(review): if utc_now_iso() has only second precision,
        # two concurrent jobs finishing in the same second would share one run
        # directory; consider suffixing job_id if run_reader tolerates it.
        run_id = finished_at.replace(":", "-")
        output_dir = self._output_dir

        # Minimal Scenario + EvaluationResult so write_run_artifacts and the
        # advisor can treat this like a regular evaluation run.
        scenario = Scenario(
            scenario_name=f"async-score-{job_id}",
            mode="offline",
            dataset=DatasetConfig(path=output_dir / run_id / "dataset.csv"),
            judge_model=judge_model,
            embedding_model=embedding_model,
            metrics=list(request.metrics),
            output_dir=output_dir,
            optimization_advisor=True,  # always generate advice
        )
        sample = NormalizedSample(
            sample_id="async-score-1",
            question=request.question,
            answer=request.answer or "",
            contexts=request.contexts_as_list(),
            ground_truth=request.ground_truth or "",
        )
        result = EvaluationResult(
            scenario=scenario,
            run_id=run_id,
            started_at=started_at,
            finished_at=finished_at,
            valid_samples=[sample],
            invalid_samples=[],
            score_rows=[score_row],
        )
        write_run_artifacts(result)
        self._logger.info(
            "[score_job] artifacts written job_id=%s run_id=%s", job_id, run_id
        )

        self._write_advice(job_id, result, scenario, judge_model, embedding_model, settings)

        return {
            "finished_at": finished_at,
            "run_id": run_id,
            "scores": all_scores,
            "weighted_score": weighted,
            "latency_ms": latency_ms,
            "skipped_metrics": skipped,
        }

    def _write_advice(
        self,
        job_id: str,
        result: Any,
        scenario: Any,
        judge_model: str,
        embedding_model: str,
        settings: Any,
    ) -> None:
        """Run the optimization advisor (builds optimization_advice.md), best-effort.

        Advice is optional output. Any failure here, including a missing
        advisor dependency, is logged as a warning and does not fail the job.
        """
        try:
            from rag_eval.advisor import run_advisor
            from rag_eval.metrics.factory import build_models

            llm, _ = build_models(judge_model, embedding_model, settings)
            run_advisor(result, scenario, llm)
        except Exception as adv_exc:  # noqa: BLE001 - advice is optional
            self._logger.warning(
                "[score_job] advisor failed job_id=%s err=%s", job_id, adv_exc
            )
        else:
            self._logger.info("[score_job] advisor done job_id=%s", job_id)

    # ------------------------------------------------------------------ #
    # Persistence helpers
    # ------------------------------------------------------------------ #
    def _update(self, job_id: str, **kwargs: Any) -> None:
        """Merge ``kwargs`` into the job's status and persist the index.

        Unknown job ids are ignored.
        """
        with self._lock:
            existing = self._cache.get(job_id)
            if existing is None:
                return
            updated = existing.model_copy(update=kwargs)
            self._cache[job_id] = updated
            self._persist_index(updated)

    def _persist_index(self, status: AsyncScoreJobStatus) -> None:
        """Write this job's index JSON so its status survives restarts.

        The JSON is written to a ``.json.tmp`` sibling and then renamed over
        the target. A crash mid-write therefore never leaves a truncated
        ``.json`` file, and ``_load_existing`` only globs ``*.json``. Called
        with ``self._lock`` held, or from ``__init__`` before the manager is
        shared.
        """
        path = self._index_dir / f"{status.job_id}.json"
        tmp_path = path.with_name(f"{path.name}.tmp")
        tmp_path.write_text(
            json.dumps(status.model_dump(), ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        tmp_path.replace(path)

    def _load_existing(self) -> None:
        """Reload job statuses from the index directory on startup.

        Unreadable or invalid index files are logged and skipped, so one bad
        file cannot block startup. Jobs still recorded as ``queued`` or
        ``running`` belonged to a previous process whose thread pool is gone.
        They can never finish, so they are marked ``failed`` in memory and on
        disk instead of being reported as in progress forever.

        NOTE(review): this assumes a single process owns ``index_dir``. If
        several server workers share it, a starting worker would mark another
        worker's live jobs as failed.
        """
        for path in sorted(self._index_dir.glob("*.json")):
            try:
                data = json.loads(path.read_text(encoding="utf-8"))
                status = AsyncScoreJobStatus.model_validate(data)
            except Exception as exc:  # noqa: BLE001 - skip, don't abort startup
                self._logger.warning(
                    "[score_job] skipping unreadable job index %s err=%s", path, exc
                )
                continue
            if status.status in ("queued", "running"):
                status = status.model_copy(
                    update={
                        "status": "failed",
                        "finished_at": _now_iso(),
                        "error": "Interrupted: the server restarted before the job finished",
                    }
                )
                try:
                    self._persist_index(status)
                except OSError as exc:
                    self._logger.warning(
                        "[score_job] could not persist interrupted job %s err=%s",
                        status.job_id,
                        exc,
                    )
            self._cache[status.job_id] = status
# Process-wide instance imported by the FastAPI routes. Constructing it creates
# the output/index directories and reloads any persisted job index files.
score_job_manager: ScoreJobManager = ScoreJobManager()