diff --git a/docs/rag-eval-architecture.md b/docs/rag-eval-architecture.md index bfc39f0..75f1aee 100644 --- a/docs/rag-eval-architecture.md +++ b/docs/rag-eval-architecture.md @@ -318,6 +318,10 @@ metrics: - answer_relevancy - context_recall - context_precision + # 可选:鲁棒性 / 端到端指标(需数据集含 ground_truth),完整列表见 §9.4 + # - noise_sensitivity + # - factual_correctness + # - semantic_similarity output_dir: runs/legal-assistant-offline-baseline runtime: batch_size: 4 @@ -338,7 +342,7 @@ runtime: - `embedding_model` - 负责向量相关指标的模型 - `metrics` - - 本次启用的指标列表 + - 本次启用的指标列表(完整可选项与依赖见 §9.4) - `output_dir` - 本次运行结果输出目录 - `runtime.batch_size` @@ -399,6 +403,32 @@ app_adapter: - embedding model - 指标实例 +当前支持的指标(`rag_eval/metrics/registry.py` 中的 `SUPPORTED_METRICS`): + +| 指标名 | 层面 | 依赖 | +|---|---|---| +| `faithfulness` | 生成 | judge model | +| `answer_relevancy` | 生成 | judge model + embedding | +| `context_recall` | 检索 | judge model + ground_truth | +| `context_precision` | 检索 | judge model + ground_truth | +| `noise_sensitivity` | 鲁棒性 | judge model + ground_truth | +| `factual_correctness` | 端到端 | judge model + ground_truth | +| `semantic_similarity` | 端到端 | embedding + ground_truth(无 LLM 调用) | + +后五项以 `ground_truth`(标准答案)为参照,数据集必须提供该字段。新增指标统一在 `registry.py` / `factory.py` / `pipeline.py` 三处对齐装配。 + +**Optimization Advisor(§11 优化策略落地):** + +评测结束后,若场景配置 `optimization_advisor: true`,则自动调用 `rag_eval/advisor/` 模块: +- 规则引擎(`rules.py`)对 7 个指标各自设阈值,识别触发项并选取 top-3 低分样本 +- LLM 分析器(`llm_analyzer.py`)结合低分样本生成中文 Markdown 优化建议(复用 judge_model,失败自动降级为纯规则报告) +- 写出层(`writer.py`)输出 `optimization_advice.md` 并打日志摘要 + +```yaml +# 场景配置示例 +optimization_advisor: true +``` + ### 9.5 并发控制 执行层负责并发上限,不把并发策略散落到各指标实现中。 diff --git a/docs/rag-eval-engine-flow.md b/docs/rag-eval-engine-flow.md index 11b82b8..2df468d 100644 --- a/docs/rag-eval-engine-flow.md +++ b/docs/rag-eval-engine-flow.md @@ -316,11 +316,21 @@ adapter 层的目标是:**把不同类型的目标应用,统一成同一套 当前支持的指标包括: +核心检索 / 生成指标(始终可用): + - `faithfulness` - `answer_relevancy` - 
`context_recall` - `context_precision` +鲁棒性 / 端到端指标(架构设计 §10.2,需数据集含 `ground_truth`): + +- `noise_sensitivity` —— 鲁棒性:对检索噪声的敏感度 +- `factual_correctness` —— 端到端:回答相对标准答案的事实正确性 +- `semantic_similarity` —— 端到端:回答与标准答案的语义相似度(基于 embedding,无 LLM 调用) + +所有指标都通过同一套装配点接入:`registry.py`(校验白名单)、`factory.py`(实例化)、`pipeline.py`(`ascore` 入参分发),新增指标只需在这三处对齐即可。 + 所以 metric pipeline 的职责可以总结为: **把标准样本转换成结构化评分结果。** @@ -414,3 +424,39 @@ main.py - 可以把每次实验的资产稳定留住 这也是它和一次性离线脚本的根本区别。 + +--- + +## 15. Optimization Advisor 链路 + +相关代码: + +- `rag_eval/advisor/__init__.py` — 外部入口 `run_advisor()` +- `rag_eval/advisor/rules.py` — 规则引擎(纯函数,无 LLM),7 条指标诊断规则 +- `rag_eval/advisor/llm_analyzer.py` — LLM 分析器(复用 judge_model llm 实例,失败自动降级) +- `rag_eval/advisor/writer.py` — 写出 `optimization_advice.md` + 日志摘要 + +Advisor 在 `write_run_artifacts()` 之后触发,仅当场景配置 `optimization_advisor: true` 时生效,默认关闭。 + +执行链路: + +```text +run_advisor(result, scenario, llm) + -> rules.diagnose(score_rows, metrics) # 识别异常指标,选取 top-3 低分样本 + -> llm_analyzer.analyze(diagnoses, llm) # LLM 生成中文建议(失败自动降级为纯规则报告) + -> writer.write_advice(...) # 写 optimization_advice.md + 日志摘要 +``` + +输出产物追加在现有 run 目录: + +```text +outputs/online/siemens-pdf-question-bank// + scenario.snapshot.yaml + scores.csv + invalid.csv + summary.md + metadata.json + optimization_advice.md <- 新增(optimization_advisor: true 时生成) +``` + +规则引擎对 7 个指标各自设 warning / critical 双档阈值,`noise_sensitivity` 为"越低越好"(方向相反)。所有诊断均附带 top-3 低分样本,喂给 LLM 生成针对具体内容的中文建议。 diff --git a/docs/superpowers/plans/2026-06-16-optimization-advisor.md b/docs/superpowers/plans/2026-06-16-optimization-advisor.md new file mode 100644 index 0000000..37986fd --- /dev/null +++ b/docs/superpowers/plans/2026-06-16-optimization-advisor.md @@ -0,0 +1,1378 @@ +# Optimization Advisor Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. 
+ +**Goal:** Add a `rag_eval/advisor/` module that, after every evaluation run, automatically analyzes why metrics scored low and writes a Chinese-language optimization report (`optimization_advice.md` plus a log summary). + +**Architecture:** The rule engine (`rules.py`) flags anomalies from each metric's mean and thresholds and selects low-scoring samples; the LLM analyzer (`llm_analyzer.py`) reuses the existing judge_model llm instance to generate Chinese Markdown advice; the writer layer (`writer.py`) writes the file and logs a summary. The feature is triggered by the YAML switch `optimization_advisor: true` and is off by default. + +**Tech Stack:** Python 3.12, dataclasses, ragas LLM instance (AsyncOpenAI-backed), pandas (score_rows already available), pytest (unit tests) + +--- + +## File Map + +### New files +- `rag_eval/advisor/__init__.py` — exposes `run_advisor()`, the only external entry point +- `rag_eval/advisor/rules.py` — pure-function rule engine: `Diagnosis` dataclass + `diagnose()` +- `rag_eval/advisor/llm_analyzer.py` — `analyze()` takes llm + diagnoses and returns a Markdown str +- `rag_eval/advisor/writer.py` — `write_advice()` writes the md file + log summary +- `tests/test_advisor_rules.py` — rule engine unit tests +- `tests/test_advisor_writer.py` — writer unit tests + +### Modified files +- `rag_eval/shared/models.py` — `Scenario` gains `optimization_advisor: bool = False`; `RunArtifactPaths` gains `advice_md: Path | None = None` +- `rag_eval/config/schema.py` — `ScenarioModel` gains `optimization_advisor: bool = False` +- `rag_eval/config/loader.py` — `load_scenario()` passes `optimization_advisor` through to `Scenario` +- `rag_eval/reporting/artifacts.py` — `build_artifact_paths()` gains the `advice_md` field +- `rag_eval/metrics/factory.py` — `build_models()` is exposed publicly and `build_metric_pipeline()` accepts optional pre-built `llm` / `embeddings`, so the runner can build the llm once and also pass it to the advisor +- `rag_eval/execution/runner.py` — obtains the llm via `build_models()` and conditionally calls `run_advisor()` at the end +- `scenarios/online/siemens-pdf-question-bank-online.yaml` — adds `optimization_advisor: true` +- `docs/rag-eval-engine-flow.md` — adds the advisor pipeline description +- `docs/rag-eval-architecture.md` — adds an advisor note at the end of §9.4 (metric orchestration) + +--- + +## Task 1: Diagnosis dataclass + rules engine + +**Files:** +- Create: `rag_eval/advisor/rules.py` +- Create: `tests/test_advisor_rules.py` + +- [ ] **Step 1: Write failing tests** + +```python +# tests/test_advisor_rules.py +import math +import unittest +from rag_eval.advisor.rules import Diagnosis, diagnose, 
METRIC_RULES + +class TestDiagnosis(unittest.TestCase): + def _make_rows(self, metric: str, scores: list[float]) -> list[dict]: + return [{metric: s, "question": f"q{i}", "answer": f"a{i}", + "ground_truth": f"gt{i}", "sample_id": f"s{i}"} + for i, s in enumerate(scores)] + + def test_no_diagnosis_when_all_scores_above_threshold(self): + rows = self._make_rows("faithfulness", [0.8, 0.9, 0.85]) + result = diagnose(rows, metrics=["faithfulness"]) + self.assertEqual(result, []) + + def test_warning_when_mean_below_warning_threshold(self): + rows = self._make_rows("faithfulness", [0.65, 0.62, 0.68]) + result = diagnose(rows, metrics=["faithfulness"]) + self.assertEqual(len(result), 1) + self.assertEqual(result[0].metric, "faithfulness") + self.assertEqual(result[0].severity, "warning") + self.assertAlmostEqual(result[0].mean_score, 0.65, places=2) + + def test_critical_when_mean_below_critical_threshold(self): + rows = self._make_rows("faithfulness", [0.3, 0.4, 0.45]) + result = diagnose(rows, metrics=["faithfulness"]) + self.assertEqual(result[0].severity, "critical") + + def test_low_samples_selected_are_bottom_three(self): + rows = self._make_rows("faithfulness", [0.1, 0.2, 0.3, 0.8, 0.9]) + result = diagnose(rows, metrics=["faithfulness"]) + self.assertEqual(len(result[0].low_samples), 3) + scores = [s["faithfulness"] for s in result[0].low_samples] + self.assertEqual(sorted(scores), [0.1, 0.2, 0.3]) + + def test_nan_scores_excluded_from_mean_and_low_samples(self): + rows = self._make_rows("faithfulness", [0.3, float("nan"), 0.4]) + result = diagnose(rows, metrics=["faithfulness"]) + self.assertEqual(len(result), 1) + for s in result[0].low_samples: + self.assertFalse(math.isnan(s["faithfulness"])) + + def test_noise_sensitivity_direction_inverted(self): + # noise_sensitivity: higher is worse; threshold > 0.3 is warning + rows = self._make_rows("noise_sensitivity", [0.4, 0.45, 0.5]) + result = diagnose(rows, metrics=["noise_sensitivity"]) + 
self.assertEqual(len(result), 1) + self.assertEqual(result[0].metric, "noise_sensitivity") + + def test_noise_sensitivity_no_diagnosis_when_low(self): + rows = self._make_rows("noise_sensitivity", [0.1, 0.15, 0.2]) + result = diagnose(rows, metrics=["noise_sensitivity"]) + self.assertEqual(result, []) + + def test_skips_metric_not_in_rows(self): + rows = [{"faithfulness": 0.3, "question": "q", "answer": "a", + "ground_truth": "gt", "sample_id": "s1"}] + result = diagnose(rows, metrics=["faithfulness", "context_recall"]) + metrics_found = [d.metric for d in result] + self.assertIn("faithfulness", metrics_found) + self.assertNotIn("context_recall", metrics_found) + + def test_all_seven_metrics_have_rules(self): + expected = {"faithfulness", "answer_relevancy", "context_recall", + "context_precision", "noise_sensitivity", + "factual_correctness", "semantic_similarity"} + self.assertEqual(set(METRIC_RULES.keys()), expected) + +if __name__ == "__main__": + unittest.main() +``` + +- [ ] **Step 2: Run tests to verify they fail** + +``` +cd C:\Projects\AIProjects\Siemens-AIPOC\siemens_ragas +python -m pytest tests/test_advisor_rules.py -v 2>&1 | head -20 +``` + +Expected: `ModuleNotFoundError: No module named 'rag_eval.advisor'` + +- [ ] **Step 3: Create rules.py** + +```python +# rag_eval/advisor/rules.py +"""Rule-based diagnostic engine for RAG evaluation metric scores.""" +from __future__ import annotations + +import math +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class MetricRule: + """Threshold configuration and diagnostic text for one metric.""" + warning_threshold: float + critical_threshold: float + higher_is_better: bool # False for noise_sensitivity + root_causes: list[str] + suggested_actions: list[str] + + +METRIC_RULES: dict[str, MetricRule] = { + "faithfulness": MetricRule( + warning_threshold=0.7, + critical_threshold=0.5, + higher_is_better=True, + root_causes=[ + "生成回答包含检索片段中不支持的陈述(幻觉)", + "生成阶段未严格遵循 grounding 约束", + 
"校验阶段未开启或未生效", + ], + suggested_actions=[ + "强化生成 prompt 的 grounding 约束('只依据参考资料作答')", + "开启校验阶段(validation: by_scenario)", + "检查低分样本中模型是否引用了片段外的知识", + ], + ), + "answer_relevancy": MetricRule( + warning_threshold=0.7, + critical_threshold=0.5, + higher_is_better=True, + root_causes=[ + "回答偏离问题主旨或包含大量冗余内容", + "查询改写后问题语义漂移", + "生成 prompt 格式约束不足", + ], + suggested_actions=[ + "优化查询改写 prompt,确保改写后语义不偏移", + "在生成 prompt 中加入'简洁准确、直接回答问题'的约束", + "检查低分样本的回答是否存在格式冗余或话题偏移", + ], + ), + "context_recall": MetricRule( + warning_threshold=0.7, + critical_threshold=0.5, + higher_is_better=True, + root_causes=[ + "检索未能召回标准答案所涉及的关键信息", + "单一查询未能覆盖问题的多个角度", + "过召回数量不足,关键片段被截断", + ], + suggested_actions=[ + "启用多查询扩展(use_multi_query)覆盖不同措辞", + "对多跳问题启用问题分解(sub_questions)", + "加大过召回宽度(recall_top_k)", + "对颗粒度细的问题尝试 Step-back 双路检索", + ], + ), + "context_precision": MetricRule( + warning_threshold=0.6, + critical_threshold=0.4, + higher_is_better=True, + root_causes=[ + "检索引入过多与问题无关的片段", + "重排未能将相关片段排在前列", + "缺少相关性过滤,噪声片段进入上下文", + ], + suggested_actions=[ + "启用或优化 listwise 重排,将相关片段排在前列", + "启用上下文压缩(compression)过滤无关句子", + "启用相关性过滤(relevance_filter)丢弃明确无关片段", + "缩小 rerank_keep_k(如从 8 降到 5)", + ], + ), + "noise_sensitivity": MetricRule( + warning_threshold=0.3, # higher is worse; trigger when mean > threshold + critical_threshold=0.5, + higher_is_better=False, + root_causes=[ + "回答中包含检索到的噪声片段所引入的错误陈述", + "相关性过滤未能拦截干扰性片段", + "生成阶段对噪声片段未加区分地引用", + ], + suggested_actions=[ + "启用相关性过滤(relevance_filter)拦截噪声", + "优化重排,将不相关片段排到截断点之后", + "在生成 prompt 中强调'来源冲突时并列陈述,不擅自下定论'", + ], + ), + "factual_correctness": MetricRule( + warning_threshold=0.6, + critical_threshold=0.4, + higher_is_better=True, + root_causes=[ + "回答的事实陈述与标准答案存在偏差", + "检索未能命中标准答案所依据的关键片段", + "生成阶段对多个来源综合时产生事实错误", + ], + suggested_actions=[ + "重点检查低分样本,确认是检索遗漏还是生成错误", + "提升 context_recall 以确保关键信息被检索到", + "对事实型问题将 temperature 降至 0", + ], + ), + "semantic_similarity": MetricRule( + warning_threshold=0.7, + critical_threshold=0.5, + 
higher_is_better=True, + root_causes=[ + "回答语义与标准答案差距较大", + "回答过于简短或过于冗长,语义偏移", + "检索到的片段质量不足,导致生成内容偏离", + ], + suggested_actions=[ + "检查低分样本的回答与标准答案的表述差异", + "优化生成 prompt 使回答更贴近标准表述风格", + "提升检索质量(context_recall / context_precision)", + ], + ), +} + + +@dataclass +class Diagnosis: + """Diagnostic result for one metric that triggered a threshold.""" + metric: str + mean_score: float + threshold: float # the triggered threshold + severity: str # "warning" | "critical" + root_causes: list[str] = field(default_factory=list) + suggested_actions: list[str] = field(default_factory=list) + low_samples: list[dict[str, Any]] = field(default_factory=list) + + +def _mean_ignoring_nan(values: list[float]) -> float | None: + valid = [v for v in values if not math.isnan(v)] + if not valid: + return None + return sum(valid) / len(valid) + + +def _select_low_samples( + rows: list[dict[str, Any]], + metric: str, + top_n: int, + higher_is_better: bool, +) -> list[dict[str, Any]]: + """Return the top_n worst-scoring rows for a metric, excluding NaN.""" + valid = [r for r in rows if metric in r and not math.isnan(float(r[metric]))] + sorted_rows = sorted(valid, key=lambda r: float(r[metric]), reverse=not higher_is_better) + worst = sorted_rows[:top_n] + keep_keys = {"sample_id", "question", "answer", "ground_truth", metric} + return [{k: v for k, v in row.items() if k in keep_keys} for row in worst] + + +def diagnose( + score_rows: list[dict[str, Any]], + metrics: list[str], + top_low_samples: int = 3, +) -> list[Diagnosis]: + """Analyse score_rows and return a Diagnosis for each metric below threshold. + + Args: + score_rows: List of per-sample score dicts (from EvaluationResult.score_rows). + metrics: Metric names to evaluate (from Scenario.metrics). + top_low_samples: How many worst-scoring samples to attach per diagnosis. + + Returns: + List of Diagnosis objects, one per triggered metric. Empty if all OK. 
+ """ + diagnoses: list[Diagnosis] = [] + + for metric in metrics: + rule = METRIC_RULES.get(metric) + if rule is None: + continue # unknown metric, skip + + values = [] + for row in score_rows: + raw = row.get(metric) + if raw is None: + continue + try: + v = float(raw) + except (TypeError, ValueError): + continue + values.append(v) + + if not values: + continue + + mean = _mean_ignoring_nan(values) + if mean is None: + continue + + # Determine severity (direction-aware) + if rule.higher_is_better: + if mean < rule.critical_threshold: + severity = "critical" + threshold = rule.critical_threshold + elif mean < rule.warning_threshold: + severity = "warning" + threshold = rule.warning_threshold + else: + continue # above warning threshold → no diagnosis + else: + # lower is better (noise_sensitivity) + if mean > rule.critical_threshold: + severity = "critical" + threshold = rule.critical_threshold + elif mean > rule.warning_threshold: + severity = "warning" + threshold = rule.warning_threshold + else: + continue + + low_samples = _select_low_samples(score_rows, metric, top_low_samples, rule.higher_is_better) + + diagnoses.append(Diagnosis( + metric=metric, + mean_score=round(mean, 4), + threshold=threshold, + severity=severity, + root_causes=list(rule.root_causes), + suggested_actions=list(rule.suggested_actions), + low_samples=low_samples, + )) + + return diagnoses +``` + +- [ ] **Step 4: Create `rag_eval/advisor/__init__.py` (stub — full version in Task 5)** + +```python +# rag_eval/advisor/__init__.py +"""Optimization advisor: rule-based diagnosis + LLM-powered recommendations.""" + +from .rules import Diagnosis, diagnose + +__all__ = ["Diagnosis", "diagnose"] +``` + +- [ ] **Step 5: Run tests — expect pass** + +``` +python -m pytest tests/test_advisor_rules.py -v +``` + +Expected: all 9 tests PASS. 
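The direction-aware threshold check inside `diagnose()` can also be exercised in isolation. The following is a minimal sketch, not the module's code: `classify_severity` is a hypothetical helper name introduced only for illustration, and the thresholds are passed in explicitly rather than read from `METRIC_RULES`.

```python
from __future__ import annotations


def classify_severity(
    mean: float,
    warning_threshold: float,
    critical_threshold: float,
    higher_is_better: bool = True,
) -> str | None:
    """Return "critical", "warning", or None for a metric mean.

    Hypothetical helper mirroring the comparison logic of diagnose():
    for higher-is-better metrics a mean *below* a threshold triggers;
    for lower-is-better metrics a mean *above* a threshold triggers.
    """
    if higher_is_better:
        if mean < critical_threshold:
            return "critical"
        if mean < warning_threshold:
            return "warning"
        return None
    # Lower is better (the noise_sensitivity case): comparisons flip.
    if mean > critical_threshold:
        return "critical"
    if mean > warning_threshold:
        return "warning"
    return None


# faithfulness-style rule: warning below 0.7, critical below 0.5
print(classify_severity(0.65, 0.7, 0.5))
# noise_sensitivity-style rule: warning above 0.3, critical above 0.5
print(classify_severity(0.45, 0.3, 0.5, higher_is_better=False))
```

The comparisons are strict, so a mean exactly equal to a threshold does not trigger a diagnosis in this sketch.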
+ +- [ ] **Step 6: Commit** + +``` +git add rag_eval/advisor/__init__.py rag_eval/advisor/rules.py tests/test_advisor_rules.py +git commit -m "feat(advisor): add rule-based diagnostic engine with 7-metric coverage" +``` + +--- + +## Task 2: Writer module + +**Files:** +- Create: `rag_eval/advisor/writer.py` +- Create: `tests/test_advisor_writer.py` + +- [ ] **Step 1: Write failing tests** + +```python +# tests/test_advisor_writer.py +import logging +import shutil +import unittest +from pathlib import Path + +from rag_eval.advisor.rules import Diagnosis +from rag_eval.advisor.writer import write_advice, _format_log_summary + + +class TestWriteAdvice(unittest.TestCase): + def setUp(self): + self.tmp = Path("tests/.tmp/test_advisor_writer") + shutil.rmtree(self.tmp, ignore_errors=True) + self.tmp.mkdir(parents=True, exist_ok=True) + self.advice_path = self.tmp / "optimization_advice.md" + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def _make_diagnosis(self, metric="faithfulness", severity="warning"): + return Diagnosis( + metric=metric, + mean_score=0.55, + threshold=0.7, + severity=severity, + root_causes=["原因1", "原因2"], + suggested_actions=["建议1", "建议2"], + low_samples=[ + {"sample_id": "s1", "question": "问题1", "answer": "答案1", + "ground_truth": "标准1", metric: 0.4}, + ], + ) + + def test_write_creates_file(self): + diag = self._make_diagnosis() + write_advice( + diagnoses=[diag], + llm_markdown="## faithfulness\n\nLLM 建议内容", + advice_path=self.advice_path, + scenario_name="test-scenario", + run_id="2026-01-01T00-00-00", + judge_model="deepseek-v4-flash", + ) + self.assertTrue(self.advice_path.exists()) + + def test_write_contains_scenario_name_and_run_id(self): + diag = self._make_diagnosis() + write_advice( + diagnoses=[diag], + llm_markdown="## faithfulness\n\nLLM 建议", + advice_path=self.advice_path, + scenario_name="siemens-test", + run_id="2026-01-01T00-00-00", + judge_model="deepseek-v4-flash", + ) + content = 
self.advice_path.read_text(encoding="utf-8") + self.assertIn("siemens-test", content) + self.assertIn("2026-01-01T00-00-00", content) + + def test_write_contains_llm_markdown(self): + diag = self._make_diagnosis() + write_advice( + diagnoses=[diag], + llm_markdown="## faithfulness\n\n具体建议文本", + advice_path=self.advice_path, + scenario_name="test", + run_id="rid", + judge_model="model", + ) + content = self.advice_path.read_text(encoding="utf-8") + self.assertIn("具体建议文本", content) + + def test_write_fallback_when_no_llm_markdown(self): + """When llm_markdown is empty, writer emits rule-only report.""" + diag = self._make_diagnosis() + write_advice( + diagnoses=[diag], + llm_markdown="", + advice_path=self.advice_path, + scenario_name="test", + run_id="rid", + judge_model="model", + ) + content = self.advice_path.read_text(encoding="utf-8") + self.assertIn("faithfulness", content) + self.assertIn("原因1", content) + + def test_log_summary_format(self): + diags = [ + self._make_diagnosis("faithfulness", "critical"), + self._make_diagnosis("context_recall", "warning"), + ] + summary = _format_log_summary(diags, self.advice_path) + self.assertIn("faithfulness", summary) + self.assertIn("critical", summary) + self.assertIn("context_recall", summary) + self.assertIn("warning", summary) + + def test_write_empty_diagnoses_still_creates_file(self): + write_advice( + diagnoses=[], + llm_markdown="", + advice_path=self.advice_path, + scenario_name="test", + run_id="rid", + judge_model="model", + ) + self.assertTrue(self.advice_path.exists()) + content = self.advice_path.read_text(encoding="utf-8") + self.assertIn("未发现明显指标异常", content) + + +if __name__ == "__main__": + unittest.main() +``` + +- [ ] **Step 2: Run tests to verify they fail** + +``` +python -m pytest tests/test_advisor_writer.py -v 2>&1 | head -15 +``` + +Expected: `ModuleNotFoundError: No module named 'rag_eval.advisor.writer'` (the `rag_eval.advisor` package already exists from Task 1, but `writer.py` has not been created yet) + +- [ ] **Step 3: Create writer.py** + +```python +# 
rag_eval/advisor/writer.py +"""Write optimization advice to markdown file and emit log summary.""" +from __future__ import annotations + +import logging +from pathlib import Path + +from .rules import Diagnosis + +logger = logging.getLogger("rag_eval.advisor") + + +def _format_log_summary(diagnoses: list[Diagnosis], advice_path: Path) -> str: + """Return a single-line log summary of triggered diagnoses.""" + if not diagnoses: + return "[advisor] 所有指标正常,无需优化建议。" + parts = [f"{d.metric}({d.mean_score:.2f}, {d.severity})" for d in diagnoses] + triggered = " ".join(parts) + return f"[advisor] 触发诊断 {len(diagnoses)} 项: {triggered} → {advice_path}" + + +def _build_fallback_report(diagnoses: list[Diagnosis]) -> str: + """Build a rules-only report when LLM analysis is unavailable.""" + if not diagnoses: + return "" + lines = ["## 规则诊断(LLM 分析不可用)\n"] + for d in diagnoses: + lines.append(f"### {d.metric} [{d.severity}] 均值={d.mean_score:.4f}") + lines.append("\n**可能原因:**") + for cause in d.root_causes: + lines.append(f"- {cause}") + lines.append("\n**建议动作:**") + for action in d.suggested_actions: + lines.append(f"- {action}") + lines.append("") + return "\n".join(lines) + + +def write_advice( + diagnoses: list[Diagnosis], + llm_markdown: str, + advice_path: Path, + scenario_name: str, + run_id: str, + judge_model: str, +) -> None: + """Write optimization_advice.md and emit a log summary line. + + Args: + diagnoses: List of Diagnosis from rules.diagnose(). + llm_markdown: LLM-generated Markdown body. Empty string triggers fallback. + advice_path: Full path to write the .md file. + scenario_name: Human-readable scenario identifier for the report header. + run_id: Run identifier string. + judge_model: Model used for LLM analysis (shown in header). 
+ """ + advice_path.parent.mkdir(parents=True, exist_ok=True) + + # Header + from rag_eval.shared.utils import utc_now_iso + header_lines = [ + f"# 优化建议报告 — {scenario_name}", + "", + f"- run_id: `{run_id}`", + f"- 生成时间: `{utc_now_iso()}`", + f"- judge_model: `{judge_model}`", + "", + "---", + "", + ] + + if not diagnoses: + body = "## ✅ 未发现明显指标异常\n\n所有指标均在正常范围内,当前 RAG 链路表现良好。\n" + elif llm_markdown: + body = llm_markdown + else: + body = _build_fallback_report(diagnoses) + + content = "\n".join(header_lines) + body + advice_path.write_text(content, encoding="utf-8") + + summary = _format_log_summary(diagnoses, advice_path) + logger.info(summary) + logger.info("[advisor] 优化建议已写出: %s", advice_path) +``` + +- [ ] **Step 4: Run tests — expect pass** + +``` +python -m pytest tests/test_advisor_writer.py -v +``` + +Expected: all 6 tests PASS. + +- [ ] **Step 5: Commit** + +``` +git add rag_eval/advisor/writer.py tests/test_advisor_writer.py +git commit -m "feat(advisor): add advice writer with fallback rule-only report" +``` + +--- + +## Task 3: LLM analyzer + +**Files:** +- Create: `rag_eval/advisor/llm_analyzer.py` + +No LLM unit tests (network-dependent); tested in Task 7 integration. + +- [ ] **Step 1: Create llm_analyzer.py** + +```python +# rag_eval/advisor/llm_analyzer.py +"""LLM-powered analysis of rule diagnostics and low-score samples.""" +from __future__ import annotations + +import logging +from typing import Any + +from .rules import Diagnosis + +logger = logging.getLogger("rag_eval.advisor") + +_PROMPT_TEMPLATE = """\ +你是一个 RAG 系统优化专家,正在分析西门子医疗 CT 文档问答系统的评测结果。 +请用中文撰写一份优化建议报告,格式为 Markdown。 + +## 评测诊断摘要 + +{diagnosis_summary} + +## 低分样本示例 + +{low_sample_text} + +## 报告要求 + +1. 按指标分节(## 指标名 [severity]),先解释"为什么低"(结合低分样本具体分析),再给出"具体怎么改" +2. "具体怎么改"要结合低分样本的实际内容,而不只是泛泛建议 +3. 最后写一节 **## 优先优化次序**,按性价比排序(不增加 LLM 调用次数的优化优先) +4. 
语言简洁,面向工程师,不要废话,不要重复列表内容 + +只输出 Markdown 报告正文,不要任何前置说明。 +""" + + +def _build_diagnosis_summary(diagnoses: list[Diagnosis]) -> str: + lines = [] + for d in diagnoses: + direction = "(越低越好)" if d.metric == "noise_sensitivity" else "" + lines.append( + f"- **{d.metric}** {direction} 均值={d.mean_score:.4f}," + f"阈值={d.threshold},严重程度={d.severity}" + ) + lines.append(f" - 可能原因:{'; '.join(d.root_causes)}") + lines.append(f" - 建议动作:{'; '.join(d.suggested_actions)}") + return "\n".join(lines) + + +def _build_low_sample_text(diagnoses: list[Diagnosis]) -> str: + lines = [] + for d in diagnoses: + if not d.low_samples: + continue + lines.append(f"### {d.metric} 低分样本(最多 3 条)") + for i, s in enumerate(d.low_samples, 1): + score = s.get(d.metric, "N/A") + lines.append(f"\n**样本 {i}**(分数={score})") + lines.append(f"- 问题:{s.get('question', '')}") + lines.append(f"- 回答:{s.get('answer', '')[:300]}") + lines.append(f"- 标准答案:{s.get('ground_truth', '')[:200]}") + return "\n".join(lines) + + +async def analyze( + diagnoses: list[Diagnosis], + llm: Any, + scenario_name: str, +) -> str: + """Call the judge LLM to generate a Chinese optimization report. + + Args: + diagnoses: Non-empty list of Diagnosis from rules.diagnose(). + llm: RAGAS LLM wrapper (has .agenerate() method). + scenario_name: Used only for logging. + + Returns: + LLM-generated Markdown string, or "" on failure (triggers writer fallback). 
+ """ + if not diagnoses: + return "" + + diagnosis_summary = _build_diagnosis_summary(diagnoses) + low_sample_text = _build_low_sample_text(diagnoses) + prompt = _PROMPT_TEMPLATE.format( + diagnosis_summary=diagnosis_summary, + low_sample_text=low_sample_text, + ) + + try: + logger.info("[advisor] calling LLM for optimization analysis scenario=%s", scenario_name) + # ragas LLM wrapper: generate() accepts list[list[str]] and returns LLMResult + from langchain_core.messages import HumanMessage + result = await llm.agenerate(texts=[[HumanMessage(content=prompt)]]) + text = result.generations[0][0].text.strip() + logger.info("[advisor] LLM analysis complete chars=%d", len(text)) + return text + except Exception as exc: + logger.warning("[advisor] LLM analysis failed (%s: %s) — falling back to rule report", type(exc).__name__, exc) + return "" +``` + +- [ ] **Step 2: Verify import works** + +``` +python -c "from rag_eval.advisor.llm_analyzer import analyze; print('OK')" +``` + +Expected: `OK` + +- [ ] **Step 3: Commit** + +``` +git add rag_eval/advisor/llm_analyzer.py +git commit -m "feat(advisor): add LLM analyzer with graceful fallback on failure" +``` + +--- + +## Task 4: Wire advisor into models, config schema, and loader + +**Files:** +- Modify: `rag_eval/shared/models.py` +- Modify: `rag_eval/config/schema.py` +- Modify: `rag_eval/config/loader.py` +- Modify: `rag_eval/reporting/artifacts.py` + +- [ ] **Step 1: Add `optimization_advisor` to `Scenario` and `RunArtifactPaths`** + +In `rag_eval/shared/models.py`, add one field to `Scenario` (after `source_path`) and one to `RunArtifactPaths`: + +```python +# In Scenario dataclass — add after source_path field: +optimization_advisor: bool = False +``` + +```python +# In RunArtifactPaths dataclass — add after metadata_json field: +advice_md: Path | None = None +``` + +Full updated `Scenario` dataclass (slots=True, so field order matters — add at end): +```python +@dataclass(slots=True) +class Scenario: + 
scenario_name: str + mode: Mode + dataset: DatasetConfig + judge_model: str + embedding_model: str + metrics: list[str] + output_dir: Path + runtime: RuntimeConfig = field(default_factory=RuntimeConfig) + app_adapter: AppAdapterConfig | None = None + source_path: Path | None = None + optimization_advisor: bool = False # NEW +``` + +Full updated `RunArtifactPaths`: +```python +@dataclass(slots=True) +class RunArtifactPaths: + root_dir: Path + scenario_snapshot: Path + scores_csv: Path + invalid_csv: Path + summary_md: Path + metadata_json: Path + advice_md: Path | None = None # NEW +``` + +- [ ] **Step 2: Add field to ScenarioModel in schema.py** + +In `rag_eval/config/schema.py`, add to `ScenarioModel`: + +```python +optimization_advisor: bool = False # NEW — enable optimization advisor output +``` + +(add after the `runtime` field) + +- [ ] **Step 3: Pass optimization_advisor through in loader.py** + +In `rag_eval/config/loader.py`, in the `Scenario(...)` constructor call, add: + +```python + optimization_advisor=model.optimization_advisor, # NEW +``` + +- [ ] **Step 4: Add advice_md to artifact paths in artifacts.py** + +In `rag_eval/reporting/artifacts.py`, update `build_artifact_paths()`: + +```python +def build_artifact_paths(output_dir: Path, run_id: str) -> RunArtifactPaths: + """Build the canonical artifact file paths for a single evaluation run.""" + run_dir = output_dir / run_id + return RunArtifactPaths( + root_dir=run_dir, + scenario_snapshot=run_dir / "scenario.snapshot.yaml", + scores_csv=run_dir / "scores.csv", + invalid_csv=run_dir / "invalid.csv", + summary_md=run_dir / "summary.md", + metadata_json=run_dir / "metadata.json", + advice_md=run_dir / "optimization_advice.md", # NEW + ) +``` + +- [ ] **Step 5: Verify existing tests still pass** + +``` +python -m pytest tests/test_offline_eval.py tests/test_online_eval.py -v 2>&1 | tail -15 +``` + +Expected: same pass/fail as before (the 4 pre-existing failures are unrelated). 
+ +- [ ] **Step 6: Commit** + +``` +git add rag_eval/shared/models.py rag_eval/config/schema.py rag_eval/config/loader.py rag_eval/reporting/artifacts.py +git commit -m "feat(advisor): wire optimization_advisor field through Scenario, ScenarioModel, loader, and RunArtifactPaths" +``` + +--- + +## Task 5: Lift build_models() to runner + wire run_advisor() + +**Files:** +- Modify: `rag_eval/metrics/factory.py` +- Modify: `rag_eval/execution/runner.py` +- Modify: `rag_eval/advisor/__init__.py` + +This is the integration wiring. The key change: `build_metric_pipeline()` currently creates llm internally and returns only `MetricPipeline`. We add a companion function `build_models()` that `runner.py` calls first, then passes `llm` to both `build_metric_pipeline()` and `run_advisor()`. + +- [ ] **Step 1: Add `build_models()` as public function in factory.py** + +The existing `build_models()` is already defined in `factory.py` (lines 30-39) but is module-private (no `__all__`). We expose it and update `build_metric_pipeline()` to accept optional pre-built models: + +```python +# rag_eval/metrics/factory.py — full replacement + +"""Factories for OpenAI-backed RAGAS models and metric pipelines.""" + +from __future__ import annotations + +from typing import Any + +from openai import AsyncOpenAI + +from rag_eval.compat import ensure_ragas_import_compat +from rag_eval.settings import EvaluationSettings +from rag_eval.shared.models import Scenario + +ensure_ragas_import_compat() + +from ragas.embeddings.base import embedding_factory +from ragas.llms import llm_factory +from ragas.metrics.collections import ( + AnswerRelevancy, + ContextPrecision, + ContextRecall, + FactualCorrectness, + Faithfulness, + NoiseSensitivity, + SemanticSimilarity, +) + +from .pipeline import MetricPipeline + + +def build_models( + judge_model: str, + embedding_model: str, + settings: EvaluationSettings, +) -> tuple[Any, Any]: + """Create the LLM and embedding clients required by the selected RAGAS 
metrics.""" + client = AsyncOpenAI(**settings.openai_client_kwargs) + llm = llm_factory(judge_model, client=client) + embeddings = embedding_factory(provider="openai", model=embedding_model, client=client) + return llm, embeddings + + +def build_metric_pipeline( + scenario: Scenario, + settings: EvaluationSettings, + llm: Any | None = None, + embeddings: Any | None = None, +) -> MetricPipeline: + """Build a metric pipeline containing only the metrics requested by the scenario. + + If llm and embeddings are provided (pre-built by the caller), they are reused. + Otherwise, new instances are created from scenario + settings. + """ + if llm is None or embeddings is None: + llm, embeddings = build_models( + scenario.judge_model, + scenario.embedding_model, + settings, + ) + + registry: dict[str, Any] = { + "faithfulness": Faithfulness(llm=llm), + "answer_relevancy": AnswerRelevancy(llm=llm, embeddings=embeddings), + "context_recall": ContextRecall(llm=llm), + "context_precision": ContextPrecision(llm=llm), + "noise_sensitivity": NoiseSensitivity(llm=llm), + "factual_correctness": FactualCorrectness(llm=llm), + "semantic_similarity": SemanticSimilarity(embeddings=embeddings), + } + return MetricPipeline( + metrics={name: registry[name] for name in scenario.metrics}, + metric_timeout_seconds=settings.ragas_metric_timeout_seconds, + ) +``` + +- [ ] **Step 2: Update `rag_eval/advisor/__init__.py` with full `run_advisor()`** + +```python +# rag_eval/advisor/__init__.py +"""Optimization advisor: rule-based diagnosis + LLM-powered recommendations.""" +from __future__ import annotations + +import asyncio +import logging +from typing import Any + +from rag_eval.reporting.artifacts import build_artifact_paths +from rag_eval.shared.models import EvaluationResult, Scenario + +from .llm_analyzer import analyze +from .rules import Diagnosis, diagnose +from .writer import write_advice + +logger = logging.getLogger("rag_eval.advisor") + +__all__ = ["run_advisor", "Diagnosis", 
"diagnose"] + + +def run_advisor( + result: EvaluationResult, + scenario: Scenario, + llm: Any, +) -> None: + """Run the full optimization advisor pipeline after an evaluation completes. + + Skips silently if scenario.optimization_advisor is False. + Never raises — failures are logged as warnings, not exceptions. + + Args: + result: Completed EvaluationResult from Evaluator.evaluate(). + scenario: The resolved Scenario (provides metrics, judge_model, output_dir). + llm: Pre-built RAGAS LLM instance (from build_models()) for LLM analysis. + """ + if not scenario.optimization_advisor: + return + + logger.info("[advisor] starting optimization analysis scenario=%s", scenario.scenario_name) + + try: + artifact_paths = build_artifact_paths(scenario.output_dir, result.run_id) + if artifact_paths.advice_md is None: + logger.warning("[advisor] advice_md path not set in RunArtifactPaths — skipping") + return + + diagnoses = diagnose(result.score_rows, scenario.metrics) + logger.info("[advisor] rule diagnosis complete: %d metric(s) triggered", len(diagnoses)) + + if diagnoses: + llm_markdown = asyncio.run(analyze(diagnoses, llm, scenario.scenario_name)) + else: + llm_markdown = "" + + write_advice( + diagnoses=diagnoses, + llm_markdown=llm_markdown, + advice_path=artifact_paths.advice_md, + scenario_name=scenario.scenario_name, + run_id=result.run_id, + judge_model=scenario.judge_model, + ) + + except Exception as exc: + logger.warning( + "[advisor] advisor failed (%s: %s) — evaluation result is unaffected", + type(exc).__name__, exc, + ) +``` + +- [ ] **Step 3: Update runner.py to lift llm and call run_advisor()** + +In `rag_eval/execution/runner.py`, make these changes: + +1. Add import at top: +```python +from rag_eval.advisor import run_advisor +from rag_eval.metrics.factory import build_models, build_metric_pipeline +``` + +2. 
Drop the old single-name `build_metric_pipeline` import from `rag_eval.metrics.factory` (the combined import above supersedes it) and update `run_scenario()`: + +```python +# rag_eval/execution/runner.py (full replacement) + +"""High-level scenario runner used by the package and CLI entrypoints.""" + +from __future__ import annotations + +import logging +import sys +from pathlib import Path + +from rag_eval.adapters.http import HttpAppAdapter +from rag_eval.adapters.python import PythonFunctionAdapter +from rag_eval.advisor import run_advisor +from rag_eval.config.loader import load_scenario +from rag_eval.metrics.factory import build_models, build_metric_pipeline +from rag_eval.reporting.writers import write_run_artifacts +from rag_eval.settings import EvaluationSettings +from rag_eval.shared.models import Scenario + +from .evaluator import Evaluator + +logger = logging.getLogger("rag_eval.execution.runner") + + +def _setup_logging(log_file: Path | None = None, level: int = logging.INFO) -> None: + """Configure root logger: always write to stderr, optionally also to a file.""" + fmt = "%(asctime)s %(levelname)-8s %(name)s %(message)s" + datefmt = "%H:%M:%S" + + handlers: list[logging.Handler] = [logging.StreamHandler(sys.stderr)] + if log_file is not None: + log_file.parent.mkdir(parents=True, exist_ok=True) + fh = logging.FileHandler(log_file, encoding="utf-8") + fh.setFormatter(logging.Formatter(fmt, datefmt=datefmt)) + handlers.append(fh) + + logging.basicConfig(level=level, format=fmt, datefmt=datefmt, handlers=handlers, force=True) + logging.getLogger("ragas").setLevel(logging.WARNING) + logging.getLogger("httpx").setLevel(logging.WARNING) + logging.getLogger("openai").setLevel(logging.WARNING) + + +def build_adapter(scenario: Scenario): + """Instantiate the adapter required by the resolved scenario, if any.""" + if scenario.app_adapter is None: + return None + if scenario.app_adapter.type == "http": + return HttpAppAdapter(scenario.app_adapter) + if scenario.app_adapter.type == "python":
+ return PythonFunctionAdapter(scenario.app_adapter) + raise ValueError(f"Unsupported adapter type: {scenario.app_adapter.type}") + + +def run_scenario( + scenario_path: str, + settings: EvaluationSettings | None = None, + log_file: Path | None = None, + log_level: int = logging.INFO, +): + """Run one scenario end to end and persist its reporting artifacts.""" + _setup_logging(log_file=log_file, level=log_level) + logger.info("[runner] run_scenario path=%s", scenario_path) + + settings = settings or EvaluationSettings() + if not settings.openai_api_key: + raise EnvironmentError("OPENAI_API_KEY must be set before running the evaluator.") + + scenario = load_scenario(scenario_path) + logger.info("[runner] scenario loaded: name=%s mode=%s max_samples=%s", + scenario.scenario_name, scenario.mode, scenario.runtime.max_samples) + + # Build models once; reuse llm in both MetricPipeline and advisor. + llm, embeddings = build_models(scenario.judge_model, scenario.embedding_model, settings) + + adapter = build_adapter(scenario) + pipeline = build_metric_pipeline(scenario, settings, llm=llm, embeddings=embeddings) + evaluator = Evaluator(scenario=scenario, metric_pipeline=pipeline, app_adapter=adapter) + result = evaluator.evaluate() + write_run_artifacts(result) + logger.info("[runner] artifacts written for run_id=%s", result.run_id) + + # Optimization advisor — runs only if scenario.optimization_advisor is True. + run_advisor(result, scenario, llm) + + return result +``` + +- [ ] **Step 4: Verify existing tests still pass** + +``` +python -m pytest tests/ -v 2>&1 | tail -20 +``` + +Expected: same pass count as before this change (only pre-existing 4 failures). 
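The build-once wiring from this task can also be reasoned about without any network access. The following is a minimal sketch, not project code: `build_models_stub`, `build_pipeline_stub`, and `run_once` are hypothetical stand-ins that only mirror the `if llm is None or embeddings is None` guard shown in `build_metric_pipeline()` and the call order in `run_scenario()`.

```python
from __future__ import annotations

from typing import Any

# Counts how often the stand-in factory runs (a proxy for client creation cost).
build_calls = {"count": 0}


def build_models_stub(judge_model: str, embedding_model: str) -> tuple[Any, Any]:
    """Hypothetical stand-in for build_models(): no network, just labelled strings."""
    build_calls["count"] += 1
    return f"llm:{judge_model}", f"emb:{embedding_model}"


def build_pipeline_stub(
    judge_model: str,
    embedding_model: str,
    llm: Any | None = None,
    embeddings: Any | None = None,
) -> dict[str, Any]:
    """Mirror the guard in build_metric_pipeline(): rebuild both if either is missing."""
    if llm is None or embeddings is None:
        llm, embeddings = build_models_stub(judge_model, embedding_model)
    return {"llm": llm, "embeddings": embeddings}


def run_once(judge_model: str, embedding_model: str) -> tuple[dict[str, Any], Any]:
    """Runner-style wiring: build once, hand the same llm to pipeline and advisor."""
    llm, embeddings = build_models_stub(judge_model, embedding_model)
    pipeline = build_pipeline_stub(
        judge_model, embedding_model, llm=llm, embeddings=embeddings
    )
    return pipeline, llm  # the second value is what run_advisor() would receive
```

One consequence of the guard worth keeping in mind: a caller that passes only `llm` (or only `embeddings`) triggers a full rebuild, and the injected object is discarded. Callers that want reuse should always pass both.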
+ +- [ ] **Step 5: Commit** + +``` +git add rag_eval/metrics/factory.py rag_eval/execution/runner.py rag_eval/advisor/__init__.py +git commit -m "feat(advisor): wire run_advisor into runner, lift llm from build_metric_pipeline" +``` + +--- + +## Task 6: Enable advisor in Siemens online YAML + +**Files:** +- Modify: `scenarios/online/siemens-pdf-question-bank-online.yaml` + +- [ ] **Step 1: Add optimization_advisor field** + +Read the current file first, then add one line after `embedding_model`: + +```yaml +scenario_name: siemens-pdf-question-bank-online +mode: online +dataset: ../../datasets/raw/generated/siemens-pdf-question-bank.csv +judge_model: deepseek-v4-flash +embedding_model: text-embedding-v3 +optimization_advisor: true # 评测结束后自动生成优化建议报告 +metrics: + - faithfulness + - answer_relevancy + - context_recall + - context_precision + # 已启用:鲁棒性 / 端到端指标(数据集已含 ground_truth) + - noise_sensitivity # 鲁棒性:对检索噪声的敏感度 + - factual_correctness # 端到端:事实正确性(相对标准答案) + - semantic_similarity # 端到端:语义相似度(embedding,无 LLM 调用) +output_dir: ../../outputs/online/siemens-pdf-question-bank +runtime: + batch_size: 4 + app_concurrency: 4 + metric_concurrency: 4 + max_samples: 50 +app_adapter: + type: python + callable: apps.siemens_pdf_qa.adapter:run + static_kwargs: + source_chunks_path: ../../outputs/dataset-builds/siemens-pdf-question-bank/latest/source_chunks.jsonl + model: deepseek-v4-flash +``` + +- [ ] **Step 2: Verify scenario loads correctly** + +``` +python -c " +from rag_eval.config.loader import load_scenario +s = load_scenario('scenarios/online/siemens-pdf-question-bank-online.yaml') +print('optimization_advisor:', s.optimization_advisor) +print('metrics:', s.metrics) +" +``` + +Expected: +``` +optimization_advisor: True +metrics: ['faithfulness', 'answer_relevancy', 'context_recall', 'context_precision', 'noise_sensitivity', 'factual_correctness', 'semantic_similarity'] +``` + +- [ ] **Step 3: Commit** + +``` +git add scenarios/online/siemens-pdf-question-bank-online.yaml 
+git commit -m "feat(advisor): enable optimization_advisor in siemens online scenario" +``` + +--- + +## Task 7: Run all advisor tests + smoke check + +**Files:** none new + +- [ ] **Step 1: Run full advisor test suite** + +``` +python -m pytest tests/test_advisor_rules.py tests/test_advisor_writer.py -v +``` + +Expected: 15 tests PASS (9 rules + 6 writer). + +- [ ] **Step 2: Smoke-check the full module wiring (no network)** + +```python +# paste into Python REPL or save as scripts/smoke_advisor.py and run +import math, sys +sys.path.insert(0, ".") + +from rag_eval.advisor.rules import diagnose +from rag_eval.advisor.writer import write_advice, _format_log_summary +from pathlib import Path +import tempfile, os + +# Simulate score_rows with low faithfulness and high noise_sensitivity +rows = [ + {"sample_id": f"s{i}", "question": f"q{i}", "answer": f"a{i}", + "ground_truth": f"gt{i}", "faithfulness": 0.3 + i*0.05, + "noise_sensitivity": 0.4 + i*0.02} + for i in range(5) +] + +diags = diagnose(rows, metrics=["faithfulness", "noise_sensitivity"]) +print(f"Diagnosed {len(diags)} metric(s):") +for d in diags: + print(f" {d.metric}: mean={d.mean_score}, severity={d.severity}, samples={len(d.low_samples)}") + +with tempfile.TemporaryDirectory() as tmp: + path = Path(tmp) / "optimization_advice.md" + write_advice( + diagnoses=diags, + llm_markdown="", # fallback mode + advice_path=path, + scenario_name="smoke-test", + run_id="2026-01-01T00-00-00", + judge_model="deepseek-v4-flash", + ) + content = path.read_text(encoding="utf-8") + print(f"\nAdvice file ({len(content)} chars):") + print(content[:600]) + +print("\nSmoke check PASSED") +``` + +``` +python scripts/smoke_advisor.py +``` + +Expected: prints diagnosed metrics, advice content, `Smoke check PASSED`. 
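To know what the smoke script should report, it helps to work through the thresholds by hand. The sketch below is a self-contained restatement of the direction-aware comparison in `rag_eval/advisor/rules.py`, using the two thresholds copied from `METRIC_RULES`; the names `RULES` and `classify` are illustrative only and do not exist in the package.

```python
from __future__ import annotations

# (warning, critical, higher_is_better), copied from METRIC_RULES in rules.py.
RULES = {
    "faithfulness": (0.7, 0.5, True),
    "noise_sensitivity": (0.3, 0.5, False),
}


def classify(metric: str, mean: float) -> str | None:
    """Return "critical", "warning", or None, honouring the metric's direction."""
    warning, critical, higher_is_better = RULES[metric]
    if higher_is_better:
        if mean < critical:
            return "critical"
        return "warning" if mean < warning else None
    # Lower is better: trigger when the mean exceeds the threshold.
    if mean > critical:
        return "critical"
    return "warning" if mean > warning else None


# Same synthetic scores as the smoke script's rows.
faithfulness = [0.3 + i * 0.05 for i in range(5)]
noise = [0.4 + i * 0.02 for i in range(5)]
mean_f = round(sum(faithfulness) / len(faithfulness), 4)
mean_n = round(sum(noise) / len(noise), 4)

print("faithfulness", mean_f, classify("faithfulness", mean_f))
print("noise_sensitivity", mean_n, classify("noise_sensitivity", mean_n))
```

With these inputs the faithfulness mean is 0.4, which falls below the 0.5 critical threshold, and the noise_sensitivity mean is 0.44, which exceeds the 0.3 warning threshold but not the 0.5 critical one. The smoke script should therefore diagnose two metrics, one `critical` and one `warning`.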
+ +- [ ] **Step 3: Commit smoke script** + +``` +git add scripts/smoke_advisor.py +git commit -m "test(advisor): add smoke-check script for offline wiring verification" +``` + +--- + +## Task 8: Update docs + +**Files:** +- Modify: `docs/rag-eval-engine-flow.md` +- Modify: `docs/rag-eval-architecture.md` + +- [ ] **Step 1: Add advisor section to rag-eval-engine-flow.md** + +Append a new section at the end of `docs/rag-eval-engine-flow.md`: + +```markdown + +--- + +## 15. Optimization Advisor 链路 + +相关代码: + +- `rag_eval/advisor/__init__.py` — 外部入口 `run_advisor()` +- `rag_eval/advisor/rules.py` — 规则引擎(纯函数,无 LLM) +- `rag_eval/advisor/llm_analyzer.py` — LLM 分析器(复用 judge_model) +- `rag_eval/advisor/writer.py` — 写出 `optimization_advice.md` + 日志摘要 + +Advisor 在 `write_run_artifacts()` 之后触发,仅当场景配置 `optimization_advisor: true` 时生效。 + +执行链路: + +```text +run_advisor(result, scenario, llm) + -> rules.diagnose(score_rows, metrics) # 识别异常指标,选取低分样本 + -> llm_analyzer.analyze(diagnoses, llm) # LLM 生成中文建议(失败自动降级) + -> writer.write_advice(...) 
# 写 optimization_advice.md + 日志摘要 +``` + +输出产物追加在现有 run 目录: + +```text +outputs/online/siemens-pdf-question-bank// + ...(现有文件) + optimization_advice.md ← 新增(optimization_advisor: true 时生成) +``` +``` + +- [ ] **Step 2: Add advisor note to rag-eval-architecture.md §9.4** + +In `docs/rag-eval-architecture.md`, find the end of section `### 9.4 指标编排` and append: + +```markdown + +**Optimization Advisor(§11 优化策略落地):** + +评测结束后,若场景配置 `optimization_advisor: true`,则自动调用 `rag_eval/advisor/` 模块: +- 规则引擎(`rules.py`)对 7 个指标各自设阈值,识别触发项并选取 top-3 低分样本 +- LLM 分析器(`llm_analyzer.py`)结合低分样本生成中文 Markdown 优化建议(复用 judge_model,失败自动降级为纯规则报告) +- 写出层(`writer.py`)输出 `optimization_advice.md` 并打日志摘要 + +```yaml +# 场景配置示例 +optimization_advisor: true +``` +``` + +- [ ] **Step 3: Commit docs** + +``` +git add docs/rag-eval-engine-flow.md docs/rag-eval-architecture.md +git commit -m "docs: add optimization advisor section to engine-flow and architecture docs" +``` + +--- + +## Self-Review + +**Spec coverage check:** +- §2 决策摘要 → Task 5 (llm reuse), Task 6 (YAML trigger), Task 1+2+3 (outputs) ✓ +- §3.1 执行链路 → Task 5 runner.py wiring ✓ +- §3.2 新增文件 → Tasks 1, 2, 3 ✓ +- §3.3 修改文件 → Task 4 (models/schema/loader/artifacts), Task 5 (factory/runner) ✓ +- §3.4 输出产物 → Task 4 advice_md in RunArtifactPaths ✓ +- §4 规则引擎 7条规则 → Task 1 rules.py METRIC_RULES ✓ +- §4.3 low_samples top-3 → Task 1 `_select_low_samples` ✓ +- §5 LLM分析器 → Task 3 llm_analyzer.py ✓ +- §5.4 失败降级 → Task 3 try/except returns "" → Task 2 writer fallback ✓ +- §6.1 文件写出 + §6.2 日志摘要 → Task 2 writer.py ✓ +- §7 YAML配置 → Task 6 ✓ +- §8 测试策略 → Tasks 1 (rules tests), 2 (writer tests) ✓ +- §9 非目标 → not implemented ✓ + +**Type consistency check:** +- `Diagnosis` defined in `rules.py`, imported in `__init__.py`, `llm_analyzer.py`, `writer.py` — all consistent ✓ +- `write_advice()` signature matches calls in `__init__.py` ✓ +- `build_models()` returns `tuple[Any, Any]`; `build_metric_pipeline()` accepts `llm, embeddings` — consistent ✓ +- 
`run_advisor(result, scenario, llm)` signature matches call in `runner.py` ✓ + +**Placeholder scan:** No TBD/TODO/fill-in-later found ✓ diff --git a/docs/superpowers/specs/2026-06-16-optimization-advisor-design.md b/docs/superpowers/specs/2026-06-16-optimization-advisor-design.md new file mode 100644 index 0000000..84a6c77 --- /dev/null +++ b/docs/superpowers/specs/2026-06-16-optimization-advisor-design.md @@ -0,0 +1,225 @@ +# 优化顾问模块设计 Spec + +- 日期:2026-06-16 +- 状态:已确认,进入实现。 + +## 1. 目标 + +在现有 RAG 评测流程结束后,新增一个**优化顾问模块**(Optimization Advisor),根据本次评测的多项指标分数与低分样本,自动诊断指标偏低的原因并给出针对性的优化建议,输出为中文 Markdown 报告 + 日志摘要。 + +对应架构设计 §11(优化策略):将"指标到动作的映射"(§11.2)从文档形式落地为代码自动执行。 + +--- + +## 2. 决策摘要 + +| 决策点 | 选择 | +|---|---| +| 输出形式 | `optimization_advice.md`(文件)+ 控制台/日志摘要(双输出) | +| 生成机制 | 规则引擎定位异常指标 → LLM 结合低分样本二次解读(两层) | +| 触发方式 | YAML 场景文件显式声明 `optimization_advisor: true`,默认关闭 | +| LLM 实例 | 复用 `build_models()` 已创建的 `llm` 实例,不重建 client | +| 包位置 | `rag_eval/advisor/`(独立包,对外暴露 `run_advisor()` 单一入口) | + +--- + +## 3. 架构 + +### 3.1 执行链路 + +``` +run_scenario() + → load_scenario() # 读 YAML,解析 optimization_advisor 字段 + → build_models() # 已有:创建 llm, embeddings + → build_metric_pipeline() # 已有 + → Evaluator.evaluate() # 已有:打分 → EvaluationResult + → write_run_artifacts() # 已有:scores.csv / summary.md / ... 
+ → run_advisor( # 新增(3 行) + result, scenario, llm + ) + → rules.diagnose(score_rows, metrics) # 规则引擎:返回 Diagnosis 列表 + → llm_analyzer.analyze(diags, llm) # LLM:生成中文 Markdown 建议 + → writer.write_advice(...) # 写文件 + 打日志 +``` + +### 3.2 新增文件 + +``` +rag_eval/advisor/ + __init__.py ← 暴露 run_advisor(),外部唯一入口 + rules.py ← 纯函数规则引擎,无 LLM,可单独单测 + llm_analyzer.py ← 接收 llm 实例 + 诊断结构 → 中文 Markdown + writer.py ← 写 optimization_advice.md,打日志摘要 +``` + +### 3.3 修改文件(最小改动) + +| 文件 | 改动 | +|---|---| +| `rag_eval/shared/models.py` | `Scenario` 加 `optimization_advisor: bool = False` 字段 | +| `rag_eval/config/schema.py` | `ScenarioModel` 加同名字段 + 透传到 `Scenario` | +| `rag_eval/config/loader.py` | 透传 `optimization_advisor` 到 `Scenario` 构造 | +| `rag_eval/reporting/artifacts.py` | `RunArtifactPaths` 加 `advice_md: Path` 字段 + `build_artifact_paths()` 加赋值 | +| `rag_eval/execution/runner.py` | `run_scenario()` 末尾:`build_models` 返回 llm 传入,条件调用 `run_advisor()` | + +### 3.4 输出产物 + +``` +outputs/online/siemens-pdf-question-bank// + scenario.snapshot.yaml + scores.csv + invalid.csv + summary.md + metadata.json + optimization_advice.md ← 新增(optimization_advisor: true 时生成) +``` + +--- + +## 4.
规则引擎(rules.py) + +### 4.1 数据结构 + +```python +@dataclass +class Diagnosis: + metric: str # 指标名 + mean_score: float # 本次均值 + threshold: float # 警戒阈值 + severity: str # "warning" | "critical" + root_causes: list[str] # 可能原因(来自架构设计 §11.2) + suggested_actions: list[str] # 对应可调阶段 + low_samples: list[dict] # 分数最低的 N 条样本(含 question/answer/ground_truth) +``` + +### 4.2 七条指标诊断规则 + +阈值参考 RAG 评测最佳实践,分 warning / critical 两档: + +| 指标 | warning | critical | 根因方向 | 对应优化阶段(§11.2) | +|---|---|---|---|---| +| `faithfulness` | < 0.7 | < 0.5 | 生成未严格基于检索片段 / 幻觉 | 生成 prompt grounding、开启校验 | +| `answer_relevancy` | < 0.7 | < 0.5 | 回答偏离问题 / 格式冗余 | 查询改写、生成 prompt 格式 | +| `context_recall` | < 0.7 | < 0.5 | 检索遗漏关键信息 | 多查询、问题分解、Step-back、加大过召回 | +| `context_precision` | < 0.6 | < 0.4 | 检索引入过多噪声 / 排序差 | 后检索重排、压缩、相关性过滤 | +| `noise_sensitivity` | > 0.3 | > 0.5 | 回答被噪声片段干扰(越低越好) | 后检索相关性过滤、重排 | +| `factual_correctness` | < 0.6 | < 0.4 | 回答事实与标准答案偏差大 | 检索与生成综合优化 | +| `semantic_similarity` | < 0.7 | < 0.5 | 回答语义与标准答案差距大 | 生成 prompt、检索质量 | + +> 注:`noise_sensitivity` 越低越好(0=完全不受噪声影响),其阈值方向与其余相反。 + +### 4.3 低分样本选取 + +每个触发诊断的指标,取该指标分数最低的 **top-3** 样本(排除 NaN)附入 `Diagnosis.low_samples`,字段包含 `sample_id / question / answer / ground_truth / `。 + +--- + +## 5. LLM 分析器(llm_analyzer.py) + +### 5.1 输入 + +- `diagnoses: list[Diagnosis]` — 规则引擎输出(仅触发阈值的指标) +- `llm` — 已有 RAGAS LLM 实例(scenario 的 judge_model) +- `scenario_name: str` — 用于报告标题 + +### 5.2 Prompt 设计 + +使用**一次 LLM 调用**,把所有触发诊断的指标和低分样本一起发送: + +``` +你是一个 RAG 系统优化专家,正在分析西门子医疗 CT 文档问答系统的评测结果。 +请用中文撰写一份优化建议报告,格式为 Markdown。 + +## 评测诊断摘要 +{for each diagnosis: 指标名、均值、阈值、可能原因、建议动作} + +## 低分样本示例 +{for each diagnosis: top-3 低分样本的 question / answer / ground_truth} + +## 要求 +1. 按指标分节(## 指标名),先解释"为什么低",再给出"具体怎么改" +2. "具体怎么改"要结合低分样本的具体内容,而不只是泛泛建议 +3. 最后写一节 ## 优先优化次序,按性价比排序(参考:不增加调用次数的优先) +4. 
语言简洁,面向工程师,不要废话 +``` + +### 5.3 输出 + +LLM 返回的 Markdown 字符串,直接写入 `optimization_advice.md`(在报告头部追加运行元信息)。 + +### 5.4 失败降级 + +LLM 调用失败(超时/异常)时:降级为**纯规则报告**(只输出规则引擎的诊断结构,不含 LLM 解读),文件照常写出,错误信息写入报告末尾,不阻断整个评测流程。 + +--- + +## 6. 写出层(writer.py) + +### 6.1 文件写出 + +`optimization_advice.md` 结构: + +```markdown +# 优化建议报告 — + +- run_id: `` +- 生成时间: `` +- judge_model: `` + +--- + + +``` + +### 6.2 日志摘要 + +`run_advisor()` 完成后向 `logger.info` 打印一条精简摘要(单行,适合 `run_eval.bat` 结束后一眼扫到): + +``` +[advisor] 触发诊断 3 项: faithfulness(0.42, critical) context_recall(0.58, warning) noise_sensitivity(0.41, critical) +[advisor] 优化建议已写出: outputs/online/.../optimization_advice.md +``` + +--- + +## 7. YAML 配置 + +场景文件新增一个顶层字段: + +```yaml +optimization_advisor: true # 默认 false;true 时评测结束后自动生成优化建议 +``` + +后续若需精细配置(阈值覆盖、top-N 低分样本数),可扩展为: + +```yaml +optimization_advisor: + enabled: true + top_low_samples: 3 # 每个指标取几条低分样本(默认 3) + # thresholds: # 可选:覆盖默认阈值 + # faithfulness: 0.65 +``` + +本轮实现仅支持 `optimization_advisor: true/false`,扩展接口预留但不实现。 + +--- + +## 8. 测试策略 + +| 测试 | 文件 | 说明 | +|---|---|---| +| 规则引擎单测 | `tests/test_advisor_rules.py` | 纯函数,无 LLM,覆盖每条规则的 warning/critical 触发、NaN 跳过、low_samples 选取 | +| writer 单测 | `tests/test_advisor_writer.py` | mock Diagnosis 列表,验证 md 文件写出格式和日志输出 | +| 集成(可选) | 现有 `tests/test_online_eval.py` | 验证 `optimization_advisor: true` 场景下 advice_md 存在 | + +LLM 分析器不写单测(依赖网络),由集成场景覆盖。 + +--- + +## 9. 
不覆盖(本轮边界) + +- 不支持跨版本对比分析(只分析本次 run) +- 不支持批量场景聚合建议 +- 不建设 Web UI 展示 +- LLM 分析器 prompt 本轮不做多语言适配(直接中文) +- advisor 阈值本轮硬编码在 `rules.py`,不从 YAML 读取 diff --git a/rag_eval/advisor/__init__.py b/rag_eval/advisor/__init__.py new file mode 100644 index 0000000..7ea2cb6 --- /dev/null +++ b/rag_eval/advisor/__init__.py @@ -0,0 +1,67 @@ +"""Optimization advisor: rule-based diagnosis + LLM-powered recommendations.""" +from __future__ import annotations + +import asyncio +import logging +from typing import Any + +from rag_eval.reporting.artifacts import build_artifact_paths +from rag_eval.shared.models import EvaluationResult, Scenario + +from .llm_analyzer import analyze +from .rules import Diagnosis, diagnose +from .writer import write_advice + +logger = logging.getLogger("rag_eval.advisor") + +__all__ = ["run_advisor", "Diagnosis", "diagnose"] + + +def run_advisor( + result: EvaluationResult, + scenario: Scenario, + llm: Any, +) -> None: + """Run the full optimization advisor pipeline after an evaluation completes. + + Skips silently if scenario.optimization_advisor is False. + Never raises — failures are logged as warnings, not exceptions. + + Args: + result: Completed EvaluationResult from Evaluator.evaluate(). + scenario: The resolved Scenario (provides metrics, judge_model, output_dir). + llm: Pre-built RAGAS LLM instance (from build_models()) for LLM analysis. 
+ """ + if not scenario.optimization_advisor: + return + + logger.info("[advisor] starting optimization analysis scenario=%s", scenario.scenario_name) + + try: + artifact_paths = build_artifact_paths(scenario.output_dir, result.run_id) + if artifact_paths.advice_md is None: + logger.warning("[advisor] advice_md path not set in RunArtifactPaths — skipping") + return + + diagnoses = diagnose(result.score_rows, scenario.metrics) + logger.info("[advisor] rule diagnosis complete: %d metric(s) triggered", len(diagnoses)) + + if diagnoses: + llm_markdown = asyncio.run(analyze(diagnoses, llm, scenario.scenario_name)) + else: + llm_markdown = "" + + write_advice( + diagnoses=diagnoses, + llm_markdown=llm_markdown, + advice_path=artifact_paths.advice_md, + scenario_name=scenario.scenario_name, + run_id=result.run_id, + judge_model=scenario.judge_model, + ) + + except Exception as exc: + logger.warning( + "[advisor] advisor failed (%s: %s) — evaluation result is unaffected", + type(exc).__name__, exc, + ) diff --git a/rag_eval/advisor/llm_analyzer.py b/rag_eval/advisor/llm_analyzer.py new file mode 100644 index 0000000..df9140c --- /dev/null +++ b/rag_eval/advisor/llm_analyzer.py @@ -0,0 +1,99 @@ +"""LLM-powered analysis of rule diagnostics and low-score samples.""" +from __future__ import annotations + +import logging +from typing import Any + +from .rules import Diagnosis + +logger = logging.getLogger("rag_eval.advisor") + +_PROMPT_TEMPLATE = """\ +你是一个 RAG 系统优化专家,正在分析西门子医疗 CT 文档问答系统的评测结果。 +请用中文撰写一份优化建议报告,格式为 Markdown。 + +## 评测诊断摘要 + +{diagnosis_summary} + +## 低分样本示例 + +{low_sample_text} + +## 报告要求 + +1. 按指标分节(## 指标名 [severity]),先解释"为什么低"(结合低分样本具体分析),再给出"具体怎么改" +2. "具体怎么改"要结合低分样本的实际内容,而不只是泛泛建议 +3. 最后写一节 **## 优先优化次序**,按性价比排序(不增加 LLM 调用次数的优化优先) +4. 
语言简洁,面向工程师,不要废话,不要重复列表内容 + +只输出 Markdown 报告正文,不要任何前置说明。 +""" + + +def _build_diagnosis_summary(diagnoses: list[Diagnosis]) -> str: + lines = [] + for d in diagnoses: + direction = "(越低越好)" if d.metric == "noise_sensitivity" else "" + lines.append( + f"- **{d.metric}** {direction} 均值={d.mean_score:.4f}," + f"阈值={d.threshold},严重程度={d.severity}" + ) + lines.append(f" - 可能原因:{'; '.join(d.root_causes)}") + lines.append(f" - 建议动作:{'; '.join(d.suggested_actions)}") + return "\n".join(lines) + + +def _build_low_sample_text(diagnoses: list[Diagnosis]) -> str: + lines = [] + for d in diagnoses: + if not d.low_samples: + continue + lines.append(f"### {d.metric} 低分样本(最多 3 条)") + for i, s in enumerate(d.low_samples, 1): + score = s.get(d.metric, "N/A") + lines.append(f"\n**样本 {i}**(分数={score})") + lines.append(f"- 问题:{s.get('question', '')}") + lines.append(f"- 回答:{s.get('answer', '')[:300]}") + lines.append(f"- 标准答案:{s.get('ground_truth', '')[:200]}") + return "\n".join(lines) + + +async def analyze( + diagnoses: list[Diagnosis], + llm: Any, + scenario_name: str, +) -> str: + """Call the judge LLM to generate a Chinese optimization report. + + Args: + diagnoses: Non-empty list of Diagnosis from rules.diagnose(). + llm: RAGAS LLM wrapper (has .agenerate() method). + scenario_name: Used only for logging. + + Returns: + LLM-generated Markdown string, or "" on failure (triggers writer fallback). 
+ """ + if not diagnoses: + return "" + + diagnosis_summary = _build_diagnosis_summary(diagnoses) + low_sample_text = _build_low_sample_text(diagnoses) + prompt = _PROMPT_TEMPLATE.format( + diagnosis_summary=diagnosis_summary, + low_sample_text=low_sample_text, + ) + + try: + logger.info("[advisor] calling LLM for optimization analysis scenario=%s", scenario_name) + from langchain_core.messages import HumanMessage + result = await llm.agenerate(texts=[[HumanMessage(content=prompt)]]) + text = result.generations[0][0].text.strip() + logger.info("[advisor] LLM analysis complete chars=%d", len(text)) + return text + except Exception as exc: + logger.warning( + "[advisor] LLM analysis failed (%s: %s) — falling back to rule report", + type(exc).__name__, exc, + ) + return "" diff --git a/rag_eval/advisor/rules.py b/rag_eval/advisor/rules.py new file mode 100644 index 0000000..8de7dc1 --- /dev/null +++ b/rag_eval/advisor/rules.py @@ -0,0 +1,236 @@ +"""Rule-based diagnostic engine for RAG evaluation metric scores.""" +from __future__ import annotations + +import math +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class MetricRule: + """Threshold configuration and diagnostic text for one metric.""" + warning_threshold: float + critical_threshold: float + higher_is_better: bool # False for noise_sensitivity + root_causes: list[str] + suggested_actions: list[str] + + +METRIC_RULES: dict[str, MetricRule] = { + "faithfulness": MetricRule( + warning_threshold=0.7, + critical_threshold=0.5, + higher_is_better=True, + root_causes=[ + "生成回答包含检索片段中不支持的陈述(幻觉)", + "生成阶段未严格遵循 grounding 约束", + "校验阶段未开启或未生效", + ], + suggested_actions=[ + "强化生成 prompt 的 grounding 约束('只依据参考资料作答')", + "开启校验阶段(validation: by_scenario)", + "检查低分样本中模型是否引用了片段外的知识", + ], + ), + "answer_relevancy": MetricRule( + warning_threshold=0.7, + critical_threshold=0.5, + higher_is_better=True, + root_causes=[ + "回答偏离问题主旨或包含大量冗余内容", + "查询改写后问题语义漂移", + "生成 prompt 格式约束不足", + ], + 
suggested_actions=[ + "优化查询改写 prompt,确保改写后语义不偏移", + "在生成 prompt 中加入'简洁准确、直接回答问题'的约束", + "检查低分样本的回答是否存在格式冗余或话题偏移", + ], + ), + "context_recall": MetricRule( + warning_threshold=0.7, + critical_threshold=0.5, + higher_is_better=True, + root_causes=[ + "检索未能召回标准答案所涉及的关键信息", + "单一查询未能覆盖问题的多个角度", + "过召回数量不足,关键片段被截断", + ], + suggested_actions=[ + "启用多查询扩展(use_multi_query)覆盖不同措辞", + "对多跳问题启用问题分解(sub_questions)", + "加大过召回宽度(recall_top_k)", + "对颗粒度细的问题尝试 Step-back 双路检索", + ], + ), + "context_precision": MetricRule( + warning_threshold=0.6, + critical_threshold=0.4, + higher_is_better=True, + root_causes=[ + "检索引入过多与问题无关的片段", + "重排未能将相关片段排在前列", + "缺少相关性过滤,噪声片段进入上下文", + ], + suggested_actions=[ + "启用或优化 listwise 重排,将相关片段排在前列", + "启用上下文压缩(compression)过滤无关句子", + "启用相关性过滤(relevance_filter)丢弃明确无关片段", + "缩小 rerank_keep_k(如从 8 降到 5)", + ], + ), + "noise_sensitivity": MetricRule( + warning_threshold=0.3, # higher is worse; trigger when mean > threshold + critical_threshold=0.5, + higher_is_better=False, + root_causes=[ + "回答中包含检索到的噪声片段所引入的错误陈述", + "相关性过滤未能拦截干扰性片段", + "生成阶段对噪声片段未加区分地引用", + ], + suggested_actions=[ + "启用相关性过滤(relevance_filter)拦截噪声", + "优化重排,将不相关片段排到截断点之后", + "在生成 prompt 中强调'来源冲突时并列陈述,不擅自下定论'", + ], + ), + "factual_correctness": MetricRule( + warning_threshold=0.6, + critical_threshold=0.4, + higher_is_better=True, + root_causes=[ + "回答的事实陈述与标准答案存在偏差", + "检索未能命中标准答案所依据的关键片段", + "生成阶段对多个来源综合时产生事实错误", + ], + suggested_actions=[ + "重点检查低分样本,确认是检索遗漏还是生成错误", + "提升 context_recall 以确保关键信息被检索到", + "对事实型问题将 temperature 降至 0", + ], + ), + "semantic_similarity": MetricRule( + warning_threshold=0.7, + critical_threshold=0.5, + higher_is_better=True, + root_causes=[ + "回答语义与标准答案差距较大", + "回答过于简短或过于冗长,语义偏移", + "检索到的片段质量不足,导致生成内容偏离", + ], + suggested_actions=[ + "检查低分样本的回答与标准答案的表述差异", + "优化生成 prompt 使回答更贴近标准表述风格", + "提升检索质量(context_recall / context_precision)", + ], + ), +} + + +@dataclass +class Diagnosis: + """Diagnostic result for one metric that triggered a threshold.""" + metric: 
str + mean_score: float + threshold: float # the triggered threshold + severity: str # "warning" | "critical" + root_causes: list[str] = field(default_factory=list) + suggested_actions: list[str] = field(default_factory=list) + low_samples: list[dict[str, Any]] = field(default_factory=list) + + +def _mean_ignoring_nan(values: list[float]) -> float | None: + valid = [v for v in values if not math.isnan(v)] + if not valid: + return None + return sum(valid) / len(valid) + + +def _select_low_samples( + rows: list[dict[str, Any]], + metric: str, + top_n: int, + higher_is_better: bool, +) -> list[dict[str, Any]]: + """Return the top_n worst-scoring rows for a metric, excluding NaN.""" + valid = [r for r in rows if metric in r and not math.isnan(float(r[metric]))] + sorted_rows = sorted(valid, key=lambda r: float(r[metric]), reverse=not higher_is_better) + worst = sorted_rows[:top_n] + keep_keys = {"sample_id", "question", "answer", "ground_truth", metric} + return [{k: v for k, v in row.items() if k in keep_keys} for row in worst] + + +def diagnose( + score_rows: list[dict[str, Any]], + metrics: list[str], + top_low_samples: int = 3, +) -> list[Diagnosis]: + """Analyse score_rows and return a Diagnosis for each metric below threshold. + + Args: + score_rows: List of per-sample score dicts (from EvaluationResult.score_rows). + metrics: Metric names to evaluate (from Scenario.metrics). + top_low_samples: How many worst-scoring samples to attach per diagnosis. + + Returns: + List of Diagnosis objects, one per triggered metric. Empty if all OK. 
+ """ + diagnoses: list[Diagnosis] = [] + + for metric in metrics: + rule = METRIC_RULES.get(metric) + if rule is None: + continue # unknown metric, skip + + values = [] + for row in score_rows: + raw = row.get(metric) + if raw is None: + continue + try: + v = float(raw) + except (TypeError, ValueError): + continue + values.append(v) + + if not values: + continue + + mean = _mean_ignoring_nan(values) + if mean is None: + continue + + # Determine severity (direction-aware) + if rule.higher_is_better: + if mean < rule.critical_threshold: + severity = "critical" + threshold = rule.critical_threshold + elif mean < rule.warning_threshold: + severity = "warning" + threshold = rule.warning_threshold + else: + continue # above warning threshold → no diagnosis + else: + # lower is better (noise_sensitivity) + if mean > rule.critical_threshold: + severity = "critical" + threshold = rule.critical_threshold + elif mean > rule.warning_threshold: + severity = "warning" + threshold = rule.warning_threshold + else: + continue + + low_samples = _select_low_samples(score_rows, metric, top_low_samples, rule.higher_is_better) + + diagnoses.append(Diagnosis( + metric=metric, + mean_score=round(mean, 4), + threshold=threshold, + severity=severity, + root_causes=list(rule.root_causes), + suggested_actions=list(rule.suggested_actions), + low_samples=low_samples, + )) + + return diagnoses diff --git a/rag_eval/advisor/writer.py b/rag_eval/advisor/writer.py new file mode 100644 index 0000000..e46c919 --- /dev/null +++ b/rag_eval/advisor/writer.py @@ -0,0 +1,82 @@ +"""Write optimization advice to markdown file and emit log summary.""" +from __future__ import annotations + +import logging +from pathlib import Path + +from .rules import Diagnosis + +logger = logging.getLogger("rag_eval.advisor") + + +def _format_log_summary(diagnoses: list[Diagnosis], advice_path: Path) -> str: + """Return a single-line log summary of triggered diagnoses.""" + if not diagnoses: + return "[advisor] 
所有指标正常,无需优化建议。" + parts = [f"{d.metric}({d.mean_score:.2f}, {d.severity})" for d in diagnoses] + triggered = " ".join(parts) + return f"[advisor] 触发诊断 {len(diagnoses)} 项: {triggered} → {advice_path}" + + +def _build_fallback_report(diagnoses: list[Diagnosis]) -> str: + """Build a rules-only report when LLM analysis is unavailable.""" + if not diagnoses: + return "" + lines = ["## 规则诊断(LLM 分析不可用)\n"] + for d in diagnoses: + lines.append(f"### {d.metric} [{d.severity}] 均值={d.mean_score:.4f}") + lines.append("\n**可能原因:**") + for cause in d.root_causes: + lines.append(f"- {cause}") + lines.append("\n**建议动作:**") + for action in d.suggested_actions: + lines.append(f"- {action}") + lines.append("") + return "\n".join(lines) + + +def write_advice( + diagnoses: list[Diagnosis], + llm_markdown: str, + advice_path: Path, + scenario_name: str, + run_id: str, + judge_model: str, +) -> None: + """Write optimization_advice.md and emit a log summary line. + + Args: + diagnoses: List of Diagnosis from rules.diagnose(). + llm_markdown: LLM-generated Markdown body. Empty string triggers fallback. + advice_path: Full path to write the .md file. + scenario_name: Human-readable scenario identifier for the report header. + run_id: Run identifier string. + judge_model: Model used for LLM analysis (shown in header). 
+ """ + advice_path.parent.mkdir(parents=True, exist_ok=True) + + from rag_eval.shared.utils import utc_now_iso + header_lines = [ + f"# 优化建议报告 — {scenario_name}", + "", + f"- run_id: `{run_id}`", + f"- 生成时间: `{utc_now_iso()}`", + f"- judge_model: `{judge_model}`", + "", + "---", + "", + ] + + if not diagnoses: + body = "## ✅ 未发现明显指标异常\n\n所有指标均在正常范围内,当前 RAG 链路表现良好。\n" + elif llm_markdown: + body = llm_markdown + else: + body = _build_fallback_report(diagnoses) + + content = "\n".join(header_lines) + body + advice_path.write_text(content, encoding="utf-8") + + summary = _format_log_summary(diagnoses, advice_path) + logger.info(summary) + logger.info("[advisor] 优化建议已写出: %s", advice_path) diff --git a/rag_eval/config/loader.py b/rag_eval/config/loader.py index 78fd7af..f4ffd4b 100644 --- a/rag_eval/config/loader.py +++ b/rag_eval/config/loader.py @@ -61,6 +61,7 @@ def load_scenario(path: str | Path) -> Scenario: max_samples=model.runtime.max_samples, ), source_path=scenario_path, + optimization_advisor=model.optimization_advisor, ) # Run cross-field checks after all relative paths have been resolved. 
     validate_scenario(scenario)
diff --git a/rag_eval/config/schema.py b/rag_eval/config/schema.py
index f319c17..f36e8ac 100644
--- a/rag_eval/config/schema.py
+++ b/rag_eval/config/schema.py
@@ -54,6 +54,7 @@ class ScenarioModel(BaseModel):
     metrics: list[str]
     output_dir: str
     runtime: RuntimeConfigModel = Field(default_factory=RuntimeConfigModel)
+    optimization_advisor: bool = False
 
     @field_validator("metrics")
     @classmethod
diff --git a/rag_eval/execution/runner.py b/rag_eval/execution/runner.py
index 46a1824..a1e4b03 100644
--- a/rag_eval/execution/runner.py
+++ b/rag_eval/execution/runner.py
@@ -8,8 +8,9 @@ from pathlib import Path
 
 from rag_eval.adapters.http import HttpAppAdapter
 from rag_eval.adapters.python import PythonFunctionAdapter
+from rag_eval.advisor import run_advisor
 from rag_eval.config.loader import load_scenario
-from rag_eval.metrics.factory import build_metric_pipeline
+from rag_eval.metrics.factory import build_models, build_metric_pipeline
 from rag_eval.reporting.writers import write_run_artifacts
 from rag_eval.settings import EvaluationSettings
 from rag_eval.shared.models import Scenario
@@ -67,10 +68,17 @@ def run_scenario(
     logger.info("[runner] scenario loaded: name=%s mode=%s max_samples=%s",
                 scenario.scenario_name, scenario.mode, scenario.runtime.max_samples)
 
+    # Build models once; reuse llm in both MetricPipeline and advisor.
+    llm, embeddings = build_models(scenario.judge_model, scenario.embedding_model, settings)
+
     adapter = build_adapter(scenario)
-    pipeline = build_metric_pipeline(scenario, settings)
+    pipeline = build_metric_pipeline(scenario, settings, llm=llm, embeddings=embeddings)
     evaluator = Evaluator(scenario=scenario, metric_pipeline=pipeline, app_adapter=adapter)
     result = evaluator.evaluate()
     write_run_artifacts(result)
     logger.info("[runner] artifacts written for run_id=%s", result.run_id)
+
+    # Optimization advisor — runs only if scenario.optimization_advisor is True.
+    run_advisor(result, scenario, llm)
+
     return result
diff --git a/rag_eval/metrics/factory.py b/rag_eval/metrics/factory.py
index 7ab1445..4c2cbcd 100644
--- a/rag_eval/metrics/factory.py
+++ b/rag_eval/metrics/factory.py
@@ -18,7 +18,10 @@ from ragas.metrics.collections import (
     AnswerRelevancy,
     ContextPrecision,
     ContextRecall,
+    FactualCorrectness,
     Faithfulness,
+    NoiseSensitivity,
+    SemanticSimilarity,
 )
 
 from .pipeline import MetricPipeline
@@ -39,19 +42,34 @@ def build_models(
 def build_metric_pipeline(
     scenario: Scenario,
     settings: EvaluationSettings,
+    llm: Any | None = None,
+    embeddings: Any | None = None,
 ) -> MetricPipeline:
-    """Build a metric pipeline containing only the metrics requested by the scenario."""
-    llm, embeddings = build_models(
-        scenario.judge_model,
-        scenario.embedding_model,
-        settings,
-    )
+    """Build a metric pipeline containing only the metrics requested by the scenario.
+
+    If llm and embeddings are provided (pre-built by the caller), they are reused.
+    Otherwise, new instances are created from scenario + settings.
+    """
+    if llm is None or embeddings is None:
+        llm, embeddings = build_models(
+            scenario.judge_model,
+            scenario.embedding_model,
+            settings,
+        )
+
     # Build the full registry once, then slice it by configured metric names.
     registry: dict[str, Any] = {
         "faithfulness": Faithfulness(llm=llm),
         "answer_relevancy": AnswerRelevancy(llm=llm, embeddings=embeddings),
         "context_recall": ContextRecall(llm=llm),
         "context_precision": ContextPrecision(llm=llm),
+        # Robustness / end-to-end metrics (架构设计 §10.2).
+        # NoiseSensitivity mode='relevant': sensitivity to noise from relevant contexts.
+        "noise_sensitivity": NoiseSensitivity(llm=llm),
+        # FactualCorrectness mode='f1': balances claim precision and recall vs. ground truth.
+        "factual_correctness": FactualCorrectness(llm=llm),
+        # SemanticSimilarity: embedding cosine between answer and ground truth (no LLM call).
+        "semantic_similarity": SemanticSimilarity(embeddings=embeddings),
     }
     return MetricPipeline(
         metrics={name: registry[name] for name in scenario.metrics},
diff --git a/rag_eval/reporting/artifacts.py b/rag_eval/reporting/artifacts.py
index 647207e..0cbec98 100644
--- a/rag_eval/reporting/artifacts.py
+++ b/rag_eval/reporting/artifacts.py
@@ -17,4 +17,5 @@ def build_artifact_paths(output_dir: Path, run_id: str) -> RunArtifactPaths:
         invalid_csv=run_dir / "invalid.csv",
         summary_md=run_dir / "summary.md",
         metadata_json=run_dir / "metadata.json",
+        advice_md=run_dir / "optimization_advice.md",
     )
diff --git a/rag_eval/shared/models.py b/rag_eval/shared/models.py
index e8f4e6e..9284788 100644
--- a/rag_eval/shared/models.py
+++ b/rag_eval/shared/models.py
@@ -76,6 +76,7 @@ class Scenario:
     runtime: RuntimeConfig = field(default_factory=RuntimeConfig)
     app_adapter: AppAdapterConfig | None = None
     source_path: Path | None = None
+    optimization_advisor: bool = False
 
     def snapshot(self) -> dict[str, Any]:
         """Serialize the scenario into a reporting-friendly dictionary snapshot."""
@@ -159,3 +160,4 @@ class RunArtifactPaths:
     invalid_csv: Path
     summary_md: Path
     metadata_json: Path
+    advice_md: Path | None = None
diff --git a/scenarios/online/siemens-pdf-question-bank-online.yaml b/scenarios/online/siemens-pdf-question-bank-online.yaml
index defc90d..4a614b4 100644
--- a/scenarios/online/siemens-pdf-question-bank-online.yaml
+++ b/scenarios/online/siemens-pdf-question-bank-online.yaml
@@ -1,13 +1,19 @@
 scenario_name: siemens-pdf-question-bank-online
 mode: online
 dataset: ../../datasets/raw/generated/siemens-pdf-question-bank.csv
+# judge_model: qwen3.5-flash
 judge_model: deepseek-v4-flash
 embedding_model: text-embedding-v3
+optimization_advisor: true  # 评测结束后自动生成优化建议报告
 metrics:
   - faithfulness
   - answer_relevancy
   - context_recall
   - context_precision
+  # 已启用:鲁棒性 / 端到端指标(数据集已含 ground_truth)
+  - noise_sensitivity  # 鲁棒性:对检索噪声的敏感度
+  - factual_correctness  # 端到端:事实正确性(相对标准答案)
+  - semantic_similarity  # 端到端:语义相似度(embedding,无 LLM 调用)
 output_dir: ../../outputs/online/siemens-pdf-question-bank
 runtime:
   batch_size: 4
diff --git a/scripts/smoke_advisor.py b/scripts/smoke_advisor.py
new file mode 100644
index 0000000..cfa5b98
--- /dev/null
+++ b/scripts/smoke_advisor.py
@@ -0,0 +1,59 @@
+"""Offline smoke-check for the advisor module wiring (no network required)."""
+import math
+import sys
+import tempfile
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).parent.parent))
+
+from rag_eval.advisor.rules import diagnose
+from rag_eval.advisor.writer import write_advice, _format_log_summary
+
+# Simulate score_rows with low faithfulness and high noise_sensitivity
+rows = [
+    {
+        "sample_id": f"s{i}",
+        "question": f"问题{i}:西门子CT扫描的Flash技术原理是什么?",
+        "answer": f"答案{i}:Flash技术采用双源CT扫描",
+        "ground_truth": f"标准答案{i}:Flash扫描利用双源CT和大螺距实现超低辐射剂量扫描",
+        "faithfulness": 0.3 + i * 0.05,
+        "noise_sensitivity": 0.4 + i * 0.02,
+        "context_recall": 0.75,
+        "semantic_similarity": 0.65,
+    }
+    for i in range(5)
+]
+
+diags = diagnose(rows, metrics=["faithfulness", "noise_sensitivity", "context_recall", "semantic_similarity"])
+print(f"Diagnosed {len(diags)} metric(s):")
+for d in diags:
+    print(f" {d.metric}: mean={d.mean_score}, severity={d.severity}, low_samples={len(d.low_samples)}")
+
+assert len(diags) >= 2, f"Expected at least 2 diagnoses, got {len(diags)}"
+metrics_hit = {d.metric for d in diags}
+assert "faithfulness" in metrics_hit, "faithfulness should be triggered"
+assert "noise_sensitivity" in metrics_hit, "noise_sensitivity should be triggered"
+
+with tempfile.TemporaryDirectory() as tmp:
+    path = Path(tmp) / "optimization_advice.md"
+    write_advice(
+        diagnoses=diags,
+        llm_markdown="",  # fallback mode (no LLM)
+        advice_path=path,
+        scenario_name="smoke-test-siemens",
+        run_id="2026-06-16T00-00-00",
+        judge_model="deepseek-v4-flash",
+    )
+    content = path.read_text(encoding="utf-8")
+    assert "smoke-test-siemens" in content, "scenario name missing from report"
+    assert "faithfulness" in content, "faithfulness missing from report"
+    assert "noise_sensitivity" in content, "noise_sensitivity missing from report"
+    print(f"\nAdvice file ({len(content)} chars) — assertions OK")
+
+# Verify log summary format
+summary = _format_log_summary(diags, Path("optimization_advice.md"))
+print(f"\nLog summary length: {len(summary)} chars, faithfulness present: {'faithfulness' in summary}")
+assert "触发诊断" in summary
+assert "faithfulness" in summary
+
+print("\nSmoke check PASSED")
diff --git a/tests/test_advisor_writer.py b/tests/test_advisor_writer.py
new file mode 100644
index 0000000..e2dd190
--- /dev/null
+++ b/tests/test_advisor_writer.py
@@ -0,0 +1,113 @@
+import shutil
+import unittest
+from pathlib import Path
+
+from rag_eval.advisor.rules import Diagnosis
+from rag_eval.advisor.writer import write_advice, _format_log_summary
+
+
+class TestWriteAdvice(unittest.TestCase):
+    def setUp(self):
+        self.tmp = Path("tests/.tmp/test_advisor_writer")
+        shutil.rmtree(self.tmp, ignore_errors=True)
+        self.tmp.mkdir(parents=True, exist_ok=True)
+        self.advice_path = self.tmp / "optimization_advice.md"
+
+    def tearDown(self):
+        shutil.rmtree(self.tmp, ignore_errors=True)
+
+    def _make_diagnosis(self, metric="faithfulness", severity="warning"):
+        return Diagnosis(
+            metric=metric,
+            mean_score=0.55,
+            threshold=0.7,
+            severity=severity,
+            root_causes=["原因1", "原因2"],
+            suggested_actions=["建议1", "建议2"],
+            low_samples=[
+                {"sample_id": "s1", "question": "问题1", "answer": "答案1",
+                 "ground_truth": "标准1", metric: 0.4},
+            ],
+        )
+
+    def test_write_creates_file(self):
+        diag = self._make_diagnosis()
+        write_advice(
+            diagnoses=[diag],
+            llm_markdown="## faithfulness\n\nLLM 建议内容",
+            advice_path=self.advice_path,
+            scenario_name="test-scenario",
+            run_id="2026-01-01T00-00-00",
+            judge_model="deepseek-v4-flash",
+        )
+        self.assertTrue(self.advice_path.exists())
+
+    def test_write_contains_scenario_name_and_run_id(self):
+        diag = self._make_diagnosis()
+        write_advice(
+            diagnoses=[diag],
+            llm_markdown="## faithfulness\n\nLLM 建议",
+            advice_path=self.advice_path,
+            scenario_name="siemens-test",
+            run_id="2026-01-01T00-00-00",
+            judge_model="deepseek-v4-flash",
+        )
+        content = self.advice_path.read_text(encoding="utf-8")
+        self.assertIn("siemens-test", content)
+        self.assertIn("2026-01-01T00-00-00", content)
+
+    def test_write_contains_llm_markdown(self):
+        diag = self._make_diagnosis()
+        write_advice(
+            diagnoses=[diag],
+            llm_markdown="## faithfulness\n\n具体建议文本",
+            advice_path=self.advice_path,
+            scenario_name="test",
+            run_id="rid",
+            judge_model="model",
+        )
+        content = self.advice_path.read_text(encoding="utf-8")
+        self.assertIn("具体建议文本", content)
+
+    def test_write_fallback_when_no_llm_markdown(self):
+        """When llm_markdown is empty, writer emits rule-only report."""
+        diag = self._make_diagnosis()
+        write_advice(
+            diagnoses=[diag],
+            llm_markdown="",
+            advice_path=self.advice_path,
+            scenario_name="test",
+            run_id="rid",
+            judge_model="model",
+        )
+        content = self.advice_path.read_text(encoding="utf-8")
+        self.assertIn("faithfulness", content)
+        self.assertIn("原因1", content)
+
+    def test_log_summary_format(self):
+        diags = [
+            self._make_diagnosis("faithfulness", "critical"),
+            self._make_diagnosis("context_recall", "warning"),
+        ]
+        summary = _format_log_summary(diags, self.advice_path)
+        self.assertIn("faithfulness", summary)
+        self.assertIn("critical", summary)
+        self.assertIn("context_recall", summary)
+        self.assertIn("warning", summary)
+
+    def test_write_empty_diagnoses_still_creates_file(self):
+        write_advice(
+            diagnoses=[],
+            llm_markdown="",
+            advice_path=self.advice_path,
+            scenario_name="test",
+            run_id="rid",
+            judge_model="model",
+        )
+        self.assertTrue(self.advice_path.exists())
+        content = self.advice_path.read_text(encoding="utf-8")
+        self.assertIn("未发现明显指标异常", content)
+
+
+if __name__ == "__main__":
+    unittest.main()
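
Reviewer note: the tests above exercise a three-way choice of report body inside `write_advice` (no diagnoses, LLM Markdown present, rules-only fallback). The sketch below isolates that precedence so it can be read and run without the rest of the package. It is an illustration under stated assumptions, not the code in this patch: `Diagnosis` here is a local stand-in whose fields are copied from the test fixture, `select_body` is a hypothetical helper that does not exist in `rag_eval`, and the English strings are placeholders for the Chinese headings the real writer emits.

```python
from dataclasses import dataclass, field


@dataclass
class Diagnosis:
    """Local stand-in for rag_eval.advisor.rules.Diagnosis (fields taken from the test fixture)."""
    metric: str
    mean_score: float
    threshold: float
    severity: str
    root_causes: list[str] = field(default_factory=list)
    suggested_actions: list[str] = field(default_factory=list)
    low_samples: list[dict] = field(default_factory=list)


def build_fallback_report(diagnoses: list[Diagnosis]) -> str:
    """Rules-only report, loosely mirroring the shape of _build_fallback_report."""
    lines = ["## Rule diagnosis (LLM analysis unavailable)\n"]
    for d in diagnoses:
        lines.append(f"### {d.metric} [{d.severity}] mean={d.mean_score:.4f}")
        lines.extend(f"- cause: {cause}" for cause in d.root_causes)
        lines.extend(f"- action: {action}" for action in d.suggested_actions)
        lines.append("")
    return "\n".join(lines)


def select_body(diagnoses: list[Diagnosis], llm_markdown: str) -> str:
    """Hypothetical helper: the branch order used when choosing the report body."""
    if not diagnoses:
        # Nothing triggered: the all-clear text wins even if llm_markdown is non-empty.
        return "## No metric anomalies found\n"
    if llm_markdown:
        # LLM analysis succeeded: use it verbatim.
        return llm_markdown
    # LLM output is empty (for example the call failed): fall back to the rules report.
    return build_fallback_report(diagnoses)
```

Calling `select_body([diag], "")` with a single `faithfulness` diagnosis returns the rules-only report, which is the path `test_write_fallback_when_no_llm_markdown` covers. One consequence of this ordering worth noting in review: an empty `diagnoses` list discards any `llm_markdown` the caller passed.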