siemens_ragas/docs/superpowers/plans/2026-06-22-dify-score-api.md

Dify Real-Time Scoring API Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

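Goal: Add a POST /api/score endpoint for Dify external Tool calls. It accepts a single question-answer record and synchronously returns the score for each RAGAS metric.

Architecture: A new service layer, inline_scorer.py, wraps the RAGAS scoring logic and caches LLM clients keyed by (judge_model, embedding_model). A new route lives in webapp/api/score.py. ScoreRequest and ScoreResponse go into webapp/models.py, and SCORE_API_TOKEN is added to EvaluationSettings.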

Tech Stack: Python 3.12, FastAPI, Pydantic v2, RAGAS 0.4.3, pytest

Global Constraints

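  • Python 3.12+, PEP 8, 4-space indentation; type annotations are required
  • contexts is split into list[str] on context_separator (default " |||| ")
  • ground_truth is optional; when it is missing, skip context_recall / factual_correctness / semantic_similarity / noise_sensitivity
  • When SCORE_API_TOKEN is empty, no authentication is performed (intranet deployment scenario)
  • All tests use pytest and do not depend on a real LLM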
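
The two request-shaping constraints (separator splitting and skipping metrics that need ground_truth) can be sketched as standalone functions. This is a minimal illustration under stated assumptions, not the plan's implementation: `split_contexts` and `filter_metrics` are hypothetical names, and dropping empty fragments is an assumption taken from the `contexts_as_list()` method defined later in Task 1.

```python
from __future__ import annotations

# Metrics that need a reference answer, as listed in the constraints.
GT_DEPENDENT = frozenset(
    {"context_recall", "factual_correctness", "semantic_similarity", "noise_sensitivity"}
)


def split_contexts(contexts: str, separator: str = " |||| ") -> list[str]:
    """Split a joined contexts string and drop empty fragments (hypothetical helper)."""
    return [part.strip() for part in contexts.split(separator) if part.strip()]


def filter_metrics(metrics: list[str], ground_truth: str | None) -> tuple[list[str], list[str]]:
    """Return (effective, skipped) metric names for one request (hypothetical helper)."""
    if ground_truth is not None:
        return list(metrics), []
    effective = [m for m in metrics if m not in GT_DEPENDENT]
    skipped = [m for m in metrics if m in GT_DEPENDENT]
    return effective, skipped


# A trailing separator does not produce an empty passage.
print(split_contexts("ctx1 |||| ctx2 |||| "))
# Without ground_truth, context_recall moves to the skipped list.
print(filter_metrics(["faithfulness", "context_recall"], None))
```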

File List

Action   File                               Responsibility
Create   webapp/services/inline_scorer.py   LLM client cache + single-sample scoring
Create   webapp/api/score.py                /api/score route
Create   tests/webapp/test_score_api.py     Endpoint tests (fully mocked)
Modify   webapp/models.py                   Add ScoreRequest / ScoreResponse
Modify   rag_eval/settings.py               Add the score_api_token field
Modify   webapp/server.py                   Register the score router; update OPENAPI_TAGS and description

Task 1: ScoreRequest / ScoreResponse models + settings field

Files:

  • Modify: webapp/models.py
  • Modify: rag_eval/settings.py
  • Test: tests/webapp/test_score_api.py (partial — model validation tests)

Interfaces:

  • Produces:

    • ScoreRequest Pydantic model (see the fields below)
    • ScoreResponse Pydantic model
    • EvaluationSettings.score_api_token: str | None
  • Step 1: Write failing model-validation tests

Create tests/webapp/test_score_api.py:

"""Tests for POST /api/score endpoint."""
from __future__ import annotations

import math
import pytest
from pydantic import ValidationError
from webapp.models import ScoreRequest, ScoreResponse


class TestScoreRequest:
    def test_minimal_valid_request(self):
        """Only required fields — question, answer, contexts."""
        req = ScoreRequest(
            question="What is CT?",
            answer="CT is imaging.",
            contexts="CT uses X-rays.",
        )
        assert req.question == "What is CT?"
        assert req.contexts == "CT uses X-rays."
        assert req.ground_truth is None
        assert req.context_separator == " |||| "
        assert req.metrics == ["faithfulness", "answer_relevancy", "context_recall", "context_precision"]

    def test_contexts_split_by_separator(self):
        """contexts_as_list() splits on context_separator."""
        req = ScoreRequest(
            question="q", answer="a",
            contexts="ctx1 |||| ctx2 |||| ctx3",
            context_separator=" |||| ",
        )
        assert req.contexts_as_list() == ["ctx1", "ctx2", "ctx3"]

    def test_contexts_split_custom_separator(self):
        req = ScoreRequest(
            question="q", answer="a",
            contexts="a---b---c",
            context_separator="---",
        )
        assert req.contexts_as_list() == ["a", "b", "c"]

    def test_contexts_split_single_item(self):
        req = ScoreRequest(question="q", answer="a", contexts="only one")
        assert req.contexts_as_list() == ["only one"]

    def test_missing_question_raises(self):
        with pytest.raises(ValidationError):
            ScoreRequest(answer="a", contexts="c")  # type: ignore[call-arg]

    def test_missing_answer_raises(self):
        with pytest.raises(ValidationError):
            ScoreRequest(question="q", contexts="c")  # type: ignore[call-arg]

    def test_missing_contexts_raises(self):
        with pytest.raises(ValidationError):
            ScoreRequest(question="q", answer="a")  # type: ignore[call-arg]

    def test_custom_metrics_accepted(self):
        req = ScoreRequest(
            question="q", answer="a", contexts="c",
            metrics=["faithfulness"],
        )
        assert req.metrics == ["faithfulness"]

    def test_invalid_metric_name_raises(self):
        with pytest.raises(ValidationError):
            ScoreRequest(question="q", answer="a", contexts="c", metrics=["not_a_metric"])

    def test_effective_metrics_drops_ground_truth_dependent_when_missing(self):
        """Without ground_truth, GT-dependent metrics are excluded."""
        req = ScoreRequest(
            question="q", answer="a", contexts="c",
            metrics=["faithfulness", "context_recall", "factual_correctness", "semantic_similarity", "noise_sensitivity"],
        )
        effective = req.effective_metrics()
        assert "faithfulness" in effective
        assert "context_recall" not in effective
        assert "factual_correctness" not in effective
        assert "semantic_similarity" not in effective
        assert "noise_sensitivity" not in effective

    def test_effective_metrics_keeps_all_when_ground_truth_present(self):
        req = ScoreRequest(
            question="q", answer="a", contexts="c", ground_truth="gt",
            metrics=["faithfulness", "context_recall", "factual_correctness"],
        )
        effective = req.effective_metrics()
        assert effective == ["faithfulness", "context_recall", "factual_correctness"]


class TestScoreResponse:
    def test_score_response_structure(self):
        resp = ScoreResponse(
            scores={"faithfulness": 0.85, "answer_relevancy": None},
            weighted_score=0.85,
            latency_ms=1200,
        )
        assert resp.scores["faithfulness"] == 0.85
        assert resp.scores["answer_relevancy"] is None
        assert resp.latency_ms == 1200
  • Step 2: Run to verify FAIL
cd C:\Projects\AIProjects\Siemens-AIPOC\siemens_ragas
python -m pytest tests/webapp/test_score_api.py::TestScoreRequest tests/webapp/test_score_api.py::TestScoreResponse -v

Expected: ImportError: cannot import name 'ScoreRequest' from 'webapp.models'

  • Step 3: Add ScoreRequest and ScoreResponse to webapp/models.py

Append to the end of webapp/models.py (after PipelineJobResponse):

# ---------------------------------------------------------------------------
# Dify real-time scoring API models
# ---------------------------------------------------------------------------

# Metrics that can only be computed when ground_truth is provided
_GT_DEPENDENT_METRICS: frozenset[str] = frozenset({
    "context_recall",
    "factual_correctness",
    "semantic_similarity",
    "noise_sensitivity",
})

# All valid metric names
_VALID_METRICS: frozenset[str] = frozenset({
    "faithfulness",
    "answer_relevancy",
    "context_recall",
    "context_precision",
    "noise_sensitivity",
    "factual_correctness",
    "semantic_similarity",
})

_DEFAULT_SCORE_METRICS: list[str] = [
    "faithfulness",
    "answer_relevancy",
    "context_recall",
    "context_precision",
]


class ScoreRequest(BaseModel):
    """Request body for the real-time single-sample scoring endpoint."""

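    model_config = ConfigDict(
        json_schema_extra={
            # JSON Schema "examples" takes plain example payloads; the
            # summary/value wrapper belongs to OpenAPI Example Objects instead.
            "examples": [
                {
                    # Basic scoring request
                    "question": "What is the temporal resolution of dual-source CT?",
                    "answer": "The single-segment temporal resolution of dual-source CT is 75 ms.",
                    "contexts": "Dual-source CT uses two tube-detector systems |||| Single-segment acquisition rotates 135 degrees",
                    "ground_truth": "The single-segment temporal resolution of dual-source CT is 75 ms, which requires a 135-degree rotation.",
                    "context_separator": " |||| ",
                    "metrics": ["faithfulness", "answer_relevancy", "context_recall", "context_precision"],
                    "judge_model": "deepseek-v4-flash",
                    "embedding_model": "text-embedding-v3",
                }
            ]
        }
    )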

    question: str = Field(description="Question text.")
    answer: str = Field(description="Answer to be scored.")
    contexts: str = Field(
        description="Retrieved context string; multiple passages are joined with context_separator."
    )
    ground_truth: str | None = Field(
        default=None,
        description="Reference answer (optional). When missing, metrics that require it are skipped automatically.",
    )
    context_separator: str = Field(
        default=" |||| ",
        description="Passage separator within the contexts field; defaults to four vertical bars with one space on each side.",
    )
    metrics: list[str] = Field(
        default_factory=lambda: list(_DEFAULT_SCORE_METRICS),
        description="List of RAGAS metrics to compute.",
    )
    judge_model: str | None = Field(
        default=None,
        description="Judge LLM model name; when null, RAGAS_JUDGE_MODEL from .env is used.",
    )
    embedding_model: str | None = Field(
        default=None,
        description="Embedding model name; when null, RAGAS_EMBEDDING_MODEL from .env is used.",
    )

    @field_validator("metrics")
    @classmethod
    def validate_metric_names(cls, value: list[str]) -> list[str]:
        """Reject any metric name not in the supported registry."""
        invalid = [m for m in value if m not in _VALID_METRICS]
        if invalid:
            raise ValueError(
                f"Unsupported metric names: {invalid}. "
                f"Valid values: {sorted(_VALID_METRICS)}"
            )
        if not value:
            raise ValueError("metrics must not be an empty list.")
        return value

    def contexts_as_list(self) -> list[str]:
        """Split the contexts string into a list of non-empty fragments."""
        sep = self.context_separator or " |||| "
        return [s.strip() for s in self.contexts.split(sep) if s.strip()]

    def effective_metrics(self) -> list[str]:
        """Return metrics filtered to exclude GT-dependent ones when ground_truth is absent."""
        if self.ground_truth is not None:
            return list(self.metrics)
        return [m for m in self.metrics if m not in _GT_DEPENDENT_METRICS]


class ScoreResponse(BaseModel):
    """Response payload for the real-time scoring endpoint."""

    scores: dict[str, float | None] = Field(
        description="Per-metric scores; null when the value is NaN or the computation failed."
    )
    weighted_score: float | None = Field(
        default=None,
        description="Equal-weight composite score (mean over non-null metrics only).",
    )
    latency_ms: int = Field(description="Server-side scoring latency in milliseconds.")
    skipped_metrics: list[str] = Field(
        default_factory=list,
        description="Names of metrics skipped because ground_truth is missing.",
    )
    error: str | None = Field(
        default=None,
        description="Error message when scoring fails (HTTP 200 is still returned, with scores empty).",
    )

Also add field_validator to the import line at the top of webapp/models.py:

from pydantic import BaseModel, ConfigDict, Field, field_validator
  • Step 4: Add score_api_token to rag_eval/settings.py

Add after the dataset_generator_model field:

score_api_token: str | None = Field(
    default=None,
    alias="SCORE_API_TOKEN",
    description="Bearer token for /api/score endpoint. Empty = no auth.",
)
  • Step 5: Run to verify PASS
python -m pytest tests/webapp/test_score_api.py::TestScoreRequest tests/webapp/test_score_api.py::TestScoreResponse -v

Expected: all 12 tests PASS.

  • Step 6: Commit
git add webapp/models.py rag_eval/settings.py tests/webapp/test_score_api.py
git commit -m "feat: add ScoreRequest/ScoreResponse models and SCORE_API_TOKEN setting"
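
`ScoreResponse.weighted_score` is described as an equal-weight mean over the non-null metrics. The sketch below shows only that arithmetic, using a hypothetical helper named `equal_weight_mean`. The project's `compute_weighted_score` is not shown in this plan and may differ in details such as weight handling or rounding.

```python
from __future__ import annotations


def equal_weight_mean(scores: dict[str, float | None]) -> float | None:
    """Average the non-null scores; return None when nothing was scored."""
    values = [v for v in scores.values() if v is not None]
    if not values:
        return None
    return sum(values) / len(values)


# Null entries (skipped or failed metrics) do not pull the mean toward zero.
print(equal_weight_mean({"faithfulness": 0.8, "answer_relevancy": 0.6, "context_recall": None}))
# With no usable scores there is no composite value.
print(equal_weight_mean({"context_recall": None}))
```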

Task 2: InlineScorer service (LLM cache + scoring)

Files:

  • Create: webapp/services/inline_scorer.py

Interfaces:

  • Consumes:

    • build_models(judge_model, embedding_model, settings) -> tuple[Any, Any] from rag_eval.metrics.factory
    • MetricPipeline(metrics, metric_timeout_seconds) from rag_eval.metrics.pipeline
    • NormalizedSample from rag_eval.shared.models
    • compute_weighted_score(scores, metric_weights) -> float | None from rag_eval.metrics.weights
    • EvaluationSettings from rag_eval.settings
  • Produces:

    • inline_scorer: InlineScorer (module-level singleton)
    • InlineScorer.score(question, answer, contexts, ground_truth, metrics, judge_model, embedding_model, settings) -> dict[str, float | None]
  • Step 1: Write failing test

Add to tests/webapp/test_score_api.py:

class TestInlineScorer:
    def test_score_returns_dict_with_requested_metrics(self):
        """InlineScorer.score returns a dict keyed by the requested metrics."""
        from unittest.mock import AsyncMock, MagicMock, patch
        from webapp.services.inline_scorer import InlineScorer
        from rag_eval.settings import EvaluationSettings

        mock_score = MagicMock()
        mock_score.metrics = {"faithfulness": 0.9, "answer_relevancy": 0.8}
        mock_score.error = ""

        mock_pipeline = MagicMock()
        mock_pipeline.score_sample = AsyncMock(return_value=mock_score)

        with patch("webapp.services.inline_scorer.build_models", return_value=(MagicMock(), MagicMock())):
            with patch("webapp.services.inline_scorer.MetricPipeline", return_value=mock_pipeline):
                with patch("webapp.services.inline_scorer._build_metric_instances", return_value={}):
                    scorer = InlineScorer()
                    result = scorer.score(
                        question="q", answer="a",
                        contexts=["ctx1"],
                        ground_truth=None,
                        metrics=["faithfulness", "answer_relevancy"],
                        judge_model="test-model",
                        embedding_model="test-embed",
                        settings=EvaluationSettings(_env_file=None),
                    )
        assert "faithfulness" in result
        assert "answer_relevancy" in result
        assert result["faithfulness"] == pytest.approx(0.9)

    def test_score_converts_nan_to_none(self):
        """NaN scores are converted to None in the returned dict."""
        import math
        from unittest.mock import AsyncMock, MagicMock, patch
        from webapp.services.inline_scorer import InlineScorer
        from rag_eval.settings import EvaluationSettings

        mock_score = MagicMock()
        mock_score.metrics = {"faithfulness": float("nan")}
        mock_score.error = ""

        mock_pipeline = MagicMock()
        mock_pipeline.score_sample = AsyncMock(return_value=mock_score)

        with patch("webapp.services.inline_scorer.build_models", return_value=(MagicMock(), MagicMock())):
            with patch("webapp.services.inline_scorer.MetricPipeline", return_value=mock_pipeline):
                with patch("webapp.services.inline_scorer._build_metric_instances", return_value={}):
                    scorer = InlineScorer()
                    result = scorer.score(
                        question="q", answer="a", contexts=["c"],
                        ground_truth=None,
                        metrics=["faithfulness"],
                        judge_model="m", embedding_model="e",
                        settings=EvaluationSettings(_env_file=None),
                    )
        assert result["faithfulness"] is None
  • Step 2: Run to verify FAIL
python -m pytest tests/webapp/test_score_api.py::TestInlineScorer -v

Expected: ModuleNotFoundError: No module named 'webapp.services.inline_scorer'

  • Step 3: Create webapp/services/inline_scorer.py
"""LLM-cached inline RAGAS scorer for the real-time /api/score endpoint.

A module-level InlineScorer singleton caches (llm, embeddings) pairs keyed by
(judge_model, embedding_model), so repeated Dify Tool calls with the same
models reuse existing AsyncOpenAI connections instead of creating new ones.
"""

from __future__ import annotations

import asyncio
import math
import threading
from typing import Any

from rag_eval.compat import ensure_ragas_import_compat
from rag_eval.metrics.factory import build_models
from rag_eval.metrics.pipeline import MetricPipeline
from rag_eval.metrics.weights import compute_weighted_score
from rag_eval.settings import EvaluationSettings
from rag_eval.shared.models import NormalizedSample

ensure_ragas_import_compat()

from ragas.metrics.collections import (  # noqa: E402
    AnswerRelevancy,
    ContextPrecision,
    ContextRecall,
    FactualCorrectness,
    Faithfulness,
    NoiseSensitivity,
    SemanticSimilarity,
)


def _build_metric_instances(metrics: list[str], llm: Any, embeddings: Any) -> dict[str, Any]:
    """Instantiate only the RAGAS metric objects requested."""
    # Factories defer construction so that unrequested metrics are never built.
    registry: dict[str, Any] = {
        "faithfulness": lambda: Faithfulness(llm=llm),
        "answer_relevancy": lambda: AnswerRelevancy(llm=llm, embeddings=embeddings),
        "context_recall": lambda: ContextRecall(llm=llm),
        "context_precision": lambda: ContextPrecision(llm=llm),
        "noise_sensitivity": lambda: NoiseSensitivity(llm=llm),
        "factual_correctness": lambda: FactualCorrectness(llm=llm),
        "semantic_similarity": lambda: SemanticSimilarity(embeddings=embeddings),
    }
    return {name: registry[name]() for name in metrics if name in registry}


class InlineScorer:
    """Thread-safe single-sample RAGAS scorer with LLM client caching."""

    def __init__(self) -> None:
        # Cache keyed by (judge_model, embedding_model) -> (llm, embeddings)
        self._model_cache: dict[tuple[str, str], tuple[Any, Any]] = {}
        self._lock = threading.Lock()

    def _get_models(
        self,
        judge_model: str,
        embedding_model: str,
        settings: EvaluationSettings,
    ) -> tuple[Any, Any]:
        """Return cached LLM/embedding clients, building them on first use."""
        cache_key = (judge_model, embedding_model)
        with self._lock:
            if cache_key not in self._model_cache:
                llm, embeddings = build_models(judge_model, embedding_model, settings)
                self._model_cache[cache_key] = (llm, embeddings)
            return self._model_cache[cache_key]

    def score(
        self,
        question: str,
        answer: str,
        contexts: list[str],
        ground_truth: str | None,
        metrics: list[str],
        judge_model: str,
        embedding_model: str,
        settings: EvaluationSettings,
    ) -> dict[str, float | None]:
        """Score one sample synchronously and return {metric_name: score | None}.

        NaN values from RAGAS are converted to None for clean JSON serialization.
        """
        llm, embeddings = self._get_models(judge_model, embedding_model, settings)
        metric_instances = _build_metric_instances(metrics, llm, embeddings)

        pipeline = MetricPipeline(
            metrics=metric_instances,
            metric_timeout_seconds=settings.ragas_metric_timeout_seconds,
        )

        sample = NormalizedSample(
            sample_id="inline-score",
            question=question,
            answer=answer,
            contexts=contexts,
            ground_truth=ground_truth or "",
        )

        metric_score = asyncio.run(pipeline.score_sample(sample))

        # Convert missing, NaN, and infinite values to None for clean JSON output.
        return {
            name: (None if v is None or math.isnan(v) or math.isinf(v) else round(v, 4))
            for name, v in metric_score.metrics.items()
        }


# Module-level singleton shared by FastAPI routes.
inline_scorer = InlineScorer()
  • Step 4: Run to verify PASS
python -m pytest tests/webapp/test_score_api.py::TestInlineScorer -v

Expected: both tests PASS.
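
The NaN-to-None conversion matters because Python's `json` module emits the non-standard token `NaN` by default and raises `ValueError` when `allow_nan=False`. The sketch below demonstrates this with a hypothetical `sanitize_scores` helper. It mirrors the dict comprehension in `InlineScorer.score` and, as an added assumption, also tolerates `None` inputs.

```python
from __future__ import annotations

import json
import math


def sanitize_scores(raw: dict[str, float | None]) -> dict[str, float | None]:
    """Map NaN, infinity, and None to None; round finite values to 4 places."""
    clean: dict[str, float | None] = {}
    for name, value in raw.items():
        if value is None or math.isnan(value) or math.isinf(value):
            clean[name] = None
        else:
            clean[name] = round(value, 4)
    return clean


raw = {"faithfulness": float("nan"), "answer_relevancy": 0.87654321}

try:
    json.dumps(raw, allow_nan=False)  # strict JSON has no NaN literal
except ValueError as exc:
    print("strict JSON rejects the raw scores:", exc)

# After sanitizing, the payload serializes under strict JSON rules.
print(json.dumps(sanitize_scores(raw), allow_nan=False))
```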

  • Step 5: Commit
git add webapp/services/inline_scorer.py tests/webapp/test_score_api.py
git commit -m "feat: add InlineScorer service with LLM client caching"
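
The caching behavior of `InlineScorer._get_models` can be exercised without any LLM dependency. The sketch below is an illustration only: `KeyedClientCache` and `fake_builder` are hypothetical stand-ins for `InlineScorer` and `build_models`, and the builder returns placeholder objects instead of real clients.

```python
from __future__ import annotations

import threading
from typing import Any, Callable


class KeyedClientCache:
    """Build one client pair per (judge_model, embedding_model) key."""

    def __init__(self, builder: Callable[[str, str], tuple[Any, Any]]) -> None:
        self._builder = builder
        self._cache: dict[tuple[str, str], tuple[Any, Any]] = {}
        self._lock = threading.Lock()

    def get(self, judge_model: str, embedding_model: str) -> tuple[Any, Any]:
        key = (judge_model, embedding_model)
        with self._lock:  # serialize lookups so the builder runs once per key
            if key not in self._cache:
                self._cache[key] = self._builder(judge_model, embedding_model)
            return self._cache[key]


build_calls: list[tuple[str, str]] = []


def fake_builder(judge_model: str, embedding_model: str) -> tuple[Any, Any]:
    """Stand-in for build_models; records each call instead of creating clients."""
    build_calls.append((judge_model, embedding_model))
    return object(), object()


cache = KeyedClientCache(fake_builder)

# Eight concurrent requests for the same model pair trigger a single build.
threads = [threading.Thread(target=cache.get, args=("judge-a", "embed-a")) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# A different judge model is a new key and triggers a second build.
cache.get("judge-b", "embed-a")
print(len(build_calls))
```

Holding the lock while the builder runs keeps the code simple, at the cost of serializing first-time builds for unrelated keys. That trade-off seems acceptable when only a handful of model pairs are in use.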

Task 3: /api/score route + authentication + integration tests

Files:

  • Create: webapp/api/score.py
  • Modify: webapp/server.py

Interfaces:

  • Consumes:

    • ScoreRequest, ScoreResponse from webapp.models
    • inline_scorer: InlineScorer from webapp.services.inline_scorer
    • EvaluationSettings from rag_eval.settings
    • compute_weighted_score(scores, {}) -> float | None from rag_eval.metrics.weights
  • Produces: POST /api/score endpoint

  • Step 1: Write failing endpoint tests

Add to tests/webapp/test_score_api.py:

# ── Fixtures ─────────────────────────────────────────────────────────────────
import pytest
from fastapi.testclient import TestClient
from unittest.mock import MagicMock, patch


@pytest.fixture()
def client(monkeypatch):
    """TestClient with mocked InlineScorer."""
    import webapp.api.score as score_mod

    mock_scorer = MagicMock()
    mock_scorer.score.return_value = {
        "faithfulness": 0.85,
        "answer_relevancy": 0.90,
    }
    monkeypatch.setattr(score_mod, "inline_scorer", mock_scorer)

    from webapp.server import create_app
    return TestClient(create_app())


class TestScoreEndpoint:
    def test_post_score_returns_200(self, client):
        resp = client.post("/api/score", json={
            "question": "What is CT?",
            "answer": "CT is imaging.",
            "contexts": "CT uses X-rays.",
        })
        assert resp.status_code == 200
        data = resp.json()
        assert "scores" in data
        assert "latency_ms" in data
        assert data["scores"]["faithfulness"] == pytest.approx(0.85)

    def test_weighted_score_computed(self, client):
        resp = client.post("/api/score", json={
            "question": "q", "answer": "a", "contexts": "c",
        })
        assert resp.status_code == 200
        data = resp.json()
        # weighted_score is the mean of all non-null scores
        assert data["weighted_score"] is not None

    def test_missing_required_fields_returns_422(self, client):
        resp = client.post("/api/score", json={"question": "q"})
        assert resp.status_code == 422

    def test_invalid_metric_name_returns_422(self, client):
        resp = client.post("/api/score", json={
            "question": "q", "answer": "a", "contexts": "c",
            "metrics": ["not_a_metric"],
        })
        assert resp.status_code == 422

    def test_skipped_metrics_returned_when_no_ground_truth(self, client):
        resp = client.post("/api/score", json={
            "question": "q", "answer": "a", "contexts": "c",
            "metrics": ["faithfulness", "context_recall"],
        })
        assert resp.status_code == 200
        data = resp.json()
        assert "context_recall" in data["skipped_metrics"]

    def test_contexts_split_on_separator(self, client, monkeypatch):
        """contexts string is split before passing to scorer."""
        import webapp.api.score as score_mod
        calls = []
        def capture(*args, **kwargs):
            calls.append(kwargs.get("contexts", []))
            return {"faithfulness": 0.9}
        monkeypatch.setattr(score_mod.inline_scorer, "score", capture)

        client.post("/api/score", json={
            "question": "q", "answer": "a",
            "contexts": "ctx1 |||| ctx2",
            "context_separator": " |||| ",
        })
        assert calls[0] == ["ctx1", "ctx2"]

    def test_bearer_token_auth_required_when_configured(self, monkeypatch):
        """When SCORE_API_TOKEN is set, requests without token get 401."""
        import webapp.api.score as score_mod
        from rag_eval.settings import EvaluationSettings

        mock_settings = EvaluationSettings(_env_file=None)
        object.__setattr__(mock_settings, "score_api_token", "secret-token")
        monkeypatch.setattr(score_mod, "_get_settings", lambda: mock_settings)

        mock_scorer = MagicMock()
        mock_scorer.score.return_value = {"faithfulness": 0.9}
        monkeypatch.setattr(score_mod, "inline_scorer", mock_scorer)

        from webapp.server import create_app
        test_client = TestClient(create_app())

        # No auth header → 401
        resp = test_client.post("/api/score", json={
            "question": "q", "answer": "a", "contexts": "c",
        })
        assert resp.status_code == 401

        # Correct token → 200
        resp = test_client.post("/api/score",
            json={"question": "q", "answer": "a", "contexts": "c"},
            headers={"Authorization": "Bearer secret-token"},
        )
        assert resp.status_code == 200

    def test_wrong_bearer_token_returns_401(self, monkeypatch):
        import webapp.api.score as score_mod
        from rag_eval.settings import EvaluationSettings

        mock_settings = EvaluationSettings(_env_file=None)
        object.__setattr__(mock_settings, "score_api_token", "correct-token")
        monkeypatch.setattr(score_mod, "_get_settings", lambda: mock_settings)

        mock_scorer = MagicMock()
        mock_scorer.score.return_value = {}
        monkeypatch.setattr(score_mod, "inline_scorer", mock_scorer)

        from webapp.server import create_app
        test_client = TestClient(create_app())
        resp = test_client.post("/api/score",
            json={"question": "q", "answer": "a", "contexts": "c"},
            headers={"Authorization": "Bearer wrong-token"},
        )
        assert resp.status_code == 401
  • Step 2: Run to verify FAIL
python -m pytest tests/webapp/test_score_api.py::TestScoreEndpoint -v

Expected: ModuleNotFoundError: No module named 'webapp.api.score'

  • Step 3: Create webapp/api/score.py
"""Route for real-time single-sample RAGAS scoring (Dify external Tool endpoint)."""

from __future__ import annotations

import time
from typing import Annotated

from fastapi import APIRouter, Header, HTTPException

from rag_eval.metrics.weights import compute_weighted_score
from rag_eval.settings import EvaluationSettings
from webapp.models import ScoreRequest, ScoreResponse
from webapp.services.inline_scorer import inline_scorer

router = APIRouter(prefix="/api/score", tags=["score"])


def _get_settings() -> EvaluationSettings:
    """Return a fresh EvaluationSettings instance (overridable in tests)."""
    return EvaluationSettings()


def _check_auth(authorization: str | None, token: str) -> None:
    """Raise 401 if Bearer token does not match the configured token."""
    if authorization is None:
        raise HTTPException(status_code=401, detail="Missing Authorization header.")
    parts = authorization.split(" ", 1)
    if len(parts) != 2 or parts[0].lower() != "bearer" or parts[1] != token:
        raise HTTPException(status_code=401, detail="Invalid Bearer token.")


@router.post(
    "",
    response_model=ScoreResponse,
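    summary="Real-time single-sample scoring (Dify external Tool)",
    responses={
        200: {"description": "Per-metric scores and the weighted composite score."},
        401: {"description": "SCORE_API_TOKEN is configured but no valid Bearer token was provided."},
        422: {"description": "Request parameter validation failed."},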
    },
)
def score_sample(
    request: ScoreRequest,
    authorization: Annotated[str | None, Header()] = None,
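) -> ScoreResponse:
    """Accept one question-answer record, run RAGAS scoring synchronously, and return per-metric scores.

    Called by the Dify external Tool. The `contexts` field is split on `context_separator`
    before being passed to the RAGAS pipeline; when `ground_truth` is missing, metrics
    that depend on it are skipped automatically.
    """
    settings = _get_settings()

    # Authentication (enforced only when a token is configured)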
    if settings.score_api_token:
        _check_auth(authorization, settings.score_api_token)

    judge_model = request.judge_model or settings.ragas_judge_model
    embedding_model = request.embedding_model or settings.ragas_embedding_model
    effective = request.effective_metrics()
    requested = set(request.metrics)
    skipped = sorted(requested - set(effective))

    if not effective:
        # All requested metrics require ground_truth which is absent.
        return ScoreResponse(
            scores={m: None for m in request.metrics},
            weighted_score=None,
            latency_ms=0,
            skipped_metrics=skipped,
        )

    t0 = time.monotonic()
    try:
        raw_scores = inline_scorer.score(
            question=request.question,
            answer=request.answer,
            contexts=request.contexts_as_list(),
            ground_truth=request.ground_truth,
            metrics=effective,
            judge_model=judge_model,
            embedding_model=embedding_model,
            settings=settings,
        )
    except Exception as exc:  # noqa: BLE001
        latency_ms = int((time.monotonic() - t0) * 1000)
        return ScoreResponse(
            scores={},
            weighted_score=None,
            latency_ms=latency_ms,
            skipped_metrics=skipped,
            error=f"{type(exc).__name__}: {exc}",
        )

    latency_ms = int((time.monotonic() - t0) * 1000)

    # Merge: skipped metrics appear as null in final scores dict.
    all_scores: dict[str, float | None] = {m: None for m in request.metrics}
    all_scores.update(raw_scores)

    # Weighted score = equal-weight mean of non-null effective scores.
    weighted = compute_weighted_score(
        {k: v for k, v in raw_scores.items() if v is not None},
        {},
    )

    return ScoreResponse(
        scores=all_scores,
        weighted_score=round(weighted, 4) if weighted is not None else None,
        latency_ms=latency_ms,
        skipped_metrics=skipped,
    )
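
`_check_auth` compares the supplied token with `!=`. As an optional hardening step, the comparison could use `hmac.compare_digest` from the standard library, which is designed to resist timing analysis. The sketch below is a hedged alternative rather than part of the plan: `bearer_token_matches` is a hypothetical helper that returns a boolean instead of raising `HTTPException`, so it runs without FastAPI.

```python
from __future__ import annotations

import hmac


def bearer_token_matches(authorization: str | None, expected: str) -> bool:
    """Return True only for a well-formed 'Bearer <token>' header carrying the expected token."""
    if authorization is None:
        return False
    scheme, _, supplied = authorization.partition(" ")
    if scheme.lower() != "bearer" or not supplied:
        return False
    # Encode to bytes so non-ASCII tokens are accepted by compare_digest.
    return hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))


print(bearer_token_matches("Bearer secret-token", "secret-token"))
print(bearer_token_matches("bearer wrong-token", "secret-token"))
print(bearer_token_matches(None, "secret-token"))
```

In the route, a `False` result would map to the same 401 response that `_check_auth` raises.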
  • Step 4: Register router in webapp/server.py

Add score to the import line:

from webapp.api import evaluations, llm_profiles, pipeline, runs, scenarios, score

Add the router registration after pipeline.router:

app.include_router(score.router)

Add "score" tag to OPENAPI_TAGS list (insert before "meta"):

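    {
        "name": "score",
        "description": (
            "**Real-time scoring API (Dify external Tool)**\n\n"
            "Accepts a single question-answer record `(question, answer, contexts, ground_truth)`,\n"
            "runs RAGAS metric scoring synchronously, and returns per-metric scores and the weighted composite score.\n\n"
            "Use case: a Dify Agent calls it right after answering, for quality monitoring or self-improvement.\n\n"
            "**Authentication**: if `SCORE_API_TOKEN` is configured in `.env`, requests must carry the "
            "`Authorization: Bearer <token>` header."
        ),
    },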

Also update the description field in FastAPI(...) to add a bullet:

"- **Real-time scoring API**: single-sample RAGAS scoring endpoint for Dify external Tool calls\n"
  • Step 5: Run to verify PASS
python -m pytest tests/webapp/test_score_api.py -v

Expected: all tests PASS.

  • Step 6: Verify server boots and route appears
python -c "
from webapp.server import create_app
app = create_app()
routes = [(r.path, list(getattr(r,'methods',[]))) for r in app.routes]
score_routes = [(p,m) for p,m in routes if 'score' in p]
print('Score routes:', score_routes)
"

Expected output:

Score routes: [('/api/score', ['POST'])]
  • Step 7: Commit
git add webapp/api/score.py webapp/server.py tests/webapp/test_score_api.py
git commit -m "feat: add POST /api/score endpoint for Dify real-time scoring"

Task 4: Full regression + .env.example update

Files:

  • Modify: .env.example

  • Step 1: Add SCORE_API_TOKEN to .env.example

Add this block after DATASET_GENERATOR_MODEL=qwen3.6-plus:

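# ===== Dify integration: real-time scoring API =====
# Bearer token that protects the /api/score endpoint (leave empty to disable auth, suitable for intranet deployments)
# In the Dify external Tool, configure Authorization: Bearer <same value as set here>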
SCORE_API_TOKEN=
  • Step 2: Run full test suite
python -m pytest tests/ -v --tb=short

Pre-existing failures to ignore:

  • test_normalize_sample_pdf_offline_smoke_row: missing CSV fixture
  • test_evaluator_and_reporting_write_run_assets: pre-existing assertion mismatch
  • test_question_generator_rejects_invalid_json: the retry loop swallows the ValueError
  • test_question_generator_rejects_non_list_samples: same as above

The run passes if there are zero new failures.

  • Step 3: Final commit
git add .env.example
git commit -m "feat: Dify score API complete — add SCORE_API_TOKEN to .env.example

- POST /api/score: real-time RAGAS scoring for Dify external Tool
- ScoreRequest/ScoreResponse Pydantic models with full field docs
- InlineScorer with (judge_model, embedding_model) client cache
- Bearer token auth via SCORE_API_TOKEN env var (optional)
- contexts split by configurable separator (default ' |||| ')
- GT-dependent metrics auto-skipped when ground_truth absent
- Full test coverage (22 new tests)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>"

Dify-Side Configuration Reference

After the tasks are complete, enter the following OpenAPI Schema under "Tools" → "Custom Tools" in Dify:

openapi: 3.1.0
info:
  title: RAGAS Real-Time Scoring
  version: 1.0.0
servers:
  - url: http://<your-server>:8800
paths:
  /api/score:
    post:
      operationId: scoreQA
      summary: Score a single question-answer record with RAGAS
      requestBody:
        required: true
        content:
          application/json:
            schema:
              type: object
              required: [question, answer, contexts]
              properties:
                question:       { type: string }
                answer:         { type: string }
                contexts:       { type: string, description: "Multiple context passages are joined with ' |||| '" }
                ground_truth:   { type: string }
                metrics:
                  type: array
                  items: { type: string }
                  default: [faithfulness, answer_relevancy, context_recall, context_precision]
      responses:
        '200':
          description: Scoring results
          content:
            application/json:
              schema:
                type: object
                properties:
                  scores:         { type: object }
                  weighted_score: { type: number }
                  latency_ms:     { type: integer }
                  skipped_metrics: { type: array, items: { type: string } }
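
For a quick check outside Dify, the endpoint can also be called from a small standard-library client. The sketch below is an assumption-laden example rather than part of the plan: `build_score_payload` and `post_score` are hypothetical helpers, the 120-second timeout is an arbitrary choice, and the base URL must be replaced with the real server address. Only the payload builder runs at import time; `post_score` needs a running server.

```python
from __future__ import annotations

import json
import urllib.request
from typing import Any

SEPARATOR = " |||| "  # default context_separator from the plan


def build_score_payload(
    question: str,
    answer: str,
    contexts: list[str],
    ground_truth: str | None = None,
) -> dict[str, Any]:
    """Join context passages with the separator and assemble the JSON body."""
    payload: dict[str, Any] = {
        "question": question,
        "answer": answer,
        "contexts": SEPARATOR.join(contexts),
    }
    if ground_truth is not None:
        payload["ground_truth"] = ground_truth
    return payload


def post_score(base_url: str, payload: dict[str, Any], token: str | None = None) -> dict[str, Any]:
    """POST the payload to /api/score; send a Bearer header only when a token is given."""
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    request = urllib.request.Request(
        f"{base_url}/api/score",
        data=json.dumps(payload, ensure_ascii=False).encode("utf-8"),
        headers=headers,
        method="POST",
    )
    # Timeout value is an assumption; LLM-backed scoring can take a while.
    with urllib.request.urlopen(request, timeout=120) as response:
        return json.loads(response.read().decode("utf-8"))


payload = build_score_payload(
    "What is CT?",
    "CT is imaging.",
    ["CT uses X-rays.", "CT stands for computed tomography."],
)
print(payload["contexts"])
```

Omitting `ground_truth` from the payload means the server reports the ground-truth-dependent metrics in `skipped_metrics` instead of scoring them.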