diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..13566b8
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..1e305c1
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..76b7fe3
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/siemens_ragas.iml b/.idea/siemens_ragas.iml
new file mode 100644
index 0000000..d6ebd48
--- /dev/null
+++ b/.idea/siemens_ragas.iml
@@ -0,0 +1,9 @@
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..35eb1dd
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.superpowers/brainstorm/1625-1781595805/content/analysis-approach.html b/.superpowers/brainstorm/1625-1781595805/content/analysis-approach.html
new file mode 100644
index 0000000..85419b2
--- /dev/null
+++ b/.superpowers/brainstorm/1625-1781595805/content/analysis-approach.html
@@ -0,0 +1,60 @@
+
优化建议怎么生成?
+这决定了模块的核心机制与可维护性
+
+
+
+
A
+
+
纯规则引擎
+
每个指标设阈值(如 faithfulness < 0.6),触发时给出预设建议文本。
+
+
优点
+ - 零 LLM 调用,零额外成本
+ - 结果可预测、可审计
+ - 响应极快
+
+
缺点
+ - 建议固定,无法结合具体样本
+ - 不能解释"为什么这批数据这个指标低"
+
+
+
+
+
+
+
B
+
+
LLM 分析(全自动)
+
把评测结果(各指标均值 + 低分样本)一起交给 LLM,生成上下文感知的中文分析报告。
+
+
优点
+ - 能结合具体低分样本给出针对性建议
+ - 可用中文解释西门子场景下的问题
+ - 建议质量高、内容丰富
+
+
缺点
+ - 每次评测多 1 次 LLM 调用
+ - 依赖 judge_model 的质量
+
+
+
+
+
+
+
C
+
+
规则定位 + LLM 解读(推荐)
+
规则引擎先识别哪些指标异常、触发哪条优化方向;再把"规则诊断 + 低分样本"一起给 LLM 做二次解读,生成中文建议。
+
+
优点
+ - 规则保证诊断稳定,不依赖 LLM 自由发挥
+ - LLM 在有结构的输入下输出更准确
+ - 两层可独立测试
+
+
+
+
+
+
diff --git a/.superpowers/brainstorm/1625-1781595805/content/approaches.html b/.superpowers/brainstorm/1625-1781595805/content/approaches.html
new file mode 100644
index 0000000..638bed6
--- /dev/null
+++ b/.superpowers/brainstorm/1625-1781595805/content/approaches.html
@@ -0,0 +1,77 @@
+优化顾问模块 — 实现方案对比
+三个方案的核心区别在于 LLM 调用边界和代码入侵程度
+
+
+
+
A
+
+
独立后处理器(轻量集成)
+
新增 rag_eval/advisor/ 包,run_scenario() 末尾调用一行 maybe_run_advisor(result, scenario)。
+
文件结构:
+
+ rag_eval/advisor/__init__.py
+ rag_eval/advisor/rules.py — 规则引擎,输入 score_rows,输出诊断列表
+ rag_eval/advisor/llm_analyzer.py — 把规则诊断 + 低分样本交给 judge_model
+ rag_eval/advisor/writer.py — 写 optimization_advice.md,打日志摘要
+
+
+
优点
+ - 改动最小,runner.py 只加 3 行
+ - advisor 完全独立,可单独测试
+ - 与现有分层架构完全吻合
+
+
缺点
+ - 无法拿到 per-metric 的原始 NaN 率(需从 score_rows 重新算)
+
+
+
+
+
+
+
B
+
+
嵌入 reporting 层(复用写出基础设施)
+
把 advisor 作为 rag_eval/reporting/ 的一部分,write_run_artifacts() 内部判断是否写 advice。
+
文件结构:
+
+ rag_eval/reporting/advisor.py — 规则 + LLM + 写出三合一
+ write_run_artifacts() 里追加 if scenario.optimization_advisor: write_advice(...)
+
+
+
优点
+ - artifacts 路径管理统一,advice 自然进 run 目录
+ - 文件更少
+
+
缺点
+ - reporting 层本是"无副作用写文件",混入 LLM 调用破坏这一约定
+ - advisor 逻辑和写出逻辑耦合,难以单独测试规则引擎
+
+
+
+
+
+
+
C
+
+
方案 A 变体:advisor 有独立 settings(推荐)
+
与方案 A 相同的文件结构,但 LLM 调用使用 scenario 已有的 judge_model,不新增任何模型配置——advisor 复用 build_models() 已构建好的 llm 实例。
+
+ rag_eval/advisor/rules.py — 纯函数,7 条指标诊断规则
+ rag_eval/advisor/llm_analyzer.py — 接收已有 llm 实例,不重新建 client
+ rag_eval/advisor/writer.py — 写 md + 日志
+ rag_eval/advisor/__init__.py — 暴露 run_advisor()
+
+
+
优点
+ - 不重复创建 LLM client(节省资源)
+ - advisor 阈值可通过 YAML 的 optimization_advisor 块扩展配置
+ - 独立包边界清晰,易于单测
+ - runner.py 改动最小
+
+
缺点
+ - 需把 llm 实例从 runner 传入 advisor(多传一个参数)
+
+
+
+
+
diff --git a/.superpowers/brainstorm/1625-1781595805/content/design-architecture.html b/.superpowers/brainstorm/1625-1781595805/content/design-architecture.html
new file mode 100644
index 0000000..acbde1b
--- /dev/null
+++ b/.superpowers/brainstorm/1625-1781595805/content/design-architecture.html
@@ -0,0 +1,53 @@
+优化顾问模块 — 整体架构与数据流
+新增 rag_eval/advisor/ 包,插入 run_scenario() 末尾,复用已有 llm 实例
+
+
+
+
+ run_scenario()
+ → load_scenario() # 读 YAML,解析 Scenario + optimization_advisor 字段
+ → build_models() # 已有:创建 llm, embeddings
+ → build_metric_pipeline() # 已有
+ → Evaluator.evaluate() # 已有:打分 → EvaluationResult
+ → write_run_artifacts() # 已有:scores.csv / summary.md / ...
+ → run_advisor(result, scenario, llm) # 新增 3 行
+ → rules.diagnose(score_rows) # 规则引擎:识别异常指标 + 方向
+ → llm_analyzer.analyze(diag, samples) # LLM:结合低分样本生成中文建议
+ → writer.write(advice, paths) # 写 optimization_advice.md + 日志
+
+
+
+
+
新增文件一览
+
+
+ rag_eval/advisor/
+ __init__.py ← 暴露 run_advisor(),是外部唯一入口
+ rules.py ← 纯函数,无 LLM,可单独单测
+ llm_analyzer.py ← 接收 llm 实例 + 诊断结构 → 中文 Markdown
+ writer.py ← 写 optimization_advice.md,打日志摘要
+
+ rag_eval/shared/models.py ← 修改:Scenario 加 optimization_advisor 字段
+ rag_eval/config/schema.py ← 修改:ScenarioModel 加字段
+ rag_eval/execution/runner.py ← 修改:末尾加 3 行调用
+ rag_eval/reporting/artifacts.py ← 修改:RunArtifactPaths 加 advice_md 路径
+
+
+
+
+
+
输出产物
+
+
+ outputs/online/siemens-pdf-question-bank/<run_id>/
+ scenario.snapshot.yaml
+ scores.csv
+ invalid.csv
+ summary.md
+ metadata.json
+ optimization_advice.md ← 新增
+
+
+
+
+整体看起来 OK 吗?这是新模块与现有链路的接入方式。
diff --git a/.superpowers/brainstorm/1625-1781595805/content/trigger-mode.html b/.superpowers/brainstorm/1625-1781595805/content/trigger-mode.html
new file mode 100644
index 0000000..617bddd
--- /dev/null
+++ b/.superpowers/brainstorm/1625-1781595805/content/trigger-mode.html
@@ -0,0 +1,68 @@
+优化顾问在什么情况下运行?
+这决定了模块与现有评测流程的集成方式
+
+
+
+
A
+
+
每次评测自动运行
+
run_scenario() 结束后自动调用,无需任何额外配置。
+
+
优点
+ - 零感知,开箱即用
+ - 每次跑完都有建议报告
+
+
缺点
+ - 每次都多一次 LLM 调用,不管是否需要
+ - 无法关闭
+
+
+
+
+
+
+
B
+
+
YAML 场景中显式开启(推荐)
+
在 scenario YAML 里加一行 optimization_advisor: true,默认关闭。
+
+
+
+ metrics:
+ - faithfulness
+ - noise_sensitivity
+ ...
+ optimization_advisor: true # 新增
+
+
+
+
优点
+ - 显式可见,按需开启
+ - 与现有 YAML 驱动风格一致
+ - 可为不同场景独立配置
+
+
+
+
+
+
+
+
C
+
+
阈值触发(任一指标低于警戒线时自动激活)
+
规则引擎先算,若发现有指标低于阈值则自动启动 LLM 分析;一切正常则跳过。
+
+
优点
+ - "有问题才报警",符合直觉
+ - 高分场景无额外成本
+
+
缺点
+ - 阈值需要维护,不同场景可能不同
+ - 正常分数时无建议,但用户可能仍想看优化空间
+
+
+
+
+
diff --git a/.superpowers/brainstorm/1625-1781595805/content/waiting-2.html b/.superpowers/brainstorm/1625-1781595805/content/waiting-2.html
new file mode 100644
index 0000000..b5f5028
--- /dev/null
+++ b/.superpowers/brainstorm/1625-1781595805/content/waiting-2.html
@@ -0,0 +1,3 @@
+
+
Writing spec & moving to implementation...
+
diff --git a/.superpowers/brainstorm/1625-1781595805/content/waiting.html b/.superpowers/brainstorm/1625-1781595805/content/waiting.html
new file mode 100644
index 0000000..e588f77
--- /dev/null
+++ b/.superpowers/brainstorm/1625-1781595805/content/waiting.html
@@ -0,0 +1,3 @@
+
+
Continuing in terminal — 正在设计方案...
+
diff --git a/.superpowers/brainstorm/1625-1781595805/state/server-stopped b/.superpowers/brainstorm/1625-1781595805/state/server-stopped
new file mode 100644
index 0000000..c4e2401
--- /dev/null
+++ b/.superpowers/brainstorm/1625-1781595805/state/server-stopped
@@ -0,0 +1 @@
+{"reason":"idle timeout","timestamp":1781598635371}
diff --git a/.superpowers/brainstorm/1625-1781595805/state/server.pid b/.superpowers/brainstorm/1625-1781595805/state/server.pid
new file mode 100644
index 0000000..f71f46d
--- /dev/null
+++ b/.superpowers/brainstorm/1625-1781595805/state/server.pid
@@ -0,0 +1 @@
+1625
diff --git a/logs/online_eval.log b/logs/online_eval.log
new file mode 100644
index 0000000..061116f
--- /dev/null
+++ b/logs/online_eval.log
@@ -0,0 +1 @@
+Completed run: C:\Projects\AIProjects\Siemens-AIPOC\siemens_ragas\outputs\online\siemens-pdf-question-bank
diff --git a/logs/server_2026-06-23.log b/logs/server_2026-06-23.log
new file mode 100644
index 0000000..9005d33
--- /dev/null
+++ b/logs/server_2026-06-23.log
@@ -0,0 +1,24 @@
+2026-06-23 13:55:00 INFO webapp.server Starting RAGAS Console host=127.0.0.1 port=8800 log_level=info log_file=C:\Projects\AIProjects\Siemens-AIPOC\siemens_ragas\logs\server_2026-06-23.log
+2026-06-23 13:55:14 INFO uvicorn.error Started server process [83868]
+2026-06-23 13:55:14 INFO uvicorn.error Waiting for application startup.
+2026-06-23 13:55:14 INFO uvicorn.error Application startup complete.
+2026-06-23 13:55:14 INFO uvicorn.error Uvicorn running on http://127.0.0.1:8800 (Press CTRL+C to quit)
+2026-06-23 13:59:47 INFO uvicorn.access 127.0.0.1:53487 - "GET / HTTP/1.1" 200
+2026-06-23 13:59:47 INFO uvicorn.access 127.0.0.1:53487 - "GET /static/css/app.css HTTP/1.1" 200
+2026-06-23 13:59:47 INFO uvicorn.access 127.0.0.1:50321 - "GET /static/js/api.js HTTP/1.1" 200
+2026-06-23 13:59:47 INFO uvicorn.access 127.0.0.1:51325 - "GET /static/js/profiles.js HTTP/1.1" 200
+2026-06-23 13:59:47 INFO uvicorn.access 127.0.0.1:59869 - "GET /static/js/report.js HTTP/1.1" 200
+2026-06-23 13:59:48 INFO uvicorn.access 127.0.0.1:50980 - "GET /static/js/runner.js HTTP/1.1" 200
+2026-06-23 13:59:48 INFO uvicorn.access 127.0.0.1:63223 - "GET /static/js/app.js HTTP/1.1" 200
+2026-06-23 13:59:48 INFO webapp.access GET /docs → 200 (0ms)
+2026-06-23 13:59:48 INFO uvicorn.access 127.0.0.1:63223 - "GET /docs HTTP/1.1" 200
+2026-06-23 13:59:48 INFO webapp.access GET /api/health → 200 (0ms)
+2026-06-23 13:59:48 INFO uvicorn.access 127.0.0.1:50321 - "GET /api/health HTTP/1.1" 200
+2026-06-23 13:59:49 INFO webapp.api.runs [get_runs] found 19 runs
+2026-06-23 13:59:49 INFO webapp.access GET /api/runs → 200 (1094ms)
+2026-06-23 13:59:49 INFO uvicorn.access 127.0.0.1:63223 - "GET /api/runs HTTP/1.1" 200
+2026-06-23 13:59:49 INFO webapp.access GET /openapi.json → 200 (94ms)
+2026-06-23 13:59:49 INFO uvicorn.access 127.0.0.1:63223 - "GET /openapi.json HTTP/1.1" 200
+2026-06-23 13:59:50 INFO webapp.api.llm_profiles [list_profiles] count=6
+2026-06-23 13:59:50 INFO webapp.access GET /api/llm-profiles → 200 (0ms)
+2026-06-23 13:59:50 INFO uvicorn.access 127.0.0.1:63223 - "GET /api/llm-profiles HTTP/1.1" 200
diff --git a/logs/siemens_build.log b/logs/siemens_build.log
new file mode 100644
index 0000000..47aaa36
--- /dev/null
+++ b/logs/siemens_build.log
@@ -0,0 +1,35 @@
+ [info] generating questions for: 315_1_Flash????????.pdf
+ [info] 315_1_Flash????????.pdf: 6 questions generated (total so far: 6)
+ [info] generating questions for: 316_2_Flash??????_??.pdf
+ [info] 316_2_Flash??????_??.pdf: 10 questions generated (total so far: 16)
+ [info] generating questions for: 317_3_Flash??????_??.pdf
+ [info] 317_3_Flash??????_??.pdf: 9 questions generated (total so far: 25)
+ [info] generating questions for: 318_4_Flash??????_???.pdf
+ [info] 318_4_Flash??????_???.pdf: 9 questions generated (total so far: 34)
+ [info] generating questions for: 319_5_Flash??????_?????.pdf
+ [info] 319_5_Flash??????_?????.pdf: 10 questions generated (total so far: 44)
+ [info] generating questions for: 320_6_Flash??????_??.pdf
+ [info] 320_6_Flash??????_??.pdf: 8 questions generated (total so far: 52)
+ [info] generating questions for: 321_??CT???????????--??.pdf
+ [info] 321_??CT???????????--??.pdf: 5 questions generated (total so far: 57)
+ [info] generating questions for: 322_??CT???????????--??????????.pdf
+ [info] 322_??CT???????????--??????????.pdf: 8 questions generated (total so far: 65)
+ [info] generating questions for: 323_??CT???????????--?????????.pdf
+ [info] 323_??CT???????????--?????????.pdf: 5 questions generated (total so far: 70)
+ [info] generating questions for: 324_??CT???????????--????????.pdf
+ [info] 324_??CT???????????--????????.pdf: 8 questions generated (total so far: 78)
+ [info] generating questions for: 325_??CT???????????--???????.pdf
+ [info] 325_??CT???????????--???????.pdf: 8 questions generated (total so far: 86)
+ [info] generating questions for: 326_??CT???????????--4D????.pdf
+ [info] 326_??CT???????????--4D????.pdf: 7 questions generated (total so far: 93)
+ [info] generating questions for: 327_??CT???????????--??????.pdf
+ [info] 327_??CT???????????--??????.pdf: 8 questions generated (total so far: 101)
+ [info] generating questions for: 749_????01_???????????.pdf
+ [info] 749_????01_???????????.pdf: 8 questions generated (total so far: 109)
+ [info] generating questions for: 804_????02-????????CT?????X-Map??.pdf
+ [info] 804_????02-????????CT?????X-Map??.pdf: 8 questions generated (total so far: 117)
+ [info] generating questions for: 805_????03_????????????????.pdf
+ [info] 805_????03_????????????????.pdf: 6 questions generated (total so far: 123)
+ [info] generating questions for: 807_???CT???????_SJ-L10.2??1-5.pdf
+ [info] 807_???CT???????_SJ-L10.2??1-5.pdf: 9 questions generated (total so far: 132)
+Completed dataset build: C:\Projects\AIProjects\Siemens-AIPOC\siemens_ragas\outputs\dataset-builds\siemens-pdf-question-bank\2026-06-15T09-28-35.302231+00-00
diff --git a/rag_eval/execution/evaluator.py b/rag_eval/execution/evaluator.py
index b2a84cc..5044730 100644
--- a/rag_eval/execution/evaluator.py
+++ b/rag_eval/execution/evaluator.py
@@ -180,12 +180,12 @@ class Evaluator:
record["judge_model"] = self.scenario.judge_model
record["embedding_model"] = self.scenario.embedding_model
record["run_id"] = self.scenario.scenario_name
- # Weighted score columns — enable post-hoc weighted aggregation in reporting.
- record["weighted_score"] = compute_weighted_score(
- score.metrics, self.scenario.metric_weights
- )
- doc_name = str(sample.metadata.get("doc_name", "") or "")
- record["sample_weight"] = resolve_weight(
- self.scenario.doc_weights, doc_name, default=1.0
- )
+ # 综合加权得分列(已暂时禁用)
+ # record["weighted_score"] = compute_weighted_score(
+ # score.metrics, self.scenario.metric_weights
+ # )
+ # doc_name = str(sample.metadata.get("doc_name", "") or "")
+ # record["sample_weight"] = resolve_weight(
+ # self.scenario.doc_weights, doc_name, default=1.0
+ # )
return record
diff --git a/rag_eval/reporting/summary.py b/rag_eval/reporting/summary.py
index 13a76af..7953e7f 100644
--- a/rag_eval/reporting/summary.py
+++ b/rag_eval/reporting/summary.py
@@ -75,15 +75,16 @@ def build_summary_markdown(result: EvaluationResult) -> str:
else:
lines.append(f"- {metric}: `n/a`{weight_note}")
- if has_weights:
- overall_ws = compute_overall_weighted_score_mean(
- score_rows_list, result.scenario.metric_weights, result.scenario.doc_weights
- )
- weight_suffix = " (加权)"
- if overall_ws is not None and not math.isnan(overall_ws):
- lines.append(f"- **weighted_score{weight_suffix}: `{overall_ws:.4f}`**")
- else:
- lines.append(f"- **weighted_score{weight_suffix}: `n/a`**")
+ # 综合加权得分(已暂时禁用)
+ # if has_weights:
+ # overall_ws = compute_overall_weighted_score_mean(
+ # score_rows_list, result.scenario.metric_weights, result.scenario.doc_weights
+ # )
+ # weight_suffix = " (加权)"
+ # if overall_ws is not None and not math.isnan(overall_ws):
+ # lines.append(f"- **weighted_score{weight_suffix}: `{overall_ws:.4f}`**")
+ # else:
+ # lines.append(f"- **weighted_score{weight_suffix}: `n/a`**")
detail_columns = ["sample_id", *result.scenario.metrics, "weighted_score", "error"]
existing_columns = [c for c in detail_columns if c in scores.columns]
diff --git a/rag_eval/shared/profile_store.py b/rag_eval/shared/profile_store.py
new file mode 100644
index 0000000..ccf93fc
--- /dev/null
+++ b/rag_eval/shared/profile_store.py
@@ -0,0 +1,53 @@
+"""Lightweight read-only accessor for configs/llm_profiles.json.
+
+Kept in ``rag_eval`` (not ``webapp``) so the runner can look up per-model
+credentials without depending on the webapp layer.
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+from pathlib import Path
+from typing import Any
+
+logger = logging.getLogger(__name__)
+
+_PROFILES_PATH = Path(__file__).resolve().parents[2] / "configs" / "llm_profiles.json"
+
+
+def find_by_model(model_name: str) -> dict[str, Any] | None:
+ """Return the first profile whose ``model`` field matches *model_name*, or None.
+
+ Returns None (without raising) when the profiles file does not exist or
+ cannot be parsed — callers fall back to environment-variable defaults.
+ """
+ if not _PROFILES_PATH.exists():
+ return None
+ try:
+ data = json.loads(_PROFILES_PATH.read_text(encoding="utf-8"))
+ for profile in data.get("profiles", []):
+ if profile.get("model") == model_name:
+ return profile
+ except Exception as exc: # noqa: BLE001
+ logger.warning("[profile_store] failed to read %s: %s", _PROFILES_PATH, exc)
+ return None
+
+
+def profile_to_client_kwargs(
+ profile: dict[str, Any],
+ fallback_api_key: str | None,
+ fallback_timeout: float,
+) -> dict[str, Any]:
+ """Convert a profile dict into keyword arguments for ``openai.AsyncOpenAI``.
+
+ Fields present in the profile override the supplied fallback values.
+ """
+ kwargs: dict[str, Any] = {
+ "api_key": profile.get("api_key") or fallback_api_key or "",
+ "timeout": float(profile.get("timeout_seconds") or fallback_timeout),
+ }
+ base_url = (profile.get("base_url") or "").strip()
+ if base_url:
+ kwargs["base_url"] = base_url
+ return kwargs
diff --git a/project-overview.html b/siemens-ragas-project-overview.html
similarity index 100%
rename from project-overview.html
rename to siemens-ragas-project-overview.html
diff --git a/tests/test_offline_eval.py b/tests/test_offline_eval.py
index 3f6ce90..aee354b 100644
--- a/tests/test_offline_eval.py
+++ b/tests/test_offline_eval.py
@@ -184,7 +184,7 @@ class ScenarioAndDatasetTests(unittest.TestCase):
class EvaluatorAndReportingTests(unittest.TestCase):
def test_merge_score_includes_weighted_score_and_sample_weight(self):
- """_merge_score adds weighted_score and sample_weight columns."""
+ """_merge_score no longer adds weighted_score/sample_weight (feature disabled)."""
from unittest.mock import MagicMock
from rag_eval.execution.evaluator import Evaluator
from rag_eval.shared.models import (
@@ -212,9 +212,11 @@ class EvaluatorAndReportingTests(unittest.TestCase):
)
score = MetricScore(metrics={"faithfulness": 1.0, "context_recall": 0.0})
row = evaluator._merge_score(sample, score)
- # (3*1.0 + 1*0.0) / (3+1) = 0.75
- assert abs(row["weighted_score"] - 0.75) < 1e-4
- assert row["sample_weight"] == 2.0
+ # 综合加权得分已暂时禁用,weighted_score 和 sample_weight 不再写入
+ assert "weighted_score" not in row
+ assert "sample_weight" not in row
+ assert row["faithfulness"] == 1.0
+ assert row["context_recall"] == 0.0
def test_summary_markdown_shows_weighted_score(self):
"""build_summary_markdown includes weighted_score when metric_weights set."""
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
new file mode 100644
index 0000000..9ed1b4c
--- /dev/null
+++ b/tests/test_pipeline.py
@@ -0,0 +1,280 @@
+"""Tests for the end-to-end pipeline API and pipeline task manager."""
+
+from __future__ import annotations
+
+import json
+import time
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+import pytest
+from fastapi.testclient import TestClient
+
+
+# ── fixtures ──────────────────────────────────────────────────────────────────
+
+@pytest.fixture()
+def client(tmp_path, monkeypatch):
+ """TestClient with a fresh PipelineTaskManager backed by tmp_path outputs."""
+ import webapp.services.pipeline_task_manager as mgr_mod
+ from webapp.services.pipeline_task_manager import PipelineTaskManager
+
+ fresh_mgr = PipelineTaskManager(max_workers=2)
+ monkeypatch.setattr(mgr_mod, "pipeline_task_manager", fresh_mgr)
+ monkeypatch.setattr(mgr_mod, "_PIPELINE_OUTPUT_ROOT", tmp_path / "pipeline")
+
+ import webapp.api.pipeline as api_mod
+ monkeypatch.setattr(api_mod, "pipeline_task_manager", fresh_mgr)
+
+ from webapp.server import create_app
+ return TestClient(create_app())
+
+
+def _minimal_pdf_dir(tmp_path: Path) -> Path:
+ """Create a temp directory that looks like a PDF folder (empty, valid dir)."""
+ d = tmp_path / "pdfs"
+ d.mkdir()
+ return d
+
+
+def _mock_build_result(tmp_path: Path, job, run_id="r1"):
+ """Return a fake DatasetBuildResult with a minimal dataset CSV."""
+ from rag_eval.dataset_builder.models import (
+ DatasetBuildArtifactPaths,
+ DatasetBuildResult,
+ DraftQuestionSample,
+ )
+
+ artifact_root = tmp_path / "build" / run_id
+ artifact_root.mkdir(parents=True, exist_ok=True)
+ latest = tmp_path / "build" / "latest"
+ latest.mkdir(parents=True, exist_ok=True)
+
+ chunks_path = artifact_root / "source_chunks.jsonl"
+ chunks_path.write_text(
+ json.dumps({"chunk_id": "c1", "doc_id": "d1", "doc_name": "test.pdf",
+ "text": "CT scan context.", "page_start": 1, "page_end": 1,
+ "section_path": "/", "section_title": "", "source_layout_ids": []}) + "\n",
+ encoding="utf-8",
+ )
+ (latest / "source_chunks.jsonl").write_text(chunks_path.read_text(encoding="utf-8"), encoding="utf-8")
+
+ dataset_csv = tmp_path / "generated_dataset.csv"
+ dataset_csv.write_text(
+ "sample_id,question,ground_truth,scenario,language,doc_id,doc_name,"
+ "section_path,page_start,page_end,source_chunk_ids,question_type,difficulty,"
+ "review_status,review_notes\n"
+ 's1,"What is CT?","CT is imaging.","test","zh","d1","test.pdf","/",'
+ '1,1,"[""c1""]","fact","easy","draft",""\n',
+ encoding="utf-8",
+ )
+
+ sample = DraftQuestionSample(
+ sample_id="s1", question="What is CT?", ground_truth="CT is imaging.",
+ scenario="test", language="zh", doc_id="d1", doc_name="test.pdf",
+ section_path="/", page_start=1, page_end=1, source_chunk_ids=["c1"],
+ question_type="fact", difficulty="easy",
+ )
+
+ artifact_paths = DatasetBuildArtifactPaths(
+ root_dir=artifact_root,
+ documents_jsonl=artifact_root / "documents.jsonl",
+ semantic_blocks_jsonl=artifact_root / "semantic_blocks.jsonl",
+ source_chunks_jsonl=chunks_path,
+ dataset_draft_csv=artifact_root / "dataset_draft.csv",
+ parse_failures_csv=artifact_root / "parse_failures.csv",
+ metadata_json=artifact_root / "metadata.json",
+ )
+ return DatasetBuildResult(
+ job=job,
+ run_id=run_id,
+ artifact_paths=artifact_paths,
+ documents=[],
+ draft_samples=[sample],
+ parse_failures=[],
+ )
+
+
+def _mock_eval_result(tmp_path: Path, scenario):
+ """Return a fake EvaluationResult."""
+ from rag_eval.shared.models import EvaluationResult
+
+ return EvaluationResult(
+ scenario=scenario,
+ run_id="eval-r1",
+ started_at="2026-01-01T00:00:00",
+ finished_at="2026-01-01T00:01:00",
+ valid_samples=[],
+ invalid_samples=[],
+ score_rows=[],
+ )
+
+
+# ── API route tests ────────────────────────────────────────────────────────────
+
+def test_submit_returns_202_and_job_id(client, tmp_path):
+ """POST /api/pipeline/jobs returns 202 with job_id immediately."""
+ pdf_dir = _minimal_pdf_dir(tmp_path)
+
+ with patch("webapp.services.pipeline_task_manager.PipelineTaskManager._execute") as mock_exec:
+ from webapp.models import PipelineResult
+ mock_exec.return_value = PipelineResult(
+ build_artifact_dir="/tmp/b", dataset_csv="/tmp/d.csv",
+ source_chunks_jsonl="/tmp/c.jsonl", total_questions=1,
+ parse_failures=0, eval_run_id="r1", eval_output_dir="/tmp/e",
+ scores_csv="/tmp/scores.csv", summary_md="/tmp/summary.md",
+ )
+ resp = client.post("/api/pipeline/jobs", json={
+ "docs_path": str(pdf_dir),
+ "job_name": "test-job",
+ })
+
+ assert resp.status_code == 202
+ data = resp.json()
+ assert "job_id" in data
+ assert data["job_name"] == "test-job"
+ # status may already be completed by the time the response is read (mock runs instantly)
+ assert data["status"] in ("queued", "completed")
+
+
+def test_get_nonexistent_job_returns_404(client):
+ """GET /api/pipeline/jobs/{id} returns 404 for unknown job."""
+ resp = client.get("/api/pipeline/jobs/doesnotexist")
+ assert resp.status_code == 404
+
+
+def test_list_jobs_returns_empty_initially(client):
+ """GET /api/pipeline/jobs returns empty list when no jobs submitted."""
+ resp = client.get("/api/pipeline/jobs")
+ assert resp.status_code == 200
+ assert resp.json()["jobs"] == []
+
+
+def test_job_status_polling(client, tmp_path):
+ """Submitted job becomes visible via GET /api/pipeline/jobs/{id}."""
+ pdf_dir = _minimal_pdf_dir(tmp_path)
+
+ with patch("webapp.services.pipeline_task_manager.PipelineTaskManager._execute") as mock_exec:
+ from webapp.models import PipelineResult
+ mock_exec.return_value = PipelineResult(
+ build_artifact_dir="/tmp/b", dataset_csv="/tmp/d.csv",
+ source_chunks_jsonl="/tmp/c.jsonl", total_questions=3,
+ parse_failures=0, eval_run_id="r2", eval_output_dir="/tmp/e",
+ scores_csv="/tmp/scores.csv", summary_md="/tmp/summary.md",
+ )
+ post_resp = client.post("/api/pipeline/jobs", json={"docs_path": str(pdf_dir)})
+
+ job_id = post_resp.json()["job_id"]
+
+ # Poll until done or timeout (max 5s for mock)
+ for _ in range(20):
+ status_resp = client.get(f"/api/pipeline/jobs/{job_id}")
+ assert status_resp.status_code == 200
+ status = status_resp.json()
+ if status["status"] in ("completed", "failed"):
+ break
+ time.sleep(0.25)
+
+ assert status["status"] == "completed"
+ assert status["result"]["total_questions"] == 3
+
+
+def test_job_fails_on_invalid_docs_path(client):
+ """Job fails quickly if docs_path does not exist."""
+ resp = client.post("/api/pipeline/jobs", json={
+ "docs_path": "/nonexistent/path/that/does/not/exist",
+ })
+ assert resp.status_code == 202
+ job_id = resp.json()["job_id"]
+
+ for _ in range(20):
+ status_resp = client.get(f"/api/pipeline/jobs/{job_id}")
+ status = status_resp.json()
+ if status["status"] in ("completed", "failed"):
+ break
+ time.sleep(0.25)
+
+ assert status["status"] == "failed"
+ assert "docs_path" in status["error"] or "not" in status["error"].lower()
+
+
+def test_list_jobs_shows_submitted(client, tmp_path):
+ """GET /api/pipeline/jobs includes jobs after submission."""
+ pdf_dir = _minimal_pdf_dir(tmp_path)
+
+ with patch("webapp.services.pipeline_task_manager.PipelineTaskManager._execute") as mock_exec:
+ from webapp.models import PipelineResult
+ mock_exec.return_value = PipelineResult(
+ build_artifact_dir="/tmp/b", dataset_csv="/tmp/d.csv",
+ source_chunks_jsonl="/tmp/c.jsonl", total_questions=1,
+ parse_failures=0, eval_run_id="r3", eval_output_dir="/tmp/e",
+ scores_csv="/tmp/scores.csv", summary_md="/tmp/summary.md",
+ )
+ client.post("/api/pipeline/jobs", json={"docs_path": str(pdf_dir), "job_name": "listed-job"})
+
+ time.sleep(0.5)
+ list_resp = client.get("/api/pipeline/jobs")
+ assert list_resp.status_code == 200
+ jobs = list_resp.json()["jobs"]
+ assert len(jobs) >= 1
+ names = [j["job_name"] for j in jobs]
+ assert "listed-job" in names
+
+
+# ── execute_dataset_build_job refactor test ────────────────────────────────────
+
+def test_execute_dataset_build_job_directly(tmp_path):
+ """execute_dataset_build_job runs the build without a YAML file."""
+ from unittest.mock import patch as _patch
+ from rag_eval.dataset_builder.models import DatasetBuildJob, DatasetBuildRuntime
+ from rag_eval.dataset_builder.runner import execute_dataset_build_job
+ from rag_eval.settings import EvaluationSettings
+
+ pdf_dir = tmp_path / "pdfs"
+ pdf_dir.mkdir()
+ (pdf_dir / "doc.pdf").write_bytes(b"%PDF-fake")
+
+ job = DatasetBuildJob(
+ job_name="direct-test",
+ input_path=pdf_dir,
+ input_glob="*.pdf",
+ parser_provider="aliyun_docmind",
+ failure_mode="skip",
+ generation_model="test-model",
+ output_type="online_question_bank",
+ review_mode="draft_with_manual_review",
+ max_questions_per_document=5,
+ max_source_chunks_per_question=3,
+ dataset_path=tmp_path / "out.csv",
+ artifact_dir=tmp_path / "artifacts",
+ runtime=DatasetBuildRuntime(max_documents=1),
+ )
+
+ mock_doc = MagicMock()
+ mock_doc.doc_id = "d1"
+ mock_doc.doc_name = "doc.pdf"
+ mock_doc.source_chunks = []
+ mock_doc.semantic_blocks = []
+ mock_doc.raw_text = ""
+ mock_doc.structure_nodes = []
+ mock_doc.metadata = {}
+ mock_doc.to_record.return_value = {
+ "doc_id": "d1", "doc_name": "doc.pdf", "raw_text": "",
+ "structure_nodes": [], "metadata": {},
+ "semantic_block_count": 0, "source_chunk_count": 0,
+ }
+
+ mock_parser = MagicMock()
+ mock_parser.parse.return_value = mock_doc
+
+ mock_generator = MagicMock()
+ mock_generator.generate.return_value = []
+
+ result = execute_dataset_build_job(
+ job,
+ settings=EvaluationSettings(_env_file=None),
+ parser=mock_parser,
+ generator=mock_generator,
+ )
+ assert result.job.job_name == "direct-test"
+ assert result.artifact_paths.root_dir.exists()
diff --git a/tests/test_webapp_report_builder.py b/tests/test_webapp_report_builder.py
index 1ed0a35..8d492c1 100644
--- a/tests/test_webapp_report_builder.py
+++ b/tests/test_webapp_report_builder.py
@@ -65,7 +65,8 @@ def test_build_report_uses_weighted_means_and_exposes_snapshot_weights(tmp_path:
"faithfulness": pytest.approx(0.75, rel=1e-4),
"context_recall": pytest.approx(0.5, rel=1e-4),
}
- assert report.weighted_score_mean == pytest.approx(0.6667, rel=1e-4)
+ # 综合加权得分已暂时禁用
+ assert report.weighted_score_mean is None
assert report.metric_weights == {"faithfulness": 2.0, "context_recall": 1.0}
assert report.doc_weights == {"a.pdf": 3.0, "b.pdf": 1.0}
assert report.summary_markdown == "summary"
diff --git a/tests/webapp/test_score_api.py b/tests/webapp/test_score_api.py
index 1eb788c..a9ee7cc 100644
--- a/tests/webapp/test_score_api.py
+++ b/tests/webapp/test_score_api.py
@@ -241,7 +241,8 @@ class TestScoreEndpoint:
})
assert resp.status_code == 200
data = resp.json()
- assert data["weighted_score"] is not None
+ # 综合加权得分已暂时禁用,始终返回 null
+ assert data["weighted_score"] is None
def test_missing_required_fields_returns_422(self, client):
resp = client.post("/api/score", json={"question": "q"})
diff --git a/tests/webapp/test_session_score_jobs_api.py b/tests/webapp/test_session_score_jobs_api.py
new file mode 100644
index 0000000..22811ea
--- /dev/null
+++ b/tests/webapp/test_session_score_jobs_api.py
@@ -0,0 +1,299 @@
+"""Tests for session-grouped async scoring API and SessionScoreJobManager."""
+from __future__ import annotations
+
+import json
+import threading
+import time
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+import pandas as pd
+import pytest
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+@pytest.fixture()
+def tmp_manager(tmp_path):
+ """Isolated SessionScoreJobManager backed by tmp dirs (no real LLM calls)."""
+ from webapp.services.session_score_manager import SessionScoreJobManager
+ return SessionScoreJobManager(
+ output_dir=tmp_path / "score-session",
+ index_dir=tmp_path / "score-session-jobs",
+ max_workers=2,
+ )
+
+
+@pytest.fixture()
+def client(tmp_path, monkeypatch):
+ """TestClient with fresh SessionScoreJobManager backed by tmp dirs."""
+ import webapp.services.session_score_manager as mgr_mod
+ from webapp.services.session_score_manager import SessionScoreJobManager
+
+ fresh_mgr = SessionScoreJobManager(
+ output_dir=tmp_path / "score-session",
+ index_dir=tmp_path / "score-session-jobs",
+ max_workers=2,
+ )
+ monkeypatch.setattr(mgr_mod, "session_score_manager", fresh_mgr)
+
+ import webapp.api.session_score_jobs as api_mod
+ monkeypatch.setattr(api_mod, "session_score_manager", fresh_mgr)
+
+ from webapp.server import create_app
+ return pytest.importorskip("fastapi.testclient").TestClient(create_app())
+
+
+# ---------------------------------------------------------------------------
+# Unit tests for SessionScoreJobManager
+# ---------------------------------------------------------------------------
+
+class TestSessionRunId:
+ def test_same_session_always_same_run_id(self, tmp_manager):
+ assert tmp_manager.session_run_id("abc") == tmp_manager.session_run_id("abc")
+
+ def test_different_sessions_different_run_ids(self, tmp_manager):
+ assert tmp_manager.session_run_id("session-A") != tmp_manager.session_run_id("session-B")
+
+ def test_run_id_prefixed_with_session(self, tmp_manager):
+ assert tmp_manager.session_run_id("test123").startswith("session-")
+
+ def test_special_chars_sanitized(self, tmp_manager):
+ run_id = tmp_manager.session_run_id("user@dify:flow/001")
+ assert "/" not in run_id
+ assert "@" not in run_id
+ assert ":" not in run_id
+
+
+class TestSubmit:
+ def test_submit_returns_job_status_and_run_id(self, tmp_manager):
+ with patch.object(tmp_manager._executor, "submit"):
+ status, run_id = tmp_manager.submit("session-1", _mock_request())
+ assert status.job_id
+ assert status.status == "queued"
+ assert run_id == tmp_manager.session_run_id("session-1")
+
+ def test_submit_adds_job_to_session(self, tmp_manager):
+ with patch.object(tmp_manager._executor, "submit"):
+ status, _ = tmp_manager.submit("session-1", _mock_request())
+ session = tmp_manager.get_session("session-1")
+ assert session is not None
+ assert any(j.job_id == status.job_id for j in session.jobs)
+
+ def test_multiple_submits_same_session_accumulate(self, tmp_manager):
+ with patch.object(tmp_manager._executor, "submit"):
+ tmp_manager.submit("session-X", _mock_request())
+ tmp_manager.submit("session-X", _mock_request())
+ tmp_manager.submit("session-X", _mock_request())
+ session = tmp_manager.get_session("session-X")
+ assert session.call_count == 3
+
+ def test_get_unknown_job_returns_none(self, tmp_manager):
+ assert tmp_manager.get_job("does-not-exist") is None
+
+ def test_get_unknown_session_returns_none(self, tmp_manager):
+ assert tmp_manager.get_session("no-such-session") is None
+
+
+class TestSessionIndexPersistence:
+ def test_session_index_survives_restart(self, tmp_path):
+ """Jobs and session mappings loaded from disk on new manager instance."""
+ from webapp.services.session_score_manager import SessionScoreJobManager
+
+ mgr1 = SessionScoreJobManager(
+ output_dir=tmp_path / "score-session",
+ index_dir=tmp_path / "score-session-jobs",
+ )
+ with patch.object(mgr1._executor, "submit"):
+ mgr1.submit("persist-session", _mock_request())
+ mgr1.submit("persist-session", _mock_request())
+
+ # New manager instance loads from disk
+ mgr2 = SessionScoreJobManager(
+ output_dir=tmp_path / "score-session",
+ index_dir=tmp_path / "score-session-jobs",
+ )
+ session = mgr2.get_session("persist-session")
+ assert session is not None
+ assert session.call_count == 2
+
+ def test_job_index_file_created_on_submit(self, tmp_path):
+ from webapp.services.session_score_manager import SessionScoreJobManager
+ mgr = SessionScoreJobManager(
+ output_dir=tmp_path / "score-session",
+ index_dir=tmp_path / "score-session-jobs",
+ )
+ with patch.object(mgr._executor, "submit"):
+ status, _ = mgr.submit("file-test", _mock_request())
+ index_file = tmp_path / "score-session-jobs" / f"{status.job_id}.json"
+ assert index_file.is_file()
+ data = json.loads(index_file.read_text())
+ assert data["job_id"] == status.job_id
+
+
+class TestAppendBehaviour:
+ """Test the CSV append / read-all logic in _append_and_regenerate via _read_score_rows."""
+
+ def test_read_score_rows_returns_empty_for_missing_csv(self, tmp_manager, tmp_path):
+ rows = tmp_manager._read_score_rows(tmp_path / "nonexistent")
+ assert rows == []
+
+ def test_read_score_rows_reads_existing_csv(self, tmp_manager, tmp_path):
+ run_dir = tmp_path / "run1"
+ run_dir.mkdir()
+ df = pd.DataFrame([{"sample_id": "s1", "answer_relevancy": 0.9}])
+ df.to_csv(run_dir / "scores.csv", index=False)
+ rows = tmp_manager._read_score_rows(run_dir)
+ assert len(rows) == 1
+ assert rows[0]["sample_id"] == "s1"
+
+ def test_metric_means_computed_from_csv(self, tmp_manager, tmp_path):
+ run_dir = tmp_path / "run2"
+ run_dir.mkdir()
+ df = pd.DataFrame([
+ {"sample_id": "s1", "answer_relevancy": 0.8},
+ {"sample_id": "s2", "answer_relevancy": 0.6},
+ ])
+ df.to_csv(run_dir / "scores.csv", index=False)
+ means = tmp_manager._read_metric_means(run_dir)
+ assert means["answer_relevancy"] == pytest.approx(0.7, abs=1e-4)
+
+
+# ---------------------------------------------------------------------------
+# API endpoint tests
+# ---------------------------------------------------------------------------
+
+class TestSessionAsyncEndpoints:
+ def test_submit_returns_202_with_session_fields(self, client):
+ with patch("webapp.services.session_score_manager.SessionScoreJobManager._run"):
+ resp = client.post("/api/score/session_async", json={
+ "session_id": "test-session-001",
+ "question": "What is CT?",
+ "answer": "CT is computed tomography.",
+ "metrics": ["answer_relevancy"],
+ })
+ assert resp.status_code == 202
+ data = resp.json()
+ assert data["session_id"] == "test-session-001"
+ assert "job_id" in data
+ assert "run_id" in data
+ assert data["status"] == "queued"
+ assert data["call_count"] >= 1
+
+ def test_run_id_deterministic_for_session(self, client):
+ with patch("webapp.services.session_score_manager.SessionScoreJobManager._run"):
+ r1 = client.post("/api/score/session_async", json={
+ "session_id": "det-session",
+ "question": "Q1",
+ "answer": "A1",
+ "metrics": ["answer_relevancy"],
+ })
+ r2 = client.post("/api/score/session_async", json={
+ "session_id": "det-session",
+ "question": "Q2",
+ "answer": "A2",
+ "metrics": ["answer_relevancy"],
+ })
+ assert r1.json()["run_id"] == r2.json()["run_id"]
+
+ def test_different_sessions_different_run_ids(self, client):
+ with patch("webapp.services.session_score_manager.SessionScoreJobManager._run"):
+ r1 = client.post("/api/score/session_async", json={
+ "session_id": "session-A",
+ "question": "Q",
+ "answer": "A",
+ "metrics": ["answer_relevancy"],
+ })
+ r2 = client.post("/api/score/session_async", json={
+ "session_id": "session-B",
+ "question": "Q",
+ "answer": "A",
+ "metrics": ["answer_relevancy"],
+ })
+ assert r1.json()["run_id"] != r2.json()["run_id"]
+
+ def test_call_count_increments_per_session(self, client):
+ with patch("webapp.services.session_score_manager.SessionScoreJobManager._run"):
+ for _ in range(3):
+ client.post("/api/score/session_async", json={
+ "session_id": "count-session",
+ "question": "Q",
+ "answer": "A",
+ "metrics": ["answer_relevancy"],
+ })
+ time.sleep(0.05)
+ resp = client.get("/api/score/sessions/count-session")
+ assert resp.status_code == 200
+ assert resp.json()["call_count"] == 3
+
+ def test_get_session_returns_jobs_list(self, client):
+ with patch("webapp.services.session_score_manager.SessionScoreJobManager._run"):
+ client.post("/api/score/session_async", json={
+ "session_id": "list-session",
+ "question": "Q",
+ "answer": "A",
+ "metrics": ["answer_relevancy"],
+ })
+ time.sleep(0.05)
+ resp = client.get("/api/score/sessions/list-session")
+ assert resp.status_code == 200
+ data = resp.json()
+ assert len(data["jobs"]) == 1
+
+ def test_get_unknown_session_returns_404(self, client):
+ resp = client.get("/api/score/sessions/no-such-session-xyz")
+ assert resp.status_code == 404
+
+ def test_get_session_job_by_id(self, client):
+ with patch("webapp.services.session_score_manager.SessionScoreJobManager._run"):
+ resp = client.post("/api/score/session_async", json={
+ "session_id": "job-lookup-session",
+ "question": "Q",
+ "answer": "A",
+ "metrics": ["answer_relevancy"],
+ })
+ job_id = resp.json()["job_id"]
+ time.sleep(0.05)
+ get_resp = client.get(f"/api/score/session/jobs/{job_id}")
+ assert get_resp.status_code == 200
+ assert get_resp.json()["job_id"] == job_id
+
+ def test_get_unknown_job_returns_404(self, client):
+ resp = client.get("/api/score/session/jobs/nonexistent-job-id")
+ assert resp.status_code == 404
+
+ def test_missing_session_id_returns_422(self, client):
+ resp = client.post("/api/score/session_async", json={
+ "question": "Q",
+ "answer": "A",
+ "metrics": ["answer_relevancy"],
+ })
+ assert resp.status_code == 422
+
+ def test_list_sessions_endpoint(self, client):
+ with patch("webapp.services.session_score_manager.SessionScoreJobManager._run"):
+ client.post("/api/score/session_async", json={
+ "session_id": "list-all-session",
+ "question": "Q",
+ "answer": "A",
+ "metrics": ["answer_relevancy"],
+ })
+ resp = client.get("/api/score/sessions")
+ assert resp.status_code == 200
+ assert "sessions" in resp.json()
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _mock_request():
+ """Build a minimal ScoreRequest for testing."""
+ from webapp.models import ScoreRequest
+ return ScoreRequest(
+ question="What is dual-source CT?",
+ answer="It uses two X-ray sources.",
+ metrics=["answer_relevancy"],
+ )
diff --git a/webapp/api/score.py b/webapp/api/score.py
index 93cd512..dbddd98 100644
--- a/webapp/api/score.py
+++ b/webapp/api/score.py
@@ -156,10 +156,11 @@ def score_sample(
all_scores: dict[str, float | None] = {metric_name: None for metric_name in request.metrics}
all_scores.update(raw_scores)
- weighted = compute_weighted_score(
- {key: value for key, value in raw_scores.items() if value is not None},
- {},
- )
+ # 综合加权得分计算(已暂时禁用)
+ # weighted = compute_weighted_score(
+ # {key: value for key, value in raw_scores.items() if value is not None},
+ # {},
+ # )
logger.info(
"[score] done latency=%dms skipped=%s scores=%s",
@@ -169,7 +170,7 @@ def score_sample(
)
return ScoreResponse(
scores=all_scores,
- weighted_score=round(weighted, 4) if weighted is not None else None,
+ weighted_score=None, # 综合加权得分已暂时禁用
latency_ms=latency_ms,
skipped_metrics=skipped,
)
diff --git a/webapp/api/session_score_jobs.py b/webapp/api/session_score_jobs.py
new file mode 100644
index 0000000..d93b995
--- /dev/null
+++ b/webapp/api/session_score_jobs.py
@@ -0,0 +1,171 @@
+"""Routes for session-grouped async RAGAS scoring (Dify multi-call integration).
+
+Use case: Dify evaluates multiple Q&A pairs in a session. Each pair gets its own
+`POST /api/score/session_async` call with a shared `session_id`. All results are
+accumulated into one report, visible in 「运行列表」→「报告详情」.
+
+Key behaviour:
+ - Deterministic run_id: derived from session_id — same session always maps to the
+ same report directory (outputs/score-session/session-/).
+ - Append semantics: each call adds a new sample row. Previous rows are preserved.
+ - Advisor regeneration: optimization_advice.md is regenerated after every call
+ using the full set of accumulated rows.
+ - Each call returns its own `job_id` for individual status polling, plus the
+ shared `run_id` and `session_id`.
+
+Endpoints:
+ POST /api/score/session_async Submit one call (returns job_id + run_id)
+ GET /api/score/sessions List all sessions
+ GET /api/score/sessions/{session_id} Session aggregate (call_count, metric_means, jobs)
+ GET /api/score/session/jobs/{job_id} Status of one individual call
+"""
+
+from __future__ import annotations
+
+import logging
+
+from fastapi import APIRouter, HTTPException
+
+from webapp.models import (
+ AsyncScoreJobStatus,
+ ScoreRequest,
+ SessionScoreJobResponse,
+ SessionScoreRequest,
+ SessionStatus,
+)
+from webapp.services.session_score_manager import session_score_manager
+
+router = APIRouter(prefix="/api/score", tags=["score"])
+logger = logging.getLogger("webapp.api.session_score_jobs")
+
+
+@router.post(
+ "/session_async",
+ status_code=202,
+ response_model=SessionScoreJobResponse,
+ summary="提交 Session 异步评分(多样本批量聚合)",
+ responses={
+ 202: {
+ "description": (
+ "调用已排队,立即返回 job_id + run_id(202 Accepted)。\n\n"
+ "相同 `session_id` 的多次调用合并为同一报告,每次调用新增一个样本行。\n"
+ "评分完成后,`summary.md` 和 `optimization_advice.md` 增量更新。\n"
+ "通过 `GET /api/score/sessions/{session_id}` 查看 session 聚合状态,"
+ "通过 `GET /api/score/session/jobs/{job_id}` 查询单次调用状态,"
+ "在「运行列表」中查看完整报告(run_id 即 `session-` 形式)。"
+ ),
+ "content": {
+ "application/json": {
+ "example": {
+ "job_id": "abc123def456",
+ "session_id": "dify-session-001",
+ "run_id": "session-dify-session-001",
+ "status": "queued",
+ "call_count": 1,
+ }
+ }
+ },
+ },
+ },
+)
+def submit_session_async_score(request: SessionScoreRequest) -> SessionScoreJobResponse:
+ """提交 Session 异步 RAGAS 评分,立即返回 job_id。
+
+ 相同 `session_id` 的多次调用合并到同一评估报告中,每次调用:
+ 1. 新增一个样本行到 `scores.csv`
+ 2. 重写 `summary.md`(包含所有累积样本的指标均值)
+ 3. 重新生成 `optimization_advice.md`(基于全量样本的 LLM 优化建议)
+
+ **适合 Dify 工作流**:在循环节点中批量调用,所有轮次共用同一 `session_id`,
+ 最终在 RAGAS 平台「运行列表」中查看完整的批量评估报告。
+ """
+ logger.info(
+ "[session_async] submit session_id=%s metrics=%s has_ctx=%s has_gt=%s",
+ request.session_id,
+ request.metrics,
+ bool(request.contexts),
+ bool(request.ground_truth),
+ )
+
+ # Strip session_id to build a plain ScoreRequest for the manager
+ score_request = ScoreRequest(
+ question=request.question,
+ answer=request.answer,
+ contexts=request.contexts,
+ ground_truth=request.ground_truth,
+ context_separator=request.context_separator,
+ metrics=request.metrics,
+ judge_model=request.judge_model,
+ embedding_model=request.embedding_model,
+ )
+
+ status, run_id = session_score_manager.submit(request.session_id, score_request)
+
+ # Compute call_count from current session state
+ session_status = session_score_manager.get_session(request.session_id)
+ call_count = session_status.call_count if session_status else 1
+
+ logger.info(
+ "[session_async] queued job_id=%s session_id=%s run_id=%s call=%d",
+ status.job_id, request.session_id, run_id, call_count,
+ )
+ return SessionScoreJobResponse(
+ job_id=status.job_id,
+ session_id=request.session_id,
+ run_id=run_id,
+ status=status.status,
+ call_count=call_count,
+ )
+
+
+@router.get(
+ "/sessions",
+ response_model=dict,
+ summary="列出所有 Session 聚合状态",
+)
+def list_sessions() -> dict:
+ """返回所有 session 的聚合状态,按最近完成时间倒序排列。"""
+ sessions = session_score_manager.list_sessions()
+ logger.info("[session_score] list_sessions count=%d", len(sessions))
+ return {"sessions": [s.model_dump() for s in sessions]}
+
+
+@router.get(
+ "/sessions/{session_id}",
+ response_model=SessionStatus,
+ summary="查询 Session 聚合状态(指标均值 + 所有调用记录)",
+ responses={404: {"description": "指定 session_id 不存在。"}},
+)
+def get_session(session_id: str) -> SessionStatus:
+ """查询 session 的聚合评分状态。
+
+ 返回内容:
+ - `run_id`:在「运行列表」中查看完整报告
+ - `call_count`:本 session 累计调用次数
+ - `metric_means`:所有已累积样本的各指标均值(实时读取 scores.csv)
+ - `jobs`:本 session 所有调用记录列表
+ """
+ status = session_score_manager.get_session(session_id)
+ if status is None:
+ raise HTTPException(status_code=404, detail=f"Session not found: {session_id}")
+ return status
+
+
+@router.get(
+ "/session/jobs/{job_id}",
+ response_model=AsyncScoreJobStatus,
+ summary="查询 Session 单次调用状态",
+ responses={404: {"description": "指定 job_id 不存在。"}},
+)
+def get_session_job(job_id: str) -> AsyncScoreJobStatus:
+ """查询 session 评分中某次调用的状态和评分结果。
+
+ `status` 为 `completed` 时,`run_id` 即所属 session 的报告目录,
+ `scores` 包含本次调用的各指标得分。
+ """
+ status = session_score_manager.get_job(job_id)
+ if status is None:
+ raise HTTPException(
+ status_code=404, detail=f"Session score job not found: {job_id}"
+ )
+ return status
diff --git a/webapp/models.py b/webapp/models.py
index 604fc80..e124102 100644
--- a/webapp/models.py
+++ b/webapp/models.py
@@ -531,6 +531,50 @@ class AsyncScoreJobResponse(BaseModel):
)
+# ---------------------------------------------------------------------------
+# Session async 评分模型
+# ---------------------------------------------------------------------------
+
+class SessionScoreRequest(ScoreRequest):
+ """Request body for session-grouped async scoring.
+
+ All calls sharing the same session_id are accumulated into one report.
+ Each call adds a new sample row to the session's scores.csv.
+ """
+
+ session_id: str = Field(
+ description=(
+ "会话唯一标识符。相同 session_id 的多次调用合并为同一报告,"
+ "每次调用新增一个样本行,指标均值和优化建议在每次调用后增量更新。"
+ ),
+ )
+
+
+class SessionScoreJobResponse(BaseModel):
+ """Immediate 202 response after submitting a session scoring call."""
+
+ job_id: str = Field(description="本次调用的任务唯一标识符。")
+ session_id: str = Field(description="会话标识符。")
+ run_id: str = Field(description="本 session 对应的报告 Run ID,可在「运行列表」中查看。")
+ status: str = Field(default="queued", description="初始状态:queued。")
+ call_count: int = Field(default=1, description="本 session 当前累计调用次数(包含本次)。")
+
+
+class SessionStatus(BaseModel):
+ """Aggregate status and metrics for a scoring session."""
+
+ session_id: str = Field(description="会话标识符。")
+ run_id: str = Field(description="对应报告目录的 Run ID。")
+ call_count: int = Field(description="本 session 累计调用次数。")
+ metric_means: dict[str, float | None] = Field(
+ default_factory=dict, description="所有已累积样本的各指标均值。"
+ )
+ latest_finished_at: str = Field(default="", description="最近一次评分完成时间(ISO 8601 UTC)。")
+ jobs: list[AsyncScoreJobStatus] = Field(
+ default_factory=list, description="本 session 所有调用记录,按创建时间排序。"
+ )
+
+
class AsyncScoreJobStatus(BaseModel):
"""State of one async score job (queued → running → completed/failed)."""
diff --git a/webapp/server.py b/webapp/server.py
index 5a9ad75..211545b 100644
--- a/webapp/server.py
+++ b/webapp/server.py
@@ -17,7 +17,7 @@ from fastapi.exceptions import RequestValidationError
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
-from webapp.api import evaluations, llm_profiles, pipeline, runs, scenarios, score, score_jobs
+from webapp.api import evaluations, llm_profiles, pipeline, runs, scenarios, score, score_jobs, session_score_jobs
STATIC_DIR = Path(__file__).resolve().parent / "static"
logger = logging.getLogger("webapp.server")
@@ -73,6 +73,10 @@ OPENAPI_TAGS = [
"**异步评分 API(Dify 推荐)** — `POST /api/score/async`\n\n"
"异步方式立即返回 job_id(202),评分在后台执行,完成后自动生成完整报告(含优化建议),"
"在「运行列表」页查看。\n\n"
+ "**Session 批量评分 API** — `POST /api/score/session_async`\n\n"
+ "适合 Dify 循环节点批量评估:同一 `session_id` 的多次调用合并为一个报告,"
+ "每次调用新增一个样本行,指标均值和优化建议增量更新。\n"
+ "通过 `GET /api/score/sessions/{session_id}` 查看 session 聚合状态。\n\n"
"通过 `GET /api/score/jobs` 列出所有异步评分记录,"
"`GET /api/score/jobs/{job_id}` 查询单个任务状态。\n\n"
"**鉴权**:若 `.env` 中配置了 `SCORE_API_TOKEN`,需携带 "
@@ -111,6 +115,7 @@ def create_app() -> FastAPI:
app.include_router(pipeline.router)
app.include_router(score.router)
app.include_router(score_jobs.router)
+ app.include_router(session_score_jobs.router)
@app.middleware("http")
async def access_log_middleware(request: Request, call_next):
diff --git a/webapp/services/pipeline_task_manager.py b/webapp/services/pipeline_task_manager.py
new file mode 100644
index 0000000..ced3ea6
--- /dev/null
+++ b/webapp/services/pipeline_task_manager.py
@@ -0,0 +1,257 @@
+"""Background task manager for end-to-end pipeline jobs (build + eval).
+
+Each job runs three sequential phases inside a worker thread:
+ 1. parsing_documents — AliyunDocmind parses every PDF
+ 2. generating_questions — LLM generates a draft question bank
+ 3. evaluating — RAGAS online evaluation scores each question
+
+The DatasetBuildJob and Scenario objects are constructed entirely from the
+API request parameters, so no YAML config files are needed.
+"""
+
+from __future__ import annotations
+
+import io
+import threading
+import uuid
+from concurrent.futures import ThreadPoolExecutor
+from contextlib import redirect_stderr, redirect_stdout
+from datetime import datetime, timezone
+from pathlib import Path
+
+from webapp.models import (
+ PipelineJobRequest,
+ PipelineJobStatus,
+ PipelineResult,
+)
+
+_REPO_ROOT = Path(__file__).resolve().parents[2]
+_PIPELINE_OUTPUT_ROOT = _REPO_ROOT / "outputs" / "pipeline"
+
+
+def _now_iso() -> str:
+ return datetime.now(timezone.utc).isoformat()
+
+
+class _LineCapture(io.TextIOBase):
+ """Write-only stream that appends complete lines to a task's log buffer."""
+
+ def __init__(self, sink: "PipelineTask") -> None:
+ self._sink = sink
+ self._buffer = ""
+
+ def write(self, text: str) -> int:
+ self._buffer += text
+ while "\n" in self._buffer:
+ line, self._buffer = self._buffer.split("\n", 1)
+ self._sink.append_log(line)
+ return len(text)
+
+ def flush(self) -> None:
+ if self._buffer:
+ self._sink.append_log(self._buffer)
+ self._buffer = ""
+
+
+class PipelineTask:
+ """Mutable state for one pipeline job (build + eval)."""
+
+ def __init__(self, job_id: str, job_name: str) -> None:
+ self.job_id = job_id
+ self.job_name = job_name
+ self.status = "queued"
+ self.phase = "idle"
+ self.logs: list[str] = []
+ self.result: PipelineResult | None = None
+ self.error: str | None = None
+ self.created_at = _now_iso()
+ self.finished_at = ""
+ self._lock = threading.Lock()
+
+ def append_log(self, line: str) -> None:
+ with self._lock:
+ self.logs.append(line)
+
+ def snapshot(self) -> PipelineJobStatus:
+ with self._lock:
+ return PipelineJobStatus(
+ job_id=self.job_id,
+ job_name=self.job_name,
+ status=self.status,
+ phase=self.phase,
+ logs=list(self.logs),
+ result=self.result,
+ error=self.error,
+ created_at=self.created_at,
+ finished_at=self.finished_at,
+ )
+
+
+class PipelineTaskManager:
+ """Owns the thread pool and registry of pipeline jobs."""
+
+ def __init__(self, max_workers: int = 2) -> None:
+ self._executor = ThreadPoolExecutor(max_workers=max_workers)
+ self._tasks: dict[str, PipelineTask] = {}
+ self._lock = threading.Lock()
+
+ def submit(self, request: PipelineJobRequest) -> PipelineTask:
+ """Register and schedule a new pipeline job; return its task object."""
+ job_id = uuid.uuid4().hex[:12]
+ job_name = request.job_name.strip() or f"pipeline-{job_id[:6]}"
+ task = PipelineTask(job_id=job_id, job_name=job_name)
+ with self._lock:
+ self._tasks[job_id] = task
+ self._executor.submit(self._run, task, request)
+ return task
+
+ def get(self, job_id: str) -> PipelineJobStatus | None:
+ with self._lock:
+ task = self._tasks.get(job_id)
+ return task.snapshot() if task is not None else None
+
+ def list_jobs(self) -> list[PipelineJobStatus]:
+ with self._lock:
+ tasks = list(self._tasks.values())
+ snapshots = [t.snapshot() for t in tasks]
+ snapshots.sort(key=lambda s: s.created_at, reverse=True)
+ return snapshots
+
+ # ------------------------------------------------------------------ #
+ # Worker
+ # ------------------------------------------------------------------ #
+
+ def _run(self, task: PipelineTask, request: PipelineJobRequest) -> None:
+ """Execute the full pipeline end to end inside a worker thread."""
+ task.status = "running"
+ task.append_log(f"[{_now_iso()}] 开始 pipeline 任务: {task.job_name}")
+
+ capture = _LineCapture(task)
+ try:
+ with redirect_stdout(capture), redirect_stderr(capture):
+ result = self._execute(task, request)
+ capture.flush()
+ task.result = result
+ task.phase = "done"
+ task.status = "completed"
+ task.append_log(f"[{_now_iso()}] pipeline 任务完成: {task.job_name}")
+ except Exception as exc: # noqa: BLE001
+ capture.flush()
+ task.error = f"{type(exc).__name__}: {exc}"
+ task.append_log(f"[{_now_iso()}] pipeline 任务失败: {task.error}")
+ task.status = "failed"
+ finally:
+ task.finished_at = _now_iso()
+
+ def _execute(self, task: PipelineTask, req: PipelineJobRequest) -> PipelineResult:
+ """Run build then eval, updating task.phase as we go."""
+
+ # ── resolve paths ──────────────────────────────────────────────
+ docs_path = Path(req.docs_path)
+ if not docs_path.is_absolute():
+ docs_path = (_REPO_ROOT / docs_path).resolve()
+ if not docs_path.is_dir():
+ raise ValueError(f"docs_path is not an existing directory: {docs_path}")
+
+ job_output_dir = _PIPELINE_OUTPUT_ROOT / task.job_id
+ build_artifact_dir = job_output_dir / "build"
+ dataset_csv = job_output_dir / "generated_dataset.csv"
+ eval_output_dir = job_output_dir / "eval"
+
+ # ── phase 1 + 2: dataset build (parse & generate) ─────────────
+ task.phase = "parsing_documents"
+ task.append_log(f" [build] 扫描文档目录: {docs_path}")
+ build_result = self._run_build(task, req, docs_path, build_artifact_dir, dataset_csv)
+
+ source_chunks_jsonl = build_artifact_dir / "latest" / "source_chunks.jsonl"
+ total_q = len(build_result.draft_samples)
+ parse_failures = len(build_result.parse_failures)
+ task.append_log(f" [build] 题库生成完毕: {total_q} 道题目, {parse_failures} 份文档解析失败")
+
+ if total_q == 0:
+ raise RuntimeError("题库为空(所有文档均解析或生成失败),中止评估。")
+
+ # ── phase 3: evaluation ────────────────────────────────────────
+ task.phase = "evaluating"
+ task.append_log(f" [eval] 开始 RAGAS 评估,共 {total_q} 道题目")
+ eval_result = self._run_eval(task, req, dataset_csv, source_chunks_jsonl, eval_output_dir)
+
+ from rag_eval.reporting.artifacts import build_artifact_paths as _build_eval_paths
+ eval_artifact_paths = _build_eval_paths(eval_result.scenario.output_dir, eval_result.run_id)
+
+ return PipelineResult(
+ build_artifact_dir=build_artifact_dir.as_posix(),
+ dataset_csv=dataset_csv.as_posix(),
+ source_chunks_jsonl=source_chunks_jsonl.as_posix(),
+ total_questions=total_q,
+ parse_failures=parse_failures,
+ eval_run_id=eval_result.run_id,
+ eval_output_dir=eval_result.scenario.output_dir.as_posix(),
+ scores_csv=eval_artifact_paths.scores_csv.as_posix(),
+ summary_md=eval_artifact_paths.summary_md.as_posix(),
+ )
+
+ def _run_build(self, task: PipelineTask, req: PipelineJobRequest,
+ docs_path: Path, artifact_dir: Path, dataset_csv: Path):
+ """Construct DatasetBuildJob and run the build phase."""
+ from rag_eval.dataset_builder.models import DatasetBuildJob, DatasetBuildRuntime
+ from rag_eval.dataset_builder.runner import execute_dataset_build_job
+ from rag_eval.settings import EvaluationSettings
+
+ settings = EvaluationSettings()
+ job = DatasetBuildJob(
+ job_name=task.job_name,
+ input_path=docs_path,
+ input_glob="*.pdf",
+ parser_provider="aliyun_docmind",
+ failure_mode=req.failure_mode, # type: ignore[arg-type]
+ generation_model=req.generation_model,
+ output_type="online_question_bank",
+ review_mode="draft_with_manual_review",
+ max_questions_per_document=req.max_questions_per_document,
+ max_source_chunks_per_question=req.max_source_chunks_per_question,
+ dataset_path=dataset_csv,
+ artifact_dir=artifact_dir,
+ runtime=DatasetBuildRuntime(max_documents=req.max_documents),
+ )
+ return execute_dataset_build_job(job, settings=settings)
+
+ def _run_eval(self, task: PipelineTask, req: PipelineJobRequest,
+ dataset_csv: Path, source_chunks_jsonl: Path, eval_output_dir: Path):
+ """Construct Scenario and run the evaluation phase."""
+ from rag_eval.execution.runner import run_scenario_from_scenario_obj
+ from rag_eval.settings import EvaluationSettings
+ from rag_eval.shared.models import (
+ AppAdapterConfig, DatasetConfig, RuntimeConfig, Scenario,
+ )
+
+ settings = EvaluationSettings()
+ scenario = Scenario(
+ scenario_name=task.job_name,
+ mode="online",
+ dataset=DatasetConfig(path=dataset_csv),
+ judge_model=req.judge_model,
+ embedding_model=req.embedding_model,
+ metrics=list(req.metrics),
+ output_dir=eval_output_dir,
+ runtime=RuntimeConfig(
+ batch_size=4,
+ app_concurrency=2,
+ metric_concurrency=2,
+ max_samples=req.max_samples,
+ ),
+ app_adapter=AppAdapterConfig(
+ type="python",
+ callable="apps.siemens_pdf_qa.adapter:run",
+ static_kwargs={
+ "source_chunks_path": source_chunks_jsonl,
+ "model": req.answer_model,
+ },
+ ),
+ optimization_advisor=req.optimization_advisor,
+ )
+ return run_scenario_from_scenario_obj(scenario, settings=settings)
+
+
+# Module-level singleton shared by the FastAPI routes.
+pipeline_task_manager = PipelineTaskManager()
diff --git a/webapp/services/report_builder.py b/webapp/services/report_builder.py
index 1082401..1f56f65 100644
--- a/webapp/services/report_builder.py
+++ b/webapp/services/report_builder.py
@@ -177,9 +177,11 @@ def build_report(run_dir: Path, metrics: list[str]) -> ReportData:
w_means = _weighted_metric_means(score_rows_list, metrics, doc_weights)
rounded_means = {metric: _round_or_none(value) for metric, value in w_means.items()}
- overall_ws = compute_overall_weighted_score_mean(
- score_rows_list, metric_weights, doc_weights
- )
+ # 综合加权得分计算(已暂时禁用)
+ # overall_ws = compute_overall_weighted_score_mean(
+ # score_rows_list, metric_weights, doc_weights
+ # )
+ overall_ws = None
distributions = {
metric: _distribution(frame, metric)
diff --git a/webapp/services/score_job_manager.py b/webapp/services/score_job_manager.py
index 8805d5b..a3964a1 100644
--- a/webapp/services/score_job_manager.py
+++ b/webapp/services/score_job_manager.py
@@ -149,10 +149,12 @@ class ScoreJobManager:
# Build full scores dict (skipped = None)
all_scores: dict[str, float | None] = {m: None for m in request.metrics}
all_scores.update(raw_scores)
- weighted_raw = compute_weighted_score(
- {k: v for k, v in raw_scores.items() if v is not None}, {}
- )
- weighted = round(weighted_raw, 4) if weighted_raw is not None else None
+ # 综合加权得分计算(已暂时禁用)
+ # weighted_raw = compute_weighted_score(
+ # {k: v for k, v in raw_scores.items() if v is not None}, {}
+ # )
+ # weighted = round(weighted_raw, 4) if weighted_raw is not None else None
+ weighted = None
# Build a score row compatible with report_builder
score_row: dict[str, Any] = {
diff --git a/webapp/services/session_score_manager.py b/webapp/services/session_score_manager.py
new file mode 100644
index 0000000..d145975
--- /dev/null
+++ b/webapp/services/session_score_manager.py
@@ -0,0 +1,452 @@
+"""Background task manager for session-grouped async RAGAS scoring.
+
+Each session groups multiple scoring calls into one shared run report:
+
+ 1. First call: creates outputs/score-session/session-/ and metadata.json.
+ 2. Every call: appends a new sample row to scores.csv, rewrites summary.md
+ and optimization_advice.md by re-running write_run_artifacts + run_advisor
+ over ALL accumulated rows.
+ 3. The resulting run directory is picked up automatically by run_reader, so the
+ 「运行列表」 and 「报告详情」 pages show the live, growing report.
+
+Concurrency model:
+ - Scoring (LLM network I/O) runs freely in the thread pool — different sessions
+ score concurrently; multiple calls to the same session also start scoring in
+ parallel.
+ - File I/O (CSV append, artifact rewrite, advisor) is serialized per session via
+ a per-session threading.Lock, so no two calls corrupt the same session's CSV.
+"""
+
+from __future__ import annotations
+
+import json
+import re
+import threading
+import time
+import uuid
+from concurrent.futures import ThreadPoolExecutor
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any
+
+import pandas as pd
+
+from webapp.models import AsyncScoreJobStatus, ScoreRequest, SessionStatus
+
+_REPO_ROOT = Path(__file__).resolve().parents[2]
+_DEFAULT_OUTPUT_DIR = _REPO_ROOT / "outputs" / "score-session"
+_DEFAULT_INDEX_DIR = _REPO_ROOT / "outputs" / "score-session-jobs"
+
+# Columns that are sample metadata rather than metric scores (mirrors run_reader.NON_METRIC_COLUMNS)
+_NON_METRIC_COLUMNS = {
+ "sample_id", "question", "contexts", "answer", "ground_truth",
+ "scenario", "language", "retrieval_config", "error",
+ "judge_model", "embedding_model", "run_id", "difficulty",
+ "question_type", "doc_id", "doc_name", "section_path",
+ "page_start", "page_end", "source_chunk_ids", "review_status",
+ "review_notes", "weighted_score", "sample_weight",
+}
+
+
+def _now_iso() -> str:
+ return datetime.now(timezone.utc).isoformat()
+
+
+def _sanitize_session_id(session_id: str) -> str:
+ """Convert an arbitrary session_id string to a safe directory-name fragment."""
+ return re.sub(r"[^a-zA-Z0-9]", "-", session_id)[:64].strip("-") or "default"
+
+
+class SessionScoreJobManager:
+ """Thread-pool manager for session-grouped async scoring jobs.
+
+ All calls sharing a session_id append to one shared run directory, so the
+ report detail page shows all samples and their aggregate metrics together.
+ """
+
+ def __init__(
+ self,
+ output_dir: Path = _DEFAULT_OUTPUT_DIR,
+ index_dir: Path = _DEFAULT_INDEX_DIR,
+ max_workers: int = 4,
+ ) -> None:
+ self._output_dir = Path(output_dir)
+ self._index_dir = Path(index_dir)
+ self._output_dir.mkdir(parents=True, exist_ok=True)
+ self._index_dir.mkdir(parents=True, exist_ok=True)
+ (self._index_dir / "_sessions").mkdir(parents=True, exist_ok=True)
+ self._executor = ThreadPoolExecutor(max_workers=max_workers)
+
+ # job_id -> AsyncScoreJobStatus; guarded by _lock
+ self._job_cache: dict[str, AsyncScoreJobStatus] = {}
+ # session_id -> [job_ids in order]; guarded by _lock
+ self._session_jobs: dict[str, list[str]] = {}
+ # session_id -> per-session threading.Lock; guarded by _lock
+ self._session_locks: dict[str, threading.Lock] = {}
+ self._lock = threading.Lock()
+
+ self._load_existing()
+
+ # ------------------------------------------------------------------ #
+ # Public API
+ # ------------------------------------------------------------------ #
+
+ def session_run_id(self, session_id: str) -> str:
+ """Return the deterministic run_id for a session (also the dir name)."""
+ return f"session-{_sanitize_session_id(session_id)}"
+
+ def submit(self, session_id: str, request: ScoreRequest) -> tuple[AsyncScoreJobStatus, str]:
+ """Queue one scoring call for a session.
+
+ Returns (job_status, run_id). run_id is deterministic from session_id.
+ """
+ run_id = self.session_run_id(session_id)
+ job_id = uuid.uuid4().hex[:12]
+
+ status = AsyncScoreJobStatus(
+ job_id=job_id,
+ status="queued",
+ created_at=_now_iso(),
+ request_summary={
+ "question": (request.question or "")[:80],
+ "answer": (request.answer or "")[:80],
+ "metrics": list(request.metrics),
+ "judge_model": request.judge_model or "",
+ "embedding_model": request.embedding_model or "",
+ "has_contexts": bool(request.contexts),
+ "has_ground_truth": bool(request.ground_truth),
+ "session_id": session_id,
+ },
+ )
+
+ with self._lock:
+ self._job_cache[job_id] = status
+ if session_id not in self._session_jobs:
+ self._session_jobs[session_id] = []
+ self._session_jobs[session_id].append(job_id)
+
+ self._persist_job_index(status)
+ self._persist_session_index(session_id)
+ self._executor.submit(self._run, job_id, session_id, run_id, request)
+ return status, run_id
+
+ def get_job(self, job_id: str) -> AsyncScoreJobStatus | None:
+ """Return current status of one call, or None if unknown."""
+ with self._lock:
+ return self._job_cache.get(job_id)
+
+ def list_jobs(self) -> list[AsyncScoreJobStatus]:
+ """Return all session job records, newest first."""
+ with self._lock:
+ jobs = list(self._job_cache.values())
+ jobs.sort(key=lambda j: j.created_at, reverse=True)
+ return jobs
+
+ def get_session(self, session_id: str) -> SessionStatus | None:
+ """Return aggregate status for a session, or None if unknown."""
+ with self._lock:
+ job_ids = list(self._session_jobs.get(session_id) or [])
+ if not job_ids:
+ return None
+
+ run_id = self.session_run_id(session_id)
+ run_dir = self._output_dir / run_id
+
+ # Compute live metric means from the CSV (may be mid-update — best effort)
+ metric_means = self._read_metric_means(run_dir)
+
+ with self._lock:
+ jobs = [self._job_cache[jid] for jid in job_ids if jid in self._job_cache]
+
+ latest = max((j.finished_at for j in jobs if j.finished_at), default="")
+ return SessionStatus(
+ session_id=session_id,
+ run_id=run_id,
+ call_count=len(job_ids),
+ metric_means=metric_means,
+ latest_finished_at=latest,
+ jobs=sorted(jobs, key=lambda j: j.created_at),
+ )
+
+ def list_sessions(self) -> list[SessionStatus]:
+ """Return aggregate status for all known sessions."""
+ with self._lock:
+ session_ids = list(self._session_jobs.keys())
+ results = []
+ for sid in session_ids:
+ status = self.get_session(sid)
+ if status is not None:
+ results.append(status)
+ results.sort(key=lambda s: s.latest_finished_at, reverse=True)
+ return results
+
+ # ------------------------------------------------------------------ #
+ # Worker
+ # ------------------------------------------------------------------ #
+
+ def _run(self, job_id: str, session_id: str, run_id: str, request: ScoreRequest) -> None:
+ """Score one sample then append it to the session's shared run artifacts."""
+ import logging
+ logger = logging.getLogger("webapp.services.session_score_manager")
+ self._update_job(job_id, status="running")
+
+ # Lazy imports — keep web server bootable if ragas is not installed.
+ from rag_eval.advisor import run_advisor
+ from rag_eval.metrics.factory import build_models
+ from rag_eval.metrics.weights import compute_weighted_score
+ from rag_eval.reporting.writers import write_run_artifacts
+ from rag_eval.settings import EvaluationSettings
+ from rag_eval.shared.models import (
+ DatasetConfig, EvaluationResult, NormalizedSample,
+ RuntimeConfig, Scenario,
+ )
+ from rag_eval.shared.utils import utc_now_iso
+ from webapp.services.inline_scorer import inline_scorer
+
+ settings = EvaluationSettings()
+ judge_model = request.judge_model or settings.ragas_judge_model
+ embedding_model = request.embedding_model or settings.ragas_embedding_model
+ effective = request.effective_metrics()
+ requested = set(request.metrics)
+ skipped = sorted(requested - set(effective))
+
+ t0 = time.monotonic()
+
+ try:
+ # --- Scoring (can run concurrently for the same session) ----------
+ if effective:
+ raw_scores = inline_scorer.score(
+ question=request.question,
+ answer=request.answer,
+ contexts=request.contexts_as_list(),
+ ground_truth=request.ground_truth,
+ metrics=effective,
+ judge_model=judge_model,
+ embedding_model=embedding_model,
+ settings=settings,
+ )
+ else:
+ raw_scores = {}
+
+ latency_ms = int((time.monotonic() - t0) * 1000)
+ finished_at = utc_now_iso()
+
+ # Build complete scores for this sample (skipped metrics → None)
+ all_scores: dict[str, float | None] = {m: None for m in request.metrics}
+ all_scores.update(raw_scores)
+
+ # 综合加权得分计算(已暂时禁用)
+ # weighted_raw = compute_weighted_score(
+ # {k: v for k, v in raw_scores.items() if v is not None}, {}
+ # )
+ # weighted = round(weighted_raw, 4) if weighted_raw is not None else None
+ weighted = None
+
+ # --- File I/O must be serialized per session ----------------------
+ session_lock = self._get_session_lock(session_id)
+ with session_lock:
+ run_dir = self._output_dir / run_id
+ run_dir.mkdir(parents=True, exist_ok=True)
+
+ # Read all existing rows, then append the new one
+ existing_rows = self._read_score_rows(run_dir)
+ call_number = len(existing_rows) + 1
+
+ new_row: dict[str, Any] = {
+ "sample_id": f"session-score-{call_number}",
+ "question": request.question,
+ "answer": request.answer or "",
+ "contexts": request.contexts or "",
+ "ground_truth": request.ground_truth or "",
+ "error": "",
+ }
+ new_row.update(all_scores)
+
+ all_rows = existing_rows + [new_row]
+
+ # Reconstruct NormalizedSample objects for write_run_artifacts metadata
+ valid_samples = [
+ NormalizedSample(
+ sample_id=str(row.get("sample_id", f"session-score-{i + 1}")),
+ question=str(row.get("question", "")),
+ answer=str(row.get("answer", "")),
+ contexts=[
+ part.strip()
+ for part in str(row.get("contexts", "")).split(" |||| ")
+ if part.strip()
+ ],
+ ground_truth=str(row.get("ground_truth", "")),
+ )
+ for i, row in enumerate(all_rows)
+ ]
+
+ # Determine all metric columns (union of all rows' metric keys)
+ all_metric_names = sorted({
+ k for row in all_rows
+ for k in row if k not in _NON_METRIC_COLUMNS
+ })
+
+ scenario = Scenario(
+ scenario_name=f"session-{_sanitize_session_id(session_id)}",
+ mode="offline",
+ dataset=DatasetConfig(path=run_dir / "dataset.csv"),
+ judge_model=judge_model,
+ embedding_model=embedding_model,
+ metrics=all_metric_names,
+ output_dir=self._output_dir,
+ optimization_advisor=True,
+ )
+
+ started_at_val = (
+ existing_rows[0].get("_started_at", finished_at)
+ if existing_rows else finished_at
+ )
+
+ result = EvaluationResult(
+ scenario=scenario,
+ run_id=run_id,
+ started_at=started_at_val if isinstance(started_at_val, str) else finished_at,
+ finished_at=finished_at,
+ valid_samples=valid_samples,
+ invalid_samples=[],
+ score_rows=all_rows,
+ )
+
+ write_run_artifacts(result)
+ logger.info(
+ "[session_job] artifacts written job_id=%s session_id=%s call=%d",
+ job_id, session_id, call_number,
+ )
+
+ # Regenerate optimization advice over all accumulated rows
+ try:
+ llm, _ = build_models(judge_model, embedding_model, settings)
+ run_advisor(result, scenario, llm)
+ logger.info("[session_job] advisor done job_id=%s session=%s", job_id, session_id)
+ except Exception as adv_exc: # noqa: BLE001
+ logger.warning(
+ "[session_job] advisor failed job_id=%s err=%s", job_id, adv_exc
+ )
+
+ self._update_job(
+ job_id,
+ status="completed",
+ finished_at=finished_at,
+ run_id=run_id,
+ scores=all_scores,
+ weighted_score=weighted,
+ latency_ms=latency_ms,
+ skipped_metrics=skipped,
+ )
+ self._persist_session_index(session_id)
+
+ except Exception as exc: # noqa: BLE001
+ latency_ms = int((time.monotonic() - t0) * 1000)
+ import logging as _logging
+ _logging.getLogger("webapp.services.session_score_manager").error(
+ "[session_job] failed job_id=%s err=%s", job_id, exc
+ )
+ self._update_job(
+ job_id,
+ status="failed",
+ finished_at=_now_iso(),
+ latency_ms=latency_ms,
+ error=f"{type(exc).__name__}: {exc}",
+ )
+
+ # ------------------------------------------------------------------ #
+ # Helpers
+ # ------------------------------------------------------------------ #
+
+ def _get_session_lock(self, session_id: str) -> threading.Lock:
+ with self._lock:
+ if session_id not in self._session_locks:
+ self._session_locks[session_id] = threading.Lock()
+ return self._session_locks[session_id]
+
+ def _read_score_rows(self, run_dir: Path) -> list[dict[str, Any]]:
+ """Read existing scores.csv rows, returning empty list if file doesn't exist."""
+ scores_path = run_dir / "scores.csv"
+ if not scores_path.is_file():
+ return []
+ try:
+ frame = pd.read_csv(scores_path)
+ return frame.where(pd.notnull(frame), None).to_dict("records")
+ except (OSError, ValueError):
+ return []
+
+ def _read_metric_means(self, run_dir: Path) -> dict[str, float | None]:
+ """Compute per-metric means from the session's scores.csv."""
+ scores_path = run_dir / "scores.csv"
+ if not scores_path.is_file():
+ return {}
+ try:
+ frame = pd.read_csv(scores_path)
+ except (OSError, ValueError):
+ return {}
+ means: dict[str, float | None] = {}
+ for col in frame.columns:
+ if col in _NON_METRIC_COLUMNS:
+ continue
+ if pd.api.types.is_numeric_dtype(frame[col]):
+ val = frame[col].mean(numeric_only=True)
+ means[col] = None if pd.isna(val) else round(float(val), 4)
+ return means
+
+ def _update_job(self, job_id: str, **kwargs: Any) -> None:
+ with self._lock:
+ existing = self._job_cache.get(job_id)
+ if existing is None:
+ return
+ updated = existing.model_copy(update=kwargs)
+ self._job_cache[job_id] = updated
+ self._persist_job_index(updated)
+
+ def _persist_job_index(self, status: AsyncScoreJobStatus) -> None:
+ """Persist a single job's status to the index directory."""
+ path = self._index_dir / f"{status.job_id}.json"
+ path.write_text(
+ json.dumps(status.model_dump(), ensure_ascii=False, indent=2),
+ encoding="utf-8",
+ )
+
+ def _persist_session_index(self, session_id: str) -> None:
+ """Persist the session→job_ids mapping."""
+ with self._lock:
+ job_ids = list(self._session_jobs.get(session_id) or [])
+ run_id = self.session_run_id(session_id)
+ data = {"session_id": session_id, "run_id": run_id, "job_ids": job_ids}
+ path = self._index_dir / "_sessions" / f"{_sanitize_session_id(session_id)}.json"
+ path.write_text(
+ json.dumps(data, ensure_ascii=False, indent=2),
+ encoding="utf-8",
+ )
+
+ def _load_existing(self) -> None:
+ """Restore job cache and session mappings from persisted index files on startup."""
+ # Load individual job files
+ for path in sorted(self._index_dir.glob("*.json")):
+ try:
+ data = json.loads(path.read_text(encoding="utf-8"))
+ status = AsyncScoreJobStatus.model_validate(data)
+ self._job_cache[status.job_id] = status
+ except Exception: # noqa: BLE001
+ pass
+
+ # Load session→job_ids mappings
+ sessions_dir = self._index_dir / "_sessions"
+ if not sessions_dir.is_dir():
+ return
+ for path in sorted(sessions_dir.glob("*.json")):
+ try:
+ data = json.loads(path.read_text(encoding="utf-8"))
+ sid = data.get("session_id", "")
+ job_ids = data.get("job_ids", [])
+ if sid:
+ self._session_jobs[sid] = job_ids
+ except Exception: # noqa: BLE001
+ pass
+
+
+# Module-level singleton shared by FastAPI routes.
+session_score_manager = SessionScoreJobManager()
diff --git a/webapp/static/js/report.js b/webapp/static/js/report.js
index abb7320..a76a7ab 100644
--- a/webapp/static/js/report.js
+++ b/webapp/static/js/report.js
@@ -128,17 +128,17 @@ const Report = {
wrap.appendChild(card);
});
- // 综合加权得分卡片
- const wsValue = (report && report.weighted_score_mean !== undefined) ? report.weighted_score_mean : null;
- const wsCard = document.createElement("div");
- wsCard.className = "metric-card weighted-score-card";
- const wsCls = App.scoreClass(wsValue);
- const wsText = wsValue === null || wsValue === undefined ? "n/a" : wsValue.toFixed(2);
- wsCard.innerHTML = `
- ${wsText}
- 综合加权得分
- `;
- wrap.appendChild(wsCard);
+ // 综合加权得分卡片(已暂时隐藏)
+ // const wsValue = (report && report.weighted_score_mean !== undefined) ? report.weighted_score_mean : null;
+ // const wsCard = document.createElement("div");
+ // wsCard.className = "metric-card weighted-score-card";
+ // const wsCls = App.scoreClass(wsValue);
+ // const wsText = wsValue === null || wsValue === undefined ? "n/a" : wsValue.toFixed(2);
+ // wsCard.innerHTML = `
+ // ${wsText}
+ // 综合加权得分
+ // `;
+ // wrap.appendChild(wsCard);
},
// ② 分数分布直方图(可切换指标)。
diff --git a/webapp/static/js/score_jobs.js b/webapp/static/js/score_jobs.js
index cfe93e9..18f5d9c 100644
--- a/webapp/static/js/score_jobs.js
+++ b/webapp/static/js/score_jobs.js
@@ -55,10 +55,11 @@ const ScoreJobs = {
return `${App.escape(App.shortMetric(k))} ${text}`;
})
.join(" ");
- if (job.weighted_score !== null && job.weighted_score !== undefined) {
- const cls = App.scoreClass(job.weighted_score);
- scoreHtml += ` 综合 ${Number(job.weighted_score).toFixed(3)}`;
- }
+ // 综合加权得分(已暂时隐藏)
+ // if (job.weighted_score !== null && job.weighted_score !== undefined) {
+ // const cls = App.scoreClass(job.weighted_score);
+ // scoreHtml += ` 综合 ${Number(job.weighted_score).toFixed(3)}`;
+ // }
} else if (job.status === "failed") {
scoreHtml = `${App.escape((job.error || "").slice(0, 80))}`;
} else {
diff --git a/webserver.log b/webserver.log
new file mode 100644
index 0000000..399e996
--- /dev/null
+++ b/webserver.log
@@ -0,0 +1,17 @@
+INFO: Started server process [82284]
+INFO: Waiting for application startup.
+INFO: Application startup complete.
+INFO: Uvicorn running on http://127.0.0.1:8811 (Press CTRL+C to quit)
+INFO: 127.0.0.1:56164 - "GET /api/health HTTP/1.1" 200 OK
+INFO: 127.0.0.1:53350 - "GET / HTTP/1.1" 200 OK
+INFO: 127.0.0.1:53351 - "GET /api/runs HTTP/1.1" 200 OK
+INFO: 127.0.0.1:53352 - "GET /api/scenarios HTTP/1.1" 200 OK
+INFO: 127.0.0.1:64689 - "GET /api/runs/2026-06-15T08-30-00%2B00-00 HTTP/1.1" 200 OK
+INFO: 127.0.0.1:64700 - "POST /api/evaluations HTTP/1.1" 200 OK
+INFO: 127.0.0.1:64703 - "GET /api/evaluations/a3243f2443d7 HTTP/1.1" 200 OK
+INFO: 127.0.0.1:58440 - "GET /api/evaluations/a3243f2443d7 HTTP/1.1" 200 OK
+INFO: 127.0.0.1:64454 - "GET /static/css/app.css HTTP/1.1" 200 OK
+INFO: 127.0.0.1:64455 - "GET /static/js/api.js HTTP/1.1" 200 OK
+INFO: 127.0.0.1:56825 - "GET /static/js/app.js HTTP/1.1" 200 OK
+INFO: 127.0.0.1:56829 - "GET /static/js/report.js HTTP/1.1" 200 OK
+INFO: 127.0.0.1:56830 - "GET /static/js/runner.js HTTP/1.1" 200 OK