"""Routes for the end-to-end pipeline API (document parse → build → eval)."""

from __future__ import annotations

import logging

from fastapi import APIRouter, HTTPException

from webapp.models import (
    PipelineJobRequest,
    PipelineJobResponse,
    PipelineJobStatus,
)
from webapp.services.pipeline_task_manager import pipeline_task_manager

router = APIRouter(prefix="/api/pipeline", tags=["pipeline"])
logger = logging.getLogger("webapp.api.pipeline")

# ---------------------------------------------------------------------------
# Published OpenAPI texts.
#
# These strings are served to API consumers (via /openapi.json and the
# interactive docs), so they are runtime output and are kept verbatim.
# They are passed through ``description=``: FastAPI uses an explicit
# description in preference to the handler docstring, which leaves the
# docstrings free to serve as maintainer documentation.
# ---------------------------------------------------------------------------

_SUBMIT_DESCRIPTION = """\
提交一个「解析文档 → 生成题库 → RAGAS 评估 → 输出报告」全链路任务。

任务在后台线程中异步执行,立即返回 `job_id`。
通过 `GET /api/pipeline/jobs/{job_id}` 轮询 `status` / `phase` / `logs`。

**Pipeline 执行阶段**:

1. `parsing_documents` — 调用阿里云 DocMind 解析每份 PDF
2. `generating_questions` — LLM 从文档片段生成草稿题库
3. `evaluating` — RAGAS 在线评测打分(answer_model 答题 + judge_model 评分)
4. `done` — 所有产物写入磁盘,`status` 变为 `completed`
"""

_GET_DESCRIPTION = """\
查询一个 Pipeline 任务的当前状态、执行阶段、实时日志和结果。

**轮询建议**:每 3–5 秒查询一次,直到 `status` 为 `completed` 或 `failed`。

`result` 字段在任务完成后填充,包含:

- `scores_csv` — 每道题目逐项评分
- `summary_md` — 评估摘要 Markdown
- `dataset_csv` — 生成的题库 CSV
- `source_chunks_jsonl` — 文档片段索引
"""

_LIST_DESCRIPTION = "返回本次服务器会话中所有已提交的 Pipeline 任务,按创建时间倒序排列。"

# Per-status-code response documentation for POST /jobs.
_SUBMIT_RESPONSES = {
    202: {
        "description": "任务已成功排队,立即返回 job_id。",
        "content": {
            "application/json": {
                "example": {
                    "job_id": "a1b2c3d4e5f6",
                    "job_name": "siemens-ct-eval-2026",
                    "status": "queued",
                }
            }
        },
    },
    422: {"description": "请求参数校验失败(docs_path 等必填字段缺失或格式错误)。"},
}

# Per-status-code response documentation for GET /jobs/{job_id}.
_GET_RESPONSES = {
    200: {"description": "返回任务当前状态、执行阶段、日志及完成后的产物路径。"},
    404: {"description": "指定 job_id 的任务不存在。"},
}

# Per-status-code response documentation for GET /jobs.
_LIST_RESPONSES = {
    200: {
        "description": "按创建时间倒序返回本次服务器会话中所有的 Pipeline 任务。",
        "content": {
            "application/json": {
                "example": {
                    "jobs": [
                        {
                            "job_id": "a1b2c3d4e5f6",
                            "job_name": "siemens-ct-eval",
                            "status": "completed",
                            "phase": "done",
                            "logs": ["[build] 17 documents parsed", "..."],
                            "result": {
                                "total_questions": 19,
                                "eval_run_id": "2026-06-18T...",
                                "scores_csv": "outputs/pipeline/.../scores.csv",
                                "summary_md": "outputs/pipeline/.../summary.md",
                            },
                            "error": None,
                        }
                    ]
                }
            }
        },
    }
}


@router.post(
    "/jobs",
    status_code=202,
    response_model=PipelineJobResponse,
    summary="提交全链路评估任务",
    description=_SUBMIT_DESCRIPTION,
    responses=_SUBMIT_RESPONSES,
)
def submit_pipeline_job(request: PipelineJobRequest) -> PipelineJobResponse:
    """Hand a pipeline job request to the task manager and report its id.

    The request is logged, passed to ``pipeline_task_manager.submit``, and
    the ``job_id``, ``job_name`` and ``status`` of the returned task are
    echoed back to the caller with HTTP 202.

    Args:
        request: Validated request body describing the job.

    Returns:
        A ``PipelineJobResponse`` built from the task the manager returned.
    """
    logger.info(
        "[submit_pipeline] docs_path=%s job_name=%r gen_model=%s judge=%s max_docs=%s",
        request.docs_path,
        request.job_name,
        request.generation_model,
        request.judge_model,
        request.max_documents,
    )

    queued = pipeline_task_manager.submit(request)
    logger.info(
        "[submit_pipeline] queued job_id=%s job_name=%s",
        queued.job_id,
        queued.job_name,
    )

    return PipelineJobResponse(
        job_id=queued.job_id,
        job_name=queued.job_name,
        status=queued.status,
    )


@router.get(
    "/jobs/{job_id}",
    response_model=PipelineJobStatus,
    summary="查询任务状态",
    description=_GET_DESCRIPTION,
    responses=_GET_RESPONSES,
)
def get_pipeline_job(job_id: str) -> PipelineJobStatus:
    """Look up one pipeline job by id.

    Args:
        job_id: Identifier taken from the URL path.

    Returns:
        Whatever ``pipeline_task_manager.get`` returns for ``job_id``.

    Raises:
        HTTPException: With status 404 when the manager returns ``None``.
    """
    job_status = pipeline_task_manager.get(job_id)

    # Success path first; anything falling through is an unknown id.
    if job_status is not None:
        logger.debug(
            "[get_pipeline_job] job_id=%s status=%s phase=%s",
            job_id,
            job_status.status,
            job_status.phase,
        )
        return job_status

    logger.warning("[get_pipeline_job] not found job_id=%s", job_id)
    raise HTTPException(status_code=404, detail=f"Pipeline job not found: {job_id}")


@router.get(
    "/jobs",
    response_model=dict,
    summary="列出所有任务",
    description=_LIST_DESCRIPTION,
    responses=_LIST_RESPONSES,
)
def list_pipeline_jobs() -> dict:
    """Return every job known to the task manager under a ``"jobs"`` key.

    Each entry is the ``model_dump()`` of an item from
    ``pipeline_task_manager.list_jobs()``, in the order the manager yields
    them. NOTE(review): the published description promises newest-first
    ordering; that is presumably enforced inside the manager — confirm.
    """
    all_jobs = pipeline_task_manager.list_jobs()
    logger.info("[list_pipeline_jobs] count=%d", len(all_jobs))

    serialized = [entry.model_dump() for entry in all_jobs]
    return {"jobs": serialized}