"""Background task manager for end-to-end pipeline jobs (build + eval). Each job runs three sequential phases inside a worker thread: 1. parsing_documents — AliyunDocmind parses every PDF 2. generating_questions — LLM generates a draft question bank 3. evaluating — RAGAS online evaluation scores each question The DatasetBuildJob and Scenario objects are constructed entirely from the API request parameters, so no YAML config files are needed. """ from __future__ import annotations import io import threading import uuid from concurrent.futures import ThreadPoolExecutor from contextlib import redirect_stderr, redirect_stdout from datetime import datetime, timezone from pathlib import Path from webapp.models import ( PipelineJobRequest, PipelineJobStatus, PipelineResult, ) _REPO_ROOT = Path(__file__).resolve().parents[2] _PIPELINE_OUTPUT_ROOT = _REPO_ROOT / "outputs" / "pipeline" def _now_iso() -> str: return datetime.now(timezone.utc).isoformat() class _LineCapture(io.TextIOBase): """Write-only stream that appends complete lines to a task's log buffer.""" def __init__(self, sink: "PipelineTask") -> None: self._sink = sink self._buffer = "" def write(self, text: str) -> int: self._buffer += text while "\n" in self._buffer: line, self._buffer = self._buffer.split("\n", 1) self._sink.append_log(line) return len(text) def flush(self) -> None: if self._buffer: self._sink.append_log(self._buffer) self._buffer = "" class PipelineTask: """Mutable state for one pipeline job (build + eval).""" def __init__(self, job_id: str, job_name: str) -> None: self.job_id = job_id self.job_name = job_name self.status = "queued" self.phase = "idle" self.logs: list[str] = [] self.result: PipelineResult | None = None self.error: str | None = None self.created_at = _now_iso() self.finished_at = "" self._lock = threading.Lock() def append_log(self, line: str) -> None: with self._lock: self.logs.append(line) def snapshot(self) -> PipelineJobStatus: with self._lock: return PipelineJobStatus( job_id=self.job_id, job_name=self.job_name, status=self.status, phase=self.phase, logs=list(self.logs), result=self.result, error=self.error, created_at=self.created_at, finished_at=self.finished_at, ) class PipelineTaskManager: """Owns the thread pool and registry of pipeline jobs.""" def __init__(self, max_workers: int = 2) -> None: self._executor = ThreadPoolExecutor(max_workers=max_workers) self._tasks: dict[str, PipelineTask] = {} self._lock = threading.Lock() def submit(self, request: PipelineJobRequest) -> PipelineTask: """Register and schedule a new pipeline job; return its task object.""" job_id = uuid.uuid4().hex[:12] job_name = request.job_name.strip() or f"pipeline-{job_id[:6]}" task = PipelineTask(job_id=job_id, job_name=job_name) with self._lock: self._tasks[job_id] = task self._executor.submit(self._run, task, request) return task def get(self, job_id: str) -> PipelineJobStatus | None: with self._lock: task = self._tasks.get(job_id) return task.snapshot() if task is not None else None def list_jobs(self) -> list[PipelineJobStatus]: with self._lock: tasks = list(self._tasks.values()) snapshots = [t.snapshot() for t in tasks] snapshots.sort(key=lambda s: s.created_at, reverse=True) return snapshots # ------------------------------------------------------------------ # # Worker # ------------------------------------------------------------------ # def _run(self, task: PipelineTask, request: PipelineJobRequest) -> None: """Execute the full pipeline end to end inside a worker thread.""" task.status = "running" task.append_log(f"[{_now_iso()}] 开始 pipeline 任务: {task.job_name}") capture = _LineCapture(task) try: with redirect_stdout(capture), redirect_stderr(capture): result = self._execute(task, request) capture.flush() task.result = result task.phase = "done" task.status = "completed" task.append_log(f"[{_now_iso()}] pipeline 任务完成: {task.job_name}") except Exception as exc: # noqa: BLE001 capture.flush() task.error = f"{type(exc).__name__}: {exc}" task.append_log(f"[{_now_iso()}] pipeline 任务失败: {task.error}") task.status = "failed" finally: task.finished_at = _now_iso() def _execute(self, task: PipelineTask, req: PipelineJobRequest) -> PipelineResult: """Run build then eval, updating task.phase as we go.""" # ── resolve paths ────────────────────────────────────────────── docs_path = Path(req.docs_path) if not docs_path.is_absolute(): docs_path = (_REPO_ROOT / docs_path).resolve() if not docs_path.is_dir(): raise ValueError(f"docs_path is not an existing directory: {docs_path}") job_output_dir = _PIPELINE_OUTPUT_ROOT / task.job_id build_artifact_dir = job_output_dir / "build" dataset_csv = job_output_dir / "generated_dataset.csv" eval_output_dir = job_output_dir / "eval" # ── phase 1 + 2: dataset build (parse & generate) ───────────── task.phase = "parsing_documents" task.append_log(f" [build] 扫描文档目录: {docs_path}") build_result = self._run_build(task, req, docs_path, build_artifact_dir, dataset_csv) source_chunks_jsonl = build_artifact_dir / "latest" / "source_chunks.jsonl" total_q = len(build_result.draft_samples) parse_failures = len(build_result.parse_failures) task.append_log(f" [build] 题库生成完毕: {total_q} 道题目, {parse_failures} 份文档解析失败") if total_q == 0: raise RuntimeError("题库为空(所有文档均解析或生成失败),中止评估。") # ── phase 3: evaluation ──────────────────────────────────────── task.phase = "evaluating" task.append_log(f" [eval] 开始 RAGAS 评估,共 {total_q} 道题目") eval_result = self._run_eval(task, req, dataset_csv, source_chunks_jsonl, eval_output_dir) from rag_eval.reporting.artifacts import build_artifact_paths as _build_eval_paths eval_artifact_paths = _build_eval_paths(eval_result.scenario.output_dir, eval_result.run_id) return PipelineResult( build_artifact_dir=build_artifact_dir.as_posix(), dataset_csv=dataset_csv.as_posix(), source_chunks_jsonl=source_chunks_jsonl.as_posix(), total_questions=total_q, parse_failures=parse_failures, eval_run_id=eval_result.run_id, eval_output_dir=eval_result.scenario.output_dir.as_posix(), scores_csv=eval_artifact_paths.scores_csv.as_posix(), summary_md=eval_artifact_paths.summary_md.as_posix(), ) def _run_build(self, task: PipelineTask, req: PipelineJobRequest, docs_path: Path, artifact_dir: Path, dataset_csv: Path): """Construct DatasetBuildJob and run the build phase.""" from rag_eval.dataset_builder.models import DatasetBuildJob, DatasetBuildRuntime from rag_eval.dataset_builder.runner import execute_dataset_build_job from rag_eval.settings import EvaluationSettings settings = EvaluationSettings() job = DatasetBuildJob( job_name=task.job_name, input_path=docs_path, input_glob="*.pdf", parser_provider="aliyun_docmind", failure_mode=req.failure_mode, # type: ignore[arg-type] generation_model=req.generation_model, output_type="online_question_bank", review_mode="draft_with_manual_review", max_questions_per_document=req.max_questions_per_document, max_source_chunks_per_question=req.max_source_chunks_per_question, dataset_path=dataset_csv, artifact_dir=artifact_dir, runtime=DatasetBuildRuntime(max_documents=req.max_documents), ) return execute_dataset_build_job(job, settings=settings) def _run_eval(self, task: PipelineTask, req: PipelineJobRequest, dataset_csv: Path, source_chunks_jsonl: Path, eval_output_dir: Path): """Construct Scenario and run the evaluation phase.""" from rag_eval.execution.runner import run_scenario_from_scenario_obj from rag_eval.settings import EvaluationSettings from rag_eval.shared.models import ( AppAdapterConfig, DatasetConfig, RuntimeConfig, Scenario, ) settings = EvaluationSettings() scenario = Scenario( scenario_name=task.job_name, mode="online", dataset=DatasetConfig(path=dataset_csv), judge_model=req.judge_model, embedding_model=req.embedding_model, metrics=list(req.metrics), output_dir=eval_output_dir, runtime=RuntimeConfig( batch_size=4, app_concurrency=2, metric_concurrency=2, max_samples=req.max_samples, ), app_adapter=AppAdapterConfig( type="python", callable="apps.siemens_pdf_qa.adapter:run", static_kwargs={ "source_chunks_path": source_chunks_jsonl, "model": req.answer_model, }, ), optimization_advisor=req.optimization_advisor, ) return run_scenario_from_scenario_obj(scenario, settings=settings) # Module-level singleton shared by the FastAPI routes. pipeline_task_manager = PipelineTaskManager()