first commit

This commit is contained in:
2026-06-12 14:02:15 +08:00
commit 9cbdc1d95d
69 changed files with 9486 additions and 0 deletions

View File

@@ -0,0 +1,5 @@
"""Dataset build workflow for converting PDFs into reviewable online question banks."""
from .runner import run_dataset_build
__all__ = ["run_dataset_build"]

View File

@@ -0,0 +1,5 @@
"""Question generation components for draft online datasets."""
from .question_generator import OpenAIQuestionGenerator, QuestionGenerator
__all__ = ["OpenAIQuestionGenerator", "QuestionGenerator"]

View File

@@ -0,0 +1,173 @@
"""LLM-backed question generator for dataset build jobs."""
from __future__ import annotations
import json
from abc import ABC, abstractmethod
from typing import Any
from openai import OpenAI
from rag_eval.dataset_builder.models import DraftQuestionSample, ParsedDocument, SourceChunk
from rag_eval.settings import EvaluationSettings
class QuestionGenerator(ABC):
"""Abstract interface for generating draft questions from parsed documents."""
@abstractmethod
def generate(
self,
document: ParsedDocument,
*,
max_questions: int,
max_chunks_per_question: int,
job_name: str,
) -> list[DraftQuestionSample]:
"""Generate draft question samples for one parsed document."""
raise NotImplementedError
class OpenAIQuestionGenerator(QuestionGenerator):
"""Generate draft questions with an OpenAI-compatible chat completion API."""
def __init__(self, settings: EvaluationSettings, model: str, client: OpenAI | None = None):
"""Initialize the OpenAI-compatible client and target generation model."""
if not settings.openai_api_key:
raise EnvironmentError("OPENAI_API_KEY must be set before generating draft questions.")
self.client = client or OpenAI(**settings.openai_client_kwargs)
self.model = model
def _build_prompt(
self,
document: ParsedDocument,
*,
max_questions: int,
max_chunks_per_question: int,
) -> str:
"""Build a constrained JSON-generation prompt for one document."""
chunk_lines: list[str] = []
for chunk in document.source_chunks:
chunk_lines.append(
json.dumps(
{
"chunk_id": chunk.chunk_id,
"section_path": chunk.section_path,
"page_start": chunk.page_start,
"page_end": chunk.page_end,
"text": chunk.text,
},
ensure_ascii=False,
)
)
instructions = {
"task": "Generate reviewable online evaluation draft questions from one document only.",
"rules": [
"Return JSON only.",
f"Generate at most {max_questions} samples.",
f"Each sample may cite at most {max_chunks_per_question} chunk ids.",
"Every sample must stay within this document and use existing chunk ids only.",
"Allowed question_type values: fact, summary, procedure, comparison.",
"Allowed difficulty values: easy, medium, hard.",
],
"output_schema": {
"samples": [
{
"question": "string",
"ground_truth": "string",
"source_chunk_ids": ["chunk-id"],
"question_type": "fact|summary|procedure|comparison",
"difficulty": "easy|medium|hard",
}
]
},
"document": {
"doc_id": document.doc_id,
"doc_name": document.doc_name,
"chunks": chunk_lines,
},
}
return json.dumps(instructions, ensure_ascii=False, indent=2)
def _build_sample(
self,
*,
document: ParsedDocument,
payload: dict[str, Any],
index: int,
job_name: str,
) -> DraftQuestionSample:
"""Convert one model output object into the internal draft sample model."""
chunk_lookup: dict[str, SourceChunk] = {item.chunk_id: item for item in document.source_chunks}
source_chunk_ids = [str(item).strip() for item in payload.get("source_chunk_ids") or [] if str(item).strip()]
chunks = [chunk_lookup[item] for item in source_chunk_ids if item in chunk_lookup]
section_path = chunks[0].section_path if chunks else ""
page_start = min((chunk.page_start for chunk in chunks), default=0)
page_end = max((chunk.page_end for chunk in chunks), default=0)
language = "zh" if any("\u4e00" <= char <= "\u9fff" for char in payload.get("question", "")) else "en"
return DraftQuestionSample(
sample_id=f"{document.doc_id}-q{index}",
question=str(payload.get("question", "")).strip(),
ground_truth=str(payload.get("ground_truth", "")).strip(),
scenario=job_name,
language=language,
doc_id=document.doc_id,
doc_name=document.doc_name,
section_path=section_path,
page_start=page_start,
page_end=page_end,
source_chunk_ids=source_chunk_ids,
question_type=str(payload.get("question_type", "fact")).strip() or "fact",
difficulty=str(payload.get("difficulty", "medium")).strip() or "medium",
)
@staticmethod
def _parse_response_payload(content: str) -> list[dict[str, Any]]:
"""Parse the model response into a list of sample payload dictionaries."""
try:
payload = json.loads(content or "{}")
except json.JSONDecodeError as exc:
raise ValueError("Question generator returned invalid JSON.") from exc
if not isinstance(payload, dict):
raise ValueError("Question generator response must be a JSON object.")
samples = payload.get("samples") or []
if not isinstance(samples, list):
raise ValueError("Question generator response field 'samples' must be a list.")
normalized_samples: list[dict[str, Any]] = []
for item in samples:
if isinstance(item, dict):
normalized_samples.append(item)
return normalized_samples
def generate(
self,
document: ParsedDocument,
*,
max_questions: int,
max_chunks_per_question: int,
job_name: str,
) -> list[DraftQuestionSample]:
"""Generate draft questions for one parsed document."""
prompt = self._build_prompt(
document,
max_questions=max_questions,
max_chunks_per_question=max_chunks_per_question,
)
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "You generate structured draft question banks from source documents."},
{"role": "user", "content": prompt},
],
response_format={"type": "json_object"},
)
content = response.choices[0].message.content or "{}"
payload = self._parse_response_payload(content)
return [
self._build_sample(document=document, payload=item, index=index, job_name=job_name)
for index, item in enumerate(payload[:max_questions], start=1)
]

View File

@@ -0,0 +1,87 @@
"""Validation and deduplication helpers for generated draft question samples."""
from __future__ import annotations
import re
from difflib import SequenceMatcher
from rag_eval.dataset_builder.models import DraftQuestionSample, ParsedDocument
ALLOWED_QUESTION_TYPES = {"fact", "summary", "procedure", "comparison"}
ALLOWED_DIFFICULTIES = {"easy", "medium", "hard"}
def validate_draft_sample(
sample: DraftQuestionSample,
*,
document: ParsedDocument,
max_source_chunks_per_question: int | None = None,
) -> list[str]:
"""Validate one generated sample against the document and enum constraints."""
errors: list[str] = []
if not sample.question.strip():
errors.append("question is empty")
if not sample.ground_truth.strip():
errors.append("ground_truth is empty")
if not sample.source_chunk_ids:
errors.append("source_chunk_ids is empty")
if (
max_source_chunks_per_question is not None
and len(sample.source_chunk_ids) > max_source_chunks_per_question
):
errors.append(
f"source_chunk_ids exceeds limit: {len(sample.source_chunk_ids)} > {max_source_chunks_per_question}"
)
existing_chunk_ids = {chunk.chunk_id for chunk in document.source_chunks}
for chunk_id in sample.source_chunk_ids:
if chunk_id not in existing_chunk_ids:
errors.append(f"unknown source chunk: {chunk_id}")
if sample.doc_id != document.doc_id:
errors.append("sample doc_id does not match source document")
if sample.question_type not in ALLOWED_QUESTION_TYPES:
errors.append(f"unsupported question_type: {sample.question_type}")
if sample.difficulty not in ALLOWED_DIFFICULTIES:
errors.append(f"unsupported difficulty: {sample.difficulty}")
return errors
def normalize_question_text(text: str) -> str:
"""Normalize question text for exact-match deduplication."""
return re.sub(r"\s+", " ", text).strip().lower()
def dedupe_samples(samples: list[DraftQuestionSample]) -> list[DraftQuestionSample]:
"""Drop duplicate questions and enforce one output per chunk group per document."""
deduped: list[DraftQuestionSample] = []
seen_questions: set[tuple[str, str]] = set()
seen_chunk_groups: set[tuple[str, tuple[str, ...]]] = set()
seen_chunk_answers: list[tuple[str, tuple[str, ...], str]] = []
for sample in samples:
question_key = (sample.doc_id, normalize_question_text(sample.question))
if question_key in seen_questions:
continue
chunk_key = tuple(sample.source_chunk_ids)
chunk_group_key = (sample.doc_id, chunk_key)
if chunk_group_key in seen_chunk_groups:
continue
answer_key = normalize_question_text(sample.ground_truth)
duplicate = False
for existing_doc_id, existing_chunk_key, existing_answer in seen_chunk_answers:
if existing_doc_id != sample.doc_id or existing_chunk_key != chunk_key:
continue
if SequenceMatcher(None, existing_answer, answer_key).ratio() >= 0.9:
duplicate = True
break
if duplicate:
continue
seen_questions.add(question_key)
seen_chunk_groups.add(chunk_group_key)
seen_chunk_answers.append((sample.doc_id, chunk_key, answer_key))
deduped.append(sample)
return deduped

View File

@@ -0,0 +1,203 @@
"""Internal data models for the PDF-to-dataset build workflow."""
from __future__ import annotations
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Literal
ReviewStatus = Literal["draft", "approved", "rejected", "needs_edit"]
QuestionType = Literal["fact", "summary", "procedure", "comparison"]
Difficulty = Literal["easy", "medium", "hard"]
FailureMode = Literal["fail", "skip"]
@dataclass(slots=True)
class DatasetBuildRuntime:
"""Runtime controls for one dataset build job."""
max_documents: int | None = None
@dataclass(slots=True)
class DatasetBuildJob:
"""Resolved dataset build configuration consumed by the build runner."""
job_name: str
input_path: Path
input_glob: str
parser_provider: str
failure_mode: FailureMode
generation_model: str
output_type: str
review_mode: str
max_questions_per_document: int
max_source_chunks_per_question: int
dataset_path: Path
artifact_dir: Path
runtime: DatasetBuildRuntime = field(default_factory=DatasetBuildRuntime)
source_path: Path | None = None
def snapshot(self) -> dict[str, Any]:
"""Serialize the job into JSON-friendly metadata."""
payload = asdict(self)
payload["input_path"] = self.input_path.as_posix()
payload["dataset_path"] = self.dataset_path.as_posix()
payload["artifact_dir"] = self.artifact_dir.as_posix()
if self.source_path is not None:
payload["source_path"] = self.source_path.as_posix()
return payload
@dataclass(slots=True)
class StructureNode:
"""One normalized structure heading extracted from layout results."""
node_id: str
level: int
title: str
page_start: int
page_end: int
section_path: str
@dataclass(slots=True)
class SemanticBlock:
"""One merged semantic block used as an intermediate artifact before chunking."""
block_id: str
doc_id: str
doc_name: str
text: str
page_start: int
page_end: int
section_path: str
section_title: str
source_layout_ids: list[str]
def to_record(self) -> dict[str, Any]:
"""Convert the block into a flat artifact record."""
return asdict(self)
@dataclass(slots=True)
class SourceChunk:
"""Evidence chunk used for question generation and human review."""
chunk_id: str
doc_id: str
doc_name: str
text: str
page_start: int
page_end: int
section_path: str
section_title: str
source_layout_ids: list[str]
def to_record(self) -> dict[str, Any]:
"""Convert the chunk into a flat artifact record."""
return asdict(self)
@dataclass(slots=True)
class ParsedDocument:
"""Normalized parsed document ready for question generation."""
doc_id: str
doc_name: str
raw_text: str
structure_nodes: list[StructureNode]
semantic_blocks: list[SemanticBlock]
source_chunks: list[SourceChunk]
metadata: dict[str, Any] = field(default_factory=dict)
def to_record(self) -> dict[str, Any]:
"""Convert the parsed document into a summary artifact record."""
return {
"doc_id": self.doc_id,
"doc_name": self.doc_name,
"raw_text": self.raw_text,
"structure_nodes": [asdict(item) for item in self.structure_nodes],
"metadata": self.metadata,
"semantic_block_count": len(self.semantic_blocks),
"source_chunk_count": len(self.source_chunks),
}
@dataclass(slots=True)
class DraftQuestionSample:
"""One draft online evaluation sample pending manual review."""
sample_id: str
question: str
ground_truth: str
scenario: str
language: str
doc_id: str
doc_name: str
section_path: str
page_start: int
page_end: int
source_chunk_ids: list[str]
question_type: QuestionType
difficulty: Difficulty
review_status: ReviewStatus = "draft"
review_notes: str = ""
def to_record(self) -> dict[str, Any]:
"""Convert the draft sample into a flat CSV row."""
return {
"sample_id": self.sample_id,
"question": self.question,
"ground_truth": self.ground_truth,
"scenario": self.scenario,
"language": self.language,
"doc_id": self.doc_id,
"doc_name": self.doc_name,
"section_path": self.section_path,
"page_start": self.page_start,
"page_end": self.page_end,
"source_chunk_ids": self.source_chunk_ids,
"question_type": self.question_type,
"difficulty": self.difficulty,
"review_status": self.review_status,
"review_notes": self.review_notes,
}
@dataclass(slots=True)
class ParseFailure:
"""One document parse failure recorded for reporting and skip-mode execution."""
file_path: str
error: str
def to_record(self) -> dict[str, str]:
"""Convert the failure into a flat CSV row."""
return asdict(self)
@dataclass(slots=True)
class DatasetBuildArtifactPaths:
"""Canonical file paths produced by one dataset build run."""
root_dir: Path
documents_jsonl: Path
semantic_blocks_jsonl: Path
source_chunks_jsonl: Path
dataset_draft_csv: Path
parse_failures_csv: Path
metadata_json: Path
@dataclass(slots=True)
class DatasetBuildResult:
"""Aggregate result object returned after a dataset build completes."""
job: DatasetBuildJob
run_id: str
artifact_paths: DatasetBuildArtifactPaths
documents: list[ParsedDocument]
draft_samples: list[DraftQuestionSample]
parse_failures: list[ParseFailure]

View File

@@ -0,0 +1,78 @@
"""Utilities for converting draft online datasets into offline smoke-test datasets."""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import pandas as pd
from rag_eval.shared.utils import ensure_directory
def _load_jsonl(path: Path) -> list[dict[str, Any]]:
"""Load a JSONL file into a list of dictionaries."""
rows: list[dict[str, Any]] = []
with path.open("r", encoding="utf-8") as handle:
for line in handle:
text = line.strip()
if not text:
continue
rows.append(json.loads(text))
return rows
def build_offline_smoke_dataset(
*,
draft_dataset_path: Path,
source_chunks_path: Path,
output_path: Path,
) -> Path:
"""Derive an offline-evaluable dataset by reusing ground truth as answer and chunk text as contexts."""
draft_frame = pd.read_csv(draft_dataset_path)
chunk_rows = _load_jsonl(source_chunks_path)
chunk_lookup = {str(row["chunk_id"]): row for row in chunk_rows}
output_rows: list[dict[str, Any]] = []
for _, row in draft_frame.iterrows():
chunk_ids = row.get("source_chunk_ids")
if isinstance(chunk_ids, str):
parsed_chunk_ids = json.loads(chunk_ids)
elif isinstance(chunk_ids, list):
parsed_chunk_ids = chunk_ids
else:
parsed_chunk_ids = []
contexts = [
str(chunk_lookup[chunk_id]["text"]).strip()
for chunk_id in parsed_chunk_ids
if chunk_id in chunk_lookup and str(chunk_lookup[chunk_id]["text"]).strip()
]
ground_truth = str(row.get("ground_truth", "")).strip()
output_rows.append(
{
"sample_id": row.get("sample_id", ""),
"question": row.get("question", ""),
"contexts": json.dumps(contexts, ensure_ascii=False),
"answer": ground_truth,
"ground_truth": ground_truth,
"scenario": row.get("scenario", ""),
"language": row.get("language", ""),
"retrieval_config": "offline-smoke-from-pdf-build",
"doc_id": row.get("doc_id", ""),
"doc_name": row.get("doc_name", ""),
"section_path": row.get("section_path", ""),
"page_start": row.get("page_start", ""),
"page_end": row.get("page_end", ""),
"source_chunk_ids": row.get("source_chunk_ids", ""),
"question_type": row.get("question_type", ""),
"difficulty": row.get("difficulty", ""),
"review_status": row.get("review_status", ""),
"review_notes": row.get("review_notes", ""),
}
)
ensure_directory(output_path.parent)
pd.DataFrame(output_rows).to_csv(output_path, index=False)
return output_path

View File

@@ -0,0 +1,7 @@
"""Parser integrations and layout normalization helpers for dataset build jobs."""
from .aliyun_document_parser import AliyunDocumentParser
from .aliyun_docmind_gateway import AliyunDocmindGateway
from .aliyun_layout_normalizer import normalize_layouts
__all__ = ["AliyunDocumentParser", "AliyunDocmindGateway", "normalize_layouts"]

View File

@@ -0,0 +1,202 @@
"""Gateway abstraction for Alibaba Cloud document parsing workflows."""
from __future__ import annotations
import time
from pathlib import Path
from typing import Any
try:
from alibabacloud_docmind_api20220711 import models as docmind_models
from alibabacloud_docmind_api20220711.client import Client as DocmindClient
from alibabacloud_tea_openapi import models as openapi_models
from alibabacloud_tea_util import models as runtime_models
except ImportError:
# Keep Alibaba SDK optional so offline flows and tests can import this module.
DocmindClient = None
docmind_models = None
openapi_models = None
runtime_models = None
try:
from alibabacloud_credentials.client import Client as CredentialClient
except ImportError:
CredentialClient = None
from rag_eval.settings import EvaluationSettings
class AliyunDocmindGateway:
"""Thin gateway interface around the external Alibaba document parser service."""
def __init__(self, settings: EvaluationSettings):
"""Store parser-related settings needed by the gateway implementation."""
self.settings = settings
self._client = None
self._models = None
self._runtime_models = None
def _load_sdk(self) -> tuple[Any, Any, Any, Any]:
"""Load Alibaba SDK modules lazily so tests and offline flows do not require them."""
if (
DocmindClient is None
or openapi_models is None
or docmind_models is None
or runtime_models is None
):
raise ImportError(
"Alibaba Cloud Docmind SDK is not installed. "
"Install alibabacloud-docmind-api20220711, "
"alibabacloud-tea-openapi, alibabacloud-tea-util, and "
"alibabacloud-credentials."
)
return DocmindClient, openapi_models, docmind_models, runtime_models
def _resolve_credentials(self) -> tuple[str, str]:
"""Resolve AccessKey credentials from settings or the Alibaba credentials client."""
if self.settings.alibaba_access_key_id and self.settings.alibaba_access_key_secret:
return self.settings.alibaba_access_key_id, self.settings.alibaba_access_key_secret
if CredentialClient is None:
raise ImportError(
"Alibaba Cloud credentials SDK is not installed and no explicit "
"ALIBABA_ACCESS_KEY_ID / ALIBABA_ACCESS_KEY_SECRET were provided."
)
credential_client = CredentialClient()
credential = credential_client.get_credential()
return credential.get_access_key_id(), credential.get_access_key_secret()
def _init_client(self) -> Any:
"""Create and cache the underlying Alibaba SDK client."""
if self._client is not None:
return self._client
client_class, openapi_models, docmind_models, runtime_models = self._load_sdk()
access_key_id, access_key_secret = self._resolve_credentials()
endpoint = (self.settings.alibaba_endpoint or "docmind-api.cn-hangzhou.aliyuncs.com").strip()
config = openapi_models.Config(
access_key_id=access_key_id,
access_key_secret=access_key_secret,
)
config.endpoint = endpoint
config.region_id = "cn-hangzhou"
config.type = "access_key"
self._client = client_class(config)
self._models = docmind_models
self._runtime_models = runtime_models
return self._client
@staticmethod
def _to_plain_dict(value: Any) -> dict[str, Any]:
"""Convert SDK response objects into ordinary dictionaries."""
if value is None:
return {}
if isinstance(value, dict):
return value
if hasattr(value, "to_map"):
return value.to_map()
if hasattr(value, "__dict__"):
return {
key: getattr(value, key)
for key in vars(value)
if not key.startswith("_")
}
return {}
@staticmethod
def _extract_layouts(payload: Any) -> list[dict[str, Any]]:
"""Convert layout collections from SDK payloads into plain dictionaries."""
if payload is None:
return []
if isinstance(payload, dict):
layouts = payload.get("layouts") or payload.get("Layouts") or []
else:
layouts = getattr(payload, "layouts", None) or getattr(payload, "Layouts", None) or []
normalized: list[dict[str, Any]] = []
for item in layouts:
normalized.append(AliyunDocmindGateway._to_plain_dict(item))
return normalized
def submit_parse_task(self, pdf_path: Path) -> str:
"""Submit one PDF parse task and return the remote task identifier."""
client = self._init_client()
runtime = self._runtime_models.RuntimeOptions()
file_name = pdf_path.name
with pdf_path.open("rb") as handle:
request = self._models.SubmitDocParserJobAdvanceRequest(
file_url_object=handle,
file_name=file_name,
file_name_extension=pdf_path.suffix.lstrip(".").lower() or "pdf",
llm_enhancement=self.settings.aliyun_llm_enhancement,
enhancement_mode=self.settings.aliyun_enhancement_mode,
)
response = client.submit_doc_parser_job_advance(request, runtime)
payload = self._to_plain_dict(getattr(getattr(response, "body", None), "data", None))
task_id = payload.get("id") or payload.get("Id")
if not task_id:
raise RuntimeError(f"Aliyun submit_doc_parser_job_advance returned no task id for {pdf_path.name}")
return str(task_id)
def get_task_status(self, task_id: str) -> dict[str, Any]:
"""Fetch the current parse task status from the remote service."""
client = self._init_client()
request = self._models.QueryDocParserStatusRequest(id=task_id)
response = client.query_doc_parser_status(request)
payload = self._to_plain_dict(getattr(getattr(response, "body", None), "data", None))
status = payload.get("status") or payload.get("Status")
if status is not None and "status" not in payload:
payload["status"] = status
return payload
def fetch_layouts(self, task_id: str) -> list[dict[str, Any]]:
"""Fetch normalized layout pages for a completed parse task."""
client = self._init_client()
layout_num = 0
layout_step_size = min(max(1, self.settings.aliyun_parse_layout_step_size), 3000)
collected: list[dict[str, Any]] = []
while True:
request = self._models.GetDocParserResultRequest(
id=task_id,
layout_step_size=layout_step_size,
layout_num=layout_num,
)
response = client.get_doc_parser_result(request)
payload = getattr(getattr(response, "body", None), "data", None)
layouts = self._extract_layouts(payload)
if not layouts:
break
collected.extend(layouts)
layout_num += len(layouts)
if len(layouts) < layout_step_size:
break
return collected
def parse_document(self, pdf_path: Path) -> dict[str, Any]:
"""Run the submit/poll/fetch cycle and return a raw parse payload."""
task_id = self.submit_parse_task(pdf_path)
started_at = time.monotonic()
poll_interval = max(1, self.settings.aliyun_parse_poll_interval_seconds)
timeout_seconds = max(1, self.settings.aliyun_parse_timeout_seconds)
while True:
status = self.get_task_status(task_id)
state = str(status.get("status", "")).lower()
if state in {"succeeded", "success", "finished"}:
layouts = self.fetch_layouts(task_id)
return {
"task_id": task_id,
"status": state,
"doc_id": status.get("doc_id") or pdf_path.stem,
"doc_name": status.get("doc_name") or pdf_path.name,
"layouts": layouts,
"metadata": status,
}
if state in {"failed", "error"}:
raise RuntimeError(f"Aliyun parse task failed for {pdf_path.name}: {status}")
if time.monotonic() - started_at > timeout_seconds:
raise TimeoutError(f"Aliyun parse task timed out for {pdf_path.name}")
time.sleep(poll_interval)

View File

@@ -0,0 +1,38 @@
"""Document parser that normalizes Alibaba layout results into internal models."""
from __future__ import annotations
from pathlib import Path
from rag_eval.dataset_builder.models import ParsedDocument
from .aliyun_docmind_gateway import AliyunDocmindGateway
from .aliyun_layout_normalizer import normalize_layouts
class AliyunDocumentParser:
"""Parse PDFs through the Alibaba gateway and normalize the returned layouts."""
def __init__(self, gateway: AliyunDocmindGateway):
"""Store the gateway dependency used for remote parsing."""
self.gateway = gateway
def parse(self, pdf_path: Path) -> ParsedDocument:
"""Parse one PDF file into a normalized parsed-document model."""
payload = self.gateway.parse_document(pdf_path)
layouts = payload.get("layouts") or []
if not layouts:
raise ValueError(f"No layouts returned for document: {pdf_path.name}")
document = normalize_layouts(
doc_id=str(payload.get("doc_id") or pdf_path.stem),
doc_name=str(payload.get("doc_name") or pdf_path.name),
layouts=list(layouts),
)
document.metadata.update(
{
"task_id": payload.get("task_id"),
"provider": "aliyun_docmind",
}
)
return document

View File

@@ -0,0 +1,181 @@
"""Normalization helpers that convert raw layout results into source chunks."""
from __future__ import annotations
import re
from typing import Any
from rag_eval.dataset_builder.models import ParsedDocument, SemanticBlock, SourceChunk, StructureNode
def _clean_text(value: Any) -> str:
"""Normalize free-form layout text into a compact string."""
if value is None:
return ""
return re.sub(r"\s+", " ", str(value)).strip()
def _is_catalog_entry(item_type: str, text: str) -> bool:
"""Detect table-of-contents style entries that should be skipped."""
lowered = text.lower()
return item_type == "toc" or "目录" in text or lowered.startswith("table of contents")
def _flatten_table(item: dict[str, Any]) -> str:
"""Convert a table layout node into a searchable plain-text representation."""
rows = item.get("rows") or []
flattened_rows: list[str] = []
for row in rows:
cells = [str(cell).strip() for cell in row if str(cell).strip()]
if cells:
flattened_rows.append(" | ".join(cells))
return "\n".join(flattened_rows)
def _split_text(text: str, max_chars: int = 1200, overlap: int = 150) -> list[str]:
"""Split long text into overlapping windows so each chunk stays reviewable."""
if len(text) <= max_chars:
return [text]
windows: list[str] = []
start = 0
while start < len(text):
end = min(len(text), start + max_chars)
windows.append(text[start:end].strip())
if end >= len(text):
break
start = max(end - overlap, start + 1)
return [window for window in windows if window]
def normalize_layouts(
*,
doc_id: str,
doc_name: str,
layouts: list[dict[str, Any]],
max_chunk_chars: int = 1200,
overlap_chars: int = 150,
) -> ParsedDocument:
"""Convert raw layouts into structure nodes, semantic blocks, and source chunks."""
structure_nodes: list[StructureNode] = []
semantic_blocks: list[SemanticBlock] = []
source_chunks: list[SourceChunk] = []
section_stack: list[tuple[int, str]] = []
current_block_text: list[str] = []
current_block_layout_ids: list[str] = []
current_page_start: int | None = None
current_page_end: int | None = None
current_section_path = ""
current_section_title = ""
def flush_block() -> None:
"""Finalize the in-progress semantic block and emit source chunks."""
nonlocal current_block_text, current_block_layout_ids, current_page_start, current_page_end
nonlocal current_section_path, current_section_title
text = _clean_text(" ".join(current_block_text))
if not text or current_page_start is None or current_page_end is None:
current_block_text = []
current_block_layout_ids = []
current_page_start = None
current_page_end = None
return
block_id = f"{doc_id}-block-{len(semantic_blocks) + 1}"
block = SemanticBlock(
block_id=block_id,
doc_id=doc_id,
doc_name=doc_name,
text=text,
page_start=current_page_start,
page_end=current_page_end,
section_path=current_section_path,
section_title=current_section_title,
source_layout_ids=list(current_block_layout_ids),
)
semantic_blocks.append(block)
chunk_parts = _split_text(text, max_chars=max_chunk_chars, overlap=overlap_chars)
for index, part in enumerate(chunk_parts, start=1):
heading_prefix = current_section_title.strip()
chunk_text = f"{heading_prefix}\n{part}".strip() if heading_prefix and not part.startswith(heading_prefix) else part
source_chunks.append(
SourceChunk(
chunk_id=f"{block_id}-chunk-{index}",
doc_id=doc_id,
doc_name=doc_name,
text=chunk_text,
page_start=current_page_start,
page_end=current_page_end,
section_path=current_section_path,
section_title=current_section_title,
source_layout_ids=list(current_block_layout_ids),
)
)
current_block_text = []
current_block_layout_ids = []
current_page_start = None
current_page_end = None
for index, item in enumerate(layouts, start=1):
item_type = str(item.get("type", "paragraph")).lower()
page = int(item.get("page", 1))
layout_id = str(item.get("layout_id") or f"layout-{index}")
level = int(item.get("level", 1))
if item_type == "table":
text = _flatten_table(item)
else:
text = _clean_text(item.get("text"))
if not text or _is_catalog_entry(item_type, text):
continue
if item_type == "heading":
flush_block()
while section_stack and section_stack[-1][0] >= level:
section_stack.pop()
section_stack.append((level, text))
section_titles = [title for _, title in section_stack]
current_section_title = text
current_section_path = " > ".join(section_titles)
structure_nodes.append(
StructureNode(
node_id=f"{doc_id}-node-{len(structure_nodes) + 1}",
level=level,
title=text,
page_start=page,
page_end=page,
section_path=current_section_path,
)
)
continue
if item_type == "caption":
text = f"图注: {text}"
if current_page_start is None:
current_page_start = page
current_page_end = page
current_block_text.append(text)
current_block_layout_ids.append(layout_id)
flush_block()
raw_text = "\n".join(chunk.text for chunk in source_chunks)
metadata = {
"layout_count": len(layouts),
"structure_node_count": len(structure_nodes),
"semantic_block_count": len(semantic_blocks),
"source_chunk_count": len(source_chunks),
}
return ParsedDocument(
doc_id=doc_id,
doc_name=doc_name,
raw_text=raw_text,
structure_nodes=structure_nodes,
semantic_blocks=semantic_blocks,
source_chunks=source_chunks,
metadata=metadata,
)

View File

@@ -0,0 +1,142 @@
"""Orchestration layer for PDF-to-dataset build jobs."""
from __future__ import annotations
from pathlib import Path
import yaml
from rag_eval.settings import EvaluationSettings
from rag_eval.shared.utils import ensure_directory, utc_now_iso
from .generator.question_generator import OpenAIQuestionGenerator, QuestionGenerator
from .generator.validators import dedupe_samples, validate_draft_sample
from .models import DatasetBuildJob, DatasetBuildResult, DatasetBuildRuntime, ParseFailure
from .parser.aliyun_document_parser import AliyunDocumentParser
from .parser.aliyun_docmind_gateway import AliyunDocmindGateway
from .schema import DatasetBuildConfigModel
from .sources import discover_pdf_files
from .writers import build_artifact_paths, write_dataset_build_artifacts
def load_dataset_build_job(path: str | Path, settings: EvaluationSettings | None = None) -> DatasetBuildJob:
"""Load and validate a dataset build YAML file."""
settings = settings or EvaluationSettings()
config_path = Path(path).resolve()
payload = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
model = DatasetBuildConfigModel.model_validate(payload)
base_dir = config_path.parent
generation_model = (
model.generation.model
or settings.dataset_generator_model
or "qwen3.6-plus"
)
parser_payload = payload.get("parser") or {}
failure_mode = parser_payload.get("failure_mode") or settings.parser_failure_mode or "fail"
return DatasetBuildJob(
job_name=model.job_name,
input_path=model.resolve_path(base_dir, model.input.path),
input_glob=model.input.glob,
parser_provider=model.parser.provider,
failure_mode=failure_mode,
generation_model=generation_model,
output_type=model.generation.output_type,
review_mode=model.generation.review_mode,
max_questions_per_document=model.generation.max_questions_per_document,
max_source_chunks_per_question=model.generation.max_source_chunks_per_question,
dataset_path=model.resolve_path(base_dir, model.output.dataset_path),
artifact_dir=model.resolve_path(base_dir, model.output.artifact_dir),
runtime=DatasetBuildRuntime(max_documents=model.runtime.max_documents),
source_path=config_path,
)
def _create_parser(job: DatasetBuildJob, settings: EvaluationSettings) -> AliyunDocumentParser:
"""Create the configured document parser implementation."""
if job.parser_provider != "aliyun_docmind":
raise ValueError(f"Unsupported parser provider: {job.parser_provider}")
gateway = AliyunDocmindGateway(settings)
return AliyunDocumentParser(gateway)
def _create_generator(job: DatasetBuildJob, settings: EvaluationSettings) -> QuestionGenerator:
"""Create the configured draft question generator implementation."""
return OpenAIQuestionGenerator(settings=settings, model=job.generation_model)
def run_dataset_build(
config_path: str | Path,
*,
settings: EvaluationSettings | None = None,
parser: AliyunDocumentParser | None = None,
generator: QuestionGenerator | None = None,
) -> DatasetBuildResult:
"""Run one dataset build job end to end and persist all required artifacts."""
settings = settings or EvaluationSettings()
job = load_dataset_build_job(config_path, settings=settings)
pdf_files = discover_pdf_files(job.input_path, job.input_glob)
if job.runtime.max_documents is not None:
pdf_files = pdf_files[: job.runtime.max_documents]
parser = parser or _create_parser(job, settings)
generator = generator or _create_generator(job, settings)
run_id = utc_now_iso().replace(":", "-")
artifact_root = job.artifact_dir / run_id
ensure_directory(artifact_root)
artifact_paths = build_artifact_paths(artifact_root)
documents = []
failures: list[ParseFailure] = []
draft_samples = []
for pdf_path in pdf_files:
try:
document = parser.parse(pdf_path)
except Exception as exc:
failure = ParseFailure(file_path=pdf_path.as_posix(), error=str(exc))
failures.append(failure)
if job.failure_mode == "fail":
result = DatasetBuildResult(
job=job,
run_id=run_id,
artifact_paths=artifact_paths,
documents=documents,
draft_samples=draft_samples,
parse_failures=failures,
)
write_dataset_build_artifacts(result)
raise
continue
documents.append(document)
generated = generator.generate(
document,
max_questions=job.max_questions_per_document,
max_chunks_per_question=job.max_source_chunks_per_question,
job_name=job.job_name,
)
valid_generated = []
for sample in generated:
errors = validate_draft_sample(
sample,
document=document,
max_source_chunks_per_question=job.max_source_chunks_per_question,
)
if not errors:
valid_generated.append(sample)
draft_samples.extend(
dedupe_samples(valid_generated)[: job.max_questions_per_document]
)
result = DatasetBuildResult(
job=job,
run_id=run_id,
artifact_paths=artifact_paths,
documents=documents,
draft_samples=draft_samples,
parse_failures=failures,
)
write_dataset_build_artifacts(result)
return result

View File

@@ -0,0 +1,82 @@
"""Pydantic schemas for dataset build YAML configuration files."""
from __future__ import annotations
from pathlib import Path
from typing import Literal
from pydantic import BaseModel, ConfigDict, Field, model_validator
class DatasetBuildInputModel(BaseModel):
"""Schema for input PDF discovery settings."""
model_config = ConfigDict(extra="ignore")
path: str
glob: str = "*.pdf"
class DatasetBuildParserModel(BaseModel):
"""Schema for parser selection and failure handling."""
model_config = ConfigDict(extra="ignore")
provider: Literal["aliyun_docmind"]
failure_mode: Literal["fail", "skip"] | None = None
class DatasetBuildGenerationModel(BaseModel):
"""Schema for question generation controls."""
model_config = ConfigDict(extra="ignore")
model: str | None = None
output_type: Literal["online_question_bank"]
review_mode: Literal["draft_with_manual_review"]
max_questions_per_document: int = Field(default=10, gt=0)
max_source_chunks_per_question: int = Field(default=3, gt=0)
class DatasetBuildOutputModel(BaseModel):
"""Schema for dataset build output locations."""
model_config = ConfigDict(extra="ignore")
dataset_path: str
artifact_dir: str
class DatasetBuildRuntimeModel(BaseModel):
"""Schema for runtime throttling and document limits."""
model_config = ConfigDict(extra="ignore")
max_documents: int | None = Field(default=None, gt=0)
class DatasetBuildConfigModel(BaseModel):
"""Top-level schema for a dataset build job."""
model_config = ConfigDict(extra="ignore")
job_name: str
input: DatasetBuildInputModel
parser: DatasetBuildParserModel
generation: DatasetBuildGenerationModel
output: DatasetBuildOutputModel
runtime: DatasetBuildRuntimeModel = Field(default_factory=DatasetBuildRuntimeModel)
@model_validator(mode="after")
def validate_job_name(self) -> "DatasetBuildConfigModel":
"""Reject blank job names that would break artifact paths."""
if not self.job_name.strip():
raise ValueError("job_name must not be empty.")
return self
def resolve_path(self, base_dir: Path, raw_path: str) -> Path:
"""Resolve relative paths against the config file directory."""
candidate = Path(raw_path)
if candidate.is_absolute():
return candidate
return (base_dir / candidate).resolve()

View File

@@ -0,0 +1,21 @@
"""Input source discovery helpers for dataset build jobs."""
from __future__ import annotations
from pathlib import Path
def discover_pdf_files(input_path: Path, pattern: str = "*.pdf") -> list[Path]:
"""Return all PDF files from a single file path or a directory scan."""
if not input_path.exists():
raise FileNotFoundError(f"Input path does not exist: {input_path}")
if input_path.is_file():
if input_path.suffix.lower() != ".pdf":
raise ValueError(f"Input file is not a PDF: {input_path}")
return [input_path]
files = sorted(path for path in input_path.glob(pattern) if path.is_file() and path.suffix.lower() == ".pdf")
if not files:
raise ValueError(f"No PDF files found under {input_path} with pattern {pattern}")
return files

View File

@@ -0,0 +1,147 @@
"""Artifact writers for dataset build runs."""
from __future__ import annotations
import csv
import json
import shutil
from pathlib import Path
from typing import Any
from rag_eval.shared.utils import ensure_directory
from .models import DatasetBuildArtifactPaths, DatasetBuildResult
def build_artifact_paths(root_dir: Path) -> DatasetBuildArtifactPaths:
"""Construct canonical output paths for one dataset build run."""
return DatasetBuildArtifactPaths(
root_dir=root_dir,
documents_jsonl=root_dir / "documents.jsonl",
semantic_blocks_jsonl=root_dir / "semantic_blocks.jsonl",
source_chunks_jsonl=root_dir / "source_chunks.jsonl",
dataset_draft_csv=root_dir / "dataset_draft.csv",
parse_failures_csv=root_dir / "parse_failures.csv",
metadata_json=root_dir / "metadata.json",
)
def _write_jsonl(path: Path, rows: list[dict[str, Any]]) -> None:
"""Write a list of dictionaries as JSON Lines."""
with path.open("w", encoding="utf-8") as handle:
for row in rows:
handle.write(json.dumps(row, ensure_ascii=False) + "\n")
def _write_csv(path: Path, rows: list[dict[str, Any]], fieldnames: list[str] | None = None) -> None:
"""Write flat records into a CSV file, including list values as JSON strings."""
normalized_rows: list[dict[str, Any]] = []
resolved_fieldnames = list(fieldnames or [])
for row in rows:
normalized_row: dict[str, Any] = {}
for key, value in row.items():
if key not in resolved_fieldnames:
resolved_fieldnames.append(key)
if isinstance(value, list):
normalized_row[key] = json.dumps(value, ensure_ascii=False)
else:
normalized_row[key] = value
normalized_rows.append(normalized_row)
with path.open("w", encoding="utf-8", newline="") as handle:
writer = csv.DictWriter(handle, fieldnames=resolved_fieldnames or ["placeholder"])
writer.writeheader()
if normalized_rows:
writer.writerows(normalized_rows)
def _write_latest_alias_assets(result: DatasetBuildResult) -> None:
"""Publish stable alias files so sample scenarios can target the latest build output."""
latest_dir = result.job.artifact_dir / "latest"
ensure_directory(latest_dir)
# Keep the canonical run directory and also expose a stable entrypoint for tutorials.
shutil.copyfile(result.artifact_paths.source_chunks_jsonl, latest_dir / "source_chunks.jsonl")
shutil.copyfile(result.artifact_paths.dataset_draft_csv, latest_dir / "dataset_draft.csv")
shutil.copyfile(result.artifact_paths.metadata_json, latest_dir / "metadata.json")
def write_dataset_build_artifacts(result: DatasetBuildResult) -> None:
"""Persist dataset build outputs and metadata to disk."""
artifact_paths = result.artifact_paths
ensure_directory(artifact_paths.root_dir)
ensure_directory(result.job.dataset_path.parent)
_write_jsonl(artifact_paths.documents_jsonl, [item.to_record() for item in result.documents])
_write_jsonl(
artifact_paths.semantic_blocks_jsonl,
[block.to_record() for item in result.documents for block in item.semantic_blocks],
)
_write_jsonl(
artifact_paths.source_chunks_jsonl,
[chunk.to_record() for item in result.documents for chunk in item.source_chunks],
)
draft_rows = [sample.to_record() for sample in result.draft_samples]
_write_csv(
artifact_paths.dataset_draft_csv,
draft_rows,
fieldnames=[
"sample_id",
"question",
"ground_truth",
"scenario",
"language",
"doc_id",
"doc_name",
"section_path",
"page_start",
"page_end",
"source_chunk_ids",
"question_type",
"difficulty",
"review_status",
"review_notes",
],
)
_write_csv(
result.job.dataset_path,
draft_rows,
fieldnames=[
"sample_id",
"question",
"ground_truth",
"scenario",
"language",
"doc_id",
"doc_name",
"section_path",
"page_start",
"page_end",
"source_chunk_ids",
"question_type",
"difficulty",
"review_status",
"review_notes",
],
)
_write_csv(
artifact_paths.parse_failures_csv,
[item.to_record() for item in result.parse_failures],
fieldnames=["file_path", "error"],
)
metadata = {
"run_id": result.run_id,
"job": result.job.snapshot(),
"stats": {
"documents_processed": len(result.documents),
"draft_samples": len(result.draft_samples),
"parse_failures": len(result.parse_failures),
},
}
artifact_paths.metadata_json.write_text(
json.dumps(metadata, ensure_ascii=False, indent=2),
encoding="utf-8",
)
_write_latest_alias_assets(result)