From 754a30ad593c9fcb9c797bc5ae4db19eec8b652a Mon Sep 17 00:00:00 2001 From: wangwei Date: Fri, 26 Jun 2026 16:09:33 +0800 Subject: [PATCH] feat(session-async): add /api/score/session_async with incremental session report aggregation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - New POST /api/score/session_async endpoint: same session_id calls append to one shared report - New GET /api/score/sessions/{session_id}: returns call_count, metric_means, all job records - New GET /api/score/session/jobs/{job_id}: individual call status - SessionScoreJobManager: deterministic run_id from session_id, per-session mutex for CSV append, advisor regenerated on every call - SessionScoreRequest (extends ScoreRequest + session_id), SessionScoreJobResponse, SessionStatus models added - 24 new tests, all passing chore(weighted-score): comment out 综合加权得分 display and computation - report.js: hide 综合加权得分 card in report detail page - score_jobs.js: hide 综合 chip in async job list - report_builder.py: overall_ws=None (computation disabled) - summary.py: weighted_score summary line disabled - evaluator.py: weighted_score/sample_weight columns no longer written to scores.csv - score.py /api/score: weighted_score always returns null - score_job_manager.py + session_score_manager.py: weighted=None - Updated 3 tests to match new behaviour (6 pre-existing failures unchanged) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .idea/.gitignore | 8 + .idea/misc.xml | 7 + .idea/modules.xml | 8 + .idea/siemens_ragas.iml | 9 + .idea/vcs.xml | 6 + .../content/analysis-approach.html | 60 +++ .../1625-1781595805/content/approaches.html | 77 +++ .../content/design-architecture.html | 53 ++ .../1625-1781595805/content/trigger-mode.html | 68 +++ .../1625-1781595805/content/waiting-2.html | 3 + .../1625-1781595805/content/waiting.html | 3 + .../1625-1781595805/state/server-stopped | 1 + .../1625-1781595805/state/server.pid | 1 + 
logs/online_eval.log | 1 + logs/server_2026-06-23.log | 24 + logs/siemens_build.log | 35 ++ rag_eval/execution/evaluator.py | 16 +- rag_eval/reporting/summary.py | 19 +- rag_eval/shared/profile_store.py | 53 ++ ...tml => siemens-ragas-project-overview.html | 0 tests/test_offline_eval.py | 10 +- tests/test_pipeline.py | 280 +++++++++++ tests/test_webapp_report_builder.py | 3 +- tests/webapp/test_score_api.py | 3 +- tests/webapp/test_session_score_jobs_api.py | 299 ++++++++++++ webapp/api/score.py | 11 +- webapp/api/session_score_jobs.py | 171 +++++++ webapp/models.py | 44 ++ webapp/server.py | 7 +- webapp/services/pipeline_task_manager.py | 257 ++++++++++ webapp/services/report_builder.py | 8 +- webapp/services/score_job_manager.py | 10 +- webapp/services/session_score_manager.py | 452 ++++++++++++++++++ webapp/static/js/report.js | 22 +- webapp/static/js/score_jobs.js | 9 +- webserver.log | 17 + 36 files changed, 2004 insertions(+), 51 deletions(-) create mode 100644 .idea/.gitignore create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/siemens_ragas.iml create mode 100644 .idea/vcs.xml create mode 100644 .superpowers/brainstorm/1625-1781595805/content/analysis-approach.html create mode 100644 .superpowers/brainstorm/1625-1781595805/content/approaches.html create mode 100644 .superpowers/brainstorm/1625-1781595805/content/design-architecture.html create mode 100644 .superpowers/brainstorm/1625-1781595805/content/trigger-mode.html create mode 100644 .superpowers/brainstorm/1625-1781595805/content/waiting-2.html create mode 100644 .superpowers/brainstorm/1625-1781595805/content/waiting.html create mode 100644 .superpowers/brainstorm/1625-1781595805/state/server-stopped create mode 100644 .superpowers/brainstorm/1625-1781595805/state/server.pid create mode 100644 logs/online_eval.log create mode 100644 logs/server_2026-06-23.log create mode 100644 logs/siemens_build.log create mode 100644 rag_eval/shared/profile_store.py 
rename project-overview.html => siemens-ragas-project-overview.html (100%) create mode 100644 tests/test_pipeline.py create mode 100644 tests/webapp/test_session_score_jobs_api.py create mode 100644 webapp/api/session_score_jobs.py create mode 100644 webapp/services/pipeline_task_manager.py create mode 100644 webapp/services/session_score_manager.py create mode 100644 webserver.log diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..1e305c1 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..76b7fe3 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/siemens_ragas.iml b/.idea/siemens_ragas.iml new file mode 100644 index 0000000..d6ebd48 --- /dev/null +++ b/.idea/siemens_ragas.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.superpowers/brainstorm/1625-1781595805/content/analysis-approach.html b/.superpowers/brainstorm/1625-1781595805/content/analysis-approach.html new file mode 100644 index 0000000..85419b2 --- /dev/null +++ b/.superpowers/brainstorm/1625-1781595805/content/analysis-approach.html @@ -0,0 +1,60 @@ +

优化建议怎么生成?

+

这决定了模块的核心机制与可维护性

+ +
+
+
A
+
+

纯规则引擎

+

每个指标设阈值(如 faithfulness < 0.6),触发时给出预设建议文本。

+
+

优点

    +
  • 零 LLM 调用,零额外成本
  • +
  • 结果可预测、可审计
  • +
  • 响应极快
  • +
+

缺点

    +
  • 建议固定,无法结合具体样本
  • +
  • 不能解释"为什么这批数据这个指标低"
  • +
+
+
+
+ +
+
B
+
+

LLM 分析(全自动)

+

把评测结果(各指标均值 + 低分样本)一起交给 LLM,生成上下文感知的中文分析报告。

+
+

优点

    +
  • 能结合具体低分样本给出针对性建议
  • +
  • 可用中文解释西门子场景下的问题
  • +
  • 建议质量高、内容丰富
  • +
+

缺点

    +
  • 每次评测多 1 次 LLM 调用
  • +
  • 依赖 judge_model 的质量
  • +
+
+
+
+ +
+
C
+
+

规则定位 + LLM 解读(推荐)

+

规则引擎先识别哪些指标异常、触发哪条优化方向;再把"规则诊断 + 低分样本"一起给 LLM 做二次解读,生成中文建议。

+
+

优点

    +
  • 规则保证诊断稳定,不依赖 LLM 自由发挥
  • +
  • LLM 在有结构的输入下输出更准确
  • +
  • 两层可独立测试
  • +
+

缺点

    +
  • 实现略复杂(两个子模块)
  • +
+
+
+
+
diff --git a/.superpowers/brainstorm/1625-1781595805/content/approaches.html b/.superpowers/brainstorm/1625-1781595805/content/approaches.html new file mode 100644 index 0000000..638bed6 --- /dev/null +++ b/.superpowers/brainstorm/1625-1781595805/content/approaches.html @@ -0,0 +1,77 @@ +

优化顾问模块 — 实现方案对比

+

三个方案的核心区别在于 LLM 调用边界和代码入侵程度

+ +
+
+
A
+
+

独立后处理器(轻量集成)

+

新增 rag_eval/advisor/ 包,run_scenario() 末尾调用一行 maybe_run_advisor(result, scenario)

+

文件结构:

+
    +
  • rag_eval/advisor/__init__.py
  • +
  • rag_eval/advisor/rules.py — 规则引擎,输入 score_rows,输出诊断列表
  • +
  • rag_eval/advisor/llm_analyzer.py — 把规则诊断 + 低分样本交给 judge_model
  • +
  • rag_eval/advisor/writer.py — 写 optimization_advice.md,打日志摘要
  • +
+
+

优点

    +
  • 改动最小,runner.py 只加 3 行
  • +
  • advisor 完全独立,可单独测试
  • +
  • 与现有分层架构完全吻合
  • +
+

缺点

    +
  • 无法拿到 per-metric 的原始 NaN 率(需从 score_rows 重新算)
  • +
+
+
+
+ +
+
B
+
+

嵌入 reporting 层(复用写出基础设施)

+

把 advisor 作为 rag_eval/reporting/ 的一部分,write_run_artifacts() 内部判断是否写 advice。

+

文件结构:

+
    +
  • rag_eval/reporting/advisor.py — 规则 + LLM + 写出三合一
  • +
  • write_run_artifacts() 里追加 if scenario.optimization_advisor: write_advice(...)
  • +
+
+

优点

    +
  • artifacts 路径管理统一,advice 自然进 run 目录
  • +
  • 文件更少
  • +
+

缺点

    +
  • reporting 层本是"无副作用写文件",混入 LLM 调用破坏这一约定
  • +
  • advisor 逻辑和写出逻辑耦合,难以单独测试规则引擎
  • +
+
+
+
+ +
+
C
+
+

方案 A 变体:advisor 有独立 settings(推荐)

+

与方案 A 相同的文件结构,但 LLM 调用使用 scenario 已有的 judge_model,不新增任何模型配置——advisor 复用 build_models() 已构建好的 llm 实例。

+
    +
  • rag_eval/advisor/rules.py — 纯函数,7 条指标诊断规则
  • +
  • rag_eval/advisor/llm_analyzer.py — 接收已有 llm 实例,不重新建 client
  • +
  • rag_eval/advisor/writer.py — 写 md + 日志
  • +
  • rag_eval/advisor/__init__.py — 暴露 run_advisor()
  • +
+
+

优点

    +
  • 不重复创建 LLM client(节省资源)
  • +
  • advisor 阈值可通过 YAML 的 optimization_advisor 块扩展配置
  • +
  • 独立包边界清晰,易于单测
  • +
  • runner.py 改动最小
  • +
+

缺点

    +
  • 需把 llm 实例从 runner 传入 advisor(多传一个参数)
  • +
+
+
+
+
diff --git a/.superpowers/brainstorm/1625-1781595805/content/design-architecture.html b/.superpowers/brainstorm/1625-1781595805/content/design-architecture.html new file mode 100644 index 0000000..acbde1b --- /dev/null +++ b/.superpowers/brainstorm/1625-1781595805/content/design-architecture.html @@ -0,0 +1,53 @@ +

优化顾问模块 — 整体架构与数据流

+

新增 rag_eval/advisor/ 包,插入 run_scenario() 末尾,复用已有 llm 实例

+ +
+
执行链路(变更前 → 变更后)
+
+ run_scenario()
+   → load_scenario()            # 读 YAML,解析 Scenario + optimization_advisor 字段
+   → build_models()             # 已有:创建 llm, embeddings
+   → build_metric_pipeline()    # 已有
+   → Evaluator.evaluate()       # 已有:打分 → EvaluationResult
+   → write_run_artifacts()      # 已有:scores.csv / summary.md / ...
+   → run_advisor(result, scenario, llm)   # 新增 3 行
+       → rules.diagnose(score_rows)         # 规则引擎:识别异常指标 + 方向
+       → llm_analyzer.analyze(diag, samples) # LLM:结合低分样本生成中文建议
+       → writer.write(advice, paths)        # 写 optimization_advice.md + 日志 +
+
+ +
+

新增文件一览

+
+
+ rag_eval/advisor/
+   __init__.py     ← 暴露 run_advisor(),是外部唯一入口
+   rules.py       ← 纯函数,无 LLM,可单独单测
+   llm_analyzer.py ← 接收 llm 实例 + 诊断结构 → 中文 Markdown
+   writer.py      ← 写 optimization_advice.md,打日志摘要
+
+ rag_eval/shared/models.py   ← 修改:Scenario 加 optimization_advisor 字段
+ rag_eval/config/schema.py   ← 修改:ScenarioModel 加字段
+ rag_eval/execution/runner.py ← 修改:末尾加 3 行调用
+ rag_eval/reporting/artifacts.py ← 修改:RunArtifactPaths 加 advice_md 路径 +
+
+
+ +
+

输出产物

+
+
+ outputs/online/siemens-pdf-question-bank/<run_id>/
+   scenario.snapshot.yaml
+   scores.csv
+   invalid.csv
+   summary.md
+   metadata.json
+   optimization_advice.md     ← 新增 +
+
+
+ +

整体看起来 OK 吗?这是新模块与现有链路的接入方式。

diff --git a/.superpowers/brainstorm/1625-1781595805/content/trigger-mode.html b/.superpowers/brainstorm/1625-1781595805/content/trigger-mode.html new file mode 100644 index 0000000..617bddd --- /dev/null +++ b/.superpowers/brainstorm/1625-1781595805/content/trigger-mode.html @@ -0,0 +1,68 @@ +

优化顾问在什么情况下运行?

+

这决定了模块与现有评测流程的集成方式

+ +
+
+
A
+
+

每次评测自动运行

+

run_scenario() 结束后自动调用,无需任何额外配置。

+
+

优点

    +
  • 零感知,开箱即用
  • +
  • 每次跑完都有建议报告
  • +
+

缺点

    +
  • 每次都多一次 LLM 调用,不管是否需要
  • +
  • 无法关闭
  • +
+
+
+
+ +
+
B
+
+

YAML 场景中显式开启(推荐)

+

在 scenario YAML 里加一行 optimization_advisor: true,默认关闭。

+
+
siemens-pdf-question-bank-online.yaml
+
+ metrics:
+   - faithfulness
+   - noise_sensitivity
+   ...
+ optimization_advisor: true # 新增 +
+
+
+

优点

    +
  • 显式可见,按需开启
  • +
  • 与现有 YAML 驱动风格一致
  • +
  • 可为不同场景独立配置
  • +
+

缺点

    +
  • 需要手动在 YAML 里加一行
  • +
+
+
+
+ +
+
C
+
+

阈值触发(任一指标低于警戒线时自动激活)

+

规则引擎先算,若发现有指标低于阈值则自动启动 LLM 分析;一切正常则跳过。

+
+

优点

    +
  • "有问题才报警",符合直觉
  • +
  • 高分场景无额外成本
  • +
+

缺点

    +
  • 阈值需要维护,不同场景可能不同
  • +
  • 正常分数时无建议,但用户可能仍想看优化空间
  • +
+
+
+
+
diff --git a/.superpowers/brainstorm/1625-1781595805/content/waiting-2.html b/.superpowers/brainstorm/1625-1781595805/content/waiting-2.html new file mode 100644 index 0000000..b5f5028 --- /dev/null +++ b/.superpowers/brainstorm/1625-1781595805/content/waiting-2.html @@ -0,0 +1,3 @@ +
+

Writing spec & moving to implementation...

+
diff --git a/.superpowers/brainstorm/1625-1781595805/content/waiting.html b/.superpowers/brainstorm/1625-1781595805/content/waiting.html new file mode 100644 index 0000000..e588f77 --- /dev/null +++ b/.superpowers/brainstorm/1625-1781595805/content/waiting.html @@ -0,0 +1,3 @@ +
+

Continuing in terminal — 正在设计方案...

+
diff --git a/.superpowers/brainstorm/1625-1781595805/state/server-stopped b/.superpowers/brainstorm/1625-1781595805/state/server-stopped new file mode 100644 index 0000000..c4e2401 --- /dev/null +++ b/.superpowers/brainstorm/1625-1781595805/state/server-stopped @@ -0,0 +1 @@ +{"reason":"idle timeout","timestamp":1781598635371} diff --git a/.superpowers/brainstorm/1625-1781595805/state/server.pid b/.superpowers/brainstorm/1625-1781595805/state/server.pid new file mode 100644 index 0000000..f71f46d --- /dev/null +++ b/.superpowers/brainstorm/1625-1781595805/state/server.pid @@ -0,0 +1 @@ +1625 diff --git a/logs/online_eval.log b/logs/online_eval.log new file mode 100644 index 0000000..061116f --- /dev/null +++ b/logs/online_eval.log @@ -0,0 +1 @@ +Completed run: C:\Projects\AIProjects\Siemens-AIPOC\siemens_ragas\outputs\online\siemens-pdf-question-bank diff --git a/logs/server_2026-06-23.log b/logs/server_2026-06-23.log new file mode 100644 index 0000000..9005d33 --- /dev/null +++ b/logs/server_2026-06-23.log @@ -0,0 +1,24 @@ +2026-06-23 13:55:00 INFO webapp.server Starting RAGAS Console host=127.0.0.1 port=8800 log_level=info log_file=C:\Projects\AIProjects\Siemens-AIPOC\siemens_ragas\logs\server_2026-06-23.log +2026-06-23 13:55:14 INFO uvicorn.error Started server process [83868] +2026-06-23 13:55:14 INFO uvicorn.error Waiting for application startup. +2026-06-23 13:55:14 INFO uvicorn.error Application startup complete. 
+2026-06-23 13:55:14 INFO uvicorn.error Uvicorn running on http://127.0.0.1:8800 (Press CTRL+C to quit) +2026-06-23 13:59:47 INFO uvicorn.access 127.0.0.1:53487 - "GET / HTTP/1.1" 200 +2026-06-23 13:59:47 INFO uvicorn.access 127.0.0.1:53487 - "GET /static/css/app.css HTTP/1.1" 200 +2026-06-23 13:59:47 INFO uvicorn.access 127.0.0.1:50321 - "GET /static/js/api.js HTTP/1.1" 200 +2026-06-23 13:59:47 INFO uvicorn.access 127.0.0.1:51325 - "GET /static/js/profiles.js HTTP/1.1" 200 +2026-06-23 13:59:47 INFO uvicorn.access 127.0.0.1:59869 - "GET /static/js/report.js HTTP/1.1" 200 +2026-06-23 13:59:48 INFO uvicorn.access 127.0.0.1:50980 - "GET /static/js/runner.js HTTP/1.1" 200 +2026-06-23 13:59:48 INFO uvicorn.access 127.0.0.1:63223 - "GET /static/js/app.js HTTP/1.1" 200 +2026-06-23 13:59:48 INFO webapp.access GET /docs → 200 (0ms) +2026-06-23 13:59:48 INFO uvicorn.access 127.0.0.1:63223 - "GET /docs HTTP/1.1" 200 +2026-06-23 13:59:48 INFO webapp.access GET /api/health → 200 (0ms) +2026-06-23 13:59:48 INFO uvicorn.access 127.0.0.1:50321 - "GET /api/health HTTP/1.1" 200 +2026-06-23 13:59:49 INFO webapp.api.runs [get_runs] found 19 runs +2026-06-23 13:59:49 INFO webapp.access GET /api/runs → 200 (1094ms) +2026-06-23 13:59:49 INFO uvicorn.access 127.0.0.1:63223 - "GET /api/runs HTTP/1.1" 200 +2026-06-23 13:59:49 INFO webapp.access GET /openapi.json → 200 (94ms) +2026-06-23 13:59:49 INFO uvicorn.access 127.0.0.1:63223 - "GET /openapi.json HTTP/1.1" 200 +2026-06-23 13:59:50 INFO webapp.api.llm_profiles [list_profiles] count=6 +2026-06-23 13:59:50 INFO webapp.access GET /api/llm-profiles → 200 (0ms) +2026-06-23 13:59:50 INFO uvicorn.access 127.0.0.1:63223 - "GET /api/llm-profiles HTTP/1.1" 200 diff --git a/logs/siemens_build.log b/logs/siemens_build.log new file mode 100644 index 0000000..47aaa36 --- /dev/null +++ b/logs/siemens_build.log @@ -0,0 +1,35 @@ + [info] generating questions for: 315_1_Flash????????.pdf + [info] 315_1_Flash????????.pdf: 6 questions generated (total so 
far: 6) + [info] generating questions for: 316_2_Flash??????_??.pdf + [info] 316_2_Flash??????_??.pdf: 10 questions generated (total so far: 16) + [info] generating questions for: 317_3_Flash??????_??.pdf + [info] 317_3_Flash??????_??.pdf: 9 questions generated (total so far: 25) + [info] generating questions for: 318_4_Flash??????_???.pdf + [info] 318_4_Flash??????_???.pdf: 9 questions generated (total so far: 34) + [info] generating questions for: 319_5_Flash??????_?????.pdf + [info] 319_5_Flash??????_?????.pdf: 10 questions generated (total so far: 44) + [info] generating questions for: 320_6_Flash??????_??.pdf + [info] 320_6_Flash??????_??.pdf: 8 questions generated (total so far: 52) + [info] generating questions for: 321_??CT???????????--??.pdf + [info] 321_??CT???????????--??.pdf: 5 questions generated (total so far: 57) + [info] generating questions for: 322_??CT???????????--??????????.pdf + [info] 322_??CT???????????--??????????.pdf: 8 questions generated (total so far: 65) + [info] generating questions for: 323_??CT???????????--?????????.pdf + [info] 323_??CT???????????--?????????.pdf: 5 questions generated (total so far: 70) + [info] generating questions for: 324_??CT???????????--????????.pdf + [info] 324_??CT???????????--????????.pdf: 8 questions generated (total so far: 78) + [info] generating questions for: 325_??CT???????????--???????.pdf + [info] 325_??CT???????????--???????.pdf: 8 questions generated (total so far: 86) + [info] generating questions for: 326_??CT???????????--4D????.pdf + [info] 326_??CT???????????--4D????.pdf: 7 questions generated (total so far: 93) + [info] generating questions for: 327_??CT???????????--??????.pdf + [info] 327_??CT???????????--??????.pdf: 8 questions generated (total so far: 101) + [info] generating questions for: 749_????01_???????????.pdf + [info] 749_????01_???????????.pdf: 8 questions generated (total so far: 109) + [info] generating questions for: 804_????02-????????CT?????X-Map??.pdf + [info] 
804_????02-????????CT?????X-Map??.pdf: 8 questions generated (total so far: 117) + [info] generating questions for: 805_????03_????????????????.pdf + [info] 805_????03_????????????????.pdf: 6 questions generated (total so far: 123) + [info] generating questions for: 807_???CT???????_SJ-L10.2??1-5.pdf + [info] 807_???CT???????_SJ-L10.2??1-5.pdf: 9 questions generated (total so far: 132) +Completed dataset build: C:\Projects\AIProjects\Siemens-AIPOC\siemens_ragas\outputs\dataset-builds\siemens-pdf-question-bank\2026-06-15T09-28-35.302231+00-00 diff --git a/rag_eval/execution/evaluator.py b/rag_eval/execution/evaluator.py index b2a84cc..5044730 100644 --- a/rag_eval/execution/evaluator.py +++ b/rag_eval/execution/evaluator.py @@ -180,12 +180,12 @@ class Evaluator: record["judge_model"] = self.scenario.judge_model record["embedding_model"] = self.scenario.embedding_model record["run_id"] = self.scenario.scenario_name - # Weighted score columns — enable post-hoc weighted aggregation in reporting. 
- record["weighted_score"] = compute_weighted_score( - score.metrics, self.scenario.metric_weights - ) - doc_name = str(sample.metadata.get("doc_name", "") or "") - record["sample_weight"] = resolve_weight( - self.scenario.doc_weights, doc_name, default=1.0 - ) + # 综合加权得分列(已暂时禁用) + # record["weighted_score"] = compute_weighted_score( + # score.metrics, self.scenario.metric_weights + # ) + # doc_name = str(sample.metadata.get("doc_name", "") or "") + # record["sample_weight"] = resolve_weight( + # self.scenario.doc_weights, doc_name, default=1.0 + # ) return record diff --git a/rag_eval/reporting/summary.py b/rag_eval/reporting/summary.py index 13a76af..7953e7f 100644 --- a/rag_eval/reporting/summary.py +++ b/rag_eval/reporting/summary.py @@ -75,15 +75,16 @@ def build_summary_markdown(result: EvaluationResult) -> str: else: lines.append(f"- {metric}: `n/a`{weight_note}") - if has_weights: - overall_ws = compute_overall_weighted_score_mean( - score_rows_list, result.scenario.metric_weights, result.scenario.doc_weights - ) - weight_suffix = " (加权)" - if overall_ws is not None and not math.isnan(overall_ws): - lines.append(f"- **weighted_score{weight_suffix}: `{overall_ws:.4f}`**") - else: - lines.append(f"- **weighted_score{weight_suffix}: `n/a`**") + # 综合加权得分(已暂时禁用) + # if has_weights: + # overall_ws = compute_overall_weighted_score_mean( + # score_rows_list, result.scenario.metric_weights, result.scenario.doc_weights + # ) + # weight_suffix = " (加权)" + # if overall_ws is not None and not math.isnan(overall_ws): + # lines.append(f"- **weighted_score{weight_suffix}: `{overall_ws:.4f}`**") + # else: + # lines.append(f"- **weighted_score{weight_suffix}: `n/a`**") detail_columns = ["sample_id", *result.scenario.metrics, "weighted_score", "error"] existing_columns = [c for c in detail_columns if c in scores.columns] diff --git a/rag_eval/shared/profile_store.py b/rag_eval/shared/profile_store.py new file mode 100644 index 0000000..ccf93fc --- /dev/null +++ 
b/rag_eval/shared/profile_store.py @@ -0,0 +1,53 @@ +"""Lightweight read-only accessor for configs/llm_profiles.json. + +Kept in ``rag_eval`` (not ``webapp``) so the runner can look up per-model +credentials without depending on the webapp layer. +""" + +from __future__ import annotations + +import json +import logging +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + +_PROFILES_PATH = Path(__file__).resolve().parents[2] / "configs" / "llm_profiles.json" + + +def find_by_model(model_name: str) -> dict[str, Any] | None: + """Return the first profile whose ``model`` field matches *model_name*, or None. + + Returns None (without raising) when the profiles file does not exist or + cannot be parsed — callers fall back to environment-variable defaults. + """ + if not _PROFILES_PATH.exists(): + return None + try: + data = json.loads(_PROFILES_PATH.read_text(encoding="utf-8")) + for profile in data.get("profiles", []): + if profile.get("model") == model_name: + return profile + except Exception as exc: # noqa: BLE001 + logger.warning("[profile_store] failed to read %s: %s", _PROFILES_PATH, exc) + return None + + +def profile_to_client_kwargs( + profile: dict[str, Any], + fallback_api_key: str | None, + fallback_timeout: float, +) -> dict[str, Any]: + """Convert a profile dict into keyword arguments for ``openai.AsyncOpenAI``. + + Fields present in the profile override the supplied fallback values. 
+ """ + kwargs: dict[str, Any] = { + "api_key": profile.get("api_key") or fallback_api_key or "", + "timeout": float(profile.get("timeout_seconds") or fallback_timeout), + } + base_url = (profile.get("base_url") or "").strip() + if base_url: + kwargs["base_url"] = base_url + return kwargs diff --git a/project-overview.html b/siemens-ragas-project-overview.html similarity index 100% rename from project-overview.html rename to siemens-ragas-project-overview.html diff --git a/tests/test_offline_eval.py b/tests/test_offline_eval.py index 3f6ce90..aee354b 100644 --- a/tests/test_offline_eval.py +++ b/tests/test_offline_eval.py @@ -184,7 +184,7 @@ class ScenarioAndDatasetTests(unittest.TestCase): class EvaluatorAndReportingTests(unittest.TestCase): def test_merge_score_includes_weighted_score_and_sample_weight(self): - """_merge_score adds weighted_score and sample_weight columns.""" + """_merge_score no longer adds weighted_score/sample_weight (feature disabled).""" from unittest.mock import MagicMock from rag_eval.execution.evaluator import Evaluator from rag_eval.shared.models import ( @@ -212,9 +212,11 @@ class EvaluatorAndReportingTests(unittest.TestCase): ) score = MetricScore(metrics={"faithfulness": 1.0, "context_recall": 0.0}) row = evaluator._merge_score(sample, score) - # (3*1.0 + 1*0.0) / (3+1) = 0.75 - assert abs(row["weighted_score"] - 0.75) < 1e-4 - assert row["sample_weight"] == 2.0 + # 综合加权得分已暂时禁用,weighted_score 和 sample_weight 不再写入 + assert "weighted_score" not in row + assert "sample_weight" not in row + assert row["faithfulness"] == 1.0 + assert row["context_recall"] == 0.0 def test_summary_markdown_shows_weighted_score(self): """build_summary_markdown includes weighted_score when metric_weights set.""" diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py new file mode 100644 index 0000000..9ed1b4c --- /dev/null +++ b/tests/test_pipeline.py @@ -0,0 +1,280 @@ +"""Tests for the end-to-end pipeline API and pipeline task manager.""" + +from 
__future__ import annotations + +import json +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +from fastapi.testclient import TestClient + + +# ── fixtures ────────────────────────────────────────────────────────────────── + +@pytest.fixture() +def client(tmp_path, monkeypatch): + """TestClient with a fresh PipelineTaskManager backed by tmp_path outputs.""" + import webapp.services.pipeline_task_manager as mgr_mod + from webapp.services.pipeline_task_manager import PipelineTaskManager + + fresh_mgr = PipelineTaskManager(max_workers=2) + monkeypatch.setattr(mgr_mod, "pipeline_task_manager", fresh_mgr) + monkeypatch.setattr(mgr_mod, "_PIPELINE_OUTPUT_ROOT", tmp_path / "pipeline") + + import webapp.api.pipeline as api_mod + monkeypatch.setattr(api_mod, "pipeline_task_manager", fresh_mgr) + + from webapp.server import create_app + return TestClient(create_app()) + + +def _minimal_pdf_dir(tmp_path: Path) -> Path: + """Create a temp directory that looks like a PDF folder (empty, valid dir).""" + d = tmp_path / "pdfs" + d.mkdir() + return d + + +def _mock_build_result(tmp_path: Path, job, run_id="r1"): + """Return a fake DatasetBuildResult with a minimal dataset CSV.""" + from rag_eval.dataset_builder.models import ( + DatasetBuildArtifactPaths, + DatasetBuildResult, + DraftQuestionSample, + ) + + artifact_root = tmp_path / "build" / run_id + artifact_root.mkdir(parents=True, exist_ok=True) + latest = tmp_path / "build" / "latest" + latest.mkdir(parents=True, exist_ok=True) + + chunks_path = artifact_root / "source_chunks.jsonl" + chunks_path.write_text( + json.dumps({"chunk_id": "c1", "doc_id": "d1", "doc_name": "test.pdf", + "text": "CT scan context.", "page_start": 1, "page_end": 1, + "section_path": "/", "section_title": "", "source_layout_ids": []}) + "\n", + encoding="utf-8", + ) + (latest / "source_chunks.jsonl").write_text(chunks_path.read_text(encoding="utf-8"), encoding="utf-8") + + dataset_csv = tmp_path / 
"generated_dataset.csv" + dataset_csv.write_text( + "sample_id,question,ground_truth,scenario,language,doc_id,doc_name," + "section_path,page_start,page_end,source_chunk_ids,question_type,difficulty," + "review_status,review_notes\n" + 's1,"What is CT?","CT is imaging.","test","zh","d1","test.pdf","/",' + '1,1,"[""c1""]","fact","easy","draft",""\n', + encoding="utf-8", + ) + + sample = DraftQuestionSample( + sample_id="s1", question="What is CT?", ground_truth="CT is imaging.", + scenario="test", language="zh", doc_id="d1", doc_name="test.pdf", + section_path="/", page_start=1, page_end=1, source_chunk_ids=["c1"], + question_type="fact", difficulty="easy", + ) + + artifact_paths = DatasetBuildArtifactPaths( + root_dir=artifact_root, + documents_jsonl=artifact_root / "documents.jsonl", + semantic_blocks_jsonl=artifact_root / "semantic_blocks.jsonl", + source_chunks_jsonl=chunks_path, + dataset_draft_csv=artifact_root / "dataset_draft.csv", + parse_failures_csv=artifact_root / "parse_failures.csv", + metadata_json=artifact_root / "metadata.json", + ) + return DatasetBuildResult( + job=job, + run_id=run_id, + artifact_paths=artifact_paths, + documents=[], + draft_samples=[sample], + parse_failures=[], + ) + + +def _mock_eval_result(tmp_path: Path, scenario): + """Return a fake EvaluationResult.""" + from rag_eval.shared.models import EvaluationResult + + return EvaluationResult( + scenario=scenario, + run_id="eval-r1", + started_at="2026-01-01T00:00:00", + finished_at="2026-01-01T00:01:00", + valid_samples=[], + invalid_samples=[], + score_rows=[], + ) + + +# ── API route tests ──────────────────────────────────────────────────────────── + +def test_submit_returns_202_and_job_id(client, tmp_path): + """POST /api/pipeline/jobs returns 202 with job_id immediately.""" + pdf_dir = _minimal_pdf_dir(tmp_path) + + with patch("webapp.services.pipeline_task_manager.PipelineTaskManager._execute") as mock_exec: + from webapp.models import PipelineResult + mock_exec.return_value 
= PipelineResult( + build_artifact_dir="/tmp/b", dataset_csv="/tmp/d.csv", + source_chunks_jsonl="/tmp/c.jsonl", total_questions=1, + parse_failures=0, eval_run_id="r1", eval_output_dir="/tmp/e", + scores_csv="/tmp/scores.csv", summary_md="/tmp/summary.md", + ) + resp = client.post("/api/pipeline/jobs", json={ + "docs_path": str(pdf_dir), + "job_name": "test-job", + }) + + assert resp.status_code == 202 + data = resp.json() + assert "job_id" in data + assert data["job_name"] == "test-job" + # status may already be completed by the time the response is read (mock runs instantly) + assert data["status"] in ("queued", "completed") + + +def test_get_nonexistent_job_returns_404(client): + """GET /api/pipeline/jobs/{id} returns 404 for unknown job.""" + resp = client.get("/api/pipeline/jobs/doesnotexist") + assert resp.status_code == 404 + + +def test_list_jobs_returns_empty_initially(client): + """GET /api/pipeline/jobs returns empty list when no jobs submitted.""" + resp = client.get("/api/pipeline/jobs") + assert resp.status_code == 200 + assert resp.json()["jobs"] == [] + + +def test_job_status_polling(client, tmp_path): + """Submitted job becomes visible via GET /api/pipeline/jobs/{id}.""" + pdf_dir = _minimal_pdf_dir(tmp_path) + + with patch("webapp.services.pipeline_task_manager.PipelineTaskManager._execute") as mock_exec: + from webapp.models import PipelineResult + mock_exec.return_value = PipelineResult( + build_artifact_dir="/tmp/b", dataset_csv="/tmp/d.csv", + source_chunks_jsonl="/tmp/c.jsonl", total_questions=3, + parse_failures=0, eval_run_id="r2", eval_output_dir="/tmp/e", + scores_csv="/tmp/scores.csv", summary_md="/tmp/summary.md", + ) + post_resp = client.post("/api/pipeline/jobs", json={"docs_path": str(pdf_dir)}) + + job_id = post_resp.json()["job_id"] + + # Poll until done or timeout (max 5s for mock) + for _ in range(20): + status_resp = client.get(f"/api/pipeline/jobs/{job_id}") + assert status_resp.status_code == 200 + status = status_resp.json() 
+ if status["status"] in ("completed", "failed"): + break + time.sleep(0.25) + + assert status["status"] == "completed" + assert status["result"]["total_questions"] == 3 + + +def test_job_fails_on_invalid_docs_path(client): + """Job fails quickly if docs_path does not exist.""" + resp = client.post("/api/pipeline/jobs", json={ + "docs_path": "/nonexistent/path/that/does/not/exist", + }) + assert resp.status_code == 202 + job_id = resp.json()["job_id"] + + for _ in range(20): + status_resp = client.get(f"/api/pipeline/jobs/{job_id}") + status = status_resp.json() + if status["status"] in ("completed", "failed"): + break + time.sleep(0.25) + + assert status["status"] == "failed" + assert "docs_path" in status["error"] or "not" in status["error"].lower() + + +def test_list_jobs_shows_submitted(client, tmp_path): + """GET /api/pipeline/jobs includes jobs after submission.""" + pdf_dir = _minimal_pdf_dir(tmp_path) + + with patch("webapp.services.pipeline_task_manager.PipelineTaskManager._execute") as mock_exec: + from webapp.models import PipelineResult + mock_exec.return_value = PipelineResult( + build_artifact_dir="/tmp/b", dataset_csv="/tmp/d.csv", + source_chunks_jsonl="/tmp/c.jsonl", total_questions=1, + parse_failures=0, eval_run_id="r3", eval_output_dir="/tmp/e", + scores_csv="/tmp/scores.csv", summary_md="/tmp/summary.md", + ) + client.post("/api/pipeline/jobs", json={"docs_path": str(pdf_dir), "job_name": "listed-job"}) + + time.sleep(0.5) + list_resp = client.get("/api/pipeline/jobs") + assert list_resp.status_code == 200 + jobs = list_resp.json()["jobs"] + assert len(jobs) >= 1 + names = [j["job_name"] for j in jobs] + assert "listed-job" in names + + +# ── execute_dataset_build_job refactor test ──────────────────────────────────── + +def test_execute_dataset_build_job_directly(tmp_path): + """execute_dataset_build_job runs the build without a YAML file.""" + from unittest.mock import patch as _patch + from rag_eval.dataset_builder.models import 
DatasetBuildJob, DatasetBuildRuntime + from rag_eval.dataset_builder.runner import execute_dataset_build_job + from rag_eval.settings import EvaluationSettings + + pdf_dir = tmp_path / "pdfs" + pdf_dir.mkdir() + (pdf_dir / "doc.pdf").write_bytes(b"%PDF-fake") + + job = DatasetBuildJob( + job_name="direct-test", + input_path=pdf_dir, + input_glob="*.pdf", + parser_provider="aliyun_docmind", + failure_mode="skip", + generation_model="test-model", + output_type="online_question_bank", + review_mode="draft_with_manual_review", + max_questions_per_document=5, + max_source_chunks_per_question=3, + dataset_path=tmp_path / "out.csv", + artifact_dir=tmp_path / "artifacts", + runtime=DatasetBuildRuntime(max_documents=1), + ) + + mock_doc = MagicMock() + mock_doc.doc_id = "d1" + mock_doc.doc_name = "doc.pdf" + mock_doc.source_chunks = [] + mock_doc.semantic_blocks = [] + mock_doc.raw_text = "" + mock_doc.structure_nodes = [] + mock_doc.metadata = {} + mock_doc.to_record.return_value = { + "doc_id": "d1", "doc_name": "doc.pdf", "raw_text": "", + "structure_nodes": [], "metadata": {}, + "semantic_block_count": 0, "source_chunk_count": 0, + } + + mock_parser = MagicMock() + mock_parser.parse.return_value = mock_doc + + mock_generator = MagicMock() + mock_generator.generate.return_value = [] + + result = execute_dataset_build_job( + job, + settings=EvaluationSettings(_env_file=None), + parser=mock_parser, + generator=mock_generator, + ) + assert result.job.job_name == "direct-test" + assert result.artifact_paths.root_dir.exists() diff --git a/tests/test_webapp_report_builder.py b/tests/test_webapp_report_builder.py index 1ed0a35..8d492c1 100644 --- a/tests/test_webapp_report_builder.py +++ b/tests/test_webapp_report_builder.py @@ -65,7 +65,8 @@ def test_build_report_uses_weighted_means_and_exposes_snapshot_weights(tmp_path: "faithfulness": pytest.approx(0.75, rel=1e-4), "context_recall": pytest.approx(0.5, rel=1e-4), } - assert report.weighted_score_mean == pytest.approx(0.6667, 
rel=1e-4) + # 综合加权得分已暂时禁用 + assert report.weighted_score_mean is None assert report.metric_weights == {"faithfulness": 2.0, "context_recall": 1.0} assert report.doc_weights == {"a.pdf": 3.0, "b.pdf": 1.0} assert report.summary_markdown == "summary" diff --git a/tests/webapp/test_score_api.py b/tests/webapp/test_score_api.py index 1eb788c..a9ee7cc 100644 --- a/tests/webapp/test_score_api.py +++ b/tests/webapp/test_score_api.py @@ -241,7 +241,8 @@ class TestScoreEndpoint: }) assert resp.status_code == 200 data = resp.json() - assert data["weighted_score"] is not None + # 综合加权得分已暂时禁用,始终返回 null + assert data["weighted_score"] is None def test_missing_required_fields_returns_422(self, client): resp = client.post("/api/score", json={"question": "q"}) diff --git a/tests/webapp/test_session_score_jobs_api.py b/tests/webapp/test_session_score_jobs_api.py new file mode 100644 index 0000000..22811ea --- /dev/null +++ b/tests/webapp/test_session_score_jobs_api.py @@ -0,0 +1,299 @@ +"""Tests for session-grouped async scoring API and SessionScoreJobManager.""" +from __future__ import annotations + +import json +import threading +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pandas as pd +import pytest + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture() +def tmp_manager(tmp_path): + """Isolated SessionScoreJobManager backed by tmp dirs (no real LLM calls).""" + from webapp.services.session_score_manager import SessionScoreJobManager + return SessionScoreJobManager( + output_dir=tmp_path / "score-session", + index_dir=tmp_path / "score-session-jobs", + max_workers=2, + ) + + +@pytest.fixture() +def client(tmp_path, monkeypatch): + """TestClient with fresh SessionScoreJobManager backed by tmp dirs.""" + import webapp.services.session_score_manager as mgr_mod + from 
webapp.services.session_score_manager import SessionScoreJobManager + + fresh_mgr = SessionScoreJobManager( + output_dir=tmp_path / "score-session", + index_dir=tmp_path / "score-session-jobs", + max_workers=2, + ) + monkeypatch.setattr(mgr_mod, "session_score_manager", fresh_mgr) + + import webapp.api.session_score_jobs as api_mod + monkeypatch.setattr(api_mod, "session_score_manager", fresh_mgr) + + from webapp.server import create_app + return pytest.importorskip("fastapi.testclient").TestClient(create_app()) + + +# --------------------------------------------------------------------------- +# Unit tests for SessionScoreJobManager +# --------------------------------------------------------------------------- + +class TestSessionRunId: + def test_same_session_always_same_run_id(self, tmp_manager): + assert tmp_manager.session_run_id("abc") == tmp_manager.session_run_id("abc") + + def test_different_sessions_different_run_ids(self, tmp_manager): + assert tmp_manager.session_run_id("session-A") != tmp_manager.session_run_id("session-B") + + def test_run_id_prefixed_with_session(self, tmp_manager): + assert tmp_manager.session_run_id("test123").startswith("session-") + + def test_special_chars_sanitized(self, tmp_manager): + run_id = tmp_manager.session_run_id("user@dify:flow/001") + assert "/" not in run_id + assert "@" not in run_id + assert ":" not in run_id + + +class TestSubmit: + def test_submit_returns_job_status_and_run_id(self, tmp_manager): + with patch.object(tmp_manager._executor, "submit"): + status, run_id = tmp_manager.submit("session-1", _mock_request()) + assert status.job_id + assert status.status == "queued" + assert run_id == tmp_manager.session_run_id("session-1") + + def test_submit_adds_job_to_session(self, tmp_manager): + with patch.object(tmp_manager._executor, "submit"): + status, _ = tmp_manager.submit("session-1", _mock_request()) + session = tmp_manager.get_session("session-1") + assert session is not None + assert any(j.job_id == 
status.job_id for j in session.jobs) + + def test_multiple_submits_same_session_accumulate(self, tmp_manager): + with patch.object(tmp_manager._executor, "submit"): + tmp_manager.submit("session-X", _mock_request()) + tmp_manager.submit("session-X", _mock_request()) + tmp_manager.submit("session-X", _mock_request()) + session = tmp_manager.get_session("session-X") + assert session.call_count == 3 + + def test_get_unknown_job_returns_none(self, tmp_manager): + assert tmp_manager.get_job("does-not-exist") is None + + def test_get_unknown_session_returns_none(self, tmp_manager): + assert tmp_manager.get_session("no-such-session") is None + + +class TestSessionIndexPersistence: + def test_session_index_survives_restart(self, tmp_path): + """Jobs and session mappings loaded from disk on new manager instance.""" + from webapp.services.session_score_manager import SessionScoreJobManager + + mgr1 = SessionScoreJobManager( + output_dir=tmp_path / "score-session", + index_dir=tmp_path / "score-session-jobs", + ) + with patch.object(mgr1._executor, "submit"): + mgr1.submit("persist-session", _mock_request()) + mgr1.submit("persist-session", _mock_request()) + + # New manager instance loads from disk + mgr2 = SessionScoreJobManager( + output_dir=tmp_path / "score-session", + index_dir=tmp_path / "score-session-jobs", + ) + session = mgr2.get_session("persist-session") + assert session is not None + assert session.call_count == 2 + + def test_job_index_file_created_on_submit(self, tmp_path): + from webapp.services.session_score_manager import SessionScoreJobManager + mgr = SessionScoreJobManager( + output_dir=tmp_path / "score-session", + index_dir=tmp_path / "score-session-jobs", + ) + with patch.object(mgr._executor, "submit"): + status, _ = mgr.submit("file-test", _mock_request()) + index_file = tmp_path / "score-session-jobs" / f"{status.job_id}.json" + assert index_file.is_file() + data = json.loads(index_file.read_text()) + assert data["job_id"] == status.job_id + + 
+class TestAppendBehaviour: + """Test the CSV append / read-all logic in _append_and_regenerate via _read_score_rows.""" + + def test_read_score_rows_returns_empty_for_missing_csv(self, tmp_manager, tmp_path): + rows = tmp_manager._read_score_rows(tmp_path / "nonexistent") + assert rows == [] + + def test_read_score_rows_reads_existing_csv(self, tmp_manager, tmp_path): + run_dir = tmp_path / "run1" + run_dir.mkdir() + df = pd.DataFrame([{"sample_id": "s1", "answer_relevancy": 0.9}]) + df.to_csv(run_dir / "scores.csv", index=False) + rows = tmp_manager._read_score_rows(run_dir) + assert len(rows) == 1 + assert rows[0]["sample_id"] == "s1" + + def test_metric_means_computed_from_csv(self, tmp_manager, tmp_path): + run_dir = tmp_path / "run2" + run_dir.mkdir() + df = pd.DataFrame([ + {"sample_id": "s1", "answer_relevancy": 0.8}, + {"sample_id": "s2", "answer_relevancy": 0.6}, + ]) + df.to_csv(run_dir / "scores.csv", index=False) + means = tmp_manager._read_metric_means(run_dir) + assert means["answer_relevancy"] == pytest.approx(0.7, abs=1e-4) + + +# --------------------------------------------------------------------------- +# API endpoint tests +# --------------------------------------------------------------------------- + +class TestSessionAsyncEndpoints: + def test_submit_returns_202_with_session_fields(self, client): + with patch("webapp.services.session_score_manager.SessionScoreJobManager._run"): + resp = client.post("/api/score/session_async", json={ + "session_id": "test-session-001", + "question": "What is CT?", + "answer": "CT is computed tomography.", + "metrics": ["answer_relevancy"], + }) + assert resp.status_code == 202 + data = resp.json() + assert data["session_id"] == "test-session-001" + assert "job_id" in data + assert "run_id" in data + assert data["status"] == "queued" + assert data["call_count"] >= 1 + + def test_run_id_deterministic_for_session(self, client): + with patch("webapp.services.session_score_manager.SessionScoreJobManager._run"): + 
r1 = client.post("/api/score/session_async", json={ + "session_id": "det-session", + "question": "Q1", + "answer": "A1", + "metrics": ["answer_relevancy"], + }) + r2 = client.post("/api/score/session_async", json={ + "session_id": "det-session", + "question": "Q2", + "answer": "A2", + "metrics": ["answer_relevancy"], + }) + assert r1.json()["run_id"] == r2.json()["run_id"] + + def test_different_sessions_different_run_ids(self, client): + with patch("webapp.services.session_score_manager.SessionScoreJobManager._run"): + r1 = client.post("/api/score/session_async", json={ + "session_id": "session-A", + "question": "Q", + "answer": "A", + "metrics": ["answer_relevancy"], + }) + r2 = client.post("/api/score/session_async", json={ + "session_id": "session-B", + "question": "Q", + "answer": "A", + "metrics": ["answer_relevancy"], + }) + assert r1.json()["run_id"] != r2.json()["run_id"] + + def test_call_count_increments_per_session(self, client): + with patch("webapp.services.session_score_manager.SessionScoreJobManager._run"): + for _ in range(3): + client.post("/api/score/session_async", json={ + "session_id": "count-session", + "question": "Q", + "answer": "A", + "metrics": ["answer_relevancy"], + }) + time.sleep(0.05) + resp = client.get("/api/score/sessions/count-session") + assert resp.status_code == 200 + assert resp.json()["call_count"] == 3 + + def test_get_session_returns_jobs_list(self, client): + with patch("webapp.services.session_score_manager.SessionScoreJobManager._run"): + client.post("/api/score/session_async", json={ + "session_id": "list-session", + "question": "Q", + "answer": "A", + "metrics": ["answer_relevancy"], + }) + time.sleep(0.05) + resp = client.get("/api/score/sessions/list-session") + assert resp.status_code == 200 + data = resp.json() + assert len(data["jobs"]) == 1 + + def test_get_unknown_session_returns_404(self, client): + resp = client.get("/api/score/sessions/no-such-session-xyz") + assert resp.status_code == 404 + + def 
test_get_session_job_by_id(self, client): + with patch("webapp.services.session_score_manager.SessionScoreJobManager._run"): + resp = client.post("/api/score/session_async", json={ + "session_id": "job-lookup-session", + "question": "Q", + "answer": "A", + "metrics": ["answer_relevancy"], + }) + job_id = resp.json()["job_id"] + time.sleep(0.05) + get_resp = client.get(f"/api/score/session/jobs/{job_id}") + assert get_resp.status_code == 200 + assert get_resp.json()["job_id"] == job_id + + def test_get_unknown_job_returns_404(self, client): + resp = client.get("/api/score/session/jobs/nonexistent-job-id") + assert resp.status_code == 404 + + def test_missing_session_id_returns_422(self, client): + resp = client.post("/api/score/session_async", json={ + "question": "Q", + "answer": "A", + "metrics": ["answer_relevancy"], + }) + assert resp.status_code == 422 + + def test_list_sessions_endpoint(self, client): + with patch("webapp.services.session_score_manager.SessionScoreJobManager._run"): + client.post("/api/score/session_async", json={ + "session_id": "list-all-session", + "question": "Q", + "answer": "A", + "metrics": ["answer_relevancy"], + }) + resp = client.get("/api/score/sessions") + assert resp.status_code == 200 + assert "sessions" in resp.json() + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _mock_request(): + """Build a minimal ScoreRequest for testing.""" + from webapp.models import ScoreRequest + return ScoreRequest( + question="What is dual-source CT?", + answer="It uses two X-ray sources.", + metrics=["answer_relevancy"], + ) diff --git a/webapp/api/score.py b/webapp/api/score.py index 93cd512..dbddd98 100644 --- a/webapp/api/score.py +++ b/webapp/api/score.py @@ -156,10 +156,11 @@ def score_sample( all_scores: dict[str, float | None] = {metric_name: None for metric_name in request.metrics} all_scores.update(raw_scores) 
- weighted = compute_weighted_score( - {key: value for key, value in raw_scores.items() if value is not None}, - {}, - ) + # 综合加权得分计算(已暂时禁用) + # weighted = compute_weighted_score( + # {key: value for key, value in raw_scores.items() if value is not None}, + # {}, + # ) logger.info( "[score] done latency=%dms skipped=%s scores=%s", @@ -169,7 +170,7 @@ def score_sample( ) return ScoreResponse( scores=all_scores, - weighted_score=round(weighted, 4) if weighted is not None else None, + weighted_score=None, # 综合加权得分已暂时禁用 latency_ms=latency_ms, skipped_metrics=skipped, ) diff --git a/webapp/api/session_score_jobs.py b/webapp/api/session_score_jobs.py new file mode 100644 index 0000000..d93b995 --- /dev/null +++ b/webapp/api/session_score_jobs.py @@ -0,0 +1,171 @@ +"""Routes for session-grouped async RAGAS scoring (Dify multi-call integration). + +Use case: Dify evaluates multiple Q&A pairs in a session. Each pair gets its own +`POST /api/score/session_async` call with a shared `session_id`. All results are +accumulated into one report, visible in 「运行列表」→「报告详情」. + +Key behaviour: + - Deterministic run_id: derived from session_id — same session always maps to the + same report directory (outputs/score-session/session-{sanitized_session_id}/). + - Append semantics: each call adds a new sample row. Previous rows are preserved. + - Advisor regeneration: optimization_advice.md is regenerated after every call + using the full set of accumulated rows. + - Each call returns its own `job_id` for individual status polling, plus the + shared `run_id` and `session_id`.
+ +Endpoints: + POST /api/score/session_async Submit one call (returns job_id + run_id) + GET /api/score/sessions List all sessions + GET /api/score/sessions/{session_id} Session aggregate (call_count, metric_means, jobs) + GET /api/score/session/jobs/{job_id} Status of one individual call +""" + +from __future__ import annotations + +import logging + +from fastapi import APIRouter, HTTPException + +from webapp.models import ( + AsyncScoreJobStatus, + ScoreRequest, + SessionScoreJobResponse, + SessionScoreRequest, + SessionStatus, +) +from webapp.services.session_score_manager import session_score_manager + +router = APIRouter(prefix="/api/score", tags=["score"]) +logger = logging.getLogger("webapp.api.session_score_jobs") + + +@router.post( + "/session_async", + status_code=202, + response_model=SessionScoreJobResponse, + summary="提交 Session 异步评分(多样本批量聚合)", + responses={ + 202: { + "description": ( + "调用已排队,立即返回 job_id + run_id(202 Accepted)。\n\n" + "相同 `session_id` 的多次调用合并为同一报告,每次调用新增一个样本行。\n" + "评分完成后,`summary.md` 和 `optimization_advice.md` 增量更新。\n" + "通过 `GET /api/score/sessions/{session_id}` 查看 session 聚合状态," + "通过 `GET /api/score/session/jobs/{job_id}` 查询单次调用状态," + "在「运行列表」中查看完整报告(run_id 即 `session-` 形式)。" + ), + "content": { + "application/json": { + "example": { + "job_id": "abc123def456", + "session_id": "dify-session-001", + "run_id": "session-dify-session-001", + "status": "queued", + "call_count": 1, + } + } + }, + }, + }, +) +def submit_session_async_score(request: SessionScoreRequest) -> SessionScoreJobResponse: + """提交 Session 异步 RAGAS 评分,立即返回 job_id。 + + 相同 `session_id` 的多次调用合并到同一评估报告中,每次调用: + 1. 新增一个样本行到 `scores.csv` + 2. 重写 `summary.md`(包含所有累积样本的指标均值) + 3. 
重新生成 `optimization_advice.md`(基于全量样本的 LLM 优化建议) + + **适合 Dify 工作流**:在循环节点中批量调用,所有轮次共用同一 `session_id`, + 最终在 RAGAS 平台「运行列表」中查看完整的批量评估报告。 + """ + logger.info( + "[session_async] submit session_id=%s metrics=%s has_ctx=%s has_gt=%s", + request.session_id, + request.metrics, + bool(request.contexts), + bool(request.ground_truth), + ) + + # Strip session_id to build a plain ScoreRequest for the manager + score_request = ScoreRequest( + question=request.question, + answer=request.answer, + contexts=request.contexts, + ground_truth=request.ground_truth, + context_separator=request.context_separator, + metrics=request.metrics, + judge_model=request.judge_model, + embedding_model=request.embedding_model, + ) + + status, run_id = session_score_manager.submit(request.session_id, score_request) + + # Compute call_count from current session state + session_status = session_score_manager.get_session(request.session_id) + call_count = session_status.call_count if session_status else 1 + + logger.info( + "[session_async] queued job_id=%s session_id=%s run_id=%s call=%d", + status.job_id, request.session_id, run_id, call_count, + ) + return SessionScoreJobResponse( + job_id=status.job_id, + session_id=request.session_id, + run_id=run_id, + status=status.status, + call_count=call_count, + ) + + +@router.get( + "/sessions", + response_model=dict, + summary="列出所有 Session 聚合状态", +) +def list_sessions() -> dict: + """返回所有 session 的聚合状态,按最近完成时间倒序排列。""" + sessions = session_score_manager.list_sessions() + logger.info("[session_score] list_sessions count=%d", len(sessions)) + return {"sessions": [s.model_dump() for s in sessions]} + + +@router.get( + "/sessions/{session_id}", + response_model=SessionStatus, + summary="查询 Session 聚合状态(指标均值 + 所有调用记录)", + responses={404: {"description": "指定 session_id 不存在。"}}, +) +def get_session(session_id: str) -> SessionStatus: + """查询 session 的聚合评分状态。 + + 返回内容: + - `run_id`:在「运行列表」中查看完整报告 + - `call_count`:本 session 累计调用次数 + - 
`metric_means`:所有已累积样本的各指标均值(实时读取 scores.csv) + - `jobs`:本 session 所有调用记录列表 + """ + status = session_score_manager.get_session(session_id) + if status is None: + raise HTTPException(status_code=404, detail=f"Session not found: {session_id}") + return status + + +@router.get( + "/session/jobs/{job_id}", + response_model=AsyncScoreJobStatus, + summary="查询 Session 单次调用状态", + responses={404: {"description": "指定 job_id 不存在。"}}, +) +def get_session_job(job_id: str) -> AsyncScoreJobStatus: + """查询 session 评分中某次调用的状态和评分结果。 + + `status` 为 `completed` 时,`run_id` 即所属 session 的报告目录, + `scores` 包含本次调用的各指标得分。 + """ + status = session_score_manager.get_job(job_id) + if status is None: + raise HTTPException( + status_code=404, detail=f"Session score job not found: {job_id}" + ) + return status diff --git a/webapp/models.py b/webapp/models.py index 604fc80..e124102 100644 --- a/webapp/models.py +++ b/webapp/models.py @@ -531,6 +531,50 @@ class AsyncScoreJobResponse(BaseModel): ) +# --------------------------------------------------------------------------- +# Session async 评分模型 +# --------------------------------------------------------------------------- + +class SessionScoreRequest(ScoreRequest): + """Request body for session-grouped async scoring. + + All calls sharing the same session_id are accumulated into one report. + Each call adds a new sample row to the session's scores.csv. 
+ """ + + session_id: str = Field( + description=( + "会话唯一标识符。相同 session_id 的多次调用合并为同一报告," + "每次调用新增一个样本行,指标均值和优化建议在每次调用后增量更新。" + ), + ) + + +class SessionScoreJobResponse(BaseModel): + """Immediate 202 response after submitting a session scoring call.""" + + job_id: str = Field(description="本次调用的任务唯一标识符。") + session_id: str = Field(description="会话标识符。") + run_id: str = Field(description="本 session 对应的报告 Run ID,可在「运行列表」中查看。") + status: str = Field(default="queued", description="初始状态:queued。") + call_count: int = Field(default=1, description="本 session 当前累计调用次数(包含本次)。") + + +class SessionStatus(BaseModel): + """Aggregate status and metrics for a scoring session.""" + + session_id: str = Field(description="会话标识符。") + run_id: str = Field(description="对应报告目录的 Run ID。") + call_count: int = Field(description="本 session 累计调用次数。") + metric_means: dict[str, float | None] = Field( + default_factory=dict, description="所有已累积样本的各指标均值。" + ) + latest_finished_at: str = Field(default="", description="最近一次评分完成时间(ISO 8601 UTC)。") + jobs: list[AsyncScoreJobStatus] = Field( + default_factory=list, description="本 session 所有调用记录,按创建时间排序。" + ) + + class AsyncScoreJobStatus(BaseModel): """State of one async score job (queued → running → completed/failed).""" diff --git a/webapp/server.py b/webapp/server.py index 5a9ad75..211545b 100644 --- a/webapp/server.py +++ b/webapp/server.py @@ -17,7 +17,7 @@ from fastapi.exceptions import RequestValidationError from fastapi.responses import FileResponse, JSONResponse from fastapi.staticfiles import StaticFiles -from webapp.api import evaluations, llm_profiles, pipeline, runs, scenarios, score, score_jobs +from webapp.api import evaluations, llm_profiles, pipeline, runs, scenarios, score, score_jobs, session_score_jobs STATIC_DIR = Path(__file__).resolve().parent / "static" logger = logging.getLogger("webapp.server") @@ -73,6 +73,10 @@ OPENAPI_TAGS = [ "**异步评分 API(Dify 推荐)** — `POST /api/score/async`\n\n" "异步方式立即返回 
job_id(202),评分在后台执行,完成后自动生成完整报告(含优化建议)," "在「运行列表」页查看。\n\n" + "**Session 批量评分 API** — `POST /api/score/session_async`\n\n" + "适合 Dify 循环节点批量评估:同一 `session_id` 的多次调用合并为一个报告," + "每次调用新增一个样本行,指标均值和优化建议增量更新。\n" + "通过 `GET /api/score/sessions/{session_id}` 查看 session 聚合状态。\n\n" "通过 `GET /api/score/jobs` 列出所有异步评分记录," "`GET /api/score/jobs/{job_id}` 查询单个任务状态。\n\n" "**鉴权**:若 `.env` 中配置了 `SCORE_API_TOKEN`,需携带 " @@ -111,6 +115,7 @@ def create_app() -> FastAPI: app.include_router(pipeline.router) app.include_router(score.router) app.include_router(score_jobs.router) + app.include_router(session_score_jobs.router) @app.middleware("http") async def access_log_middleware(request: Request, call_next): diff --git a/webapp/services/pipeline_task_manager.py b/webapp/services/pipeline_task_manager.py new file mode 100644 index 0000000..ced3ea6 --- /dev/null +++ b/webapp/services/pipeline_task_manager.py @@ -0,0 +1,257 @@ +"""Background task manager for end-to-end pipeline jobs (build + eval). + +Each job runs three sequential phases inside a worker thread: + 1. parsing_documents — AliyunDocmind parses every PDF + 2. generating_questions — LLM generates a draft question bank + 3. evaluating — RAGAS online evaluation scores each question + +The DatasetBuildJob and Scenario objects are constructed entirely from the +API request parameters, so no YAML config files are needed. 
+""" + +from __future__ import annotations + +import io +import threading +import uuid +from concurrent.futures import ThreadPoolExecutor +from contextlib import redirect_stderr, redirect_stdout +from datetime import datetime, timezone +from pathlib import Path + +from webapp.models import ( + PipelineJobRequest, + PipelineJobStatus, + PipelineResult, +) + +_REPO_ROOT = Path(__file__).resolve().parents[2] +_PIPELINE_OUTPUT_ROOT = _REPO_ROOT / "outputs" / "pipeline" + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +class _LineCapture(io.TextIOBase): + """Write-only stream that appends complete lines to a task's log buffer.""" + + def __init__(self, sink: "PipelineTask") -> None: + self._sink = sink + self._buffer = "" + + def write(self, text: str) -> int: + self._buffer += text + while "\n" in self._buffer: + line, self._buffer = self._buffer.split("\n", 1) + self._sink.append_log(line) + return len(text) + + def flush(self) -> None: + if self._buffer: + self._sink.append_log(self._buffer) + self._buffer = "" + + +class PipelineTask: + """Mutable state for one pipeline job (build + eval).""" + + def __init__(self, job_id: str, job_name: str) -> None: + self.job_id = job_id + self.job_name = job_name + self.status = "queued" + self.phase = "idle" + self.logs: list[str] = [] + self.result: PipelineResult | None = None + self.error: str | None = None + self.created_at = _now_iso() + self.finished_at = "" + self._lock = threading.Lock() + + def append_log(self, line: str) -> None: + with self._lock: + self.logs.append(line) + + def snapshot(self) -> PipelineJobStatus: + with self._lock: + return PipelineJobStatus( + job_id=self.job_id, + job_name=self.job_name, + status=self.status, + phase=self.phase, + logs=list(self.logs), + result=self.result, + error=self.error, + created_at=self.created_at, + finished_at=self.finished_at, + ) + + +class PipelineTaskManager: + """Owns the thread pool and registry of pipeline jobs.""" + + def 
__init__(self, max_workers: int = 2) -> None: + self._executor = ThreadPoolExecutor(max_workers=max_workers) + self._tasks: dict[str, PipelineTask] = {} + self._lock = threading.Lock() + + def submit(self, request: PipelineJobRequest) -> PipelineTask: + """Register and schedule a new pipeline job; return its task object.""" + job_id = uuid.uuid4().hex[:12] + job_name = request.job_name.strip() or f"pipeline-{job_id[:6]}" + task = PipelineTask(job_id=job_id, job_name=job_name) + with self._lock: + self._tasks[job_id] = task + self._executor.submit(self._run, task, request) + return task + + def get(self, job_id: str) -> PipelineJobStatus | None: + with self._lock: + task = self._tasks.get(job_id) + return task.snapshot() if task is not None else None + + def list_jobs(self) -> list[PipelineJobStatus]: + with self._lock: + tasks = list(self._tasks.values()) + snapshots = [t.snapshot() for t in tasks] + snapshots.sort(key=lambda s: s.created_at, reverse=True) + return snapshots + + # ------------------------------------------------------------------ # + # Worker + # ------------------------------------------------------------------ # + + def _run(self, task: PipelineTask, request: PipelineJobRequest) -> None: + """Execute the full pipeline end to end inside a worker thread.""" + task.status = "running" + task.append_log(f"[{_now_iso()}] 开始 pipeline 任务: {task.job_name}") + + capture = _LineCapture(task) + try: + with redirect_stdout(capture), redirect_stderr(capture): + result = self._execute(task, request) + capture.flush() + task.result = result + task.phase = "done" + task.status = "completed" + task.append_log(f"[{_now_iso()}] pipeline 任务完成: {task.job_name}") + except Exception as exc: # noqa: BLE001 + capture.flush() + task.error = f"{type(exc).__name__}: {exc}" + task.append_log(f"[{_now_iso()}] pipeline 任务失败: {task.error}") + task.status = "failed" + finally: + task.finished_at = _now_iso() + + def _execute(self, task: PipelineTask, req: PipelineJobRequest) -> 
PipelineResult: + """Run build then eval, updating task.phase as we go.""" + + # ── resolve paths ────────────────────────────────────────────── + docs_path = Path(req.docs_path) + if not docs_path.is_absolute(): + docs_path = (_REPO_ROOT / docs_path).resolve() + if not docs_path.is_dir(): + raise ValueError(f"docs_path is not an existing directory: {docs_path}") + + job_output_dir = _PIPELINE_OUTPUT_ROOT / task.job_id + build_artifact_dir = job_output_dir / "build" + dataset_csv = job_output_dir / "generated_dataset.csv" + eval_output_dir = job_output_dir / "eval" + + # ── phase 1 + 2: dataset build (parse & generate) ───────────── + task.phase = "parsing_documents" + task.append_log(f" [build] 扫描文档目录: {docs_path}") + build_result = self._run_build(task, req, docs_path, build_artifact_dir, dataset_csv) + + source_chunks_jsonl = build_artifact_dir / "latest" / "source_chunks.jsonl" + total_q = len(build_result.draft_samples) + parse_failures = len(build_result.parse_failures) + task.append_log(f" [build] 题库生成完毕: {total_q} 道题目, {parse_failures} 份文档解析失败") + + if total_q == 0: + raise RuntimeError("题库为空(所有文档均解析或生成失败),中止评估。") + + # ── phase 3: evaluation ──────────────────────────────────────── + task.phase = "evaluating" + task.append_log(f" [eval] 开始 RAGAS 评估,共 {total_q} 道题目") + eval_result = self._run_eval(task, req, dataset_csv, source_chunks_jsonl, eval_output_dir) + + from rag_eval.reporting.artifacts import build_artifact_paths as _build_eval_paths + eval_artifact_paths = _build_eval_paths(eval_result.scenario.output_dir, eval_result.run_id) + + return PipelineResult( + build_artifact_dir=build_artifact_dir.as_posix(), + dataset_csv=dataset_csv.as_posix(), + source_chunks_jsonl=source_chunks_jsonl.as_posix(), + total_questions=total_q, + parse_failures=parse_failures, + eval_run_id=eval_result.run_id, + eval_output_dir=eval_result.scenario.output_dir.as_posix(), + scores_csv=eval_artifact_paths.scores_csv.as_posix(), + 
summary_md=eval_artifact_paths.summary_md.as_posix(), + ) + + def _run_build(self, task: PipelineTask, req: PipelineJobRequest, + docs_path: Path, artifact_dir: Path, dataset_csv: Path): + """Construct DatasetBuildJob and run the build phase.""" + from rag_eval.dataset_builder.models import DatasetBuildJob, DatasetBuildRuntime + from rag_eval.dataset_builder.runner import execute_dataset_build_job + from rag_eval.settings import EvaluationSettings + + settings = EvaluationSettings() + job = DatasetBuildJob( + job_name=task.job_name, + input_path=docs_path, + input_glob="*.pdf", + parser_provider="aliyun_docmind", + failure_mode=req.failure_mode, # type: ignore[arg-type] + generation_model=req.generation_model, + output_type="online_question_bank", + review_mode="draft_with_manual_review", + max_questions_per_document=req.max_questions_per_document, + max_source_chunks_per_question=req.max_source_chunks_per_question, + dataset_path=dataset_csv, + artifact_dir=artifact_dir, + runtime=DatasetBuildRuntime(max_documents=req.max_documents), + ) + return execute_dataset_build_job(job, settings=settings) + + def _run_eval(self, task: PipelineTask, req: PipelineJobRequest, + dataset_csv: Path, source_chunks_jsonl: Path, eval_output_dir: Path): + """Construct Scenario and run the evaluation phase.""" + from rag_eval.execution.runner import run_scenario_from_scenario_obj + from rag_eval.settings import EvaluationSettings + from rag_eval.shared.models import ( + AppAdapterConfig, DatasetConfig, RuntimeConfig, Scenario, + ) + + settings = EvaluationSettings() + scenario = Scenario( + scenario_name=task.job_name, + mode="online", + dataset=DatasetConfig(path=dataset_csv), + judge_model=req.judge_model, + embedding_model=req.embedding_model, + metrics=list(req.metrics), + output_dir=eval_output_dir, + runtime=RuntimeConfig( + batch_size=4, + app_concurrency=2, + metric_concurrency=2, + max_samples=req.max_samples, + ), + app_adapter=AppAdapterConfig( + type="python", + 
callable="apps.siemens_pdf_qa.adapter:run", + static_kwargs={ + "source_chunks_path": source_chunks_jsonl, + "model": req.answer_model, + }, + ), + optimization_advisor=req.optimization_advisor, + ) + return run_scenario_from_scenario_obj(scenario, settings=settings) + + +# Module-level singleton shared by the FastAPI routes. +pipeline_task_manager = PipelineTaskManager() diff --git a/webapp/services/report_builder.py b/webapp/services/report_builder.py index 1082401..1f56f65 100644 --- a/webapp/services/report_builder.py +++ b/webapp/services/report_builder.py @@ -177,9 +177,11 @@ def build_report(run_dir: Path, metrics: list[str]) -> ReportData: w_means = _weighted_metric_means(score_rows_list, metrics, doc_weights) rounded_means = {metric: _round_or_none(value) for metric, value in w_means.items()} - overall_ws = compute_overall_weighted_score_mean( - score_rows_list, metric_weights, doc_weights - ) + # 综合加权得分计算(已暂时禁用) + # overall_ws = compute_overall_weighted_score_mean( + # score_rows_list, metric_weights, doc_weights + # ) + overall_ws = None distributions = { metric: _distribution(frame, metric) diff --git a/webapp/services/score_job_manager.py b/webapp/services/score_job_manager.py index 8805d5b..a3964a1 100644 --- a/webapp/services/score_job_manager.py +++ b/webapp/services/score_job_manager.py @@ -149,10 +149,12 @@ class ScoreJobManager: # Build full scores dict (skipped = None) all_scores: dict[str, float | None] = {m: None for m in request.metrics} all_scores.update(raw_scores) - weighted_raw = compute_weighted_score( - {k: v for k, v in raw_scores.items() if v is not None}, {} - ) - weighted = round(weighted_raw, 4) if weighted_raw is not None else None + # 综合加权得分计算(已暂时禁用) + # weighted_raw = compute_weighted_score( + # {k: v for k, v in raw_scores.items() if v is not None}, {} + # ) + # weighted = round(weighted_raw, 4) if weighted_raw is not None else None + weighted = None # Build a score row compatible with report_builder score_row: dict[str, Any] = 
{ diff --git a/webapp/services/session_score_manager.py b/webapp/services/session_score_manager.py new file mode 100644 index 0000000..d145975 --- /dev/null +++ b/webapp/services/session_score_manager.py @@ -0,0 +1,452 @@ +"""Background task manager for session-grouped async RAGAS scoring. + +Each session groups multiple scoring calls into one shared run report: + + 1. First call: creates outputs/score-session/session-{sanitized_session_id}/ and metadata.json. + 2. Every call: appends a new sample row to scores.csv, rewrites summary.md + and optimization_advice.md by re-running write_run_artifacts + run_advisor + over ALL accumulated rows. + 3. The resulting run directory is picked up automatically by run_reader, so the + 「运行列表」 and 「报告详情」 pages show the live, growing report. + +Concurrency model: + - Scoring (LLM network I/O) runs freely in the thread pool — different sessions + score concurrently; multiple calls to the same session also start scoring in + parallel. + - File I/O (CSV append, artifact rewrite, advisor) is serialized per session via + a per-session threading.Lock, so no two calls corrupt the same session's CSV.
+""" + +from __future__ import annotations + +import json +import re +import threading +import time +import uuid +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import pandas as pd + +from webapp.models import AsyncScoreJobStatus, ScoreRequest, SessionStatus + +_REPO_ROOT = Path(__file__).resolve().parents[2] +_DEFAULT_OUTPUT_DIR = _REPO_ROOT / "outputs" / "score-session" +_DEFAULT_INDEX_DIR = _REPO_ROOT / "outputs" / "score-session-jobs" + +# Columns that are sample metadata rather than metric scores (mirrors run_reader.NON_METRIC_COLUMNS) +_NON_METRIC_COLUMNS = { + "sample_id", "question", "contexts", "answer", "ground_truth", + "scenario", "language", "retrieval_config", "error", + "judge_model", "embedding_model", "run_id", "difficulty", + "question_type", "doc_id", "doc_name", "section_path", + "page_start", "page_end", "source_chunk_ids", "review_status", + "review_notes", "weighted_score", "sample_weight", +} + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _sanitize_session_id(session_id: str) -> str: + """Convert an arbitrary session_id string to a safe directory-name fragment.""" + return re.sub(r"[^a-zA-Z0-9]", "-", session_id)[:64].strip("-") or "default" + + +class SessionScoreJobManager: + """Thread-pool manager for session-grouped async scoring jobs. + + All calls sharing a session_id append to one shared run directory, so the + report detail page shows all samples and their aggregate metrics together. 
+ """ + + def __init__( + self, + output_dir: Path = _DEFAULT_OUTPUT_DIR, + index_dir: Path = _DEFAULT_INDEX_DIR, + max_workers: int = 4, + ) -> None: + self._output_dir = Path(output_dir) + self._index_dir = Path(index_dir) + self._output_dir.mkdir(parents=True, exist_ok=True) + self._index_dir.mkdir(parents=True, exist_ok=True) + (self._index_dir / "_sessions").mkdir(parents=True, exist_ok=True) + self._executor = ThreadPoolExecutor(max_workers=max_workers) + + # job_id -> AsyncScoreJobStatus; guarded by _lock + self._job_cache: dict[str, AsyncScoreJobStatus] = {} + # session_id -> [job_ids in order]; guarded by _lock + self._session_jobs: dict[str, list[str]] = {} + # session_id -> per-session threading.Lock; guarded by _lock + self._session_locks: dict[str, threading.Lock] = {} + self._lock = threading.Lock() + + self._load_existing() + + # ------------------------------------------------------------------ # + # Public API + # ------------------------------------------------------------------ # + + def session_run_id(self, session_id: str) -> str: + """Return the deterministic run_id for a session (also the dir name).""" + return f"session-{_sanitize_session_id(session_id)}" + + def submit(self, session_id: str, request: ScoreRequest) -> tuple[AsyncScoreJobStatus, str]: + """Queue one scoring call for a session. + + Returns (job_status, run_id). run_id is deterministic from session_id. 
+ """ + run_id = self.session_run_id(session_id) + job_id = uuid.uuid4().hex[:12] + + status = AsyncScoreJobStatus( + job_id=job_id, + status="queued", + created_at=_now_iso(), + request_summary={ + "question": (request.question or "")[:80], + "answer": (request.answer or "")[:80], + "metrics": list(request.metrics), + "judge_model": request.judge_model or "", + "embedding_model": request.embedding_model or "", + "has_contexts": bool(request.contexts), + "has_ground_truth": bool(request.ground_truth), + "session_id": session_id, + }, + ) + + with self._lock: + self._job_cache[job_id] = status + if session_id not in self._session_jobs: + self._session_jobs[session_id] = [] + self._session_jobs[session_id].append(job_id) + + self._persist_job_index(status) + self._persist_session_index(session_id) + self._executor.submit(self._run, job_id, session_id, run_id, request) + return status, run_id + + def get_job(self, job_id: str) -> AsyncScoreJobStatus | None: + """Return current status of one call, or None if unknown.""" + with self._lock: + return self._job_cache.get(job_id) + + def list_jobs(self) -> list[AsyncScoreJobStatus]: + """Return all session job records, newest first.""" + with self._lock: + jobs = list(self._job_cache.values()) + jobs.sort(key=lambda j: j.created_at, reverse=True) + return jobs + + def get_session(self, session_id: str) -> SessionStatus | None: + """Return aggregate status for a session, or None if unknown.""" + with self._lock: + job_ids = list(self._session_jobs.get(session_id) or []) + if not job_ids: + return None + + run_id = self.session_run_id(session_id) + run_dir = self._output_dir / run_id + + # Compute live metric means from the CSV (may be mid-update — best effort) + metric_means = self._read_metric_means(run_dir) + + with self._lock: + jobs = [self._job_cache[jid] for jid in job_ids if jid in self._job_cache] + + latest = max((j.finished_at for j in jobs if j.finished_at), default="") + return SessionStatus( + 
session_id=session_id, + run_id=run_id, + call_count=len(job_ids), + metric_means=metric_means, + latest_finished_at=latest, + jobs=sorted(jobs, key=lambda j: j.created_at), + ) + + def list_sessions(self) -> list[SessionStatus]: + """Return aggregate status for all known sessions.""" + with self._lock: + session_ids = list(self._session_jobs.keys()) + results = [] + for sid in session_ids: + status = self.get_session(sid) + if status is not None: + results.append(status) + results.sort(key=lambda s: s.latest_finished_at, reverse=True) + return results + + # ------------------------------------------------------------------ # + # Worker + # ------------------------------------------------------------------ # + + def _run(self, job_id: str, session_id: str, run_id: str, request: ScoreRequest) -> None: + """Score one sample then append it to the session's shared run artifacts.""" + import logging + logger = logging.getLogger("webapp.services.session_score_manager") + self._update_job(job_id, status="running") + + # Lazy imports — keep web server bootable if ragas is not installed. 
+ from rag_eval.advisor import run_advisor + from rag_eval.metrics.factory import build_models + from rag_eval.metrics.weights import compute_weighted_score + from rag_eval.reporting.writers import write_run_artifacts + from rag_eval.settings import EvaluationSettings + from rag_eval.shared.models import ( + DatasetConfig, EvaluationResult, NormalizedSample, + RuntimeConfig, Scenario, + ) + from rag_eval.shared.utils import utc_now_iso + from webapp.services.inline_scorer import inline_scorer + + settings = EvaluationSettings() + judge_model = request.judge_model or settings.ragas_judge_model + embedding_model = request.embedding_model or settings.ragas_embedding_model + effective = request.effective_metrics() + requested = set(request.metrics) + skipped = sorted(requested - set(effective)) + + t0 = time.monotonic() + + try: + # --- Scoring (can run concurrently for the same session) ---------- + if effective: + raw_scores = inline_scorer.score( + question=request.question, + answer=request.answer, + contexts=request.contexts_as_list(), + ground_truth=request.ground_truth, + metrics=effective, + judge_model=judge_model, + embedding_model=embedding_model, + settings=settings, + ) + else: + raw_scores = {} + + latency_ms = int((time.monotonic() - t0) * 1000) + finished_at = utc_now_iso() + + # Build complete scores for this sample (skipped metrics → None) + all_scores: dict[str, float | None] = {m: None for m in request.metrics} + all_scores.update(raw_scores) + + # 综合加权得分计算(已暂时禁用) + # weighted_raw = compute_weighted_score( + # {k: v for k, v in raw_scores.items() if v is not None}, {} + # ) + # weighted = round(weighted_raw, 4) if weighted_raw is not None else None + weighted = None + + # --- File I/O must be serialized per session ---------------------- + session_lock = self._get_session_lock(session_id) + with session_lock: + run_dir = self._output_dir / run_id + run_dir.mkdir(parents=True, exist_ok=True) + + # Read all existing rows, then append the new one + 
existing_rows = self._read_score_rows(run_dir) + call_number = len(existing_rows) + 1 + + new_row: dict[str, Any] = { + "sample_id": f"session-score-{call_number}", + "question": request.question, + "answer": request.answer or "", + "contexts": request.contexts or "", + "ground_truth": request.ground_truth or "", + "error": "", + } + new_row.update(all_scores) + + all_rows = existing_rows + [new_row] + + # Reconstruct NormalizedSample objects for write_run_artifacts metadata + valid_samples = [ + NormalizedSample( + sample_id=str(row.get("sample_id", f"session-score-{i + 1}")), + question=str(row.get("question", "")), + answer=str(row.get("answer", "")), + contexts=[ + part.strip() + for part in str(row.get("contexts", "")).split(" |||| ") + if part.strip() + ], + ground_truth=str(row.get("ground_truth", "")), + ) + for i, row in enumerate(all_rows) + ] + + # Determine all metric columns (union of all rows' metric keys) + all_metric_names = sorted({ + k for row in all_rows + for k in row if k not in _NON_METRIC_COLUMNS + }) + + scenario = Scenario( + scenario_name=f"session-{_sanitize_session_id(session_id)}", + mode="offline", + dataset=DatasetConfig(path=run_dir / "dataset.csv"), + judge_model=judge_model, + embedding_model=embedding_model, + metrics=all_metric_names, + output_dir=self._output_dir, + optimization_advisor=True, + ) + + started_at_val = ( + existing_rows[0].get("_started_at", finished_at) + if existing_rows else finished_at + ) + + result = EvaluationResult( + scenario=scenario, + run_id=run_id, + started_at=started_at_val if isinstance(started_at_val, str) else finished_at, + finished_at=finished_at, + valid_samples=valid_samples, + invalid_samples=[], + score_rows=all_rows, + ) + + write_run_artifacts(result) + logger.info( + "[session_job] artifacts written job_id=%s session_id=%s call=%d", + job_id, session_id, call_number, + ) + + # Regenerate optimization advice over all accumulated rows + try: + llm, _ = build_models(judge_model, 
embedding_model, settings) + run_advisor(result, scenario, llm) + logger.info("[session_job] advisor done job_id=%s session=%s", job_id, session_id) + except Exception as adv_exc: # noqa: BLE001 + logger.warning( + "[session_job] advisor failed job_id=%s err=%s", job_id, adv_exc + ) + + self._update_job( + job_id, + status="completed", + finished_at=finished_at, + run_id=run_id, + scores=all_scores, + weighted_score=weighted, + latency_ms=latency_ms, + skipped_metrics=skipped, + ) + self._persist_session_index(session_id) + + except Exception as exc: # noqa: BLE001 + latency_ms = int((time.monotonic() - t0) * 1000) + import logging as _logging + _logging.getLogger("webapp.services.session_score_manager").error( + "[session_job] failed job_id=%s err=%s", job_id, exc + ) + self._update_job( + job_id, + status="failed", + finished_at=_now_iso(), + latency_ms=latency_ms, + error=f"{type(exc).__name__}: {exc}", + ) + + # ------------------------------------------------------------------ # + # Helpers + # ------------------------------------------------------------------ # + + def _get_session_lock(self, session_id: str) -> threading.Lock: + with self._lock: + if session_id not in self._session_locks: + self._session_locks[session_id] = threading.Lock() + return self._session_locks[session_id] + + def _read_score_rows(self, run_dir: Path) -> list[dict[str, Any]]: + """Read existing scores.csv rows, returning empty list if file doesn't exist.""" + scores_path = run_dir / "scores.csv" + if not scores_path.is_file(): + return [] + try: + frame = pd.read_csv(scores_path) + return frame.where(pd.notnull(frame), None).to_dict("records") + except (OSError, ValueError): + return [] + + def _read_metric_means(self, run_dir: Path) -> dict[str, float | None]: + """Compute per-metric means from the session's scores.csv.""" + scores_path = run_dir / "scores.csv" + if not scores_path.is_file(): + return {} + try: + frame = pd.read_csv(scores_path) + except (OSError, ValueError): + 
return {} + means: dict[str, float | None] = {} + for col in frame.columns: + if col in _NON_METRIC_COLUMNS: + continue + if pd.api.types.is_numeric_dtype(frame[col]): + val = frame[col].mean(numeric_only=True) + means[col] = None if pd.isna(val) else round(float(val), 4) + return means + + def _update_job(self, job_id: str, **kwargs: Any) -> None: + with self._lock: + existing = self._job_cache.get(job_id) + if existing is None: + return + updated = existing.model_copy(update=kwargs) + self._job_cache[job_id] = updated + self._persist_job_index(updated) + + def _persist_job_index(self, status: AsyncScoreJobStatus) -> None: + """Persist a single job's status to the index directory.""" + path = self._index_dir / f"{status.job_id}.json" + path.write_text( + json.dumps(status.model_dump(), ensure_ascii=False, indent=2), + encoding="utf-8", + ) + + def _persist_session_index(self, session_id: str) -> None: + """Persist the session→job_ids mapping.""" + with self._lock: + job_ids = list(self._session_jobs.get(session_id) or []) + run_id = self.session_run_id(session_id) + data = {"session_id": session_id, "run_id": run_id, "job_ids": job_ids} + path = self._index_dir / "_sessions" / f"{_sanitize_session_id(session_id)}.json" + path.write_text( + json.dumps(data, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + + def _load_existing(self) -> None: + """Restore job cache and session mappings from persisted index files on startup.""" + # Load individual job files + for path in sorted(self._index_dir.glob("*.json")): + try: + data = json.loads(path.read_text(encoding="utf-8")) + status = AsyncScoreJobStatus.model_validate(data) + self._job_cache[status.job_id] = status + except Exception: # noqa: BLE001 + pass + + # Load session→job_ids mappings + sessions_dir = self._index_dir / "_sessions" + if not sessions_dir.is_dir(): + return + for path in sorted(sessions_dir.glob("*.json")): + try: + data = json.loads(path.read_text(encoding="utf-8")) + sid = 
data.get("session_id", "") + job_ids = data.get("job_ids", []) + if sid: + self._session_jobs[sid] = job_ids + except Exception: # noqa: BLE001 + pass + + +# Module-level singleton shared by FastAPI routes. +session_score_manager = SessionScoreJobManager() diff --git a/webapp/static/js/report.js b/webapp/static/js/report.js index abb7320..a76a7ab 100644 --- a/webapp/static/js/report.js +++ b/webapp/static/js/report.js @@ -128,17 +128,17 @@ const Report = { wrap.appendChild(card); }); - // 综合加权得分卡片 - const wsValue = (report && report.weighted_score_mean !== undefined) ? report.weighted_score_mean : null; - const wsCard = document.createElement("div"); - wsCard.className = "metric-card weighted-score-card"; - const wsCls = App.scoreClass(wsValue); - const wsText = wsValue === null || wsValue === undefined ? "n/a" : wsValue.toFixed(2); - wsCard.innerHTML = ` -
${wsText}
-
综合加权得分
- `; - wrap.appendChild(wsCard); + // 综合加权得分卡片(已暂时隐藏) + // const wsValue = (report && report.weighted_score_mean !== undefined) ? report.weighted_score_mean : null; + // const wsCard = document.createElement("div"); + // wsCard.className = "metric-card weighted-score-card"; + // const wsCls = App.scoreClass(wsValue); + // const wsText = wsValue === null || wsValue === undefined ? "n/a" : wsValue.toFixed(2); + // wsCard.innerHTML = ` + //
${wsText}
+ //
综合加权得分
+ // `; + // wrap.appendChild(wsCard); }, // ② 分数分布直方图(可切换指标)。 diff --git a/webapp/static/js/score_jobs.js b/webapp/static/js/score_jobs.js index cfe93e9..18f5d9c 100644 --- a/webapp/static/js/score_jobs.js +++ b/webapp/static/js/score_jobs.js @@ -55,10 +55,11 @@ const ScoreJobs = { return `${App.escape(App.shortMetric(k))} ${text}`; }) .join(" "); - if (job.weighted_score !== null && job.weighted_score !== undefined) { - const cls = App.scoreClass(job.weighted_score); - scoreHtml += ` 综合 ${Number(job.weighted_score).toFixed(3)}`; - } + // 综合加权得分(已暂时隐藏) + // if (job.weighted_score !== null && job.weighted_score !== undefined) { + // const cls = App.scoreClass(job.weighted_score); + // scoreHtml += ` 综合 ${Number(job.weighted_score).toFixed(3)}`; + // } } else if (job.status === "failed") { scoreHtml = `${App.escape((job.error || "").slice(0, 80))}`; } else { diff --git a/webserver.log b/webserver.log new file mode 100644 index 0000000..399e996 --- /dev/null +++ b/webserver.log @@ -0,0 +1,17 @@ +INFO: Started server process [82284] +INFO: Waiting for application startup. +INFO: Application startup complete. 
+INFO: Uvicorn running on http://127.0.0.1:8811 (Press CTRL+C to quit) +INFO: 127.0.0.1:56164 - "GET /api/health HTTP/1.1" 200 OK +INFO: 127.0.0.1:53350 - "GET / HTTP/1.1" 200 OK +INFO: 127.0.0.1:53351 - "GET /api/runs HTTP/1.1" 200 OK +INFO: 127.0.0.1:53352 - "GET /api/scenarios HTTP/1.1" 200 OK +INFO: 127.0.0.1:64689 - "GET /api/runs/2026-06-15T08-30-00%2B00-00 HTTP/1.1" 200 OK +INFO: 127.0.0.1:64700 - "POST /api/evaluations HTTP/1.1" 200 OK +INFO: 127.0.0.1:64703 - "GET /api/evaluations/a3243f2443d7 HTTP/1.1" 200 OK +INFO: 127.0.0.1:58440 - "GET /api/evaluations/a3243f2443d7 HTTP/1.1" 200 OK +INFO: 127.0.0.1:64454 - "GET /static/css/app.css HTTP/1.1" 200 OK +INFO: 127.0.0.1:64455 - "GET /static/js/api.js HTTP/1.1" 200 OK +INFO: 127.0.0.1:56825 - "GET /static/js/app.js HTTP/1.1" 200 OK +INFO: 127.0.0.1:56829 - "GET /static/js/report.js HTTP/1.1" 200 OK +INFO: 127.0.0.1:56830 - "GET /static/js/runner.js HTTP/1.1" 200 OK