feat: add InlineScorer service with LLM client caching

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
2026-06-22 15:03:43 +08:00
parent 761faf9c42
commit e4d4e4968b
2 changed files with 168 additions and 0 deletions

View File

@@ -126,3 +126,62 @@ class TestScoreResponse:
assert resp.scores["faithfulness"] == 0.85
assert resp.scores["answer_relevancy"] is None
assert resp.latency_ms == 1200
class TestInlineScorer:
def test_score_returns_dict_with_requested_metrics(self):
"""InlineScorer.score returns a dict keyed by the requested metrics."""
from unittest.mock import AsyncMock, MagicMock, patch
from webapp.services.inline_scorer import InlineScorer
from rag_eval.settings import EvaluationSettings
mock_score = MagicMock()
mock_score.metrics = {"faithfulness": 0.9, "answer_relevancy": 0.8}
mock_score.error = ""
mock_pipeline = MagicMock()
mock_pipeline.score_sample = AsyncMock(return_value=mock_score)
with patch("webapp.services.inline_scorer.build_models", return_value=(MagicMock(), MagicMock())):
with patch("webapp.services.inline_scorer.MetricPipeline", return_value=mock_pipeline):
with patch("webapp.services.inline_scorer._build_metric_instances", return_value={}):
scorer = InlineScorer()
result = scorer.score(
question="q", answer="a",
contexts=["ctx1"],
ground_truth=None,
metrics=["faithfulness", "answer_relevancy"],
judge_model="test-model",
embedding_model="test-embed",
settings=EvaluationSettings(_env_file=None),
)
assert "faithfulness" in result
assert "answer_relevancy" in result
assert result["faithfulness"] == pytest.approx(0.9)
def test_score_converts_nan_to_none(self):
"""NaN scores are converted to None in the returned dict."""
import math
from unittest.mock import AsyncMock, MagicMock, patch
from webapp.services.inline_scorer import InlineScorer
from rag_eval.settings import EvaluationSettings
mock_score = MagicMock()
mock_score.metrics = {"faithfulness": float("nan")}
mock_score.error = ""
mock_pipeline = MagicMock()
mock_pipeline.score_sample = AsyncMock(return_value=mock_score)
with patch("webapp.services.inline_scorer.build_models", return_value=(MagicMock(), MagicMock())):
with patch("webapp.services.inline_scorer.MetricPipeline", return_value=mock_pipeline):
with patch("webapp.services.inline_scorer._build_metric_instances", return_value={}):
scorer = InlineScorer()
result = scorer.score(
question="q", answer="a", contexts=["c"],
ground_truth=None,
metrics=["faithfulness"],
judge_model="m", embedding_model="e",
settings=EvaluationSettings(_env_file=None),
)
assert result["faithfulness"] is None

View File

@@ -0,0 +1,109 @@
"""LLM-cached inline RAGAS scorer for the real-time /api/score endpoint.
A module-level InlineScorer singleton caches (llm, embeddings) pairs keyed by
(judge_model, embedding_model), so repeated Dify Tool calls with the same
models reuse existing AsyncOpenAI connections instead of creating new ones.
"""
from __future__ import annotations
import asyncio
import math
import threading
from typing import Any
from rag_eval.compat import ensure_ragas_import_compat
from rag_eval.metrics.factory import build_models
from rag_eval.metrics.pipeline import MetricPipeline
from rag_eval.settings import EvaluationSettings
from rag_eval.shared.models import NormalizedSample
ensure_ragas_import_compat()
from ragas.metrics.collections import ( # noqa: E402
AnswerRelevancy,
ContextPrecision,
ContextRecall,
FactualCorrectness,
Faithfulness,
NoiseSensitivity,
SemanticSimilarity,
)
def _build_metric_instances(metrics: list[str], llm: Any, embeddings: Any) -> dict[str, Any]:
"""Instantiate only the RAGAS metric objects requested."""
registry: dict[str, Any] = {
"faithfulness": Faithfulness(llm=llm),
"answer_relevancy": AnswerRelevancy(llm=llm, embeddings=embeddings),
"context_recall": ContextRecall(llm=llm),
"context_precision": ContextPrecision(llm=llm),
"noise_sensitivity": NoiseSensitivity(llm=llm),
"factual_correctness": FactualCorrectness(llm=llm),
"semantic_similarity": SemanticSimilarity(embeddings=embeddings),
}
return {name: registry[name] for name in metrics if name in registry}
class InlineScorer:
"""Thread-safe single-sample RAGAS scorer with LLM client caching."""
def __init__(self) -> None:
"""Initialize the scorer cache and synchronization primitives."""
# Cache keyed by (judge_model, embedding_model) -> (llm, embeddings)
self._model_cache: dict[tuple[str, str], tuple[Any, Any]] = {}
self._lock = threading.Lock()
def _get_models(
self,
judge_model: str,
embedding_model: str,
settings: EvaluationSettings,
) -> tuple[Any, Any]:
"""Return cached LLM/embedding clients, building them on first use."""
cache_key = (judge_model, embedding_model)
with self._lock:
if cache_key not in self._model_cache:
llm, embeddings = build_models(judge_model, embedding_model, settings)
self._model_cache[cache_key] = (llm, embeddings)
return self._model_cache[cache_key]
def score(
self,
question: str,
answer: str,
contexts: list[str],
ground_truth: str | None,
metrics: list[str],
judge_model: str,
embedding_model: str,
settings: EvaluationSettings,
) -> dict[str, float | None]:
"""Score one sample synchronously and return {metric_name: score | None}."""
llm, embeddings = self._get_models(judge_model, embedding_model, settings)
metric_instances = _build_metric_instances(metrics, llm, embeddings)
pipeline = MetricPipeline(
metrics=metric_instances,
metric_timeout_seconds=settings.ragas_metric_timeout_seconds,
)
sample = NormalizedSample(
sample_id="inline-score",
question=question,
answer=answer,
contexts=contexts,
ground_truth=ground_truth or "",
)
metric_score = asyncio.run(pipeline.score_sample(sample))
# Convert NaN and Inf into None for clean JSON output.
return {
name: (None if math.isnan(value) or math.isinf(value) else round(value, 4))
for name, value in metric_score.metrics.items()
}
# Module-level singleton shared by FastAPI routes.
inline_scorer = InlineScorer()