Files
siemens_ragas/tests/test_offline_eval.py
wangwei 754a30ad59 feat(session-async): add /api/score/session_async with incremental session report aggregation
- New POST /api/score/session_async endpoint: same session_id calls append to one shared report
- New GET /api/score/sessions/{session_id}: returns call_count, metric_means, all job records
- New GET /api/score/session/jobs/{job_id}: individual call status
- SessionScoreJobManager: deterministic run_id from session_id, per-session mutex for CSV append, advisor regenerated on every call
- SessionScoreRequest (extends ScoreRequest + session_id), SessionScoreJobResponse, SessionStatus models added
- 24 new tests, all passing

chore(weighted-score): comment out 综合加权得分 display and computation

- report.js: hide 综合加权得分 card in report detail page
- score_jobs.js: hide 综合 chip in async job list
- report_builder.py: overall_ws=None (computation disabled)
- summary.py: weighted_score summary line disabled
- evaluator.py: weighted_score/sample_weight columns no longer written to scores.csv
- score.py /api/score: weighted_score always returns null
- score_job_manager.py + session_score_manager.py: weighted=None
- Updated 3 tests to match new behaviour (6 pre-existing failures unchanged)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-06-26 16:09:33 +08:00

482 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import unittest
from pathlib import Path
from unittest import mock
import pandas as pd
from pydantic_settings import SettingsConfigDict
from rag_eval.config.loader import load_scenario
from rag_eval.datasets.normalizers import normalize_records
from rag_eval.execution.evaluator import Evaluator
from rag_eval.metrics.pipeline import MetricPipeline
from rag_eval.reporting.summary import build_summary_markdown
from rag_eval.reporting.writers import write_run_artifacts
from rag_eval.settings import EvaluationSettings
from rag_eval.shared.models import EvaluationResult
class EnvOnlySettings(EvaluationSettings):
model_config = SettingsConfigDict(env_file=None, extra="ignore")
class FakeMetric:
def __init__(self, value: float):
self.value = value
async def ascore(self, **kwargs):
class Result:
def __init__(self, value: float):
self.value = value
return Result(self.value)
class SlowMetric:
async def ascore(self, **kwargs):
await __import__("asyncio").sleep(0.05)
return type("Result", (), {"value": 1.0})()
class OpenAIConfigTests(unittest.TestCase):
def test_openai_client_kwargs_without_base_url(self) -> None:
with mock.patch.dict(os.environ, {"OPENAI_API_KEY": "test-key"}, clear=True):
settings = EnvOnlySettings()
self.assertEqual(
settings.openai_client_kwargs,
{"api_key": "test-key", "base_url": "http://6.86.80.4:30080/v1", "timeout": 30.0},
)
def test_openai_client_kwargs_with_base_url(self) -> None:
with mock.patch.dict(
os.environ,
{
"OPENAI_API_KEY": "test-key",
"OPENAI_BASE_URL": "https://proxy.example/v1",
},
clear=True,
):
settings = EnvOnlySettings()
self.assertEqual(
settings.openai_client_kwargs,
{"api_key": "test-key", "base_url": "https://proxy.example/v1", "timeout": 30.0},
)
def test_settings_defaults(self) -> None:
with mock.patch.dict(os.environ, {}, clear=True):
settings = EnvOnlySettings()
self.assertEqual(settings.openai_base_url, "http://6.86.80.4:30080/v1")
self.assertEqual(settings.ragas_judge_model, "deepseek-v4-flash")
self.assertEqual(settings.ragas_embedding_model, "text-embedding-v3")
self.assertEqual(settings.openai_timeout_seconds, 30.0)
self.assertEqual(settings.ragas_metric_timeout_seconds, 45.0)
self.assertEqual(settings.batch_size, 8)
class ScenarioAndDatasetTests(unittest.TestCase):
def test_load_scenario_resolves_relative_paths(self) -> None:
scenario = load_scenario("scenarios/offline/sample-offline.yaml")
self.assertEqual(scenario.mode, "offline")
self.assertTrue(scenario.dataset.path.name.endswith(".csv"))
self.assertTrue(scenario.output_dir.name == "sample-offline-baseline")
def test_load_scenario_metric_and_doc_weights(self) -> None:
"""load_scenario passes metric_weights and doc_weights into Scenario."""
import os
import tempfile
import yaml
from rag_eval.config.loader import load_scenario
payload = {
"scenario_name": "w-test",
"mode": "offline",
"dataset": "nonexistent.csv",
"judge_model": "m",
"embedding_model": "e",
"metrics": ["faithfulness"],
"output_dir": "out",
"metric_weights": {"faithfulness": 0.7},
"doc_weights": {"doc.pdf": 2.0},
}
with tempfile.NamedTemporaryFile(suffix=".yaml", mode="w", encoding="utf-8", delete=False) as f:
yaml.dump(payload, f, allow_unicode=True)
tmp_path = f.name
try:
scenario = load_scenario(tmp_path)
assert scenario.metric_weights == {"faithfulness": 0.7}
assert scenario.doc_weights == {"doc.pdf": 2.0}
finally:
os.unlink(tmp_path)
def test_load_scenario_defaults_to_empty_weights(self) -> None:
"""load_scenario defaults metric_weights and doc_weights to empty dicts."""
import os
import tempfile
import yaml
from rag_eval.config.loader import load_scenario
payload = {
"scenario_name": "no-w",
"mode": "offline",
"dataset": "nonexistent.csv",
"judge_model": "m",
"embedding_model": "e",
"metrics": ["faithfulness"],
"output_dir": "out",
}
with tempfile.NamedTemporaryFile(suffix=".yaml", mode="w", encoding="utf-8", delete=False) as f:
yaml.dump(payload, f, allow_unicode=True)
tmp_path = f.name
try:
scenario = load_scenario(tmp_path)
assert scenario.metric_weights == {}
assert scenario.doc_weights == {}
finally:
os.unlink(tmp_path)
def test_scenario_snapshot_serializes_path_static_kwargs(self) -> None:
scenario = load_scenario("scenarios/online/sample-pdf-question-bank-online.yaml")
snapshot = scenario.snapshot()
self.assertIsInstance(snapshot["app_adapter"]["static_kwargs"]["source_chunks_path"], str)
self.assertTrue(
snapshot["app_adapter"]["static_kwargs"]["source_chunks_path"].endswith("source_chunks.jsonl")
)
def test_load_sample_pdf_offline_smoke_scenario(self) -> None:
scenario = load_scenario("scenarios/offline/sample-pdf-offline-smoke.yaml")
self.assertEqual(scenario.mode, "offline")
self.assertEqual(scenario.dataset.path.name, "sample_pdf_offline_smoke.csv")
self.assertEqual(scenario.output_dir.name, "sample-pdf-offline-smoke")
def test_normalize_records_splits_valid_and_invalid(self) -> None:
records = [
{
"question": "Q1",
"contexts": '["C1"]',
"answer": "A1",
"ground_truth": "G1",
},
{
"question": "",
"contexts": "",
"answer": "",
"ground_truth": "",
},
]
valid, invalid = normalize_records(records)
self.assertEqual(len(valid), 1)
self.assertEqual(len(invalid), 1)
self.assertEqual(valid[0].contexts, ["C1"])
def test_normalize_sample_pdf_offline_smoke_row(self) -> None:
frame = pd.read_csv("datasets/normalized/sample_pdf_offline_smoke.csv")
valid, invalid = normalize_records(frame.to_dict(orient="records"))
self.assertEqual(len(invalid), 0)
self.assertEqual(len(valid), 3)
self.assertTrue(valid[0].answer)
self.assertTrue(valid[0].ground_truth)
self.assertTrue(valid[0].contexts)
class EvaluatorAndReportingTests(unittest.TestCase):
def test_merge_score_includes_weighted_score_and_sample_weight(self):
"""_merge_score no longer adds weighted_score/sample_weight (feature disabled)."""
from unittest.mock import MagicMock
from rag_eval.execution.evaluator import Evaluator
from rag_eval.shared.models import (
MetricScore, NormalizedSample, RuntimeConfig, Scenario, DatasetConfig,
)
scenario = Scenario(
scenario_name="w-test", mode="offline",
dataset=DatasetConfig(path=Path("d.csv")),
judge_model="m", embedding_model="e",
metrics=["faithfulness", "context_recall"],
output_dir=Path("out"),
metric_weights={"faithfulness": 3.0, "context_recall": 1.0},
doc_weights={"doc.pdf": 2.0},
)
evaluator = Evaluator(
scenario=scenario,
metric_pipeline=MagicMock(),
app_adapter=None,
)
sample = NormalizedSample(
sample_id="s1", question="q", contexts=["ctx"],
answer="a", ground_truth="gt",
metadata={"doc_name": "doc.pdf"},
)
score = MetricScore(metrics={"faithfulness": 1.0, "context_recall": 0.0})
row = evaluator._merge_score(sample, score)
# 综合加权得分已暂时禁用weighted_score 和 sample_weight 不再写入
assert "weighted_score" not in row
assert "sample_weight" not in row
assert row["faithfulness"] == 1.0
assert row["context_recall"] == 0.0
def test_summary_markdown_shows_weighted_score(self):
"""build_summary_markdown includes weighted_score when metric_weights set."""
import math
from rag_eval.reporting.summary import build_summary_markdown
from rag_eval.shared.models import (
EvaluationResult, NormalizedSample, DatasetConfig, Scenario,
)
from pathlib import Path
scenario = Scenario(
scenario_name="ws-test", mode="offline",
dataset=DatasetConfig(path=Path("d.csv")),
judge_model="m", embedding_model="e",
metrics=["faithfulness"],
output_dir=Path("out"),
metric_weights={"faithfulness": 1.0},
doc_weights={},
)
sample = NormalizedSample(
sample_id="s1", question="q", contexts=["c"],
answer="a", ground_truth="gt",
)
result = EvaluationResult(
scenario=scenario, run_id="r1",
started_at="2026-01-01T00:00:00", finished_at="2026-01-01T00:01:00",
valid_samples=[sample], invalid_samples=[],
score_rows=[{
"sample_id": "s1", "faithfulness": 0.8,
"weighted_score": 0.8, "sample_weight": 1.0,
"doc_name": "", "error": "",
}],
)
md = build_summary_markdown(result)
assert "weighted_score" in md
assert "0.8000" in md
def test_summary_markdown_hides_weighted_score_without_weights(self):
"""build_summary_markdown preserves unweighted summaries when no weights set."""
from rag_eval.shared.models import DatasetConfig, EvaluationResult, NormalizedSample, Scenario
scenario = Scenario(
scenario_name="plain-test",
mode="offline",
dataset=DatasetConfig(path=Path("d.csv")),
judge_model="m",
embedding_model="e",
metrics=["faithfulness"],
output_dir=Path("out"),
metric_weights={},
doc_weights={},
)
sample = NormalizedSample(
sample_id="s1",
question="q",
contexts=["c"],
answer="a",
ground_truth="gt",
)
result = EvaluationResult(
scenario=scenario,
run_id="r1",
started_at="2026-01-01T00:00:00",
finished_at="2026-01-01T00:01:00",
valid_samples=[sample],
invalid_samples=[],
score_rows=[{
"sample_id": "s1",
"faithfulness": 0.8,
"weighted_score": 0.8,
"sample_weight": 1.0,
"doc_name": "",
"error": "",
}],
)
md = build_summary_markdown(result)
assert "- **weighted_score" not in md
def test_metric_pipeline_scores_sample(self) -> None:
pipeline = MetricPipeline(
metrics={
"faithfulness": FakeMetric(0.1),
"answer_relevancy": FakeMetric(0.2),
"context_recall": FakeMetric(0.3),
"context_precision": FakeMetric(0.4),
}
)
valid, _ = normalize_records(
[
{
"question": "What is RAG?",
"contexts": ["RAG combines retrieval and generation."],
"answer": "RAG combines retrieval and generation.",
"ground_truth": "RAG combines retrieval and generation.",
}
]
)
score = __import__("asyncio").run(pipeline.score_sample(valid[0]))
self.assertEqual(score.metrics["faithfulness"], 0.1)
self.assertEqual(score.metrics["context_precision"], 0.4)
def test_metric_pipeline_captures_metric_timeout_without_aborting(self) -> None:
pipeline = MetricPipeline(
metrics={
"faithfulness": SlowMetric(),
"answer_relevancy": FakeMetric(0.2),
},
metric_timeout_seconds=0.01,
)
valid, _ = normalize_records(
[
{
"question": "What is RAG?",
"contexts": ["RAG combines retrieval and generation."],
"answer": "RAG combines retrieval and generation.",
"ground_truth": "RAG combines retrieval and generation.",
}
]
)
score = __import__("asyncio").run(pipeline.score_sample(valid[0]))
self.assertEqual(score.metrics["faithfulness"], 1.0)
self.assertEqual(score.metrics["answer_relevancy"], 0.2)
self.assertEqual(score.error, "")
def test_evaluator_and_reporting_write_run_assets(self) -> None:
temp_root = Path("tests/.tmp/run-assets")
temp_root.mkdir(parents=True, exist_ok=True)
for child in temp_root.iterdir():
if child.is_dir():
import shutil
shutil.rmtree(child)
else:
child.unlink()
output_root = temp_root
try:
scenario = load_scenario("scenarios/offline/sample-offline.yaml")
scenario.output_dir = output_root
pipeline = MetricPipeline(
metrics={
"faithfulness": FakeMetric(0.1),
"answer_relevancy": FakeMetric(0.2),
"context_recall": FakeMetric(0.3),
"context_precision": FakeMetric(0.4),
}
)
evaluator = Evaluator(scenario=scenario, metric_pipeline=pipeline)
result = evaluator.evaluate()
write_run_artifacts(result)
run_dir = output_root / result.run_id
self.assertTrue((run_dir / "scenario.snapshot.yaml").exists())
self.assertTrue((run_dir / "scores.csv").exists())
self.assertTrue((run_dir / "invalid.csv").exists())
self.assertTrue((run_dir / "summary.md").exists())
self.assertTrue((run_dir / "metadata.json").exists())
scores = pd.read_csv(run_dir / "scores.csv")
self.assertEqual(len(scores), 3)
self.assertIn("faithfulness", scores.columns)
finally:
import shutil
shutil.rmtree(temp_root, ignore_errors=True)
def test_summary_markdown_lists_all_scored_samples_and_errors(self) -> None:
scenario = load_scenario("scenarios/offline/sample-offline.yaml")
valid, invalid = normalize_records(
[
{
"sample_id": "sample-1",
"question": "Q1",
"contexts": ["C1"],
"answer": "A1",
"ground_truth": "G1",
},
{
"sample_id": "sample-2",
"question": "Q2",
"contexts": ["C2"],
"answer": "A2",
"ground_truth": "G2",
},
{
"sample_id": "sample-3",
"question": "Q3",
"contexts": ["C3"],
"answer": "A3",
"ground_truth": "G3",
},
{
"sample_id": "sample-4",
"question": "Q4",
"contexts": ["C4"],
"answer": "A4",
"ground_truth": "G4",
},
]
)
summary = build_summary_markdown(
EvaluationResult(
scenario=scenario,
run_id="test-run",
started_at="2026-06-10T00:00:00+00:00",
finished_at="2026-06-10T00:01:00+00:00",
valid_samples=valid,
invalid_samples=invalid,
score_rows=[
{
"sample_id": "sample-1",
"faithfulness": 1.0,
"answer_relevancy": 0.9,
"context_recall": 1.0,
"context_precision": 0.8,
"error": "",
},
{
"sample_id": "sample-2",
"faithfulness": 0.8,
"answer_relevancy": 0.7,
"context_recall": 0.9,
"context_precision": 0.6,
"error": "faithfulness: timeout",
},
{
"sample_id": "sample-3",
"faithfulness": 0.7,
"answer_relevancy": 0.6,
"context_recall": 0.8,
"context_precision": 0.5,
"error": "",
},
{
"sample_id": "sample-4",
"faithfulness": 0.6,
"answer_relevancy": 0.5,
"context_recall": 0.7,
"context_precision": 0.4,
"error": "context_precision: failed",
},
],
)
)
self.assertIn("## Per-sample Scores", summary)
self.assertIn("sample-1", summary)
self.assertIn("sample-2", summary)
self.assertIn("sample-3", summary)
self.assertIn("sample-4", summary)
self.assertIn("faithfulness", summary)
self.assertIn("answer_relevancy", summary)
self.assertIn("context_recall", summary)
self.assertIn("context_precision", summary)
self.assertIn("error", summary)
self.assertIn("faithfulness: timeout", summary)
self.assertIn("context_precision: failed", summary)
if __name__ == "__main__":
unittest.main()