# Optimization Advisor Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Add a `rag_eval/advisor/` module that, after every evaluation run, automatically analyzes why metrics scored low and emits an optimization advice report in Chinese (`optimization_advice.md` plus a log summary).

**Architecture:** A rule engine (`rules.py`) uses each metric's mean score and thresholds to flag anomalies and pick low-scoring samples; an LLM analyzer (`llm_analyzer.py`) reuses the existing judge_model llm instance to generate Chinese Markdown advice; a writer layer (`writer.py`) writes the file and logs a summary. The feature is triggered by the YAML switch `optimization_advisor: true` and is off by default.

**Tech Stack:** Python 3.12, dataclasses, ragas LLM instance (AsyncOpenAI-backed), pandas (score_rows already available), pytest (unit tests)

---

## File Map

### New files

- `rag_eval/advisor/__init__.py`: exposes `run_advisor()`, the only external entry point
- `rag_eval/advisor/rules.py`: pure-function rule engine, `Diagnosis` dataclass + `diagnose()`
- `rag_eval/advisor/llm_analyzer.py`: `analyze()` takes llm + diagnoses and returns a Markdown str
- `rag_eval/advisor/writer.py`: `write_advice()` writes the md file and logs a summary
- `tests/test_advisor_rules.py`: rule engine unit tests
- `tests/test_advisor_writer.py`: writer unit tests

### Modified files

- `rag_eval/shared/models.py`: `Scenario` gains `optimization_advisor: bool = False`; `RunArtifactPaths` gains `advice_md: Path | None = None`
- `rag_eval/config/schema.py`: `ScenarioModel` gains `optimization_advisor: bool = False`
- `rag_eval/config/loader.py`: `load_scenario()` passes `optimization_advisor` through to `Scenario`
- `rag_eval/reporting/artifacts.py`: `build_artifact_paths()` gains the `advice_md` field
- `rag_eval/metrics/factory.py`: `build_models()` is exposed and `build_metric_pipeline()` accepts a pre-built `llm` / `embeddings`, so the runner can hand the same `llm` to the advisor
- `rag_eval/execution/runner.py`: builds the llm, then conditionally calls `run_advisor()` at the end
- `scenarios/online/siemens-pdf-question-bank-online.yaml`: adds `optimization_advisor: true`
- `docs/rag-eval-engine-flow.md`: documents the advisor flow
- `docs/rag-eval-architecture.md`: adds an advisor note at the end of §9.4 (metric orchestration)

---

## Task 1: Diagnosis dataclass + rules engine

**Files:**
- Create: `rag_eval/advisor/rules.py`
- Create: `tests/test_advisor_rules.py`

- [ ] **Step 1: 
Write failing tests**

```python
# tests/test_advisor_rules.py
import math
import unittest

from rag_eval.advisor.rules import Diagnosis, diagnose, METRIC_RULES


class TestDiagnosis(unittest.TestCase):

    def _make_rows(self, metric: str, scores: list[float]) -> list[dict]:
        return [
            {metric: s, "question": f"q{i}", "answer": f"a{i}",
             "ground_truth": f"gt{i}", "sample_id": f"s{i}"}
            for i, s in enumerate(scores)
        ]

    def test_no_diagnosis_when_all_scores_above_threshold(self):
        rows = self._make_rows("faithfulness", [0.8, 0.9, 0.85])
        result = diagnose(rows, metrics=["faithfulness"])
        self.assertEqual(result, [])

    def test_warning_when_mean_below_warning_threshold(self):
        rows = self._make_rows("faithfulness", [0.65, 0.62, 0.68])
        result = diagnose(rows, metrics=["faithfulness"])
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0].metric, "faithfulness")
        self.assertEqual(result[0].severity, "warning")
        self.assertAlmostEqual(result[0].mean_score, 0.65, places=2)

    def test_critical_when_mean_below_critical_threshold(self):
        rows = self._make_rows("faithfulness", [0.3, 0.4, 0.45])
        result = diagnose(rows, metrics=["faithfulness"])
        self.assertEqual(result[0].severity, "critical")

    def test_low_samples_selected_are_bottom_three(self):
        rows = self._make_rows("faithfulness", [0.1, 0.2, 0.3, 0.8, 0.9])
        result = diagnose(rows, metrics=["faithfulness"])
        self.assertEqual(len(result[0].low_samples), 3)
        scores = [s["faithfulness"] for s in result[0].low_samples]
        self.assertEqual(sorted(scores), [0.1, 0.2, 0.3])

    def test_nan_scores_excluded_from_mean_and_low_samples(self):
        rows = self._make_rows("faithfulness", [0.3, float("nan"), 0.4])
        result = diagnose(rows, metrics=["faithfulness"])
        self.assertEqual(len(result), 1)
        for s in result[0].low_samples:
            self.assertFalse(math.isnan(s["faithfulness"]))

    def test_noise_sensitivity_direction_inverted(self):
        # noise_sensitivity: higher is worse; a mean above 0.3 is a warning
        rows = self._make_rows("noise_sensitivity", [0.4, 0.45, 0.5])
        result = diagnose(rows, metrics=["noise_sensitivity"])
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0].metric, "noise_sensitivity")

    def test_noise_sensitivity_no_diagnosis_when_low(self):
        rows = self._make_rows("noise_sensitivity", [0.1, 0.15, 0.2])
        result = diagnose(rows, metrics=["noise_sensitivity"])
        self.assertEqual(result, [])

    def test_skips_metric_not_in_rows(self):
        rows = [{"faithfulness": 0.3, "question": "q", "answer": "a",
                 "ground_truth": "gt", "sample_id": "s1"}]
        result = diagnose(rows, metrics=["faithfulness", "context_recall"])
        metrics_found = [d.metric for d in result]
        self.assertIn("faithfulness", metrics_found)
        self.assertNotIn("context_recall", metrics_found)

    def test_all_seven_metrics_have_rules(self):
        expected = {"faithfulness", "answer_relevancy", "context_recall",
                    "context_precision", "noise_sensitivity",
                    "factual_correctness", "semantic_similarity"}
        self.assertEqual(set(METRIC_RULES.keys()), expected)


if __name__ == "__main__":
    unittest.main()
```

- [ ] **Step 2: Run tests to verify they fail**

```
cd C:\Projects\AIProjects\Siemens-AIPOC\siemens_ragas
python -m pytest tests/test_advisor_rules.py -v 2>&1 | head -20
```

Expected: `ModuleNotFoundError: No module named 'rag_eval.advisor'`

- [ ] **Step 3: Create rules.py**

```python
# rag_eval/advisor/rules.py
"""Rule-based diagnostic engine for RAG evaluation metric scores."""
from __future__ import annotations

import math
from dataclasses import dataclass, field
from typing import Any


@dataclass
class MetricRule:
    """Threshold configuration and diagnostic text for one metric."""
    warning_threshold: float
    critical_threshold: float
    higher_is_better: bool  # False for noise_sensitivity
    root_causes: list[str]
    suggested_actions: list[str]


METRIC_RULES: dict[str, MetricRule] = {
    "faithfulness": MetricRule(
        warning_threshold=0.7,
        critical_threshold=0.5,
        higher_is_better=True,
        root_causes=[
            "生成回答包含检索片段中不支持的陈述(幻觉)",
            "生成阶段未严格遵循 grounding 约束",
            "校验阶段未开启或未生效",
        ],
        suggested_actions=[
            "强化生成 prompt 的 grounding 约束('只依据参考资料作答')",
            "开启校验阶段(validation: by_scenario)",
            "检查低分样本中模型是否引用了片段外的知识",
        ],
    ),
    "answer_relevancy": MetricRule(
        warning_threshold=0.7,
        critical_threshold=0.5,
        higher_is_better=True,
        root_causes=[
            "回答偏离问题主旨或包含大量冗余内容",
            "查询改写后问题语义漂移",
            "生成 prompt 格式约束不足",
        ],
        suggested_actions=[
            "优化查询改写 prompt,确保改写后语义不偏移",
            "在生成 prompt 中加入'简洁准确、直接回答问题'的约束",
            "检查低分样本的回答是否存在格式冗余或话题偏移",
        ],
    ),
    "context_recall": MetricRule(
        warning_threshold=0.7,
        critical_threshold=0.5,
        higher_is_better=True,
        root_causes=[
            "检索未能召回标准答案所涉及的关键信息",
            "单一查询未能覆盖问题的多个角度",
            "过召回数量不足,关键片段被截断",
        ],
        suggested_actions=[
            "启用多查询扩展(use_multi_query)覆盖不同措辞",
            "对多跳问题启用问题分解(sub_questions)",
            "加大过召回宽度(recall_top_k)",
            "对颗粒度细的问题尝试 Step-back 双路检索",
        ],
    ),
    "context_precision": MetricRule(
        warning_threshold=0.6,
        critical_threshold=0.4,
        higher_is_better=True,
        root_causes=[
            "检索引入过多与问题无关的片段",
            "重排未能将相关片段排在前列",
            "缺少相关性过滤,噪声片段进入上下文",
        ],
        suggested_actions=[
            "启用或优化 listwise 重排,将相关片段排在前列",
            "启用上下文压缩(compression)过滤无关句子",
            "启用相关性过滤(relevance_filter)丢弃明确无关片段",
            "缩小 rerank_keep_k(如从 8 降到 5)",
        ],
    ),
    "noise_sensitivity": MetricRule(
        warning_threshold=0.3,  # higher is worse; trigger when mean > threshold
        critical_threshold=0.5,
        higher_is_better=False,
        root_causes=[
            "回答中包含检索到的噪声片段所引入的错误陈述",
            "相关性过滤未能拦截干扰性片段",
            "生成阶段对噪声片段未加区分地引用",
        ],
        suggested_actions=[
            "启用相关性过滤(relevance_filter)拦截噪声",
            "优化重排,将不相关片段排到截断点之后",
            "在生成 prompt 中强调'来源冲突时并列陈述,不擅自下定论'",
        ],
    ),
    "factual_correctness": MetricRule(
        warning_threshold=0.6,
        critical_threshold=0.4,
        higher_is_better=True,
        root_causes=[
            "回答的事实陈述与标准答案存在偏差",
            "检索未能命中标准答案所依据的关键片段",
            "生成阶段对多个来源综合时产生事实错误",
        ],
        suggested_actions=[
            "重点检查低分样本,确认是检索遗漏还是生成错误",
            "提升 context_recall 以确保关键信息被检索到",
            "对事实型问题将 temperature 降至 0",
        ],
    ),
    "semantic_similarity": MetricRule(
        warning_threshold=0.7,
        critical_threshold=0.5,
        higher_is_better=True,
        root_causes=[
            "回答语义与标准答案差距较大",
            "回答过于简短或过于冗长,语义偏移",
            "检索到的片段质量不足,导致生成内容偏离",
        ],
        suggested_actions=[
            "检查低分样本的回答与标准答案的表述差异",
            "优化生成 prompt 使回答更贴近标准表述风格",
            "提升检索质量(context_recall / context_precision)",
        ],
    ),
}


@dataclass
class Diagnosis:
    """Diagnostic result for one metric that triggered a threshold."""
    metric: str
    mean_score: float
    threshold: float  # the triggered threshold
    severity: str  # "warning" | "critical"
    root_causes: list[str] = field(default_factory=list)
    suggested_actions: list[str] = field(default_factory=list)
    low_samples: list[dict[str, Any]] = field(default_factory=list)


def _mean_ignoring_nan(values: list[float]) -> float | None:
    valid = [v for v in values if not math.isnan(v)]
    if not valid:
        return None
    return sum(valid) / len(valid)


def _is_valid_score(raw: Any) -> bool:
    """True if raw coerces to a non-NaN float (same filter diagnose() applies)."""
    try:
        return not math.isnan(float(raw))
    except (TypeError, ValueError):
        return False


def _select_low_samples(
    rows: list[dict[str, Any]],
    metric: str,
    top_n: int,
    higher_is_better: bool,
) -> list[dict[str, Any]]:
    """Return the top_n worst-scoring rows for a metric, excluding unusable scores."""
    # None and non-numeric scores are skipped here too, so float() cannot raise below.
    valid = [r for r in rows if metric in r and _is_valid_score(r[metric])]
    sorted_rows = sorted(valid, key=lambda r: float(r[metric]), reverse=not higher_is_better)
    worst = sorted_rows[:top_n]
    keep_keys = {"sample_id", "question", "answer", "ground_truth", metric}
    return [{k: v for k, v in row.items() if k in keep_keys} for row in worst]


def diagnose(
    score_rows: list[dict[str, Any]],
    metrics: list[str],
    top_low_samples: int = 3,
) -> list[Diagnosis]:
    """Analyse score_rows and return a Diagnosis for each metric that crosses a threshold.

    Args:
        score_rows: List of per-sample score dicts (from EvaluationResult.score_rows).
        metrics: Metric names to evaluate (from Scenario.metrics).
        top_low_samples: How many worst-scoring samples to attach per diagnosis.

    Returns:
        List of Diagnosis objects, one per triggered metric. Empty if all OK.
    """
    diagnoses: list[Diagnosis] = []

    for metric in metrics:
        rule = METRIC_RULES.get(metric)
        if rule is None:
            continue  # unknown metric, skip

        values = []
        for row in score_rows:
            raw = row.get(metric)
            if raw is None:
                continue
            try:
                v = float(raw)
            except (TypeError, ValueError):
                continue
            values.append(v)

        if not values:
            continue

        mean = _mean_ignoring_nan(values)
        if mean is None:
            continue

        # Determine severity (direction-aware)
        if rule.higher_is_better:
            if mean < rule.critical_threshold:
                severity = "critical"
                threshold = rule.critical_threshold
            elif mean < rule.warning_threshold:
                severity = "warning"
                threshold = rule.warning_threshold
            else:
                continue  # at or above the warning threshold: no diagnosis
        else:
            # lower is better (noise_sensitivity)
            if mean > rule.critical_threshold:
                severity = "critical"
                threshold = rule.critical_threshold
            elif mean > rule.warning_threshold:
                severity = "warning"
                threshold = rule.warning_threshold
            else:
                continue

        low_samples = _select_low_samples(score_rows, metric, top_low_samples, rule.higher_is_better)
        diagnoses.append(Diagnosis(
            metric=metric,
            mean_score=round(mean, 4),
            threshold=threshold,
            severity=severity,
            root_causes=list(rule.root_causes),
            suggested_actions=list(rule.suggested_actions),
            low_samples=low_samples,
        ))

    return diagnoses
```

- [ ] **Step 4: Create `rag_eval/advisor/__init__.py` (stub; full version in Task 5)**

```python
# rag_eval/advisor/__init__.py
"""Optimization advisor: rule-based diagnosis + LLM-powered recommendations."""
from .rules import Diagnosis, diagnose

__all__ = ["Diagnosis", "diagnose"]
```

- [ ] **Step 5: Run tests and expect them to pass**

```
python -m pytest tests/test_advisor_rules.py -v
```

Expected: all 9 tests PASS.
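The direction-aware severity check that these tests exercise can be sketched in isolation. This is a minimal illustration, not the module itself: `classify_severity` is a hypothetical helper name, thresholds are passed in explicitly instead of being read from `METRIC_RULES`, and the sketch mirrors the scale by negation where `diagnose()` uses two explicit branches.

```python
from __future__ import annotations


def classify_severity(
    mean: float,
    warning_threshold: float,
    critical_threshold: float,
    higher_is_better: bool = True,
) -> str | None:
    """Return "critical", "warning", or None for one metric mean."""
    if not higher_is_better:
        # Negate everything so "lower is better" metrics reuse the same comparisons:
        # mean > threshold is equivalent to -mean < -threshold.
        mean = -mean
        warning_threshold = -warning_threshold
        critical_threshold = -critical_threshold
    if mean < critical_threshold:
        return "critical"
    if mean < warning_threshold:
        return "warning"
    return None


# faithfulness-style metric (higher is better), thresholds 0.7 / 0.5
print(classify_severity(0.65, 0.7, 0.5))
# noise_sensitivity-style metric (lower is better), thresholds 0.3 / 0.5
print(classify_severity(0.45, 0.3, 0.5, higher_is_better=False))
```

Comparisons are strict in both directions, so a mean that sits exactly on a threshold does not trigger that level, which matches the branches in `diagnose()`.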
- [ ] **Step 6: Commit**

```
git add rag_eval/advisor/__init__.py rag_eval/advisor/rules.py tests/test_advisor_rules.py
git commit -m "feat(advisor): add rule-based diagnostic engine with 7-metric coverage"
```

---

## Task 2: Writer module

**Files:**
- Create: `rag_eval/advisor/writer.py`
- Create: `tests/test_advisor_writer.py`

- [ ] **Step 1: Write failing tests**

```python
# tests/test_advisor_writer.py
import logging
import shutil
import unittest
from pathlib import Path

from rag_eval.advisor.rules import Diagnosis
from rag_eval.advisor.writer import write_advice, _format_log_summary


class TestWriteAdvice(unittest.TestCase):

    def setUp(self):
        self.tmp = Path("tests/.tmp/test_advisor_writer")
        shutil.rmtree(self.tmp, ignore_errors=True)
        self.tmp.mkdir(parents=True, exist_ok=True)
        self.advice_path = self.tmp / "optimization_advice.md"

    def tearDown(self):
        shutil.rmtree(self.tmp, ignore_errors=True)

    def _make_diagnosis(self, metric="faithfulness", severity="warning"):
        return Diagnosis(
            metric=metric,
            mean_score=0.55,
            threshold=0.7,
            severity=severity,
            root_causes=["原因1", "原因2"],
            suggested_actions=["建议1", "建议2"],
            low_samples=[
                {"sample_id": "s1", "question": "问题1", "answer": "答案1",
                 "ground_truth": "标准1", metric: 0.4},
            ],
        )

    def test_write_creates_file(self):
        diag = self._make_diagnosis()
        write_advice(
            diagnoses=[diag],
            llm_markdown="## faithfulness\n\nLLM 建议内容",
            advice_path=self.advice_path,
            scenario_name="test-scenario",
            run_id="2026-01-01T00-00-00",
            judge_model="deepseek-v4-flash",
        )
        self.assertTrue(self.advice_path.exists())

    def test_write_contains_scenario_name_and_run_id(self):
        diag = self._make_diagnosis()
        write_advice(
            diagnoses=[diag],
            llm_markdown="## faithfulness\n\nLLM 建议",
            advice_path=self.advice_path,
            scenario_name="siemens-test",
            run_id="2026-01-01T00-00-00",
            judge_model="deepseek-v4-flash",
        )
        content = self.advice_path.read_text(encoding="utf-8")
        self.assertIn("siemens-test", content)
        self.assertIn("2026-01-01T00-00-00", content)

    def test_write_contains_llm_markdown(self):
        diag = self._make_diagnosis()
        write_advice(
            diagnoses=[diag],
            llm_markdown="## faithfulness\n\n具体建议文本",
            advice_path=self.advice_path,
            scenario_name="test",
            run_id="rid",
            judge_model="model",
        )
        content = self.advice_path.read_text(encoding="utf-8")
        self.assertIn("具体建议文本", content)

    def test_write_fallback_when_no_llm_markdown(self):
        """When llm_markdown is empty, writer emits rule-only report."""
        diag = self._make_diagnosis()
        write_advice(
            diagnoses=[diag],
            llm_markdown="",
            advice_path=self.advice_path,
            scenario_name="test",
            run_id="rid",
            judge_model="model",
        )
        content = self.advice_path.read_text(encoding="utf-8")
        self.assertIn("faithfulness", content)
        self.assertIn("原因1", content)

    def test_log_summary_format(self):
        diags = [
            self._make_diagnosis("faithfulness", "critical"),
            self._make_diagnosis("context_recall", "warning"),
        ]
        summary = _format_log_summary(diags, self.advice_path)
        self.assertIn("faithfulness", summary)
        self.assertIn("critical", summary)
        self.assertIn("context_recall", summary)
        self.assertIn("warning", summary)

    def test_write_empty_diagnoses_still_creates_file(self):
        write_advice(
            diagnoses=[],
            llm_markdown="",
            advice_path=self.advice_path,
            scenario_name="test",
            run_id="rid",
            judge_model="model",
        )
        self.assertTrue(self.advice_path.exists())
        content = self.advice_path.read_text(encoding="utf-8")
        self.assertIn("未发现明显指标异常", content)


if __name__ == "__main__":
    unittest.main()
```

- [ ] **Step 2: Run tests to verify they fail**

```
python -m pytest tests/test_advisor_writer.py -v 2>&1 | head -15
```

Expected: `ModuleNotFoundError: No module named 'rag_eval.advisor.writer'` (the `rag_eval.advisor` package exists after Task 1, but `writer.py` does not yet).

- [ ] **Step 3: Create writer.py**

```python
# rag_eval/advisor/writer.py
"""Write optimization advice to markdown file and emit log summary."""
from __future__ import annotations

import logging
from pathlib import Path

from .rules import Diagnosis

logger = logging.getLogger("rag_eval.advisor")


def _format_log_summary(diagnoses: list[Diagnosis], advice_path: Path) -> str:
    """Return a single-line log summary of triggered diagnoses."""
    if not diagnoses:
        return "[advisor] 所有指标正常,无需优化建议。"
    parts = [f"{d.metric}({d.mean_score:.2f}, {d.severity})" for d in diagnoses]
    triggered = " ".join(parts)
    return f"[advisor] 触发诊断 {len(diagnoses)} 项: {triggered} → {advice_path}"


def _build_fallback_report(diagnoses: list[Diagnosis]) -> str:
    """Build a rules-only report when LLM analysis is unavailable."""
    if not diagnoses:
        return ""
    lines = ["## 规则诊断(LLM 分析不可用)\n"]
    for d in diagnoses:
        lines.append(f"### {d.metric} [{d.severity}] 均值={d.mean_score:.4f}")
        lines.append("\n**可能原因:**")
        for cause in d.root_causes:
            lines.append(f"- {cause}")
        lines.append("\n**建议动作:**")
        for action in d.suggested_actions:
            lines.append(f"- {action}")
        lines.append("")
    return "\n".join(lines)


def write_advice(
    diagnoses: list[Diagnosis],
    llm_markdown: str,
    advice_path: Path,
    scenario_name: str,
    run_id: str,
    judge_model: str,
) -> None:
    """Write optimization_advice.md and emit a log summary line.

    Args:
        diagnoses: List of Diagnosis from rules.diagnose().
        llm_markdown: LLM-generated Markdown body. Empty string triggers fallback.
        advice_path: Full path to write the .md file.
        scenario_name: Human-readable scenario identifier for the report header.
        run_id: Run identifier string.
        judge_model: Model used for LLM analysis (shown in header).
    """
    advice_path.parent.mkdir(parents=True, exist_ok=True)

    # Header
    from rag_eval.shared.utils import utc_now_iso
    header_lines = [
        f"# 优化建议报告 — {scenario_name}",
        "",
        f"- run_id: `{run_id}`",
        f"- 生成时间: `{utc_now_iso()}`",
        f"- judge_model: `{judge_model}`",
        "",
        "---",
        "",
    ]

    # Body: "all clear" note, LLM report, or rule-only fallback, in that order.
    if not diagnoses:
        body = "## ✅ 未发现明显指标异常\n\n所有指标均在正常范围内,当前 RAG 链路表现良好。\n"
    elif llm_markdown:
        body = llm_markdown
    else:
        body = _build_fallback_report(diagnoses)

    content = "\n".join(header_lines) + body
    advice_path.write_text(content, encoding="utf-8")

    summary = _format_log_summary(diagnoses, advice_path)
    logger.info(summary)
    logger.info("[advisor] 优化建议已写出: %s", advice_path)
```

- [ ] **Step 4: Run tests and expect them to pass**

```
python -m pytest tests/test_advisor_writer.py -v
```

Expected: all 6 tests PASS.

- [ ] **Step 5: Commit**

```
git add rag_eval/advisor/writer.py tests/test_advisor_writer.py
git commit -m "feat(advisor): add advice writer with fallback rule-only report"
```

---

## Task 3: LLM analyzer

**Files:**
- Create: `rag_eval/advisor/llm_analyzer.py`

No LLM unit tests (network-dependent); tested in Task 7 integration.

- [ ] **Step 1: Create llm_analyzer.py**

```python
# rag_eval/advisor/llm_analyzer.py
"""LLM-powered analysis of rule diagnostics and low-score samples."""
from __future__ import annotations

import logging
from typing import Any

from .rules import Diagnosis

logger = logging.getLogger("rag_eval.advisor")

_PROMPT_TEMPLATE = """\
你是一个 RAG 系统优化专家,正在分析西门子医疗 CT 文档问答系统的评测结果。
请用中文撰写一份优化建议报告,格式为 Markdown。

## 评测诊断摘要

{diagnosis_summary}

## 低分样本示例

{low_sample_text}

## 报告要求

1. 按指标分节(## 指标名 [severity]),先解释"为什么低"(结合低分样本具体分析),再给出"具体怎么改"
2. "具体怎么改"要结合低分样本的实际内容,而不只是泛泛建议
3. 最后写一节 **## 优先优化次序**,按性价比排序(不增加 LLM 调用次数的优化优先)
4. 语言简洁,面向工程师,不要废话,不要重复列表内容

只输出 Markdown 报告正文,不要任何前置说明。
"""


def _build_diagnosis_summary(diagnoses: list[Diagnosis]) -> str:
    lines = []
    for d in diagnoses:
        direction = "(越低越好)" if d.metric == "noise_sensitivity" else ""
        lines.append(
            f"- **{d.metric}** {direction} 均值={d.mean_score:.4f},"
            f"阈值={d.threshold},严重程度={d.severity}"
        )
        lines.append(f"  - 可能原因:{'; '.join(d.root_causes)}")
        lines.append(f"  - 建议动作:{'; '.join(d.suggested_actions)}")
    return "\n".join(lines)


def _build_low_sample_text(diagnoses: list[Diagnosis]) -> str:
    lines = []
    for d in diagnoses:
        if not d.low_samples:
            continue
        lines.append(f"### {d.metric} 低分样本(最多 3 条)")
        for i, s in enumerate(d.low_samples, 1):
            score = s.get(d.metric, "N/A")
            lines.append(f"\n**样本 {i}**(分数={score})")
            lines.append(f"- 问题:{s.get('question', '')}")
            # str() guards against non-string cells (e.g. None) before truncating.
            lines.append(f"- 回答:{str(s.get('answer', ''))[:300]}")
            lines.append(f"- 标准答案:{str(s.get('ground_truth', ''))[:200]}")
    return "\n".join(lines)


async def analyze(
    diagnoses: list[Diagnosis],
    llm: Any,
    scenario_name: str,
) -> str:
    """Call the judge LLM to generate a Chinese optimization report.

    Args:
        diagnoses: Non-empty list of Diagnosis from rules.diagnose().
        llm: RAGAS LLM wrapper (has .agenerate() method).
        scenario_name: Used only for logging.

    Returns:
        LLM-generated Markdown string, or "" on failure (triggers writer fallback).
    """
    if not diagnoses:
        return ""

    diagnosis_summary = _build_diagnosis_summary(diagnoses)
    low_sample_text = _build_low_sample_text(diagnoses)
    prompt = _PROMPT_TEMPLATE.format(
        diagnosis_summary=diagnosis_summary,
        low_sample_text=low_sample_text,
    )

    try:
        logger.info("[advisor] calling LLM for optimization analysis scenario=%s", scenario_name)
        # Assumption: the llm wrapper exposes an async agenerate(texts=...) that returns
        # an LLMResult-like object. Verify this against the installed ragas version; if
        # the call shape differs, the except branch below returns "" and the writer
        # falls back to the rule-only report.
        from langchain_core.messages import HumanMessage
        result = await llm.agenerate(texts=[[HumanMessage(content=prompt)]])
        text = result.generations[0][0].text.strip()
        logger.info("[advisor] LLM analysis complete chars=%d", len(text))
        return text
    except Exception as exc:
        logger.warning("[advisor] LLM analysis failed (%s: %s) — falling back to rule report",
                       type(exc).__name__, exc)
        return ""
```

- [ ] **Step 2: Verify import works**

```
python -c "from rag_eval.advisor.llm_analyzer import analyze; print('OK')"
```

Expected: `OK`

- [ ] **Step 3: Commit**

```
git add rag_eval/advisor/llm_analyzer.py
git commit -m "feat(advisor): add LLM analyzer with graceful fallback on failure"
```

---

## Task 4: Wire advisor into models, config schema, and loader

**Files:**
- Modify: `rag_eval/shared/models.py`
- Modify: `rag_eval/config/schema.py`
- Modify: `rag_eval/config/loader.py`
- Modify: `rag_eval/reporting/artifacts.py`

- [ ] **Step 1: Add `optimization_advisor` to `Scenario` and `advice_md` to `RunArtifactPaths`**

In `rag_eval/shared/models.py`, add one field to `Scenario` (after `source_path`) and one to `RunArtifactPaths`:

```python
# In Scenario dataclass: add after the source_path field
optimization_advisor: bool = False
```

```python
# In RunArtifactPaths dataclass: add after the metadata_json field
advice_md: Path | None = None
```

Full updated `Scenario` dataclass (the new field has a default, so it must come after the fields without defaults; add it at the end):

```python
@dataclass(slots=True)
class Scenario:
    scenario_name: str
    mode: Mode
    dataset: DatasetConfig
    judge_model: str
    embedding_model: str
    metrics: list[str]
    output_dir: Path
    runtime: RuntimeConfig = field(default_factory=RuntimeConfig)
    app_adapter: AppAdapterConfig | None = None
    source_path: Path | None = None
    optimization_advisor: bool = False  # NEW
```

Full updated `RunArtifactPaths`:

```python
@dataclass(slots=True)
class RunArtifactPaths:
    root_dir: Path
    scenario_snapshot: Path
    scores_csv: Path
    invalid_csv: Path
    summary_md: Path
    metadata_json: Path
    advice_md: Path | None = None  # NEW
```

- [ ] **Step 2: Add field to ScenarioModel in schema.py**

In `rag_eval/config/schema.py`, add to `ScenarioModel` (after the `runtime` field):

```python
optimization_advisor: bool = False  # NEW: enable optimization advisor output
```

- [ ] **Step 3: Pass `optimization_advisor` through in loader.py**

In `rag_eval/config/loader.py`, in the `Scenario(...)` constructor call, add:

```python
optimization_advisor=model.optimization_advisor,  # NEW
```

- [ ] **Step 4: Add advice_md to artifact paths in artifacts.py**

In `rag_eval/reporting/artifacts.py`, update `build_artifact_paths()`:

```python
def build_artifact_paths(output_dir: Path, run_id: str) -> RunArtifactPaths:
    """Build the canonical artifact file paths for a single evaluation run."""
    run_dir = output_dir / run_id
    return RunArtifactPaths(
        root_dir=run_dir,
        scenario_snapshot=run_dir / "scenario.snapshot.yaml",
        scores_csv=run_dir / "scores.csv",
        invalid_csv=run_dir / "invalid.csv",
        summary_md=run_dir / "summary.md",
        metadata_json=run_dir / "metadata.json",
        advice_md=run_dir / "optimization_advice.md",  # NEW
    )
```

- [ ] **Step 5: Verify existing tests still pass**

```
python -m pytest tests/test_offline_eval.py tests/test_online_eval.py -v 2>&1 | tail -15
```

Expected: same pass/fail as before (the 4 pre-existing failures are unrelated).
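Why the new fields carry defaults can be seen in a small standalone sketch. `PathsSketch` and `build_paths_sketch` are hypothetical stand-ins for `RunArtifactPaths` and `build_artifact_paths()`, trimmed to three fields and written without `slots=True`; the only point is that a trailing defaulted field leaves older constructor calls valid.

```python
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path


@dataclass
class PathsSketch:
    root_dir: Path
    summary_md: Path
    advice_md: Path | None = None  # new and defaulted, so it must come last


def build_paths_sketch(output_dir: Path, run_id: str) -> PathsSketch:
    """Derive every artifact path from one run directory."""
    run_dir = output_dir / run_id
    return PathsSketch(
        root_dir=run_dir,
        summary_md=run_dir / "summary.md",
        advice_md=run_dir / "optimization_advice.md",
    )


# A call site written before the change still works and sees advice_md=None.
legacy = PathsSketch(root_dir=Path("out/r1"), summary_md=Path("out/r1/summary.md"))
current = build_paths_sketch(Path("out"), "r1")
print(legacy.advice_md, current.advice_md.name)
```

Placing a field without a default after one with a default raises `TypeError` when the dataclass is defined, which is why both additions in this task go at the end of their classes.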
- [ ] **Step 6: Commit**

```
git add rag_eval/shared/models.py rag_eval/config/schema.py rag_eval/config/loader.py rag_eval/reporting/artifacts.py
git commit -m "feat(advisor): wire optimization_advisor field through Scenario, ScenarioModel, loader, and RunArtifactPaths"
```

---

## Task 5: Lift build_models() to runner + wire run_advisor()

**Files:**
- Modify: `rag_eval/metrics/factory.py`
- Modify: `rag_eval/execution/runner.py`
- Modify: `rag_eval/advisor/__init__.py`

This is the integration wiring. The key change: `build_metric_pipeline()` currently creates the llm internally and returns only `MetricPipeline`. We expose the existing `build_models()` so that `runner.py` calls it first, then passes `llm` to both `build_metric_pipeline()` and `run_advisor()`.

- [ ] **Step 1: Expose `build_models()` as a public function in factory.py**

`build_models()` is already defined in `factory.py` (lines 30-39) but is treated as internal (the module has no `__all__`). We expose it and update `build_metric_pipeline()` to accept optional pre-built models:

```python
# rag_eval/metrics/factory.py (full replacement)
"""Factories for OpenAI-backed RAGAS models and metric pipelines."""
from __future__ import annotations

from typing import Any

from openai import AsyncOpenAI

from rag_eval.compat import ensure_ragas_import_compat
from rag_eval.settings import EvaluationSettings
from rag_eval.shared.models import Scenario

ensure_ragas_import_compat()

from ragas.embeddings.base import embedding_factory
from ragas.llms import llm_factory
from ragas.metrics.collections import (
    AnswerRelevancy,
    ContextPrecision,
    ContextRecall,
    FactualCorrectness,
    Faithfulness,
    NoiseSensitivity,
    SemanticSimilarity,
)

from .pipeline import MetricPipeline


def build_models(
    judge_model: str,
    embedding_model: str,
    settings: EvaluationSettings,
) -> tuple[Any, Any]:
    """Create the LLM and embedding clients required by the selected RAGAS metrics."""
    client = AsyncOpenAI(**settings.openai_client_kwargs)
    llm = llm_factory(judge_model, client=client)
    embeddings = embedding_factory(provider="openai", model=embedding_model, client=client)
    return llm, embeddings


def build_metric_pipeline(
    scenario: Scenario,
    settings: EvaluationSettings,
    llm: Any | None = None,
    embeddings: Any | None = None,
) -> MetricPipeline:
    """Build a metric pipeline containing only the metrics requested by the scenario.

    If both llm and embeddings are provided (pre-built by the caller), they are reused.
    If either one is missing, both are created fresh from scenario + settings.
    """
    if llm is None or embeddings is None:
        llm, embeddings = build_models(
            scenario.judge_model,
            scenario.embedding_model,
            settings,
        )

    registry: dict[str, Any] = {
        "faithfulness": Faithfulness(llm=llm),
        "answer_relevancy": AnswerRelevancy(llm=llm, embeddings=embeddings),
        "context_recall": ContextRecall(llm=llm),
        "context_precision": ContextPrecision(llm=llm),
        "noise_sensitivity": NoiseSensitivity(llm=llm),
        "factual_correctness": FactualCorrectness(llm=llm),
        "semantic_similarity": SemanticSimilarity(embeddings=embeddings),
    }
    return MetricPipeline(
        metrics={name: registry[name] for name in scenario.metrics},
        metric_timeout_seconds=settings.ragas_metric_timeout_seconds,
    )
```

- [ ] **Step 2: Update `rag_eval/advisor/__init__.py` with full `run_advisor()`**

```python
# rag_eval/advisor/__init__.py
"""Optimization advisor: rule-based diagnosis + LLM-powered recommendations."""
from __future__ import annotations

import asyncio
import logging
from typing import Any

from rag_eval.reporting.artifacts import build_artifact_paths
from rag_eval.shared.models import EvaluationResult, Scenario

from .llm_analyzer import analyze
from .rules import Diagnosis, diagnose
from .writer import write_advice

logger = logging.getLogger("rag_eval.advisor")

__all__ = ["run_advisor", "Diagnosis", "diagnose"]


def run_advisor(
    result: EvaluationResult,
    scenario: Scenario,
    llm: Any,
) -> None:
    """Run the full optimization advisor pipeline after an evaluation completes.

    Skips silently if scenario.optimization_advisor is False.
    Never raises; failures are logged as warnings, not exceptions.

    Args:
        result: Completed EvaluationResult from Evaluator.evaluate().
        scenario: The resolved Scenario (provides metrics, judge_model, output_dir).
        llm: Pre-built RAGAS LLM instance (from build_models()) for LLM analysis.
    """
    if not scenario.optimization_advisor:
        return

    logger.info("[advisor] starting optimization analysis scenario=%s", scenario.scenario_name)

    try:
        artifact_paths = build_artifact_paths(scenario.output_dir, result.run_id)
        if artifact_paths.advice_md is None:
            logger.warning("[advisor] advice_md path not set in RunArtifactPaths — skipping")
            return

        diagnoses = diagnose(result.score_rows, scenario.metrics)
        logger.info("[advisor] rule diagnosis complete: %d metric(s) triggered", len(diagnoses))

        if diagnoses:
            llm_markdown = asyncio.run(analyze(diagnoses, llm, scenario.scenario_name))
        else:
            llm_markdown = ""

        write_advice(
            diagnoses=diagnoses,
            llm_markdown=llm_markdown,
            advice_path=artifact_paths.advice_md,
            scenario_name=scenario.scenario_name,
            run_id=result.run_id,
            judge_model=scenario.judge_model,
        )
    except Exception as exc:
        logger.warning(
            "[advisor] advisor failed (%s: %s) — evaluation result is unaffected",
            type(exc).__name__, exc,
        )
```

- [ ] **Step 3: Update runner.py to lift llm and call run_advisor()**

In `rag_eval/execution/runner.py`, make these changes:

1. Add imports at the top:

```python
from rag_eval.advisor import run_advisor
from rag_eval.metrics.factory import build_models, build_metric_pipeline
```

2. Replace the existing `build_metric_pipeline` import (it is already imported from `rag_eval.metrics.factory`) and update `run_scenario()`:

```python
# rag_eval/execution/runner.py (full replacement)
"""High-level scenario runner used by the package and CLI entrypoints."""
from __future__ import annotations

import logging
import sys
from pathlib import Path

from rag_eval.adapters.http import HttpAppAdapter
from rag_eval.adapters.python import PythonFunctionAdapter
from rag_eval.advisor import run_advisor
from rag_eval.config.loader import load_scenario
from rag_eval.metrics.factory import build_models, build_metric_pipeline
from rag_eval.reporting.writers import write_run_artifacts
from rag_eval.settings import EvaluationSettings
from rag_eval.shared.models import Scenario

from .evaluator import Evaluator

logger = logging.getLogger("rag_eval.execution.runner")


def _setup_logging(log_file: Path | None = None, level: int = logging.INFO) -> None:
    """Configure root logger: always write to stderr, optionally also to a file."""
    fmt = "%(asctime)s %(levelname)-8s %(name)s %(message)s"
    datefmt = "%H:%M:%S"
    handlers: list[logging.Handler] = [logging.StreamHandler(sys.stderr)]
    if log_file is not None:
        log_file.parent.mkdir(parents=True, exist_ok=True)
        fh = logging.FileHandler(log_file, encoding="utf-8")
        fh.setFormatter(logging.Formatter(fmt, datefmt=datefmt))
        handlers.append(fh)
    logging.basicConfig(level=level, format=fmt, datefmt=datefmt, handlers=handlers, force=True)
    logging.getLogger("ragas").setLevel(logging.WARNING)
    logging.getLogger("httpx").setLevel(logging.WARNING)
    logging.getLogger("openai").setLevel(logging.WARNING)


def build_adapter(scenario: Scenario):
    """Instantiate the adapter required by the resolved scenario, if any."""
    if scenario.app_adapter is None:
        return None
    if scenario.app_adapter.type == "http":
        return HttpAppAdapter(scenario.app_adapter)
    if scenario.app_adapter.type == "python":
        return PythonFunctionAdapter(scenario.app_adapter)
    raise ValueError(f"Unsupported adapter type: {scenario.app_adapter.type}")


def run_scenario(
    scenario_path: str,
    settings: EvaluationSettings | None = None,
    log_file: Path | None = None,
    log_level: int = logging.INFO,
):
    """Run one scenario end to end and persist its reporting artifacts."""
    _setup_logging(log_file=log_file, level=log_level)
    logger.info("[runner] run_scenario path=%s", scenario_path)

    settings = settings or EvaluationSettings()
    if not settings.openai_api_key:
        raise EnvironmentError("OPENAI_API_KEY must be set before running the evaluator.")

    scenario = load_scenario(scenario_path)
    logger.info("[runner] scenario loaded: name=%s mode=%s max_samples=%s",
                scenario.scenario_name, scenario.mode, scenario.runtime.max_samples)

    # Build models once; reuse llm in both MetricPipeline and advisor.
    llm, embeddings = build_models(scenario.judge_model, scenario.embedding_model, settings)

    adapter = build_adapter(scenario)
    pipeline = build_metric_pipeline(scenario, settings, llm=llm, embeddings=embeddings)
    evaluator = Evaluator(scenario=scenario, metric_pipeline=pipeline, app_adapter=adapter)
    result = evaluator.evaluate()
    write_run_artifacts(result)
    logger.info("[runner] artifacts written for run_id=%s", result.run_id)

    # Optimization advisor: runs only if scenario.optimization_advisor is True.
    run_advisor(result, scenario, llm)

    return result
```

- [ ] **Step 4: Verify existing tests still pass**

```
python -m pytest tests/ -v 2>&1 | tail -20
```

Expected: same pass count as before this change (only the 4 pre-existing failures).
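The "never raises" contract between the analyzer and the writer can be sketched with stand-in objects. Everything named here is hypothetical: `FailingLLM` and `CannedLLM` imitate a judge LLM, they return plain strings rather than an LLMResult, and `analyze_sketch` / `pick_body` only mirror the shape of `analyze()` and the body selection in `write_advice()`.

```python
import asyncio


class FailingLLM:
    """Hypothetical stand-in for a judge LLM whose call always fails."""

    async def agenerate(self, texts):
        raise RuntimeError("simulated network failure")


class CannedLLM:
    """Hypothetical stand-in that returns a fixed Markdown string."""

    async def agenerate(self, texts):
        return "## faithfulness [warning]\n\nadvice\n"


async def analyze_sketch(llm, prompt: str) -> str:
    """Return the LLM text, or "" on any failure so the caller can fall back."""
    try:
        return (await llm.agenerate(texts=[[prompt]])).strip()
    except Exception:
        return ""


def pick_body(llm_markdown: str, rule_report: str) -> str:
    """Prefer the LLM report; use the rule-only report when it is empty."""
    return llm_markdown or rule_report


for llm in (CannedLLM(), FailingLLM()):
    markdown = asyncio.run(analyze_sketch(llm, "prompt"))
    print(repr(pick_body(markdown, "rule-only report")))
```

`asyncio.run()` raises `RuntimeError` when called from a thread that already has a running event loop, so this sketch, like `run_advisor()` as planned above, assumes a synchronous caller.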
- [ ] **Step 5: Commit**

```
git add rag_eval/metrics/factory.py rag_eval/execution/runner.py rag_eval/advisor/__init__.py
git commit -m "feat(advisor): wire run_advisor into runner, lift llm from build_metric_pipeline"
```

---

## Task 6: Enable advisor in Siemens online YAML

**Files:**
- Modify: `scenarios/online/siemens-pdf-question-bank-online.yaml`

- [ ] **Step 1: Add optimization_advisor field**

Read the current file first, then add one line after `embedding_model`:

```yaml
scenario_name: siemens-pdf-question-bank-online
mode: online
dataset: ../../datasets/raw/generated/siemens-pdf-question-bank.csv
judge_model: deepseek-v4-flash
embedding_model: text-embedding-v3
optimization_advisor: true  # generate the optimization advice report automatically after each evaluation
metrics:
  - faithfulness
  - answer_relevancy
  - context_recall
  - context_precision
  # Enabled: robustness / end-to-end metrics (the dataset already includes ground_truth)
  - noise_sensitivity    # robustness: sensitivity to retrieval noise
  - factual_correctness  # end-to-end: factual correctness (relative to the reference answer)
  - semantic_similarity  # end-to-end: semantic similarity (embedding, no LLM calls)
output_dir: ../../outputs/online/siemens-pdf-question-bank
runtime:
  batch_size: 4
  app_concurrency: 4
  metric_concurrency: 4
  max_samples: 50
app_adapter:
  type: python
  callable: apps.siemens_pdf_qa.adapter:run
  static_kwargs:
    source_chunks_path: ../../outputs/dataset-builds/siemens-pdf-question-bank/latest/source_chunks.jsonl
    model: deepseek-v4-flash
```

- [ ] **Step 2: Verify scenario loads correctly**

```
python -c "
from rag_eval.config.loader import load_scenario
s = load_scenario('scenarios/online/siemens-pdf-question-bank-online.yaml')
print('optimization_advisor:', s.optimization_advisor)
print('metrics:', s.metrics)
"
```

Expected:

```
optimization_advisor: True
metrics: ['faithfulness', 'answer_relevancy', 'context_recall', 'context_precision', 'noise_sensitivity', 'factual_correctness', 'semantic_similarity']
```

- [ ] **Step 3: Commit**

```
git add scenarios/online/siemens-pdf-question-bank-online.yaml
git commit -m "feat(advisor): enable optimization_advisor in siemens online scenario"
```

---

## Task 7: Run all advisor tests + smoke check

**Files:**
- Create: `scripts/smoke_advisor.py` (offline smoke script, committed in Step 3)

- [ ] **Step 1: Run full advisor test suite**

```
python -m pytest tests/test_advisor_rules.py tests/test_advisor_writer.py -v
```

Expected: 15 tests PASS (9 rules + 6 writer).

- [ ] **Step 2: Smoke-check the full module wiring (no network)**

```python
# Paste into a Python REPL, or save as scripts/smoke_advisor.py and run from the repo root
import sys
import tempfile
from pathlib import Path

sys.path.insert(0, ".")  # make rag_eval importable when run from the repo root

from rag_eval.advisor.rules import diagnose
# _format_log_summary is unused below; importing it only checks that the name resolves
from rag_eval.advisor.writer import write_advice, _format_log_summary

# Simulate score_rows with low faithfulness and high noise_sensitivity
rows = [
    {"sample_id": f"s{i}", "question": f"q{i}", "answer": f"a{i}",
     "ground_truth": f"gt{i}",
     "faithfulness": 0.3 + i*0.05,
     "noise_sensitivity": 0.4 + i*0.02}
    for i in range(5)
]

diags = diagnose(rows, metrics=["faithfulness", "noise_sensitivity"])
print(f"Diagnosed {len(diags)} metric(s):")
for d in diags:
    print(f"  {d.metric}: mean={d.mean_score}, severity={d.severity}, samples={len(d.low_samples)}")

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "optimization_advice.md"
    write_advice(
        diagnoses=diags,
        llm_markdown="",  # fallback mode
        advice_path=path,
        scenario_name="smoke-test",
        run_id="2026-01-01T00-00-00",
        judge_model="deepseek-v4-flash",
    )
    content = path.read_text(encoding="utf-8")
    print(f"\nAdvice file ({len(content)} chars):")
    print(content[:600])

print("\nSmoke check PASSED")
```

```
python scripts/smoke_advisor.py
```

Expected: prints the diagnosed metrics, the advice content, and `Smoke check PASSED`.

- [ ] **Step 3: Commit smoke script**

```
git add scripts/smoke_advisor.py
git commit -m "test(advisor): add smoke-check script for offline wiring verification"
```

---

## Task 8: Update docs

**Files:**
- Modify: `docs/rag-eval-engine-flow.md`
- Modify: `docs/rag-eval-architecture.md`

- [ ] **Step 1: Add advisor section to rag-eval-engine-flow.md**

Append a new section at the end of `docs/rag-eval-engine-flow.md`:

````markdown
---

## 15. Optimization Advisor Flow

Related code:

- `rag_eval/advisor/__init__.py`: external entry point `run_advisor()`
- `rag_eval/advisor/rules.py`: rules engine (pure functions, no LLM)
- `rag_eval/advisor/llm_analyzer.py`: LLM analyzer (reuses judge_model)
- `rag_eval/advisor/writer.py`: writes `optimization_advice.md` plus a log summary

The advisor is triggered after `write_run_artifacts()` and only takes effect when the scenario sets `optimization_advisor: true`.

Execution flow:

```text
run_advisor(result, scenario, llm)
  -> rules.diagnose(score_rows, metrics)      # identify abnormal metrics, select low-scoring samples
  -> llm_analyzer.analyze(diagnoses, llm)     # LLM generates the advice (degrades automatically on failure)
  -> writer.write_advice(...)                 # write optimization_advice.md + log summary
```

The output artifact is added to the existing run directory:

```text
outputs/online/siemens-pdf-question-bank//
  ... (existing files)
  optimization_advice.md   ← new (generated when optimization_advisor: true)
```
````

- [ ] **Step 2: Add advisor note to rag-eval-architecture.md §9.4**

In `docs/rag-eval-architecture.md`, find the end of section `### 9.4 指标编排` and append:

````markdown
**Optimization Advisor (implements the §11 optimization strategy):**

After an evaluation finishes, if the scenario sets `optimization_advisor: true`, the `rag_eval/advisor/` module is invoked automatically:

- The rules engine (`rules.py`) applies a threshold to each of the 7 metrics, identifies the ones that trigger, and selects the top-3 low-scoring samples
- The LLM analyzer (`llm_analyzer.py`) combines the low-scoring samples into Markdown optimization advice written in Chinese (it reuses judge_model and degrades automatically to a rules-only report on failure)
- The writer layer (`writer.py`) outputs `optimization_advice.md` and logs a summary

```yaml
# Example scenario configuration
optimization_advisor: true
```
````

- [ ] **Step 3: Commit docs**

```
git add docs/rag-eval-engine-flow.md docs/rag-eval-architecture.md
git commit -m "docs: add optimization advisor section to engine-flow and architecture docs"
```

---

## Self-Review

**Spec coverage check:**

- §2 Decision summary → Task 5 (llm reuse), Task 6 (YAML trigger), Tasks 1+2+3 (outputs) ✓
- §3.1 Execution flow → Task 5 runner.py wiring ✓
- §3.2 New files → Tasks 1, 2, 3 ✓
- §3.3 Modified files → Task 4 (models/schema/loader/artifacts), Task 5 (factory/runner) ✓
- §3.4 Output artifacts → Task 4 advice_md in RunArtifactPaths ✓
- §4 Rules engine, 7 rules → Task 1 rules.py METRIC_RULES ✓
- §4.3 low_samples top-3 → Task 1 `_select_low_samples` ✓
- §5 LLM analyzer → Task 3 llm_analyzer.py ✓
- §5.4 Degradation on failure → Task 3 try/except returns "" → Task 2 writer fallback ✓
- §6.1 File output + §6.2 Log summary → Task 2 writer.py ✓
- §7 YAML configuration → Task 6 ✓
- §8 Test strategy → Tasks 1 (rules tests), 2 (writer tests) ✓
- §9 Non-goals → not implemented ✓

**Type consistency check:**

- `Diagnosis` defined in `rules.py`, imported in `__init__.py`, `llm_analyzer.py`, `writer.py`: all consistent ✓
- `write_advice()` signature matches calls in `__init__.py` ✓
- `build_models()` returns `tuple[Any, Any]`; `build_metric_pipeline()` accepts `llm, embeddings`: consistent ✓
- `run_advisor(result, scenario, llm)` signature matches call in `runner.py` ✓

**Placeholder scan:** No TBD/TODO/fill-in-later found ✓
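
The degradation path checked above (the analyzer returns `""` on failure, and the writer then falls back to a rules-only report) can be illustrated with a small standalone sketch. This is not the plan's actual code: `DiagnosisStub`, `analyze_or_empty`, `render_advice`, and `run_advisor_sketch` are hypothetical names invented for illustration, and the `enabled` flag stands in for `scenario.optimization_advisor`.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class DiagnosisStub:
    """Hypothetical, reduced stand-in for rules.Diagnosis (illustration only)."""
    metric: str
    severity: str
    mean_score: float


def analyze_or_empty(diagnoses: list, llm_call: Callable[[list], str]) -> str:
    """Return the LLM's Markdown, or "" if the call raises (the degrade path)."""
    try:
        return llm_call(diagnoses)
    except Exception:
        return ""


def render_advice(diagnoses: list, llm_markdown: str) -> str:
    """Prefer the LLM Markdown; fall back to a rules-only report when it is empty."""
    if llm_markdown.strip():
        return llm_markdown
    lines = ["# Optimization advice (rules only)"]
    for d in diagnoses:
        lines.append(f"- {d.metric}: {d.severity}, mean={d.mean_score:.2f}")
    return "\n".join(lines)


def run_advisor_sketch(enabled: bool, diagnoses: list,
                       llm_call: Callable[[list], str]) -> Optional[str]:
    """Do nothing unless the flag is set; otherwise analyze, then render."""
    if not enabled:
        return None
    return render_advice(diagnoses, analyze_or_empty(diagnoses, llm_call))
```

Passing an `llm_call` that raises exercises the fallback branch without any network access, which mirrors what the smoke check does by passing `llm_markdown=""` to `write_advice()`.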