siemens_ragas/docs/superpowers/plans/2026-06-16-optimization-advisor.md
wangwei f5c2dce64a feat(advisor): add optimization advisor module
- rag_eval/advisor/: new package with rules engine, LLM analyzer, writer
  - rules.py: 7-metric diagnostic rules (warning/critical thresholds, top-3 low samples)
  - llm_analyzer.py: Chinese optimization report via judge_model, graceful fallback
  - writer.py: writes optimization_advice.md + log summary
  - __init__.py: run_advisor() entry point (no-op when optimization_advisor=False)
- Scenario.optimization_advisor: new bool field (default False)
- ScenarioModel: same field added; loader.py passes it through
- RunArtifactPaths.advice_md: new path field
- factory.py: build_models() now public; build_metric_pipeline() accepts pre-built llm/embeddings
- runner.py: lifts llm, passes to pipeline and advisor; calls run_advisor() at end
- siemens online YAML: optimization_advisor: true enabled
- tests: 9 rules tests + 6 writer tests, all pass
- docs: advisor section added to engine-flow.md and architecture.md

Co-Authored-By: Claude <noreply@anthropic.com>
2026-06-16 17:06:19 +08:00

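# Optimization Advisor Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Add a `rag_eval/advisor/` module that, after each evaluation run, automatically analyzes why metrics scored low and writes a Chinese-language optimization report (`optimization_advice.md` plus a log summary).

**Architecture:** The rules engine (`rules.py`) flags anomalies from each metric's mean and thresholds and selects the worst-scoring samples; the LLM analyzer (`llm_analyzer.py`) reuses the existing judge_model LLM instance to generate Chinese Markdown recommendations; the writer (`writer.py`) writes the file and logs a summary. The advisor is enabled by the YAML switch `optimization_advisor: true` and is off by default.

**Tech Stack:** Python 3.12, dataclasses, ragas LLM instance (AsyncOpenAI-backed), pandas (score_rows already available), pytest (unit tests)

---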
## File Map

### New files

- `rag_eval/advisor/__init__.py`: exposes `run_advisor()`, the only external entry point
- `rag_eval/advisor/rules.py`: pure-function rules engine, `Diagnosis` dataclass + `diagnose()`
- `rag_eval/advisor/llm_analyzer.py`: `analyze()` takes llm + diagnoses and returns a Markdown str
- `rag_eval/advisor/writer.py`: `write_advice()` writes the md file and logs a summary
- `tests/test_advisor_rules.py`: rules engine unit tests
- `tests/test_advisor_writer.py`: writer unit tests

### Modified files

- `rag_eval/shared/models.py`: `Scenario` gains `optimization_advisor: bool = False`; `RunArtifactPaths` gains `advice_md: Path | None = None`
- `rag_eval/config/schema.py`: `ScenarioModel` gains `optimization_advisor: bool = False`
- `rag_eval/config/loader.py`: `load_scenario()` passes `optimization_advisor` through to `Scenario`
- `rag_eval/reporting/artifacts.py`: `build_artifact_paths()` sets the `advice_md` field
- `rag_eval/metrics/factory.py`: `build_models()` becomes public and `build_metric_pipeline()` accepts a pre-built `llm`/`embeddings`, so the runner can pass the same `llm` to the advisor
- `rag_eval/execution/runner.py`: builds the llm and conditionally calls `run_advisor()` at the end
- `scenarios/online/siemens-pdf-question-bank-online.yaml`: adds `optimization_advisor: true`
- `docs/rag-eval-engine-flow.md`: adds a description of the advisor flow
- `docs/rag-eval-architecture.md`: adds an advisor note at the end of §9.4 (metric orchestration)

---
## Task 1: Diagnosis dataclass + rules engine
**Files:**
- Create: `rag_eval/advisor/rules.py`
- Create: `tests/test_advisor_rules.py`
- [ ] **Step 1: Write failing tests**
```python
# tests/test_advisor_rules.py
import math
import unittest

from rag_eval.advisor.rules import Diagnosis, diagnose, METRIC_RULES


class TestDiagnosis(unittest.TestCase):
    def _make_rows(self, metric: str, scores: list[float]) -> list[dict]:
        return [{metric: s, "question": f"q{i}", "answer": f"a{i}",
                 "ground_truth": f"gt{i}", "sample_id": f"s{i}"}
                for i, s in enumerate(scores)]

    def test_no_diagnosis_when_all_scores_above_threshold(self):
        rows = self._make_rows("faithfulness", [0.8, 0.9, 0.85])
        result = diagnose(rows, metrics=["faithfulness"])
        self.assertEqual(result, [])

    def test_warning_when_mean_below_warning_threshold(self):
        rows = self._make_rows("faithfulness", [0.65, 0.62, 0.68])
        result = diagnose(rows, metrics=["faithfulness"])
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0].metric, "faithfulness")
        self.assertEqual(result[0].severity, "warning")
        self.assertAlmostEqual(result[0].mean_score, 0.65, places=2)

    def test_critical_when_mean_below_critical_threshold(self):
        rows = self._make_rows("faithfulness", [0.3, 0.4, 0.45])
        result = diagnose(rows, metrics=["faithfulness"])
        self.assertEqual(result[0].severity, "critical")

    def test_low_samples_selected_are_bottom_three(self):
        rows = self._make_rows("faithfulness", [0.1, 0.2, 0.3, 0.8, 0.9])
        result = diagnose(rows, metrics=["faithfulness"])
        self.assertEqual(len(result[0].low_samples), 3)
        scores = [s["faithfulness"] for s in result[0].low_samples]
        self.assertEqual(sorted(scores), [0.1, 0.2, 0.3])

    def test_nan_scores_excluded_from_mean_and_low_samples(self):
        rows = self._make_rows("faithfulness", [0.3, float("nan"), 0.4])
        result = diagnose(rows, metrics=["faithfulness"])
        self.assertEqual(len(result), 1)
        for s in result[0].low_samples:
            self.assertFalse(math.isnan(s["faithfulness"]))

    def test_noise_sensitivity_direction_inverted(self):
        # noise_sensitivity: higher is worse; threshold > 0.3 is warning
        rows = self._make_rows("noise_sensitivity", [0.4, 0.45, 0.5])
        result = diagnose(rows, metrics=["noise_sensitivity"])
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0].metric, "noise_sensitivity")

    def test_noise_sensitivity_no_diagnosis_when_low(self):
        rows = self._make_rows("noise_sensitivity", [0.1, 0.15, 0.2])
        result = diagnose(rows, metrics=["noise_sensitivity"])
        self.assertEqual(result, [])

    def test_skips_metric_not_in_rows(self):
        rows = [{"faithfulness": 0.3, "question": "q", "answer": "a",
                 "ground_truth": "gt", "sample_id": "s1"}]
        result = diagnose(rows, metrics=["faithfulness", "context_recall"])
        metrics_found = [d.metric for d in result]
        self.assertIn("faithfulness", metrics_found)
        self.assertNotIn("context_recall", metrics_found)

    def test_all_seven_metrics_have_rules(self):
        expected = {"faithfulness", "answer_relevancy", "context_recall",
                    "context_precision", "noise_sensitivity",
                    "factual_correctness", "semantic_similarity"}
        self.assertEqual(set(METRIC_RULES.keys()), expected)


if __name__ == "__main__":
    unittest.main()
```
- [ ] **Step 2: Run tests to verify they fail**
```
cd C:\Projects\AIProjects\Siemens-AIPOC\siemens_ragas
python -m pytest tests/test_advisor_rules.py -v 2>&1 | head -20
```
Expected: `ModuleNotFoundError: No module named 'rag_eval.advisor'`
- [ ] **Step 3: Create rules.py**
```python
# rag_eval/advisor/rules.py
"""Rule-based diagnostic engine for RAG evaluation metric scores."""
from __future__ import annotations

import math
from dataclasses import dataclass, field
from typing import Any


@dataclass
class MetricRule:
    """Threshold configuration and diagnostic text for one metric."""
    warning_threshold: float
    critical_threshold: float
    higher_is_better: bool  # False for noise_sensitivity
    root_causes: list[str]
    suggested_actions: list[str]


METRIC_RULES: dict[str, MetricRule] = {
    "faithfulness": MetricRule(
        warning_threshold=0.7,
        critical_threshold=0.5,
        higher_is_better=True,
        root_causes=[
            "生成回答包含检索片段中不支持的陈述(幻觉)",
            "生成阶段未严格遵循 grounding 约束",
            "校验阶段未开启或未生效",
        ],
        suggested_actions=[
            "强化生成 prompt 的 grounding 约束('只依据参考资料作答')",
            "开启校验阶段(validation: by_scenario)",
            "检查低分样本中模型是否引用了片段外的知识",
        ],
    ),
    "answer_relevancy": MetricRule(
        warning_threshold=0.7,
        critical_threshold=0.5,
        higher_is_better=True,
        root_causes=[
            "回答偏离问题主旨或包含大量冗余内容",
            "查询改写后问题语义漂移",
            "生成 prompt 格式约束不足",
        ],
        suggested_actions=[
            "优化查询改写 prompt,确保改写后语义不偏移",
            "在生成 prompt 中加入'简洁准确、直接回答问题'的约束",
            "检查低分样本的回答是否存在格式冗余或话题偏移",
        ],
    ),
    "context_recall": MetricRule(
        warning_threshold=0.7,
        critical_threshold=0.5,
        higher_is_better=True,
        root_causes=[
            "检索未能召回标准答案所涉及的关键信息",
            "单一查询未能覆盖问题的多个角度",
            "过召回数量不足,关键片段被截断",
        ],
        suggested_actions=[
            "启用多查询扩展(use_multi_query),覆盖不同措辞",
            "对多跳问题启用问题分解(sub_questions)",
            "加大过召回宽度(recall_top_k)",
            "对颗粒度细的问题尝试 Step-back 双路检索",
        ],
    ),
    "context_precision": MetricRule(
        warning_threshold=0.6,
        critical_threshold=0.4,
        higher_is_better=True,
        root_causes=[
            "检索引入过多与问题无关的片段",
            "重排未能将相关片段排在前列",
            "缺少相关性过滤,噪声片段进入上下文",
        ],
        suggested_actions=[
            "启用或优化 listwise 重排,将相关片段排在前列",
            "启用上下文压缩(compression),过滤无关句子",
            "启用相关性过滤(relevance_filter),丢弃明确无关片段",
            "缩小 rerank_keep_k(如从 8 降到 5)",
        ],
    ),
    "noise_sensitivity": MetricRule(
        warning_threshold=0.3,  # higher is worse; trigger when mean > threshold
        critical_threshold=0.5,
        higher_is_better=False,
        root_causes=[
            "回答中包含检索到的噪声片段所引入的错误陈述",
            "相关性过滤未能拦截干扰性片段",
            "生成阶段对噪声片段未加区分地引用",
        ],
        suggested_actions=[
            "启用相关性过滤(relevance_filter),拦截噪声",
            "优化重排,将不相关片段排到截断点之后",
            "在生成 prompt 中强调'来源冲突时并列陈述,不擅自下定论'",
        ],
    ),
    "factual_correctness": MetricRule(
        warning_threshold=0.6,
        critical_threshold=0.4,
        higher_is_better=True,
        root_causes=[
            "回答的事实陈述与标准答案存在偏差",
            "检索未能命中标准答案所依据的关键片段",
            "生成阶段对多个来源综合时产生事实错误",
        ],
        suggested_actions=[
            "重点检查低分样本,确认是检索遗漏还是生成错误",
            "提升 context_recall 以确保关键信息被检索到",
            "对事实型问题将 temperature 降至 0",
        ],
    ),
    "semantic_similarity": MetricRule(
        warning_threshold=0.7,
        critical_threshold=0.5,
        higher_is_better=True,
        root_causes=[
            "回答语义与标准答案差距较大",
            "回答过于简短或过于冗长,语义偏移",
            "检索到的片段质量不足,导致生成内容偏离",
        ],
        suggested_actions=[
            "检查低分样本的回答与标准答案的表述差异",
            "优化生成 prompt 使回答更贴近标准表述风格",
            "提升检索质量(context_recall / context_precision)",
        ],
    ),
}


@dataclass
class Diagnosis:
    """Diagnostic result for one metric that triggered a threshold."""
    metric: str
    mean_score: float
    threshold: float  # the triggered threshold
    severity: str  # "warning" | "critical"
    root_causes: list[str] = field(default_factory=list)
    suggested_actions: list[str] = field(default_factory=list)
    low_samples: list[dict[str, Any]] = field(default_factory=list)


def _mean_ignoring_nan(values: list[float]) -> float | None:
    valid = [v for v in values if not math.isnan(v)]
    if not valid:
        return None
    return sum(valid) / len(valid)


def _is_valid_score(raw: Any) -> bool:
    """True if raw converts to a non-NaN float (None and non-numeric values are rejected)."""
    try:
        return not math.isnan(float(raw))
    except (TypeError, ValueError):
        return False


def _select_low_samples(
    rows: list[dict[str, Any]],
    metric: str,
    top_n: int,
    higher_is_better: bool,
) -> list[dict[str, Any]]:
    """Return the top_n worst-scoring rows for a metric, excluding NaN."""
    # Skip rows where the score is missing, None, non-numeric, or NaN.
    valid = [r for r in rows if _is_valid_score(r.get(metric))]
    # Ascending order puts the worst rows first when higher is better; reverse otherwise.
    sorted_rows = sorted(valid, key=lambda r: float(r[metric]), reverse=not higher_is_better)
    worst = sorted_rows[:top_n]
    keep_keys = {"sample_id", "question", "answer", "ground_truth", metric}
    return [{k: v for k, v in row.items() if k in keep_keys} for row in worst]


def diagnose(
    score_rows: list[dict[str, Any]],
    metrics: list[str],
    top_low_samples: int = 3,
) -> list[Diagnosis]:
    """Analyse score_rows and return a Diagnosis for each metric below threshold.

    Args:
        score_rows: List of per-sample score dicts (from EvaluationResult.score_rows).
        metrics: Metric names to evaluate (from Scenario.metrics).
        top_low_samples: How many worst-scoring samples to attach per diagnosis.

    Returns:
        List of Diagnosis objects, one per triggered metric. Empty if all OK.
    """
    diagnoses: list[Diagnosis] = []
    for metric in metrics:
        rule = METRIC_RULES.get(metric)
        if rule is None:
            continue  # unknown metric, skip

        values = []
        for row in score_rows:
            raw = row.get(metric)
            if raw is None:
                continue
            try:
                v = float(raw)
            except (TypeError, ValueError):
                continue
            values.append(v)
        if not values:
            continue

        mean = _mean_ignoring_nan(values)
        if mean is None:
            continue

        # Determine severity (direction-aware)
        if rule.higher_is_better:
            if mean < rule.critical_threshold:
                severity = "critical"
                threshold = rule.critical_threshold
            elif mean < rule.warning_threshold:
                severity = "warning"
                threshold = rule.warning_threshold
            else:
                continue  # above warning threshold → no diagnosis
        else:
            # lower is better (noise_sensitivity)
            if mean > rule.critical_threshold:
                severity = "critical"
                threshold = rule.critical_threshold
            elif mean > rule.warning_threshold:
                severity = "warning"
                threshold = rule.warning_threshold
            else:
                continue

        low_samples = _select_low_samples(score_rows, metric, top_low_samples, rule.higher_is_better)
        diagnoses.append(Diagnosis(
            metric=metric,
            mean_score=round(mean, 4),
            threshold=threshold,
            severity=severity,
            root_causes=list(rule.root_causes),
            suggested_actions=list(rule.suggested_actions),
            low_samples=low_samples,
        ))
    return diagnoses
```
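
The direction-aware severity check inside `diagnose()` can be read in isolation as a small pure function. The sketch below is illustrative only: `classify` is a hypothetical helper name that does not exist in the plan's module, and the thresholds passed in the usage lines are the `faithfulness` and `noise_sensitivity` values from `METRIC_RULES`.

```python
from __future__ import annotations


def classify(mean: float, warning: float, critical: float,
             higher_is_better: bool) -> str | None:
    """Return "critical", "warning", or None for a metric mean.

    Hypothetical helper mirroring the branch structure of diagnose().
    """
    if higher_is_better:
        # Most metrics: a mean strictly below a threshold triggers it.
        if mean < critical:
            return "critical"
        if mean < warning:
            return "warning"
        return None
    # Lower is better (noise_sensitivity): the comparison flips direction.
    if mean > critical:
        return "critical"
    if mean > warning:
        return "warning"
    return None


faithfulness_severity = classify(0.65, warning=0.7, critical=0.5, higher_is_better=True)
noise_severity = classify(0.45, warning=0.3, critical=0.5, higher_is_better=False)
```

Both comparisons are strict, so a mean that lands exactly on a threshold does not trigger a diagnosis.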
- [ ] **Step 4: Create `rag_eval/advisor/__init__.py` (stub — full version in Task 5)**
```python
# rag_eval/advisor/__init__.py
"""Optimization advisor: rule-based diagnosis + LLM-powered recommendations."""
from .rules import Diagnosis, diagnose
__all__ = ["Diagnosis", "diagnose"]
```
- [ ] **Step 5: Run tests — expect pass**
```
python -m pytest tests/test_advisor_rules.py -v
```
Expected: all 9 tests PASS.
- [ ] **Step 6: Commit**
```
git add rag_eval/advisor/__init__.py rag_eval/advisor/rules.py tests/test_advisor_rules.py
git commit -m "feat(advisor): add rule-based diagnostic engine with 7-metric coverage"
```
---
## Task 2: Writer module
**Files:**
- Create: `rag_eval/advisor/writer.py`
- Create: `tests/test_advisor_writer.py`
- [ ] **Step 1: Write failing tests**
```python
# tests/test_advisor_writer.py
import logging
import shutil
import unittest
from pathlib import Path

from rag_eval.advisor.rules import Diagnosis
from rag_eval.advisor.writer import write_advice, _format_log_summary


class TestWriteAdvice(unittest.TestCase):
    def setUp(self):
        self.tmp = Path("tests/.tmp/test_advisor_writer")
        shutil.rmtree(self.tmp, ignore_errors=True)
        self.tmp.mkdir(parents=True, exist_ok=True)
        self.advice_path = self.tmp / "optimization_advice.md"

    def tearDown(self):
        shutil.rmtree(self.tmp, ignore_errors=True)

    def _make_diagnosis(self, metric="faithfulness", severity="warning"):
        return Diagnosis(
            metric=metric,
            mean_score=0.55,
            threshold=0.7,
            severity=severity,
            root_causes=["原因1", "原因2"],
            suggested_actions=["建议1", "建议2"],
            low_samples=[
                {"sample_id": "s1", "question": "问题1", "answer": "答案1",
                 "ground_truth": "标准1", metric: 0.4},
            ],
        )

    def test_write_creates_file(self):
        diag = self._make_diagnosis()
        write_advice(
            diagnoses=[diag],
            llm_markdown="## faithfulness\n\nLLM 建议内容",
            advice_path=self.advice_path,
            scenario_name="test-scenario",
            run_id="2026-01-01T00-00-00",
            judge_model="deepseek-v4-flash",
        )
        self.assertTrue(self.advice_path.exists())

    def test_write_contains_scenario_name_and_run_id(self):
        diag = self._make_diagnosis()
        write_advice(
            diagnoses=[diag],
            llm_markdown="## faithfulness\n\nLLM 建议",
            advice_path=self.advice_path,
            scenario_name="siemens-test",
            run_id="2026-01-01T00-00-00",
            judge_model="deepseek-v4-flash",
        )
        content = self.advice_path.read_text(encoding="utf-8")
        self.assertIn("siemens-test", content)
        self.assertIn("2026-01-01T00-00-00", content)

    def test_write_contains_llm_markdown(self):
        diag = self._make_diagnosis()
        write_advice(
            diagnoses=[diag],
            llm_markdown="## faithfulness\n\n具体建议文本",
            advice_path=self.advice_path,
            scenario_name="test",
            run_id="rid",
            judge_model="model",
        )
        content = self.advice_path.read_text(encoding="utf-8")
        self.assertIn("具体建议文本", content)

    def test_write_fallback_when_no_llm_markdown(self):
        """When llm_markdown is empty, writer emits rule-only report."""
        diag = self._make_diagnosis()
        write_advice(
            diagnoses=[diag],
            llm_markdown="",
            advice_path=self.advice_path,
            scenario_name="test",
            run_id="rid",
            judge_model="model",
        )
        content = self.advice_path.read_text(encoding="utf-8")
        self.assertIn("faithfulness", content)
        self.assertIn("原因1", content)

    def test_log_summary_format(self):
        diags = [
            self._make_diagnosis("faithfulness", "critical"),
            self._make_diagnosis("context_recall", "warning"),
        ]
        summary = _format_log_summary(diags, self.advice_path)
        self.assertIn("faithfulness", summary)
        self.assertIn("critical", summary)
        self.assertIn("context_recall", summary)
        self.assertIn("warning", summary)

    def test_write_empty_diagnoses_still_creates_file(self):
        write_advice(
            diagnoses=[],
            llm_markdown="",
            advice_path=self.advice_path,
            scenario_name="test",
            run_id="rid",
            judge_model="model",
        )
        self.assertTrue(self.advice_path.exists())
        content = self.advice_path.read_text(encoding="utf-8")
        self.assertIn("未发现明显指标异常", content)


if __name__ == "__main__":
    unittest.main()
```
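
The tests above write into a fixed `tests/.tmp/` directory relative to the current working directory. As an alternative, the standard library's `tempfile.TemporaryDirectory` gives each test its own directory that is removed automatically. The sketch below is a minimal illustration of that setup pattern only; `TempDirExample` is a hypothetical class name and it writes the file directly instead of calling `write_advice()`.

```python
import tempfile
import unittest
from pathlib import Path


class TempDirExample(unittest.TestCase):
    """Hypothetical test case showing per-test temporary directories."""

    def setUp(self):
        # A fresh directory per test; addCleanup removes it even if the test fails.
        self._tmp = tempfile.TemporaryDirectory()
        self.addCleanup(self._tmp.cleanup)
        self.advice_path = Path(self._tmp.name) / "optimization_advice.md"

    def test_write_and_read_utf8(self):
        # Explicit UTF-8 matters on Windows, where the default encoding may differ.
        self.advice_path.write_text("# 优化建议报告\n", encoding="utf-8")
        content = self.advice_path.read_text(encoding="utf-8")
        self.assertIn("优化建议", content)
```

This avoids leftover files when a run is interrupted and does not depend on where pytest is launched from.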
- [ ] **Step 2: Run tests to verify they fail**
```
python -m pytest tests/test_advisor_writer.py -v 2>&1 | head -15
```
Expected: `ModuleNotFoundError: No module named 'rag_eval.advisor.writer'` (the `rag_eval.advisor` package exists after Task 1, but `writer.py` has not been created yet)
- [ ] **Step 3: Create writer.py**
```python
# rag_eval/advisor/writer.py
"""Write optimization advice to markdown file and emit log summary."""
from __future__ import annotations

import logging
from pathlib import Path

from .rules import Diagnosis

logger = logging.getLogger("rag_eval.advisor")


def _format_log_summary(diagnoses: list[Diagnosis], advice_path: Path) -> str:
    """Return a single-line log summary of triggered diagnoses."""
    if not diagnoses:
        return "[advisor] 所有指标正常,无需优化建议。"
    parts = [f"{d.metric}({d.mean_score:.2f}, {d.severity})" for d in diagnoses]
    triggered = " ".join(parts)
    # Separate the metric list from the report path so the line stays readable.
    return f"[advisor] 触发诊断 {len(diagnoses)} 项: {triggered} → {advice_path}"


def _build_fallback_report(diagnoses: list[Diagnosis]) -> str:
    """Build a rules-only report when LLM analysis is unavailable."""
    if not diagnoses:
        return ""
    lines = ["## 规则诊断(LLM 分析不可用)\n"]
    for d in diagnoses:
        lines.append(f"### {d.metric} [{d.severity}] 均值={d.mean_score:.4f}")
        lines.append("\n**可能原因:**")
        for cause in d.root_causes:
            lines.append(f"- {cause}")
        lines.append("\n**建议动作:**")
        for action in d.suggested_actions:
            lines.append(f"- {action}")
        lines.append("")
    return "\n".join(lines)


def write_advice(
    diagnoses: list[Diagnosis],
    llm_markdown: str,
    advice_path: Path,
    scenario_name: str,
    run_id: str,
    judge_model: str,
) -> None:
    """Write optimization_advice.md and emit a log summary line.

    Args:
        diagnoses: List of Diagnosis from rules.diagnose().
        llm_markdown: LLM-generated Markdown body. Empty string triggers fallback.
        advice_path: Full path to write the .md file.
        scenario_name: Human-readable scenario identifier for the report header.
        run_id: Run identifier string.
        judge_model: Model used for LLM analysis (shown in header).
    """
    advice_path.parent.mkdir(parents=True, exist_ok=True)

    # Header
    from rag_eval.shared.utils import utc_now_iso
    header_lines = [
        f"# 优化建议报告 - {scenario_name}",
        "",
        f"- run_id: `{run_id}`",
        f"- 生成时间: `{utc_now_iso()}`",
        f"- judge_model: `{judge_model}`",
        "",
        "---",
        "",
    ]

    # Body precedence: no diagnoses > LLM report > rules-only fallback.
    if not diagnoses:
        body = "## ✅ 未发现明显指标异常\n\n所有指标均在正常范围内,当前 RAG 链路表现良好。\n"
    elif llm_markdown:
        body = llm_markdown
    else:
        body = _build_fallback_report(diagnoses)

    content = "\n".join(header_lines) + body
    advice_path.write_text(content, encoding="utf-8")

    summary = _format_log_summary(diagnoses, advice_path)
    logger.info(summary)
    logger.info("[advisor] 优化建议已写出: %s", advice_path)
```
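
The order in which `write_advice()` picks the report body is worth spelling out, because it decides what an empty `llm_markdown` means. The sketch below restates that precedence as a standalone function; `choose_body` and its placeholder strings are hypothetical and exist only for illustration.

```python
def choose_body(diagnosis_count: int, llm_markdown: str, fallback_report: str) -> str:
    """Hypothetical restatement of the body selection in write_advice()."""
    if diagnosis_count == 0:
        # Nothing triggered: the "all clear" body wins, whatever else was passed.
        return "## 未发现明显指标异常\n"
    if llm_markdown:
        # The LLM produced a report: use it verbatim.
        return llm_markdown
    # Diagnoses exist but the LLM returned "": fall back to the rules-only report.
    return fallback_report


all_clear = choose_body(0, "ignored", "ignored")
llm_body = choose_body(2, "llm report", "rules report")
rules_body = choose_body(2, "", "rules report")
```

An empty string is therefore the only signal the analyzer needs to send in order to trigger the fallback.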
- [ ] **Step 4: Run tests — expect pass**
```
python -m pytest tests/test_advisor_writer.py -v
```
Expected: all 6 tests PASS.
- [ ] **Step 5: Commit**
```
git add rag_eval/advisor/writer.py tests/test_advisor_writer.py
git commit -m "feat(advisor): add advice writer with fallback rule-only report"
```
---
## Task 3: LLM analyzer
**Files:**
- Create: `rag_eval/advisor/llm_analyzer.py`
No LLM unit tests (network-dependent); tested in Task 7 integration.
- [ ] **Step 1: Create llm_analyzer.py**
```python
# rag_eval/advisor/llm_analyzer.py
"""LLM-powered analysis of rule diagnostics and low-score samples."""
from __future__ import annotations

import logging
from typing import Any

from .rules import Diagnosis

logger = logging.getLogger("rag_eval.advisor")

_PROMPT_TEMPLATE = """\
你是一个 RAG 系统优化专家,正在分析西门子医疗 CT 文档问答系统的评测结果。
请用中文撰写一份优化建议报告,格式为 Markdown。

## 评测诊断摘要
{diagnosis_summary}

## 低分样本示例
{low_sample_text}

## 报告要求
1. 按指标分节(## 指标名 [severity]),先解释"为什么低"(结合低分样本具体分析),再给出"具体怎么改"
2. "具体怎么改"要结合低分样本的实际内容,而不只是泛泛建议
3. 最后写一节 **## 优先优化次序**,按性价比排序(不增加 LLM 调用次数的优化优先)
4. 语言简洁,面向工程师,不要废话,不要重复列表内容

只输出 Markdown 报告正文,不要任何前置说明。
"""


def _build_diagnosis_summary(diagnoses: list[Diagnosis]) -> str:
    lines = []
    for d in diagnoses:
        direction = "(越低越好)" if d.metric == "noise_sensitivity" else ""
        lines.append(
            f"- **{d.metric}** {direction} 均值={d.mean_score:.4f},"
            f"阈值={d.threshold},严重程度={d.severity}"
        )
        lines.append(f"  - 可能原因:{'; '.join(d.root_causes)}")
        lines.append(f"  - 建议动作:{'; '.join(d.suggested_actions)}")
    return "\n".join(lines)


def _build_low_sample_text(diagnoses: list[Diagnosis]) -> str:
    lines = []
    for d in diagnoses:
        if not d.low_samples:
            continue
        lines.append(f"### {d.metric} 低分样本(最多 3 条)")
        for i, s in enumerate(d.low_samples, 1):
            score = s.get(d.metric, "N/A")
            lines.append(f"\n**样本 {i}**(分数={score})")
            lines.append(f"- 问题:{s.get('question', '')}")
            # str(... or '') guards against None or non-string cells before slicing.
            lines.append(f"- 回答:{str(s.get('answer') or '')[:300]}")
            lines.append(f"- 标准答案:{str(s.get('ground_truth') or '')[:200]}")
    return "\n".join(lines)


async def analyze(
    diagnoses: list[Diagnosis],
    llm: Any,
    scenario_name: str,
) -> str:
    """Call the judge LLM to generate a Chinese optimization report.

    Args:
        diagnoses: Non-empty list of Diagnosis from rules.diagnose().
        llm: RAGAS LLM wrapper (assumed to expose an .agenerate() method).
        scenario_name: Used only for logging.

    Returns:
        LLM-generated Markdown string, or "" on failure (triggers writer fallback).
    """
    if not diagnoses:
        return ""

    diagnosis_summary = _build_diagnosis_summary(diagnoses)
    low_sample_text = _build_low_sample_text(diagnoses)
    prompt = _PROMPT_TEMPLATE.format(
        diagnosis_summary=diagnosis_summary,
        low_sample_text=low_sample_text,
    )

    try:
        logger.info("[advisor] calling LLM for optimization analysis scenario=%s", scenario_name)
        # ASSUMPTION: the wrapper returned by build_models() accepts
        # agenerate(texts=[[message]]) and returns a LangChain-style LLMResult.
        # Verify this against the installed ragas version; if the call shape
        # differs, the except branch below returns "" and the writer falls back
        # to the rules-only report.
        from langchain_core.messages import HumanMessage
        result = await llm.agenerate(texts=[[HumanMessage(content=prompt)]])
        text = result.generations[0][0].text.strip()
        logger.info("[advisor] LLM analysis complete chars=%d", len(text))
        return text
    except Exception as exc:
        logger.warning("[advisor] LLM analysis failed (%s: %s); falling back to rule report", type(exc).__name__, exc)
        return ""
```
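
Although this task defers LLM testing to integration, the fallback path itself can be exercised offline with a stub. The sketch below is an assumption-laden illustration, not part of the plan: `FakeLLM` and `call_with_fallback` are hypothetical names, and the stub simply imitates the `agenerate(texts=...)` call shape and `generations[0][0].text` result shape that `analyze()` relies on.

```python
from __future__ import annotations

import asyncio
from types import SimpleNamespace


class FakeLLM:
    """Hypothetical stand-in for the judge LLM; no network access."""

    def __init__(self, reply: str | None):
        self._reply = reply  # None simulates a failing call

    async def agenerate(self, texts):
        if self._reply is None:
            raise ConnectionError("simulated network failure")
        generation = SimpleNamespace(text=self._reply)
        return SimpleNamespace(generations=[[generation]])


async def call_with_fallback(llm, prompt: str) -> str:
    """Mirror analyze()'s try/except: return "" when the LLM call fails."""
    try:
        result = await llm.agenerate(texts=[[prompt]])
        return result.generations[0][0].text.strip()
    except Exception:
        return ""


ok_text = asyncio.run(call_with_fallback(FakeLLM("  ## faithfulness\n"), "prompt"))
failed_text = asyncio.run(call_with_fallback(FakeLLM(None), "prompt"))
```

A stub like this only checks the control flow; whether the real wrapper accepts the same call still has to be confirmed in the integration run.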
- [ ] **Step 2: Verify import works**
```
python -c "from rag_eval.advisor.llm_analyzer import analyze; print('OK')"
```
Expected: `OK`
- [ ] **Step 3: Commit**
```
git add rag_eval/advisor/llm_analyzer.py
git commit -m "feat(advisor): add LLM analyzer with graceful fallback on failure"
```
---
## Task 4: Wire advisor into models, config schema, and loader
**Files:**
- Modify: `rag_eval/shared/models.py`
- Modify: `rag_eval/config/schema.py`
- Modify: `rag_eval/config/loader.py`
- Modify: `rag_eval/reporting/artifacts.py`
- [ ] **Step 1: Add `optimization_advisor` to `Scenario` and `RunArtifactPaths`**
In `rag_eval/shared/models.py`, add one field to `Scenario` (after `source_path`) and one to `RunArtifactPaths`:
```python
# In Scenario dataclass — add after source_path field:
optimization_advisor: bool = False
```
```python
# In RunArtifactPaths dataclass — add after metadata_json field:
advice_md: Path | None = None
```
Full updated `Scenario` dataclass (fields with defaults must come after fields without defaults, so add the new field at the end):
```python
@dataclass(slots=True)
class Scenario:
    scenario_name: str
    mode: Mode
    dataset: DatasetConfig
    judge_model: str
    embedding_model: str
    metrics: list[str]
    output_dir: Path
    runtime: RuntimeConfig = field(default_factory=RuntimeConfig)
    app_adapter: AppAdapterConfig | None = None
    source_path: Path | None = None
    optimization_advisor: bool = False  # NEW
```
Full updated `RunArtifactPaths`:
```python
@dataclass(slots=True)
class RunArtifactPaths:
    root_dir: Path
    scenario_snapshot: Path
    scores_csv: Path
    invalid_csv: Path
    summary_md: Path
    metadata_json: Path
    advice_md: Path | None = None  # NEW
```
- [ ] **Step 2: Add field to ScenarioModel in schema.py**
In `rag_eval/config/schema.py`, add to `ScenarioModel`:
```python
optimization_advisor: bool = False # NEW — enable optimization advisor output
```
(add after the `runtime` field)
- [ ] **Step 3: Pass `optimization_advisor` through in loader.py**
In `rag_eval/config/loader.py`, in the `Scenario(...)` constructor call, add:
```python
optimization_advisor=model.optimization_advisor, # NEW
```
- [ ] **Step 4: Add advice_md to artifact paths in artifacts.py**
In `rag_eval/reporting/artifacts.py`, update `build_artifact_paths()`:
```python
def build_artifact_paths(output_dir: Path, run_id: str) -> RunArtifactPaths:
    """Build the canonical artifact file paths for a single evaluation run."""
    run_dir = output_dir / run_id
    return RunArtifactPaths(
        root_dir=run_dir,
        scenario_snapshot=run_dir / "scenario.snapshot.yaml",
        scores_csv=run_dir / "scores.csv",
        invalid_csv=run_dir / "invalid.csv",
        summary_md=run_dir / "summary.md",
        metadata_json=run_dir / "metadata.json",
        advice_md=run_dir / "optimization_advice.md",  # NEW
    )
```
- [ ] **Step 5: Verify existing tests still pass**
```
python -m pytest tests/test_offline_eval.py tests/test_online_eval.py -v 2>&1 | tail -15
```
Expected: same pass/fail as before (the 4 pre-existing failures are unrelated).
- [ ] **Step 6: Commit**
```
git add rag_eval/shared/models.py rag_eval/config/schema.py rag_eval/config/loader.py rag_eval/reporting/artifacts.py
git commit -m "feat(advisor): wire optimization_advisor field through Scenario, ScenarioModel, loader, and RunArtifactPaths"
```
---
## Task 5: Lift build_models() to runner + wire run_advisor()
**Files:**
- Modify: `rag_eval/metrics/factory.py`
- Modify: `rag_eval/execution/runner.py`
- Modify: `rag_eval/advisor/__init__.py`
This is the integration wiring. The key change: `build_metric_pipeline()` currently creates llm internally and returns only `MetricPipeline`. We add a companion function `build_models()` that `runner.py` calls first, then passes `llm` to both `build_metric_pipeline()` and `run_advisor()`.
- [ ] **Step 1: Add `build_models()` as public function in factory.py**
`build_models()` is already defined in `factory.py` (lines 30-39), but so far only `build_metric_pipeline()` calls it. We treat it as part of the module's public surface and update `build_metric_pipeline()` to accept optional pre-built models:
```python
# rag_eval/metrics/factory.py (full replacement)
"""Factories for OpenAI-backed RAGAS models and metric pipelines."""
from __future__ import annotations

from typing import Any

from openai import AsyncOpenAI

from rag_eval.compat import ensure_ragas_import_compat
from rag_eval.settings import EvaluationSettings
from rag_eval.shared.models import Scenario

# Must run before the ragas imports below.
ensure_ragas_import_compat()

from ragas.embeddings.base import embedding_factory
from ragas.llms import llm_factory
from ragas.metrics.collections import (
    AnswerRelevancy,
    ContextPrecision,
    ContextRecall,
    FactualCorrectness,
    Faithfulness,
    NoiseSensitivity,
    SemanticSimilarity,
)

from .pipeline import MetricPipeline


def build_models(
    judge_model: str,
    embedding_model: str,
    settings: EvaluationSettings,
) -> tuple[Any, Any]:
    """Create the LLM and embedding clients required by the selected RAGAS metrics."""
    client = AsyncOpenAI(**settings.openai_client_kwargs)
    llm = llm_factory(judge_model, client=client)
    embeddings = embedding_factory(provider="openai", model=embedding_model, client=client)
    return llm, embeddings


def build_metric_pipeline(
    scenario: Scenario,
    settings: EvaluationSettings,
    llm: Any | None = None,
    embeddings: Any | None = None,
) -> MetricPipeline:
    """Build a metric pipeline containing only the metrics requested by the scenario.

    If llm and embeddings are provided (pre-built by the caller), they are reused.
    Otherwise, new instances are created from scenario + settings.
    """
    if llm is None or embeddings is None:
        llm, embeddings = build_models(
            scenario.judge_model,
            scenario.embedding_model,
            settings,
        )
    registry: dict[str, Any] = {
        "faithfulness": Faithfulness(llm=llm),
        "answer_relevancy": AnswerRelevancy(llm=llm, embeddings=embeddings),
        "context_recall": ContextRecall(llm=llm),
        "context_precision": ContextPrecision(llm=llm),
        "noise_sensitivity": NoiseSensitivity(llm=llm),
        "factual_correctness": FactualCorrectness(llm=llm),
        "semantic_similarity": SemanticSimilarity(embeddings=embeddings),
    }
    return MetricPipeline(
        metrics={name: registry[name] for name in scenario.metrics},
        metric_timeout_seconds=settings.ragas_metric_timeout_seconds,
    )
```
- [ ] **Step 2: Update `rag_eval/advisor/__init__.py` with full `run_advisor()`**
```python
# rag_eval/advisor/__init__.py
"""Optimization advisor: rule-based diagnosis + LLM-powered recommendations."""
from __future__ import annotations

import asyncio
import logging
from typing import Any

from rag_eval.reporting.artifacts import build_artifact_paths
from rag_eval.shared.models import EvaluationResult, Scenario

from .llm_analyzer import analyze
from .rules import Diagnosis, diagnose
from .writer import write_advice

logger = logging.getLogger("rag_eval.advisor")

__all__ = ["run_advisor", "Diagnosis", "diagnose"]


def run_advisor(
    result: EvaluationResult,
    scenario: Scenario,
    llm: Any,
) -> None:
    """Run the full optimization advisor pipeline after an evaluation completes.

    Skips silently if scenario.optimization_advisor is False.
    Never raises; failures are logged as warnings, not exceptions.

    Args:
        result: Completed EvaluationResult from Evaluator.evaluate().
        scenario: The resolved Scenario (provides metrics, judge_model, output_dir).
        llm: Pre-built RAGAS LLM instance (from build_models()) for LLM analysis.
    """
    if not scenario.optimization_advisor:
        return

    logger.info("[advisor] starting optimization analysis scenario=%s", scenario.scenario_name)
    try:
        artifact_paths = build_artifact_paths(scenario.output_dir, result.run_id)
        if artifact_paths.advice_md is None:
            logger.warning("[advisor] advice_md path not set in RunArtifactPaths; skipping")
            return

        diagnoses = diagnose(result.score_rows, scenario.metrics)
        logger.info("[advisor] rule diagnosis complete: %d metric(s) triggered", len(diagnoses))

        if diagnoses:
            llm_markdown = asyncio.run(analyze(diagnoses, llm, scenario.scenario_name))
        else:
            llm_markdown = ""

        write_advice(
            diagnoses=diagnoses,
            llm_markdown=llm_markdown,
            advice_path=artifact_paths.advice_md,
            scenario_name=scenario.scenario_name,
            run_id=result.run_id,
            judge_model=scenario.judge_model,
        )
    except Exception as exc:
        logger.warning(
            "[advisor] advisor failed (%s: %s); evaluation result is unaffected",
            type(exc).__name__, exc,
        )
```
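
`run_advisor()` is synchronous and drives `analyze()` through `asyncio.run()`. That works from a plain script, but `asyncio.run()` raises `RuntimeError` when the calling thread is already running an event loop; in `run_advisor()` the broad `except` would then log a warning and skip the report. The sketch below reproduces the behaviour with stubs; `analyze_stub` and `run_sync` are hypothetical names.

```python
import asyncio


async def analyze_stub() -> str:
    """Hypothetical stand-in for analyze()."""
    return "report"


def run_sync() -> str:
    """Drive the coroutine from sync code, as run_advisor() does."""
    coro = analyze_stub()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        # Raised when an event loop is already running in this thread.
        coro.close()  # avoid a "coroutine was never awaited" warning
        return ""


async def caller_inside_loop() -> str:
    # Calling the sync wrapper from async code triggers the RuntimeError path.
    return run_sync()


from_script = run_sync()
from_running_loop = asyncio.run(caller_inside_loop())
```

If `run_scenario()` is ever invoked from async code (a notebook or an async service, for example), the advisor call would need to be awaited directly instead.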
- [ ] **Step 3: Update runner.py to lift llm and call run_advisor()**
In `rag_eval/execution/runner.py`, make these changes:
1. Add import at top:
```python
from rag_eval.advisor import run_advisor
from rag_eval.metrics.factory import build_models, build_metric_pipeline
```
2. Replace the `build_metric_pipeline` import (it's already imported from `rag_eval.metrics.factory`) and update `run_scenario()`:
```python
# rag_eval/execution/runner.py (full replacement)
"""High-level scenario runner used by the package and CLI entrypoints."""
from __future__ import annotations

import logging
import sys
from pathlib import Path

from rag_eval.adapters.http import HttpAppAdapter
from rag_eval.adapters.python import PythonFunctionAdapter
from rag_eval.advisor import run_advisor
from rag_eval.config.loader import load_scenario
from rag_eval.metrics.factory import build_models, build_metric_pipeline
from rag_eval.reporting.writers import write_run_artifacts
from rag_eval.settings import EvaluationSettings
from rag_eval.shared.models import Scenario

from .evaluator import Evaluator

logger = logging.getLogger("rag_eval.execution.runner")


def _setup_logging(log_file: Path | None = None, level: int = logging.INFO) -> None:
    """Configure root logger: always write to stderr, optionally also to a file."""
    fmt = "%(asctime)s %(levelname)-8s %(name)s %(message)s"
    datefmt = "%H:%M:%S"
    handlers: list[logging.Handler] = [logging.StreamHandler(sys.stderr)]
    if log_file is not None:
        log_file.parent.mkdir(parents=True, exist_ok=True)
        fh = logging.FileHandler(log_file, encoding="utf-8")
        fh.setFormatter(logging.Formatter(fmt, datefmt=datefmt))
        handlers.append(fh)
    logging.basicConfig(level=level, format=fmt, datefmt=datefmt, handlers=handlers, force=True)
    logging.getLogger("ragas").setLevel(logging.WARNING)
    logging.getLogger("httpx").setLevel(logging.WARNING)
    logging.getLogger("openai").setLevel(logging.WARNING)


def build_adapter(scenario: Scenario):
    """Instantiate the adapter required by the resolved scenario, if any."""
    if scenario.app_adapter is None:
        return None
    if scenario.app_adapter.type == "http":
        return HttpAppAdapter(scenario.app_adapter)
    if scenario.app_adapter.type == "python":
        return PythonFunctionAdapter(scenario.app_adapter)
    raise ValueError(f"Unsupported adapter type: {scenario.app_adapter.type}")


def run_scenario(
    scenario_path: str,
    settings: EvaluationSettings | None = None,
    log_file: Path | None = None,
    log_level: int = logging.INFO,
):
    """Run one scenario end to end and persist its reporting artifacts."""
    _setup_logging(log_file=log_file, level=log_level)
    logger.info("[runner] run_scenario path=%s", scenario_path)

    settings = settings or EvaluationSettings()
    if not settings.openai_api_key:
        raise EnvironmentError("OPENAI_API_KEY must be set before running the evaluator.")

    scenario = load_scenario(scenario_path)
    logger.info("[runner] scenario loaded: name=%s mode=%s max_samples=%s",
                scenario.scenario_name, scenario.mode, scenario.runtime.max_samples)

    # Build models once; reuse llm in both MetricPipeline and advisor.
    llm, embeddings = build_models(scenario.judge_model, scenario.embedding_model, settings)
    adapter = build_adapter(scenario)
    pipeline = build_metric_pipeline(scenario, settings, llm=llm, embeddings=embeddings)
    evaluator = Evaluator(scenario=scenario, metric_pipeline=pipeline, app_adapter=adapter)

    result = evaluator.evaluate()
    write_run_artifacts(result)
    logger.info("[runner] artifacts written for run_id=%s", result.run_id)

    # Optimization advisor: runs only if scenario.optimization_advisor is True.
    run_advisor(result, scenario, llm)
    return result
```
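The key change in the runner is that the judge model is built once and the same instance is handed to both the metric pipeline and the advisor. A minimal sketch of that pattern follows; `FakeLLM`, `build_models_stub`, `build_pipeline_stub`, and `run_advisor_stub` are hypothetical stand-ins for illustration, not the real `rag_eval` functions or their signatures.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeLLM:
    """Hypothetical stand-in for the judge-model LLM wrapper."""
    name: str


BUILD_CALLS = []  # records every model construction so reuse can be checked


def build_models_stub(judge_model: str) -> FakeLLM:
    BUILD_CALLS.append(judge_model)
    return FakeLLM(judge_model)


def build_pipeline_stub(judge_model: str, llm: Optional[FakeLLM] = None) -> dict:
    # Build a model only when the caller did not inject a pre-built one.
    if llm is None:
        llm = build_models_stub(judge_model)
    return {"llm": llm}


def run_advisor_stub(enabled: bool, llm: FakeLLM) -> Optional[str]:
    # No-op when the scenario flag is off.
    if not enabled:
        return None
    return f"advice generated with {llm.name}"


llm = build_models_stub("judge")
pipeline = build_pipeline_stub("judge", llm=llm)
advice = run_advisor_stub(True, llm)
```

Keeping the `llm` parameter optional lets existing callers of the pipeline builder continue to work unchanged while the runner opts in to sharing.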
- [ ] **Step 4: Verify existing tests still pass**
```
python -m pytest tests/ -v 2>&1 | tail -20
```
Expected: the same pass count as before this change, with only the 4 pre-existing failures remaining.
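One way to compare the run against the earlier baseline is to parse the final pytest summary line from each run. The sketch below assumes the usual `N failed, M passed in Xs` summary format; the helper names and the two sample lines are illustrative only and are not taken from this repository's actual test output.

```python
import re


def parse_summary(line: str) -> dict:
    """Extract outcome counts from a pytest summary line."""
    pairs = re.findall(r"(\d+) (passed|failed|errors?|skipped)", line)
    return {label: int(count) for count, label in pairs}


def has_new_failures(before: dict, after: dict) -> bool:
    """True when the failure count grew compared with the baseline."""
    return after.get("failed", 0) > before.get("failed", 0)


# Illustrative summary lines (made-up counts apart from the 4 known failures).
baseline = parse_summary("===== 4 failed, 57 passed in 3.21s =====")
current = parse_summary("===== 4 failed, 57 passed in 3.40s =====")
regressed = has_new_failures(baseline, current)
```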
- [ ] **Step 5: Commit**
```
git add rag_eval/metrics/factory.py rag_eval/execution/runner.py rag_eval/advisor/__init__.py
git commit -m "feat(advisor): wire run_advisor into runner, lift llm from build_metric_pipeline"
```
---
## Task 6: Enable advisor in Siemens online YAML
**Files:**
- Modify: `scenarios/online/siemens-pdf-question-bank-online.yaml`
- [ ] **Step 1: Add optimization_advisor field**
Read the current file first, then add one line after `embedding_model`:
```yaml
scenario_name: siemens-pdf-question-bank-online
mode: online
dataset: ../../datasets/raw/generated/siemens-pdf-question-bank.csv
judge_model: deepseek-v4-flash
embedding_model: text-embedding-v3
optimization_advisor: true  # generate the optimization advice report automatically after evaluation
metrics:
  - faithfulness
  - answer_relevancy
  - context_recall
  - context_precision
  # Enabled: robustness / end-to-end metrics (the dataset already includes ground_truth)
  - noise_sensitivity     # robustness: sensitivity to retrieval noise
  - factual_correctness   # end-to-end: factual correctness (relative to the reference answer)
  - semantic_similarity   # end-to-end: semantic similarity (embedding-based, no LLM call)
output_dir: ../../outputs/online/siemens-pdf-question-bank
runtime:
  batch_size: 4
  app_concurrency: 4
  metric_concurrency: 4
  max_samples: 50
app_adapter:
  type: python
  callable: apps.siemens_pdf_qa.adapter:run
  static_kwargs:
    source_chunks_path: ../../outputs/dataset-builds/siemens-pdf-question-bank/latest/source_chunks.jsonl
    model: deepseek-v4-flash
```
- [ ] **Step 2: Verify scenario loads correctly**
```
python -c "
from rag_eval.config.loader import load_scenario
s = load_scenario('scenarios/online/siemens-pdf-question-bank-online.yaml')
print('optimization_advisor:', s.optimization_advisor)
print('metrics:', s.metrics)
"
```
Expected:
```
optimization_advisor: True
metrics: ['faithfulness', 'answer_relevancy', 'context_recall', 'context_precision', 'noise_sensitivity', 'factual_correctness', 'semantic_similarity']
```
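The check above relies on the flag defaulting to `False` and being passed through by the loader when the YAML sets it. A minimal sketch of that behavior, using a hypothetical `ScenarioSketch` dataclass and `scenario_from_dict` helper rather than the real `Scenario` model and `load_scenario()`:

```python
from dataclasses import dataclass, field


@dataclass
class ScenarioSketch:
    """Hypothetical, trimmed-down stand-in for the Scenario model."""
    scenario_name: str
    metrics: list = field(default_factory=list)
    optimization_advisor: bool = False  # off unless the YAML opts in


def scenario_from_dict(raw: dict) -> ScenarioSketch:
    # Pass the flag through; scenarios that omit it keep the default.
    return ScenarioSketch(
        scenario_name=raw["scenario_name"],
        metrics=list(raw.get("metrics", [])),
        optimization_advisor=bool(raw.get("optimization_advisor", False)),
    )


enabled = scenario_from_dict(
    {"scenario_name": "online", "metrics": ["faithfulness"], "optimization_advisor": True}
)
legacy = scenario_from_dict({"scenario_name": "legacy"})
```

Defaulting to `False` means scenario files written before this change load unchanged and never trigger the advisor.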
- [ ] **Step 3: Commit**
```
git add scenarios/online/siemens-pdf-question-bank-online.yaml
git commit -m "feat(advisor): enable optimization_advisor in siemens online scenario"
```
---
## Task 7: Run all advisor tests + smoke check
**Files:** none new
- [ ] **Step 1: Run full advisor test suite**
```
python -m pytest tests/test_advisor_rules.py tests/test_advisor_writer.py -v
```
Expected: 15 tests PASS (9 rules + 6 writer).
- [ ] **Step 2: Smoke-check the full module wiring (no network)**
```python
# Paste into a Python REPL, or save as scripts/smoke_advisor.py and run it.
import sys
import tempfile
from pathlib import Path

sys.path.insert(0, ".")

from rag_eval.advisor.rules import diagnose
from rag_eval.advisor.writer import write_advice, _format_log_summary

# Simulate score_rows with low faithfulness and high noise_sensitivity.
rows = [
    {"sample_id": f"s{i}", "question": f"q{i}", "answer": f"a{i}",
     "ground_truth": f"gt{i}", "faithfulness": 0.3 + i * 0.05,
     "noise_sensitivity": 0.4 + i * 0.02}
    for i in range(5)
]
diags = diagnose(rows, metrics=["faithfulness", "noise_sensitivity"])
print(f"Diagnosed {len(diags)} metric(s):")
for d in diags:
    print(f"  {d.metric}: mean={d.mean_score}, severity={d.severity}, samples={len(d.low_samples)}")

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "optimization_advice.md"
    write_advice(
        diagnoses=diags,
        llm_markdown="",  # fallback mode
        advice_path=path,
        scenario_name="smoke-test",
        run_id="2026-01-01T00-00-00",
        judge_model="deepseek-v4-flash",
    )
    # Read the file before the temporary directory is removed.
    content = path.read_text(encoding="utf-8")
    print(f"\nAdvice file ({len(content)} chars):")
    print(content[:600])

print("\nSmoke check PASSED")
```
```
python scripts/smoke_advisor.py
```
Expected: prints diagnosed metrics, advice content, `Smoke check PASSED`.
- [ ] **Step 3: Commit smoke script**
```
git add scripts/smoke_advisor.py
git commit -m "test(advisor): add smoke-check script for offline wiring verification"
```
---
## Task 8: Update docs
**Files:**
- Modify: `docs/rag-eval-engine-flow.md`
- Modify: `docs/rag-eval-architecture.md`
- [ ] **Step 1: Add advisor section to rag-eval-engine-flow.md**
Append a new section at the end of `docs/rag-eval-engine-flow.md`:
````markdown
---
## 15. Optimization Advisor Flow
Related code:
- `rag_eval/advisor/__init__.py`: external entry point `run_advisor()`
- `rag_eval/advisor/rules.py`: rules engine (pure functions, no LLM)
- `rag_eval/advisor/llm_analyzer.py`: LLM analyzer (reuses judge_model)
- `rag_eval/advisor/writer.py`: writes `optimization_advice.md` plus a log summary
The advisor runs after `write_run_artifacts()` and only takes effect when the scenario sets `optimization_advisor: true`.
Execution flow:
```text
run_advisor(result, scenario, llm)
  -> rules.diagnose(score_rows, metrics)    # flag abnormal metrics, select low-scoring samples
  -> llm_analyzer.analyze(diagnoses, llm)   # LLM generates the advice (falls back automatically on failure)
  -> writer.write_advice(...)               # write optimization_advice.md + log summary
```
Output artifacts are added to the existing run directory:
```text
outputs/online/siemens-pdf-question-bank/<run_id>/
  ... (existing files)
  optimization_advice.md   ← new (generated when optimization_advisor: true)
```
````
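The chain documented above degrades to a rules-only report when the LLM call fails: the analyzer returns an empty string and the writer falls back to the diagnoses alone. A minimal sketch of that control flow, with hypothetical stand-in functions and a made-up threshold of 0.7 (the real `diagnose`, `analyze`, and `write_advice` signatures and thresholds are defined in Tasks 1 to 3, not here):

```python
def diagnose_stub(rows, metric, threshold):
    """Flag the metric when its mean falls below the (hypothetical) threshold."""
    scores = [row[metric] for row in rows if metric in row]
    mean = sum(scores) / len(scores)
    if mean < threshold:
        return [{"metric": metric, "mean": round(mean, 3)}]
    return []


def analyze_stub(diagnoses, llm):
    """Return LLM markdown, or an empty string if the call fails."""
    try:
        return llm(diagnoses)
    except Exception:
        return ""


def render_stub(diagnoses, llm_markdown):
    """Prefer the LLM text; otherwise render a rules-only report."""
    if llm_markdown:
        return llm_markdown
    lines = [f"- {d['metric']}: mean={d['mean']}" for d in diagnoses]
    return "\n".join(["# Rules-only report"] + lines)


def broken_llm(_diagnoses):
    raise RuntimeError("judge model unavailable")


rows = [{"faithfulness": 0.2}, {"faithfulness": 0.4}]
diags = diagnose_stub(rows, "faithfulness", threshold=0.7)
report = render_stub(diags, analyze_stub(diags, broken_llm))
```

Swallowing the exception inside the analyzer keeps an advisor failure from affecting the evaluation run, whose artifacts have already been written by that point.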
- [ ] **Step 2: Add advisor note to rag-eval-architecture.md §9.4**
In `docs/rag-eval-architecture.md`, find the end of section `### 9.4 指标编排` and append:
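````markdown
**Optimization Advisor (implements the §11 optimization strategy):**
After evaluation finishes, if the scenario sets `optimization_advisor: true`, the `rag_eval/advisor/` module is invoked automatically:
- The rules engine (`rules.py`) applies a threshold to each of the 7 metrics, flags the ones that trigger, and selects the top-3 lowest-scoring samples
- The LLM analyzer (`llm_analyzer.py`) uses the low-scoring samples to generate Markdown optimization advice in Chinese (reuses judge_model; on failure it falls back automatically to a rules-only report)
- The writer (`writer.py`) outputs `optimization_advice.md` and logs a summary
```yaml
# Example scenario configuration
optimization_advisor: true
```
````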
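The rules-engine behavior described in this note (per-metric thresholds plus top-3 low samples) can be sketched as follows. The `warning` and `critical` values and the helper names are hypothetical; the real thresholds live in `METRIC_RULES` in `rules.py`, and this sketch only covers higher-is-better metrics.

```python
import math


def severity(mean, warning, critical):
    """Map a metric mean onto a severity label, or None when healthy."""
    if mean < critical:
        return "critical"
    if mean < warning:
        return "warning"
    return None


def lowest_samples(rows, metric, k=3):
    """Return the k rows with the lowest valid score for the metric."""
    scored = [
        row for row in rows
        if isinstance(row.get(metric), (int, float)) and not math.isnan(row[metric])
    ]
    return sorted(scored, key=lambda row: row[metric])[:k]


rows = [{"sample_id": f"s{i}", "faithfulness": 0.3 + i * 0.05} for i in range(5)]
mean = sum(row["faithfulness"] for row in rows) / len(rows)
level = severity(mean, warning=0.7, critical=0.5)  # hypothetical thresholds
low_ids = [row["sample_id"] for row in lowest_samples(rows, "faithfulness")]
```

A lower-is-better metric such as `noise_sensitivity` would need the comparisons and the sort order inverted.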
- [ ] **Step 3: Commit docs**
```
git add docs/rag-eval-engine-flow.md docs/rag-eval-architecture.md
git commit -m "docs: add optimization advisor section to engine-flow and architecture docs"
```
---
## Self-Review
**Spec coverage check:**
- §2 Decision summary → Task 5 (llm reuse), Task 6 (YAML trigger), Task 1+2+3 (outputs) ✓
- §3.1 Execution flow → Task 5 runner.py wiring ✓
- §3.2 New files → Tasks 1, 2, 3 ✓
- §3.3 Modified files → Task 4 (models/schema/loader/artifacts), Task 5 (factory/runner) ✓
- §3.4 Output artifacts → Task 4 advice_md in RunArtifactPaths ✓
- §4 Rules engine (7 rules) → Task 1 rules.py METRIC_RULES ✓
- §4.3 low_samples top-3 → Task 1 `_select_low_samples` ✓
- §5 LLM analyzer → Task 3 llm_analyzer.py ✓
- §5.4 Failure fallback → Task 3 try/except returns "" → Task 2 writer fallback ✓
- §6.1 File output + §6.2 Log summary → Task 2 writer.py ✓
- §7 YAML configuration → Task 6 ✓
- §8 Testing strategy → Tasks 1 (rules tests), 2 (writer tests) ✓
- §9 Non-goals → not implemented ✓
**Type consistency check:**
- `Diagnosis` defined in `rules.py`, imported in `__init__.py`, `llm_analyzer.py`, `writer.py` — all consistent ✓
- `write_advice()` signature matches calls in `__init__.py` ✓
- `build_models()` returns `tuple[Any, Any]`; `build_metric_pipeline()` accepts `llm, embeddings` — consistent ✓
- `run_advisor(result, scenario, llm)` signature matches call in `runner.py` ✓
**Placeholder scan:** No TBD/TODO/fill-in-later found ✓