Files
wangwei 75ae7927ad Add Siemens CT document evaluation scenario (three-step pipeline)
- scenarios/siemens_build/siemens-pdf-build.yaml: dataset build for all 17
  Siemens medical-imaging PDFs (aliyun_docmind parser, 10 questions/doc,
  failure_mode=skip, ~170 questions total)
- scenarios/offline/siemens-pdf-offline-smoke.yaml: offline evaluation using
  source chunks as contexts and ground_truth as answer (up to 30 samples)
- scenarios/online/siemens-pdf-question-bank-online.yaml: online evaluation
  calling siemens_pdf_qa adapter, batch_size=4, up to 50 samples
- apps/siemens_pdf_qa/adapter.py: Siemens-specific adapter with bilingual
  (zh/en) system prompt and strict evidence-grounding for CT domain
- scripts/build_siemens_offline_smoke.py: helper to derive offline smoke CSV
  from completed dataset build artifacts (run after dataset build step)
- docs/superpowers/specs/2026-06-15-siemens-scenario-design.md: design spec

All three scenarios are automatically discovered by the web console.

Co-Authored-By: Claude Sonnet 4 <noreply@anthropic.com>
2026-06-15 17:00:52 +08:00

171 lines
6.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Online evaluation adapter for the Siemens medical-imaging PDF question bank.
Functionally identical to apps/pdf_question_bank/adapter.py but uses a
Siemens-specific system prompt that:
- Instructs the model to answer in the same language as the question
(important for Chinese CT documentation).
- Emphasises citation of source chunks and refusal when evidence is absent.
- Adds domain context (medical imaging / CT terminology).
The adapter contract is the same as all other adapters:
run(question, **kwargs) -> {"answer": str, "contexts": [str], "raw_response": {}}
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from openai import OpenAI
from rag_eval.settings import EvaluationSettings
from rag_eval.shared.utils import parse_contexts
# ── chunk cache (module-level, lives for the process lifetime) ────────────────
# Keyed by the resolved chunk-artifact path; each value is that artifact's
# {chunk_id: chunk payload} lookup as built by _load_source_chunks, so a given
# JSONL file is parsed at most once per process. Nothing in this module evicts
# or invalidates entries.
_CHUNK_CACHE: dict[Path, dict[str, dict[str, Any]]] = {}
def _resolve_source_chunks_path(source_chunks_path: str) -> Path:
    """Locate the chunk artifact file on disk.

    The given path is returned (resolved) when it exists. Otherwise, only when
    the file sits directly inside a directory named ``latest``, the sibling
    directories of ``latest`` are scanned in descending name order and the
    first one containing a file with the same name wins.

    Args:
        source_chunks_path: Path to the chunk artifact, possibly under a
            ``latest`` directory that does not exist.

    Returns:
        The path of the artifact that was found.

    Raises:
        FileNotFoundError: If neither the path itself nor any fallback
            candidate exists. The exception carries the resolved input path.
    """
    target = Path(source_chunks_path).resolve()
    if target.exists():
        return target

    runs_root = target.parent.parent
    # The fallback search is only defined for ".../latest/<file>" paths.
    if target.parent.name != "latest" or not runs_root.exists():
        raise FileNotFoundError(target)

    run_dirs = [
        entry
        for entry in runs_root.iterdir()
        if entry.name != "latest" and entry.is_dir()
    ]
    # Descending by directory name — presumably names are timestamps, so this
    # tries the most recent run first (confirm against the artifact layout).
    run_dirs.sort(key=lambda entry: entry.name, reverse=True)

    fallback = next(
        (run / target.name for run in run_dirs if (run / target.name).exists()),
        None,
    )
    if fallback is None:
        raise FileNotFoundError(target)
    return fallback
def _load_source_chunks(source_chunks_path: str) -> dict[str, dict[str, Any]]:
    """Load the chunk artifact into a ``chunk_id -> payload`` lookup, with caching.

    The artifact is read as UTF-8 JSON Lines: one JSON object per non-blank
    line. Results are memoised in ``_CHUNK_CACHE`` under the resolved path, so
    repeated calls for the same artifact do not re-read the file.

    Args:
        source_chunks_path: Path to the chunk artifact; resolved through
            ``_resolve_source_chunks_path``.

    Returns:
        Mapping from stripped, stringified ``chunk_id`` to the full row
        payload. When a ``chunk_id`` appears more than once, the last row wins.

    Raises:
        FileNotFoundError: If the artifact cannot be located.
        ValueError: If a row is not a JSON object, or its ``chunk_id`` is
            absent, null, or blank. Invalid JSON surfaces as
            ``json.JSONDecodeError``, which is a ``ValueError`` subclass.
    """
    resolved = _resolve_source_chunks_path(source_chunks_path)
    cached = _CHUNK_CACHE.get(resolved)
    if cached is not None:
        return cached

    lookup: dict[str, dict[str, Any]] = {}
    with resolved.open(encoding="utf-8") as fh:
        for lineno, line in enumerate(fh, 1):
            text = line.strip()
            if not text:
                continue  # tolerate blank lines between rows
            payload = json.loads(text)
            if not isinstance(payload, dict):
                raise ValueError(
                    f"source_chunks.jsonl row {lineno} is not a JSON object: {resolved}"
                )
            raw_id = payload.get("chunk_id")
            # JSON null decodes to None; str(None) would otherwise register the
            # bogus id "None" instead of flagging the row as missing its id.
            chunk_id = "" if raw_id is None else str(raw_id).strip()
            if not chunk_id:
                raise ValueError(f"source_chunks.jsonl row {lineno} missing chunk_id: {resolved}")
            lookup[chunk_id] = payload

    # Cache only after the whole file parsed, so a failed load is never cached.
    _CHUNK_CACHE[resolved] = lookup
    return lookup
def _resolve_chunk_ids(raw: Any) -> list[str]:
    """Turn the raw ``source_chunk_ids`` value into a list of usable chunk ids.

    Parsing is delegated to ``parse_contexts``; falsy entries in its result
    are discarded.

    Args:
        raw: The ``source_chunk_ids`` value as supplied by the caller.

    Returns:
        The remaining ids, in the order ``parse_contexts`` produced them.

    Raises:
        ValueError: If no id is left after discarding falsy entries.
    """
    chunk_ids = list(filter(None, parse_contexts(raw)))
    if chunk_ids:
        return chunk_ids
    raise ValueError("source_chunk_ids is required for siemens_pdf_qa adapter.")
def _build_messages(
    question: str,
    contexts: list[str],
    metadata: dict[str, Any],
) -> list[dict[str, str]]:
    """Build a Siemens-domain grounded prompt for the answer model.

    Args:
        question: The question to answer; placed verbatim in the user message.
        contexts: Evidence texts. Each is rendered as ``[chunk N] <text>`` with
            N starting at 1, so the model can cite chunks by number.
        metadata: Optional document metadata. Only ``doc_name``,
            ``section_path``, ``page_start`` and ``page_end`` are read; missing
            keys render as empty values.

    Returns:
        A two-element chat message list: the system prompt followed by the
        user prompt (question, document metadata, evidence chunks).
    """
    evidence_lines = [f"[chunk {i}] {ctx}" for i, ctx in enumerate(contexts, 1)]

    # Join whichever page bounds are present with "-" so that, for example,
    # pages 3 to 5 read "3-5" rather than the ambiguous "35".
    page_bounds = (metadata.get("page_start", ""), metadata.get("page_end", ""))
    page_range = "-".join(
        str(bound) for bound in page_bounds if bound is not None and bound != ""
    )
    meta_lines = [
        f"doc_name: {metadata.get('doc_name', '')}",
        f"section_path: {metadata.get('section_path', '')}",
        f"page_range: {page_range}",
    ]

    # Siemens-specific system prompt: bilingual awareness, medical domain, strict grounding.
    # NOTE(review): the first two fragments run together with no punctuation
    # between them ("...QA" + "请严格...") — confirm whether a separator was lost.
    system_prompt = (
        "你是西门子医疗影像知识库的问答助手Siemens Healthineers CT Knowledge Base QA"
        "请严格根据下方【证据片段】回答问题,不得使用片段之外的任何知识。"
        "若证据不足以回答,请明确说明「根据现有资料无法回答」。"
        "请用与问题相同的语言(中文或英文)作答,简洁准确,必要时引用片段编号。"
    )
    user_prompt = "\n".join([
        "【问题】",
        question,
        "",
        "【文档元信息】",
        *meta_lines,
        "",
        "【证据片段】",
        *evidence_lines,
        "",
        "请基于以上证据片段作答。",
    ])
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
def run(
    question: str,
    *,
    source_chunks_path: str,
    model: str | None = None,
    client: OpenAI | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Answer one question by resolving cited chunks and calling an OpenAI-compatible model.

    This is the adapter contract entry point used by the online evaluation runner.

    Args:
        question: The question to answer.
        source_chunks_path: Path to the chunk artifact (JSON Lines) that the
            cited chunk ids are looked up in.
        model: Model name; falls back to ``EvaluationSettings().ragas_judge_model``.
        client: Pre-built OpenAI-compatible client; when omitted, one is
            created from ``EvaluationSettings().openai_client_kwargs``.
        **kwargs: Remaining row fields. ``source_chunk_ids`` is required;
            ``doc_id`` and ``doc_name`` are echoed in the response; the whole
            mapping is passed to ``_build_messages`` as prompt metadata.

    Returns:
        ``{"answer": str, "contexts": [str], "raw_response": {...}}`` where
        ``contexts`` holds the non-empty chunk texts sent to the model.

    Raises:
        ValueError: If no chunk ids are given, a cited id is absent from the
            artifact, no cited chunk has usable text, or no model name can be
            determined.
        FileNotFoundError: If the chunk artifact cannot be located.
    """
    chunk_ids = _resolve_chunk_ids(kwargs.get("source_chunk_ids"))
    chunk_lookup = _load_source_chunks(source_chunks_path)

    missing = [cid for cid in chunk_ids if cid not in chunk_lookup]
    if missing:
        # map(str, ...) keeps this a ValueError even for non-string ids.
        raise ValueError(
            "source_chunk_ids not found in artifact: " + ", ".join(map(str, missing))
        )

    contexts: list[str] = []
    for cid in chunk_ids:
        raw_text = chunk_lookup[cid].get("text")
        # A JSON null text must count as empty, not become the string "None".
        text = "" if raw_text is None else str(raw_text).strip()
        if text:
            contexts.append(text)
    if not contexts:
        raise ValueError("resolved source chunks contain no usable text.")

    settings = EvaluationSettings()
    # `or ""` guards against an unset (None) settings value so the failure is
    # the explicit ValueError below rather than an AttributeError on .strip().
    target_model = (model or settings.ragas_judge_model or "").strip()
    if not target_model:
        raise ValueError("A model name is required for siemens_pdf_qa adapter.")

    llm_client = client or OpenAI(**settings.openai_client_kwargs)
    completion = llm_client.chat.completions.create(
        model=target_model,
        messages=_build_messages(question, contexts, kwargs),
        temperature=0,
    )
    # message.content may be None; normalise to an empty string.
    answer = str(completion.choices[0].message.content or "").strip()

    return {
        "answer": answer,
        "contexts": contexts,
        "raw_response": {
            "resolved_chunk_ids": chunk_ids,
            "doc_id": kwargs.get("doc_id", ""),
            "doc_name": kwargs.get("doc_name", ""),
            "model": target_model,
            "response_text": answer,
        },
    }