"""LLM-cached inline RAGAS scorer for the real-time /api/score endpoint. A module-level InlineScorer singleton caches (llm, embeddings) pairs keyed by (judge_model, embedding_model), so repeated Dify Tool calls with the same models reuse existing AsyncOpenAI connections instead of creating new ones. """ from __future__ import annotations import asyncio import math import threading from typing import Any from rag_eval.compat import ensure_ragas_import_compat from rag_eval.metrics.factory import build_models from rag_eval.metrics.pipeline import MetricPipeline from rag_eval.settings import EvaluationSettings from rag_eval.shared.models import NormalizedSample ensure_ragas_import_compat() from ragas.metrics.collections import ( # noqa: E402 AnswerRelevancy, ContextPrecision, ContextRecall, FactualCorrectness, Faithfulness, NoiseSensitivity, SemanticSimilarity, ) def _build_metric_instances(metrics: list[str], llm: Any, embeddings: Any) -> dict[str, Any]: """Instantiate only the RAGAS metric objects requested.""" registry: dict[str, Any] = { "faithfulness": Faithfulness(llm=llm), "answer_relevancy": AnswerRelevancy(llm=llm, embeddings=embeddings), "context_recall": ContextRecall(llm=llm), "context_precision": ContextPrecision(llm=llm), "noise_sensitivity": NoiseSensitivity(llm=llm), "factual_correctness": FactualCorrectness(llm=llm), "semantic_similarity": SemanticSimilarity(embeddings=embeddings), } return {name: registry[name] for name in metrics if name in registry} class InlineScorer: """Thread-safe single-sample RAGAS scorer with LLM client caching.""" def __init__(self) -> None: """Initialize the scorer cache and synchronization primitives.""" # Cache keyed by (judge_model, embedding_model) -> (llm, embeddings) self._model_cache: dict[tuple[str, str], tuple[Any, Any]] = {} self._lock = threading.Lock() def _get_models( self, judge_model: str, embedding_model: str, settings: EvaluationSettings, ) -> tuple[Any, Any]: """Return cached LLM/embedding clients, building them on first use.""" cache_key = (judge_model, embedding_model) with self._lock: if cache_key not in self._model_cache: llm, embeddings = build_models(judge_model, embedding_model, settings) self._model_cache[cache_key] = (llm, embeddings) return self._model_cache[cache_key] def score( self, question: str, answer: str, contexts: list[str], ground_truth: str | None, metrics: list[str], judge_model: str, embedding_model: str, settings: EvaluationSettings, ) -> dict[str, float | None]: """Score one sample synchronously and return {metric_name: score | None}.""" llm, embeddings = self._get_models(judge_model, embedding_model, settings) metric_instances = _build_metric_instances(metrics, llm, embeddings) pipeline = MetricPipeline( metrics=metric_instances, metric_timeout_seconds=settings.ragas_metric_timeout_seconds, ) sample = NormalizedSample( sample_id="inline-score", question=question, answer=answer, contexts=contexts, ground_truth=ground_truth or "", ) metric_score = asyncio.run(pipeline.score_sample(sample)) # Convert NaN and Inf into None for clean JSON output. return { name: (None if math.isnan(value) or math.isinf(value) else round(value, 4)) for name, value in metric_score.metrics.items() } # Module-level singleton shared by FastAPI routes. inline_scorer = InlineScorer()