docs: add Dify score API implementation plan
docs/superpowers/plans/2026-06-22-dify-score-api.md
@@ -0,0 +1,974 @@
# Dify Real-Time Scoring API Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Add a `POST /api/score` endpoint for Dify external Tool calls. It accepts a single question-answer record and synchronously returns the score for each RAGAS metric.

**Architecture:** Add an `inline_scorer.py` service layer that wraps the RAGAS scoring logic and caches LLM clients keyed by `(judge_model, embedding_model)`; add the `webapp/api/score.py` route; put `ScoreRequest`/`ScoreResponse` in `webapp/models.py`; add `SCORE_API_TOKEN` to `EvaluationSettings`.

**Tech Stack:** Python 3.12, FastAPI, Pydantic v2, RAGAS 0.4.3, pytest

## Global Constraints

- Python 3.12+, PEP 8, 4-space indentation, type annotations required
- `contexts` is split into a `list[str]` on `context_separator` (default `" |||| "`)
- `ground_truth` is optional; when it is missing, skip context_recall / factual_correctness / semantic_similarity / noise_sensitivity
- When `SCORE_API_TOKEN` is empty, no authentication is performed (intranet deployment scenario)
- All tests use pytest and must not depend on a real LLM

---

## File List

| Action | File | Responsibility |
|------|------|------|
| Create | `webapp/services/inline_scorer.py` | LLM client cache + single-sample scoring |
| Create | `webapp/api/score.py` | `/api/score` route |
| Create | `tests/webapp/test_score_api.py` | Endpoint tests (fully mocked) |
| Modify | `webapp/models.py` | Add ScoreRequest / ScoreResponse |
| Modify | `rag_eval/settings.py` | Add the score_api_token field |
| Modify | `webapp/server.py` | Register the score router; update OPENAPI_TAGS and description |
| Modify | `.env.example` | Document SCORE_API_TOKEN (Task 4) |

---

## Task 1: ScoreRequest / ScoreResponse models + settings field

**Files:**
- Modify: `webapp/models.py`
- Modify: `rag_eval/settings.py`
- Test: `tests/webapp/test_score_api.py` (partial: model validation tests)

**Interfaces:**
- Produces:
  - `ScoreRequest` Pydantic model (fields listed below)
  - `ScoreResponse` Pydantic model
  - `EvaluationSettings.score_api_token: str | None`

- [ ] **Step 1: Write failing model-validation tests**

Create `tests/webapp/test_score_api.py`:

```python
"""Tests for POST /api/score endpoint."""
from __future__ import annotations

import math

import pytest
from pydantic import ValidationError

from webapp.models import ScoreRequest, ScoreResponse


class TestScoreRequest:
    def test_minimal_valid_request(self):
        """Only required fields: question, answer, contexts."""
        req = ScoreRequest(
            question="What is CT?",
            answer="CT is imaging.",
            contexts="CT uses X-rays.",
        )
        assert req.question == "What is CT?"
        assert req.contexts == "CT uses X-rays."
        assert req.ground_truth is None
        assert req.context_separator == " |||| "
        assert req.metrics == ["faithfulness", "answer_relevancy", "context_recall", "context_precision"]

    def test_contexts_split_by_separator(self):
        """contexts_as_list() splits on context_separator."""
        req = ScoreRequest(
            question="q", answer="a",
            contexts="ctx1 |||| ctx2 |||| ctx3",
            context_separator=" |||| ",
        )
        assert req.contexts_as_list() == ["ctx1", "ctx2", "ctx3"]

    def test_contexts_split_custom_separator(self):
        req = ScoreRequest(
            question="q", answer="a",
            contexts="a---b---c",
            context_separator="---",
        )
        assert req.contexts_as_list() == ["a", "b", "c"]

    def test_contexts_split_single_item(self):
        req = ScoreRequest(question="q", answer="a", contexts="only one")
        assert req.contexts_as_list() == ["only one"]

    def test_missing_question_raises(self):
        with pytest.raises(ValidationError):
            ScoreRequest(answer="a", contexts="c")  # type: ignore[call-arg]

    def test_missing_answer_raises(self):
        with pytest.raises(ValidationError):
            ScoreRequest(question="q", contexts="c")  # type: ignore[call-arg]

    def test_missing_contexts_raises(self):
        with pytest.raises(ValidationError):
            ScoreRequest(question="q", answer="a")  # type: ignore[call-arg]

    def test_custom_metrics_accepted(self):
        req = ScoreRequest(
            question="q", answer="a", contexts="c",
            metrics=["faithfulness"],
        )
        assert req.metrics == ["faithfulness"]

    def test_invalid_metric_name_raises(self):
        with pytest.raises(ValidationError):
            ScoreRequest(question="q", answer="a", contexts="c", metrics=["not_a_metric"])

    def test_effective_metrics_drops_ground_truth_dependent_when_missing(self):
        """Without ground_truth, GT-dependent metrics are excluded."""
        req = ScoreRequest(
            question="q", answer="a", contexts="c",
            metrics=["faithfulness", "context_recall", "factual_correctness", "semantic_similarity", "noise_sensitivity"],
        )
        effective = req.effective_metrics()
        assert "faithfulness" in effective
        assert "context_recall" not in effective
        assert "factual_correctness" not in effective
        assert "semantic_similarity" not in effective
        assert "noise_sensitivity" not in effective

    def test_effective_metrics_keeps_all_when_ground_truth_present(self):
        req = ScoreRequest(
            question="q", answer="a", contexts="c", ground_truth="gt",
            metrics=["faithfulness", "context_recall", "factual_correctness"],
        )
        effective = req.effective_metrics()
        assert effective == ["faithfulness", "context_recall", "factual_correctness"]


class TestScoreResponse:
    def test_score_response_structure(self):
        resp = ScoreResponse(
            scores={"faithfulness": 0.85, "answer_relevancy": None},
            weighted_score=0.85,
            latency_ms=1200,
        )
        assert resp.scores["faithfulness"] == 0.85
        assert resp.scores["answer_relevancy"] is None
        assert resp.latency_ms == 1200
```

- [ ] **Step 2: Run to verify FAIL**

```
cd C:\Projects\AIProjects\Siemens-AIPOC\siemens_ragas
python -m pytest tests/webapp/test_score_api.py::TestScoreRequest tests/webapp/test_score_api.py::TestScoreResponse -v
```
Expected: `ImportError: cannot import name 'ScoreRequest' from 'webapp.models'`

- [ ] **Step 3: Add ScoreRequest and ScoreResponse to `webapp/models.py`**

Append to the end of `webapp/models.py` (after `PipelineJobResponse`):

```python
# ---------------------------------------------------------------------------
# Dify real-time scoring API models
# ---------------------------------------------------------------------------

# Metrics that can only be computed when ground_truth is provided
_GT_DEPENDENT_METRICS: frozenset[str] = frozenset({
    "context_recall",
    "factual_correctness",
    "semantic_similarity",
    "noise_sensitivity",
})

# All valid metric names
_VALID_METRICS: frozenset[str] = frozenset({
    "faithfulness",
    "answer_relevancy",
    "context_recall",
    "context_precision",
    "noise_sensitivity",
    "factual_correctness",
    "semantic_similarity",
})

_DEFAULT_SCORE_METRICS: list[str] = [
    "faithfulness",
    "answer_relevancy",
    "context_recall",
    "context_precision",
]


class ScoreRequest(BaseModel):
    """Request body for the real-time single-sample scoring endpoint."""

    model_config = ConfigDict(
        json_schema_extra={
            # JSON Schema `examples` holds plain example payloads; the
            # {"summary": ..., "value": ...} wrapper belongs to OpenAPI examples.
            "examples": [
                {
                    # Basic scoring request
                    "question": "What is the temporal resolution of dual-source CT?",
                    "answer": "Dual-source CT has a single-segment temporal resolution of 75 ms.",
                    "contexts": "Dual-source CT uses two tube-detector systems |||| Single-segment acquisition rotates 135 degrees",
                    "ground_truth": "Dual-source CT has a single-segment temporal resolution of 75 ms and needs a 135-degree rotation.",
                    "context_separator": " |||| ",
                    "metrics": ["faithfulness", "answer_relevancy", "context_recall", "context_precision"],
                    "judge_model": "deepseek-v4-flash",
                    "embedding_model": "text-embedding-v3",
                }
            ]
        }
    )

    question: str = Field(description="Question text.")
    answer: str = Field(description="Answer to be scored.")
    contexts: str = Field(
        description="Retrieved contexts as one string; fragments are joined with context_separator."
    )
    ground_truth: str | None = Field(
        default=None,
        description="Reference answer (optional). Metrics that need it are skipped when it is missing.",
    )
    context_separator: str = Field(
        default=" |||| ",
        description="Fragment separator inside contexts; defaults to four pipes with one space on each side.",
    )
    metrics: list[str] = Field(
        default_factory=lambda: list(_DEFAULT_SCORE_METRICS),
        description="RAGAS metrics to compute.",
    )
    judge_model: str | None = Field(
        default=None,
        description="Judge LLM model name; when null, RAGAS_JUDGE_MODEL from .env is used.",
    )
    embedding_model: str | None = Field(
        default=None,
        description="Embedding model name; when null, RAGAS_EMBEDDING_MODEL from .env is used.",
    )

    @field_validator("metrics")
    @classmethod
    def validate_metric_names(cls, value: list[str]) -> list[str]:
        """Reject any metric name not in the supported registry."""
        invalid = [m for m in value if m not in _VALID_METRICS]
        if invalid:
            raise ValueError(
                f"Unsupported metric names: {invalid}. "
                f"Valid values: {sorted(_VALID_METRICS)}"
            )
        if not value:
            raise ValueError("metrics must not be an empty list.")
        return value

    def contexts_as_list(self) -> list[str]:
        """Split the contexts string into a list of non-empty fragments."""
        sep = self.context_separator or " |||| "
        return [s.strip() for s in self.contexts.split(sep) if s.strip()]

    def effective_metrics(self) -> list[str]:
        """Return metrics filtered to exclude GT-dependent ones when ground_truth is absent."""
        if self.ground_truth is not None:
            return list(self.metrics)
        return [m for m in self.metrics if m not in _GT_DEPENDENT_METRICS]


class ScoreResponse(BaseModel):
    """Response payload for the real-time scoring endpoint."""

    scores: dict[str, float | None] = Field(
        description="Score per metric (null when the value is NaN or the computation failed)."
    )
    weighted_score: float | None = Field(
        default=None,
        description="Equal-weight overall score (mean over non-null metrics only).",
    )
    latency_ms: int = Field(description="Server-side scoring time in milliseconds.")
    skipped_metrics: list[str] = Field(
        default_factory=list,
        description="Metrics skipped because ground_truth is missing.",
    )
    error: str | None = Field(
        default=None,
        description="Error message when scoring fails (HTTP 200 is still returned; scores is empty).",
    )
```

Also add `field_validator` to the import line at the top of `webapp/models.py`:
```python
from pydantic import BaseModel, ConfigDict, Field, field_validator
```
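
The `weighted_score` field is described above as an equal-weight mean over the non-null metric scores. A minimal sketch of that calculation follows, assuming this is what `compute_weighted_score` does when it receives an empty weights dict; the helper name `equal_weight_mean` is hypothetical and is not part of `rag_eval`.

```python
from __future__ import annotations

import math


def equal_weight_mean(scores: dict[str, float | None]) -> float | None:
    """Hypothetical helper: mean of the usable scores, or None if there are none."""
    # Drop null, NaN and infinite values so they cannot distort the mean.
    usable = [
        v for v in scores.values()
        if v is not None and not math.isnan(v) and not math.isinf(v)
    ]
    if not usable:
        return None
    return round(sum(usable) / len(usable), 4)


print(equal_weight_mean({"faithfulness": 0.8, "answer_relevancy": 0.9, "context_recall": None}))
```

Returning `None` instead of `0.0` when nothing is usable keeps "no score" distinguishable from "worst score" on the Dify side.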

- [ ] **Step 4: Add `score_api_token` to `rag_eval/settings.py`**

Add after the `dataset_generator_model` field:
```python
    score_api_token: str | None = Field(
        default=None,
        alias="SCORE_API_TOKEN",
        description="Bearer token for /api/score endpoint. Empty = no auth.",
    )
```

- [ ] **Step 5: Run to verify PASS**

```
python -m pytest tests/webapp/test_score_api.py::TestScoreRequest tests/webapp/test_score_api.py::TestScoreResponse -v
```
Expected: all 12 tests PASS.

- [ ] **Step 6: Commit**

```
git add webapp/models.py rag_eval/settings.py tests/webapp/test_score_api.py
git commit -m "feat: add ScoreRequest/ScoreResponse models and SCORE_API_TOKEN setting"
```

---

## Task 2: InlineScorer service (LLM caching + scoring)

**Files:**
- Create: `webapp/services/inline_scorer.py`

**Interfaces:**
- Consumes:
  - `build_models(judge_model, embedding_model, settings) -> tuple[Any, Any]` from `rag_eval.metrics.factory`
  - `MetricPipeline(metrics, metric_timeout_seconds)` from `rag_eval.metrics.pipeline`
  - `NormalizedSample` from `rag_eval.shared.models`
  - `compute_weighted_score(scores, metric_weights) -> float | None` from `rag_eval.metrics.weights`
  - `EvaluationSettings` from `rag_eval.settings`
- Produces:
  - `inline_scorer: InlineScorer` (module-level singleton)
  - `InlineScorer.score(question, answer, contexts, ground_truth, metrics, judge_model, embedding_model, settings) -> dict[str, float | None]`

- [ ] **Step 1: Write failing test**

Add to `tests/webapp/test_score_api.py`:

```python
class TestInlineScorer:
    def test_score_returns_dict_with_requested_metrics(self):
        """InlineScorer.score returns a dict keyed by the requested metrics."""
        from unittest.mock import AsyncMock, MagicMock, patch
        from webapp.services.inline_scorer import InlineScorer
        from rag_eval.settings import EvaluationSettings

        mock_score = MagicMock()
        mock_score.metrics = {"faithfulness": 0.9, "answer_relevancy": 0.8}
        mock_score.error = ""

        mock_pipeline = MagicMock()
        mock_pipeline.score_sample = AsyncMock(return_value=mock_score)

        with patch("webapp.services.inline_scorer.build_models", return_value=(MagicMock(), MagicMock())):
            with patch("webapp.services.inline_scorer.MetricPipeline", return_value=mock_pipeline):
                with patch("webapp.services.inline_scorer._build_metric_instances", return_value={}):
                    scorer = InlineScorer()
                    result = scorer.score(
                        question="q", answer="a",
                        contexts=["ctx1"],
                        ground_truth=None,
                        metrics=["faithfulness", "answer_relevancy"],
                        judge_model="test-model",
                        embedding_model="test-embed",
                        settings=EvaluationSettings(_env_file=None),
                    )
        assert "faithfulness" in result
        assert "answer_relevancy" in result
        assert result["faithfulness"] == pytest.approx(0.9)

    def test_score_converts_nan_to_none(self):
        """NaN scores are converted to None in the returned dict."""
        import math
        from unittest.mock import AsyncMock, MagicMock, patch
        from webapp.services.inline_scorer import InlineScorer
        from rag_eval.settings import EvaluationSettings

        mock_score = MagicMock()
        mock_score.metrics = {"faithfulness": float("nan")}
        mock_score.error = ""

        mock_pipeline = MagicMock()
        mock_pipeline.score_sample = AsyncMock(return_value=mock_score)

        with patch("webapp.services.inline_scorer.build_models", return_value=(MagicMock(), MagicMock())):
            with patch("webapp.services.inline_scorer.MetricPipeline", return_value=mock_pipeline):
                with patch("webapp.services.inline_scorer._build_metric_instances", return_value={}):
                    scorer = InlineScorer()
                    result = scorer.score(
                        question="q", answer="a", contexts=["c"],
                        ground_truth=None,
                        metrics=["faithfulness"],
                        judge_model="m", embedding_model="e",
                        settings=EvaluationSettings(_env_file=None),
                    )
        assert result["faithfulness"] is None
```

- [ ] **Step 2: Run to verify FAIL**

```
python -m pytest tests/webapp/test_score_api.py::TestInlineScorer -v
```
Expected: `ModuleNotFoundError: No module named 'webapp.services.inline_scorer'`

- [ ] **Step 3: Create `webapp/services/inline_scorer.py`**

```python
"""LLM-cached inline RAGAS scorer for the real-time /api/score endpoint.

A module-level InlineScorer singleton caches (llm, embeddings) pairs keyed by
(judge_model, embedding_model), so repeated Dify Tool calls with the same
models reuse existing AsyncOpenAI connections instead of creating new ones.
"""

from __future__ import annotations

import asyncio
import math
import threading
from collections.abc import Callable
from typing import Any

from rag_eval.compat import ensure_ragas_import_compat
from rag_eval.metrics.factory import build_models
from rag_eval.metrics.pipeline import MetricPipeline
from rag_eval.metrics.weights import compute_weighted_score
from rag_eval.settings import EvaluationSettings
from rag_eval.shared.models import NormalizedSample

ensure_ragas_import_compat()

from ragas.metrics.collections import (  # noqa: E402
    AnswerRelevancy,
    ContextPrecision,
    ContextRecall,
    FactualCorrectness,
    Faithfulness,
    NoiseSensitivity,
    SemanticSimilarity,
)


def _build_metric_instances(metrics: list[str], llm: Any, embeddings: Any) -> dict[str, Any]:
    """Instantiate only the RAGAS metric objects requested."""
    # Factories defer construction, so metrics that were not requested are never built.
    factories: dict[str, Callable[[], Any]] = {
        "faithfulness": lambda: Faithfulness(llm=llm),
        "answer_relevancy": lambda: AnswerRelevancy(llm=llm, embeddings=embeddings),
        "context_recall": lambda: ContextRecall(llm=llm),
        "context_precision": lambda: ContextPrecision(llm=llm),
        "noise_sensitivity": lambda: NoiseSensitivity(llm=llm),
        "factual_correctness": lambda: FactualCorrectness(llm=llm),
        "semantic_similarity": lambda: SemanticSimilarity(embeddings=embeddings),
    }
    return {name: factories[name]() for name in metrics if name in factories}


class InlineScorer:
    """Thread-safe single-sample RAGAS scorer with LLM client caching."""

    def __init__(self) -> None:
        # Cache keyed by (judge_model, embedding_model) -> (llm, embeddings)
        self._model_cache: dict[tuple[str, str], tuple[Any, Any]] = {}
        self._lock = threading.Lock()

    def _get_models(
        self,
        judge_model: str,
        embedding_model: str,
        settings: EvaluationSettings,
    ) -> tuple[Any, Any]:
        """Return cached LLM/embedding clients, building them on first use."""
        cache_key = (judge_model, embedding_model)
        with self._lock:
            if cache_key not in self._model_cache:
                llm, embeddings = build_models(judge_model, embedding_model, settings)
                self._model_cache[cache_key] = (llm, embeddings)
            return self._model_cache[cache_key]

    def score(
        self,
        question: str,
        answer: str,
        contexts: list[str],
        ground_truth: str | None,
        metrics: list[str],
        judge_model: str,
        embedding_model: str,
        settings: EvaluationSettings,
    ) -> dict[str, float | None]:
        """Score one sample synchronously and return {metric_name: score | None}.

        NaN and infinite values from RAGAS are converted to None for clean JSON serialization.
        """
        llm, embeddings = self._get_models(judge_model, embedding_model, settings)
        metric_instances = _build_metric_instances(metrics, llm, embeddings)

        pipeline = MetricPipeline(
            metrics=metric_instances,
            metric_timeout_seconds=settings.ragas_metric_timeout_seconds,
        )

        sample = NormalizedSample(
            sample_id="inline-score",
            question=question,
            answer=answer,
            contexts=contexts,
            ground_truth=ground_truth or "",
        )

        metric_score = asyncio.run(pipeline.score_sample(sample))

        # Convert NaN/inf → None for clean JSON output
        return {
            name: (None if math.isnan(v) or math.isinf(v) else round(v, 4))
            for name, v in metric_score.metrics.items()
        }


# Module-level singleton shared by FastAPI routes.
inline_scorer = InlineScorer()
```
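
The caching behaviour in `_get_models` can be checked in isolation. The sketch below is a simplified stand-in, not the plan's code: `KeyedClientCache` and `fake_builder` are hypothetical names, and the builder just records its calls instead of creating real LLM clients. It shows that eight concurrent lookups for the same `(judge_model, embedding_model)` key trigger a single build, because the check and the insert happen under one lock.

```python
import threading
from typing import Any, Callable


class KeyedClientCache:
    """Hypothetical stand-in for InlineScorer's model cache."""

    def __init__(self, builder: Callable[[str, str], Any]) -> None:
        self._builder = builder
        self._cache: dict[tuple[str, str], Any] = {}
        self._lock = threading.Lock()

    def get(self, judge_model: str, embedding_model: str) -> Any:
        key = (judge_model, embedding_model)
        # Check and insert under the same lock so each key is built once.
        with self._lock:
            if key not in self._cache:
                self._cache[key] = self._builder(judge_model, embedding_model)
            return self._cache[key]


build_calls: list[tuple[str, str]] = []


def fake_builder(judge_model: str, embedding_model: str) -> tuple[str, str]:
    """Record the call and return placeholder 'clients'."""
    build_calls.append((judge_model, embedding_model))
    return (f"llm:{judge_model}", f"emb:{embedding_model}")


cache = KeyedClientCache(fake_builder)
threads = [
    threading.Thread(target=cache.get, args=("judge-a", "embed-a"))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
cache.get("judge-b", "embed-a")  # a different key forces a second build

print(len(build_calls))
```

Holding the lock while building serializes first-time builds across keys. That is a reasonable trade-off when only a handful of model pairs are in use, which is the case the plan describes.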

- [ ] **Step 4: Run to verify PASS**

```
python -m pytest tests/webapp/test_score_api.py::TestInlineScorer -v
```
Expected: both tests PASS.

- [ ] **Step 5: Commit**

```
git add webapp/services/inline_scorer.py tests/webapp/test_score_api.py
git commit -m "feat: add InlineScorer service with LLM client caching"
```

---

## Task 3: `/api/score` route + auth + integration tests

**Files:**
- Create: `webapp/api/score.py`
- Modify: `webapp/server.py`

**Interfaces:**
- Consumes:
  - `ScoreRequest`, `ScoreResponse` from `webapp.models`
  - `inline_scorer: InlineScorer` from `webapp.services.inline_scorer`
  - `EvaluationSettings` from `rag_eval.settings`
  - `compute_weighted_score(scores, {}) -> float | None` from `rag_eval.metrics.weights`
- Produces: `POST /api/score` endpoint

- [ ] **Step 1: Write failing endpoint tests**

Add to `tests/webapp/test_score_api.py`:

```python
# ── Fixtures ─────────────────────────────────────────────────────────────────
import pytest
from fastapi.testclient import TestClient
from unittest.mock import MagicMock, patch


@pytest.fixture()
def client(monkeypatch):
    """TestClient with mocked InlineScorer."""
    import webapp.api.score as score_mod

    mock_scorer = MagicMock()
    mock_scorer.score.return_value = {
        "faithfulness": 0.85,
        "answer_relevancy": 0.90,
    }
    monkeypatch.setattr(score_mod, "inline_scorer", mock_scorer)

    from webapp.server import create_app
    return TestClient(create_app())


class TestScoreEndpoint:
    def test_post_score_returns_200(self, client):
        resp = client.post("/api/score", json={
            "question": "What is CT?",
            "answer": "CT is imaging.",
            "contexts": "CT uses X-rays.",
        })
        assert resp.status_code == 200
        data = resp.json()
        assert "scores" in data
        assert "latency_ms" in data
        assert data["scores"]["faithfulness"] == pytest.approx(0.85)

    def test_weighted_score_computed(self, client):
        resp = client.post("/api/score", json={
            "question": "q", "answer": "a", "contexts": "c",
        })
        assert resp.status_code == 200
        data = resp.json()
        # weighted_score is the mean of all non-null scores
        assert data["weighted_score"] is not None

    def test_missing_required_fields_returns_422(self, client):
        resp = client.post("/api/score", json={"question": "q"})
        assert resp.status_code == 422

    def test_invalid_metric_name_returns_422(self, client):
        resp = client.post("/api/score", json={
            "question": "q", "answer": "a", "contexts": "c",
            "metrics": ["not_a_metric"],
        })
        assert resp.status_code == 422

    def test_skipped_metrics_returned_when_no_ground_truth(self, client):
        resp = client.post("/api/score", json={
            "question": "q", "answer": "a", "contexts": "c",
            "metrics": ["faithfulness", "context_recall"],
        })
        assert resp.status_code == 200
        data = resp.json()
        assert "context_recall" in data["skipped_metrics"]

    def test_contexts_split_on_separator(self, client, monkeypatch):
        """contexts string is split before passing to scorer."""
        import webapp.api.score as score_mod
        calls = []

        def capture(*args, **kwargs):
            calls.append(kwargs.get("contexts", []))
            return {"faithfulness": 0.9}

        monkeypatch.setattr(score_mod.inline_scorer, "score", capture)

        client.post("/api/score", json={
            "question": "q", "answer": "a",
            "contexts": "ctx1 |||| ctx2",
            "context_separator": " |||| ",
        })
        assert calls[0] == ["ctx1", "ctx2"]

    def test_bearer_token_auth_required_when_configured(self, monkeypatch):
        """When SCORE_API_TOKEN is set, requests without token get 401."""
        import webapp.api.score as score_mod
        from rag_eval.settings import EvaluationSettings

        mock_settings = EvaluationSettings(_env_file=None)
        object.__setattr__(mock_settings, "score_api_token", "secret-token")
        monkeypatch.setattr(score_mod, "_get_settings", lambda: mock_settings)

        mock_scorer = MagicMock()
        mock_scorer.score.return_value = {"faithfulness": 0.9}
        monkeypatch.setattr(score_mod, "inline_scorer", mock_scorer)

        from webapp.server import create_app
        test_client = TestClient(create_app())

        # No auth header → 401
        resp = test_client.post("/api/score", json={
            "question": "q", "answer": "a", "contexts": "c",
        })
        assert resp.status_code == 401

        # Correct token → 200
        resp = test_client.post(
            "/api/score",
            json={"question": "q", "answer": "a", "contexts": "c"},
            headers={"Authorization": "Bearer secret-token"},
        )
        assert resp.status_code == 200

    def test_wrong_bearer_token_returns_401(self, monkeypatch):
        import webapp.api.score as score_mod
        from rag_eval.settings import EvaluationSettings

        mock_settings = EvaluationSettings(_env_file=None)
        object.__setattr__(mock_settings, "score_api_token", "correct-token")
        monkeypatch.setattr(score_mod, "_get_settings", lambda: mock_settings)

        mock_scorer = MagicMock()
        mock_scorer.score.return_value = {}
        monkeypatch.setattr(score_mod, "inline_scorer", mock_scorer)

        from webapp.server import create_app
        test_client = TestClient(create_app())
        resp = test_client.post(
            "/api/score",
            json={"question": "q", "answer": "a", "contexts": "c"},
            headers={"Authorization": "Bearer wrong-token"},
        )
        assert resp.status_code == 401
```

- [ ] **Step 2: Run to verify FAIL**

```
python -m pytest tests/webapp/test_score_api.py::TestScoreEndpoint -v
```
Expected: `ModuleNotFoundError: No module named 'webapp.api.score'`

- [ ] **Step 3: Create `webapp/api/score.py`**

```python
"""Route for real-time single-sample RAGAS scoring (Dify external Tool endpoint)."""

from __future__ import annotations

import time
from typing import Annotated

from fastapi import APIRouter, Header, HTTPException

from rag_eval.metrics.weights import compute_weighted_score
from rag_eval.settings import EvaluationSettings
from webapp.models import ScoreRequest, ScoreResponse
from webapp.services.inline_scorer import inline_scorer

router = APIRouter(prefix="/api/score", tags=["score"])


def _get_settings() -> EvaluationSettings:
    """Return a fresh EvaluationSettings instance (overridable in tests)."""
    return EvaluationSettings()


def _check_auth(authorization: str | None, token: str) -> None:
    """Raise 401 if Bearer token does not match the configured token."""
    if authorization is None:
        raise HTTPException(status_code=401, detail="Missing Authorization header.")
    parts = authorization.split(" ", 1)
    if len(parts) != 2 or parts[0].lower() != "bearer" or parts[1] != token:
        raise HTTPException(status_code=401, detail="Invalid Bearer token.")


@router.post(
    "",
    response_model=ScoreResponse,
    summary="Real-time single-sample scoring (Dify external Tool)",
    responses={
        200: {"description": "Per-metric scores and the weighted overall score."},
        401: {"description": "SCORE_API_TOKEN is configured but no valid Bearer token was provided."},
        422: {"description": "Request validation failed."},
    },
)
def score_sample(
    request: ScoreRequest,
    authorization: Annotated[str | None, Header()] = None,
) -> ScoreResponse:
    """Accept one question-answer record, run the RAGAS metrics synchronously, and return the scores.

    Called by the Dify external Tool. The `contexts` field is split on
    `context_separator` before it is passed to the RAGAS pipeline; metrics that
    depend on `ground_truth` are skipped automatically when it is missing.
    """
    settings = _get_settings()

    # Auth (only enforced when a token is configured)
    if settings.score_api_token:
        _check_auth(authorization, settings.score_api_token)

    judge_model = request.judge_model or settings.ragas_judge_model
    embedding_model = request.embedding_model or settings.ragas_embedding_model
    effective = request.effective_metrics()
    requested = set(request.metrics)
    skipped = sorted(requested - set(effective))

    if not effective:
        # All requested metrics require ground_truth which is absent.
        return ScoreResponse(
            scores={m: None for m in request.metrics},
            weighted_score=None,
            latency_ms=0,
            skipped_metrics=skipped,
        )

    t0 = time.monotonic()
    try:
        raw_scores = inline_scorer.score(
            question=request.question,
            answer=request.answer,
            contexts=request.contexts_as_list(),
            ground_truth=request.ground_truth,
            metrics=effective,
            judge_model=judge_model,
            embedding_model=embedding_model,
            settings=settings,
        )
    except Exception as exc:  # noqa: BLE001
        latency_ms = int((time.monotonic() - t0) * 1000)
        return ScoreResponse(
            scores={},
            weighted_score=None,
            latency_ms=latency_ms,
            skipped_metrics=skipped,
            error=f"{type(exc).__name__}: {exc}",
        )

    latency_ms = int((time.monotonic() - t0) * 1000)

    # Merge: skipped metrics appear as null in final scores dict.
    all_scores: dict[str, float | None] = {m: None for m in request.metrics}
    all_scores.update(raw_scores)

    # Weighted score = equal-weight mean of non-null effective scores.
    weighted = compute_weighted_score(
        {k: v for k, v in raw_scores.items() if v is not None},
        {},
    )

    return ScoreResponse(
        scores=all_scores,
        weighted_score=round(weighted, 4) if weighted is not None else None,
        latency_ms=latency_ms,
        skipped_metrics=skipped,
    )
```
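
If the endpoint is ever exposed beyond an intranet, the token comparison in `_check_auth` could be hardened against timing side channels. The sketch below is one possible variant and not part of the plan: `bearer_token_matches` is a hypothetical helper that parses the header the same way but compares with `secrets.compare_digest`, encoding both sides to bytes so non-ASCII tokens do not raise.

```python
from __future__ import annotations

import secrets


def bearer_token_matches(authorization: str | None, expected: str) -> bool:
    """Hypothetical helper: True only for a well-formed, matching Bearer header."""
    if authorization is None:
        return False
    # "Bearer <token>" -> scheme and presented token; scheme is case-insensitive.
    scheme, _, presented = authorization.partition(" ")
    if scheme.lower() != "bearer" or not presented:
        return False
    # Constant-time comparison on bytes.
    return secrets.compare_digest(presented.encode("utf-8"), expected.encode("utf-8"))


print(bearer_token_matches("Bearer secret-token", "secret-token"))
print(bearer_token_matches("Bearer wrong-token", "secret-token"))
```

A route could call this helper and raise `HTTPException(status_code=401, ...)` on `False`, which would keep the status codes the endpoint tests expect.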

- [ ] **Step 4: Register router in `webapp/server.py`**

Add `score` to the import line:
```python
from webapp.api import evaluations, llm_profiles, pipeline, runs, scenarios, score
```

Add the router registration after `pipeline.router`:
```python
app.include_router(score.router)
```

Add `"score"` tag to `OPENAPI_TAGS` list (insert before `"meta"`):
```python
    {
        "name": "score",
        "description": (
            "**Real-time scoring API (Dify external Tool)**\n\n"
            "Accepts a single question-answer record `(question, answer, contexts, ground_truth)`,\n"
            "runs the RAGAS metrics synchronously, and returns per-metric scores plus the weighted overall score.\n\n"
            "Use case: a Dify Agent calls it right after answering, for quality monitoring or self-improvement.\n\n"
            "**Auth**: if `SCORE_API_TOKEN` is set in `.env`, send the "
            "`Authorization: Bearer <token>` request header."
        ),
    },
```

Also update the `description` field in `FastAPI(...)` to add a bullet:
```python
"- **Real-time scoring API**: single-sample RAGAS scoring endpoint for Dify external Tool calls\n"
```

- [ ] **Step 5: Run to verify PASS**

```
python -m pytest tests/webapp/test_score_api.py -v
```
Expected: all tests PASS.

- [ ] **Step 6: Verify server boots and route appears**

```
python -c "
from webapp.server import create_app
app = create_app()
routes = [(r.path, list(getattr(r,'methods',[]))) for r in app.routes]
score_routes = [(p,m) for p,m in routes if 'score' in p]
print('Score routes:', score_routes)
"
```
Expected output:
```
Score routes: [('/api/score', ['POST'])]
```

- [ ] **Step 7: Commit**

```
git add webapp/api/score.py webapp/server.py tests/webapp/test_score_api.py
git commit -m "feat: add POST /api/score endpoint for Dify real-time scoring"
```

---

## Task 4: Full regression + `.env.example` update

**Files:**
- Modify: `.env.example`

- [ ] **Step 1: Add SCORE_API_TOKEN to `.env.example`**

Add this block after `DATASET_GENERATOR_MODEL=qwen3.6-plus`:

```
# ===== Dify integration: real-time scoring API =====
# Bearer token auth for the /api/score endpoint (leave empty to disable auth; suitable for intranet deployments)
# In the Dify external Tool, configure Authorization: Bearer <the same value as set here>
SCORE_API_TOKEN=
```

- [ ] **Step 2: Run full test suite**

```
python -m pytest tests/ -v --tb=short
```

Pre-existing failures to ignore:
- `test_normalize_sample_pdf_offline_smoke_row`: missing CSV fixture
- `test_evaluator_and_reporting_write_run_assets`: pre-existing assertion mismatch
- `test_question_generator_rejects_invalid_json`: the retry loop swallows the ValueError
- `test_question_generator_rejects_non_list_samples`: same as above

**Zero new failures** counts as a pass.

- [ ] **Step 3: Final commit**

```
git add .env.example
git commit -m "feat: Dify score API complete; add SCORE_API_TOKEN to .env.example

- POST /api/score: real-time RAGAS scoring for Dify external Tool
- ScoreRequest/ScoreResponse Pydantic models with full field docs
- InlineScorer with (judge_model, embedding_model) client cache
- Bearer token auth via SCORE_API_TOKEN env var (optional)
- contexts split by configurable separator (default ' |||| ')
- GT-dependent metrics auto-skipped when ground_truth absent
- Full test coverage (22 new tests)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>"
```

---

## Dify-Side Configuration Reference

Once the tasks are complete, enter the following OpenAPI Schema in Dify under "Tools" → "Custom Tool":

```yaml
openapi: 3.1.0
info:
  title: RAGAS Real-Time Scoring
  version: 1.0.0
servers:
  - url: http://<your-server>:8800
paths:
  /api/score:
    post:
      operationId: scoreQA
      summary: Score one question-answer record with RAGAS
      requestBody:
        required: true
        content:
          application/json:
            schema:
              type: object
              required: [question, answer, contexts]
              properties:
                question: { type: string }
                answer: { type: string }
                contexts: { type: string, description: "Join multiple context fragments with ' |||| '" }
                ground_truth: { type: string }
                metrics:
                  type: array
                  items: { type: string }
                  default: [faithfulness, answer_relevancy, context_recall, context_precision]
      responses:
        '200':
          description: Scoring result
          content:
            application/json:
              schema:
                type: object
                properties:
                  scores: { type: object }
                  weighted_score: { type: number }
                  latency_ms: { type: integer }
                  skipped_metrics: { type: array, items: { type: string } }
```
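
Outside Dify, the same request can be assembled by hand for a quick smoke test. The sketch below uses only the standard library and makes several assumptions: `build_score_request` is a hypothetical helper, `http://localhost:8800` stands in for the `<your-server>` placeholder, and the token value is made up. It joins the context fragments with the separator, adds the Bearer header only when a token is given, and leaves the actual network call commented out because it needs a running server.

```python
from __future__ import annotations

import json
import urllib.request


def build_score_request(
    base_url: str,
    question: str,
    answer: str,
    contexts: list[str],
    ground_truth: str | None = None,
    token: str | None = None,
    separator: str = " |||| ",
) -> urllib.request.Request:
    """Hypothetical helper: build (but do not send) a POST /api/score request."""
    payload: dict[str, object] = {
        "question": question,
        "answer": answer,
        # The endpoint expects one string; fragments are joined with the separator.
        "contexts": separator.join(contexts),
        "context_separator": separator,
    }
    if ground_truth is not None:
        payload["ground_truth"] = ground_truth
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/score",
        data=json.dumps(payload).encode("utf-8"),
        headers=headers,
        method="POST",
    )


req = build_score_request(
    "http://localhost:8800", "q", "a", ["ctx1", "ctx2"], token="secret-token"
)
print(req.get_method(), req.full_url)

# Sending it requires a running server:
# with urllib.request.urlopen(req, timeout=60) as resp:
#     print(json.load(resp)["scores"])
```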