# Dify 实时评分 API Implementation Plan > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** 新增 `POST /api/score` 端点,供 Dify 外部 Tool 调用,接受单条问答记录并同步返回 RAGAS 各指标得分。 **Architecture:** 新增 `inline_scorer.py` 服务层封装 RAGAS 打分逻辑,以 `(judge_model, embedding_model)` 为 key 缓存 LLM 客户端;新增 `webapp/api/score.py` 路由;`ScoreRequest`/`ScoreResponse` 放入 `webapp/models.py`;`SCORE_API_TOKEN` 加入 `EvaluationSettings`。 **Tech Stack:** Python 3.12, FastAPI, Pydantic v2, RAGAS 0.4.3, pytest ## Global Constraints - Python 3.12+,PEP 8,4 空格缩进,类型注解必须 - contexts 用 `context_separator`(默认 `" |||| "`)拆分为 list[str] - ground_truth 为可选;缺失时跳过 context_recall / factual_correctness / semantic_similarity / noise_sensitivity - SCORE_API_TOKEN 为空时不鉴权(内网部署场景) - 所有测试用 pytest,不依赖真实 LLM --- ## 文件清单 | 操作 | 文件 | 职责 | |------|------|------| | 新建 | `webapp/services/inline_scorer.py` | LLM 客户端缓存 + 单题打分 | | 新建 | `webapp/api/score.py` | `/api/score` 路由 | | 新建 | `tests/webapp/test_score_api.py` | 端点测试(全 mock) | | 修改 | `webapp/models.py` | 新增 ScoreRequest / ScoreResponse | | 修改 | `rag_eval/settings.py` | 新增 score_api_token 字段 | | 修改 | `webapp/server.py` | 注册 score router,更新 OPENAPI_TAGS 和 description | --- ## Task 1: ScoreRequest / ScoreResponse 模型 + settings 字段 **Files:** - Modify: `webapp/models.py` - Modify: `rag_eval/settings.py` - Test: `tests/webapp/test_score_api.py` (partial — model validation tests) **Interfaces:** - Produces: - `ScoreRequest` Pydantic model(见下方字段) - `ScoreResponse` Pydantic model - `EvaluationSettings.score_api_token: str | None` - [ ] **Step 1: Write failing model-validation tests** Create `tests/webapp/test_score_api.py`: ```python """Tests for POST /api/score endpoint.""" from __future__ import annotations import math import pytest from pydantic import ValidationError from webapp.models import ScoreRequest, ScoreResponse class TestScoreRequest: def test_minimal_valid_request(self): """Only required fields — question, answer, contexts.""" req = ScoreRequest( question="What is CT?", answer="CT is imaging.", contexts="CT uses X-rays.", ) assert req.question == "What is CT?" assert req.contexts == "CT uses X-rays." assert req.ground_truth is None assert req.context_separator == " |||| " assert req.metrics == ["faithfulness", "answer_relevancy", "context_recall", "context_precision"] def test_contexts_split_by_separator(self): """contexts_as_list() splits on context_separator.""" req = ScoreRequest( question="q", answer="a", contexts="ctx1 |||| ctx2 |||| ctx3", context_separator=" |||| ", ) assert req.contexts_as_list() == ["ctx1", "ctx2", "ctx3"] def test_contexts_split_custom_separator(self): req = ScoreRequest( question="q", answer="a", contexts="a---b---c", context_separator="---", ) assert req.contexts_as_list() == ["a", "b", "c"] def test_contexts_split_single_item(self): req = ScoreRequest(question="q", answer="a", contexts="only one") assert req.contexts_as_list() == ["only one"] def test_missing_question_raises(self): with pytest.raises(ValidationError): ScoreRequest(answer="a", contexts="c") # type: ignore[call-arg] def test_missing_answer_raises(self): with pytest.raises(ValidationError): ScoreRequest(question="q", contexts="c") # type: ignore[call-arg] def test_missing_contexts_raises(self): with pytest.raises(ValidationError): ScoreRequest(question="q", answer="a") # type: ignore[call-arg] def test_custom_metrics_accepted(self): req = ScoreRequest( question="q", answer="a", contexts="c", metrics=["faithfulness"], ) assert req.metrics == ["faithfulness"] def test_invalid_metric_name_raises(self): with pytest.raises(ValidationError): ScoreRequest(question="q", answer="a", contexts="c", metrics=["not_a_metric"]) def test_effective_metrics_drops_ground_truth_dependent_when_missing(self): """Without ground_truth, GT-dependent metrics are excluded.""" req = ScoreRequest( question="q", answer="a", contexts="c", metrics=["faithfulness", "context_recall", "factual_correctness", "semantic_similarity", "noise_sensitivity"], ) effective = req.effective_metrics() assert "faithfulness" in effective assert "context_recall" not in effective assert "factual_correctness" not in effective assert "semantic_similarity" not in effective assert "noise_sensitivity" not in effective def test_effective_metrics_keeps_all_when_ground_truth_present(self): req = ScoreRequest( question="q", answer="a", contexts="c", ground_truth="gt", metrics=["faithfulness", "context_recall", "factual_correctness"], ) effective = req.effective_metrics() assert effective == ["faithfulness", "context_recall", "factual_correctness"] class TestScoreResponse: def test_score_response_structure(self): resp = ScoreResponse( scores={"faithfulness": 0.85, "answer_relevancy": None}, weighted_score=0.85, latency_ms=1200, ) assert resp.scores["faithfulness"] == 0.85 assert resp.scores["answer_relevancy"] is None assert resp.latency_ms == 1200 ``` - [ ] **Step 2: Run to verify FAIL** ``` cd C:\Projects\AIProjects\Siemens-AIPOC\siemens_ragas python -m pytest tests/webapp/test_score_api.py::TestScoreRequest tests/webapp/test_score_api.py::TestScoreResponse -v ``` Expected: `ImportError: cannot import name 'ScoreRequest' from 'webapp.models'` - [ ] **Step 3: Add ScoreRequest and ScoreResponse to `webapp/models.py`** Append to the end of `webapp/models.py` (after `PipelineJobResponse`): ```python # --------------------------------------------------------------------------- # Dify 实时评分 API 模型 # --------------------------------------------------------------------------- # 需要 ground_truth 才能计算的指标集合 _GT_DEPENDENT_METRICS: frozenset[str] = frozenset({ "context_recall", "factual_correctness", "semantic_similarity", "noise_sensitivity", }) # 所有合法指标名称 _VALID_METRICS: frozenset[str] = frozenset({ "faithfulness", "answer_relevancy", "context_recall", "context_precision", "noise_sensitivity", "factual_correctness", "semantic_similarity", }) _DEFAULT_SCORE_METRICS: list[str] = [ "faithfulness", "answer_relevancy", "context_recall", "context_precision", ] class ScoreRequest(BaseModel): """Request body for the real-time single-sample scoring endpoint.""" model_config = ConfigDict( json_schema_extra={ "examples": [ { "summary": "基础评分请求", "value": { "question": "双源CT的时间分辨率是多少?", "answer": "双源CT的单扇区时间分辨率为75ms。", "contexts": "双源CT采用两套管-探测器系统 |||| 单扇区采集旋转135度", "ground_truth": "双源CT单扇区时间分辨率为75ms,需旋转135度。", "context_separator": " |||| ", "metrics": ["faithfulness", "answer_relevancy", "context_recall", "context_precision"], "judge_model": "deepseek-v4-flash", "embedding_model": "text-embedding-v3", }, } ] } ) question: str = Field(description="问题文本。") answer: str = Field(description="待评分的回答。") contexts: str = Field( description="检索上下文字符串,多段之间用 context_separator 拼接。" ) ground_truth: str | None = Field( default=None, description="标准参考答案(可选)。缺失时自动跳过需要它的指标。", ) context_separator: str = Field( default=" |||| ", description="contexts 字段中段落分隔符,默认为四个竖线两侧各一空格。", ) metrics: list[str] = Field( default_factory=lambda: list(_DEFAULT_SCORE_METRICS), description="需要计算的 RAGAS 指标列表。", ) judge_model: str | None = Field( default=None, description="Judge LLM 模型名称;为 null 时使用 .env 中的 RAGAS_JUDGE_MODEL。", ) embedding_model: str | None = Field( default=None, description="Embedding 模型名称;为 null 时使用 .env 中的 RAGAS_EMBEDDING_MODEL。", ) @field_validator("metrics") @classmethod def validate_metric_names(cls, value: list[str]) -> list[str]: """Reject any metric name not in the supported registry.""" invalid = [m for m in value if m not in _VALID_METRICS] if invalid: raise ValueError( f"不支持的指标名称:{invalid}。" f"合法值:{sorted(_VALID_METRICS)}" ) if not value: raise ValueError("metrics 不能为空列表。") return value def contexts_as_list(self) -> list[str]: """Split the contexts string into a list of non-empty fragments.""" sep = self.context_separator or " |||| " return [s.strip() for s in self.contexts.split(sep) if s.strip()] def effective_metrics(self) -> list[str]: """Return metrics filtered to exclude GT-dependent ones when ground_truth is absent.""" if self.ground_truth is not None: return list(self.metrics) return [m for m in self.metrics if m not in _GT_DEPENDENT_METRICS] class ScoreResponse(BaseModel): """Response payload for the real-time scoring endpoint.""" scores: dict[str, float | None] = Field( description="各指标得分(NaN 或计算失败时为 null)。" ) weighted_score: float | None = Field( default=None, description="等权加权综合得分(仅对非 null 指标求均值)。", ) latency_ms: int = Field(description="服务端打分耗时(毫秒)。") skipped_metrics: list[str] = Field( default_factory=list, description="因缺少 ground_truth 而跳过的指标名称列表。", ) error: str | None = Field( default=None, description="打分异常时的错误信息(HTTP 200 仍返回,scores 为空)。", ) ``` Also add `field_validator` to the import line at the top of `webapp/models.py`: ```python from pydantic import BaseModel, ConfigDict, Field, field_validator ``` - [ ] **Step 4: Add `score_api_token` to `rag_eval/settings.py`** Add after the `dataset_generator_model` field: ```python score_api_token: str | None = Field( default=None, alias="SCORE_API_TOKEN", description="Bearer token for /api/score endpoint. Empty = no auth.", ) ``` - [ ] **Step 5: Run to verify PASS** ``` python -m pytest tests/webapp/test_score_api.py::TestScoreRequest tests/webapp/test_score_api.py::TestScoreResponse -v ``` Expected: all 12 tests PASS. - [ ] **Step 6: Commit** ``` git add webapp/models.py rag_eval/settings.py tests/webapp/test_score_api.py git commit -m "feat: add ScoreRequest/ScoreResponse models and SCORE_API_TOKEN setting" ``` --- ## Task 2: InlineScorer 服务(LLM 缓存 + 打分) **Files:** - Create: `webapp/services/inline_scorer.py` **Interfaces:** - Consumes: - `build_models(judge_model, embedding_model, settings) -> tuple[Any, Any]` from `rag_eval.metrics.factory` - `MetricPipeline(metrics, metric_timeout_seconds)` from `rag_eval.metrics.pipeline` - `NormalizedSample` from `rag_eval.shared.models` - `compute_weighted_score(scores, metric_weights) -> float | None` from `rag_eval.metrics.weights` - `EvaluationSettings` from `rag_eval.settings` - Produces: - `inline_scorer: InlineScorer` (module-level singleton) - `InlineScorer.score(question, answer, contexts, ground_truth, metrics, judge_model, embedding_model, settings) -> dict[str, float | None]` - [ ] **Step 1: Write failing test** Add to `tests/webapp/test_score_api.py`: ```python class TestInlineScorer: def test_score_returns_dict_with_requested_metrics(self): """InlineScorer.score returns a dict keyed by the requested metrics.""" from unittest.mock import AsyncMock, MagicMock, patch from webapp.services.inline_scorer import InlineScorer from rag_eval.settings import EvaluationSettings mock_score = MagicMock() mock_score.metrics = {"faithfulness": 0.9, "answer_relevancy": 0.8} mock_score.error = "" mock_pipeline = MagicMock() mock_pipeline.score_sample = AsyncMock(return_value=mock_score) with patch("webapp.services.inline_scorer.build_models", return_value=(MagicMock(), MagicMock())): with patch("webapp.services.inline_scorer.MetricPipeline", return_value=mock_pipeline): with patch("webapp.services.inline_scorer._build_metric_instances", return_value={}): scorer = InlineScorer() result = scorer.score( question="q", answer="a", contexts=["ctx1"], ground_truth=None, metrics=["faithfulness", "answer_relevancy"], judge_model="test-model", embedding_model="test-embed", settings=EvaluationSettings(_env_file=None), ) assert "faithfulness" in result assert "answer_relevancy" in result assert result["faithfulness"] == pytest.approx(0.9) def test_score_converts_nan_to_none(self): """NaN scores are converted to None in the returned dict.""" import math from unittest.mock import AsyncMock, MagicMock, patch from webapp.services.inline_scorer import InlineScorer from rag_eval.settings import EvaluationSettings mock_score = MagicMock() mock_score.metrics = {"faithfulness": float("nan")} mock_score.error = "" mock_pipeline = MagicMock() mock_pipeline.score_sample = AsyncMock(return_value=mock_score) with patch("webapp.services.inline_scorer.build_models", return_value=(MagicMock(), MagicMock())): with patch("webapp.services.inline_scorer.MetricPipeline", return_value=mock_pipeline): with patch("webapp.services.inline_scorer._build_metric_instances", return_value={}): scorer = InlineScorer() result = scorer.score( question="q", answer="a", contexts=["c"], ground_truth=None, metrics=["faithfulness"], judge_model="m", embedding_model="e", settings=EvaluationSettings(_env_file=None), ) assert result["faithfulness"] is None ``` - [ ] **Step 2: Run to verify FAIL** ``` python -m pytest tests/webapp/test_score_api.py::TestInlineScorer -v ``` Expected: `ModuleNotFoundError: No module named 'webapp.services.inline_scorer'` - [ ] **Step 3: Create `webapp/services/inline_scorer.py`** ```python """LLM-cached inline RAGAS scorer for the real-time /api/score endpoint. A module-level InlineScorer singleton caches (llm, embeddings) pairs keyed by (judge_model, embedding_model), so repeated Dify Tool calls with the same models reuse existing AsyncOpenAI connections instead of creating new ones. """ from __future__ import annotations import asyncio import math import threading from typing import Any from rag_eval.compat import ensure_ragas_import_compat from rag_eval.metrics.factory import build_models from rag_eval.metrics.pipeline import MetricPipeline from rag_eval.metrics.weights import compute_weighted_score from rag_eval.settings import EvaluationSettings from rag_eval.shared.models import NormalizedSample ensure_ragas_import_compat() from ragas.metrics.collections import ( # noqa: E402 AnswerRelevancy, ContextPrecision, ContextRecall, FactualCorrectness, Faithfulness, NoiseSensitivity, SemanticSimilarity, ) def _build_metric_instances(metrics: list[str], llm: Any, embeddings: Any) -> dict[str, Any]: """Instantiate only the RAGAS metric objects requested.""" registry: dict[str, Any] = { "faithfulness": Faithfulness(llm=llm), "answer_relevancy": AnswerRelevancy(llm=llm, embeddings=embeddings), "context_recall": ContextRecall(llm=llm), "context_precision": ContextPrecision(llm=llm), "noise_sensitivity": NoiseSensitivity(llm=llm), "factual_correctness": FactualCorrectness(llm=llm), "semantic_similarity": SemanticSimilarity(embeddings=embeddings), } return {name: registry[name] for name in metrics if name in registry} class InlineScorer: """Thread-safe single-sample RAGAS scorer with LLM client caching.""" def __init__(self) -> None: # Cache keyed by (judge_model, embedding_model) -> (llm, embeddings) self._model_cache: dict[tuple[str, str], tuple[Any, Any]] = {} self._lock = threading.Lock() def _get_models( self, judge_model: str, embedding_model: str, settings: EvaluationSettings, ) -> tuple[Any, Any]: """Return cached LLM/embedding clients, building them on first use.""" cache_key = (judge_model, embedding_model) with self._lock: if cache_key not in self._model_cache: llm, embeddings = build_models(judge_model, embedding_model, settings) self._model_cache[cache_key] = (llm, embeddings) return self._model_cache[cache_key] def score( self, question: str, answer: str, contexts: list[str], ground_truth: str | None, metrics: list[str], judge_model: str, embedding_model: str, settings: EvaluationSettings, ) -> dict[str, float | None]: """Score one sample synchronously and return {metric_name: score | None}. NaN values from RAGAS are converted to None for clean JSON serialization. """ llm, embeddings = self._get_models(judge_model, embedding_model, settings) metric_instances = _build_metric_instances(metrics, llm, embeddings) pipeline = MetricPipeline( metrics=metric_instances, metric_timeout_seconds=settings.ragas_metric_timeout_seconds, ) sample = NormalizedSample( sample_id="inline-score", question=question, answer=answer, contexts=contexts, ground_truth=ground_truth or "", ) metric_score = asyncio.run(pipeline.score_sample(sample)) # Convert NaN → None for clean JSON output return { name: (None if math.isnan(v) or math.isinf(v) else round(v, 4)) for name, v in metric_score.metrics.items() } # Module-level singleton shared by FastAPI routes. inline_scorer = InlineScorer() ``` - [ ] **Step 4: Run to verify PASS** ``` python -m pytest tests/webapp/test_score_api.py::TestInlineScorer -v ``` Expected: both tests PASS. - [ ] **Step 5: Commit** ``` git add webapp/services/inline_scorer.py tests/webapp/test_score_api.py git commit -m "feat: add InlineScorer service with LLM client caching" ``` --- ## Task 3: `/api/score` 路由 + 鉴权 + 集成测试 **Files:** - Create: `webapp/api/score.py` - Modify: `webapp/server.py` **Interfaces:** - Consumes: - `ScoreRequest`, `ScoreResponse` from `webapp.models` - `inline_scorer: InlineScorer` from `webapp.services.inline_scorer` - `EvaluationSettings` from `rag_eval.settings` - `compute_weighted_score(scores, {}) -> float | None` from `rag_eval.metrics.weights` - Produces: `POST /api/score` endpoint - [ ] **Step 1: Write failing endpoint tests** Add to `tests/webapp/test_score_api.py`: ```python # ── Fixtures ───────────────────────────────────────────────────────────────── import pytest from fastapi.testclient import TestClient from unittest.mock import MagicMock, patch @pytest.fixture() def client(monkeypatch): """TestClient with mocked InlineScorer.""" import webapp.api.score as score_mod mock_scorer = MagicMock() mock_scorer.score.return_value = { "faithfulness": 0.85, "answer_relevancy": 0.90, } monkeypatch.setattr(score_mod, "inline_scorer", mock_scorer) from webapp.server import create_app return TestClient(create_app()) class TestScoreEndpoint: def test_post_score_returns_200(self, client): resp = client.post("/api/score", json={ "question": "What is CT?", "answer": "CT is imaging.", "contexts": "CT uses X-rays.", }) assert resp.status_code == 200 data = resp.json() assert "scores" in data assert "latency_ms" in data assert data["scores"]["faithfulness"] == pytest.approx(0.85) def test_weighted_score_computed(self, client): resp = client.post("/api/score", json={ "question": "q", "answer": "a", "contexts": "c", }) assert resp.status_code == 200 data = resp.json() # weighted_score is the mean of all non-null scores assert data["weighted_score"] is not None def test_missing_required_fields_returns_422(self, client): resp = client.post("/api/score", json={"question": "q"}) assert resp.status_code == 422 def test_invalid_metric_name_returns_422(self, client): resp = client.post("/api/score", json={ "question": "q", "answer": "a", "contexts": "c", "metrics": ["not_a_metric"], }) assert resp.status_code == 422 def test_skipped_metrics_returned_when_no_ground_truth(self, client): resp = client.post("/api/score", json={ "question": "q", "answer": "a", "contexts": "c", "metrics": ["faithfulness", "context_recall"], }) assert resp.status_code == 200 data = resp.json() assert "context_recall" in data["skipped_metrics"] def test_contexts_split_on_separator(self, client, monkeypatch): """contexts string is split before passing to scorer.""" import webapp.api.score as score_mod calls = [] def capture(*args, **kwargs): calls.append(kwargs.get("contexts", [])) return {"faithfulness": 0.9} monkeypatch.setattr(score_mod.inline_scorer, "score", capture) client.post("/api/score", json={ "question": "q", "answer": "a", "contexts": "ctx1 |||| ctx2", "context_separator": " |||| ", }) assert calls[0] == ["ctx1", "ctx2"] def test_bearer_token_auth_required_when_configured(self, monkeypatch): """When SCORE_API_TOKEN is set, requests without token get 401.""" import webapp.api.score as score_mod from rag_eval.settings import EvaluationSettings mock_settings = EvaluationSettings(_env_file=None) object.__setattr__(mock_settings, "score_api_token", "secret-token") monkeypatch.setattr(score_mod, "_get_settings", lambda: mock_settings) mock_scorer = MagicMock() mock_scorer.score.return_value = {"faithfulness": 0.9} monkeypatch.setattr(score_mod, "inline_scorer", mock_scorer) from webapp.server import create_app test_client = TestClient(create_app()) # No auth header → 401 resp = test_client.post("/api/score", json={ "question": "q", "answer": "a", "contexts": "c", }) assert resp.status_code == 401 # Correct token → 200 resp = test_client.post("/api/score", json={"question": "q", "answer": "a", "contexts": "c"}, headers={"Authorization": "Bearer secret-token"}, ) assert resp.status_code == 200 def test_wrong_bearer_token_returns_401(self, monkeypatch): import webapp.api.score as score_mod from rag_eval.settings import EvaluationSettings mock_settings = EvaluationSettings(_env_file=None) object.__setattr__(mock_settings, "score_api_token", "correct-token") monkeypatch.setattr(score_mod, "_get_settings", lambda: mock_settings) mock_scorer = MagicMock() mock_scorer.score.return_value = {} monkeypatch.setattr(score_mod, "inline_scorer", mock_scorer) from webapp.server import create_app test_client = TestClient(create_app()) resp = test_client.post("/api/score", json={"question": "q", "answer": "a", "contexts": "c"}, headers={"Authorization": "Bearer wrong-token"}, ) assert resp.status_code == 401 ``` - [ ] **Step 2: Run to verify FAIL** ``` python -m pytest tests/webapp/test_score_api.py::TestScoreEndpoint -v ``` Expected: `ModuleNotFoundError: No module named 'webapp.api.score'` - [ ] **Step 3: Create `webapp/api/score.py`** ```python """Route for real-time single-sample RAGAS scoring (Dify external Tool endpoint).""" from __future__ import annotations import time from fastapi import APIRouter, Header, HTTPException from typing import Annotated from rag_eval.metrics.weights import compute_weighted_score from rag_eval.settings import EvaluationSettings from webapp.models import ScoreRequest, ScoreResponse from webapp.services.inline_scorer import inline_scorer router = APIRouter(prefix="/api/score", tags=["score"]) def _get_settings() -> EvaluationSettings: """Return a fresh EvaluationSettings instance (overridable in tests).""" return EvaluationSettings() def _check_auth(authorization: str | None, token: str) -> None: """Raise 401 if Bearer token does not match the configured token.""" if authorization is None: raise HTTPException(status_code=401, detail="Missing Authorization header.") parts = authorization.split(" ", 1) if len(parts) != 2 or parts[0].lower() != "bearer" or parts[1] != token: raise HTTPException(status_code=401, detail="Invalid Bearer token.") @router.post( "", response_model=ScoreResponse, summary="单题实时评分(Dify 外部 Tool)", responses={ 200: {"description": "各指标得分和加权综合得分。"}, 401: {"description": "配置了 SCORE_API_TOKEN 但未提供有效 Bearer token。"}, 422: {"description": "请求参数校验失败。"}, }, ) def score_sample( request: ScoreRequest, authorization: Annotated[str | None, Header()] = None, ) -> ScoreResponse: """接受单条问答记录,同步运行 RAGAS 指标打分,实时返回各指标得分。 供 Dify 外部 Tool 调用。将 `contexts` 字段按 `context_separator` 拆分后传入 RAGAS 管道;`ground_truth` 缺失时自动跳过依赖它的指标。 """ settings = _get_settings() # 鉴权(仅在配置了 token 时生效) if settings.score_api_token: _check_auth(authorization, settings.score_api_token) judge_model = request.judge_model or settings.ragas_judge_model embedding_model = request.embedding_model or settings.ragas_embedding_model effective = request.effective_metrics() requested = set(request.metrics) skipped = sorted(requested - set(effective)) if not effective: # All requested metrics require ground_truth which is absent. return ScoreResponse( scores={m: None for m in request.metrics}, weighted_score=None, latency_ms=0, skipped_metrics=skipped, ) t0 = time.monotonic() try: raw_scores = inline_scorer.score( question=request.question, answer=request.answer, contexts=request.contexts_as_list(), ground_truth=request.ground_truth, metrics=effective, judge_model=judge_model, embedding_model=embedding_model, settings=settings, ) except Exception as exc: # noqa: BLE001 latency_ms = int((time.monotonic() - t0) * 1000) return ScoreResponse( scores={}, weighted_score=None, latency_ms=latency_ms, skipped_metrics=skipped, error=f"{type(exc).__name__}: {exc}", ) latency_ms = int((time.monotonic() - t0) * 1000) # Merge: skipped metrics appear as null in final scores dict. all_scores: dict[str, float | None] = {m: None for m in request.metrics} all_scores.update(raw_scores) # Weighted score = equal-weight mean of non-null effective scores. weighted = compute_weighted_score( {k: v for k, v in raw_scores.items() if v is not None}, {}, ) return ScoreResponse( scores=all_scores, weighted_score=round(weighted, 4) if weighted is not None else None, latency_ms=latency_ms, skipped_metrics=skipped, ) ``` - [ ] **Step 4: Register router in `webapp/server.py`** Add `score` to the import line: ```python from webapp.api import evaluations, llm_profiles, pipeline, runs, scenarios, score ``` Add the router registration after `pipeline.router`: ```python app.include_router(score.router) ``` Add `"score"` tag to `OPENAPI_TAGS` list (insert before `"meta"`): ```python { "name": "score", "description": ( "**实时评分 API(Dify 外部 Tool)**\n\n" "接受单条问答记录 `(question, answer, contexts, ground_truth)`,\n" "同步运行 RAGAS 指标打分,返回各指标得分和加权综合得分。\n\n" "适用场景:Dify Agent 在回答后即时调用,用于质量监控或自我改进。\n\n" "**鉴权**:若 `.env` 中配置了 `SCORE_API_TOKEN`,需携带 " "`Authorization: Bearer ` 请求头。" ), }, ``` Also update the `description` field in `FastAPI(...)` to add a bullet: ```python "- **实时评分 API** — 供 Dify 外部 Tool 调用的单题 RAGAS 评分接口\n" ``` - [ ] **Step 5: Run to verify PASS** ``` python -m pytest tests/webapp/test_score_api.py -v ``` Expected: all tests PASS. - [ ] **Step 6: Verify server boots and route appears** ``` python -c " from webapp.server import create_app app = create_app() routes = [(r.path, list(getattr(r,'methods',[]))) for r in app.routes] score_routes = [(p,m) for p,m in routes if 'score' in p] print('Score routes:', score_routes) " ``` Expected output: ``` Score routes: [('/api/score', ['POST'])] ``` - [ ] **Step 7: Commit** ``` git add webapp/api/score.py webapp/server.py tests/webapp/test_score_api.py git commit -m "feat: add POST /api/score endpoint for Dify real-time scoring" ``` --- ## Task 4: 全量回归 + `.env.example` 更新 **Files:** - Modify: `.env.example` - [ ] **Step 1: Add SCORE_API_TOKEN to `.env.example`** Add this block after `DATASET_GENERATOR_MODEL=qwen3.6-plus`: ``` # ===== Dify 集成 — 实时评分 API ===== # 为 /api/score 端点设置 Bearer Token 鉴权(留空则不鉴权,适合内网部署) # Dify 外部 Tool 配置 Authorization: Bearer <此处填写相同值> SCORE_API_TOKEN= ``` - [ ] **Step 2: Run full test suite** ``` python -m pytest tests/ -v --tb=short ``` Pre-existing failures to ignore: - `test_normalize_sample_pdf_offline_smoke_row` — 缺少 CSV fixture - `test_evaluator_and_reporting_write_run_assets` — 预存在的断言不匹配 - `test_question_generator_rejects_invalid_json` — retry 循环吞掉了 ValueError - `test_question_generator_rejects_non_list_samples` — 同上 **零新增失败**即为通过。 - [ ] **Step 3: Final commit** ``` git add .env.example git commit -m "feat: Dify score API complete — add SCORE_API_TOKEN to .env.example - POST /api/score: real-time RAGAS scoring for Dify external Tool - ScoreRequest/ScoreResponse Pydantic models with full field docs - InlineScorer with (judge_model, embedding_model) client cache - Bearer token auth via SCORE_API_TOKEN env var (optional) - contexts split by configurable separator (default ' |||| ') - GT-dependent metrics auto-skipped when ground_truth absent - Full test coverage (22 new tests) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>" ``` --- ## Dify 侧配置参考 任务完成后,在 Dify 「工具」→「自定义工具」中填写如下 OpenAPI Schema: ```yaml openapi: 3.1.0 info: title: RAGAS 实时评分 version: 1.0.0 servers: - url: http://:8800 paths: /api/score: post: operationId: scoreQA summary: 对一条问答记录进行 RAGAS 评分 requestBody: required: true content: application/json: schema: type: object required: [question, answer, contexts] properties: question: { type: string } answer: { type: string } contexts: { type: string, description: "多段上下文用 ' |||| ' 拼接" } ground_truth: { type: string } metrics: type: array items: { type: string } default: [faithfulness, answer_relevancy, context_recall, context_precision] responses: '200': description: 评分结果 content: application/json: schema: type: object properties: scores: { type: object } weighted_score: { type: number } latency_ms: { type: integer } skipped_metrics: { type: array, items: { type: string } } ```