Compare commits
7 Commits
a781ba1e4a
...
e1751447df
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e1751447df | ||
|
|
4fd515d2d9 | ||
|
|
abcd61ec8f | ||
|
|
363e8b0f27 | ||
|
|
b870ed8730 | ||
|
|
791738bb07 | ||
|
|
630b70cc2a |
808
docs/superpowers/plans/2026-06-24-async-score-jobs.md
Normal file
808
docs/superpowers/plans/2026-06-24-async-score-jobs.md
Normal file
@@ -0,0 +1,808 @@
|
|||||||
|
# 异步评分记录(Async Score Jobs)Implementation Plan
|
||||||
|
|
||||||
|
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||||
|
|
||||||
|
**Goal:** 新增 `POST /api/score/async` 异步端点,结果持久化至 `outputs/score-jobs/`,并在前端新增「评分记录」页面展示。
|
||||||
|
|
||||||
|
**Architecture:** 新建 `ScoreJobManager`(复用 `pipeline_task_manager` 线程池模式)在后台执行 `InlineScorer.score()`,写入 JSON 文件;新增三个 REST 端点;前端新增导航页加载并轮询记录。
|
||||||
|
|
||||||
|
**Tech Stack:** Python 3.12, FastAPI, Pydantic v2, threading, Vanilla JS, pytest
|
||||||
|
|
||||||
|
## Global Constraints
|
||||||
|
|
||||||
|
- Python 3.12+,PEP 8,4 空格缩进,类型注解必须
|
||||||
|
- 存储路径:`outputs/score-jobs/<job_id>.json`
|
||||||
|
- 复用现有 `ScoreRequest`(含 `effective_metrics()` 和 `contexts_as_list()` 方法)
|
||||||
|
- 复用现有 `InlineScorer.score()` 和 `compute_weighted_score()`
|
||||||
|
- 所有测试用 pytest,不依赖真实 LLM
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 文件清单
|
||||||
|
|
||||||
|
| 操作 | 文件 | 职责 |
|
||||||
|
|------|------|------|
|
||||||
|
| 新建 | `webapp/services/score_job_manager.py` | ScoreJobManager:线程池 + JSON 持久化 |
|
||||||
|
| 新建 | `webapp/api/score_jobs.py` | 3 个端点路由 |
|
||||||
|
| 新建 | `webapp/static/js/score_jobs.js` | 前端列表 + 轮询逻辑 |
|
||||||
|
| 新建 | `tests/webapp/test_score_jobs_api.py` | API 集成测试 |
|
||||||
|
| 修改 | `webapp/models.py` | 新增 `AsyncScoreJobStatus`、`AsyncScoreJobResponse` |
|
||||||
|
| 修改 | `webapp/server.py` | 注册 score_jobs router,更新 OPENAPI_TAGS 和 description |
|
||||||
|
| 修改 | `webapp/static/index.html` | 新增导航项 + `#view-scorejobs` section |
|
||||||
|
| 修改 | `webapp/static/js/api.js` | 新增 `scoreJobsAsync()`、`getScoreJob()`、`listScoreJobs()` |
|
||||||
|
| 修改 | `webapp/static/js/app.js` | 注册 `scorejobs` 视图、加载调用 |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Task 1: Pydantic 模型 + ScoreJobManager
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `webapp/models.py`
|
||||||
|
- Create: `webapp/services/score_job_manager.py`
|
||||||
|
- Create: `tests/webapp/test_score_jobs_api.py` (partial)
|
||||||
|
|
||||||
|
**Interfaces:**
|
||||||
|
- Produces:
|
||||||
|
- `AsyncScoreJobStatus` Pydantic model
|
||||||
|
- `AsyncScoreJobResponse` Pydantic model
|
||||||
|
- `score_job_manager: ScoreJobManager` singleton
|
||||||
|
- `ScoreJobManager.submit(request: ScoreRequest) -> AsyncScoreJobStatus`
|
||||||
|
- `ScoreJobManager.get(job_id: str) -> AsyncScoreJobStatus | None`
|
||||||
|
- `ScoreJobManager.list_jobs() -> list[AsyncScoreJobStatus]`
|
||||||
|
|
||||||
|
- [ ] **Step 1: Add models to `webapp/models.py`**
|
||||||
|
|
||||||
|
Append after `AsyncScoreJobResponse` (at the end of the file, after `ScoreResponse`):
|
||||||
|
|
||||||
|
```python
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 异步评分记录模型
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class AsyncScoreJobResponse(BaseModel):
|
||||||
|
"""Immediate response after submitting an async score job."""
|
||||||
|
|
||||||
|
job_id: str = Field(description="任务唯一标识符,用于后续查询结果。")
|
||||||
|
status: str = Field(default="queued", description="初始状态:queued。")
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncScoreJobStatus(BaseModel):
|
||||||
|
"""Full state of one async score job, persisted to disk."""
|
||||||
|
|
||||||
|
job_id: str = Field(description="任务唯一标识符。")
|
||||||
|
status: str = Field(description="queued | running | completed | failed")
|
||||||
|
created_at: str = Field(default="", description="创建时间(ISO 8601 UTC)。")
|
||||||
|
finished_at: str = Field(default="", description="完成时间(ISO 8601 UTC)。")
|
||||||
|
request_summary: dict = Field(
|
||||||
|
default_factory=dict,
|
||||||
|
description="请求参数快照(question 前80字、metrics、judge_model 等)。",
|
||||||
|
)
|
||||||
|
scores: dict[str, float | None] = Field(default_factory=dict, description="各指标得分。")
|
||||||
|
weighted_score: float | None = Field(default=None, description="加权综合得分。")
|
||||||
|
latency_ms: int = Field(default=0, description="评分耗时毫秒。")
|
||||||
|
skipped_metrics: list[str] = Field(default_factory=list)
|
||||||
|
error: str | None = Field(default=None)
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 2: Write failing tests**
|
||||||
|
|
||||||
|
Create `tests/webapp/test_score_jobs_api.py`:
|
||||||
|
|
||||||
|
```python
|
||||||
|
"""Tests for async score jobs API."""
|
||||||
|
from __future__ import annotations
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
import pytest
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def client(tmp_path, monkeypatch):
|
||||||
|
import webapp.services.score_job_manager as mgr_mod
|
||||||
|
from webapp.services.score_job_manager import ScoreJobManager
|
||||||
|
fresh_mgr = ScoreJobManager(jobs_dir=tmp_path / "score-jobs")
|
||||||
|
monkeypatch.setattr(mgr_mod, "score_job_manager", fresh_mgr)
|
||||||
|
import webapp.api.score_jobs as api_mod
|
||||||
|
monkeypatch.setattr(api_mod, "score_job_manager", fresh_mgr)
|
||||||
|
from webapp.server import create_app
|
||||||
|
return TestClient(create_app())
|
||||||
|
|
||||||
|
|
||||||
|
class TestScoreJobManager:
|
||||||
|
def test_submit_returns_job_status_with_queued(self, tmp_path):
|
||||||
|
from webapp.services.score_job_manager import ScoreJobManager
|
||||||
|
from webapp.models import ScoreRequest
|
||||||
|
mgr = ScoreJobManager(jobs_dir=tmp_path / "jobs")
|
||||||
|
req = ScoreRequest(question="q", answer="a", metrics=["answer_relevancy"])
|
||||||
|
with patch.object(mgr, "_execute") as mock_exec:
|
||||||
|
mock_exec.return_value = None
|
||||||
|
status = mgr.submit(req)
|
||||||
|
assert status.status in ("queued", "running", "completed")
|
||||||
|
assert len(status.job_id) > 0
|
||||||
|
|
||||||
|
def test_get_returns_none_for_unknown_id(self, tmp_path):
|
||||||
|
from webapp.services.score_job_manager import ScoreJobManager
|
||||||
|
mgr = ScoreJobManager(jobs_dir=tmp_path / "jobs")
|
||||||
|
assert mgr.get("nonexistent") is None
|
||||||
|
|
||||||
|
def test_list_returns_empty_initially(self, tmp_path):
|
||||||
|
from webapp.services.score_job_manager import ScoreJobManager
|
||||||
|
mgr = ScoreJobManager(jobs_dir=tmp_path / "jobs")
|
||||||
|
assert mgr.list_jobs() == []
|
||||||
|
|
||||||
|
def test_completed_job_persisted_to_disk(self, tmp_path):
|
||||||
|
from webapp.services.score_job_manager import ScoreJobManager
|
||||||
|
from webapp.models import ScoreRequest
|
||||||
|
mgr = ScoreJobManager(jobs_dir=tmp_path / "jobs", max_workers=1)
|
||||||
|
req = ScoreRequest(question="q?", answer="a.", metrics=["answer_relevancy"])
|
||||||
|
mock_scorer = MagicMock()
|
||||||
|
mock_scorer.score.return_value = {"answer_relevancy": 0.85}
|
||||||
|
with patch("webapp.services.score_job_manager.inline_scorer", mock_scorer):
|
||||||
|
with patch("webapp.services.score_job_manager.EvaluationSettings"):
|
||||||
|
status = mgr.submit(req)
|
||||||
|
for _ in range(20):
|
||||||
|
s = mgr.get(status.job_id)
|
||||||
|
if s and s.status in ("completed", "failed"):
|
||||||
|
break
|
||||||
|
time.sleep(0.2)
|
||||||
|
s = mgr.get(status.job_id)
|
||||||
|
assert s is not None
|
||||||
|
json_path = tmp_path / "jobs" / f"{status.job_id}.json"
|
||||||
|
assert json_path.exists()
|
||||||
|
data = json.loads(json_path.read_text(encoding="utf-8"))
|
||||||
|
assert data["job_id"] == status.job_id
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 3: Run to verify FAIL**
|
||||||
|
|
||||||
|
```
|
||||||
|
cd C:\Projects\AIProjects\Siemens-AIPOC\siemens_ragas
|
||||||
|
python -m pytest tests/webapp/test_score_jobs_api.py::TestScoreJobManager -v
|
||||||
|
```
|
||||||
|
Expected: `ModuleNotFoundError: No module named 'webapp.services.score_job_manager'`
|
||||||
|
|
||||||
|
- [ ] **Step 4: Create `webapp/services/score_job_manager.py`**
|
||||||
|
|
||||||
|
```python
|
||||||
|
"""Background task manager for async RAGAS single-sample scoring.
|
||||||
|
|
||||||
|
Each job runs InlineScorer.score() in a thread pool and persists the
|
||||||
|
result as a JSON file under outputs/score-jobs/<job_id>.json so results
|
||||||
|
survive server restarts and can be listed by the frontend.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
import threading
|
||||||
|
import uuid
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from rag_eval.metrics.weights import compute_weighted_score
|
||||||
|
from rag_eval.settings import EvaluationSettings
|
||||||
|
from webapp.models import AsyncScoreJobStatus, ScoreRequest
|
||||||
|
from webapp.services.inline_scorer import inline_scorer
|
||||||
|
|
||||||
|
_REPO_ROOT = Path(__file__).resolve().parents[2]
|
||||||
|
_DEFAULT_JOBS_DIR = _REPO_ROOT / "outputs" / "score-jobs"
|
||||||
|
|
||||||
|
|
||||||
|
def _now_iso() -> str:
|
||||||
|
return datetime.now(timezone.utc).isoformat()
|
||||||
|
|
||||||
|
|
||||||
|
class ScoreJobManager:
|
||||||
|
"""Thread-pool manager for async RAGAS scoring jobs with JSON persistence."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
jobs_dir: Path = _DEFAULT_JOBS_DIR,
|
||||||
|
max_workers: int = 4,
|
||||||
|
) -> None:
|
||||||
|
self._jobs_dir = Path(jobs_dir)
|
||||||
|
self._jobs_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
self._executor = ThreadPoolExecutor(max_workers=max_workers)
|
||||||
|
# In-memory index: job_id -> AsyncScoreJobStatus (authoritative while running)
|
||||||
|
self._cache: dict[str, AsyncScoreJobStatus] = {}
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
self._load_existing()
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------ #
|
||||||
|
# Public API
|
||||||
|
# ------------------------------------------------------------------ #
|
||||||
|
|
||||||
|
def submit(self, request: ScoreRequest) -> AsyncScoreJobStatus:
|
||||||
|
"""Queue one scoring job and return its initial status immediately."""
|
||||||
|
job_id = uuid.uuid4().hex[:12]
|
||||||
|
status = AsyncScoreJobStatus(
|
||||||
|
job_id=job_id,
|
||||||
|
status="queued",
|
||||||
|
created_at=_now_iso(),
|
||||||
|
request_summary={
|
||||||
|
"question": request.question[:80],
|
||||||
|
"answer": (request.answer or "")[:80],
|
||||||
|
"metrics": list(request.metrics),
|
||||||
|
"judge_model": request.judge_model or "",
|
||||||
|
"embedding_model": request.embedding_model or "",
|
||||||
|
"has_contexts": bool(request.contexts),
|
||||||
|
"has_ground_truth": bool(request.ground_truth),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
with self._lock:
|
||||||
|
self._cache[job_id] = status
|
||||||
|
self._persist(status)
|
||||||
|
self._executor.submit(self._run, job_id, request)
|
||||||
|
return status
|
||||||
|
|
||||||
|
def get(self, job_id: str) -> AsyncScoreJobStatus | None:
|
||||||
|
"""Return the current status for one job, or None if unknown."""
|
||||||
|
with self._lock:
|
||||||
|
return self._cache.get(job_id)
|
||||||
|
|
||||||
|
def list_jobs(self) -> list[AsyncScoreJobStatus]:
|
||||||
|
"""Return all known jobs sorted newest first."""
|
||||||
|
with self._lock:
|
||||||
|
jobs = list(self._cache.values())
|
||||||
|
jobs.sort(key=lambda j: j.created_at, reverse=True)
|
||||||
|
return jobs
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------ #
|
||||||
|
# Internal
|
||||||
|
# ------------------------------------------------------------------ #
|
||||||
|
|
||||||
|
def _run(self, job_id: str, request: ScoreRequest) -> None:
|
||||||
|
"""Execute scoring in the thread pool and persist the result."""
|
||||||
|
self._update(job_id, status="running")
|
||||||
|
settings = EvaluationSettings()
|
||||||
|
judge_model = request.judge_model or settings.ragas_judge_model
|
||||||
|
embedding_model = request.embedding_model or settings.ragas_embedding_model
|
||||||
|
effective = request.effective_metrics()
|
||||||
|
requested = set(request.metrics)
|
||||||
|
skipped = sorted(requested - set(effective))
|
||||||
|
|
||||||
|
import time as _time
|
||||||
|
t0 = _time.monotonic()
|
||||||
|
try:
|
||||||
|
if not effective:
|
||||||
|
scores: dict[str, float | None] = {m: None for m in request.metrics}
|
||||||
|
weighted = None
|
||||||
|
else:
|
||||||
|
raw = inline_scorer.score(
|
||||||
|
question=request.question,
|
||||||
|
answer=request.answer,
|
||||||
|
contexts=request.contexts_as_list(),
|
||||||
|
ground_truth=request.ground_truth,
|
||||||
|
metrics=effective,
|
||||||
|
judge_model=judge_model,
|
||||||
|
embedding_model=embedding_model,
|
||||||
|
settings=settings,
|
||||||
|
)
|
||||||
|
scores = {m: None for m in request.metrics}
|
||||||
|
scores.update(raw)
|
||||||
|
weighted_raw = compute_weighted_score(
|
||||||
|
{k: v for k, v in raw.items() if v is not None}, {}
|
||||||
|
)
|
||||||
|
weighted = round(weighted_raw, 4) if weighted_raw is not None else None
|
||||||
|
|
||||||
|
latency_ms = int((_time.monotonic() - t0) * 1000)
|
||||||
|
self._update(
|
||||||
|
job_id,
|
||||||
|
status="completed",
|
||||||
|
finished_at=_now_iso(),
|
||||||
|
scores=scores,
|
||||||
|
weighted_score=weighted,
|
||||||
|
latency_ms=latency_ms,
|
||||||
|
skipped_metrics=skipped,
|
||||||
|
)
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
latency_ms = int((_time.monotonic() - t0) * 1000)
|
||||||
|
self._update(
|
||||||
|
job_id,
|
||||||
|
status="failed",
|
||||||
|
finished_at=_now_iso(),
|
||||||
|
latency_ms=latency_ms,
|
||||||
|
error=f"{type(exc).__name__}: {exc}",
|
||||||
|
)
|
||||||
|
|
||||||
|
def _update(self, job_id: str, **kwargs: Any) -> None:
|
||||||
|
"""Merge kwargs into the job status and persist."""
|
||||||
|
with self._lock:
|
||||||
|
existing = self._cache.get(job_id)
|
||||||
|
if existing is None:
|
||||||
|
return
|
||||||
|
updated = existing.model_copy(update=kwargs)
|
||||||
|
self._cache[job_id] = updated
|
||||||
|
self._persist(updated)
|
||||||
|
|
||||||
|
def _persist(self, status: AsyncScoreJobStatus) -> None:
|
||||||
|
"""Write one job's status to its JSON file."""
|
||||||
|
path = self._jobs_dir / f"{status.job_id}.json"
|
||||||
|
path.write_text(
|
||||||
|
json.dumps(status.model_dump(), ensure_ascii=False, indent=2),
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
|
||||||
|
def _load_existing(self) -> None:
|
||||||
|
"""Load completed jobs from disk into memory on startup."""
|
||||||
|
for path in sorted(self._jobs_dir.glob("*.json")):
|
||||||
|
try:
|
||||||
|
data = json.loads(path.read_text(encoding="utf-8"))
|
||||||
|
status = AsyncScoreJobStatus.model_validate(data)
|
||||||
|
self._cache[status.job_id] = status
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
pass # Corrupt file — skip
|
||||||
|
|
||||||
|
|
||||||
|
# Module-level singleton shared by FastAPI routes.
|
||||||
|
score_job_manager = ScoreJobManager()
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 5: Run to verify tests PASS**
|
||||||
|
|
||||||
|
```
|
||||||
|
python -m pytest tests/webapp/test_score_jobs_api.py::TestScoreJobManager -v
|
||||||
|
```
|
||||||
|
Expected: 4 tests PASS
|
||||||
|
|
||||||
|
- [ ] **Step 6: Commit**
|
||||||
|
|
||||||
|
```
|
||||||
|
git add webapp/models.py webapp/services/score_job_manager.py tests/webapp/test_score_jobs_api.py
|
||||||
|
git commit -m "feat: add AsyncScoreJobStatus model and ScoreJobManager with JSON persistence"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Task 2: API 端点
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Create: `webapp/api/score_jobs.py`
|
||||||
|
- Modify: `webapp/server.py`
|
||||||
|
- Modify: `tests/webapp/test_score_jobs_api.py`
|
||||||
|
|
||||||
|
**Interfaces:**
|
||||||
|
- Consumes: `score_job_manager: ScoreJobManager`, `AsyncScoreJobResponse`, `AsyncScoreJobStatus`, `ScoreRequest`
|
||||||
|
- Produces: `POST /api/score/async`, `GET /api/score/jobs`, `GET /api/score/jobs/{job_id}`
|
||||||
|
|
||||||
|
- [ ] **Step 1: Add API tests to `tests/webapp/test_score_jobs_api.py`**
|
||||||
|
|
||||||
|
Append this class:
|
||||||
|
|
||||||
|
```python
|
||||||
|
class TestScoreJobsEndpoint:
|
||||||
|
def test_submit_async_returns_202(self, client):
|
||||||
|
with patch("webapp.services.score_job_manager.ScoreJobManager._execute"):
|
||||||
|
resp = client.post("/api/score/async", json={
|
||||||
|
"question": "q?", "answer": "a.",
|
||||||
|
"metrics": ["answer_relevancy"],
|
||||||
|
})
|
||||||
|
assert resp.status_code == 202
|
||||||
|
data = resp.json()
|
||||||
|
assert "job_id" in data
|
||||||
|
assert data["status"] == "queued"
|
||||||
|
|
||||||
|
def test_get_unknown_job_returns_404(self, client):
|
||||||
|
resp = client.get("/api/score/jobs/nonexistent")
|
||||||
|
assert resp.status_code == 404
|
||||||
|
|
||||||
|
def test_list_jobs_returns_empty_initially(self, client):
|
||||||
|
resp = client.get("/api/score/jobs")
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert resp.json()["jobs"] == []
|
||||||
|
|
||||||
|
def test_submitted_job_appears_in_list(self, client):
|
||||||
|
with patch("webapp.services.score_job_manager.ScoreJobManager._run"):
|
||||||
|
resp = client.post("/api/score/async", json={
|
||||||
|
"question": "q?", "answer": "a.",
|
||||||
|
"metrics": ["answer_relevancy"],
|
||||||
|
})
|
||||||
|
job_id = resp.json()["job_id"]
|
||||||
|
list_resp = client.get("/api/score/jobs")
|
||||||
|
ids = [j["job_id"] for j in list_resp.json()["jobs"]]
|
||||||
|
assert job_id in ids
|
||||||
|
|
||||||
|
def test_get_job_by_id(self, client):
|
||||||
|
with patch("webapp.services.score_job_manager.ScoreJobManager._run"):
|
||||||
|
resp = client.post("/api/score/async", json={
|
||||||
|
"question": "q?", "answer": "a.",
|
||||||
|
"metrics": ["answer_relevancy"],
|
||||||
|
})
|
||||||
|
job_id = resp.json()["job_id"]
|
||||||
|
get_resp = client.get(f"/api/score/jobs/{job_id}")
|
||||||
|
assert get_resp.status_code == 200
|
||||||
|
assert get_resp.json()["job_id"] == job_id
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 2: Run to verify FAIL**
|
||||||
|
|
||||||
|
```
|
||||||
|
python -m pytest tests/webapp/test_score_jobs_api.py::TestScoreJobsEndpoint -v
|
||||||
|
```
|
||||||
|
Expected: FAIL — `ModuleNotFoundError: No module named 'webapp.api.score_jobs'`
|
||||||
|
|
||||||
|
- [ ] **Step 3: Create `webapp/api/score_jobs.py`**
|
||||||
|
|
||||||
|
```python
|
||||||
|
"""Routes for async RAGAS scoring jobs (Dify fire-and-forget integration)."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from fastapi import APIRouter, HTTPException
|
||||||
|
|
||||||
|
from webapp.models import AsyncScoreJobResponse, AsyncScoreJobStatus, ScoreRequest
|
||||||
|
from webapp.services.score_job_manager import score_job_manager
|
||||||
|
|
||||||
|
router = APIRouter(prefix="/api/score", tags=["score"])
|
||||||
|
logger = logging.getLogger("webapp.api.score_jobs")
|
||||||
|
|
||||||
|
|
||||||
|
@router.post(
|
||||||
|
"/async",
|
||||||
|
status_code=202,
|
||||||
|
response_model=AsyncScoreJobResponse,
|
||||||
|
summary="提交异步评分任务(Dify 推荐方式)",
|
||||||
|
responses={
|
||||||
|
202: {
|
||||||
|
"description": "任务已排队,立即返回 job_id。通过 GET /api/score/jobs/{job_id} 查询结果。",
|
||||||
|
"content": {
|
||||||
|
"application/json": {
|
||||||
|
"example": {"job_id": "abc123def456", "status": "queued"}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
def submit_async_score(request: ScoreRequest) -> AsyncScoreJobResponse:
|
||||||
|
"""提交异步 RAGAS 评分任务,立即返回 job_id(202 Accepted)。
|
||||||
|
|
||||||
|
评分在后台线程中执行,结果持久化至 `outputs/score-jobs/<job_id>.json`。
|
||||||
|
在 RAGAS 平台「评分记录」页面可查看所有历史评分记录。
|
||||||
|
|
||||||
|
**Dify 工作流推荐使用此接口**:不等待评分完成,工作流立即继续,
|
||||||
|
避免 HTTP 节点超时。评分结果通过平台界面查看。
|
||||||
|
"""
|
||||||
|
logger.info(
|
||||||
|
"[score_async] submit metrics=%s has_ctx=%s has_gt=%s",
|
||||||
|
request.metrics, bool(request.contexts), bool(request.ground_truth),
|
||||||
|
)
|
||||||
|
status = score_job_manager.submit(request)
|
||||||
|
logger.info("[score_async] queued job_id=%s", status.job_id)
|
||||||
|
return AsyncScoreJobResponse(job_id=status.job_id, status=status.status)
|
||||||
|
|
||||||
|
|
||||||
|
@router.get(
|
||||||
|
"/jobs",
|
||||||
|
response_model=dict,
|
||||||
|
summary="列出所有评分记录",
|
||||||
|
)
|
||||||
|
def list_score_jobs() -> dict:
|
||||||
|
"""返回所有异步评分记录,按创建时间倒序排列。"""
|
||||||
|
jobs = score_job_manager.list_jobs()
|
||||||
|
logger.info("[score_jobs] list count=%d", len(jobs))
|
||||||
|
return {"jobs": [j.model_dump() for j in jobs]}
|
||||||
|
|
||||||
|
|
||||||
|
@router.get(
|
||||||
|
"/jobs/{job_id}",
|
||||||
|
response_model=AsyncScoreJobStatus,
|
||||||
|
summary="查询评分记录详情",
|
||||||
|
responses={404: {"description": "指定 job_id 的评分记录不存在。"}},
|
||||||
|
)
|
||||||
|
def get_score_job(job_id: str) -> AsyncScoreJobStatus:
|
||||||
|
"""返回一个异步评分任务的当前状态和结果。"""
|
||||||
|
status = score_job_manager.get(job_id)
|
||||||
|
if status is None:
|
||||||
|
raise HTTPException(status_code=404, detail=f"Score job not found: {job_id}")
|
||||||
|
return status
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 4: Register router in `webapp/server.py`**
|
||||||
|
|
||||||
|
Add import:
|
||||||
|
```python
|
||||||
|
from webapp.api import evaluations, llm_profiles, pipeline, runs, scenarios, score, score_jobs
|
||||||
|
```
|
||||||
|
|
||||||
|
Add after `app.include_router(score.router)`:
|
||||||
|
```python
|
||||||
|
app.include_router(score_jobs.router)
|
||||||
|
```
|
||||||
|
|
||||||
|
Add entry to `OPENAPI_TAGS` before `"meta"`:
|
||||||
|
```python
|
||||||
|
{
|
||||||
|
"name": "score",
|
||||||
|
"description": (
|
||||||
|
"**实时评分 API(同步)** — `POST /api/score`\n\n"
|
||||||
|
"**异步评分 API(Dify 推荐)** — `POST /api/score/async`\n\n"
|
||||||
|
"异步方式立即返回 job_id(202),评分在后台执行,结果在「评分记录」页查看。\n\n"
|
||||||
|
"**鉴权**:若 `.env` 中配置了 `SCORE_API_TOKEN`,需携带 "
|
||||||
|
"`Authorization: Bearer <token>` 请求头。"
|
||||||
|
),
|
||||||
|
},
|
||||||
|
```
|
||||||
|
|
||||||
|
> Note: this replaces the existing `"score"` entry in `OPENAPI_TAGS`.
|
||||||
|
|
||||||
|
- [ ] **Step 5: Verify no route conflict**
|
||||||
|
|
||||||
|
```
|
||||||
|
python -c "
|
||||||
|
from webapp.server import create_app
|
||||||
|
app = create_app()
|
||||||
|
score_routes = [(r.path, list(getattr(r,'methods',[]))) for r in app.routes if 'score' in r.path]
|
||||||
|
print(score_routes)
|
||||||
|
"
|
||||||
|
```
|
||||||
|
Expected: shows `/api/score`, `/api/score/async`, `/api/score/jobs`, `/api/score/jobs/{job_id}`
|
||||||
|
|
||||||
|
- [ ] **Step 6: Run API tests**
|
||||||
|
|
||||||
|
```
|
||||||
|
python -m pytest tests/webapp/test_score_jobs_api.py -v --tb=short
|
||||||
|
```
|
||||||
|
Expected: all 9 tests PASS
|
||||||
|
|
||||||
|
- [ ] **Step 7: Commit**
|
||||||
|
|
||||||
|
```
|
||||||
|
git add webapp/api/score_jobs.py webapp/server.py tests/webapp/test_score_jobs_api.py
|
||||||
|
git commit -m "feat: add POST /api/score/async and GET /api/score/jobs endpoints"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Task 3: 前端「评分记录」页
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `webapp/static/index.html`
|
||||||
|
- Modify: `webapp/static/js/api.js`
|
||||||
|
- Modify: `webapp/static/js/app.js`
|
||||||
|
- Create: `webapp/static/js/score_jobs.js`
|
||||||
|
|
||||||
|
**Interfaces:**
|
||||||
|
- Consumes: `GET /api/score/jobs`, `GET /api/score/jobs/{job_id}`
|
||||||
|
- Produces: `#view-scorejobs` section, `ScoreJobs` JS object
|
||||||
|
|
||||||
|
- [ ] **Step 1: Add API methods to `webapp/static/js/api.js`**
|
||||||
|
|
||||||
|
Add before the closing `};`:
|
||||||
|
```javascript
|
||||||
|
// 异步评分记录 API
|
||||||
|
scoreJobsAsync(body) { return API.post("/api/score/async", body); },
|
||||||
|
getScoreJob(jobId) { return API.get(`/api/score/jobs/${encodeURIComponent(jobId)}`); },
|
||||||
|
listScoreJobs() { return API.get("/api/score/jobs"); },
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 2: Add nav item and section to `webapp/static/index.html`**
|
||||||
|
|
||||||
|
In the `<nav class="nav">` block, add after the `profiles` nav-item and before the `apidocs` nav-item:
|
||||||
|
```html
|
||||||
|
<button class="nav-item" data-view="scorejobs">
|
||||||
|
<span class="nav-ico">📋</span><span>评分记录</span>
|
||||||
|
</button>
|
||||||
|
```
|
||||||
|
|
||||||
|
Add a new section before the `<!-- API 文档视图 -->` comment:
|
||||||
|
```html
|
||||||
|
<!-- 评分记录视图 -->
|
||||||
|
<section class="view" id="view-scorejobs" hidden>
|
||||||
|
<div class="panel">
|
||||||
|
<div class="panel-head">
|
||||||
|
<h2>评分记录</h2>
|
||||||
|
<span class="muted" style="font-size:13px">来自 Dify 异步评分任务(POST /api/score/async)</span>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
<div id="scorejobs-container"></div>
|
||||||
|
<div class="empty" id="scorejobs-empty" hidden>
|
||||||
|
<p>暂无评分记录。</p>
|
||||||
|
<p class="muted">在 Dify 工作流中调用 <code>POST /api/score/async</code> 后,记录将在此显示。</p>
|
||||||
|
</div>
|
||||||
|
</section>
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 3: Create `webapp/static/js/score_jobs.js`**
|
||||||
|
|
||||||
|
```javascript
|
||||||
|
// score_jobs.js — 评分记录页面逻辑(异步 RAGAS 评分结果列表)
|
||||||
|
|
||||||
|
const ScoreJobs = {
|
||||||
|
_pollTimers: {}, // job_id -> setInterval handle
|
||||||
|
|
||||||
|
async load() {
|
||||||
|
const container = document.getElementById("scorejobs-container");
|
||||||
|
const empty = document.getElementById("scorejobs-empty");
|
||||||
|
container.innerHTML = '<p class="muted">加载中…</p>';
|
||||||
|
try {
|
||||||
|
const data = await API.listScoreJobs();
|
||||||
|
const jobs = data.jobs || [];
|
||||||
|
container.innerHTML = "";
|
||||||
|
if (jobs.length === 0) {
|
||||||
|
empty.hidden = false;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
empty.hidden = true;
|
||||||
|
jobs.forEach(job => container.appendChild(ScoreJobs.renderRow(job)));
|
||||||
|
// Auto-poll any queued/running jobs
|
||||||
|
jobs.forEach(job => {
|
||||||
|
if (job.status === "queued" || job.status === "running") {
|
||||||
|
ScoreJobs._startPoll(job.job_id);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
container.innerHTML = `<p class="muted">加载失败:${App.escape(err.message)}</p>`;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
renderRow(job) {
|
||||||
|
const row = document.createElement("div");
|
||||||
|
row.className = "panel score-job-row";
|
||||||
|
row.id = `score-job-${job.job_id}`;
|
||||||
|
row.innerHTML = ScoreJobs._rowHtml(job);
|
||||||
|
return row;
|
||||||
|
},
|
||||||
|
|
||||||
|
_rowHtml(job) {
|
||||||
|
const time = App.shortTime(job.created_at);
|
||||||
|
const question = App.escape((job.request_summary?.question || "—").slice(0, 50));
|
||||||
|
const metrics = (job.request_summary?.metrics || []).join(", ");
|
||||||
|
const statusBadge = `<span class="badge ${job.status}">${job.status}</span>`;
|
||||||
|
|
||||||
|
let scoreHtml = "";
|
||||||
|
if (job.status === "completed") {
|
||||||
|
scoreHtml = Object.entries(job.scores || {})
|
||||||
|
.map(([k, v]) => {
|
||||||
|
const cls = App.scoreClass(v);
|
||||||
|
const text = v === null || v === undefined ? "n/a" : Number(v).toFixed(3);
|
||||||
|
return `<span class="metric-chip" title="${App.escape(k)}">${App.escape(App.shortMetric(k))} <b class="${cls}">${text}</b></span>`;
|
||||||
|
})
|
||||||
|
.join(" ");
|
||||||
|
if (job.weighted_score !== null && job.weighted_score !== undefined) {
|
||||||
|
const cls = App.scoreClass(job.weighted_score);
|
||||||
|
scoreHtml += ` <span class="metric-chip">综合 <b class="${cls}">${Number(job.weighted_score).toFixed(3)}</b></span>`;
|
||||||
|
}
|
||||||
|
} else if (job.status === "failed") {
|
||||||
|
scoreHtml = `<span class="muted" style="color:var(--bad)">${App.escape(job.error || "未知错误")}</span>`;
|
||||||
|
} else {
|
||||||
|
scoreHtml = `<span class="muted">评分中…</span>`;
|
||||||
|
}
|
||||||
|
|
||||||
|
return `
|
||||||
|
<div class="run-card-head">
|
||||||
|
<div class="run-card-title">${question}</div>
|
||||||
|
<div>${statusBadge}</div>
|
||||||
|
</div>
|
||||||
|
<div class="run-card-meta">
|
||||||
|
<div>指标:${App.escape(metrics)} · ${time} · ${job.latency_ms}ms</div>
|
||||||
|
</div>
|
||||||
|
<div class="run-card-metrics">${scoreHtml}</div>
|
||||||
|
`;
|
||||||
|
},
|
||||||
|
|
||||||
|
_startPoll(jobId) {
|
||||||
|
if (ScoreJobs._pollTimers[jobId]) return;
|
||||||
|
ScoreJobs._pollTimers[jobId] = setInterval(async () => {
|
||||||
|
try {
|
||||||
|
const job = await API.getScoreJob(jobId);
|
||||||
|
const el = document.getElementById(`score-job-${jobId}`);
|
||||||
|
if (el) el.innerHTML = ScoreJobs._rowHtml(job);
|
||||||
|
if (job.status === "completed" || job.status === "failed") {
|
||||||
|
clearInterval(ScoreJobs._pollTimers[jobId]);
|
||||||
|
delete ScoreJobs._pollTimers[jobId];
|
||||||
|
}
|
||||||
|
} catch (_e) {
|
||||||
|
clearInterval(ScoreJobs._pollTimers[jobId]);
|
||||||
|
delete ScoreJobs._pollTimers[jobId];
|
||||||
|
}
|
||||||
|
}, 5000);
|
||||||
|
},
|
||||||
|
|
||||||
|
stopAllPolls() {
|
||||||
|
Object.values(ScoreJobs._pollTimers).forEach(t => clearInterval(t));
|
||||||
|
ScoreJobs._pollTimers = {};
|
||||||
|
},
|
||||||
|
};
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 4: Update `webapp/static/js/app.js`**
|
||||||
|
|
||||||
|
Add `"scorejobs"` to the `views` array and `titles` object:
|
||||||
|
|
||||||
|
```javascript
|
||||||
|
views: ["runs", "new", "report", "profiles", "scorejobs", "apidocs"],
|
||||||
|
titles: { runs: "运行列表", new: "新建评估", report: "报告详情", profiles: "LLM 配置", scorejobs: "评分记录", apidocs: "API 文档" },
|
||||||
|
```
|
||||||
|
|
||||||
|
Add in `_doSwitch` after `if (view === "profiles") Profiles.load();`:
|
||||||
|
```javascript
|
||||||
|
if (view === "scorejobs") ScoreJobs.load();
|
||||||
|
```
|
||||||
|
|
||||||
|
Add `ScoreJobs.stopAllPolls();` when switching away, in `_doSwitch` before view switching logic:
|
||||||
|
```javascript
|
||||||
|
// Stop score job pollers when leaving the scorejobs view
|
||||||
|
if (App.activeView === "scorejobs" && view !== "scorejobs") ScoreJobs.stopAllPolls();
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 5: Add script tag to `webapp/static/index.html`**
|
||||||
|
|
||||||
|
Add before `<script src="/static/js/app.js"></script>`:
|
||||||
|
```html
|
||||||
|
<script src="/static/js/score_jobs.js"></script>
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 6: Verify server boots**
|
||||||
|
|
||||||
|
```
|
||||||
|
python -c "from webapp.server import create_app; create_app(); print('OK')"
|
||||||
|
```
|
||||||
|
Expected: `OK`
|
||||||
|
|
||||||
|
Also verify HTML has all new elements:
|
||||||
|
```
|
||||||
|
python -c "
|
||||||
|
c = open('webapp/static/index.html', encoding='utf-8').read()
|
||||||
|
assert 'view-scorejobs' in c
|
||||||
|
assert 'scorejobs-container' in c
|
||||||
|
assert '评分记录' in c
|
||||||
|
print('HTML OK')
|
||||||
|
"
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Step 7: Commit**
|
||||||
|
|
||||||
|
```
|
||||||
|
git add webapp/static/index.html webapp/static/js/api.js webapp/static/js/app.js webapp/static/js/score_jobs.js
|
||||||
|
git commit -m "feat: add 评分记录 page with async score job list and auto-polling"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Task 4: 全量回归测试 + Dify 说明注释
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `webapp/static/js/score_jobs.js` (minor: add Dify curl comment at top)
|
||||||
|
|
||||||
|
- [ ] **Step 1: Run full test suite**
|
||||||
|
|
||||||
|
```
|
||||||
|
python -m pytest tests/ -v --tb=short -q 2>&1 | tail -15
|
||||||
|
```
|
||||||
|
|
||||||
|
Pre-existing failures to ignore:
|
||||||
|
- `test_normalize_sample_pdf_offline_smoke_row`
|
||||||
|
- `test_evaluator_and_reporting_write_run_assets`
|
||||||
|
- `test_question_generator_rejects_invalid_json`
|
||||||
|
- `test_question_generator_rejects_non_list_samples`
|
||||||
|
|
||||||
|
Any other failure is a regression — fix before proceeding.
|
||||||
|
|
||||||
|
- [ ] **Step 2: Run targeted tests**
|
||||||
|
|
||||||
|
```
|
||||||
|
python -m pytest tests/webapp/test_score_jobs_api.py tests/webapp/test_score_api.py tests/test_pipeline.py -v --tb=short
|
||||||
|
```
|
||||||
|
Expected: all PASS
|
||||||
|
|
||||||
|
- [ ] **Step 3: Final commit**
|
||||||
|
|
||||||
|
```
|
||||||
|
git add .
|
||||||
|
git commit -m "feat: async score jobs complete — POST /api/score/async + 评分记录 page
|
||||||
|
|
||||||
|
- ScoreJobManager: thread pool + JSON persistence (outputs/score-jobs/)
|
||||||
|
- POST /api/score/async: 202 immediate response with job_id
|
||||||
|
- GET /api/score/jobs + GET /api/score/jobs/{id}: query endpoints
|
||||||
|
- Frontend: 评分记录 nav page with 5s auto-polling for pending jobs
|
||||||
|
- Dify integration: change /api/score → /api/score/async, remove response parsing
|
||||||
|
|
||||||
|
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>"
|
||||||
|
```
|
||||||
116
docs/superpowers/specs/2026-06-24-async-score-jobs-design.md
Normal file
116
docs/superpowers/specs/2026-06-24-async-score-jobs-design.md
Normal file
@@ -0,0 +1,116 @@
|
|||||||
|
# 异步评分记录功能设计
|
||||||
|
|
||||||
|
**日期**: 2026-06-24
|
||||||
|
**状态**: 已批准,待实现
|
||||||
|
**范围**: 新增 `POST /api/score/async` 异步评分端点,评分结果持久化到磁盘,前端新增「评分记录」页面展示。
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 1. 目标
|
||||||
|
|
||||||
|
- Dify 工作流调用 `/api/score/async` 立即返回 `job_id`(202),不等待评分完成
|
||||||
|
- 后台异步执行 RAGAS 评分,结果写入 `outputs/score-jobs/<job_id>.json`
|
||||||
|
- RAGAS 平台新增「评分记录」导航页,列表展示所有评分记录及状态
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 2. 架构
|
||||||
|
|
||||||
|
```
|
||||||
|
Dify → POST /api/score/async → 202 {job_id, status:"queued"}
|
||||||
|
↓
|
||||||
|
ScoreJobManager (线程池)
|
||||||
|
↓
|
||||||
|
InlineScorer.score()
|
||||||
|
↓
|
||||||
|
outputs/score-jobs/<job_id>.json
|
||||||
|
↓
|
||||||
|
GET /api/score/jobs ← 前端「评分记录」页轮询
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 3. 存储格式
|
||||||
|
|
||||||
|
`outputs/score-jobs/<job_id>.json`:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"job_id": "abc123def456",
|
||||||
|
"status": "completed",
|
||||||
|
"created_at": "2026-06-24T09:00:00+00:00",
|
||||||
|
"finished_at": "2026-06-24T09:00:15+00:00",
|
||||||
|
"request": {
|
||||||
|
"question": "双源CT的时间分辨率是多少?",
|
||||||
|
"answer": "双源CT的单扇区时间分辨率为75ms。",
|
||||||
|
"contexts": null,
|
||||||
|
"ground_truth": null,
|
||||||
|
"metrics": ["answer_relevancy"],
|
||||||
|
"judge_model": "gpt-5",
|
||||||
|
"embedding_model": "text-embedding-3-small"
|
||||||
|
},
|
||||||
|
"scores": {"answer_relevancy": 0.9075},
|
||||||
|
"weighted_score": 0.9075,
|
||||||
|
"latency_ms": 12500,
|
||||||
|
"skipped_metrics": [],
|
||||||
|
"error": null
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 4. API 端点
|
||||||
|
|
||||||
|
### `POST /api/score/async`
|
||||||
|
|
||||||
|
请求体与 `POST /api/score` 完全相同(`ScoreRequest`)。
|
||||||
|
|
||||||
|
```json
|
||||||
|
// 立即返回 202
|
||||||
|
{"job_id": "abc123def456", "status": "queued"}
|
||||||
|
```
|
||||||
|
|
||||||
|
### `GET /api/score/jobs`
|
||||||
|
|
||||||
|
返回所有评分记录,按创建时间倒序:
|
||||||
|
```json
|
||||||
|
{"jobs": [{...ScoreJobStatus...}]}
|
||||||
|
```
|
||||||
|
|
||||||
|
### `GET /api/score/jobs/{job_id}`
|
||||||
|
|
||||||
|
返回单条评分记录详情。
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 5. 新增文件
|
||||||
|
|
||||||
|
| 文件 | 职责 |
|
||||||
|
|------|------|
|
||||||
|
| `webapp/services/score_job_manager.py` | ScoreJobManager:线程池 + JSON 持久化 |
|
||||||
|
| `webapp/api/score_jobs.py` | 3 个端点路由 |
|
||||||
|
| `webapp/static/js/score_jobs.js` | 前端列表逻辑 + 轮询 |
|
||||||
|
|
||||||
|
## 6. 修改文件
|
||||||
|
|
||||||
|
| 文件 | 改动 |
|
||||||
|
|------|------|
|
||||||
|
| `webapp/models.py` | 新增 `AsyncScoreJobStatus`、`AsyncScoreJobResponse` |
|
||||||
|
| `webapp/server.py` | 注册 score_jobs router,更新 OPENAPI_TAGS |
|
||||||
|
| `webapp/static/index.html` | 新增导航项 + section |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 7. 前端「评分记录」页
|
||||||
|
|
||||||
|
列表列:时间 / 问题摘要(前40字)/ 指标 / 得分 / 状态
|
||||||
|
|
||||||
|
- 进入页面自动刷新
|
||||||
|
- `queued/running` 记录每 5 秒轮询 `GET /api/score/jobs/{id}` 更新状态
|
||||||
|
- 得分按 scoreClass(good/warn/bad)着色
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 8. Dify 改造
|
||||||
|
|
||||||
|
只改 HTTP 节点 URL:`/api/score` → `/api/score/async`,删除解析响应的代码节点。
|
||||||
1101
project-overview.html
Normal file
1101
project-overview.html
Normal file
File diff suppressed because it is too large
Load Diff
@@ -22,22 +22,31 @@ _PROMPT_TEMPLATE = """\
|
|||||||
|
|
||||||
## 报告要求
|
## 报告要求
|
||||||
|
|
||||||
1. 按指标分节(## 指标名 [severity]),先解释"为什么低"(结合低分样本具体分析),再给出"具体怎么改"
|
1. 按指标分节(## 指标名 [严重程度]),先解释"为什么低"(结合低分样本具体分析),再给出"具体怎么改"
|
||||||
2. "具体怎么改"要结合低分样本的实际内容,而不只是泛泛建议
|
2. 严重程度说明:critical=严重(<阈值50%),warning=警告(<阈值70%),low=待优化(低于0.85,有提升空间)
|
||||||
3. 最后写一节 **## 优先优化次序**,按性价比排序(不增加 LLM 调用次数的优化优先)
|
3. "具体怎么改"要结合低分样本的实际内容,而不只是泛泛建议
|
||||||
4. 语言简洁,面向工程师,不要废话,不要重复列表内容
|
4. 最后写一节 **## 优先优化次序**,按性价比排序(不增加 LLM 调用次数的优化优先),critical 和 warning 项优先于 low 项
|
||||||
|
5. 语言简洁,面向工程师,不要废话,不要重复列表内容
|
||||||
|
|
||||||
只输出 Markdown 报告正文,不要任何前置说明。
|
只输出 Markdown 报告正文,不要任何前置说明。
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
_SEVERITY_LABEL_ZH: dict[str, str] = {
|
||||||
|
"critical": "严重",
|
||||||
|
"warning": "警告",
|
||||||
|
"low": "待优化",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def _build_diagnosis_summary(diagnoses: list[Diagnosis]) -> str:
|
def _build_diagnosis_summary(diagnoses: list[Diagnosis]) -> str:
|
||||||
lines = []
|
lines = []
|
||||||
for d in diagnoses:
|
for d in diagnoses:
|
||||||
direction = "(越低越好)" if d.metric == "noise_sensitivity" else ""
|
direction = "(越低越好)" if d.metric == "noise_sensitivity" else ""
|
||||||
|
label = _SEVERITY_LABEL_ZH.get(d.severity, d.severity)
|
||||||
lines.append(
|
lines.append(
|
||||||
f"- **{d.metric}** {direction} 均值={d.mean_score:.4f},"
|
f"- **{d.metric}** {direction} 均值={d.mean_score:.4f},"
|
||||||
f"阈值={d.threshold},严重程度={d.severity}"
|
f"阈值={d.threshold},严重程度={label}"
|
||||||
)
|
)
|
||||||
lines.append(f" - 可能原因:{'; '.join(d.root_causes)}")
|
lines.append(f" - 可能原因:{'; '.join(d.root_causes)}")
|
||||||
lines.append(f" - 建议动作:{'; '.join(d.suggested_actions)}")
|
lines.append(f" - 建议动作:{'; '.join(d.suggested_actions)}")
|
||||||
|
|||||||
@@ -14,6 +14,9 @@ class MetricRule:
|
|||||||
higher_is_better: bool # False for noise_sensitivity
|
higher_is_better: bool # False for noise_sensitivity
|
||||||
root_causes: list[str]
|
root_causes: list[str]
|
||||||
suggested_actions: list[str]
|
suggested_actions: list[str]
|
||||||
|
# Scores below this threshold trigger a "low" advisory (LLM suggestion requested).
|
||||||
|
# Only applies to higher_is_better metrics; noise_sensitivity uses existing thresholds.
|
||||||
|
advisory_threshold: float = 0.85
|
||||||
|
|
||||||
|
|
||||||
METRIC_RULES: dict[str, MetricRule] = {
|
METRIC_RULES: dict[str, MetricRule] = {
|
||||||
@@ -208,10 +211,14 @@ def diagnose(
|
|||||||
elif mean < rule.warning_threshold:
|
elif mean < rule.warning_threshold:
|
||||||
severity = "warning"
|
severity = "warning"
|
||||||
threshold = rule.warning_threshold
|
threshold = rule.warning_threshold
|
||||||
|
elif mean < rule.advisory_threshold:
|
||||||
|
# Score is acceptable but below 0.85 — request LLM optimization advice.
|
||||||
|
severity = "low"
|
||||||
|
threshold = rule.advisory_threshold
|
||||||
else:
|
else:
|
||||||
continue # above warning threshold → no diagnosis
|
continue # >= advisory_threshold → no diagnosis needed
|
||||||
else:
|
else:
|
||||||
# lower is better (noise_sensitivity)
|
# lower is better (noise_sensitivity): keep existing two-tier logic
|
||||||
if mean > rule.critical_threshold:
|
if mean > rule.critical_threshold:
|
||||||
severity = "critical"
|
severity = "critical"
|
||||||
threshold = rule.critical_threshold
|
threshold = rule.critical_threshold
|
||||||
|
|||||||
@@ -8,12 +8,22 @@ from .rules import Diagnosis
|
|||||||
|
|
||||||
logger = logging.getLogger("rag_eval.advisor")
|
logger = logging.getLogger("rag_eval.advisor")
|
||||||
|
|
||||||
|
# Chinese display labels for each severity tier.
|
||||||
|
_SEVERITY_LABEL: dict[str, str] = {
|
||||||
|
"critical": "严重",
|
||||||
|
"warning": "警告",
|
||||||
|
"low": "待优化",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def _format_log_summary(diagnoses: list[Diagnosis], advice_path: Path) -> str:
|
def _format_log_summary(diagnoses: list[Diagnosis], advice_path: Path) -> str:
|
||||||
"""Return a single-line log summary of triggered diagnoses."""
|
"""Return a single-line log summary of triggered diagnoses."""
|
||||||
if not diagnoses:
|
if not diagnoses:
|
||||||
return "[advisor] 所有指标正常,无需优化建议。"
|
return "[advisor] 所有指标正常,无需优化建议。"
|
||||||
parts = [f"{d.metric}({d.mean_score:.2f}, {d.severity})" for d in diagnoses]
|
parts = [
|
||||||
|
f"{d.metric}({d.mean_score:.2f},{_SEVERITY_LABEL.get(d.severity, d.severity)})"
|
||||||
|
for d in diagnoses
|
||||||
|
]
|
||||||
triggered = " ".join(parts)
|
triggered = " ".join(parts)
|
||||||
return f"[advisor] 触发诊断 {len(diagnoses)} 项: {triggered} → {advice_path}"
|
return f"[advisor] 触发诊断 {len(diagnoses)} 项: {triggered} → {advice_path}"
|
||||||
|
|
||||||
@@ -24,7 +34,8 @@ def _build_fallback_report(diagnoses: list[Diagnosis]) -> str:
|
|||||||
return ""
|
return ""
|
||||||
lines = ["## 规则诊断(LLM 分析不可用)\n"]
|
lines = ["## 规则诊断(LLM 分析不可用)\n"]
|
||||||
for d in diagnoses:
|
for d in diagnoses:
|
||||||
lines.append(f"### {d.metric} [{d.severity}] 均值={d.mean_score:.4f}")
|
label = _SEVERITY_LABEL.get(d.severity, d.severity)
|
||||||
|
lines.append(f"### {d.metric} [{label}] 均值={d.mean_score:.4f}")
|
||||||
lines.append("\n**可能原因:**")
|
lines.append("\n**可能原因:**")
|
||||||
for cause in d.root_causes:
|
for cause in d.root_causes:
|
||||||
lines.append(f"- {cause}")
|
lines.append(f"- {cause}")
|
||||||
|
|||||||
@@ -10,10 +10,38 @@ class TestDiagnosis(unittest.TestCase):
|
|||||||
for i, s in enumerate(scores)]
|
for i, s in enumerate(scores)]
|
||||||
|
|
||||||
def test_no_diagnosis_when_all_scores_above_threshold(self):
|
def test_no_diagnosis_when_all_scores_above_threshold(self):
|
||||||
|
# Mean exactly 0.85 should NOT trigger any diagnosis (< 0.85 is the condition).
|
||||||
rows = self._make_rows("faithfulness", [0.8, 0.9, 0.85])
|
rows = self._make_rows("faithfulness", [0.8, 0.9, 0.85])
|
||||||
result = diagnose(rows, metrics=["faithfulness"])
|
result = diagnose(rows, metrics=["faithfulness"])
|
||||||
self.assertEqual(result, [])
|
self.assertEqual(result, [])
|
||||||
|
|
||||||
|
def test_no_diagnosis_when_mean_above_advisory_threshold(self):
|
||||||
|
rows = self._make_rows("answer_relevancy", [0.9, 0.92, 0.88])
|
||||||
|
result = diagnose(rows, metrics=["answer_relevancy"])
|
||||||
|
self.assertEqual(result, [])
|
||||||
|
|
||||||
|
def test_low_severity_when_mean_below_advisory_threshold(self):
|
||||||
|
# Score between warning_threshold (0.7) and advisory_threshold (0.85) → "low"
|
||||||
|
rows = self._make_rows("faithfulness", [0.78, 0.80, 0.82])
|
||||||
|
result = diagnose(rows, metrics=["faithfulness"])
|
||||||
|
self.assertEqual(len(result), 1)
|
||||||
|
self.assertEqual(result[0].severity, "low")
|
||||||
|
self.assertAlmostEqual(result[0].threshold, 0.85, places=2)
|
||||||
|
|
||||||
|
def test_low_severity_answer_relevancy_at_0_84(self):
|
||||||
|
rows = self._make_rows("answer_relevancy", [0.84, 0.84, 0.84])
|
||||||
|
result = diagnose(rows, metrics=["answer_relevancy"])
|
||||||
|
self.assertEqual(len(result), 1)
|
||||||
|
self.assertEqual(result[0].severity, "low")
|
||||||
|
|
||||||
|
def test_low_severity_has_root_causes_and_actions(self):
|
||||||
|
rows = self._make_rows("context_precision", [0.75, 0.76, 0.77])
|
||||||
|
result = diagnose(rows, metrics=["context_precision"])
|
||||||
|
self.assertEqual(len(result), 1)
|
||||||
|
self.assertEqual(result[0].severity, "low")
|
||||||
|
self.assertTrue(len(result[0].root_causes) > 0)
|
||||||
|
self.assertTrue(len(result[0].suggested_actions) > 0)
|
||||||
|
|
||||||
def test_warning_when_mean_below_warning_threshold(self):
|
def test_warning_when_mean_below_warning_threshold(self):
|
||||||
rows = self._make_rows("faithfulness", [0.65, 0.62, 0.68])
|
rows = self._make_rows("faithfulness", [0.65, 0.62, 0.68])
|
||||||
result = diagnose(rows, metrics=["faithfulness"])
|
result = diagnose(rows, metrics=["faithfulness"])
|
||||||
|
|||||||
@@ -91,9 +91,9 @@ class TestWriteAdvice(unittest.TestCase):
|
|||||||
]
|
]
|
||||||
summary = _format_log_summary(diags, self.advice_path)
|
summary = _format_log_summary(diags, self.advice_path)
|
||||||
self.assertIn("faithfulness", summary)
|
self.assertIn("faithfulness", summary)
|
||||||
self.assertIn("critical", summary)
|
self.assertIn("严重", summary) # "critical" maps to Chinese label
|
||||||
self.assertIn("context_recall", summary)
|
self.assertIn("context_recall", summary)
|
||||||
self.assertIn("warning", summary)
|
self.assertIn("警告", summary) # "warning" maps to Chinese label
|
||||||
|
|
||||||
def test_write_empty_diagnoses_still_creates_file(self):
|
def test_write_empty_diagnoses_still_creates_file(self):
|
||||||
write_advice(
|
write_advice(
|
||||||
|
|||||||
@@ -57,9 +57,11 @@ class TestScoreRequest:
|
|||||||
with pytest.raises(ValidationError):
|
with pytest.raises(ValidationError):
|
||||||
ScoreRequest(question="q", contexts="c") # type: ignore[call-arg]
|
ScoreRequest(question="q", contexts="c") # type: ignore[call-arg]
|
||||||
|
|
||||||
def test_missing_contexts_raises(self):
|
def test_missing_contexts_defaults_to_none(self):
|
||||||
with pytest.raises(ValidationError):
|
"""contexts is now optional — missing contexts is allowed."""
|
||||||
ScoreRequest(question="q", answer="a") # type: ignore[call-arg]
|
req = ScoreRequest(question="q", answer="a")
|
||||||
|
assert req.contexts is None
|
||||||
|
assert req.contexts_as_list() == []
|
||||||
|
|
||||||
def test_custom_metrics_accepted(self):
|
def test_custom_metrics_accepted(self):
|
||||||
req = ScoreRequest(
|
req = ScoreRequest(
|
||||||
@@ -115,6 +117,17 @@ class TestScoreRequest:
|
|||||||
"factual_correctness",
|
"factual_correctness",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
def test_effective_metrics_drops_context_dependent_when_contexts_absent(self):
|
||||||
|
"""Without contexts, context-dependent metrics are excluded."""
|
||||||
|
req = ScoreRequest(
|
||||||
|
question="q", answer="a",
|
||||||
|
metrics=["faithfulness", "answer_relevancy", "context_precision"],
|
||||||
|
)
|
||||||
|
effective = req.effective_metrics()
|
||||||
|
assert "answer_relevancy" in effective
|
||||||
|
assert "faithfulness" not in effective
|
||||||
|
assert "context_precision" not in effective
|
||||||
|
|
||||||
|
|
||||||
class TestScoreResponse:
|
class TestScoreResponse:
|
||||||
def test_score_response_structure(self):
|
def test_score_response_structure(self):
|
||||||
|
|||||||
146
tests/webapp/test_score_jobs_api.py
Normal file
146
tests/webapp/test_score_jobs_api.py
Normal file
@@ -0,0 +1,146 @@
|
|||||||
|
"""Tests for async score jobs API."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def client(tmp_path, monkeypatch):
|
||||||
|
"""TestClient with fresh ScoreJobManager backed by tmp dirs."""
|
||||||
|
import webapp.services.score_job_manager as mgr_mod
|
||||||
|
from webapp.services.score_job_manager import ScoreJobManager
|
||||||
|
|
||||||
|
fresh_mgr = ScoreJobManager(
|
||||||
|
output_dir=tmp_path / "score-async",
|
||||||
|
index_dir=tmp_path / "score-jobs",
|
||||||
|
max_workers=2,
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(mgr_mod, "score_job_manager", fresh_mgr)
|
||||||
|
|
||||||
|
import webapp.api.score_jobs as api_mod
|
||||||
|
monkeypatch.setattr(api_mod, "score_job_manager", fresh_mgr)
|
||||||
|
|
||||||
|
from webapp.server import create_app
|
||||||
|
return TestClient(create_app())
|
||||||
|
|
||||||
|
|
||||||
|
class TestAsyncScoreEndpoints:
|
||||||
|
def test_submit_returns_202_with_job_id(self, client):
|
||||||
|
"""POST /api/score/async returns 202 immediately."""
|
||||||
|
with patch("webapp.services.score_job_manager.ScoreJobManager._run"):
|
||||||
|
resp = client.post("/api/score/async", json={
|
||||||
|
"question": "q?",
|
||||||
|
"answer": "a.",
|
||||||
|
"metrics": ["answer_relevancy"],
|
||||||
|
})
|
||||||
|
assert resp.status_code == 202
|
||||||
|
data = resp.json()
|
||||||
|
assert "job_id" in data
|
||||||
|
assert data["status"] == "queued"
|
||||||
|
|
||||||
|
def test_list_jobs_empty_initially(self, client):
|
||||||
|
resp = client.get("/api/score/jobs")
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert resp.json()["jobs"] == []
|
||||||
|
|
||||||
|
def test_get_unknown_job_returns_404(self, client):
|
||||||
|
resp = client.get("/api/score/jobs/nonexistent123")
|
||||||
|
assert resp.status_code == 404
|
||||||
|
|
||||||
|
def test_submitted_job_appears_in_list(self, client):
|
||||||
|
with patch("webapp.services.score_job_manager.ScoreJobManager._run"):
|
||||||
|
resp = client.post("/api/score/async", json={
|
||||||
|
"question": "q?", "answer": "a.", "metrics": ["answer_relevancy"],
|
||||||
|
})
|
||||||
|
job_id = resp.json()["job_id"]
|
||||||
|
time.sleep(0.1)
|
||||||
|
list_resp = client.get("/api/score/jobs")
|
||||||
|
ids = [j["job_id"] for j in list_resp.json()["jobs"]]
|
||||||
|
assert job_id in ids
|
||||||
|
|
||||||
|
def test_get_job_by_id_returns_status(self, client):
|
||||||
|
with patch("webapp.services.score_job_manager.ScoreJobManager._run"):
|
||||||
|
resp = client.post("/api/score/async", json={
|
||||||
|
"question": "q?", "answer": "a.", "metrics": ["answer_relevancy"],
|
||||||
|
})
|
||||||
|
job_id = resp.json()["job_id"]
|
||||||
|
time.sleep(0.1)
|
||||||
|
get_resp = client.get(f"/api/score/jobs/{job_id}")
|
||||||
|
assert get_resp.status_code == 200
|
||||||
|
assert get_resp.json()["job_id"] == job_id
|
||||||
|
|
||||||
|
def test_missing_required_fields_returns_422(self, client):
|
||||||
|
resp = client.post("/api/score/async", json={"question": "q?"})
|
||||||
|
assert resp.status_code == 422
|
||||||
|
|
||||||
|
|
||||||
|
class TestScoreJobManager:
|
||||||
|
def test_completed_job_persisted_to_index(self, tmp_path):
|
||||||
|
"""Completed job writes index JSON."""
|
||||||
|
from webapp.services.score_job_manager import ScoreJobManager
|
||||||
|
from webapp.models import ScoreRequest
|
||||||
|
|
||||||
|
mgr = ScoreJobManager(
|
||||||
|
output_dir=tmp_path / "runs",
|
||||||
|
index_dir=tmp_path / "index",
|
||||||
|
max_workers=1,
|
||||||
|
)
|
||||||
|
req = ScoreRequest(question="q?", answer="a.", metrics=["answer_relevancy"])
|
||||||
|
|
||||||
|
# Patch _run directly — it uses lazy imports internally
|
||||||
|
def fake_run(job_id, request):
|
||||||
|
mgr._update(job_id, status="completed", finished_at="2026-01-01T00:00:01+00:00",
|
||||||
|
run_id="fake-run-id", scores={"answer_relevancy": 0.85},
|
||||||
|
weighted_score=0.85, latency_ms=500)
|
||||||
|
|
||||||
|
with patch.object(mgr, "_run", side_effect=fake_run):
|
||||||
|
status = mgr.submit(req)
|
||||||
|
|
||||||
|
for _ in range(20):
|
||||||
|
s = mgr.get(status.job_id)
|
||||||
|
if s and s.status == "completed":
|
||||||
|
break
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
s = mgr.get(status.job_id)
|
||||||
|
assert s is not None
|
||||||
|
idx_path = tmp_path / "index" / f"{status.job_id}.json"
|
||||||
|
assert idx_path.exists()
|
||||||
|
data = json.loads(idx_path.read_text(encoding="utf-8"))
|
||||||
|
assert data["job_id"] == status.job_id
|
||||||
|
assert data["status"] == "completed"
|
||||||
|
|
||||||
|
def test_loads_existing_index_on_startup(self, tmp_path):
|
||||||
|
"""Manager loads persisted jobs from index dir on init."""
|
||||||
|
from webapp.services.score_job_manager import ScoreJobManager
|
||||||
|
from webapp.models import AsyncScoreJobStatus
|
||||||
|
|
||||||
|
idx_dir = tmp_path / "index"
|
||||||
|
idx_dir.mkdir()
|
||||||
|
fake = AsyncScoreJobStatus(
|
||||||
|
job_id="testjob001",
|
||||||
|
status="completed",
|
||||||
|
created_at="2026-01-01T00:00:00+00:00",
|
||||||
|
run_id="some-run-id",
|
||||||
|
scores={"answer_relevancy": 0.9},
|
||||||
|
weighted_score=0.9,
|
||||||
|
latency_ms=1000,
|
||||||
|
)
|
||||||
|
(idx_dir / "testjob001.json").write_text(
|
||||||
|
json.dumps(fake.model_dump(), ensure_ascii=False), encoding="utf-8"
|
||||||
|
)
|
||||||
|
mgr = ScoreJobManager(
|
||||||
|
output_dir=tmp_path / "runs",
|
||||||
|
index_dir=idx_dir,
|
||||||
|
max_workers=1,
|
||||||
|
)
|
||||||
|
loaded = mgr.get("testjob001")
|
||||||
|
assert loaded is not None
|
||||||
|
assert loaded.status == "completed"
|
||||||
|
assert loaded.run_id == "some-run-id"
|
||||||
@@ -73,7 +73,8 @@ def score_sample(
|
|||||||
用于日志记录、质量监控或触发 Agent 自我改进流程。
|
用于日志记录、质量监控或触发 Agent 自我改进流程。
|
||||||
|
|
||||||
**contexts 格式**:多个检索片段用 `context_separator`(默认 `" |||| "`)拼接为一个字符串,
|
**contexts 格式**:多个检索片段用 `context_separator`(默认 `" |||| "`)拼接为一个字符串,
|
||||||
服务端自动拆分后传入 RAGAS 管道。
|
服务端自动拆分后传入 RAGAS 管道。**contexts 为可选字段**,缺失时自动跳过依赖检索内容的指标
|
||||||
|
(`faithfulness`、`context_recall`、`context_precision`、`noise_sensitivity`)。
|
||||||
|
|
||||||
**ground_truth 可选**:
|
**ground_truth 可选**:
|
||||||
- 提供时:所有指定指标均参与计算。
|
- 提供时:所有指定指标均参与计算。
|
||||||
@@ -99,12 +100,13 @@ def score_sample(
|
|||||||
"""
|
"""
|
||||||
client = f"{raw_request.client.host}:{raw_request.client.port}" if raw_request.client else "unknown"
|
client = f"{raw_request.client.host}:{raw_request.client.port}" if raw_request.client else "unknown"
|
||||||
logger.info(
|
logger.info(
|
||||||
"[score] incoming client=%s method=%s content_type=%s metrics=%s has_gt=%s",
|
"[score] incoming client=%s method=%s content_type=%s metrics=%s has_gt=%s has_ctx=%s",
|
||||||
client,
|
client,
|
||||||
raw_request.method,
|
raw_request.method,
|
||||||
raw_request.headers.get("content-type", ""),
|
raw_request.headers.get("content-type", ""),
|
||||||
request.metrics,
|
request.metrics,
|
||||||
request.ground_truth is not None,
|
request.ground_truth is not None,
|
||||||
|
bool(request.contexts),
|
||||||
)
|
)
|
||||||
settings = _get_settings()
|
settings = _get_settings()
|
||||||
|
|
||||||
|
|||||||
89
webapp/api/score_jobs.py
Normal file
89
webapp/api/score_jobs.py
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
"""Routes for async RAGAS scoring jobs (Dify fire-and-forget integration).
|
||||||
|
|
||||||
|
Dify calls POST /api/score/async → gets job_id immediately (202).
|
||||||
|
Scoring runs in background, result written as a standard run artifact.
|
||||||
|
View full report at GET /api/runs/{run_id} or in the 「运行列表」 page.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from fastapi import APIRouter, HTTPException
|
||||||
|
|
||||||
|
from webapp.models import AsyncScoreJobResponse, AsyncScoreJobStatus, ScoreRequest
|
||||||
|
from webapp.services.score_job_manager import score_job_manager
|
||||||
|
|
||||||
|
router = APIRouter(prefix="/api/score", tags=["score"])
|
||||||
|
logger = logging.getLogger("webapp.api.score_jobs")
|
||||||
|
|
||||||
|
|
||||||
|
@router.post(
|
||||||
|
"/async",
|
||||||
|
status_code=202,
|
||||||
|
response_model=AsyncScoreJobResponse,
|
||||||
|
summary="提交异步评分任务(Dify 推荐方式)",
|
||||||
|
responses={
|
||||||
|
202: {
|
||||||
|
"description": (
|
||||||
|
"任务已排队,立即返回 job_id(202 Accepted)。\n\n"
|
||||||
|
"评分在后台执行,完成后自动生成完整报告(含优化建议)。\n"
|
||||||
|
"通过 `GET /api/score/jobs/{job_id}` 查询状态,"
|
||||||
|
"完成后在「运行列表」页查看完整报告。"
|
||||||
|
),
|
||||||
|
"content": {
|
||||||
|
"application/json": {
|
||||||
|
"example": {"job_id": "abc123def456", "status": "queued", "run_id": None}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
def submit_async_score(request: ScoreRequest) -> AsyncScoreJobResponse:
|
||||||
|
"""提交异步 RAGAS 评分任务,立即返回 job_id。
|
||||||
|
|
||||||
|
**适合 Dify 工作流**:HTTP 节点无需等待评分完成(无超时风险),
|
||||||
|
工作流立即继续,评分结果在 RAGAS 平台「运行列表」中查看。
|
||||||
|
|
||||||
|
评分完成后自动生成:
|
||||||
|
- 各指标得分(`scores.csv`)
|
||||||
|
- 摘要报告(`summary.md`)
|
||||||
|
- LLM 优化建议(`optimization_advice.md`)
|
||||||
|
"""
|
||||||
|
logger.info(
|
||||||
|
"[score_async] submit metrics=%s has_ctx=%s has_gt=%s",
|
||||||
|
request.metrics, bool(request.contexts), bool(request.ground_truth),
|
||||||
|
)
|
||||||
|
status = score_job_manager.submit(request)
|
||||||
|
logger.info("[score_async] queued job_id=%s", status.job_id)
|
||||||
|
return AsyncScoreJobResponse(job_id=status.job_id, status=status.status)
|
||||||
|
|
||||||
|
|
||||||
|
@router.get(
|
||||||
|
"/jobs",
|
||||||
|
response_model=dict,
|
||||||
|
summary="列出所有异步评分记录",
|
||||||
|
)
|
||||||
|
def list_score_jobs() -> dict:
|
||||||
|
"""返回所有异步评分记录,按创建时间倒序排列。"""
|
||||||
|
jobs = score_job_manager.list_jobs()
|
||||||
|
logger.info("[score_jobs] list count=%d", len(jobs))
|
||||||
|
return {"jobs": [j.model_dump() for j in jobs]}
|
||||||
|
|
||||||
|
|
||||||
|
@router.get(
|
||||||
|
"/jobs/{job_id}",
|
||||||
|
response_model=AsyncScoreJobStatus,
|
||||||
|
summary="查询单个异步评分任务状态",
|
||||||
|
responses={404: {"description": "指定 job_id 的评分任务不存在。"}},
|
||||||
|
)
|
||||||
|
def get_score_job(job_id: str) -> AsyncScoreJobStatus:
|
||||||
|
"""查询单个异步评分任务的状态和结果。
|
||||||
|
|
||||||
|
`status` 为 `completed` 时,`run_id` 字段包含对应的运行 ID,
|
||||||
|
可通过 `GET /api/runs/{run_id}` 获取完整评分报告。
|
||||||
|
"""
|
||||||
|
status = score_job_manager.get(job_id)
|
||||||
|
if status is None:
|
||||||
|
raise HTTPException(status_code=404, detail=f"Score job not found: {job_id}")
|
||||||
|
return status
|
||||||
@@ -384,6 +384,14 @@ _GT_DEPENDENT_METRICS: frozenset[str] = frozenset({
|
|||||||
"noise_sensitivity",
|
"noise_sensitivity",
|
||||||
})
|
})
|
||||||
|
|
||||||
|
# 需要 contexts 才能计算的指标集合
|
||||||
|
_CONTEXT_DEPENDENT_METRICS: frozenset[str] = frozenset({
|
||||||
|
"faithfulness",
|
||||||
|
"context_recall",
|
||||||
|
"context_precision",
|
||||||
|
"noise_sensitivity",
|
||||||
|
})
|
||||||
|
|
||||||
# 所有合法指标名称
|
# 所有合法指标名称
|
||||||
_VALID_METRICS: frozenset[str] = frozenset({
|
_VALID_METRICS: frozenset[str] = frozenset({
|
||||||
"faithfulness",
|
"faithfulness",
|
||||||
@@ -428,8 +436,9 @@ class ScoreRequest(BaseModel):
|
|||||||
|
|
||||||
question: str = Field(description="问题文本。")
|
question: str = Field(description="问题文本。")
|
||||||
answer: str = Field(description="待评分的回答。")
|
answer: str = Field(description="待评分的回答。")
|
||||||
contexts: str = Field(
|
contexts: str | None = Field(
|
||||||
description="检索上下文字符串,多段之间用 context_separator 拼接。"
|
default=None,
|
||||||
|
description="检索上下文字符串,多段之间用 context_separator 拼接。缺失时自动跳过依赖检索内容的指标(faithfulness、context_recall、context_precision、noise_sensitivity)。",
|
||||||
)
|
)
|
||||||
ground_truth: str | None = Field(
|
ground_truth: str | None = Field(
|
||||||
default=None,
|
default=None,
|
||||||
@@ -467,15 +476,23 @@ class ScoreRequest(BaseModel):
|
|||||||
return value
|
return value
|
||||||
|
|
||||||
def contexts_as_list(self) -> list[str]:
|
def contexts_as_list(self) -> list[str]:
|
||||||
"""Split the contexts string into a list of non-empty fragments."""
|
"""Split the contexts string into a list of non-empty fragments.
|
||||||
|
|
||||||
|
Returns an empty list when contexts is None or blank.
|
||||||
|
"""
|
||||||
|
if not self.contexts:
|
||||||
|
return []
|
||||||
separator = self.context_separator or " |||| "
|
separator = self.context_separator or " |||| "
|
||||||
return [part.strip() for part in self.contexts.split(separator) if part.strip()]
|
return [part.strip() for part in self.contexts.split(separator) if part.strip()]
|
||||||
|
|
||||||
def effective_metrics(self) -> list[str]:
|
def effective_metrics(self) -> list[str]:
|
||||||
"""Return metrics filtered to exclude GT-dependent ones when ground_truth is absent."""
|
"""Return metrics filtered to exclude GT-dependent or context-dependent ones when inputs are absent."""
|
||||||
if self.ground_truth is not None:
|
result = list(self.metrics)
|
||||||
return list(self.metrics)
|
if self.ground_truth is None:
|
||||||
return [metric_name for metric_name in self.metrics if metric_name not in _GT_DEPENDENT_METRICS]
|
result = [m for m in result if m not in _GT_DEPENDENT_METRICS]
|
||||||
|
if not self.contexts:
|
||||||
|
result = [m for m in result if m not in _CONTEXT_DEPENDENT_METRICS]
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
class ScoreResponse(BaseModel):
|
class ScoreResponse(BaseModel):
|
||||||
@@ -497,3 +514,40 @@ class ScoreResponse(BaseModel):
|
|||||||
default=None,
|
default=None,
|
||||||
description="打分异常时的错误信息(HTTP 200 仍返回,scores 为空)。",
|
description="打分异常时的错误信息(HTTP 200 仍返回,scores 为空)。",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 异步评分记录模型
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class AsyncScoreJobResponse(BaseModel):
|
||||||
|
"""Immediate 202 response after submitting an async score job."""
|
||||||
|
|
||||||
|
job_id: str = Field(description="任务唯一标识符,用于后续查询结果。")
|
||||||
|
status: str = Field(default="queued", description="初始状态:queued。")
|
||||||
|
run_id: str | None = Field(
|
||||||
|
default=None,
|
||||||
|
description="评分完成后写入的 Run ID,可在「运行列表」中查看完整报告。",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncScoreJobStatus(BaseModel):
|
||||||
|
"""State of one async score job (queued → running → completed/failed)."""
|
||||||
|
|
||||||
|
job_id: str = Field(description="任务唯一标识符。")
|
||||||
|
status: str = Field(description="queued | running | completed | failed")
|
||||||
|
created_at: str = Field(default="", description="创建时间(ISO 8601 UTC)。")
|
||||||
|
finished_at: str = Field(default="", description="完成时间(ISO 8601 UTC)。")
|
||||||
|
run_id: str | None = Field(
|
||||||
|
default=None,
|
||||||
|
description="完成后对应的 Run ID,可通过 GET /api/runs/{run_id} 查看完整报告。",
|
||||||
|
)
|
||||||
|
request_summary: dict = Field(
|
||||||
|
default_factory=dict,
|
||||||
|
description="请求参数快照(question 前80字、metrics、judge_model 等)。",
|
||||||
|
)
|
||||||
|
scores: dict[str, float | None] = Field(default_factory=dict, description="各指标得分。")
|
||||||
|
weighted_score: float | None = Field(default=None, description="加权综合得分。")
|
||||||
|
latency_ms: int = Field(default=0, description="评分耗时毫秒。")
|
||||||
|
skipped_metrics: list[str] = Field(default_factory=list)
|
||||||
|
error: str | None = Field(default=None)
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ from fastapi.exceptions import RequestValidationError
|
|||||||
from fastapi.responses import FileResponse, JSONResponse
|
from fastapi.responses import FileResponse, JSONResponse
|
||||||
from fastapi.staticfiles import StaticFiles
|
from fastapi.staticfiles import StaticFiles
|
||||||
|
|
||||||
from webapp.api import evaluations, llm_profiles, pipeline, runs, scenarios, score
|
from webapp.api import evaluations, llm_profiles, pipeline, runs, scenarios, score, score_jobs
|
||||||
|
|
||||||
STATIC_DIR = Path(__file__).resolve().parent / "static"
|
STATIC_DIR = Path(__file__).resolve().parent / "static"
|
||||||
logger = logging.getLogger("webapp.server")
|
logger = logging.getLogger("webapp.server")
|
||||||
@@ -69,10 +69,12 @@ OPENAPI_TAGS = [
|
|||||||
{
|
{
|
||||||
"name": "score",
|
"name": "score",
|
||||||
"description": (
|
"description": (
|
||||||
"**实时评分 API(Dify 外部 Tool)**\n\n"
|
"**实时评分 API(同步)** — `POST /api/score`\n\n"
|
||||||
"接受单条问答记录 `(question, answer, contexts, ground_truth)`,\n"
|
"**异步评分 API(Dify 推荐)** — `POST /api/score/async`\n\n"
|
||||||
"同步运行 RAGAS 指标打分,返回各指标得分和加权综合得分。\n\n"
|
"异步方式立即返回 job_id(202),评分在后台执行,完成后自动生成完整报告(含优化建议),"
|
||||||
"适用场景:Dify Agent 在回答后即时调用,用于质量监控或自我改进。\n\n"
|
"在「运行列表」页查看。\n\n"
|
||||||
|
"通过 `GET /api/score/jobs` 列出所有异步评分记录,"
|
||||||
|
"`GET /api/score/jobs/{job_id}` 查询单个任务状态。\n\n"
|
||||||
"**鉴权**:若 `.env` 中配置了 `SCORE_API_TOKEN`,需携带 "
|
"**鉴权**:若 `.env` 中配置了 `SCORE_API_TOKEN`,需携带 "
|
||||||
"`Authorization: Bearer <token>` 请求头。"
|
"`Authorization: Bearer <token>` 请求头。"
|
||||||
),
|
),
|
||||||
@@ -87,7 +89,7 @@ OPENAPI_TAGS = [
|
|||||||
def create_app() -> FastAPI:
|
def create_app() -> FastAPI:
|
||||||
"""Build and configure the FastAPI application instance."""
|
"""Build and configure the FastAPI application instance."""
|
||||||
app = FastAPI(
|
app = FastAPI(
|
||||||
title="RAGAS 评估系统",
|
title="Siemens RAGAS 评估平台",
|
||||||
description=(
|
description=(
|
||||||
"西门子医疗影像 RAG 评估平台 API 文档。\n\n"
|
"西门子医疗影像 RAG 评估平台 API 文档。\n\n"
|
||||||
"提供以下能力:\n"
|
"提供以下能力:\n"
|
||||||
@@ -108,6 +110,7 @@ def create_app() -> FastAPI:
|
|||||||
app.include_router(llm_profiles.router)
|
app.include_router(llm_profiles.router)
|
||||||
app.include_router(pipeline.router)
|
app.include_router(pipeline.router)
|
||||||
app.include_router(score.router)
|
app.include_router(score.router)
|
||||||
|
app.include_router(score_jobs.router)
|
||||||
|
|
||||||
@app.middleware("http")
|
@app.middleware("http")
|
||||||
async def access_log_middleware(request: Request, call_next):
|
async def access_log_middleware(request: Request, call_next):
|
||||||
|
|||||||
269
webapp/services/score_job_manager.py
Normal file
269
webapp/services/score_job_manager.py
Normal file
@@ -0,0 +1,269 @@
|
|||||||
|
"""Background task manager for async RAGAS single-sample scoring.
|
||||||
|
|
||||||
|
Each job:
|
||||||
|
1. Runs InlineScorer.score() in a thread pool.
|
||||||
|
2. Constructs a minimal EvaluationResult + Scenario in the standard format.
|
||||||
|
3. Calls write_run_artifacts() — produces metadata.json, scores.csv, summary.md.
|
||||||
|
4. Calls run_advisor() — produces optimization_advice.md.
|
||||||
|
|
||||||
|
The resulting run directory lands under outputs/score-async/<run_id>/ and is
|
||||||
|
automatically picked up by run_reader.list_run_summaries(), so it appears in
|
||||||
|
the existing 「运行列表」 and 「报告详情」 pages without any extra wiring.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
import uuid
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from webapp.models import AsyncScoreJobStatus, ScoreRequest
|
||||||
|
|
||||||
|
_REPO_ROOT = Path(__file__).resolve().parents[2]
|
||||||
|
_DEFAULT_JOBS_DIR = _REPO_ROOT / "outputs" / "score-async"
|
||||||
|
_DEFAULT_INDEX_DIR = _REPO_ROOT / "outputs" / "score-jobs" # lightweight job index
|
||||||
|
|
||||||
|
|
||||||
|
def _now_iso() -> str:
|
||||||
|
return datetime.now(timezone.utc).isoformat()
|
||||||
|
|
||||||
|
|
||||||
|
class ScoreJobManager:
|
||||||
|
"""Thread-pool manager for async scoring jobs.
|
||||||
|
|
||||||
|
Results are written as standard run artifacts so the report detail page
|
||||||
|
can render them with zero additional code.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
output_dir: Path = _DEFAULT_JOBS_DIR,
|
||||||
|
index_dir: Path = _DEFAULT_INDEX_DIR,
|
||||||
|
max_workers: int = 4,
|
||||||
|
) -> None:
|
||||||
|
self._output_dir = Path(output_dir)
|
||||||
|
self._index_dir = Path(index_dir)
|
||||||
|
self._output_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
self._index_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
self._executor = ThreadPoolExecutor(max_workers=max_workers)
|
||||||
|
self._cache: dict[str, AsyncScoreJobStatus] = {}
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
self._load_existing()
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------ #
|
||||||
|
# Public API
|
||||||
|
# ------------------------------------------------------------------ #
|
||||||
|
|
||||||
|
def submit(self, request: ScoreRequest) -> AsyncScoreJobStatus:
|
||||||
|
"""Queue one scoring job and return its initial status immediately."""
|
||||||
|
job_id = uuid.uuid4().hex[:12]
|
||||||
|
status = AsyncScoreJobStatus(
|
||||||
|
job_id=job_id,
|
||||||
|
status="queued",
|
||||||
|
created_at=_now_iso(),
|
||||||
|
request_summary={
|
||||||
|
"question": (request.question or "")[:80],
|
||||||
|
"answer": (request.answer or "")[:80],
|
||||||
|
"metrics": list(request.metrics),
|
||||||
|
"judge_model": request.judge_model or "",
|
||||||
|
"embedding_model": request.embedding_model or "",
|
||||||
|
"has_contexts": bool(request.contexts),
|
||||||
|
"has_ground_truth": bool(request.ground_truth),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
with self._lock:
|
||||||
|
self._cache[job_id] = status
|
||||||
|
self._persist_index(status)
|
||||||
|
self._executor.submit(self._run, job_id, request)
|
||||||
|
return status
|
||||||
|
|
||||||
|
def get(self, job_id: str) -> AsyncScoreJobStatus | None:
|
||||||
|
"""Return current status or None if unknown."""
|
||||||
|
with self._lock:
|
||||||
|
return self._cache.get(job_id)
|
||||||
|
|
||||||
|
def list_jobs(self) -> list[AsyncScoreJobStatus]:
|
||||||
|
"""Return all known jobs, newest first."""
|
||||||
|
with self._lock:
|
||||||
|
jobs = list(self._cache.values())
|
||||||
|
jobs.sort(key=lambda j: j.created_at, reverse=True)
|
||||||
|
return jobs
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------ #
|
||||||
|
# Worker
|
||||||
|
# ------------------------------------------------------------------ #
|
||||||
|
|
||||||
|
def _run(self, job_id: str, request: ScoreRequest) -> None:
|
||||||
|
"""Execute scoring, write run artifacts, run advisor."""
|
||||||
|
import logging
|
||||||
|
logger = logging.getLogger("webapp.services.score_job_manager")
|
||||||
|
self._update(job_id, status="running")
|
||||||
|
|
||||||
|
# Lazy imports to keep web server bootable if ragas is not installed.
|
||||||
|
from rag_eval.advisor import run_advisor
|
||||||
|
from rag_eval.metrics.factory import build_models
|
||||||
|
from rag_eval.metrics.weights import compute_weighted_score
|
||||||
|
from rag_eval.reporting.writers import write_run_artifacts
|
||||||
|
from rag_eval.settings import EvaluationSettings
|
||||||
|
from rag_eval.shared.models import (
|
||||||
|
DatasetConfig, EvaluationResult, NormalizedSample,
|
||||||
|
RuntimeConfig, Scenario,
|
||||||
|
)
|
||||||
|
from rag_eval.shared.utils import utc_now_iso
|
||||||
|
from webapp.services.inline_scorer import inline_scorer
|
||||||
|
|
||||||
|
settings = EvaluationSettings()
|
||||||
|
judge_model = request.judge_model or settings.ragas_judge_model
|
||||||
|
embedding_model = request.embedding_model or settings.ragas_embedding_model
|
||||||
|
effective = request.effective_metrics()
|
||||||
|
requested = set(request.metrics)
|
||||||
|
skipped = sorted(requested - set(effective))
|
||||||
|
|
||||||
|
t0 = time.monotonic()
|
||||||
|
started_at = utc_now_iso()
|
||||||
|
|
||||||
|
try:
|
||||||
|
if effective:
|
||||||
|
raw_scores = inline_scorer.score(
|
||||||
|
question=request.question,
|
||||||
|
answer=request.answer,
|
||||||
|
contexts=request.contexts_as_list(),
|
||||||
|
ground_truth=request.ground_truth,
|
||||||
|
metrics=effective,
|
||||||
|
judge_model=judge_model,
|
||||||
|
embedding_model=embedding_model,
|
||||||
|
settings=settings,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raw_scores = {}
|
||||||
|
|
||||||
|
latency_ms = int((time.monotonic() - t0) * 1000)
|
||||||
|
finished_at = utc_now_iso()
|
||||||
|
|
||||||
|
# Build full scores dict (skipped = None)
|
||||||
|
all_scores: dict[str, float | None] = {m: None for m in request.metrics}
|
||||||
|
all_scores.update(raw_scores)
|
||||||
|
weighted_raw = compute_weighted_score(
|
||||||
|
{k: v for k, v in raw_scores.items() if v is not None}, {}
|
||||||
|
)
|
||||||
|
weighted = round(weighted_raw, 4) if weighted_raw is not None else None
|
||||||
|
|
||||||
|
# Build a score row compatible with report_builder
|
||||||
|
score_row: dict[str, Any] = {
|
||||||
|
"sample_id": "async-score-1",
|
||||||
|
"question": request.question,
|
||||||
|
"answer": request.answer or "",
|
||||||
|
"contexts": request.contexts or "",
|
||||||
|
"ground_truth": request.ground_truth or "",
|
||||||
|
"error": "",
|
||||||
|
}
|
||||||
|
score_row.update(all_scores)
|
||||||
|
|
||||||
|
# Construct minimal EvaluationResult so write_run_artifacts works
|
||||||
|
run_id = finished_at.replace(":", "-")
|
||||||
|
output_dir = self._output_dir
|
||||||
|
|
||||||
|
# Build a minimal Scenario for snapshot + advisor
|
||||||
|
scenario = Scenario(
|
||||||
|
scenario_name=f"async-score-{job_id}",
|
||||||
|
mode="offline",
|
||||||
|
dataset=DatasetConfig(path=output_dir / run_id / "dataset.csv"),
|
||||||
|
judge_model=judge_model,
|
||||||
|
embedding_model=embedding_model,
|
||||||
|
metrics=list(request.metrics),
|
||||||
|
output_dir=output_dir,
|
||||||
|
optimization_advisor=True, # always generate advice
|
||||||
|
)
|
||||||
|
|
||||||
|
sample = NormalizedSample(
|
||||||
|
sample_id="async-score-1",
|
||||||
|
question=request.question,
|
||||||
|
answer=request.answer or "",
|
||||||
|
contexts=request.contexts_as_list(),
|
||||||
|
ground_truth=request.ground_truth or "",
|
||||||
|
)
|
||||||
|
|
||||||
|
result = EvaluationResult(
|
||||||
|
scenario=scenario,
|
||||||
|
run_id=run_id,
|
||||||
|
started_at=started_at,
|
||||||
|
finished_at=finished_at,
|
||||||
|
valid_samples=[sample],
|
||||||
|
invalid_samples=[],
|
||||||
|
score_rows=[score_row],
|
||||||
|
)
|
||||||
|
|
||||||
|
write_run_artifacts(result)
|
||||||
|
logger.info("[score_job] artifacts written job_id=%s run_id=%s", job_id, run_id)
|
||||||
|
|
||||||
|
# Run optimization advisor (builds optimization_advice.md)
|
||||||
|
try:
|
||||||
|
llm, _ = build_models(judge_model, embedding_model, settings)
|
||||||
|
run_advisor(result, scenario, llm)
|
||||||
|
logger.info("[score_job] advisor done job_id=%s", job_id)
|
||||||
|
except Exception as adv_exc: # noqa: BLE001
|
||||||
|
logger.warning("[score_job] advisor failed job_id=%s err=%s", job_id, adv_exc)
|
||||||
|
|
||||||
|
self._update(
|
||||||
|
job_id,
|
||||||
|
status="completed",
|
||||||
|
finished_at=finished_at,
|
||||||
|
run_id=run_id,
|
||||||
|
scores=all_scores,
|
||||||
|
weighted_score=weighted,
|
||||||
|
latency_ms=latency_ms,
|
||||||
|
skipped_metrics=skipped,
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
latency_ms = int((time.monotonic() - t0) * 1000)
|
||||||
|
logger.error("[score_job] failed job_id=%s err=%s", job_id, exc)
|
||||||
|
self._update(
|
||||||
|
job_id,
|
||||||
|
status="failed",
|
||||||
|
finished_at=_now_iso(),
|
||||||
|
latency_ms=latency_ms,
|
||||||
|
error=f"{type(exc).__name__}: {exc}",
|
||||||
|
)
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------ #
|
||||||
|
# Persistence helpers
|
||||||
|
# ------------------------------------------------------------------ #
|
||||||
|
|
||||||
|
def _update(self, job_id: str, **kwargs: Any) -> None:
|
||||||
|
"""Merge kwargs into the job status and persist the index."""
|
||||||
|
with self._lock:
|
||||||
|
existing = self._cache.get(job_id)
|
||||||
|
if existing is None:
|
||||||
|
return
|
||||||
|
updated = existing.model_copy(update=kwargs)
|
||||||
|
self._cache[job_id] = updated
|
||||||
|
self._persist_index(updated)
|
||||||
|
|
||||||
|
def _persist_index(self, status: AsyncScoreJobStatus) -> None:
|
||||||
|
"""Write a lightweight index JSON for this job (survives restarts)."""
|
||||||
|
path = self._index_dir / f"{status.job_id}.json"
|
||||||
|
path.write_text(
|
||||||
|
json.dumps(status.model_dump(), ensure_ascii=False, indent=2),
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
|
||||||
|
def _load_existing(self) -> None:
|
||||||
|
"""Load existing job index files on startup."""
|
||||||
|
for path in sorted(self._index_dir.glob("*.json")):
|
||||||
|
try:
|
||||||
|
data = json.loads(path.read_text(encoding="utf-8"))
|
||||||
|
status = AsyncScoreJobStatus.model_validate(data)
|
||||||
|
self._cache[status.job_id] = status
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# Module-level singleton shared by FastAPI routes.
|
||||||
|
score_job_manager = ScoreJobManager()
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
/* Siemens RAGAS 评估控制台 — 样式表
|
/* Siemens RAGAS 评估平台 — 样式表
|
||||||
配色取自西门子品牌色(petrol / 深青)与中性灰,呼应企业语境。 */
|
配色取自西门子品牌色(petrol / 深青)与中性灰,呼应企业语境。 */
|
||||||
|
|
||||||
:root {
|
:root {
|
||||||
|
|||||||
@@ -3,7 +3,7 @@
|
|||||||
<head>
|
<head>
|
||||||
<meta charset="UTF-8" />
|
<meta charset="UTF-8" />
|
||||||
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
|
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
|
||||||
<title>RAGAS 评估控制台</title>
|
<title>Siemens RAGAS 评估平台</title>
|
||||||
<link rel="stylesheet" href="/static/css/app.css" />
|
<link rel="stylesheet" href="/static/css/app.css" />
|
||||||
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.1/dist/chart.umd.min.js"></script>
|
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.1/dist/chart.umd.min.js"></script>
|
||||||
</head>
|
</head>
|
||||||
@@ -12,8 +12,8 @@
|
|||||||
<!-- 左侧导航(布局 A) -->
|
<!-- 左侧导航(布局 A) -->
|
||||||
<aside class="sidebar">
|
<aside class="sidebar">
|
||||||
<div class="brand">
|
<div class="brand">
|
||||||
<div class="brand-mark">RAGAS</div>
|
<div class="brand-mark">Siemens RAGAS</div>
|
||||||
<div class="brand-sub">评估控制台</div>
|
<div class="brand-sub">评估平台</div>
|
||||||
</div>
|
</div>
|
||||||
<nav class="nav">
|
<nav class="nav">
|
||||||
<button class="nav-item" data-view="runs">
|
<button class="nav-item" data-view="runs">
|
||||||
@@ -28,6 +28,9 @@
|
|||||||
<button class="nav-item" data-view="profiles">
|
<button class="nav-item" data-view="profiles">
|
||||||
<span class="nav-ico">⚙</span><span>LLM 配置</span>
|
<span class="nav-ico">⚙</span><span>LLM 配置</span>
|
||||||
</button>
|
</button>
|
||||||
|
<button class="nav-item" data-view="scorejobs">
|
||||||
|
<span class="nav-ico">📋</span><span>评分记录</span>
|
||||||
|
</button>
|
||||||
<button class="nav-item" data-view="apidocs">
|
<button class="nav-item" data-view="apidocs">
|
||||||
<span class="nav-ico">⎔</span><span>API 文档</span>
|
<span class="nav-ico">⎔</span><span>API 文档</span>
|
||||||
</button>
|
</button>
|
||||||
@@ -234,6 +237,22 @@
|
|||||||
</div>
|
</div>
|
||||||
</section>
|
</section>
|
||||||
|
|
||||||
|
<!-- 评分记录视图 -->
|
||||||
|
<section class="view" id="view-scorejobs" hidden>
|
||||||
|
<div class="panel">
|
||||||
|
<div class="panel-head">
|
||||||
|
<h2>评分记录</h2>
|
||||||
|
<span class="muted" style="font-size:13px">来自 Dify 异步评分任务(POST /api/score/async)</span>
|
||||||
|
</div>
|
||||||
|
<p class="muted">评分完成后自动生成完整报告(含指标得分与 LLM 优化建议),点击「查看报告」跳转报告详情页。</p>
|
||||||
|
</div>
|
||||||
|
<div id="scorejobs-list"></div>
|
||||||
|
<div class="empty" id="scorejobs-empty" hidden>
|
||||||
|
<p>暂无评分记录。</p>
|
||||||
|
<p class="muted">在 Dify 工作流中调用 <code>POST /api/score/async</code> 后,记录将在此显示。</p>
|
||||||
|
</div>
|
||||||
|
</section>
|
||||||
|
|
||||||
<!-- API 文档视图 -->
|
<!-- API 文档视图 -->
|
||||||
<section class="view" id="view-apidocs" hidden>
|
<section class="view" id="view-apidocs" hidden>
|
||||||
<iframe
|
<iframe
|
||||||
@@ -251,6 +270,7 @@
|
|||||||
<script src="/static/js/report.js"></script>
|
<script src="/static/js/report.js"></script>
|
||||||
<script src="/static/js/profiles.js"></script>
|
<script src="/static/js/profiles.js"></script>
|
||||||
<script src="/static/js/runner.js"></script>
|
<script src="/static/js/runner.js"></script>
|
||||||
|
<script src="/static/js/score_jobs.js"></script>
|
||||||
<script src="/static/js/app.js"></script>
|
<script src="/static/js/app.js"></script>
|
||||||
</body>
|
</body>
|
||||||
</html>
|
</html>
|
||||||
|
|||||||
@@ -66,6 +66,11 @@ const API = {
|
|||||||
},
|
},
|
||||||
applyProfiles(body) { return API.post("/api/llm-profiles/apply", body); },
|
applyProfiles(body) { return API.post("/api/llm-profiles/apply", body); },
|
||||||
|
|
||||||
|
// 异步评分记录 API
|
||||||
|
scoreJobsAsync(body) { return API.post("/api/score/async", body); },
|
||||||
|
getScoreJob(jobId) { return API.get(`/api/score/jobs/${encodeURIComponent(jobId)}`); },
|
||||||
|
listScoreJobs() { return API.get("/api/score/jobs"); },
|
||||||
|
|
||||||
// 测试已保存 profile 的连通性
|
// 测试已保存 profile 的连通性
|
||||||
testProfile(id) {
|
testProfile(id) {
|
||||||
return fetch(`/api/llm-profiles/${encodeURIComponent(id)}/test`, { method: "POST" })
|
return fetch(`/api/llm-profiles/${encodeURIComponent(id)}/test`, { method: "POST" })
|
||||||
|
|||||||
@@ -5,8 +5,8 @@
|
|||||||
const App = {
|
const App = {
|
||||||
currentRunId: null,
|
currentRunId: null,
|
||||||
activeView: null,
|
activeView: null,
|
||||||
views: ["runs", "new", "report", "profiles", "apidocs"],
|
views: ["runs", "new", "report", "profiles", "scorejobs", "apidocs"],
|
||||||
titles: { runs: "运行列表", new: "新建评估", report: "报告详情", profiles: "LLM 配置", apidocs: "API 文档" },
|
titles: { runs: "运行列表", new: "新建评估", report: "报告详情", profiles: "LLM 配置", scorejobs: "评分记录", apidocs: "API 文档" },
|
||||||
|
|
||||||
// 初始化:绑定导航、从 URL/sessionStorage 恢复上次位置、启动健康检查。
|
// 初始化:绑定导航、从 URL/sessionStorage 恢复上次位置、启动健康检查。
|
||||||
init() {
|
init() {
|
||||||
@@ -72,6 +72,7 @@ const App = {
|
|||||||
if (view === "new") Runner.loadScenarios();
|
if (view === "new") Runner.loadScenarios();
|
||||||
if (view === "report") Report.render(App.currentRunId);
|
if (view === "report") Report.render(App.currentRunId);
|
||||||
if (view === "profiles") Profiles.load();
|
if (view === "profiles") Profiles.load();
|
||||||
|
if (view === "scorejobs") ScoreJobs.load();
|
||||||
},
|
},
|
||||||
|
|
||||||
// ----------------------------------------------------------------
|
// ----------------------------------------------------------------
|
||||||
|
|||||||
125
webapp/static/js/score_jobs.js
Normal file
125
webapp/static/js/score_jobs.js
Normal file
@@ -0,0 +1,125 @@
|
|||||||
|
// score_jobs.js — 评分记录页面(异步 RAGAS 评分任务列表)
|
||||||
|
// 每条评分完成后自动写入标准 Run 产物,点击「查看报告」复用现有报告详情页。
|
||||||
|
|
||||||
|
const ScoreJobs = {
|
||||||
|
_pollTimers: {}, // job_id -> setInterval handle
|
||||||
|
|
||||||
|
async load() {
|
||||||
|
const list = document.getElementById("scorejobs-list");
|
||||||
|
const empty = document.getElementById("scorejobs-empty");
|
||||||
|
list.innerHTML = '<p class="muted">加载中…</p>';
|
||||||
|
try {
|
||||||
|
const data = await API.listScoreJobs();
|
||||||
|
const jobs = data.jobs || [];
|
||||||
|
list.innerHTML = "";
|
||||||
|
if (jobs.length === 0) {
|
||||||
|
empty.hidden = false;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
empty.hidden = true;
|
||||||
|
jobs.forEach(job => list.appendChild(ScoreJobs.renderCard(job)));
|
||||||
|
// Auto-poll any pending jobs
|
||||||
|
jobs.forEach(job => {
|
||||||
|
if (job.status === "queued" || job.status === "running") {
|
||||||
|
ScoreJobs._startPoll(job.job_id);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
list.innerHTML = `<p class="muted">加载失败:${App.escape(err.message)}</p>`;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
renderCard(job) {
|
||||||
|
const card = document.createElement("div");
|
||||||
|
card.className = "run-card";
|
||||||
|
card.id = `score-job-${job.job_id}`;
|
||||||
|
card.innerHTML = ScoreJobs._cardHtml(job);
|
||||||
|
// Bind report button if already completed
|
||||||
|
ScoreJobs._bindReportBtn(card, job);
|
||||||
|
return card;
|
||||||
|
},
|
||||||
|
|
||||||
|
_cardHtml(job) {
|
||||||
|
const time = App.shortTime(job.created_at);
|
||||||
|
const question = App.escape((job.request_summary?.question || "—").slice(0, 60));
|
||||||
|
const metrics = (job.request_summary?.metrics || []).join(", ");
|
||||||
|
|
||||||
|
const statusBadge = `<span class="badge ${job.status}">${job.status}</span>`;
|
||||||
|
|
||||||
|
let scoreHtml = "";
|
||||||
|
if (job.status === "completed") {
|
||||||
|
scoreHtml = Object.entries(job.scores || {})
|
||||||
|
.map(([k, v]) => {
|
||||||
|
const cls = App.scoreClass(v);
|
||||||
|
const text = v === null || v === undefined ? "n/a" : Number(v).toFixed(3);
|
||||||
|
return `<span class="metric-chip" title="${App.escape(k)}">${App.escape(App.shortMetric(k))} <b class="${cls}">${text}</b></span>`;
|
||||||
|
})
|
||||||
|
.join(" ");
|
||||||
|
if (job.weighted_score !== null && job.weighted_score !== undefined) {
|
||||||
|
const cls = App.scoreClass(job.weighted_score);
|
||||||
|
scoreHtml += ` <span class="metric-chip">综合 <b class="${cls}">${Number(job.weighted_score).toFixed(3)}</b></span>`;
|
||||||
|
}
|
||||||
|
} else if (job.status === "failed") {
|
||||||
|
scoreHtml = `<span style="color:var(--bad);font-size:12px">${App.escape((job.error || "").slice(0, 80))}</span>`;
|
||||||
|
} else {
|
||||||
|
scoreHtml = `<span class="muted">评分中,请稍候…</span>`;
|
||||||
|
}
|
||||||
|
|
||||||
|
const reportBtn = job.status === "completed" && job.run_id
|
||||||
|
? `<button class="btn btn-sm btn-primary score-job-report-btn" data-run-id="${App.escape(job.run_id)}">查看报告</button>`
|
||||||
|
: "";
|
||||||
|
|
||||||
|
return `
|
||||||
|
<div class="run-card-head">
|
||||||
|
<div class="run-card-title">${question}</div>
|
||||||
|
<div style="display:flex;gap:8px;align-items:center">${statusBadge}${reportBtn}</div>
|
||||||
|
</div>
|
||||||
|
<div class="run-card-meta">
|
||||||
|
<div>指标:${App.escape(metrics)} · ${time} · ${job.latency_ms}ms</div>
|
||||||
|
</div>
|
||||||
|
<div class="run-card-metrics">${scoreHtml}</div>
|
||||||
|
`;
|
||||||
|
},
|
||||||
|
|
||||||
|
_bindReportBtn(card, job) {
|
||||||
|
const btn = card.querySelector(".score-job-report-btn");
|
||||||
|
if (!btn) return;
|
||||||
|
btn.addEventListener("click", () => {
|
||||||
|
const runId = btn.dataset.runId;
|
||||||
|
if (runId) {
|
||||||
|
App.enableReportNav();
|
||||||
|
App.navigate("report", runId);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
_startPoll(jobId) {
|
||||||
|
if (ScoreJobs._pollTimers[jobId]) return;
|
||||||
|
ScoreJobs._pollTimers[jobId] = setInterval(async () => {
|
||||||
|
try {
|
||||||
|
const job = await API.getScoreJob(jobId);
|
||||||
|
const card = document.getElementById(`score-job-${jobId}`);
|
||||||
|
if (card) {
|
||||||
|
card.innerHTML = ScoreJobs._cardHtml(job);
|
||||||
|
ScoreJobs._bindReportBtn(card, job);
|
||||||
|
}
|
||||||
|
if (job.status === "completed" || job.status === "failed") {
|
||||||
|
clearInterval(ScoreJobs._pollTimers[jobId]);
|
||||||
|
delete ScoreJobs._pollTimers[jobId];
|
||||||
|
// If completed, pre-enable report nav
|
||||||
|
if (job.status === "completed" && job.run_id) {
|
||||||
|
App.enableReportNav();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (_e) {
|
||||||
|
clearInterval(ScoreJobs._pollTimers[jobId]);
|
||||||
|
delete ScoreJobs._pollTimers[jobId];
|
||||||
|
}
|
||||||
|
}, 5000);
|
||||||
|
},
|
||||||
|
|
||||||
|
stopAllPolls() {
|
||||||
|
Object.values(ScoreJobs._pollTimers).forEach(t => clearInterval(t));
|
||||||
|
ScoreJobs._pollTimers = {};
|
||||||
|
},
|
||||||
|
};
|
||||||
Reference in New Issue
Block a user