Async Score Jobs Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

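Goal: Add an asynchronous POST /api/score/async endpoint, persist results under outputs/score-jobs/, and add a "评分记录" (Score Records) page to the frontend to display them.

Architecture: A new ScoreJobManager (reusing the thread-pool pattern of pipeline_task_manager) runs InlineScorer.score() in the background and writes the result to a JSON file; three new REST endpoints are added; the frontend gets a new navigation page that loads the records and polls unfinished ones.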
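
The background-execution pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the ScoreJobManager defined later in this plan: `fake_score` and `run_job` are hypothetical stand-ins, and the temporary directory replaces outputs/score-jobs/.

```python
import json
import tempfile
import uuid
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def fake_score(question: str, answer: str) -> dict[str, float]:
    """Hypothetical stand-in for InlineScorer.score(); returns a fixed score."""
    return {"answer_relevancy": 0.5}


def run_job(jobs_dir: Path, job_id: str, question: str, answer: str) -> None:
    """Score in a worker thread, then write the outcome to <job_id>.json."""
    record: dict = {"job_id": job_id, "status": "running"}
    try:
        record["scores"] = fake_score(question, answer)
        record["status"] = "completed"
    except Exception as exc:  # any failure becomes a persisted "failed" record
        record["status"] = "failed"
        record["error"] = f"{type(exc).__name__}: {exc}"
    path = jobs_dir / f"{job_id}.json"
    path.write_text(json.dumps(record, ensure_ascii=False), encoding="utf-8")


jobs_dir = Path(tempfile.mkdtemp())  # stand-in for outputs/score-jobs/
job_id = uuid.uuid4().hex[:12]
with ThreadPoolExecutor(max_workers=1) as pool:
    # A real caller would return right after submit(); result() is only
    # used here so the example can read the file afterwards.
    pool.submit(run_job, jobs_dir, job_id, "q?", "a.").result()

saved = json.loads((jobs_dir / f"{job_id}.json").read_text(encoding="utf-8"))
```

The caller only needs the job_id to find the result later, which is what lets the HTTP endpoint answer before scoring finishes.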

Tech Stack: Python 3.12, FastAPI, Pydantic v2, threading, Vanilla JS, pytest

Global Constraints

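  • Python 3.12+, PEP 8, 4-space indentation, type annotations required
  • Storage path: outputs/score-jobs/<job_id>.json
  • Reuse the existing ScoreRequest (including its effective_metrics() and contexts_as_list() methods)
  • Reuse the existing InlineScorer.score() and compute_weighted_score()
  • All tests use pytest and must not depend on a real LLM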

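File List

| Action | File | Responsibility |
| --- | --- | --- |
| Create | webapp/services/score_job_manager.py | ScoreJobManager (thread pool + JSON persistence) |
| Create | webapp/api/score_jobs.py | Routes for the 3 endpoints |
| Create | webapp/static/js/score_jobs.js | Frontend list + polling logic |
| Create | tests/webapp/test_score_jobs_api.py | API integration tests |
| Modify | webapp/models.py | Add AsyncScoreJobStatus and AsyncScoreJobResponse |
| Modify | webapp/server.py | Register the score_jobs router; update OPENAPI_TAGS and description |
| Modify | webapp/static/index.html | Add nav item + #view-scorejobs section |
| Modify | webapp/static/js/api.js | Add scoreJobsAsync(), getScoreJob(), listScoreJobs() |
| Modify | webapp/static/js/app.js | Register the scorejobs view and its load call |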

Task 1: Pydantic Models + ScoreJobManager

Files:

  • Modify: webapp/models.py
  • Create: webapp/services/score_job_manager.py
  • Create: tests/webapp/test_score_jobs_api.py (partial)

Interfaces:

  • Produces:

    • AsyncScoreJobStatus Pydantic model
    • AsyncScoreJobResponse Pydantic model
    • score_job_manager: ScoreJobManager singleton
    • ScoreJobManager.submit(request: ScoreRequest) -> AsyncScoreJobStatus
    • ScoreJobManager.get(job_id: str) -> AsyncScoreJobStatus | None
    • ScoreJobManager.list_jobs() -> list[AsyncScoreJobStatus]
  • Step 1: Add models to webapp/models.py

Append at the end of the file, after ScoreResponse:

# ---------------------------------------------------------------------------
# Async score job models
# ---------------------------------------------------------------------------

class AsyncScoreJobResponse(BaseModel):
    """Immediate response after submitting an async score job."""

    job_id: str = Field(description="Unique job identifier, used to query the result later.")
    status: str = Field(default="queued", description="Initial status: queued.")


class AsyncScoreJobStatus(BaseModel):
    """Full state of one async score job, persisted to disk."""

    job_id: str = Field(description="Unique job identifier.")
    status: str = Field(description="queued | running | completed | failed")
    created_at: str = Field(default="", description="Creation time (ISO 8601, UTC).")
    finished_at: str = Field(default="", description="Completion time (ISO 8601, UTC).")
    request_summary: dict = Field(
        default_factory=dict,
        description="Snapshot of the request (first 80 characters of question, metrics, judge_model, etc.).",
    )
    scores: dict[str, float | None] = Field(default_factory=dict, description="Score per metric.")
    weighted_score: float | None = Field(default=None, description="Weighted overall score.")
    latency_ms: int = Field(default=0, description="Scoring latency in milliseconds.")
    skipped_metrics: list[str] = Field(default_factory=list)
    error: str | None = Field(default=None)
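
The status field above is a plain str documented as one of four values. As a rough sketch of the lifecycle those values imply (queued, then running, then completed or failed), the following uses a hypothetical `JobState` enum and transition table; neither is part of the plan's models.

```python
from enum import Enum


class JobState(str, Enum):
    """Hypothetical enum mirroring the four status strings in the plan."""

    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Allowed forward transitions; terminal states have none.
TRANSITIONS: dict[JobState, set[JobState]] = {
    JobState.QUEUED: {JobState.RUNNING},
    JobState.RUNNING: {JobState.COMPLETED, JobState.FAILED},
    JobState.COMPLETED: set(),
    JobState.FAILED: set(),
}


def can_transition(current: str, new: str) -> bool:
    """Return True if moving from `current` to `new` is a legal step."""
    try:
        return JobState(new) in TRANSITIONS[JobState(current)]
    except ValueError:  # one of the strings is not a known status
        return False


def is_terminal(status: str) -> bool:
    """Completed and failed jobs never change again, so polling can stop."""
    return status in (JobState.COMPLETED.value, JobState.FAILED.value)
```

Both the backend tests and the frontend poller rely on the same terminal check: they stop waiting once the status is completed or failed.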
  • Step 2: Write failing tests

Create tests/webapp/test_score_jobs_api.py:

"""Tests for async score jobs API."""
from __future__ import annotations
import json
import time
import pytest
from unittest.mock import MagicMock, patch
from fastapi.testclient import TestClient


@pytest.fixture()
def client(tmp_path, monkeypatch):
    import webapp.services.score_job_manager as mgr_mod
    from webapp.services.score_job_manager import ScoreJobManager
    fresh_mgr = ScoreJobManager(jobs_dir=tmp_path / "score-jobs")
    monkeypatch.setattr(mgr_mod, "score_job_manager", fresh_mgr)
    import webapp.api.score_jobs as api_mod
    monkeypatch.setattr(api_mod, "score_job_manager", fresh_mgr)
    from webapp.server import create_app
    return TestClient(create_app())


class TestScoreJobManager:
    def test_submit_returns_job_status_with_queued(self, tmp_path):
        from webapp.services.score_job_manager import ScoreJobManager
        from webapp.models import ScoreRequest
        mgr = ScoreJobManager(jobs_dir=tmp_path / "jobs")
        req = ScoreRequest(question="q", answer="a", metrics=["answer_relevancy"])
        # Patch _run (the method submit() hands to the executor) so no scoring happens.
        with patch.object(mgr, "_run") as mock_run:
            mock_run.return_value = None
            status = mgr.submit(req)
        assert status.status in ("queued", "running", "completed")
        assert len(status.job_id) > 0

    def test_get_returns_none_for_unknown_id(self, tmp_path):
        from webapp.services.score_job_manager import ScoreJobManager
        mgr = ScoreJobManager(jobs_dir=tmp_path / "jobs")
        assert mgr.get("nonexistent") is None

    def test_list_returns_empty_initially(self, tmp_path):
        from webapp.services.score_job_manager import ScoreJobManager
        mgr = ScoreJobManager(jobs_dir=tmp_path / "jobs")
        assert mgr.list_jobs() == []

    def test_completed_job_persisted_to_disk(self, tmp_path):
        from webapp.services.score_job_manager import ScoreJobManager
        from webapp.models import ScoreRequest
        mgr = ScoreJobManager(jobs_dir=tmp_path / "jobs", max_workers=1)
        req = ScoreRequest(question="q?", answer="a.", metrics=["answer_relevancy"])
        mock_scorer = MagicMock()
        mock_scorer.score.return_value = {"answer_relevancy": 0.85}
        # Keep the patches active until the worker thread finishes; otherwise
        # _run may start after the mocks are removed and call the real scorer.
        with patch("webapp.services.score_job_manager.inline_scorer", mock_scorer):
            with patch("webapp.services.score_job_manager.EvaluationSettings"):
                status = mgr.submit(req)
                for _ in range(20):
                    s = mgr.get(status.job_id)
                    if s and s.status in ("completed", "failed"):
                        break
                    time.sleep(0.2)
        s = mgr.get(status.job_id)
        assert s is not None
        json_path = tmp_path / "jobs" / f"{status.job_id}.json"
        assert json_path.exists()
        data = json.loads(json_path.read_text(encoding="utf-8"))
        assert data["job_id"] == status.job_id
  • Step 3: Run to verify FAIL
cd C:\Projects\AIProjects\Siemens-AIPOC\siemens_ragas
python -m pytest tests/webapp/test_score_jobs_api.py::TestScoreJobManager -v

Expected: ModuleNotFoundError: No module named 'webapp.services.score_job_manager'

  • Step 4: Create webapp/services/score_job_manager.py
"""Background task manager for async RAGAS single-sample scoring.

Each job runs InlineScorer.score() in a thread pool and persists the
result as a JSON file under outputs/score-jobs/<job_id>.json so results
survive server restarts and can be listed by the frontend.
"""

from __future__ import annotations

import json
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from rag_eval.metrics.weights import compute_weighted_score
from rag_eval.settings import EvaluationSettings
from webapp.models import AsyncScoreJobStatus, ScoreRequest
from webapp.services.inline_scorer import inline_scorer

_REPO_ROOT = Path(__file__).resolve().parents[2]
_DEFAULT_JOBS_DIR = _REPO_ROOT / "outputs" / "score-jobs"


def _now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


class ScoreJobManager:
    """Thread-pool manager for async RAGAS scoring jobs with JSON persistence."""

    def __init__(
        self,
        jobs_dir: Path = _DEFAULT_JOBS_DIR,
        max_workers: int = 4,
    ) -> None:
        self._jobs_dir = Path(jobs_dir)
        self._jobs_dir.mkdir(parents=True, exist_ok=True)
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        # In-memory index: job_id -> AsyncScoreJobStatus (authoritative while running)
        self._cache: dict[str, AsyncScoreJobStatus] = {}
        self._lock = threading.Lock()
        self._load_existing()

    # ------------------------------------------------------------------ #
    # Public API
    # ------------------------------------------------------------------ #

    def submit(self, request: ScoreRequest) -> AsyncScoreJobStatus:
        """Queue one scoring job and return its initial status immediately."""
        job_id = uuid.uuid4().hex[:12]
        status = AsyncScoreJobStatus(
            job_id=job_id,
            status="queued",
            created_at=_now_iso(),
            request_summary={
                "question": request.question[:80],
                "answer": (request.answer or "")[:80],
                "metrics": list(request.metrics),
                "judge_model": request.judge_model or "",
                "embedding_model": request.embedding_model or "",
                "has_contexts": bool(request.contexts),
                "has_ground_truth": bool(request.ground_truth),
            },
        )
        with self._lock:
            self._cache[job_id] = status
        self._persist(status)
        self._executor.submit(self._run, job_id, request)
        return status

    def get(self, job_id: str) -> AsyncScoreJobStatus | None:
        """Return the current status for one job, or None if unknown."""
        with self._lock:
            return self._cache.get(job_id)

    def list_jobs(self) -> list[AsyncScoreJobStatus]:
        """Return all known jobs sorted newest first."""
        with self._lock:
            jobs = list(self._cache.values())
        jobs.sort(key=lambda j: j.created_at, reverse=True)
        return jobs

    # ------------------------------------------------------------------ #
    # Internal
    # ------------------------------------------------------------------ #

    def _run(self, job_id: str, request: ScoreRequest) -> None:
        """Execute scoring in the thread pool and persist the result."""
        self._update(job_id, status="running")
        settings = EvaluationSettings()
        judge_model = request.judge_model or settings.ragas_judge_model
        embedding_model = request.embedding_model or settings.ragas_embedding_model
        effective = request.effective_metrics()
        requested = set(request.metrics)
        skipped = sorted(requested - set(effective))

        import time as _time
        t0 = _time.monotonic()
        try:
            if not effective:
                scores: dict[str, float | None] = {m: None for m in request.metrics}
                weighted = None
            else:
                raw = inline_scorer.score(
                    question=request.question,
                    answer=request.answer,
                    contexts=request.contexts_as_list(),
                    ground_truth=request.ground_truth,
                    metrics=effective,
                    judge_model=judge_model,
                    embedding_model=embedding_model,
                    settings=settings,
                )
                scores = {m: None for m in request.metrics}
                scores.update(raw)
                weighted_raw = compute_weighted_score(
                    {k: v for k, v in raw.items() if v is not None}, {}
                )
                weighted = round(weighted_raw, 4) if weighted_raw is not None else None

            latency_ms = int((_time.monotonic() - t0) * 1000)
            self._update(
                job_id,
                status="completed",
                finished_at=_now_iso(),
                scores=scores,
                weighted_score=weighted,
                latency_ms=latency_ms,
                skipped_metrics=skipped,
            )
        except Exception as exc:  # noqa: BLE001
            latency_ms = int((_time.monotonic() - t0) * 1000)
            self._update(
                job_id,
                status="failed",
                finished_at=_now_iso(),
                latency_ms=latency_ms,
                error=f"{type(exc).__name__}: {exc}",
            )

    def _update(self, job_id: str, **kwargs: Any) -> None:
        """Merge kwargs into the job status and persist."""
        with self._lock:
            existing = self._cache.get(job_id)
            if existing is None:
                return
            updated = existing.model_copy(update=kwargs)
            self._cache[job_id] = updated
        self._persist(updated)

    def _persist(self, status: AsyncScoreJobStatus) -> None:
        """Write one job's status to its JSON file."""
        path = self._jobs_dir / f"{status.job_id}.json"
        path.write_text(
            json.dumps(status.model_dump(), ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

    def _load_existing(self) -> None:
        """Load all persisted jobs (in any status) from disk into memory on startup."""
        for path in sorted(self._jobs_dir.glob("*.json")):
            try:
                data = json.loads(path.read_text(encoding="utf-8"))
                status = AsyncScoreJobStatus.model_validate(data)
                self._cache[status.job_id] = status
            except Exception:  # noqa: BLE001
                pass  # Corrupt file — skip


# Module-level singleton shared by FastAPI routes.
score_job_manager = ScoreJobManager()
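
_persist() above rewrites the job file in place with write_text(), so a reader (or a restart that runs _load_existing()) could in principle see a partially written file. One common way to avoid that is to write to a temporary file and rename it over the target. The sketch below shows that technique with a hypothetical helper, `write_json_atomic`; it is an optional hardening idea, not something the plan requires.

```python
import json
import os
import tempfile
from pathlib import Path


def write_json_atomic(path: Path, payload: dict) -> None:
    """Write payload to path so readers never observe a half-written file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # Create the temp file in the target directory so os.replace() stays on
    # one filesystem; the ".tmp" suffix keeps it out of glob("*.json").
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(payload, handle, ensure_ascii=False, indent=2)
        os.replace(tmp_name, path)  # swaps the complete file into place
    except BaseException:
        # Remove the temp file if anything failed before the rename.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


demo_dir = Path(tempfile.mkdtemp())  # stand-in for outputs/score-jobs/
target = demo_dir / "abc123def456.json"
write_json_atomic(target, {"job_id": "abc123def456", "status": "queued"})
write_json_atomic(target, {"job_id": "abc123def456", "status": "completed"})
```

After both calls the directory holds a single JSON file containing the latest status, with no leftover temp files.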
  • Step 5: Run to verify tests PASS
python -m pytest tests/webapp/test_score_jobs_api.py::TestScoreJobManager -v

Expected: 4 tests PASS

  • Step 6: Commit
git add webapp/models.py webapp/services/score_job_manager.py tests/webapp/test_score_jobs_api.py
git commit -m "feat: add AsyncScoreJobStatus model and ScoreJobManager with JSON persistence"

Task 2: API Endpoints

Files:

  • Create: webapp/api/score_jobs.py
  • Modify: webapp/server.py
  • Modify: tests/webapp/test_score_jobs_api.py

Interfaces:

  • Consumes: score_job_manager: ScoreJobManager, AsyncScoreJobResponse, AsyncScoreJobStatus, ScoreRequest

  • Produces: POST /api/score/async, GET /api/score/jobs, GET /api/score/jobs/{job_id}

  • Step 1: Add API tests to tests/webapp/test_score_jobs_api.py

Append this class:

class TestScoreJobsEndpoint:
    def test_submit_async_returns_202(self, client):
        with patch("webapp.services.score_job_manager.ScoreJobManager._run"):
            resp = client.post("/api/score/async", json={
                "question": "q?", "answer": "a.",
                "metrics": ["answer_relevancy"],
            })
        assert resp.status_code == 202
        data = resp.json()
        assert "job_id" in data
        assert data["status"] == "queued"

    def test_get_unknown_job_returns_404(self, client):
        resp = client.get("/api/score/jobs/nonexistent")
        assert resp.status_code == 404

    def test_list_jobs_returns_empty_initially(self, client):
        resp = client.get("/api/score/jobs")
        assert resp.status_code == 200
        assert resp.json()["jobs"] == []

    def test_submitted_job_appears_in_list(self, client):
        with patch("webapp.services.score_job_manager.ScoreJobManager._run"):
            resp = client.post("/api/score/async", json={
                "question": "q?", "answer": "a.",
                "metrics": ["answer_relevancy"],
            })
        job_id = resp.json()["job_id"]
        list_resp = client.get("/api/score/jobs")
        ids = [j["job_id"] for j in list_resp.json()["jobs"]]
        assert job_id in ids

    def test_get_job_by_id(self, client):
        with patch("webapp.services.score_job_manager.ScoreJobManager._run"):
            resp = client.post("/api/score/async", json={
                "question": "q?", "answer": "a.",
                "metrics": ["answer_relevancy"],
            })
        job_id = resp.json()["job_id"]
        get_resp = client.get(f"/api/score/jobs/{job_id}")
        assert get_resp.status_code == 200
        assert get_resp.json()["job_id"] == job_id
  • Step 2: Run to verify FAIL
python -m pytest tests/webapp/test_score_jobs_api.py::TestScoreJobsEndpoint -v

Expected: FAIL — ModuleNotFoundError: No module named 'webapp.api.score_jobs'

  • Step 3: Create webapp/api/score_jobs.py
"""Routes for async RAGAS scoring jobs (Dify fire-and-forget integration)."""

from __future__ import annotations

import logging

from fastapi import APIRouter, HTTPException

from webapp.models import AsyncScoreJobResponse, AsyncScoreJobStatus, ScoreRequest
from webapp.services.score_job_manager import score_job_manager

router = APIRouter(prefix="/api/score", tags=["score"])
logger = logging.getLogger("webapp.api.score_jobs")


@router.post(
    "/async",
    status_code=202,
    response_model=AsyncScoreJobResponse,
    summary="Submit an async score job (recommended for Dify)",
    responses={
        202: {
            "description": "Job queued; job_id is returned immediately. Query the result via GET /api/score/jobs/{job_id}.",
            "content": {
                "application/json": {
                    "example": {"job_id": "abc123def456", "status": "queued"}
                }
            },
        },
    },
)
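def submit_async_score(request: ScoreRequest) -> AsyncScoreJobResponse:
    """Submit an async RAGAS score job and return its job_id immediately (202 Accepted).

    Scoring runs in a background thread and the result is persisted to
    `outputs/score-jobs/<job_id>.json`. All past jobs can be viewed on the
    RAGAS platform's "评分记录" (Score Records) page.

    **Recommended for Dify workflows**: the workflow continues right away
    instead of waiting for scoring to finish, which avoids HTTP node
    timeouts. Results are viewed through the platform UI.
    """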
    logger.info(
        "[score_async] submit  metrics=%s  has_ctx=%s  has_gt=%s",
        request.metrics, bool(request.contexts), bool(request.ground_truth),
    )
    status = score_job_manager.submit(request)
    logger.info("[score_async] queued  job_id=%s", status.job_id)
    return AsyncScoreJobResponse(job_id=status.job_id, status=status.status)


@router.get(
    "/jobs",
    response_model=dict,
    summary="List all score jobs",
)
def list_score_jobs() -> dict:
    """Return all async score jobs, newest first."""
    jobs = score_job_manager.list_jobs()
    logger.info("[score_jobs] list  count=%d", len(jobs))
    return {"jobs": [j.model_dump() for j in jobs]}


@router.get(
    "/jobs/{job_id}",
    response_model=AsyncScoreJobStatus,
    summary="Get one score job",
    responses={404: {"description": "No score job exists for the given job_id."}},
)
def get_score_job(job_id: str) -> AsyncScoreJobStatus:
    """Return the current status and result of one async score job."""
    status = score_job_manager.get(job_id)
    if status is None:
        raise HTTPException(status_code=404, detail=f"Score job not found: {job_id}")
    return status
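
A client that does want the result can submit to POST /api/score/async and then poll GET /api/score/jobs/{job_id} until the status is terminal. The sketch below shows only the polling loop; `wait_for_job` and `fake_fetch` are hypothetical names, and the canned responses stand in for real HTTP calls so the example runs without a server.

```python
import time
from typing import Callable

TERMINAL = {"completed", "failed"}


def wait_for_job(
    fetch: Callable[[str], dict],
    job_id: str,
    interval_s: float = 1.0,
    timeout_s: float = 60.0,
) -> dict:
    """Poll fetch(job_id) until the job is terminal or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        job = fetch(job_id)
        if job["status"] in TERMINAL:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id} is still {job['status']}")
        time.sleep(interval_s)


# Stand-in for GET /api/score/jobs/{job_id}: replays a canned sequence.
_responses = iter([
    {"job_id": "abc123def456", "status": "queued"},
    {"job_id": "abc123def456", "status": "running"},
    {"job_id": "abc123def456", "status": "completed",
     "scores": {"answer_relevancy": 0.85}},
])


def fake_fetch(job_id: str) -> dict:
    """Return the next canned response regardless of job_id."""
    return next(_responses)


result = wait_for_job(fake_fetch, "abc123def456", interval_s=0.01)
```

In a real client, `fetch` would wrap an HTTP GET and treat a 404 as an unknown job rather than retrying.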
  • Step 4: Register router in webapp/server.py

Add import:

from webapp.api import evaluations, llm_profiles, pipeline, runs, scenarios, score, score_jobs

Add after app.include_router(score.router):

    app.include_router(score_jobs.router)

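Update OPENAPI_TAGS so that the following "score" entry appears before "meta":

    {
        "name": "score",
        "description": (
            "**Real-time scoring API (synchronous)**: `POST /api/score`\n\n"
            "**Async scoring API (recommended for Dify)**: `POST /api/score/async`\n\n"
            "The async endpoint returns a job_id immediately (202); scoring runs in the "
            "background and results appear on the 评分记录 (Score Records) page.\n\n"
            "**Auth**: if `SCORE_API_TOKEN` is set in `.env`, send an "
            "`Authorization: Bearer <token>` request header."
        ),
    },

Note: this replaces the existing "score" entry in OPENAPI_TAGS; do not add a second one.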

  • Step 5: Verify no route conflict
python -c "
from webapp.server import create_app
app = create_app()
score_routes = [(r.path, list(getattr(r,'methods',[]))) for r in app.routes if 'score' in r.path]
print(score_routes)
"

Expected: shows /api/score, /api/score/async, /api/score/jobs, /api/score/jobs/{job_id}
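
The check above relies on reading the printed list by eye. As a sketch, the same comparison could be expressed as a function over (path, methods) pairs shaped like the score_routes list; `missing_and_duplicate` and the sample data are hypothetical and only illustrate the idea.

```python
# Method/path pairs that this task is expected to add.
EXPECTED = {
    ("POST", "/api/score/async"),
    ("GET", "/api/score/jobs"),
    ("GET", "/api/score/jobs/{job_id}"),
}


def missing_and_duplicate(routes):
    """Return (expected pairs that are absent, pairs registered twice)."""
    seen = set()
    duplicates = set()
    for path, methods in routes:
        for method in methods:
            pair = (method, path)
            if pair in seen:
                duplicates.add(pair)
            seen.add(pair)
    return EXPECTED - seen, duplicates


# Sample input shaped like the score_routes list printed above.
sample = [
    ("/api/score", ["POST"]),
    ("/api/score/async", ["POST"]),
    ("/api/score/jobs", ["GET"]),
    ("/api/score/jobs/{job_id}", ["GET"]),
]
missing, duplicates = missing_and_duplicate(sample)
```

Both returned sets are empty when every expected route is present exactly once.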

  • Step 6: Run API tests
python -m pytest tests/webapp/test_score_jobs_api.py -v --tb=short

Expected: all 9 tests PASS

  • Step 7: Commit
git add webapp/api/score_jobs.py webapp/server.py tests/webapp/test_score_jobs_api.py
git commit -m "feat: add POST /api/score/async and GET /api/score/jobs endpoints"

Task 3: Frontend "评分记录" (Score Records) Page

Files:

  • Modify: webapp/static/index.html
  • Modify: webapp/static/js/api.js
  • Modify: webapp/static/js/app.js
  • Create: webapp/static/js/score_jobs.js

Interfaces:

  • Consumes: GET /api/score/jobs, GET /api/score/jobs/{job_id}

  • Produces: #view-scorejobs section, ScoreJobs JS object

  • Step 1: Add API methods to webapp/static/js/api.js

Add before the closing };:

  // Async score job API
  scoreJobsAsync(body) { return API.post("/api/score/async", body); },
  getScoreJob(jobId) { return API.get(`/api/score/jobs/${encodeURIComponent(jobId)}`); },
  listScoreJobs() { return API.get("/api/score/jobs"); },
  • Step 2: Add nav item and section to webapp/static/index.html

In the <nav class="nav"> block, add after the profiles nav-item and before the apidocs nav-item:

        <button class="nav-item" data-view="scorejobs">
          <span class="nav-ico">📋</span><span>评分记录</span>
        </button>

Add a new section before the <!-- API 文档视图 --> comment:

      <!-- Score records view (评分记录) -->
      <section class="view" id="view-scorejobs" hidden>
        <div class="panel">
          <div class="panel-head">
            <h2>评分记录</h2>
            <span class="muted" style="font-size:13px">来自 Dify 异步评分任务(POST /api/score/async)</span>
          </div>
        </div>
        <div id="scorejobs-container"></div>
        <div class="empty" id="scorejobs-empty" hidden>
          <p>暂无评分记录。</p>
          <p class="muted">在 Dify 工作流中调用 <code>POST /api/score/async</code> 后,记录将在此显示。</p>
        </div>
      </section>
  • Step 3: Create webapp/static/js/score_jobs.js
// score_jobs.js: logic for the 评分记录 (Score Records) page, listing async RAGAS score results

const ScoreJobs = {
  _pollTimers: {},   // job_id -> setInterval handle

  async load() {
    const container = document.getElementById("scorejobs-container");
    const empty = document.getElementById("scorejobs-empty");
    container.innerHTML = '<p class="muted">加载中…</p>';
    try {
      const data = await API.listScoreJobs();
      const jobs = data.jobs || [];
      container.innerHTML = "";
      if (jobs.length === 0) {
        empty.hidden = false;
        return;
      }
      empty.hidden = true;
      jobs.forEach(job => container.appendChild(ScoreJobs.renderRow(job)));
      // Auto-poll any queued/running jobs
      jobs.forEach(job => {
        if (job.status === "queued" || job.status === "running") {
          ScoreJobs._startPoll(job.job_id);
        }
      });
    } catch (err) {
      container.innerHTML = `<p class="muted">加载失败:${App.escape(err.message)}</p>`;
    }
  },

  renderRow(job) {
    const row = document.createElement("div");
    row.className = "panel score-job-row";
    row.id = `score-job-${job.job_id}`;
    row.innerHTML = ScoreJobs._rowHtml(job);
    return row;
  },

  _rowHtml(job) {
    const time = App.shortTime(job.created_at);
    const question = App.escape((job.request_summary?.question || "—").slice(0, 50));
    const metrics = (job.request_summary?.metrics || []).join(", ");
    const statusBadge = `<span class="badge ${job.status}">${job.status}</span>`;

    let scoreHtml = "";
    if (job.status === "completed") {
      scoreHtml = Object.entries(job.scores || {})
        .map(([k, v]) => {
          const cls = App.scoreClass(v);
          const text = v === null || v === undefined ? "n/a" : Number(v).toFixed(3);
          return `<span class="metric-chip" title="${App.escape(k)}">${App.escape(App.shortMetric(k))} <b class="${cls}">${text}</b></span>`;
        })
        .join(" ");
      if (job.weighted_score !== null && job.weighted_score !== undefined) {
        const cls = App.scoreClass(job.weighted_score);
        scoreHtml += ` <span class="metric-chip">综合 <b class="${cls}">${Number(job.weighted_score).toFixed(3)}</b></span>`;
      }
    } else if (job.status === "failed") {
      scoreHtml = `<span class="muted" style="color:var(--bad)">${App.escape(job.error || "未知错误")}</span>`;
    } else {
      scoreHtml = `<span class="muted">评分中…</span>`;
    }

    return `
      <div class="run-card-head">
        <div class="run-card-title">${question}</div>
        <div>${statusBadge}</div>
      </div>
      <div class="run-card-meta">
        <div>指标:${App.escape(metrics)} · ${time} · ${job.latency_ms}ms</div>
      </div>
      <div class="run-card-metrics">${scoreHtml}</div>
    `;
  },

  _startPoll(jobId) {
    if (ScoreJobs._pollTimers[jobId]) return;
    ScoreJobs._pollTimers[jobId] = setInterval(async () => {
      try {
        const job = await API.getScoreJob(jobId);
        const el = document.getElementById(`score-job-${jobId}`);
        if (el) el.innerHTML = ScoreJobs._rowHtml(job);
        if (job.status === "completed" || job.status === "failed") {
          clearInterval(ScoreJobs._pollTimers[jobId]);
          delete ScoreJobs._pollTimers[jobId];
        }
      } catch (_e) {
        clearInterval(ScoreJobs._pollTimers[jobId]);
        delete ScoreJobs._pollTimers[jobId];
      }
    }, 5000);
  },

  stopAllPolls() {
    Object.values(ScoreJobs._pollTimers).forEach(t => clearInterval(t));
    ScoreJobs._pollTimers = {};
  },
};
  • Step 4: Update webapp/static/js/app.js

Add "scorejobs" to the views array and titles object:

  views: ["runs", "new", "report", "profiles", "scorejobs", "apidocs"],
  titles: { runs: "运行列表", new: "新建评估", report: "报告详情", profiles: "LLM 配置", scorejobs: "评分记录", apidocs: "API 文档" },

Add in _doSwitch after if (view === "profiles") Profiles.load();:

    if (view === "scorejobs") ScoreJobs.load();

Stop the pollers when leaving the view. In _doSwitch, before the view-switching logic, add:

    // Stop score job pollers when leaving the scorejobs view
    if (App.activeView === "scorejobs" && view !== "scorejobs") ScoreJobs.stopAllPolls();
  • Step 5: Add script tag to webapp/static/index.html

Add before <script src="/static/js/app.js"></script>:

  <script src="/static/js/score_jobs.js"></script>
  • Step 6: Verify server boots
python -c "from webapp.server import create_app; create_app(); print('OK')"

Expected: OK

Also verify HTML has all new elements:

python -c "
c = open('webapp/static/index.html', encoding='utf-8').read()
assert 'view-scorejobs' in c
assert 'scorejobs-container' in c
assert '评分记录' in c
print('HTML OK')
"
  • Step 7: Commit
git add webapp/static/index.html webapp/static/js/api.js webapp/static/js/app.js webapp/static/js/score_jobs.js
git commit -m "feat: add 评分记录 page with async score job list and auto-polling"

Task 4: Full Regression Tests + Dify Usage Comment

Files:

  • Modify: webapp/static/js/score_jobs.js (minor: add Dify curl comment at top)
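
For reference when writing the Dify usage comment, the request a Dify HTTP node would send can be sketched as follows. The base URL and token value are assumptions for illustration, `build_async_score_request` is a hypothetical helper, and whether the async route enforces SCORE_API_TOKEN depends on how auth is wired in webapp/server.py.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: adjust to the real deployment


def build_async_score_request(question, answer, metrics, token=None):
    """Build (but do not send) a POST /api/score/async request."""
    body = json.dumps(
        {"question": question, "answer": answer, "metrics": metrics},
        ensure_ascii=False,
    ).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if token:
        # Only needed when SCORE_API_TOKEN is configured on the server.
        headers["Authorization"] = f"Bearer {token}"
    return urllib.request.Request(
        f"{BASE_URL}/api/score/async", data=body, headers=headers, method="POST"
    )


req = build_async_score_request(
    "q?", "a.", ["answer_relevancy"], token="example-token"
)
# Sending is left commented out so the sketch runs without a server:
# with urllib.request.urlopen(req, timeout=10) as resp:
#     print(resp.status, json.load(resp))
```

The body uses the same three fields as the test payloads in Task 2; contexts and ground_truth can be added when the chosen metrics need them.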

  • Step 1: Run full test suite

python -m pytest tests/ -v --tb=short -q 2>&1 | tail -15

Pre-existing failures to ignore:

  • test_normalize_sample_pdf_offline_smoke_row
  • test_evaluator_and_reporting_write_run_assets
  • test_question_generator_rejects_invalid_json
  • test_question_generator_rejects_non_list_samples

Any other failure is a regression — fix before proceeding.
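
Separating the known failures from real regressions can also be done mechanically. The sketch below parses the "FAILED ..." lines of pytest's short test summary; `find_regressions` is a hypothetical helper, and the file paths in the sample output are made up for illustration.

```python
KNOWN_FAILURES = {
    "test_normalize_sample_pdf_offline_smoke_row",
    "test_evaluator_and_reporting_write_run_assets",
    "test_question_generator_rejects_invalid_json",
    "test_question_generator_rejects_non_list_samples",
}


def find_regressions(pytest_output: str) -> list[str]:
    """Return FAILED test ids whose function name is not a known failure."""
    regressions = []
    for line in pytest_output.splitlines():
        if not line.startswith("FAILED "):
            continue
        # Short-summary lines look like: "FAILED path::test_name - message"
        test_id = line.split()[1]
        # Drop the path/class prefix and any parametrize suffix like "[case]".
        name = test_id.split("::")[-1].split("[")[0]
        if name not in KNOWN_FAILURES:
            regressions.append(test_id)
    return regressions


# Made-up sample output: one known failure and one regression.
sample_output = """\
FAILED tests/test_pipeline.py::test_question_generator_rejects_invalid_json - ValueError
FAILED tests/webapp/test_score_jobs_api.py::TestScoreJobsEndpoint::test_get_job_by_id - assert 404 == 200
"""
regressions = find_regressions(sample_output)
```

An empty result means every failure in the run is on the ignore list.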

  • Step 2: Run targeted tests
python -m pytest tests/webapp/test_score_jobs_api.py tests/webapp/test_score_api.py tests/test_pipeline.py -v --tb=short

Expected: all PASS

  • Step 3: Final commit
git add .
git commit -m "feat: async score jobs complete — POST /api/score/async + 评分记录 page

- ScoreJobManager: thread pool + JSON persistence (outputs/score-jobs/)
- POST /api/score/async: 202 immediate response with job_id
- GET /api/score/jobs + GET /api/score/jobs/{id}: query endpoints
- Frontend: 评分记录 nav page with 5s auto-polling for pending jobs
- Dify integration: change /api/score → /api/score/async, remove response parsing

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>"