class TestAsyncScoreEndpoints:
    """HTTP-level tests for the async score routes, driven through the ``client`` fixture.

    Wherever a job is submitted, ``ScoreJobManager._run`` is patched out, so no
    scoring is executed and the job keeps its initial ``queued`` status.
    """

    # Dotted path of the background worker that the submitting tests replace.
    _RUN_TARGET = "webapp.services.score_job_manager.ScoreJobManager._run"

    # Smallest request body that the submitting tests post.
    _VALID_PAYLOAD = {
        "question": "q?",
        "answer": "a.",
        "metrics": ["answer_relevancy"],
    }

    def _submit(self, client):
        """POST one valid scoring request with the worker patched out; return the response."""
        with patch(self._RUN_TARGET):
            return client.post("/api/score/async", json=self._VALID_PAYLOAD)

    def test_submit_returns_202_with_job_id(self, client):
        """POST /api/score/async returns 202 immediately."""
        resp = self._submit(client)
        assert resp.status_code == 202
        data = resp.json()
        assert "job_id" in data
        assert data["status"] == "queued"

    def test_list_jobs_empty_initially(self, client):
        """A fresh manager lists no jobs."""
        resp = client.get("/api/score/jobs")
        assert resp.status_code == 200
        assert resp.json()["jobs"] == []

    def test_get_unknown_job_returns_404(self, client):
        """Looking up an id that was never submitted yields 404."""
        resp = client.get("/api/score/jobs/nonexistent123")
        assert resp.status_code == 404

    def test_submitted_job_appears_in_list(self, client):
        """A submitted job shows up in GET /api/score/jobs."""
        job_id = self._submit(client).json()["job_id"]
        # No sleep needed: submit() stores the job in the manager's cache
        # before the POST returns, so the listing already contains it.
        list_resp = client.get("/api/score/jobs")
        ids = [j["job_id"] for j in list_resp.json()["jobs"]]
        assert job_id in ids

    def test_get_job_by_id_returns_status(self, client):
        """A submitted job can be fetched by its id."""
        job_id = self._submit(client).json()["job_id"]
        # Same as above: the job is visible as soon as the POST has returned.
        get_resp = client.get(f"/api/score/jobs/{job_id}")
        assert get_resp.status_code == 200
        assert get_resp.json()["job_id"] == job_id

    def test_missing_required_fields_returns_422(self, client):
        """A body without the required fields is rejected with 422."""
        resp = client.post("/api/score/async", json={"question": "q?"})
        assert resp.status_code == 422
def test_loads_existing_index_on_startup(self, tmp_path):
    """A job index file present before construction is readable through ``get()``."""
    from webapp.models import AsyncScoreJobStatus
    from webapp.services.score_job_manager import ScoreJobManager

    job_id = "testjob001"
    index_root = tmp_path / "index"
    index_root.mkdir()

    # Persist one completed job the same way the manager does: dumped model as JSON.
    persisted = AsyncScoreJobStatus(
        job_id=job_id,
        status="completed",
        created_at="2026-01-01T00:00:00+00:00",
        run_id="some-run-id",
        scores={"answer_relevancy": 0.9},
        weighted_score=0.9,
        latency_ms=1000,
    )
    payload = json.dumps(persisted.model_dump(), ensure_ascii=False)
    index_file = index_root / f"{job_id}.json"
    index_file.write_text(payload, encoding="utf-8")

    # The index directory is scanned while the manager is being constructed.
    manager = ScoreJobManager(
        output_dir=tmp_path / "runs",
        index_dir=index_root,
        max_workers=1,
    )

    restored = manager.get(job_id)
    assert restored is not None
    assert restored.status == "completed"
    assert restored.run_id == "some-run-id"
@router.post(
    "/async",
    status_code=202,
    response_model=AsyncScoreJobResponse,
    summary="提交异步评分任务(Dify 推荐方式)",
    responses={
        202: {
            "description": (
                "任务已排队,立即返回 job_id(202 Accepted)。\n\n"
                "评分在后台执行,完成后自动生成完整报告(含优化建议)。\n"
                "通过 `GET /api/score/jobs/{job_id}` 查询状态,"
                "完成后在「运行列表」页查看完整报告。"
            ),
            "content": {
                "application/json": {
                    "example": {"job_id": "abc123def456", "status": "queued", "run_id": None}
                }
            },
        },
    },
)
def submit_async_score(request: ScoreRequest) -> AsyncScoreJobResponse:
    """提交异步 RAGAS 评分任务,立即返回 job_id。

    **适合 Dify 工作流**:HTTP 节点无需等待评分完成(无超时风险),
    工作流立即继续,评分结果在 RAGAS 平台「运行列表」中查看。

    评分完成后自动生成:
    - 各指标得分(`scores.csv`)
    - 摘要报告(`summary.md`)
    - LLM 优化建议(`optimization_advice.md`)
    """
    # The docstring above is left untouched on purpose: FastAPI publishes it
    # as this operation's description in the generated OpenAPI docs.
    #
    # Queue the request with the job manager and answer right away with the
    # job id; scoring itself happens in the manager's background worker.
    has_ctx = bool(request.contexts)
    has_gt = bool(request.ground_truth)
    logger.info(
        "[score_async] submit metrics=%s has_ctx=%s has_gt=%s",
        request.metrics, has_ctx, has_gt,
    )

    queued = score_job_manager.submit(request)
    new_job_id = queued.job_id
    logger.info("[score_async] queued job_id=%s", new_job_id)

    # run_id is not passed, so the response carries the model's default (None).
    return AsyncScoreJobResponse(job_id=new_job_id, status=queued.status)
class AsyncScoreJobStatus(BaseModel):
    """State of one async score job (queued → running → completed/failed)."""

    # Identifier generated at submission; ScoreJobManager also uses it as the
    # stem of the job's index file (`<job_id>.json`).
    job_id: str = Field(description="任务唯一标识符。")
    # Lifecycle state. ScoreJobManager writes "queued" on submit, "running"
    # when the worker starts, then "completed" or "failed".
    status: str = Field(description="queued | running | completed | failed")
    # Submission time, ISO 8601 in UTC; empty string when not set.
    created_at: str = Field(default="", description="创建时间(ISO 8601 UTC)。")
    # Time the job reached "completed" or "failed"; empty string until then.
    finished_at: str = Field(default="", description="完成时间(ISO 8601 UTC)。")
    # Id of the run artifacts written for a completed job; None otherwise.
    run_id: str | None = Field(
        default=None,
        description="完成后对应的 Run ID,可通过 GET /api/runs/{run_id} 查看完整报告。",
    )
    # Snapshot of the request taken at submission: question/answer truncated
    # to 80 characters, metrics, judge/embedding model names, and flags for
    # whether contexts and ground truth were supplied.
    request_summary: dict = Field(
        default_factory=dict,
        description="请求参数快照(question 前80字、metrics、judge_model 等)。",
    )
    # Metric name -> score; a requested metric that produced no score maps to None.
    scores: dict[str, float | None] = Field(default_factory=dict, description="各指标得分。")
    # Weighted aggregate of the metric scores (rounded to 4 decimals by the
    # manager); None when no aggregate could be computed.
    weighted_score: float | None = Field(default=None, description="加权综合得分。")
    # Wall-clock duration in milliseconds as measured by the worker.
    latency_ms: int = Field(default=0, description="评分耗时毫秒。")
    # Requested metrics that were not among the request's effective metrics.
    skipped_metrics: list[str] = Field(default_factory=list)
    # "<ExceptionType>: <message>" for a failed job; None otherwise.
    error: str | None = Field(default=None)
e7f3916..5a9ad75 100644 --- a/webapp/server.py +++ b/webapp/server.py @@ -17,7 +17,7 @@ from fastapi.exceptions import RequestValidationError from fastapi.responses import FileResponse, JSONResponse from fastapi.staticfiles import StaticFiles -from webapp.api import evaluations, llm_profiles, pipeline, runs, scenarios, score +from webapp.api import evaluations, llm_profiles, pipeline, runs, scenarios, score, score_jobs STATIC_DIR = Path(__file__).resolve().parent / "static" logger = logging.getLogger("webapp.server") @@ -69,10 +69,12 @@ OPENAPI_TAGS = [ { "name": "score", "description": ( - "**实时评分 API(Dify 外部 Tool)**\n\n" - "接受单条问答记录 `(question, answer, contexts, ground_truth)`,\n" - "同步运行 RAGAS 指标打分,返回各指标得分和加权综合得分。\n\n" - "适用场景:Dify Agent 在回答后即时调用,用于质量监控或自我改进。\n\n" + "**实时评分 API(同步)** — `POST /api/score`\n\n" + "**异步评分 API(Dify 推荐)** — `POST /api/score/async`\n\n" + "异步方式立即返回 job_id(202),评分在后台执行,完成后自动生成完整报告(含优化建议)," + "在「运行列表」页查看。\n\n" + "通过 `GET /api/score/jobs` 列出所有异步评分记录," + "`GET /api/score/jobs/{job_id}` 查询单个任务状态。\n\n" "**鉴权**:若 `.env` 中配置了 `SCORE_API_TOKEN`,需携带 " "`Authorization: Bearer ` 请求头。" ), @@ -108,6 +110,7 @@ def create_app() -> FastAPI: app.include_router(llm_profiles.router) app.include_router(pipeline.router) app.include_router(score.router) + app.include_router(score_jobs.router) @app.middleware("http") async def access_log_middleware(request: Request, call_next): diff --git a/webapp/services/score_job_manager.py b/webapp/services/score_job_manager.py new file mode 100644 index 0000000..8805d5b --- /dev/null +++ b/webapp/services/score_job_manager.py @@ -0,0 +1,269 @@ +"""Background task manager for async RAGAS single-sample scoring. + +Each job: + 1. Runs InlineScorer.score() in a thread pool. + 2. Constructs a minimal EvaluationResult + Scenario in the standard format. + 3. Calls write_run_artifacts() — produces metadata.json, scores.csv, summary.md. + 4. Calls run_advisor() — produces optimization_advice.md. 
class ScoreJobManager:
    """Thread-pool manager for async scoring jobs.

    Results are written as standard run artifacts so the report detail page
    can render them with zero additional code.

    Job state is kept in an in-memory cache guarded by a lock and mirrored to
    one JSON file per job under ``index_dir``; those files are read back when
    a manager is constructed.
    """

    def __init__(
        self,
        output_dir: Path = _DEFAULT_JOBS_DIR,
        index_dir: Path = _DEFAULT_INDEX_DIR,
        max_workers: int = 4,
    ) -> None:
        """Create both directories, start the worker pool and load persisted jobs.

        Args:
            output_dir: Root directory passed to the run-artifact writer.
            index_dir: Directory holding one ``<job_id>.json`` status file per job.
            max_workers: Size of the scoring thread pool.
        """
        self._output_dir = Path(output_dir)
        self._index_dir = Path(index_dir)
        self._output_dir.mkdir(parents=True, exist_ok=True)
        self._index_dir.mkdir(parents=True, exist_ok=True)
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._cache: dict[str, AsyncScoreJobStatus] = {}
        self._lock = threading.Lock()
        self._load_existing()

    # ------------------------------------------------------------------ #
    # Public API
    # ------------------------------------------------------------------ #

    def submit(self, request: ScoreRequest) -> AsyncScoreJobStatus:
        """Queue one scoring job and return its initial status immediately.

        The job is registered in the cache and persisted before the worker is
        scheduled, so it is visible to ``get``/``list_jobs`` as soon as this
        method returns.
        """
        job_id = uuid.uuid4().hex[:12]
        status = AsyncScoreJobStatus(
            job_id=job_id,
            status="queued",
            created_at=_now_iso(),
            # Only a truncated snapshot of the request is stored in the index.
            request_summary={
                "question": (request.question or "")[:80],
                "answer": (request.answer or "")[:80],
                "metrics": list(request.metrics),
                "judge_model": request.judge_model or "",
                "embedding_model": request.embedding_model or "",
                "has_contexts": bool(request.contexts),
                "has_ground_truth": bool(request.ground_truth),
            },
        )
        with self._lock:
            self._cache[job_id] = status
        self._persist_index(status)
        self._executor.submit(self._run, job_id, request)
        return status

    def get(self, job_id: str) -> AsyncScoreJobStatus | None:
        """Return current status or None if unknown."""
        with self._lock:
            return self._cache.get(job_id)

    def list_jobs(self) -> list[AsyncScoreJobStatus]:
        """Return all known jobs, newest first (ordered by ``created_at``)."""
        with self._lock:
            jobs = list(self._cache.values())
        jobs.sort(key=lambda j: j.created_at, reverse=True)
        return jobs

    # ------------------------------------------------------------------ #
    # Worker
    # ------------------------------------------------------------------ #

    def _run(self, job_id: str, request: ScoreRequest) -> None:
        """Execute scoring, write run artifacts, run advisor.

        Runs on a pool thread. Every failure, including a failing lazy import
        or settings construction, is recorded on the job as ``failed``. An
        exception escaping this method would only be stored on the discarded
        Future and the job would stay ``running`` forever.
        """
        import logging
        logger = logging.getLogger("webapp.services.score_job_manager")
        self._update(job_id, status="running")

        # Defined before the try block so the failure handler can always
        # compute a latency, even when the setup below raises.
        t0 = time.monotonic()
        try:
            # Lazy imports to keep web server bootable if ragas is not installed.
            # They live inside the try so a missing dependency fails the job
            # instead of silently killing the worker call.
            from rag_eval.advisor import run_advisor
            from rag_eval.metrics.factory import build_models
            from rag_eval.metrics.weights import compute_weighted_score
            from rag_eval.reporting.writers import write_run_artifacts
            from rag_eval.settings import EvaluationSettings
            from rag_eval.shared.models import (
                DatasetConfig, EvaluationResult, NormalizedSample, Scenario,
            )
            from rag_eval.shared.utils import utc_now_iso
            from webapp.services.inline_scorer import inline_scorer

            settings = EvaluationSettings()
            judge_model = request.judge_model or settings.ragas_judge_model
            embedding_model = request.embedding_model or settings.ragas_embedding_model
            effective = request.effective_metrics()
            requested = set(request.metrics)
            skipped = sorted(requested - set(effective))

            # Restart the clock so a successful job's latency excludes the
            # import/settings setup above.
            t0 = time.monotonic()
            started_at = utc_now_iso()

            if effective:
                raw_scores = inline_scorer.score(
                    question=request.question,
                    answer=request.answer,
                    contexts=request.contexts_as_list(),
                    ground_truth=request.ground_truth,
                    metrics=effective,
                    judge_model=judge_model,
                    embedding_model=embedding_model,
                    settings=settings,
                )
            else:
                raw_scores = {}

            latency_ms = int((time.monotonic() - t0) * 1000)
            finished_at = utc_now_iso()

            # Build full scores dict (skipped = None)
            all_scores: dict[str, float | None] = {m: None for m in request.metrics}
            all_scores.update(raw_scores)
            weighted_raw = compute_weighted_score(
                {k: v for k, v in raw_scores.items() if v is not None}, {}
            )
            weighted = round(weighted_raw, 4) if weighted_raw is not None else None

            # Build a score row compatible with report_builder
            score_row: dict[str, Any] = {
                "sample_id": "async-score-1",
                "question": request.question,
                "answer": request.answer or "",
                "contexts": request.contexts or "",
                "ground_truth": request.ground_truth or "",
                "error": "",
            }
            score_row.update(all_scores)

            # Construct minimal EvaluationResult so write_run_artifacts works.
            # NOTE(review): run_id comes only from the finish timestamp; if
            # utc_now_iso() has coarse resolution, two jobs finishing together
            # would share a run directory. Confirm against its implementation.
            run_id = finished_at.replace(":", "-")
            output_dir = self._output_dir

            # Build a minimal Scenario for snapshot + advisor
            scenario = Scenario(
                scenario_name=f"async-score-{job_id}",
                mode="offline",
                dataset=DatasetConfig(path=output_dir / run_id / "dataset.csv"),
                judge_model=judge_model,
                embedding_model=embedding_model,
                metrics=list(request.metrics),
                output_dir=output_dir,
                optimization_advisor=True,  # always generate advice
            )

            sample = NormalizedSample(
                sample_id="async-score-1",
                question=request.question,
                answer=request.answer or "",
                contexts=request.contexts_as_list(),
                ground_truth=request.ground_truth or "",
            )

            result = EvaluationResult(
                scenario=scenario,
                run_id=run_id,
                started_at=started_at,
                finished_at=finished_at,
                valid_samples=[sample],
                invalid_samples=[],
                score_rows=[score_row],
            )

            write_run_artifacts(result)
            logger.info("[score_job] artifacts written job_id=%s run_id=%s", job_id, run_id)

            # Run optimization advisor (builds optimization_advice.md).
            # Best effort: an advisor failure is logged but the job still completes.
            try:
                llm, _ = build_models(judge_model, embedding_model, settings)
                run_advisor(result, scenario, llm)
                logger.info("[score_job] advisor done job_id=%s", job_id)
            except Exception as adv_exc:  # noqa: BLE001
                logger.warning("[score_job] advisor failed job_id=%s err=%s", job_id, adv_exc)

            self._update(
                job_id,
                status="completed",
                finished_at=finished_at,
                run_id=run_id,
                scores=all_scores,
                weighted_score=weighted,
                latency_ms=latency_ms,
                skipped_metrics=skipped,
            )

        except Exception as exc:  # noqa: BLE001
            latency_ms = int((time.monotonic() - t0) * 1000)
            # logger.exception keeps the traceback; the message is unchanged.
            logger.exception("[score_job] failed job_id=%s err=%s", job_id, exc)
            self._update(
                job_id,
                status="failed",
                finished_at=_now_iso(),
                latency_ms=latency_ms,
                error=f"{type(exc).__name__}: {exc}",
            )

    # ------------------------------------------------------------------ #
    # Persistence helpers
    # ------------------------------------------------------------------ #

    def _update(self, job_id: str, **kwargs: Any) -> None:
        """Merge kwargs into the job status and persist the index.

        Unknown job ids are ignored. The cached model is replaced by a copy,
        never mutated, so objects already handed out by ``get`` stay unchanged.
        """
        with self._lock:
            existing = self._cache.get(job_id)
            if existing is None:
                return
            updated = existing.model_copy(update=kwargs)
            self._cache[job_id] = updated
        self._persist_index(updated)

    def _persist_index(self, status: AsyncScoreJobStatus) -> None:
        """Write a lightweight index JSON for this job (survives restarts).

        The JSON is written to a sibling ``.tmp`` file and then renamed over
        the target, so an interrupted write never leaves a truncated
        ``<job_id>.json`` behind. The ``.tmp`` suffix keeps the temporary file
        out of the ``*.json`` glob used when loading.
        """
        path = self._index_dir / f"{status.job_id}.json"
        tmp_path = path.with_name(f"{path.name}.tmp")
        tmp_path.write_text(
            json.dumps(status.model_dump(), ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        tmp_path.replace(path)

    def _load_existing(self) -> None:
        """Load existing job index files on startup.

        Unreadable or invalid files are skipped with a warning so that one bad
        file cannot prevent the manager (and the module-level singleton) from
        being created.

        NOTE(review): jobs persisted as ``queued``/``running`` are loaded
        as-is and nothing in this class resubmits them, so they keep that
        status. Decide whether they should be marked failed on load.
        """
        import logging
        logger = logging.getLogger("webapp.services.score_job_manager")
        for path in sorted(self._index_dir.glob("*.json")):
            try:
                data = json.loads(path.read_text(encoding="utf-8"))
                status = AsyncScoreJobStatus.model_validate(data)
            except Exception as exc:  # noqa: BLE001 - best-effort load
                logger.warning(
                    "[score_job] skipping unreadable index file path=%s err=%s", path, exc
                )
                continue
            self._cache[status.job_id] = status