diff --git a/docs/superpowers/plans/2026-06-22-dify-score-api.md b/docs/superpowers/plans/2026-06-22-dify-score-api.md
new file mode 100644
index 0000000..4e4ee40
--- /dev/null
+++ b/docs/superpowers/plans/2026-06-22-dify-score-api.md
@@ -0,0 +1,974 @@
+# Dify Real-Time Scoring API Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** Add a `POST /api/score` endpoint for Dify external Tool calls that accepts a single Q&A record and synchronously returns a score for each RAGAS metric.
+
+**Architecture:** Add an `inline_scorer.py` service layer that wraps the RAGAS scoring logic and caches LLM clients keyed by `(judge_model, embedding_model)`; add the `webapp/api/score.py` route; put `ScoreRequest`/`ScoreResponse` in `webapp/models.py`; add `SCORE_API_TOKEN` to `EvaluationSettings`.
+
+**Tech Stack:** Python 3.12, FastAPI, Pydantic v2, RAGAS 0.4.3, pytest
+
+## Global Constraints
+
+- Python 3.12+, PEP 8, 4-space indentation, type annotations required
+- contexts is split into list[str] on `context_separator` (default `" |||| "`)
+- ground_truth is optional; when it is missing, context_recall / factual_correctness / semantic_similarity / noise_sensitivity are skipped
+- No authentication when SCORE_API_TOKEN is empty (intranet deployment scenario)
+- All tests use pytest and do not depend on a real LLM
+
+---
+
+## File List
+
+| Action | File | Responsibility |
+|--------|------|----------------|
+| Create | `webapp/services/inline_scorer.py` | LLM client cache + single-sample scoring |
+| Create | `webapp/api/score.py` | `/api/score` route |
+| Create | `tests/webapp/test_score_api.py` | Endpoint tests (fully mocked) |
+| Modify | `webapp/models.py` | Add ScoreRequest / ScoreResponse |
+| Modify | `rag_eval/settings.py` | Add score_api_token field |
+| Modify | `webapp/server.py` | Register score router, update OPENAPI_TAGS and description |
+
+---
+
+## Task 1: ScoreRequest / ScoreResponse models + settings field
+
+**Files:**
+- Modify: `webapp/models.py`
+- Modify: `rag_eval/settings.py`
+- Test: `tests/webapp/test_score_api.py` (partial: model validation tests)
+
+**Interfaces:**
+- Produces:
+  - `ScoreRequest` Pydantic model (see fields below)
+  - `ScoreResponse` Pydantic model
+  - `EvaluationSettings.score_api_token: str | None`
+
+- [ ] **Step 1: Write failing model-validation tests**
+
+Create `tests/webapp/test_score_api.py`:
+
+```python
+"""Tests for POST /api/score endpoint."""
+from __future__ import annotations
+
+import math
+import pytest
+from pydantic import ValidationError
+from webapp.models import ScoreRequest, ScoreResponse
+
+
+class TestScoreRequest:
+    def test_minimal_valid_request(self):
+        """Only required fields: question, answer, contexts."""
+        req = ScoreRequest(
+            question="What is CT?",
+            answer="CT is imaging.",
+            contexts="CT uses X-rays.",
+        )
+        assert req.question == "What is CT?"
+        assert req.contexts == "CT uses X-rays."
+        assert req.ground_truth is None
+        assert req.context_separator == " |||| "
+        assert req.metrics == ["faithfulness", "answer_relevancy", "context_recall", "context_precision"]
+
+    def test_contexts_split_by_separator(self):
+        """contexts_as_list() splits on context_separator."""
+        req = ScoreRequest(
+            question="q", answer="a",
+            contexts="ctx1 |||| ctx2 |||| ctx3",
+            context_separator=" |||| ",
+        )
+        assert req.contexts_as_list() == ["ctx1", "ctx2", "ctx3"]
+
+    def test_contexts_split_custom_separator(self):
+        req = ScoreRequest(
+            question="q", answer="a",
+            contexts="a---b---c",
+            context_separator="---",
+        )
+        assert req.contexts_as_list() == ["a", "b", "c"]
+
+    def test_contexts_split_single_item(self):
+        req = ScoreRequest(question="q", answer="a", contexts="only one")
+        assert req.contexts_as_list() == ["only one"]
+
+    def test_missing_question_raises(self):
+        with pytest.raises(ValidationError):
+            ScoreRequest(answer="a", contexts="c")  # type: ignore[call-arg]
+
+    def test_missing_answer_raises(self):
+        with pytest.raises(ValidationError):
+            ScoreRequest(question="q", contexts="c")  # type: ignore[call-arg]
+
+    def test_missing_contexts_raises(self):
+        with pytest.raises(ValidationError):
+            ScoreRequest(question="q", answer="a")  # type: ignore[call-arg]
+
+    def test_custom_metrics_accepted(self):
+        req = ScoreRequest(
+            question="q", answer="a", contexts="c",
+            metrics=["faithfulness"],
+        )
+        assert req.metrics == ["faithfulness"]
+
+    def test_invalid_metric_name_raises(self):
+        with pytest.raises(ValidationError):
+            ScoreRequest(question="q", answer="a", contexts="c", metrics=["not_a_metric"])
+
+    def test_effective_metrics_drops_ground_truth_dependent_when_missing(self):
+        """Without ground_truth, GT-dependent metrics are excluded."""
+        req = ScoreRequest(
+            question="q", answer="a", contexts="c",
+            metrics=["faithfulness", "context_recall", "factual_correctness", "semantic_similarity", "noise_sensitivity"],
+        )
+        effective = req.effective_metrics()
+        assert "faithfulness" in effective
+        assert "context_recall" not in effective
+        assert "factual_correctness" not in effective
+        assert "semantic_similarity" not in effective
+        assert "noise_sensitivity" not in effective
+
+    def test_effective_metrics_keeps_all_when_ground_truth_present(self):
+        req = ScoreRequest(
+            question="q", answer="a", contexts="c", ground_truth="gt",
+            metrics=["faithfulness", "context_recall", "factual_correctness"],
+        )
+        effective = req.effective_metrics()
+        assert effective == ["faithfulness", "context_recall", "factual_correctness"]
+
+
+class TestScoreResponse:
+    def test_score_response_structure(self):
+        resp = ScoreResponse(
+            scores={"faithfulness": 0.85, "answer_relevancy": None},
+            weighted_score=0.85,
+            latency_ms=1200,
+        )
+        assert resp.scores["faithfulness"] == 0.85
+        assert resp.scores["answer_relevancy"] is None
+        assert resp.latency_ms == 1200
+```
+
+- [ ] **Step 2: Run to verify FAIL**
+
+```
+cd C:\Projects\AIProjects\Siemens-AIPOC\siemens_ragas
+python -m pytest tests/webapp/test_score_api.py::TestScoreRequest tests/webapp/test_score_api.py::TestScoreResponse -v
+```
+Expected: `ImportError: cannot import name 'ScoreRequest' from 'webapp.models'`
+
+- [ ] **Step 3: Add ScoreRequest and ScoreResponse to `webapp/models.py`**
+
+Append to the end of `webapp/models.py` (after `PipelineJobResponse`):
+
+```python
+# ---------------------------------------------------------------------------
+# Dify real-time scoring API models
+# ---------------------------------------------------------------------------
+
+# Metrics that can only be computed when ground_truth is available
+_GT_DEPENDENT_METRICS: frozenset[str] = frozenset({
+    "context_recall",
+    "factual_correctness",
+    "semantic_similarity",
+    "noise_sensitivity",
+})
+
+# All valid metric names
+_VALID_METRICS: frozenset[str] = frozenset({
+    "faithfulness",
+    "answer_relevancy",
+    "context_recall",
+    "context_precision",
+    "noise_sensitivity",
+    "factual_correctness",
+    "semantic_similarity",
+})
+
+_DEFAULT_SCORE_METRICS: list[str] = [
+    "faithfulness",
+    "answer_relevancy",
+    "context_recall",
+    "context_precision",
+]
+
+
+class ScoreRequest(BaseModel):
+    """Request body for the real-time single-sample scoring endpoint."""
+
+    model_config = ConfigDict(
+        json_schema_extra={
+            "examples": [
+                {
+                    "summary": "Basic scoring request",
+                    "value": {
+                        "question": "What is the temporal resolution of dual-source CT?",
+                        "answer": "The single-segment temporal resolution of dual-source CT is 75 ms.",
+                        "contexts": "Dual-source CT uses two tube-detector systems |||| Single-segment acquisition rotates 135 degrees",
+                        "ground_truth": "The single-segment temporal resolution of dual-source CT is 75 ms, which requires a 135-degree rotation.",
+                        "context_separator": " |||| ",
+                        "metrics": ["faithfulness", "answer_relevancy", "context_recall", "context_precision"],
+                        "judge_model": "deepseek-v4-flash",
+                        "embedding_model": "text-embedding-v3",
+                    },
+                }
+            ]
+        }
+    )
+
+    question: str = Field(description="Question text.")
+    answer: str = Field(description="The answer to be scored.")
+    contexts: str = Field(
+        description="Retrieved-context string; multiple passages are joined with context_separator."
+    )
+    ground_truth: str | None = Field(
+        default=None,
+        description="Reference answer (optional). Metrics that need it are skipped automatically when it is missing.",
+    )
+    context_separator: str = Field(
+        default=" |||| ",
+        description="Passage separator within the contexts field; defaults to four vertical bars with one space on each side.",
+    )
+    metrics: list[str] = Field(
+        default_factory=lambda: list(_DEFAULT_SCORE_METRICS),
+        description="List of RAGAS metrics to compute.",
+    )
+    judge_model: str | None = Field(
+        default=None,
+        description="Judge LLM model name; when null, RAGAS_JUDGE_MODEL from .env is used.",
+    )
+    embedding_model: str | None = Field(
+        default=None,
+        description="Embedding model name; when null, RAGAS_EMBEDDING_MODEL from .env is used.",
+    )
+
+    @field_validator("metrics")
+    @classmethod
+    def validate_metric_names(cls, value: list[str]) -> list[str]:
+        """Reject any metric name not in the supported registry."""
+        invalid = [m for m in value if m not in _VALID_METRICS]
+        if invalid:
+            raise ValueError(
+                f"Unsupported metric names: {invalid}. "
+                f"Valid values: {sorted(_VALID_METRICS)}"
+            )
+        if not value:
+            raise ValueError("metrics must not be an empty list.")
+        return value
+
+    def contexts_as_list(self) -> list[str]:
+        """Split the contexts string into a list of non-empty fragments."""
+        sep = self.context_separator or " |||| "
+        return [s.strip() for s in self.contexts.split(sep) if s.strip()]
+
+    def effective_metrics(self) -> list[str]:
+        """Return metrics filtered to exclude GT-dependent ones when ground_truth is absent."""
+        if self.ground_truth is not None:
+            return list(self.metrics)
+        return [m for m in self.metrics if m not in _GT_DEPENDENT_METRICS]
+
+
+class ScoreResponse(BaseModel):
+    """Response payload for the real-time scoring endpoint."""
+
+    scores: dict[str, float | None] = Field(
+        description="Score per metric (null for NaN or a failed computation)."
+    )
+    weighted_score: float | None = Field(
+        default=None,
+        description="Equal-weight composite score (mean over non-null metrics only).",
+    )
+    latency_ms: int = Field(description="Server-side scoring time in milliseconds.")
+    skipped_metrics: list[str] = Field(
+        default_factory=list,
+        description="Names of metrics skipped because ground_truth is missing.",
+    )
+    error: str | None = Field(
+        default=None,
+        description="Error message when scoring raises (HTTP 200 is still returned, with empty scores).",
+    )
+```
+
+Also add `field_validator` to the import line at the top of `webapp/models.py`:
+```python
+from pydantic import BaseModel, ConfigDict, Field, field_validator
+```
+
+- [ ] **Step 4: Add `score_api_token` to `rag_eval/settings.py`**
+
+Add after the `dataset_generator_model` field:
+```python
+score_api_token: str | None = Field(
+    default=None,
+    alias="SCORE_API_TOKEN",
+    description="Bearer token for /api/score endpoint. Empty = no auth.",
+)
+```
+
+- [ ] **Step 5: Run to verify PASS**
+
+```
+python -m pytest tests/webapp/test_score_api.py::TestScoreRequest tests/webapp/test_score_api.py::TestScoreResponse -v
+```
+Expected: all 12 tests PASS.
+
+- [ ] **Step 6: Commit**
+
+```
+git add webapp/models.py rag_eval/settings.py tests/webapp/test_score_api.py
+git commit -m "feat: add ScoreRequest/ScoreResponse models and SCORE_API_TOKEN setting"
+```
+
+---
+
+## Task 2: InlineScorer service (LLM cache + scoring)
+
+**Files:**
+- Create: `webapp/services/inline_scorer.py`
+
+**Interfaces:**
+- Consumes:
+  - `build_models(judge_model, embedding_model, settings) -> tuple[Any, Any]` from `rag_eval.metrics.factory`
+  - `MetricPipeline(metrics, metric_timeout_seconds)` from `rag_eval.metrics.pipeline`
+  - `NormalizedSample` from `rag_eval.shared.models`
+  - `compute_weighted_score(scores, metric_weights) -> float | None` from `rag_eval.metrics.weights`
+  - `EvaluationSettings` from `rag_eval.settings`
+- Produces:
+  - `inline_scorer: InlineScorer` (module-level singleton)
+  - `InlineScorer.score(question, answer, contexts, ground_truth, metrics, judge_model, embedding_model, settings) -> dict[str, float | None]`
+
+- [ ] **Step 1: Write failing test**
+
+Add to `tests/webapp/test_score_api.py`:
+
+```python
+class TestInlineScorer:
+    def test_score_returns_dict_with_requested_metrics(self):
+        """InlineScorer.score returns a dict keyed by the requested metrics."""
+        from unittest.mock import AsyncMock, MagicMock, patch
+        from webapp.services.inline_scorer import InlineScorer
+        from rag_eval.settings import EvaluationSettings
+
+        mock_score = MagicMock()
+        mock_score.metrics = {"faithfulness": 0.9, "answer_relevancy": 0.8}
+        mock_score.error = ""
+
+        mock_pipeline = MagicMock()
+        mock_pipeline.score_sample = AsyncMock(return_value=mock_score)
+
+        with patch("webapp.services.inline_scorer.build_models", return_value=(MagicMock(), MagicMock())):
+            with patch("webapp.services.inline_scorer.MetricPipeline", return_value=mock_pipeline):
+                with patch("webapp.services.inline_scorer._build_metric_instances", return_value={}):
+                    scorer = InlineScorer()
+                    result = scorer.score(
+                        question="q", answer="a",
+                        contexts=["ctx1"],
+                        ground_truth=None,
+                        metrics=["faithfulness", "answer_relevancy"],
+                        judge_model="test-model",
+                        embedding_model="test-embed",
+                        settings=EvaluationSettings(_env_file=None),
+                    )
+        assert "faithfulness" in result
+        assert "answer_relevancy" in result
+        assert result["faithfulness"] == pytest.approx(0.9)
+
+    def test_score_converts_nan_to_none(self):
+        """NaN scores are converted to None in the returned dict."""
+        import math
+        from unittest.mock import AsyncMock, MagicMock, patch
+        from webapp.services.inline_scorer import InlineScorer
+        from rag_eval.settings import EvaluationSettings
+
+        mock_score = MagicMock()
+        mock_score.metrics = {"faithfulness": float("nan")}
+        mock_score.error = ""
+
+        mock_pipeline = MagicMock()
+        mock_pipeline.score_sample = AsyncMock(return_value=mock_score)
+
+        with patch("webapp.services.inline_scorer.build_models", return_value=(MagicMock(), MagicMock())):
+            with patch("webapp.services.inline_scorer.MetricPipeline", return_value=mock_pipeline):
+                with patch("webapp.services.inline_scorer._build_metric_instances", return_value={}):
+                    scorer = InlineScorer()
+                    result = scorer.score(
+                        question="q", answer="a", contexts=["c"],
+                        ground_truth=None,
+                        metrics=["faithfulness"],
+                        judge_model="m", embedding_model="e",
+                        settings=EvaluationSettings(_env_file=None),
+                    )
+        assert result["faithfulness"] is None
+```
+
+- [ ] **Step 2: Run to verify FAIL**
+
+```
+python -m pytest tests/webapp/test_score_api.py::TestInlineScorer -v
+```
+Expected: `ModuleNotFoundError: No module named 'webapp.services.inline_scorer'`
+
+- [ ] **Step 3: Create `webapp/services/inline_scorer.py`**
+
+```python
+"""LLM-cached inline RAGAS scorer for the real-time /api/score endpoint.
+
+A module-level InlineScorer singleton caches (llm, embeddings) pairs keyed by
+(judge_model, embedding_model), so repeated Dify Tool calls with the same
+models reuse existing AsyncOpenAI connections instead of creating new ones.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import math
+import threading
+from typing import Any
+
+from rag_eval.compat import ensure_ragas_import_compat
+from rag_eval.metrics.factory import build_models
+from rag_eval.metrics.pipeline import MetricPipeline
+from rag_eval.metrics.weights import compute_weighted_score
+from rag_eval.settings import EvaluationSettings
+from rag_eval.shared.models import NormalizedSample
+
+ensure_ragas_import_compat()
+
+from ragas.metrics.collections import (  # noqa: E402
+    AnswerRelevancy,
+    ContextPrecision,
+    ContextRecall,
+    FactualCorrectness,
+    Faithfulness,
+    NoiseSensitivity,
+    SemanticSimilarity,
+)
+
+
+def _build_metric_instances(metrics: list[str], llm: Any, embeddings: Any) -> dict[str, Any]:
+    """Instantiate only the RAGAS metric objects requested."""
+    registry: dict[str, Any] = {
+        "faithfulness": lambda: Faithfulness(llm=llm),
+        "answer_relevancy": lambda: AnswerRelevancy(llm=llm, embeddings=embeddings),
+        "context_recall": lambda: ContextRecall(llm=llm),
+        "context_precision": lambda: ContextPrecision(llm=llm),
+        "noise_sensitivity": lambda: NoiseSensitivity(llm=llm),
+        "factual_correctness": lambda: FactualCorrectness(llm=llm),
+        "semantic_similarity": lambda: SemanticSimilarity(embeddings=embeddings),
+    }
+    return {name: registry[name]() for name in metrics if name in registry}
+
+
+class InlineScorer:
+    """Thread-safe single-sample RAGAS scorer with LLM client caching."""
+
+    def __init__(self) -> None:
+        # Cache keyed by (judge_model, embedding_model) -> (llm, embeddings)
+        self._model_cache: dict[tuple[str, str], tuple[Any, Any]] = {}
+        self._lock = threading.Lock()
+
+    def _get_models(
+        self,
+        judge_model: str,
+        embedding_model: str,
+        settings: EvaluationSettings,
+    ) -> tuple[Any, Any]:
+        """Return cached LLM/embedding clients, building them on first use."""
+        cache_key = (judge_model, embedding_model)
+        with self._lock:
+            if cache_key not in self._model_cache:
+                llm, embeddings = build_models(judge_model, embedding_model, settings)
+                self._model_cache[cache_key] = (llm, embeddings)
+            return self._model_cache[cache_key]
+
+    def score(
+        self,
+        question: str,
+        answer: str,
+        contexts: list[str],
+        ground_truth: str | None,
+        metrics: list[str],
+        judge_model: str,
+        embedding_model: str,
+        settings: EvaluationSettings,
+    ) -> dict[str, float | None]:
+        """Score one sample synchronously and return {metric_name: score | None}.
+
+        NaN values from RAGAS are converted to None for clean JSON serialization.
+        """
+        llm, embeddings = self._get_models(judge_model, embedding_model, settings)
+        metric_instances = _build_metric_instances(metrics, llm, embeddings)
+
+        pipeline = MetricPipeline(
+            metrics=metric_instances,
+            metric_timeout_seconds=settings.ragas_metric_timeout_seconds,
+        )
+
+        sample = NormalizedSample(
+            sample_id="inline-score",
+            question=question,
+            answer=answer,
+            contexts=contexts,
+            ground_truth=ground_truth or "",
+        )
+
+        metric_score = asyncio.run(pipeline.score_sample(sample))
+
+        # Convert missing, NaN, and infinite values to None for clean JSON output
+        return {
+            name: (None if v is None or not math.isfinite(v) else round(v, 4))
+            for name, v in metric_score.metrics.items()
+        }
+
+
+# Module-level singleton shared by FastAPI routes.
+inline_scorer = InlineScorer()
+```
+
+- [ ] **Step 4: Run to verify PASS**
+
+```
+python -m pytest tests/webapp/test_score_api.py::TestInlineScorer -v
+```
+Expected: both tests PASS.
+
+- [ ] **Step 5: Commit**
+
+```
+git add webapp/services/inline_scorer.py tests/webapp/test_score_api.py
+git commit -m "feat: add InlineScorer service with LLM client caching"
+```
+
+---
+
+## Task 3: `/api/score` route + authentication + integration tests
+
+**Files:**
+- Create: `webapp/api/score.py`
+- Modify: `webapp/server.py`
+
+**Interfaces:**
+- Consumes:
+  - `ScoreRequest`, `ScoreResponse` from `webapp.models`
+  - `inline_scorer: InlineScorer` from `webapp.services.inline_scorer`
+  - `EvaluationSettings` from `rag_eval.settings`
+  - `compute_weighted_score(scores, {}) -> float | None` from `rag_eval.metrics.weights`
+- Produces: `POST /api/score` endpoint
+
+- [ ] **Step 1: Write failing endpoint tests**
+
+Add to `tests/webapp/test_score_api.py`:
+
+```python
+# ── Fixtures ─────────────────────────────────────────────────────────────────
+import pytest
+from fastapi.testclient import TestClient
+from unittest.mock import MagicMock, patch
+
+
+@pytest.fixture()
+def client(monkeypatch):
+    """TestClient with mocked InlineScorer."""
+    import webapp.api.score as score_mod
+
+    mock_scorer = MagicMock()
+    mock_scorer.score.return_value = {
+        "faithfulness": 0.85,
+        "answer_relevancy": 0.90,
+    }
+    monkeypatch.setattr(score_mod, "inline_scorer", mock_scorer)
+
+    from webapp.server import create_app
+    return TestClient(create_app())
+
+
+class TestScoreEndpoint:
+    def test_post_score_returns_200(self, client):
+        resp = client.post("/api/score", json={
+            "question": "What is CT?",
+            "answer": "CT is imaging.",
+            "contexts": "CT uses X-rays.",
+        })
+        assert resp.status_code == 200
+        data = resp.json()
+        assert "scores" in data
+        assert "latency_ms" in data
+        assert data["scores"]["faithfulness"] == pytest.approx(0.85)
+
+    def test_weighted_score_computed(self, client):
+        resp = client.post("/api/score", json={
+            "question": "q", "answer": "a", "contexts": "c",
+        })
+        assert resp.status_code == 200
+        data = resp.json()
+        # weighted_score is the mean of all non-null scores
+        assert data["weighted_score"] is not None
+
+    def test_missing_required_fields_returns_422(self, client):
+        resp = client.post("/api/score", json={"question": "q"})
+        assert resp.status_code == 422
+
+    def test_invalid_metric_name_returns_422(self, client):
+        resp = client.post("/api/score", json={
+            "question": "q", "answer": "a", "contexts": "c",
+            "metrics": ["not_a_metric"],
+        })
+        assert resp.status_code == 422
+
+    def test_skipped_metrics_returned_when_no_ground_truth(self, client):
+        resp = client.post("/api/score", json={
+            "question": "q", "answer": "a", "contexts": "c",
+            "metrics": ["faithfulness", "context_recall"],
+        })
+        assert resp.status_code == 200
+        data = resp.json()
+        assert "context_recall" in data["skipped_metrics"]
+
+    def test_contexts_split_on_separator(self, client, monkeypatch):
+        """contexts string is split before passing to scorer."""
+        import webapp.api.score as score_mod
+        calls = []
+        def capture(*args, **kwargs):
+            calls.append(kwargs.get("contexts", []))
+            return {"faithfulness": 0.9}
+        monkeypatch.setattr(score_mod.inline_scorer, "score", capture)
+
+        client.post("/api/score", json={
+            "question": "q", "answer": "a",
+            "contexts": "ctx1 |||| ctx2",
+            "context_separator": " |||| ",
+        })
+        assert calls[0] == ["ctx1", "ctx2"]
+
+    def test_bearer_token_auth_required_when_configured(self, monkeypatch):
+        """When SCORE_API_TOKEN is set, requests without token get 401."""
+        import webapp.api.score as score_mod
+        from rag_eval.settings import EvaluationSettings
+
+        mock_settings = EvaluationSettings(_env_file=None)
+        object.__setattr__(mock_settings, "score_api_token", "secret-token")
+        monkeypatch.setattr(score_mod, "_get_settings", lambda: mock_settings)
+
+        mock_scorer = MagicMock()
+        mock_scorer.score.return_value = {"faithfulness": 0.9}
+        monkeypatch.setattr(score_mod, "inline_scorer", mock_scorer)
+
+        from webapp.server import create_app
+        test_client = TestClient(create_app())
+
+        # No auth header → 401
+        resp = test_client.post("/api/score", json={
+            "question": "q", "answer": "a", "contexts": "c",
+        })
+        assert resp.status_code == 401
+
+        # Correct token → 200
+        resp = test_client.post("/api/score",
+            json={"question": "q", "answer": "a", "contexts": "c"},
+            headers={"Authorization": "Bearer secret-token"},
+        )
+        assert resp.status_code == 200
+
+    def test_wrong_bearer_token_returns_401(self, monkeypatch):
+        import webapp.api.score as score_mod
+        from rag_eval.settings import EvaluationSettings
+
+        mock_settings = EvaluationSettings(_env_file=None)
+        object.__setattr__(mock_settings, "score_api_token", "correct-token")
+        monkeypatch.setattr(score_mod, "_get_settings", lambda: mock_settings)
+
+        mock_scorer = MagicMock()
+        mock_scorer.score.return_value = {}
+        monkeypatch.setattr(score_mod, "inline_scorer", mock_scorer)
+
+        from webapp.server import create_app
+        test_client = TestClient(create_app())
+        resp = test_client.post("/api/score",
+            json={"question": "q", "answer": "a", "contexts": "c"},
+            headers={"Authorization": "Bearer wrong-token"},
+        )
+        assert resp.status_code == 401
+```
+
+- [ ] **Step 2: Run to verify FAIL**
+
+```
+python -m pytest tests/webapp/test_score_api.py::TestScoreEndpoint -v
+```
+Expected: `ModuleNotFoundError: No module named 'webapp.api.score'`
+
+- [ ] **Step 3: Create `webapp/api/score.py`**
+
+```python
+"""Route for real-time single-sample RAGAS scoring (Dify external Tool endpoint)."""
+
+from __future__ import annotations
+
+import time
+from typing import Annotated
+
+from fastapi import APIRouter, Header, HTTPException
+
+from rag_eval.metrics.weights import compute_weighted_score
+from rag_eval.settings import EvaluationSettings
+from webapp.models import ScoreRequest, ScoreResponse
+from webapp.services.inline_scorer import inline_scorer
+
+router = APIRouter(prefix="/api/score", tags=["score"])
+
+
+def _get_settings() -> EvaluationSettings:
+    """Return a fresh EvaluationSettings instance (overridable in tests)."""
+    return EvaluationSettings()
+
+
+def _check_auth(authorization: str | None, token: str) -> None:
+    """Raise 401 if Bearer token does not match the configured token."""
+    if authorization is None:
+        raise HTTPException(status_code=401, detail="Missing Authorization header.")
+    parts = authorization.split(" ", 1)
+    if len(parts) != 2 or parts[0].lower() != "bearer" or parts[1] != token:
+        raise HTTPException(status_code=401, detail="Invalid Bearer token.")
+
+
+@router.post(
+    "",
+    response_model=ScoreResponse,
+    summary="Real-time single-sample scoring (Dify external Tool)",
+    responses={
+        200: {"description": "Per-metric scores and the weighted composite score."},
+        401: {"description": "SCORE_API_TOKEN is configured but no valid Bearer token was provided."},
+        422: {"description": "Request validation failed."},
+    },
+)
+def score_sample(
+    request: ScoreRequest,
+    authorization: Annotated[str | None, Header()] = None,
+) -> ScoreResponse:
+    """Accept a single Q&A record, run RAGAS metric scoring synchronously, and return per-metric scores.
+
+    Called by the Dify external Tool. The `contexts` field is split on `context_separator` before it is passed to
+    the RAGAS pipeline; metrics that depend on `ground_truth` are skipped automatically when it is missing.
+    """
+    settings = _get_settings()
+
+    # Authentication (only enforced when a token is configured)
+    if settings.score_api_token:
+        _check_auth(authorization, settings.score_api_token)
+
+    judge_model = request.judge_model or settings.ragas_judge_model
+    embedding_model = request.embedding_model or settings.ragas_embedding_model
+    effective = request.effective_metrics()
+    requested = set(request.metrics)
+    skipped = sorted(requested - set(effective))
+
+    if not effective:
+        # All requested metrics require ground_truth which is absent.
+        return ScoreResponse(
+            scores={m: None for m in request.metrics},
+            weighted_score=None,
+            latency_ms=0,
+            skipped_metrics=skipped,
+        )
+
+    t0 = time.monotonic()
+    try:
+        raw_scores = inline_scorer.score(
+            question=request.question,
+            answer=request.answer,
+            contexts=request.contexts_as_list(),
+            ground_truth=request.ground_truth,
+            metrics=effective,
+            judge_model=judge_model,
+            embedding_model=embedding_model,
+            settings=settings,
+        )
+    except Exception as exc:  # noqa: BLE001
+        latency_ms = int((time.monotonic() - t0) * 1000)
+        return ScoreResponse(
+            scores={},
+            weighted_score=None,
+            latency_ms=latency_ms,
+            skipped_metrics=skipped,
+            error=f"{type(exc).__name__}: {exc}",
+        )
+
+    latency_ms = int((time.monotonic() - t0) * 1000)
+
+    # Merge: skipped metrics appear as null in final scores dict.
+    all_scores: dict[str, float | None] = {m: None for m in request.metrics}
+    all_scores.update(raw_scores)
+
+    # Weighted score = equal-weight mean of non-null effective scores.
+    weighted = compute_weighted_score(
+        {k: v for k, v in raw_scores.items() if v is not None},
+        {},
+    )
+
+    return ScoreResponse(
+        scores=all_scores,
+        weighted_score=round(weighted, 4) if weighted is not None else None,
+        latency_ms=latency_ms,
+        skipped_metrics=skipped,
+    )
+```
+
+- [ ] **Step 4: Register router in `webapp/server.py`**
+
+Add `score` to the import line:
+```python
+from webapp.api import evaluations, llm_profiles, pipeline, runs, scenarios, score
+```
+
+Add the router registration after `pipeline.router`:
+```python
+app.include_router(score.router)
+```
+
+Add `"score"` tag to `OPENAPI_TAGS` list (insert before `"meta"`):
+```python
+    {
+        "name": "score",
+        "description": (
+            "**Real-time scoring API (Dify external Tool)**\n\n"
+            "Accepts a single Q&A record `(question, answer, contexts, ground_truth)`,\n"
+            "runs RAGAS metric scoring synchronously, and returns per-metric scores plus a weighted composite score.\n\n"
+            "Use case: a Dify Agent calls it right after answering, for quality monitoring or self-improvement.\n\n"
+            "**Authentication**: if `SCORE_API_TOKEN` is set in `.env`, requests must carry the "
+            "`Authorization: Bearer ` header."
+        ),
+    },
+```
+
+Also update the `description` field in `FastAPI(...)` to add a bullet:
+```python
+"- **Real-time scoring API**: single-sample RAGAS scoring endpoint called by the Dify external Tool\n"
+```
+
+- [ ] **Step 5: Run to verify PASS**
+
+```
+python -m pytest tests/webapp/test_score_api.py -v
+```
+Expected: all tests PASS.
+
+- [ ] **Step 6: Verify server boots and route appears**
+
+```
+python -c "
+from webapp.server import create_app
+app = create_app()
+routes = [(r.path, list(getattr(r,'methods',[]))) for r in app.routes]
+score_routes = [(p,m) for p,m in routes if 'score' in p]
+print('Score routes:', score_routes)
+"
+```
+Expected output:
+```
+Score routes: [('/api/score', ['POST'])]
+```
+
+- [ ] **Step 7: Commit**
+
+```
+git add webapp/api/score.py webapp/server.py tests/webapp/test_score_api.py
+git commit -m "feat: add POST /api/score endpoint for Dify real-time scoring"
+```
+
+---
+
+## Task 4: Full regression + `.env.example` update
+
+**Files:**
+- Modify: `.env.example`
+
+- [ ] **Step 1: Add SCORE_API_TOKEN to `.env.example`**
+
+Add this block after `DATASET_GENERATOR_MODEL=qwen3.6-plus`:
+
+```
+# ===== Dify integration: real-time scoring API =====
+# Bearer token auth for the /api/score endpoint (leave empty to disable auth, suitable for intranet deployments)
+# In the Dify external Tool, configure Authorization: Bearer <same value as here>
+SCORE_API_TOKEN=
+```
+
+- [ ] **Step 2: Run full test suite**
+
+```
+python -m pytest tests/ -v --tb=short
+```
+
+Pre-existing failures to ignore:
+- `test_normalize_sample_pdf_offline_smoke_row`: missing CSV fixture
+- `test_evaluator_and_reporting_write_run_assets`: pre-existing assertion mismatch
+- `test_question_generator_rejects_invalid_json`: the retry loop swallows the ValueError
+- `test_question_generator_rejects_non_list_samples`: same as above
+
+**Zero new failures** counts as a pass.
+
+- [ ] **Step 3: Final commit**
+
+```
+git add .env.example
+git commit -m "feat: Dify score API complete, add SCORE_API_TOKEN to .env.example
+
+- POST /api/score: real-time RAGAS scoring for Dify external Tool
+- ScoreRequest/ScoreResponse Pydantic models with full field docs
+- InlineScorer with (judge_model, embedding_model) client cache
+- Bearer token auth via SCORE_API_TOKEN env var (optional)
+- contexts split by configurable separator (default ' |||| ')
+- GT-dependent metrics auto-skipped when ground_truth absent
+- Full test coverage (22 new tests)
+
+Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>"
+```
+
+---
+
+## Dify-Side Configuration Reference
+
+Once the tasks are complete, enter the following OpenAPI Schema in Dify under "Tools" → "Custom Tools":
+
+```yaml
+openapi: 3.1.0
+info:
+  title: RAGAS Real-Time Scoring
+  version: 1.0.0
+servers:
+  - url: http://:8800
+paths:
+  /api/score:
+    post:
+      operationId: scoreQA
+      summary: Score a single Q&A record with RAGAS
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              type: object
+              required: [question, answer, contexts]
+              properties:
+                question: { type: string }
+                answer: { type: string }
+                contexts: { type: string, description: "Multiple context passages are joined with ' |||| '" }
+                ground_truth: { type: string }
+                metrics:
+                  type: array
+                  items: { type: string }
+                  default: [faithfulness, answer_relevancy, context_recall, context_precision]
+      responses:
+        '200':
+          description: Scoring result
+          content:
+            application/json:
+              schema:
+                type: object
+                properties:
+                  scores: { type: object }
+                  weighted_score: { type: number }
+                  latency_ms: { type: integer }
+                  skipped_metrics: { type: array, items: { type: string } }
+```
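
Review note: the request and response shapes in the schema above can be exercised from the caller's side without a running server. The following is a minimal sketch, not part of the plan. The helper names `build_score_payload` and `summarize_score_response` are hypothetical; only the field names (`question`, `answer`, `contexts`, `context_separator`, `ground_truth`, `scores`, `skipped_metrics`, `error`) and the default separator `" |||| "` come from the plan. It assumes the caller holds contexts as a list and must join them into the single string that `ScoreRequest.contexts` expects.

```python
from __future__ import annotations

from typing import Any

DEFAULT_SEPARATOR = " |||| "  # default context_separator stated in the plan


def build_score_payload(
    question: str,
    answer: str,
    contexts: list[str],
    ground_truth: str | None = None,
    separator: str = DEFAULT_SEPARATOR,
) -> dict[str, Any]:
    """Build a JSON-serializable body for POST /api/score (hypothetical helper)."""
    # Drop empty fragments, mirroring what contexts_as_list() does on the server.
    cleaned = [c.strip() for c in contexts if c.strip()]
    for fragment in cleaned:
        # A fragment containing the separator would be split in two by the server.
        if separator in fragment:
            raise ValueError("a context fragment contains the separator")
    payload: dict[str, Any] = {
        "question": question,
        "answer": answer,
        "contexts": separator.join(cleaned),
        "context_separator": separator,
    }
    # ground_truth is optional; omit it so GT-dependent metrics are skipped.
    if ground_truth is not None:
        payload["ground_truth"] = ground_truth
    return payload


def summarize_score_response(data: dict[str, Any]) -> dict[str, Any]:
    """Separate usable scores from null ones in a ScoreResponse-shaped dict."""
    scores = data.get("scores") or {}
    usable = {name: value for name, value in scores.items() if value is not None}
    return {
        # The plan returns HTTP 200 even on scoring errors, so check `error` too.
        "ok": data.get("error") is None and bool(usable),
        "usable": usable,
        "missing": sorted(name for name, value in scores.items() if value is None),
        "skipped": list(data.get("skipped_metrics") or []),
    }
```

Because the endpoint is planned to return HTTP 200 with an `error` field when scoring raises, a caller that checks only the status code would treat a failed scoring run as a success; the `ok` flag in this sketch is one way to guard against that.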