Compare commits

...

7 Commits

Author SHA1 Message Date
wangwei
e1751447df feat(advisor): add 0.85 advisory threshold triggering LLM suggestions
- Add advisory_threshold=0.85 field to MetricRule (higher-is-better metrics)
- diagnose() now emits severity='low' for scores in (warning_threshold, 0.85)
- noise_sensitivity (lower-is-better) keeps its existing two-tier thresholds
- writer.py: severity labels mapped to Chinese (严重/警告/待优化)
- llm_analyzer.py: prompt explains low/warning/critical tiers in Chinese
- Tests: 5 new cases for 'low' severity, updated log summary assertions

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-06-25 11:35:49 +08:00
wangwei
4fd515d2d9 feat: async score jobs — POST /api/score/async + 评分记录 page
Each async score job:
- Runs InlineScorer.score() in thread pool
- Writes standard run artifacts (metadata.json, scores.csv, summary.md)
- Runs optimization_advisor => optimization_advice.md
- Result appears in 运行列表 and 报告详情 with full report

New endpoints:
- POST /api/score/async  (202, job_id immediate)
- GET  /api/score/jobs   (list all jobs)
- GET  /api/score/jobs/{id} (single job status)

Frontend:
- 评分记录 nav page with card list
- 5s auto-polling for queued/running jobs
- 查看报告 button navigates to existing 报告详情 page

Dify: change /api/score -> /api/score/async, no response parsing needed

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-06-24 17:24:22 +08:00
wangwei
abcd61ec8f docs: add async score jobs implementation plan 2026-06-24 17:08:01 +08:00
wangwei
363e8b0f27 docs: add async score jobs design spec 2026-06-24 17:04:06 +08:00
wangwei
b870ed8730 feat: make contexts optional in /api/score
When contexts is absent, metrics that require retrieved_contexts
(faithfulness, context_recall, context_precision, noise_sensitivity)
are automatically skipped and appear in skipped_metrics.
Only answer_relevancy, factual_correctness, semantic_similarity
remain computable without contexts.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-06-24 14:42:03 +08:00
wangwei
791738bb07 feat: rename project to 'Siemens RAGAS 评估平台' in frontend
- index.html: page title, brand-mark, brand-sub
- server.py: FastAPI app title
- app.css: comment header

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-06-24 10:20:23 +08:00
wangwei
630b70cc2a docs: add project-overview.html — full project documentation
Covers: overview, architecture, modules, data flows (4 flows),
RAGAS metrics (7), API reference, weight config, deployment,
tech stack, directory structure. Self-contained HTML with
Siemens teal theme, sidebar scrollspy, responsive layout.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-06-24 10:17:08 +08:00
20 changed files with 2846 additions and 39 deletions

View File

@@ -0,0 +1,808 @@
# 异步评分记录Async Score JobsImplementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** 新增 `POST /api/score/async` 异步端点,结果持久化至 `outputs/score-jobs/`,并在前端新增「评分记录」页面展示。
**Architecture:** 新建 `ScoreJobManager`(复用 `pipeline_task_manager` 线程池模式)在后台执行 `InlineScorer.score()`,写入 JSON 文件;新增三个 REST 端点;前端新增导航页加载并轮询记录。
**Tech Stack:** Python 3.12, FastAPI, Pydantic v2, threading, Vanilla JS, pytest
## Global Constraints
- Python 3.12+PEP 84 空格缩进,类型注解必须
- 存储路径:`outputs/score-jobs/<job_id>.json`
- 复用现有 `ScoreRequest`(含 `effective_metrics()``contexts_as_list()` 方法)
- 复用现有 `InlineScorer.score()``compute_weighted_score()`
- 所有测试用 pytest不依赖真实 LLM
---
## 文件清单
| 操作 | 文件 | 职责 |
|------|------|------|
| 新建 | `webapp/services/score_job_manager.py` | ScoreJobManager线程池 + JSON 持久化 |
| 新建 | `webapp/api/score_jobs.py` | 3 个端点路由 |
| 新建 | `webapp/static/js/score_jobs.js` | 前端列表 + 轮询逻辑 |
| 新建 | `tests/webapp/test_score_jobs_api.py` | API 集成测试 |
| 修改 | `webapp/models.py` | 新增 `AsyncScoreJobStatus``AsyncScoreJobResponse` |
| 修改 | `webapp/server.py` | 注册 score_jobs router更新 OPENAPI_TAGS 和 description |
| 修改 | `webapp/static/index.html` | 新增导航项 + `#view-scorejobs` section |
| 修改 | `webapp/static/js/api.js` | 新增 `scoreJobsAsync()``getScoreJob()``listScoreJobs()` |
| 修改 | `webapp/static/js/app.js` | 注册 `scorejobs` 视图、加载调用 |
---
## Task 1: Pydantic 模型 + ScoreJobManager
**Files:**
- Modify: `webapp/models.py`
- Create: `webapp/services/score_job_manager.py`
- Create: `tests/webapp/test_score_jobs_api.py` (partial)
**Interfaces:**
- Produces:
- `AsyncScoreJobStatus` Pydantic model
- `AsyncScoreJobResponse` Pydantic model
- `score_job_manager: ScoreJobManager` singleton
- `ScoreJobManager.submit(request: ScoreRequest) -> AsyncScoreJobStatus`
- `ScoreJobManager.get(job_id: str) -> AsyncScoreJobStatus | None`
- `ScoreJobManager.list_jobs() -> list[AsyncScoreJobStatus]`
- [ ] **Step 1: Add models to `webapp/models.py`**
Append after `AsyncScoreJobResponse` (at the end of the file, after `ScoreResponse`):
```python
# ---------------------------------------------------------------------------
# 异步评分记录模型
# ---------------------------------------------------------------------------
class AsyncScoreJobResponse(BaseModel):
"""Immediate response after submitting an async score job."""
job_id: str = Field(description="任务唯一标识符,用于后续查询结果。")
status: str = Field(default="queued", description="初始状态queued。")
class AsyncScoreJobStatus(BaseModel):
"""Full state of one async score job, persisted to disk."""
job_id: str = Field(description="任务唯一标识符。")
status: str = Field(description="queued | running | completed | failed")
created_at: str = Field(default="", description="创建时间ISO 8601 UTC。")
finished_at: str = Field(default="", description="完成时间ISO 8601 UTC。")
request_summary: dict = Field(
default_factory=dict,
description="请求参数快照question 前80字、metrics、judge_model 等)。",
)
scores: dict[str, float | None] = Field(default_factory=dict, description="各指标得分。")
weighted_score: float | None = Field(default=None, description="加权综合得分。")
latency_ms: int = Field(default=0, description="评分耗时毫秒。")
skipped_metrics: list[str] = Field(default_factory=list)
error: str | None = Field(default=None)
```
- [ ] **Step 2: Write failing tests**
Create `tests/webapp/test_score_jobs_api.py`:
```python
"""Tests for async score jobs API."""
from __future__ import annotations
import json
import time
import pytest
from unittest.mock import MagicMock, patch
from fastapi.testclient import TestClient
@pytest.fixture()
def client(tmp_path, monkeypatch):
import webapp.services.score_job_manager as mgr_mod
from webapp.services.score_job_manager import ScoreJobManager
fresh_mgr = ScoreJobManager(jobs_dir=tmp_path / "score-jobs")
monkeypatch.setattr(mgr_mod, "score_job_manager", fresh_mgr)
import webapp.api.score_jobs as api_mod
monkeypatch.setattr(api_mod, "score_job_manager", fresh_mgr)
from webapp.server import create_app
return TestClient(create_app())
class TestScoreJobManager:
def test_submit_returns_job_status_with_queued(self, tmp_path):
from webapp.services.score_job_manager import ScoreJobManager
from webapp.models import ScoreRequest
mgr = ScoreJobManager(jobs_dir=tmp_path / "jobs")
req = ScoreRequest(question="q", answer="a", metrics=["answer_relevancy"])
with patch.object(mgr, "_execute") as mock_exec:
mock_exec.return_value = None
status = mgr.submit(req)
assert status.status in ("queued", "running", "completed")
assert len(status.job_id) > 0
def test_get_returns_none_for_unknown_id(self, tmp_path):
from webapp.services.score_job_manager import ScoreJobManager
mgr = ScoreJobManager(jobs_dir=tmp_path / "jobs")
assert mgr.get("nonexistent") is None
def test_list_returns_empty_initially(self, tmp_path):
from webapp.services.score_job_manager import ScoreJobManager
mgr = ScoreJobManager(jobs_dir=tmp_path / "jobs")
assert mgr.list_jobs() == []
def test_completed_job_persisted_to_disk(self, tmp_path):
from webapp.services.score_job_manager import ScoreJobManager
from webapp.models import ScoreRequest
mgr = ScoreJobManager(jobs_dir=tmp_path / "jobs", max_workers=1)
req = ScoreRequest(question="q?", answer="a.", metrics=["answer_relevancy"])
mock_scorer = MagicMock()
mock_scorer.score.return_value = {"answer_relevancy": 0.85}
with patch("webapp.services.score_job_manager.inline_scorer", mock_scorer):
with patch("webapp.services.score_job_manager.EvaluationSettings"):
status = mgr.submit(req)
for _ in range(20):
s = mgr.get(status.job_id)
if s and s.status in ("completed", "failed"):
break
time.sleep(0.2)
s = mgr.get(status.job_id)
assert s is not None
json_path = tmp_path / "jobs" / f"{status.job_id}.json"
assert json_path.exists()
data = json.loads(json_path.read_text(encoding="utf-8"))
assert data["job_id"] == status.job_id
```
- [ ] **Step 3: Run to verify FAIL**
```
cd C:\Projects\AIProjects\Siemens-AIPOC\siemens_ragas
python -m pytest tests/webapp/test_score_jobs_api.py::TestScoreJobManager -v
```
Expected: `ModuleNotFoundError: No module named 'webapp.services.score_job_manager'`
- [ ] **Step 4: Create `webapp/services/score_job_manager.py`**
```python
"""Background task manager for async RAGAS single-sample scoring.
Each job runs InlineScorer.score() in a thread pool and persists the
result as a JSON file under outputs/score-jobs/<job_id>.json so results
survive server restarts and can be listed by the frontend.
"""
from __future__ import annotations
import json
import math
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from rag_eval.metrics.weights import compute_weighted_score
from rag_eval.settings import EvaluationSettings
from webapp.models import AsyncScoreJobStatus, ScoreRequest
from webapp.services.inline_scorer import inline_scorer
_REPO_ROOT = Path(__file__).resolve().parents[2]
_DEFAULT_JOBS_DIR = _REPO_ROOT / "outputs" / "score-jobs"
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
class ScoreJobManager:
"""Thread-pool manager for async RAGAS scoring jobs with JSON persistence."""
def __init__(
self,
jobs_dir: Path = _DEFAULT_JOBS_DIR,
max_workers: int = 4,
) -> None:
self._jobs_dir = Path(jobs_dir)
self._jobs_dir.mkdir(parents=True, exist_ok=True)
self._executor = ThreadPoolExecutor(max_workers=max_workers)
# In-memory index: job_id -> AsyncScoreJobStatus (authoritative while running)
self._cache: dict[str, AsyncScoreJobStatus] = {}
self._lock = threading.Lock()
self._load_existing()
# ------------------------------------------------------------------ #
# Public API
# ------------------------------------------------------------------ #
def submit(self, request: ScoreRequest) -> AsyncScoreJobStatus:
"""Queue one scoring job and return its initial status immediately."""
job_id = uuid.uuid4().hex[:12]
status = AsyncScoreJobStatus(
job_id=job_id,
status="queued",
created_at=_now_iso(),
request_summary={
"question": request.question[:80],
"answer": (request.answer or "")[:80],
"metrics": list(request.metrics),
"judge_model": request.judge_model or "",
"embedding_model": request.embedding_model or "",
"has_contexts": bool(request.contexts),
"has_ground_truth": bool(request.ground_truth),
},
)
with self._lock:
self._cache[job_id] = status
self._persist(status)
self._executor.submit(self._run, job_id, request)
return status
def get(self, job_id: str) -> AsyncScoreJobStatus | None:
"""Return the current status for one job, or None if unknown."""
with self._lock:
return self._cache.get(job_id)
def list_jobs(self) -> list[AsyncScoreJobStatus]:
"""Return all known jobs sorted newest first."""
with self._lock:
jobs = list(self._cache.values())
jobs.sort(key=lambda j: j.created_at, reverse=True)
return jobs
# ------------------------------------------------------------------ #
# Internal
# ------------------------------------------------------------------ #
def _run(self, job_id: str, request: ScoreRequest) -> None:
"""Execute scoring in the thread pool and persist the result."""
self._update(job_id, status="running")
settings = EvaluationSettings()
judge_model = request.judge_model or settings.ragas_judge_model
embedding_model = request.embedding_model or settings.ragas_embedding_model
effective = request.effective_metrics()
requested = set(request.metrics)
skipped = sorted(requested - set(effective))
import time as _time
t0 = _time.monotonic()
try:
if not effective:
scores: dict[str, float | None] = {m: None for m in request.metrics}
weighted = None
else:
raw = inline_scorer.score(
question=request.question,
answer=request.answer,
contexts=request.contexts_as_list(),
ground_truth=request.ground_truth,
metrics=effective,
judge_model=judge_model,
embedding_model=embedding_model,
settings=settings,
)
scores = {m: None for m in request.metrics}
scores.update(raw)
weighted_raw = compute_weighted_score(
{k: v for k, v in raw.items() if v is not None}, {}
)
weighted = round(weighted_raw, 4) if weighted_raw is not None else None
latency_ms = int((_time.monotonic() - t0) * 1000)
self._update(
job_id,
status="completed",
finished_at=_now_iso(),
scores=scores,
weighted_score=weighted,
latency_ms=latency_ms,
skipped_metrics=skipped,
)
except Exception as exc: # noqa: BLE001
latency_ms = int((_time.monotonic() - t0) * 1000)
self._update(
job_id,
status="failed",
finished_at=_now_iso(),
latency_ms=latency_ms,
error=f"{type(exc).__name__}: {exc}",
)
def _update(self, job_id: str, **kwargs: Any) -> None:
"""Merge kwargs into the job status and persist."""
with self._lock:
existing = self._cache.get(job_id)
if existing is None:
return
updated = existing.model_copy(update=kwargs)
self._cache[job_id] = updated
self._persist(updated)
def _persist(self, status: AsyncScoreJobStatus) -> None:
"""Write one job's status to its JSON file."""
path = self._jobs_dir / f"{status.job_id}.json"
path.write_text(
json.dumps(status.model_dump(), ensure_ascii=False, indent=2),
encoding="utf-8",
)
def _load_existing(self) -> None:
"""Load completed jobs from disk into memory on startup."""
for path in sorted(self._jobs_dir.glob("*.json")):
try:
data = json.loads(path.read_text(encoding="utf-8"))
status = AsyncScoreJobStatus.model_validate(data)
self._cache[status.job_id] = status
except Exception: # noqa: BLE001
pass # Corrupt file — skip
# Module-level singleton shared by FastAPI routes.
score_job_manager = ScoreJobManager()
```
- [ ] **Step 5: Run to verify tests PASS**
```
python -m pytest tests/webapp/test_score_jobs_api.py::TestScoreJobManager -v
```
Expected: 4 tests PASS
- [ ] **Step 6: Commit**
```
git add webapp/models.py webapp/services/score_job_manager.py tests/webapp/test_score_jobs_api.py
git commit -m "feat: add AsyncScoreJobStatus model and ScoreJobManager with JSON persistence"
```
---
## Task 2: API 端点
**Files:**
- Create: `webapp/api/score_jobs.py`
- Modify: `webapp/server.py`
- Modify: `tests/webapp/test_score_jobs_api.py`
**Interfaces:**
- Consumes: `score_job_manager: ScoreJobManager`, `AsyncScoreJobResponse`, `AsyncScoreJobStatus`, `ScoreRequest`
- Produces: `POST /api/score/async`, `GET /api/score/jobs`, `GET /api/score/jobs/{job_id}`
- [ ] **Step 1: Add API tests to `tests/webapp/test_score_jobs_api.py`**
Append this class:
```python
class TestScoreJobsEndpoint:
def test_submit_async_returns_202(self, client):
with patch("webapp.services.score_job_manager.ScoreJobManager._execute"):
resp = client.post("/api/score/async", json={
"question": "q?", "answer": "a.",
"metrics": ["answer_relevancy"],
})
assert resp.status_code == 202
data = resp.json()
assert "job_id" in data
assert data["status"] == "queued"
def test_get_unknown_job_returns_404(self, client):
resp = client.get("/api/score/jobs/nonexistent")
assert resp.status_code == 404
def test_list_jobs_returns_empty_initially(self, client):
resp = client.get("/api/score/jobs")
assert resp.status_code == 200
assert resp.json()["jobs"] == []
def test_submitted_job_appears_in_list(self, client):
with patch("webapp.services.score_job_manager.ScoreJobManager._run"):
resp = client.post("/api/score/async", json={
"question": "q?", "answer": "a.",
"metrics": ["answer_relevancy"],
})
job_id = resp.json()["job_id"]
list_resp = client.get("/api/score/jobs")
ids = [j["job_id"] for j in list_resp.json()["jobs"]]
assert job_id in ids
def test_get_job_by_id(self, client):
with patch("webapp.services.score_job_manager.ScoreJobManager._run"):
resp = client.post("/api/score/async", json={
"question": "q?", "answer": "a.",
"metrics": ["answer_relevancy"],
})
job_id = resp.json()["job_id"]
get_resp = client.get(f"/api/score/jobs/{job_id}")
assert get_resp.status_code == 200
assert get_resp.json()["job_id"] == job_id
```
- [ ] **Step 2: Run to verify FAIL**
```
python -m pytest tests/webapp/test_score_jobs_api.py::TestScoreJobsEndpoint -v
```
Expected: FAIL — `ModuleNotFoundError: No module named 'webapp.api.score_jobs'`
- [ ] **Step 3: Create `webapp/api/score_jobs.py`**
```python
"""Routes for async RAGAS scoring jobs (Dify fire-and-forget integration)."""
from __future__ import annotations
import logging
from fastapi import APIRouter, HTTPException
from webapp.models import AsyncScoreJobResponse, AsyncScoreJobStatus, ScoreRequest
from webapp.services.score_job_manager import score_job_manager
router = APIRouter(prefix="/api/score", tags=["score"])
logger = logging.getLogger("webapp.api.score_jobs")
@router.post(
"/async",
status_code=202,
response_model=AsyncScoreJobResponse,
summary="提交异步评分任务Dify 推荐方式)",
responses={
202: {
"description": "任务已排队,立即返回 job_id。通过 GET /api/score/jobs/{job_id} 查询结果。",
"content": {
"application/json": {
"example": {"job_id": "abc123def456", "status": "queued"}
}
},
},
},
)
def submit_async_score(request: ScoreRequest) -> AsyncScoreJobResponse:
"""提交异步 RAGAS 评分任务,立即返回 job_id202 Accepted
评分在后台线程中执行,结果持久化至 `outputs/score-jobs/<job_id>.json`。
在 RAGAS 平台「评分记录」页面可查看所有历史评分记录。
**Dify 工作流推荐使用此接口**:不等待评分完成,工作流立即继续,
避免 HTTP 节点超时。评分结果通过平台界面查看。
"""
logger.info(
"[score_async] submit metrics=%s has_ctx=%s has_gt=%s",
request.metrics, bool(request.contexts), bool(request.ground_truth),
)
status = score_job_manager.submit(request)
logger.info("[score_async] queued job_id=%s", status.job_id)
return AsyncScoreJobResponse(job_id=status.job_id, status=status.status)
@router.get(
"/jobs",
response_model=dict,
summary="列出所有评分记录",
)
def list_score_jobs() -> dict:
"""返回所有异步评分记录,按创建时间倒序排列。"""
jobs = score_job_manager.list_jobs()
logger.info("[score_jobs] list count=%d", len(jobs))
return {"jobs": [j.model_dump() for j in jobs]}
@router.get(
"/jobs/{job_id}",
response_model=AsyncScoreJobStatus,
summary="查询评分记录详情",
responses={404: {"description": "指定 job_id 的评分记录不存在。"}},
)
def get_score_job(job_id: str) -> AsyncScoreJobStatus:
"""返回一个异步评分任务的当前状态和结果。"""
status = score_job_manager.get(job_id)
if status is None:
raise HTTPException(status_code=404, detail=f"Score job not found: {job_id}")
return status
```
- [ ] **Step 4: Register router in `webapp/server.py`**
Add import:
```python
from webapp.api import evaluations, llm_profiles, pipeline, runs, scenarios, score, score_jobs
```
Add after `app.include_router(score.router)`:
```python
app.include_router(score_jobs.router)
```
Add entry to `OPENAPI_TAGS` before `"meta"`:
```python
{
"name": "score",
"description": (
"**实时评分 API同步** — `POST /api/score`\n\n"
"**异步评分 APIDify 推荐)** — `POST /api/score/async`\n\n"
"异步方式立即返回 job_id202评分在后台执行结果在「评分记录」页查看。\n\n"
"**鉴权**:若 `.env` 中配置了 `SCORE_API_TOKEN`,需携带 "
"`Authorization: Bearer <token>` 请求头。"
),
},
```
> Note: this replaces the existing `"score"` entry in `OPENAPI_TAGS`.
- [ ] **Step 5: Verify no route conflict**
```
python -c "
from webapp.server import create_app
app = create_app()
score_routes = [(r.path, list(getattr(r,'methods',[]))) for r in app.routes if 'score' in r.path]
print(score_routes)
"
```
Expected: shows `/api/score`, `/api/score/async`, `/api/score/jobs`, `/api/score/jobs/{job_id}`
- [ ] **Step 6: Run API tests**
```
python -m pytest tests/webapp/test_score_jobs_api.py -v --tb=short
```
Expected: all 9 tests PASS
- [ ] **Step 7: Commit**
```
git add webapp/api/score_jobs.py webapp/server.py tests/webapp/test_score_jobs_api.py
git commit -m "feat: add POST /api/score/async and GET /api/score/jobs endpoints"
```
---
## Task 3: 前端「评分记录」页
**Files:**
- Modify: `webapp/static/index.html`
- Modify: `webapp/static/js/api.js`
- Modify: `webapp/static/js/app.js`
- Create: `webapp/static/js/score_jobs.js`
**Interfaces:**
- Consumes: `GET /api/score/jobs`, `GET /api/score/jobs/{job_id}`
- Produces: `#view-scorejobs` section, `ScoreJobs` JS object
- [ ] **Step 1: Add API methods to `webapp/static/js/api.js`**
Add before the closing `};`:
```javascript
// 异步评分记录 API
scoreJobsAsync(body) { return API.post("/api/score/async", body); },
getScoreJob(jobId) { return API.get(`/api/score/jobs/${encodeURIComponent(jobId)}`); },
listScoreJobs() { return API.get("/api/score/jobs"); },
```
- [ ] **Step 2: Add nav item and section to `webapp/static/index.html`**
In the `<nav class="nav">` block, add after the `profiles` nav-item and before the `apidocs` nav-item:
```html
<button class="nav-item" data-view="scorejobs">
<span class="nav-ico">📋</span><span>评分记录</span>
</button>
```
Add a new section before the `<!-- API 文档视图 -->` comment:
```html
<!-- 评分记录视图 -->
<section class="view" id="view-scorejobs" hidden>
<div class="panel">
<div class="panel-head">
<h2>评分记录</h2>
<span class="muted" style="font-size:13px">来自 Dify 异步评分任务POST /api/score/async</span>
</div>
</div>
<div id="scorejobs-container"></div>
<div class="empty" id="scorejobs-empty" hidden>
<p>暂无评分记录。</p>
<p class="muted">在 Dify 工作流中调用 <code>POST /api/score/async</code> 后,记录将在此显示。</p>
</div>
</section>
```
- [ ] **Step 3: Create `webapp/static/js/score_jobs.js`**
```javascript
// score_jobs.js — 评分记录页面逻辑(异步 RAGAS 评分结果列表)
const ScoreJobs = {
_pollTimers: {}, // job_id -> setInterval handle
async load() {
const container = document.getElementById("scorejobs-container");
const empty = document.getElementById("scorejobs-empty");
container.innerHTML = '<p class="muted">加载中…</p>';
try {
const data = await API.listScoreJobs();
const jobs = data.jobs || [];
container.innerHTML = "";
if (jobs.length === 0) {
empty.hidden = false;
return;
}
empty.hidden = true;
jobs.forEach(job => container.appendChild(ScoreJobs.renderRow(job)));
// Auto-poll any queued/running jobs
jobs.forEach(job => {
if (job.status === "queued" || job.status === "running") {
ScoreJobs._startPoll(job.job_id);
}
});
} catch (err) {
container.innerHTML = `<p class="muted">加载失败:${App.escape(err.message)}</p>`;
}
},
renderRow(job) {
const row = document.createElement("div");
row.className = "panel score-job-row";
row.id = `score-job-${job.job_id}`;
row.innerHTML = ScoreJobs._rowHtml(job);
return row;
},
_rowHtml(job) {
const time = App.shortTime(job.created_at);
const question = App.escape((job.request_summary?.question || "—").slice(0, 50));
const metrics = (job.request_summary?.metrics || []).join(", ");
const statusBadge = `<span class="badge ${job.status}">${job.status}</span>`;
let scoreHtml = "";
if (job.status === "completed") {
scoreHtml = Object.entries(job.scores || {})
.map(([k, v]) => {
const cls = App.scoreClass(v);
const text = v === null || v === undefined ? "n/a" : Number(v).toFixed(3);
return `<span class="metric-chip" title="${App.escape(k)}">${App.escape(App.shortMetric(k))} <b class="${cls}">${text}</b></span>`;
})
.join(" ");
if (job.weighted_score !== null && job.weighted_score !== undefined) {
const cls = App.scoreClass(job.weighted_score);
scoreHtml += ` <span class="metric-chip">综合 <b class="${cls}">${Number(job.weighted_score).toFixed(3)}</b></span>`;
}
} else if (job.status === "failed") {
scoreHtml = `<span class="muted" style="color:var(--bad)">${App.escape(job.error || "未知错误")}</span>`;
} else {
scoreHtml = `<span class="muted">评分中…</span>`;
}
return `
<div class="run-card-head">
<div class="run-card-title">${question}</div>
<div>${statusBadge}</div>
</div>
<div class="run-card-meta">
<div>指标:${App.escape(metrics)} · ${time} · ${job.latency_ms}ms</div>
</div>
<div class="run-card-metrics">${scoreHtml}</div>
`;
},
_startPoll(jobId) {
if (ScoreJobs._pollTimers[jobId]) return;
ScoreJobs._pollTimers[jobId] = setInterval(async () => {
try {
const job = await API.getScoreJob(jobId);
const el = document.getElementById(`score-job-${jobId}`);
if (el) el.innerHTML = ScoreJobs._rowHtml(job);
if (job.status === "completed" || job.status === "failed") {
clearInterval(ScoreJobs._pollTimers[jobId]);
delete ScoreJobs._pollTimers[jobId];
}
} catch (_e) {
clearInterval(ScoreJobs._pollTimers[jobId]);
delete ScoreJobs._pollTimers[jobId];
}
}, 5000);
},
stopAllPolls() {
Object.values(ScoreJobs._pollTimers).forEach(t => clearInterval(t));
ScoreJobs._pollTimers = {};
},
};
```
- [ ] **Step 4: Update `webapp/static/js/app.js`**
Add `"scorejobs"` to the `views` array and `titles` object:
```javascript
views: ["runs", "new", "report", "profiles", "scorejobs", "apidocs"],
titles: { runs: "运行列表", new: "新建评估", report: "报告详情", profiles: "LLM 配置", scorejobs: "评分记录", apidocs: "API 文档" },
```
Add in `_doSwitch` after `if (view === "profiles") Profiles.load();`:
```javascript
if (view === "scorejobs") ScoreJobs.load();
```
Add `ScoreJobs.stopAllPolls();` when switching away, in `_doSwitch` before view switching logic:
```javascript
// Stop score job pollers when leaving the scorejobs view
if (App.activeView === "scorejobs" && view !== "scorejobs") ScoreJobs.stopAllPolls();
```
- [ ] **Step 5: Add script tag to `webapp/static/index.html`**
Add before `<script src="/static/js/app.js"></script>`:
```html
<script src="/static/js/score_jobs.js"></script>
```
- [ ] **Step 6: Verify server boots**
```
python -c "from webapp.server import create_app; create_app(); print('OK')"
```
Expected: `OK`
Also verify HTML has all new elements:
```
python -c "
c = open('webapp/static/index.html', encoding='utf-8').read()
assert 'view-scorejobs' in c
assert 'scorejobs-container' in c
assert '评分记录' in c
print('HTML OK')
"
```
- [ ] **Step 7: Commit**
```
git add webapp/static/index.html webapp/static/js/api.js webapp/static/js/app.js webapp/static/js/score_jobs.js
git commit -m "feat: add 评分记录 page with async score job list and auto-polling"
```
---
## Task 4: 全量回归测试 + Dify 说明注释
**Files:**
- Modify: `webapp/static/js/score_jobs.js` (minor: add Dify curl comment at top)
- [ ] **Step 1: Run full test suite**
```
python -m pytest tests/ -v --tb=short -q 2>&1 | tail -15
```
Pre-existing failures to ignore:
- `test_normalize_sample_pdf_offline_smoke_row`
- `test_evaluator_and_reporting_write_run_assets`
- `test_question_generator_rejects_invalid_json`
- `test_question_generator_rejects_non_list_samples`
Any other failure is a regression — fix before proceeding.
- [ ] **Step 2: Run targeted tests**
```
python -m pytest tests/webapp/test_score_jobs_api.py tests/webapp/test_score_api.py tests/test_pipeline.py -v --tb=short
```
Expected: all PASS
- [ ] **Step 3: Final commit**
```
git add .
git commit -m "feat: async score jobs complete — POST /api/score/async + 评分记录 page
- ScoreJobManager: thread pool + JSON persistence (outputs/score-jobs/)
- POST /api/score/async: 202 immediate response with job_id
- GET /api/score/jobs + GET /api/score/jobs/{id}: query endpoints
- Frontend: 评分记录 nav page with 5s auto-polling for pending jobs
- Dify integration: change /api/score → /api/score/async, remove response parsing
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>"
```

View File

@@ -0,0 +1,116 @@
# 异步评分记录功能设计
**日期**: 2026-06-24
**状态**: 已批准,待实现
**范围**: 新增 `POST /api/score/async` 异步评分端点,评分结果持久化到磁盘,前端新增「评分记录」页面展示。
---
## 1. 目标
- Dify 工作流调用 `/api/score/async` 立即返回 `job_id`202不等待评分完成
- 后台异步执行 RAGAS 评分,结果写入 `outputs/score-jobs/<job_id>.json`
- RAGAS 平台新增「评分记录」导航页,列表展示所有评分记录及状态
---
## 2. 架构
```
Dify → POST /api/score/async → 202 {job_id, status:"queued"}
ScoreJobManager (线程池)
InlineScorer.score()
outputs/score-jobs/<job_id>.json
GET /api/score/jobs ← 前端「评分记录」页轮询
```
---
## 3. 存储格式
`outputs/score-jobs/<job_id>.json`:
```json
{
"job_id": "abc123def456",
"status": "completed",
"created_at": "2026-06-24T09:00:00+00:00",
"finished_at": "2026-06-24T09:00:15+00:00",
"request": {
"question": "双源CT的时间分辨率是多少?",
"answer": "双源CT的单扇区时间分辨率为75ms。",
"contexts": null,
"ground_truth": null,
"metrics": ["answer_relevancy"],
"judge_model": "gpt-5",
"embedding_model": "text-embedding-3-small"
},
"scores": {"answer_relevancy": 0.9075},
"weighted_score": 0.9075,
"latency_ms": 12500,
"skipped_metrics": [],
"error": null
}
```
---
## 4. API 端点
### `POST /api/score/async`
请求体与 `POST /api/score` 完全相同(`ScoreRequest`)。
```json
// 立即返回 202
{"job_id": "abc123def456", "status": "queued"}
```
### `GET /api/score/jobs`
返回所有评分记录,按创建时间倒序:
```json
{"jobs": [{...ScoreJobStatus...}]}
```
### `GET /api/score/jobs/{job_id}`
返回单条评分记录详情。
---
## 5. 新增文件
| 文件 | 职责 |
|------|------|
| `webapp/services/score_job_manager.py` | ScoreJobManager线程池 + JSON 持久化 |
| `webapp/api/score_jobs.py` | 3 个端点路由 |
| `webapp/static/js/score_jobs.js` | 前端列表逻辑 + 轮询 |
## 6. 修改文件
| 文件 | 改动 |
|------|------|
| `webapp/models.py` | 新增 `AsyncScoreJobStatus``AsyncScoreJobResponse` |
| `webapp/server.py` | 注册 score_jobs router更新 OPENAPI_TAGS |
| `webapp/static/index.html` | 新增导航项 + section |
---
## 7. 前端「评分记录」页
列表列:时间 / 问题摘要前40字/ 指标 / 得分 / 状态
- 进入页面自动刷新
- `queued/running` 记录每 5 秒轮询 `GET /api/score/jobs/{id}` 更新状态
- 得分按 scoreClassgood/warn/bad着色
---
## 8. Dify 改造
只改 HTTP 节点 URL`/api/score``/api/score/async`,删除解析响应的代码节点。

1101
project-overview.html Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -22,22 +22,31 @@ _PROMPT_TEMPLATE = """\
## 报告要求
1. 按指标分节(## 指标名 [severity]),先解释"为什么低"(结合低分样本具体分析),再给出"具体怎么改"
2. "具体怎么改"要结合低分样本的实际内容,而不只是泛泛建议
3. 最后写一节 **## 优先优化次序**,按性价比排序(不增加 LLM 调用次数的优化优先)
4. 语言简洁,面向工程师,不要废话,不要重复列表内容
1. 按指标分节(## 指标名 [严重程度]),先解释"为什么低"(结合低分样本具体分析),再给出"具体怎么改"
2. 严重程度说明critical=严重(<阈值50%warning=警告(<阈值70%low=待优化低于0.85,有提升空间)
3. "具体怎么改"要结合低分样本的实际内容,而不只是泛泛建议
4. 最后写一节 **## 优先优化次序**,按性价比排序(不增加 LLM 调用次数的优化优先critical 和 warning 项优先于 low 项
5. 语言简洁,面向工程师,不要废话,不要重复列表内容
只输出 Markdown 报告正文,不要任何前置说明。
"""
_SEVERITY_LABEL_ZH: dict[str, str] = {
"critical": "严重",
"warning": "警告",
"low": "待优化",
}
def _build_diagnosis_summary(diagnoses: list[Diagnosis]) -> str:
lines = []
for d in diagnoses:
direction = "(越低越好)" if d.metric == "noise_sensitivity" else ""
label = _SEVERITY_LABEL_ZH.get(d.severity, d.severity)
lines.append(
f"- **{d.metric}** {direction} 均值={d.mean_score:.4f}"
f"阈值={d.threshold},严重程度={d.severity}"
f"阈值={d.threshold},严重程度={label}"
)
lines.append(f" - 可能原因:{'; '.join(d.root_causes)}")
lines.append(f" - 建议动作:{'; '.join(d.suggested_actions)}")

View File

@@ -14,6 +14,9 @@ class MetricRule:
higher_is_better: bool # False for noise_sensitivity
root_causes: list[str]
suggested_actions: list[str]
# Scores below this threshold trigger a "low" advisory (LLM suggestion requested).
# Only applies to higher_is_better metrics; noise_sensitivity uses existing thresholds.
advisory_threshold: float = 0.85
METRIC_RULES: dict[str, MetricRule] = {
@@ -208,10 +211,14 @@ def diagnose(
elif mean < rule.warning_threshold:
severity = "warning"
threshold = rule.warning_threshold
elif mean < rule.advisory_threshold:
# Score is acceptable but below 0.85 — request LLM optimization advice.
severity = "low"
threshold = rule.advisory_threshold
else:
continue # above warning threshold → no diagnosis
continue # >= advisory_threshold → no diagnosis needed
else:
# lower is better (noise_sensitivity)
# lower is better (noise_sensitivity): keep existing two-tier logic
if mean > rule.critical_threshold:
severity = "critical"
threshold = rule.critical_threshold

View File

@@ -8,12 +8,22 @@ from .rules import Diagnosis
logger = logging.getLogger("rag_eval.advisor")
# Chinese display labels for each severity tier.
_SEVERITY_LABEL: dict[str, str] = {
"critical": "严重",
"warning": "警告",
"low": "待优化",
}
def _format_log_summary(diagnoses: list[Diagnosis], advice_path: Path) -> str:
"""Return a single-line log summary of triggered diagnoses."""
if not diagnoses:
return "[advisor] 所有指标正常,无需优化建议。"
parts = [f"{d.metric}({d.mean_score:.2f}, {d.severity})" for d in diagnoses]
parts = [
f"{d.metric}({d.mean_score:.2f},{_SEVERITY_LABEL.get(d.severity, d.severity)})"
for d in diagnoses
]
triggered = " ".join(parts)
return f"[advisor] 触发诊断 {len(diagnoses)} 项: {triggered}{advice_path}"
@@ -24,7 +34,8 @@ def _build_fallback_report(diagnoses: list[Diagnosis]) -> str:
return ""
lines = ["## 规则诊断LLM 分析不可用)\n"]
for d in diagnoses:
lines.append(f"### {d.metric} [{d.severity}] 均值={d.mean_score:.4f}")
label = _SEVERITY_LABEL.get(d.severity, d.severity)
lines.append(f"### {d.metric} [{label}] 均值={d.mean_score:.4f}")
lines.append("\n**可能原因:**")
for cause in d.root_causes:
lines.append(f"- {cause}")

View File

@@ -10,10 +10,38 @@ class TestDiagnosis(unittest.TestCase):
for i, s in enumerate(scores)]
def test_no_diagnosis_when_all_scores_above_threshold(self):
# Mean exactly 0.85 should NOT trigger any diagnosis (< 0.85 is the condition).
rows = self._make_rows("faithfulness", [0.8, 0.9, 0.85])
result = diagnose(rows, metrics=["faithfulness"])
self.assertEqual(result, [])
def test_no_diagnosis_when_mean_above_advisory_threshold(self):
rows = self._make_rows("answer_relevancy", [0.9, 0.92, 0.88])
result = diagnose(rows, metrics=["answer_relevancy"])
self.assertEqual(result, [])
def test_low_severity_when_mean_below_advisory_threshold(self):
# Score between warning_threshold (0.7) and advisory_threshold (0.85) → "low"
rows = self._make_rows("faithfulness", [0.78, 0.80, 0.82])
result = diagnose(rows, metrics=["faithfulness"])
self.assertEqual(len(result), 1)
self.assertEqual(result[0].severity, "low")
self.assertAlmostEqual(result[0].threshold, 0.85, places=2)
def test_low_severity_answer_relevancy_at_0_84(self):
rows = self._make_rows("answer_relevancy", [0.84, 0.84, 0.84])
result = diagnose(rows, metrics=["answer_relevancy"])
self.assertEqual(len(result), 1)
self.assertEqual(result[0].severity, "low")
def test_low_severity_has_root_causes_and_actions(self):
rows = self._make_rows("context_precision", [0.75, 0.76, 0.77])
result = diagnose(rows, metrics=["context_precision"])
self.assertEqual(len(result), 1)
self.assertEqual(result[0].severity, "low")
self.assertTrue(len(result[0].root_causes) > 0)
self.assertTrue(len(result[0].suggested_actions) > 0)
def test_warning_when_mean_below_warning_threshold(self):
rows = self._make_rows("faithfulness", [0.65, 0.62, 0.68])
result = diagnose(rows, metrics=["faithfulness"])

View File

@@ -91,9 +91,9 @@ class TestWriteAdvice(unittest.TestCase):
]
summary = _format_log_summary(diags, self.advice_path)
self.assertIn("faithfulness", summary)
self.assertIn("critical", summary)
self.assertIn("严重", summary) # "critical" maps to Chinese label
self.assertIn("context_recall", summary)
self.assertIn("warning", summary)
self.assertIn("警告", summary) # "warning" maps to Chinese label
def test_write_empty_diagnoses_still_creates_file(self):
write_advice(

View File

@@ -57,9 +57,11 @@ class TestScoreRequest:
with pytest.raises(ValidationError):
ScoreRequest(question="q", contexts="c") # type: ignore[call-arg]
def test_missing_contexts_raises(self):
with pytest.raises(ValidationError):
ScoreRequest(question="q", answer="a") # type: ignore[call-arg]
def test_missing_contexts_defaults_to_none(self):
"""contexts is now optional — missing contexts is allowed."""
req = ScoreRequest(question="q", answer="a")
assert req.contexts is None
assert req.contexts_as_list() == []
def test_custom_metrics_accepted(self):
req = ScoreRequest(
@@ -115,6 +117,17 @@ class TestScoreRequest:
"factual_correctness",
]
def test_effective_metrics_drops_context_dependent_when_contexts_absent(self):
"""Without contexts, context-dependent metrics are excluded."""
req = ScoreRequest(
question="q", answer="a",
metrics=["faithfulness", "answer_relevancy", "context_precision"],
)
effective = req.effective_metrics()
assert "answer_relevancy" in effective
assert "faithfulness" not in effective
assert "context_precision" not in effective
class TestScoreResponse:
def test_score_response_structure(self):

View File

@@ -0,0 +1,146 @@
"""Tests for async score jobs API."""
from __future__ import annotations
import json
import time
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from fastapi.testclient import TestClient
@pytest.fixture()
def client(tmp_path, monkeypatch):
"""TestClient with fresh ScoreJobManager backed by tmp dirs."""
import webapp.services.score_job_manager as mgr_mod
from webapp.services.score_job_manager import ScoreJobManager
fresh_mgr = ScoreJobManager(
output_dir=tmp_path / "score-async",
index_dir=tmp_path / "score-jobs",
max_workers=2,
)
monkeypatch.setattr(mgr_mod, "score_job_manager", fresh_mgr)
import webapp.api.score_jobs as api_mod
monkeypatch.setattr(api_mod, "score_job_manager", fresh_mgr)
from webapp.server import create_app
return TestClient(create_app())
class TestAsyncScoreEndpoints:
def test_submit_returns_202_with_job_id(self, client):
"""POST /api/score/async returns 202 immediately."""
with patch("webapp.services.score_job_manager.ScoreJobManager._run"):
resp = client.post("/api/score/async", json={
"question": "q?",
"answer": "a.",
"metrics": ["answer_relevancy"],
})
assert resp.status_code == 202
data = resp.json()
assert "job_id" in data
assert data["status"] == "queued"
def test_list_jobs_empty_initially(self, client):
resp = client.get("/api/score/jobs")
assert resp.status_code == 200
assert resp.json()["jobs"] == []
def test_get_unknown_job_returns_404(self, client):
resp = client.get("/api/score/jobs/nonexistent123")
assert resp.status_code == 404
def test_submitted_job_appears_in_list(self, client):
with patch("webapp.services.score_job_manager.ScoreJobManager._run"):
resp = client.post("/api/score/async", json={
"question": "q?", "answer": "a.", "metrics": ["answer_relevancy"],
})
job_id = resp.json()["job_id"]
time.sleep(0.1)
list_resp = client.get("/api/score/jobs")
ids = [j["job_id"] for j in list_resp.json()["jobs"]]
assert job_id in ids
def test_get_job_by_id_returns_status(self, client):
with patch("webapp.services.score_job_manager.ScoreJobManager._run"):
resp = client.post("/api/score/async", json={
"question": "q?", "answer": "a.", "metrics": ["answer_relevancy"],
})
job_id = resp.json()["job_id"]
time.sleep(0.1)
get_resp = client.get(f"/api/score/jobs/{job_id}")
assert get_resp.status_code == 200
assert get_resp.json()["job_id"] == job_id
def test_missing_required_fields_returns_422(self, client):
resp = client.post("/api/score/async", json={"question": "q?"})
assert resp.status_code == 422
class TestScoreJobManager:
def test_completed_job_persisted_to_index(self, tmp_path):
"""Completed job writes index JSON."""
from webapp.services.score_job_manager import ScoreJobManager
from webapp.models import ScoreRequest
mgr = ScoreJobManager(
output_dir=tmp_path / "runs",
index_dir=tmp_path / "index",
max_workers=1,
)
req = ScoreRequest(question="q?", answer="a.", metrics=["answer_relevancy"])
# Patch _run directly — it uses lazy imports internally
def fake_run(job_id, request):
mgr._update(job_id, status="completed", finished_at="2026-01-01T00:00:01+00:00",
run_id="fake-run-id", scores={"answer_relevancy": 0.85},
weighted_score=0.85, latency_ms=500)
with patch.object(mgr, "_run", side_effect=fake_run):
status = mgr.submit(req)
for _ in range(20):
s = mgr.get(status.job_id)
if s and s.status == "completed":
break
time.sleep(0.1)
s = mgr.get(status.job_id)
assert s is not None
idx_path = tmp_path / "index" / f"{status.job_id}.json"
assert idx_path.exists()
data = json.loads(idx_path.read_text(encoding="utf-8"))
assert data["job_id"] == status.job_id
assert data["status"] == "completed"
def test_loads_existing_index_on_startup(self, tmp_path):
"""Manager loads persisted jobs from index dir on init."""
from webapp.services.score_job_manager import ScoreJobManager
from webapp.models import AsyncScoreJobStatus
idx_dir = tmp_path / "index"
idx_dir.mkdir()
fake = AsyncScoreJobStatus(
job_id="testjob001",
status="completed",
created_at="2026-01-01T00:00:00+00:00",
run_id="some-run-id",
scores={"answer_relevancy": 0.9},
weighted_score=0.9,
latency_ms=1000,
)
(idx_dir / "testjob001.json").write_text(
json.dumps(fake.model_dump(), ensure_ascii=False), encoding="utf-8"
)
mgr = ScoreJobManager(
output_dir=tmp_path / "runs",
index_dir=idx_dir,
max_workers=1,
)
loaded = mgr.get("testjob001")
assert loaded is not None
assert loaded.status == "completed"
assert loaded.run_id == "some-run-id"

View File

@@ -73,7 +73,8 @@ def score_sample(
用于日志记录、质量监控或触发 Agent 自我改进流程。
**contexts 格式**:多个检索片段用 `context_separator`(默认 `" |||| "`)拼接为一个字符串,
服务端自动拆分后传入 RAGAS 管道。
服务端自动拆分后传入 RAGAS 管道。**contexts 为可选字段**,缺失时自动跳过依赖检索内容的指标
`faithfulness`、`context_recall`、`context_precision`、`noise_sensitivity`)。
**ground_truth 可选**
- 提供时:所有指定指标均参与计算。
@@ -99,12 +100,13 @@ def score_sample(
"""
client = f"{raw_request.client.host}:{raw_request.client.port}" if raw_request.client else "unknown"
logger.info(
"[score] incoming client=%s method=%s content_type=%s metrics=%s has_gt=%s",
"[score] incoming client=%s method=%s content_type=%s metrics=%s has_gt=%s has_ctx=%s",
client,
raw_request.method,
raw_request.headers.get("content-type", ""),
request.metrics,
request.ground_truth is not None,
bool(request.contexts),
)
settings = _get_settings()

89
webapp/api/score_jobs.py Normal file
View File

@@ -0,0 +1,89 @@
"""Routes for async RAGAS scoring jobs (Dify fire-and-forget integration).
Dify calls POST /api/score/async → gets job_id immediately (202).
Scoring runs in background, result written as a standard run artifact.
View full report at GET /api/runs/{run_id} or in the 「运行列表」 page.
"""
from __future__ import annotations
import logging
from fastapi import APIRouter, HTTPException
from webapp.models import AsyncScoreJobResponse, AsyncScoreJobStatus, ScoreRequest
from webapp.services.score_job_manager import score_job_manager
router = APIRouter(prefix="/api/score", tags=["score"])
logger = logging.getLogger("webapp.api.score_jobs")
@router.post(
"/async",
status_code=202,
response_model=AsyncScoreJobResponse,
summary="提交异步评分任务Dify 推荐方式)",
responses={
202: {
"description": (
"任务已排队,立即返回 job_id202 Accepted\n\n"
"评分在后台执行,完成后自动生成完整报告(含优化建议)。\n"
"通过 `GET /api/score/jobs/{job_id}` 查询状态,"
"完成后在「运行列表」页查看完整报告。"
),
"content": {
"application/json": {
"example": {"job_id": "abc123def456", "status": "queued", "run_id": None}
}
},
},
},
)
def submit_async_score(request: ScoreRequest) -> AsyncScoreJobResponse:
"""提交异步 RAGAS 评分任务,立即返回 job_id。
**适合 Dify 工作流**HTTP 节点无需等待评分完成(无超时风险),
工作流立即继续,评分结果在 RAGAS 平台「运行列表」中查看。
评分完成后自动生成:
- 各指标得分(`scores.csv`
- 摘要报告(`summary.md`
- LLM 优化建议(`optimization_advice.md`
"""
logger.info(
"[score_async] submit metrics=%s has_ctx=%s has_gt=%s",
request.metrics, bool(request.contexts), bool(request.ground_truth),
)
status = score_job_manager.submit(request)
logger.info("[score_async] queued job_id=%s", status.job_id)
return AsyncScoreJobResponse(job_id=status.job_id, status=status.status)
@router.get(
"/jobs",
response_model=dict,
summary="列出所有异步评分记录",
)
def list_score_jobs() -> dict:
"""返回所有异步评分记录,按创建时间倒序排列。"""
jobs = score_job_manager.list_jobs()
logger.info("[score_jobs] list count=%d", len(jobs))
return {"jobs": [j.model_dump() for j in jobs]}
@router.get(
"/jobs/{job_id}",
response_model=AsyncScoreJobStatus,
summary="查询单个异步评分任务状态",
responses={404: {"description": "指定 job_id 的评分任务不存在。"}},
)
def get_score_job(job_id: str) -> AsyncScoreJobStatus:
"""查询单个异步评分任务的状态和结果。
`status` 为 `completed` 时,`run_id` 字段包含对应的运行 ID
可通过 `GET /api/runs/{run_id}` 获取完整评分报告。
"""
status = score_job_manager.get(job_id)
if status is None:
raise HTTPException(status_code=404, detail=f"Score job not found: {job_id}")
return status

View File

@@ -384,6 +384,14 @@ _GT_DEPENDENT_METRICS: frozenset[str] = frozenset({
"noise_sensitivity",
})
# 需要 contexts 才能计算的指标集合
_CONTEXT_DEPENDENT_METRICS: frozenset[str] = frozenset({
"faithfulness",
"context_recall",
"context_precision",
"noise_sensitivity",
})
# 所有合法指标名称
_VALID_METRICS: frozenset[str] = frozenset({
"faithfulness",
@@ -428,8 +436,9 @@ class ScoreRequest(BaseModel):
question: str = Field(description="问题文本。")
answer: str = Field(description="待评分的回答。")
contexts: str = Field(
description="检索上下文字符串,多段之间用 context_separator 拼接。"
contexts: str | None = Field(
default=None,
description="检索上下文字符串,多段之间用 context_separator 拼接。缺失时自动跳过依赖检索内容的指标faithfulness、context_recall、context_precision、noise_sensitivity",
)
ground_truth: str | None = Field(
default=None,
@@ -467,15 +476,23 @@ class ScoreRequest(BaseModel):
return value
def contexts_as_list(self) -> list[str]:
"""Split the contexts string into a list of non-empty fragments."""
"""Split the contexts string into a list of non-empty fragments.
Returns an empty list when contexts is None or blank.
"""
if not self.contexts:
return []
separator = self.context_separator or " |||| "
return [part.strip() for part in self.contexts.split(separator) if part.strip()]
def effective_metrics(self) -> list[str]:
"""Return metrics filtered to exclude GT-dependent ones when ground_truth is absent."""
if self.ground_truth is not None:
return list(self.metrics)
return [metric_name for metric_name in self.metrics if metric_name not in _GT_DEPENDENT_METRICS]
"""Return metrics filtered to exclude GT-dependent or context-dependent ones when inputs are absent."""
result = list(self.metrics)
if self.ground_truth is None:
result = [m for m in result if m not in _GT_DEPENDENT_METRICS]
if not self.contexts:
result = [m for m in result if m not in _CONTEXT_DEPENDENT_METRICS]
return result
class ScoreResponse(BaseModel):
@@ -497,3 +514,40 @@ class ScoreResponse(BaseModel):
default=None,
description="打分异常时的错误信息HTTP 200 仍返回scores 为空)。",
)
# ---------------------------------------------------------------------------
# 异步评分记录模型
# ---------------------------------------------------------------------------
class AsyncScoreJobResponse(BaseModel):
"""Immediate 202 response after submitting an async score job."""
job_id: str = Field(description="任务唯一标识符,用于后续查询结果。")
status: str = Field(default="queued", description="初始状态queued。")
run_id: str | None = Field(
default=None,
description="评分完成后写入的 Run ID可在「运行列表」中查看完整报告。",
)
class AsyncScoreJobStatus(BaseModel):
"""State of one async score job (queued → running → completed/failed)."""
job_id: str = Field(description="任务唯一标识符。")
status: str = Field(description="queued | running | completed | failed")
created_at: str = Field(default="", description="创建时间ISO 8601 UTC")
finished_at: str = Field(default="", description="完成时间ISO 8601 UTC")
run_id: str | None = Field(
default=None,
description="完成后对应的 Run ID可通过 GET /api/runs/{run_id} 查看完整报告。",
)
request_summary: dict = Field(
default_factory=dict,
description="请求参数快照question 前80字、metrics、judge_model 等)。",
)
scores: dict[str, float | None] = Field(default_factory=dict, description="各指标得分。")
weighted_score: float | None = Field(default=None, description="加权综合得分。")
latency_ms: int = Field(default=0, description="评分耗时毫秒。")
skipped_metrics: list[str] = Field(default_factory=list)
error: str | None = Field(default=None)

View File

@@ -17,7 +17,7 @@ from fastapi.exceptions import RequestValidationError
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from webapp.api import evaluations, llm_profiles, pipeline, runs, scenarios, score
from webapp.api import evaluations, llm_profiles, pipeline, runs, scenarios, score, score_jobs
STATIC_DIR = Path(__file__).resolve().parent / "static"
logger = logging.getLogger("webapp.server")
@@ -69,10 +69,12 @@ OPENAPI_TAGS = [
{
"name": "score",
"description": (
"**实时评分 APIDify 外部 Tool**\n\n"
"接受单条问答记录 `(question, answer, contexts, ground_truth)`\n"
"同步运行 RAGAS 指标打分,返回各指标得分和加权综合得分。\n\n"
"适用场景Dify Agent 在回答后即时调用,用于质量监控或自我改进\n\n"
"**实时评分 API同步)** — `POST /api/score`\n\n"
"**异步评分 APIDify 推荐)** — `POST /api/score/async`\n\n"
"异步方式立即返回 job_id202评分在后台执行完成后自动生成完整报告含优化建议"
"在「运行列表」页查看\n\n"
"通过 `GET /api/score/jobs` 列出所有异步评分记录,"
"`GET /api/score/jobs/{job_id}` 查询单个任务状态。\n\n"
"**鉴权**:若 `.env` 中配置了 `SCORE_API_TOKEN`,需携带 "
"`Authorization: Bearer <token>` 请求头。"
),
@@ -87,7 +89,7 @@ OPENAPI_TAGS = [
def create_app() -> FastAPI:
"""Build and configure the FastAPI application instance."""
app = FastAPI(
title="RAGAS 评估系统",
title="Siemens RAGAS 评估平台",
description=(
"西门子医疗影像 RAG 评估平台 API 文档。\n\n"
"提供以下能力:\n"
@@ -108,6 +110,7 @@ def create_app() -> FastAPI:
app.include_router(llm_profiles.router)
app.include_router(pipeline.router)
app.include_router(score.router)
app.include_router(score_jobs.router)
@app.middleware("http")
async def access_log_middleware(request: Request, call_next):

View File

@@ -0,0 +1,269 @@
"""Background task manager for async RAGAS single-sample scoring.
Each job:
1. Runs InlineScorer.score() in a thread pool.
2. Constructs a minimal EvaluationResult + Scenario in the standard format.
3. Calls write_run_artifacts() — produces metadata.json, scores.csv, summary.md.
4. Calls run_advisor() — produces optimization_advice.md.
The resulting run directory lands under outputs/score-async/<run_id>/ and is
automatically picked up by run_reader.list_run_summaries(), so it appears in
the existing 「运行列表」 and 「报告详情」 pages without any extra wiring.
"""
from __future__ import annotations
import json
import math
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from webapp.models import AsyncScoreJobStatus, ScoreRequest
_REPO_ROOT = Path(__file__).resolve().parents[2]
_DEFAULT_JOBS_DIR = _REPO_ROOT / "outputs" / "score-async"
_DEFAULT_INDEX_DIR = _REPO_ROOT / "outputs" / "score-jobs" # lightweight job index
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
class ScoreJobManager:
"""Thread-pool manager for async scoring jobs.
Results are written as standard run artifacts so the report detail page
can render them with zero additional code.
"""
def __init__(
self,
output_dir: Path = _DEFAULT_JOBS_DIR,
index_dir: Path = _DEFAULT_INDEX_DIR,
max_workers: int = 4,
) -> None:
self._output_dir = Path(output_dir)
self._index_dir = Path(index_dir)
self._output_dir.mkdir(parents=True, exist_ok=True)
self._index_dir.mkdir(parents=True, exist_ok=True)
self._executor = ThreadPoolExecutor(max_workers=max_workers)
self._cache: dict[str, AsyncScoreJobStatus] = {}
self._lock = threading.Lock()
self._load_existing()
# ------------------------------------------------------------------ #
# Public API
# ------------------------------------------------------------------ #
def submit(self, request: ScoreRequest) -> AsyncScoreJobStatus:
"""Queue one scoring job and return its initial status immediately."""
job_id = uuid.uuid4().hex[:12]
status = AsyncScoreJobStatus(
job_id=job_id,
status="queued",
created_at=_now_iso(),
request_summary={
"question": (request.question or "")[:80],
"answer": (request.answer or "")[:80],
"metrics": list(request.metrics),
"judge_model": request.judge_model or "",
"embedding_model": request.embedding_model or "",
"has_contexts": bool(request.contexts),
"has_ground_truth": bool(request.ground_truth),
},
)
with self._lock:
self._cache[job_id] = status
self._persist_index(status)
self._executor.submit(self._run, job_id, request)
return status
def get(self, job_id: str) -> AsyncScoreJobStatus | None:
"""Return current status or None if unknown."""
with self._lock:
return self._cache.get(job_id)
def list_jobs(self) -> list[AsyncScoreJobStatus]:
"""Return all known jobs, newest first."""
with self._lock:
jobs = list(self._cache.values())
jobs.sort(key=lambda j: j.created_at, reverse=True)
return jobs
# ------------------------------------------------------------------ #
# Worker
# ------------------------------------------------------------------ #
def _run(self, job_id: str, request: ScoreRequest) -> None:
"""Execute scoring, write run artifacts, run advisor."""
import logging
logger = logging.getLogger("webapp.services.score_job_manager")
self._update(job_id, status="running")
# Lazy imports to keep web server bootable if ragas is not installed.
from rag_eval.advisor import run_advisor
from rag_eval.metrics.factory import build_models
from rag_eval.metrics.weights import compute_weighted_score
from rag_eval.reporting.writers import write_run_artifacts
from rag_eval.settings import EvaluationSettings
from rag_eval.shared.models import (
DatasetConfig, EvaluationResult, NormalizedSample,
RuntimeConfig, Scenario,
)
from rag_eval.shared.utils import utc_now_iso
from webapp.services.inline_scorer import inline_scorer
settings = EvaluationSettings()
judge_model = request.judge_model or settings.ragas_judge_model
embedding_model = request.embedding_model or settings.ragas_embedding_model
effective = request.effective_metrics()
requested = set(request.metrics)
skipped = sorted(requested - set(effective))
t0 = time.monotonic()
started_at = utc_now_iso()
try:
if effective:
raw_scores = inline_scorer.score(
question=request.question,
answer=request.answer,
contexts=request.contexts_as_list(),
ground_truth=request.ground_truth,
metrics=effective,
judge_model=judge_model,
embedding_model=embedding_model,
settings=settings,
)
else:
raw_scores = {}
latency_ms = int((time.monotonic() - t0) * 1000)
finished_at = utc_now_iso()
# Build full scores dict (skipped = None)
all_scores: dict[str, float | None] = {m: None for m in request.metrics}
all_scores.update(raw_scores)
weighted_raw = compute_weighted_score(
{k: v for k, v in raw_scores.items() if v is not None}, {}
)
weighted = round(weighted_raw, 4) if weighted_raw is not None else None
# Build a score row compatible with report_builder
score_row: dict[str, Any] = {
"sample_id": "async-score-1",
"question": request.question,
"answer": request.answer or "",
"contexts": request.contexts or "",
"ground_truth": request.ground_truth or "",
"error": "",
}
score_row.update(all_scores)
# Construct minimal EvaluationResult so write_run_artifacts works
run_id = finished_at.replace(":", "-")
output_dir = self._output_dir
# Build a minimal Scenario for snapshot + advisor
scenario = Scenario(
scenario_name=f"async-score-{job_id}",
mode="offline",
dataset=DatasetConfig(path=output_dir / run_id / "dataset.csv"),
judge_model=judge_model,
embedding_model=embedding_model,
metrics=list(request.metrics),
output_dir=output_dir,
optimization_advisor=True, # always generate advice
)
sample = NormalizedSample(
sample_id="async-score-1",
question=request.question,
answer=request.answer or "",
contexts=request.contexts_as_list(),
ground_truth=request.ground_truth or "",
)
result = EvaluationResult(
scenario=scenario,
run_id=run_id,
started_at=started_at,
finished_at=finished_at,
valid_samples=[sample],
invalid_samples=[],
score_rows=[score_row],
)
write_run_artifacts(result)
logger.info("[score_job] artifacts written job_id=%s run_id=%s", job_id, run_id)
# Run optimization advisor (builds optimization_advice.md)
try:
llm, _ = build_models(judge_model, embedding_model, settings)
run_advisor(result, scenario, llm)
logger.info("[score_job] advisor done job_id=%s", job_id)
except Exception as adv_exc: # noqa: BLE001
logger.warning("[score_job] advisor failed job_id=%s err=%s", job_id, adv_exc)
self._update(
job_id,
status="completed",
finished_at=finished_at,
run_id=run_id,
scores=all_scores,
weighted_score=weighted,
latency_ms=latency_ms,
skipped_metrics=skipped,
)
except Exception as exc: # noqa: BLE001
latency_ms = int((time.monotonic() - t0) * 1000)
logger.error("[score_job] failed job_id=%s err=%s", job_id, exc)
self._update(
job_id,
status="failed",
finished_at=_now_iso(),
latency_ms=latency_ms,
error=f"{type(exc).__name__}: {exc}",
)
# ------------------------------------------------------------------ #
# Persistence helpers
# ------------------------------------------------------------------ #
def _update(self, job_id: str, **kwargs: Any) -> None:
"""Merge kwargs into the job status and persist the index."""
with self._lock:
existing = self._cache.get(job_id)
if existing is None:
return
updated = existing.model_copy(update=kwargs)
self._cache[job_id] = updated
self._persist_index(updated)
def _persist_index(self, status: AsyncScoreJobStatus) -> None:
"""Write a lightweight index JSON for this job (survives restarts)."""
path = self._index_dir / f"{status.job_id}.json"
path.write_text(
json.dumps(status.model_dump(), ensure_ascii=False, indent=2),
encoding="utf-8",
)
def _load_existing(self) -> None:
"""Load existing job index files on startup."""
for path in sorted(self._index_dir.glob("*.json")):
try:
data = json.loads(path.read_text(encoding="utf-8"))
status = AsyncScoreJobStatus.model_validate(data)
self._cache[status.job_id] = status
except Exception: # noqa: BLE001
pass
# Module-level singleton shared by FastAPI routes.
score_job_manager = ScoreJobManager()

View File

@@ -1,4 +1,4 @@
/* Siemens RAGAS 评估控制台 — 样式表
/* Siemens RAGAS 评估台 — 样式表
配色取自西门子品牌色petrol / 深青)与中性灰,呼应企业语境。 */
:root {

View File

@@ -3,7 +3,7 @@
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>RAGAS 评估控制</title>
<title>Siemens RAGAS 评估</title>
<link rel="stylesheet" href="/static/css/app.css" />
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.1/dist/chart.umd.min.js"></script>
</head>
@@ -12,8 +12,8 @@
<!-- 左侧导航(布局 A -->
<aside class="sidebar">
<div class="brand">
<div class="brand-mark">RAGAS</div>
<div class="brand-sub">评估控制</div>
<div class="brand-mark">Siemens RAGAS</div>
<div class="brand-sub">评估</div>
</div>
<nav class="nav">
<button class="nav-item" data-view="runs">
@@ -28,6 +28,9 @@
<button class="nav-item" data-view="profiles">
<span class="nav-ico"></span><span>LLM 配置</span>
</button>
<button class="nav-item" data-view="scorejobs">
<span class="nav-ico">📋</span><span>评分记录</span>
</button>
<button class="nav-item" data-view="apidocs">
<span class="nav-ico"></span><span>API 文档</span>
</button>
@@ -234,6 +237,22 @@
</div>
</section>
<!-- 评分记录视图 -->
<section class="view" id="view-scorejobs" hidden>
<div class="panel">
<div class="panel-head">
<h2>评分记录</h2>
<span class="muted" style="font-size:13px">来自 Dify 异步评分任务POST /api/score/async</span>
</div>
<p class="muted">评分完成后自动生成完整报告(含指标得分与 LLM 优化建议),点击「查看报告」跳转报告详情页。</p>
</div>
<div id="scorejobs-list"></div>
<div class="empty" id="scorejobs-empty" hidden>
<p>暂无评分记录。</p>
<p class="muted">在 Dify 工作流中调用 <code>POST /api/score/async</code> 后,记录将在此显示。</p>
</div>
</section>
<!-- API 文档视图 -->
<section class="view" id="view-apidocs" hidden>
<iframe
@@ -251,6 +270,7 @@
<script src="/static/js/report.js"></script>
<script src="/static/js/profiles.js"></script>
<script src="/static/js/runner.js"></script>
<script src="/static/js/score_jobs.js"></script>
<script src="/static/js/app.js"></script>
</body>
</html>

View File

@@ -66,6 +66,11 @@ const API = {
},
applyProfiles(body) { return API.post("/api/llm-profiles/apply", body); },
// 异步评分记录 API
scoreJobsAsync(body) { return API.post("/api/score/async", body); },
getScoreJob(jobId) { return API.get(`/api/score/jobs/${encodeURIComponent(jobId)}`); },
listScoreJobs() { return API.get("/api/score/jobs"); },
// 测试已保存 profile 的连通性
testProfile(id) {
return fetch(`/api/llm-profiles/${encodeURIComponent(id)}/test`, { method: "POST" })

View File

@@ -5,8 +5,8 @@
const App = {
currentRunId: null,
activeView: null,
views: ["runs", "new", "report", "profiles", "apidocs"],
titles: { runs: "运行列表", new: "新建评估", report: "报告详情", profiles: "LLM 配置", apidocs: "API 文档" },
views: ["runs", "new", "report", "profiles", "scorejobs", "apidocs"],
titles: { runs: "运行列表", new: "新建评估", report: "报告详情", profiles: "LLM 配置", scorejobs: "评分记录", apidocs: "API 文档" },
// 初始化:绑定导航、从 URL/sessionStorage 恢复上次位置、启动健康检查。
init() {
@@ -68,10 +68,11 @@ const App = {
sessionStorage.setItem("rag_view", view);
if (App.currentRunId) sessionStorage.setItem("rag_run_id", App.currentRunId);
if (view === "runs") App.loadRuns();
if (view === "new") Runner.loadScenarios();
if (view === "report") Report.render(App.currentRunId);
if (view === "profiles") Profiles.load();
if (view === "runs") App.loadRuns();
if (view === "new") Runner.loadScenarios();
if (view === "report") Report.render(App.currentRunId);
if (view === "profiles") Profiles.load();
if (view === "scorejobs") ScoreJobs.load();
},
// ----------------------------------------------------------------

View File

@@ -0,0 +1,125 @@
// score_jobs.js — 评分记录页面(异步 RAGAS 评分任务列表)
// 每条评分完成后自动写入标准 Run 产物,点击「查看报告」复用现有报告详情页。
const ScoreJobs = {
_pollTimers: {}, // job_id -> setInterval handle
async load() {
const list = document.getElementById("scorejobs-list");
const empty = document.getElementById("scorejobs-empty");
list.innerHTML = '<p class="muted">加载中…</p>';
try {
const data = await API.listScoreJobs();
const jobs = data.jobs || [];
list.innerHTML = "";
if (jobs.length === 0) {
empty.hidden = false;
return;
}
empty.hidden = true;
jobs.forEach(job => list.appendChild(ScoreJobs.renderCard(job)));
// Auto-poll any pending jobs
jobs.forEach(job => {
if (job.status === "queued" || job.status === "running") {
ScoreJobs._startPoll(job.job_id);
}
});
} catch (err) {
list.innerHTML = `<p class="muted">加载失败:${App.escape(err.message)}</p>`;
}
},
renderCard(job) {
const card = document.createElement("div");
card.className = "run-card";
card.id = `score-job-${job.job_id}`;
card.innerHTML = ScoreJobs._cardHtml(job);
// Bind report button if already completed
ScoreJobs._bindReportBtn(card, job);
return card;
},
_cardHtml(job) {
const time = App.shortTime(job.created_at);
const question = App.escape((job.request_summary?.question || "—").slice(0, 60));
const metrics = (job.request_summary?.metrics || []).join(", ");
const statusBadge = `<span class="badge ${job.status}">${job.status}</span>`;
let scoreHtml = "";
if (job.status === "completed") {
scoreHtml = Object.entries(job.scores || {})
.map(([k, v]) => {
const cls = App.scoreClass(v);
const text = v === null || v === undefined ? "n/a" : Number(v).toFixed(3);
return `<span class="metric-chip" title="${App.escape(k)}">${App.escape(App.shortMetric(k))} <b class="${cls}">${text}</b></span>`;
})
.join(" ");
if (job.weighted_score !== null && job.weighted_score !== undefined) {
const cls = App.scoreClass(job.weighted_score);
scoreHtml += ` <span class="metric-chip">综合 <b class="${cls}">${Number(job.weighted_score).toFixed(3)}</b></span>`;
}
} else if (job.status === "failed") {
scoreHtml = `<span style="color:var(--bad);font-size:12px">${App.escape((job.error || "").slice(0, 80))}</span>`;
} else {
scoreHtml = `<span class="muted">评分中,请稍候…</span>`;
}
const reportBtn = job.status === "completed" && job.run_id
? `<button class="btn btn-sm btn-primary score-job-report-btn" data-run-id="${App.escape(job.run_id)}">查看报告</button>`
: "";
return `
<div class="run-card-head">
<div class="run-card-title">${question}</div>
<div style="display:flex;gap:8px;align-items:center">${statusBadge}${reportBtn}</div>
</div>
<div class="run-card-meta">
<div>指标:${App.escape(metrics)} · ${time} · ${job.latency_ms}ms</div>
</div>
<div class="run-card-metrics">${scoreHtml}</div>
`;
},
_bindReportBtn(card, job) {
const btn = card.querySelector(".score-job-report-btn");
if (!btn) return;
btn.addEventListener("click", () => {
const runId = btn.dataset.runId;
if (runId) {
App.enableReportNav();
App.navigate("report", runId);
}
});
},
_startPoll(jobId) {
if (ScoreJobs._pollTimers[jobId]) return;
ScoreJobs._pollTimers[jobId] = setInterval(async () => {
try {
const job = await API.getScoreJob(jobId);
const card = document.getElementById(`score-job-${jobId}`);
if (card) {
card.innerHTML = ScoreJobs._cardHtml(job);
ScoreJobs._bindReportBtn(card, job);
}
if (job.status === "completed" || job.status === "failed") {
clearInterval(ScoreJobs._pollTimers[jobId]);
delete ScoreJobs._pollTimers[jobId];
// If completed, pre-enable report nav
if (job.status === "completed" && job.run_id) {
App.enableReportNav();
}
}
} catch (_e) {
clearInterval(ScoreJobs._pollTimers[jobId]);
delete ScoreJobs._pollTimers[jobId];
}
}, 5000);
},
stopAllPolls() {
Object.values(ScoreJobs._pollTimers).forEach(t => clearInterval(t));
ScoreJobs._pollTimers = {};
},
};