diff --git a/docs/superpowers/plans/2026-06-24-async-score-jobs.md b/docs/superpowers/plans/2026-06-24-async-score-jobs.md new file mode 100644 index 0000000..0364a28 --- /dev/null +++ b/docs/superpowers/plans/2026-06-24-async-score-jobs.md @@ -0,0 +1,808 @@ +# 异步评分记录(Async Score Jobs)Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** 新增 `POST /api/score/async` 异步端点,结果持久化至 `outputs/score-jobs/`,并在前端新增「评分记录」页面展示。 + +**Architecture:** 新建 `ScoreJobManager`(复用 `pipeline_task_manager` 线程池模式)在后台执行 `InlineScorer.score()`,写入 JSON 文件;新增三个 REST 端点;前端新增导航页加载并轮询记录。 + +**Tech Stack:** Python 3.12, FastAPI, Pydantic v2, threading, Vanilla JS, pytest + +## Global Constraints + +- Python 3.12+,PEP 8,4 空格缩进,类型注解必须 +- 存储路径:`outputs/score-jobs/{job_id}.json` +- 复用现有 `ScoreRequest`(含 `effective_metrics()` 和 `contexts_as_list()` 方法) +- 复用现有 `InlineScorer.score()` 和 `compute_weighted_score()` +- 所有测试用 pytest,不依赖真实 LLM + +--- + +## 文件清单 + +| 操作 | 文件 | 职责 | +|------|------|------| +| 新建 | `webapp/services/score_job_manager.py` | ScoreJobManager:线程池 + JSON 持久化 | +| 新建 | `webapp/api/score_jobs.py` | 3 个端点路由 | +| 新建 | `webapp/static/js/score_jobs.js` | 前端列表 + 轮询逻辑 | +| 新建 | `tests/webapp/test_score_jobs_api.py` | API 集成测试 | +| 修改 | `webapp/models.py` | 新增 `AsyncScoreJobStatus`、`AsyncScoreJobResponse` | +| 修改 | `webapp/server.py` | 注册 score_jobs router,更新 OPENAPI_TAGS 和 description | +| 修改 | `webapp/static/index.html` | 新增导航项 + `#view-scorejobs` section | +| 修改 | `webapp/static/js/api.js` | 新增 `scoreJobsAsync()`、`getScoreJob()`、`listScoreJobs()` | +| 修改 | `webapp/static/js/app.js` | 注册 `scorejobs` 视图、加载调用 | + +--- + +## Task 1: Pydantic 模型 + ScoreJobManager + +**Files:** +- Modify: `webapp/models.py` +- Create: `webapp/services/score_job_manager.py` +- Create: 
`tests/webapp/test_score_jobs_api.py` (partial) + +**Interfaces:** +- Produces: + - `AsyncScoreJobStatus` Pydantic model + - `AsyncScoreJobResponse` Pydantic model + - `score_job_manager: ScoreJobManager` singleton + - `ScoreJobManager.submit(request: ScoreRequest) -> AsyncScoreJobStatus` + - `ScoreJobManager.get(job_id: str) -> AsyncScoreJobStatus | None` + - `ScoreJobManager.list_jobs() -> list[AsyncScoreJobStatus]` + +- [ ] **Step 1: Add models to `webapp/models.py`** + +Append at the end of the file, after `ScoreResponse`: + +```python +# --------------------------------------------------------------------------- +# 异步评分记录模型 +# --------------------------------------------------------------------------- + +class AsyncScoreJobResponse(BaseModel): + """Immediate response after submitting an async score job.""" + + job_id: str = Field(description="任务唯一标识符,用于后续查询结果。") + status: str = Field(default="queued", description="初始状态:queued。") + + +class AsyncScoreJobStatus(BaseModel): + """Full state of one async score job, persisted to disk.""" + + job_id: str = Field(description="任务唯一标识符。") + status: str = Field(description="queued | running | completed | failed") + created_at: str = Field(default="", description="创建时间(ISO 8601 UTC)。") + finished_at: str = Field(default="", description="完成时间(ISO 8601 UTC)。") + request_summary: dict = Field( + default_factory=dict, + description="请求参数快照(question 前80字、metrics、judge_model 等)。", + ) + scores: dict[str, float | None] = Field(default_factory=dict, description="各指标得分。") + weighted_score: float | None = Field(default=None, description="加权综合得分。") + latency_ms: int = Field(default=0, description="评分耗时毫秒。") + skipped_metrics: list[str] = Field(default_factory=list) + error: str | None = Field(default=None) +``` + +- [ ] **Step 2: Write failing tests** + +Create `tests/webapp/test_score_jobs_api.py`: + +```python +"""Tests for async score jobs API.""" +from __future__ import annotations +import json 
+import time +import pytest +from unittest.mock import MagicMock, patch +from fastapi.testclient import TestClient + + +@pytest.fixture() +def client(tmp_path, monkeypatch): + import webapp.services.score_job_manager as mgr_mod + from webapp.services.score_job_manager import ScoreJobManager + fresh_mgr = ScoreJobManager(jobs_dir=tmp_path / "score-jobs") + monkeypatch.setattr(mgr_mod, "score_job_manager", fresh_mgr) + import webapp.api.score_jobs as api_mod + monkeypatch.setattr(api_mod, "score_job_manager", fresh_mgr) + from webapp.server import create_app + return TestClient(create_app()) + + +class TestScoreJobManager: + def test_submit_returns_job_status_with_queued(self, tmp_path): + from webapp.services.score_job_manager import ScoreJobManager + from webapp.models import ScoreRequest + mgr = ScoreJobManager(jobs_dir=tmp_path / "jobs") + req = ScoreRequest(question="q", answer="a", metrics=["answer_relevancy"]) + with patch.object(mgr, "_run") as mock_exec: + mock_exec.return_value = None + status = mgr.submit(req) + assert status.status in ("queued", "running", "completed") + assert len(status.job_id) > 0 + + def test_get_returns_none_for_unknown_id(self, tmp_path): + from webapp.services.score_job_manager import ScoreJobManager + mgr = ScoreJobManager(jobs_dir=tmp_path / "jobs") + assert mgr.get("nonexistent") is None + + def test_list_returns_empty_initially(self, tmp_path): + from webapp.services.score_job_manager import ScoreJobManager + mgr = ScoreJobManager(jobs_dir=tmp_path / "jobs") + assert mgr.list_jobs() == [] + + def test_completed_job_persisted_to_disk(self, tmp_path): + from webapp.services.score_job_manager import ScoreJobManager + from webapp.models import ScoreRequest + mgr = ScoreJobManager(jobs_dir=tmp_path / "jobs", max_workers=1) + req = ScoreRequest(question="q?", answer="a.", metrics=["answer_relevancy"]) + mock_scorer = MagicMock() + mock_scorer.score.return_value = {"answer_relevancy": 0.85} + with 
patch("webapp.services.score_job_manager.inline_scorer", mock_scorer): + with patch("webapp.services.score_job_manager.EvaluationSettings"): + status = mgr.submit(req) + for _ in range(20): + s = mgr.get(status.job_id) + if s and s.status in ("completed", "failed"): + break + time.sleep(0.2) + s = mgr.get(status.job_id) + assert s is not None + json_path = tmp_path / "jobs" / f"{status.job_id}.json" + assert json_path.exists() + data = json.loads(json_path.read_text(encoding="utf-8")) + assert data["job_id"] == status.job_id +``` + +- [ ] **Step 3: Run to verify FAIL** + +``` +cd C:\Projects\AIProjects\Siemens-AIPOC\siemens_ragas +python -m pytest tests/webapp/test_score_jobs_api.py::TestScoreJobManager -v +``` +Expected: `ModuleNotFoundError: No module named 'webapp.services.score_job_manager'` + +- [ ] **Step 4: Create `webapp/services/score_job_manager.py`** + +```python +"""Background task manager for async RAGAS single-sample scoring. + +Each job runs InlineScorer.score() in a thread pool and persists the +result as a JSON file under outputs/score-jobs/{job_id}.json so results +survive server restarts and can be listed by the frontend. 
+""" + +from __future__ import annotations + +import json +import math +import threading +import uuid +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +from rag_eval.metrics.weights import compute_weighted_score +from rag_eval.settings import EvaluationSettings +from webapp.models import AsyncScoreJobStatus, ScoreRequest +from webapp.services.inline_scorer import inline_scorer + +_REPO_ROOT = Path(__file__).resolve().parents[2] +_DEFAULT_JOBS_DIR = _REPO_ROOT / "outputs" / "score-jobs" + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +class ScoreJobManager: + """Thread-pool manager for async RAGAS scoring jobs with JSON persistence.""" + + def __init__( + self, + jobs_dir: Path = _DEFAULT_JOBS_DIR, + max_workers: int = 4, + ) -> None: + self._jobs_dir = Path(jobs_dir) + self._jobs_dir.mkdir(parents=True, exist_ok=True) + self._executor = ThreadPoolExecutor(max_workers=max_workers) + # In-memory index: job_id -> AsyncScoreJobStatus (authoritative while running) + self._cache: dict[str, AsyncScoreJobStatus] = {} + self._lock = threading.Lock() + self._load_existing() + + # ------------------------------------------------------------------ # + # Public API + # ------------------------------------------------------------------ # + + def submit(self, request: ScoreRequest) -> AsyncScoreJobStatus: + """Queue one scoring job and return its initial status immediately.""" + job_id = uuid.uuid4().hex[:12] + status = AsyncScoreJobStatus( + job_id=job_id, + status="queued", + created_at=_now_iso(), + request_summary={ + "question": request.question[:80], + "answer": (request.answer or "")[:80], + "metrics": list(request.metrics), + "judge_model": request.judge_model or "", + "embedding_model": request.embedding_model or "", + "has_contexts": bool(request.contexts), + "has_ground_truth": bool(request.ground_truth), + }, + ) + with self._lock: + 
self._cache[job_id] = status + self._persist(status) + self._executor.submit(self._run, job_id, request) + return status + + def get(self, job_id: str) -> AsyncScoreJobStatus | None: + """Return the current status for one job, or None if unknown.""" + with self._lock: + return self._cache.get(job_id) + + def list_jobs(self) -> list[AsyncScoreJobStatus]: + """Return all known jobs sorted newest first.""" + with self._lock: + jobs = list(self._cache.values()) + jobs.sort(key=lambda j: j.created_at, reverse=True) + return jobs + + # ------------------------------------------------------------------ # + # Internal + # ------------------------------------------------------------------ # + + def _run(self, job_id: str, request: ScoreRequest) -> None: + """Execute scoring in the thread pool and persist the result.""" + self._update(job_id, status="running") + settings = EvaluationSettings() + judge_model = request.judge_model or settings.ragas_judge_model + embedding_model = request.embedding_model or settings.ragas_embedding_model + effective = request.effective_metrics() + requested = set(request.metrics) + skipped = sorted(requested - set(effective)) + + import time as _time + t0 = _time.monotonic() + try: + if not effective: + scores: dict[str, float | None] = {m: None for m in request.metrics} + weighted = None + else: + raw = inline_scorer.score( + question=request.question, + answer=request.answer, + contexts=request.contexts_as_list(), + ground_truth=request.ground_truth, + metrics=effective, + judge_model=judge_model, + embedding_model=embedding_model, + settings=settings, + ) + scores = {m: None for m in request.metrics} + scores.update(raw) + weighted_raw = compute_weighted_score( + {k: v for k, v in raw.items() if v is not None}, {} + ) + weighted = round(weighted_raw, 4) if weighted_raw is not None else None + + latency_ms = int((_time.monotonic() - t0) * 1000) + self._update( + job_id, + status="completed", + finished_at=_now_iso(), + scores=scores, + 
weighted_score=weighted, + latency_ms=latency_ms, + skipped_metrics=skipped, + ) + except Exception as exc: # noqa: BLE001 + latency_ms = int((_time.monotonic() - t0) * 1000) + self._update( + job_id, + status="failed", + finished_at=_now_iso(), + latency_ms=latency_ms, + error=f"{type(exc).__name__}: {exc}", + ) + + def _update(self, job_id: str, **kwargs: Any) -> None: + """Merge kwargs into the job status and persist.""" + with self._lock: + existing = self._cache.get(job_id) + if existing is None: + return + updated = existing.model_copy(update=kwargs) + self._cache[job_id] = updated + self._persist(updated) + + def _persist(self, status: AsyncScoreJobStatus) -> None: + """Write one job's status to its JSON file.""" + path = self._jobs_dir / f"{status.job_id}.json" + path.write_text( + json.dumps(status.model_dump(), ensure_ascii=False, indent=2), + encoding="utf-8", + ) + + def _load_existing(self) -> None: + """Load completed jobs from disk into memory on startup.""" + for path in sorted(self._jobs_dir.glob("*.json")): + try: + data = json.loads(path.read_text(encoding="utf-8")) + status = AsyncScoreJobStatus.model_validate(data) + self._cache[status.job_id] = status + except Exception: # noqa: BLE001 + pass # Corrupt file — skip + + +# Module-level singleton shared by FastAPI routes. 
+score_job_manager = ScoreJobManager() +``` + +- [ ] **Step 5: Run to verify tests PASS** + +``` +python -m pytest tests/webapp/test_score_jobs_api.py::TestScoreJobManager -v +``` +Expected: 4 tests PASS + +- [ ] **Step 6: Commit** + +``` +git add webapp/models.py webapp/services/score_job_manager.py tests/webapp/test_score_jobs_api.py +git commit -m "feat: add AsyncScoreJobStatus model and ScoreJobManager with JSON persistence" +``` + +--- + +## Task 2: API 端点 + +**Files:** +- Create: `webapp/api/score_jobs.py` +- Modify: `webapp/server.py` +- Modify: `tests/webapp/test_score_jobs_api.py` + +**Interfaces:** +- Consumes: `score_job_manager: ScoreJobManager`, `AsyncScoreJobResponse`, `AsyncScoreJobStatus`, `ScoreRequest` +- Produces: `POST /api/score/async`, `GET /api/score/jobs`, `GET /api/score/jobs/{job_id}` + +- [ ] **Step 1: Add API tests to `tests/webapp/test_score_jobs_api.py`** + +Append this class: + +```python +class TestScoreJobsEndpoint: + def test_submit_async_returns_202(self, client): + with patch("webapp.services.score_job_manager.ScoreJobManager._run"): + resp = client.post("/api/score/async", json={ + "question": "q?", "answer": "a.", + "metrics": ["answer_relevancy"], + }) + assert resp.status_code == 202 + data = resp.json() + assert "job_id" in data + assert data["status"] == "queued" + + def test_get_unknown_job_returns_404(self, client): + resp = client.get("/api/score/jobs/nonexistent") + assert resp.status_code == 404 + + def test_list_jobs_returns_empty_initially(self, client): + resp = client.get("/api/score/jobs") + assert resp.status_code == 200 + assert resp.json()["jobs"] == [] + + def test_submitted_job_appears_in_list(self, client): + with patch("webapp.services.score_job_manager.ScoreJobManager._run"): + resp = client.post("/api/score/async", json={ + "question": "q?", "answer": "a.", + "metrics": ["answer_relevancy"], + }) + job_id = resp.json()["job_id"] + list_resp = client.get("/api/score/jobs") + ids = [j["job_id"] for j in 
list_resp.json()["jobs"]] + assert job_id in ids + + def test_get_job_by_id(self, client): + with patch("webapp.services.score_job_manager.ScoreJobManager._run"): + resp = client.post("/api/score/async", json={ + "question": "q?", "answer": "a.", + "metrics": ["answer_relevancy"], + }) + job_id = resp.json()["job_id"] + get_resp = client.get(f"/api/score/jobs/{job_id}") + assert get_resp.status_code == 200 + assert get_resp.json()["job_id"] == job_id +``` + +- [ ] **Step 2: Run to verify FAIL** + +``` +python -m pytest tests/webapp/test_score_jobs_api.py::TestScoreJobsEndpoint -v +``` +Expected: FAIL — `ModuleNotFoundError: No module named 'webapp.api.score_jobs'` + +- [ ] **Step 3: Create `webapp/api/score_jobs.py`** + +```python +"""Routes for async RAGAS scoring jobs (Dify fire-and-forget integration).""" + +from __future__ import annotations + +import logging + +from fastapi import APIRouter, HTTPException + +from webapp.models import AsyncScoreJobResponse, AsyncScoreJobStatus, ScoreRequest +from webapp.services.score_job_manager import score_job_manager + +router = APIRouter(prefix="/api/score", tags=["score"]) +logger = logging.getLogger("webapp.api.score_jobs") + + +@router.post( + "/async", + status_code=202, + response_model=AsyncScoreJobResponse, + summary="提交异步评分任务(Dify 推荐方式)", + responses={ + 202: { + "description": "任务已排队,立即返回 job_id。通过 GET /api/score/jobs/{job_id} 查询结果。", + "content": { + "application/json": { + "example": {"job_id": "abc123def456", "status": "queued"} + } + }, + }, + }, +) +def submit_async_score(request: ScoreRequest) -> AsyncScoreJobResponse: + """提交异步 RAGAS 评分任务,立即返回 job_id(202 Accepted)。 + + 评分在后台线程中执行,结果持久化至 `outputs/score-jobs/{job_id}.json`。 + 在 RAGAS 平台「评分记录」页面可查看所有历史评分记录。 + + **Dify 工作流推荐使用此接口**:不等待评分完成,工作流立即继续, + 避免 HTTP 节点超时。评分结果通过平台界面查看。 + """ + logger.info( + "[score_async] submit metrics=%s has_ctx=%s has_gt=%s", + request.metrics, bool(request.contexts), bool(request.ground_truth), + ) + status = 
score_job_manager.submit(request) + logger.info("[score_async] queued job_id=%s", status.job_id) + return AsyncScoreJobResponse(job_id=status.job_id, status=status.status) + + +@router.get( + "/jobs", + response_model=dict, + summary="列出所有评分记录", +) +def list_score_jobs() -> dict: + """返回所有异步评分记录,按创建时间倒序排列。""" + jobs = score_job_manager.list_jobs() + logger.info("[score_jobs] list count=%d", len(jobs)) + return {"jobs": [j.model_dump() for j in jobs]} + + +@router.get( + "/jobs/{job_id}", + response_model=AsyncScoreJobStatus, + summary="查询评分记录详情", + responses={404: {"description": "指定 job_id 的评分记录不存在。"}}, +) +def get_score_job(job_id: str) -> AsyncScoreJobStatus: + """返回一个异步评分任务的当前状态和结果。""" + status = score_job_manager.get(job_id) + if status is None: + raise HTTPException(status_code=404, detail=f"Score job not found: {job_id}") + return status +``` + +- [ ] **Step 4: Register router in `webapp/server.py`** + +Add import: +```python +from webapp.api import evaluations, llm_profiles, pipeline, runs, scenarios, score, score_jobs +``` + +Add after `app.include_router(score.router)`: +```python + app.include_router(score_jobs.router) +``` + +Add entry to `OPENAPI_TAGS` before `"meta"`: +```python + { + "name": "score", + "description": ( + "**实时评分 API(同步)** — `POST /api/score`\n\n" + "**异步评分 API(Dify 推荐)** — `POST /api/score/async`\n\n" + "异步方式立即返回 job_id(202),评分在后台执行,结果在「评分记录」页查看。\n\n" + "**鉴权**:若 `.env` 中配置了 `SCORE_API_TOKEN`,需携带 " + "`Authorization: Bearer <token>` 请求头。" + ), + }, +``` + +> Note: this replaces the existing `"score"` entry in `OPENAPI_TAGS`. 
+ +- [ ] **Step 5: Verify no route conflict** + +``` +python -c " +from webapp.server import create_app +app = create_app() +score_routes = [(r.path, list(getattr(r,'methods',[]))) for r in app.routes if 'score' in r.path] +print(score_routes) +" +``` +Expected: shows `/api/score`, `/api/score/async`, `/api/score/jobs`, `/api/score/jobs/{job_id}` + +- [ ] **Step 6: Run API tests** + +``` +python -m pytest tests/webapp/test_score_jobs_api.py -v --tb=short +``` +Expected: all 9 tests PASS + +- [ ] **Step 7: Commit** + +``` +git add webapp/api/score_jobs.py webapp/server.py tests/webapp/test_score_jobs_api.py +git commit -m "feat: add POST /api/score/async and GET /api/score/jobs endpoints" +``` + +--- + +## Task 3: 前端「评分记录」页 + +**Files:** +- Modify: `webapp/static/index.html` +- Modify: `webapp/static/js/api.js` +- Modify: `webapp/static/js/app.js` +- Create: `webapp/static/js/score_jobs.js` + +**Interfaces:** +- Consumes: `GET /api/score/jobs`, `GET /api/score/jobs/{job_id}` +- Produces: `#view-scorejobs` section, `ScoreJobs` JS object + +- [ ] **Step 1: Add API methods to `webapp/static/js/api.js`** + +Add before the closing `};`: +```javascript + // 异步评分记录 API + scoreJobsAsync(body) { return API.post("/api/score/async", body); }, + getScoreJob(jobId) { return API.get(`/api/score/jobs/${encodeURIComponent(jobId)}`); }, + listScoreJobs() { return API.get("/api/score/jobs"); }, +``` + +- [ ] **Step 2: Add nav item and section to `webapp/static/index.html`** + +In the `