Files
siemens_ragas/webapp/api/pipeline.py

132 lines
4.9 KiB
Python
Raw Normal View History

"""Routes for the end-to-end pipeline API (document parse → build → eval)."""
from __future__ import annotations
import logging
from fastapi import APIRouter, HTTPException
from webapp.models import (
PipelineJobRequest,
PipelineJobResponse,
PipelineJobStatus,
)
from webapp.services.pipeline_task_manager import pipeline_task_manager
router = APIRouter(prefix="/api/pipeline", tags=["pipeline"])
logger = logging.getLogger("webapp.api.pipeline")
@router.post(
"/jobs",
status_code=202,
response_model=PipelineJobResponse,
summary="提交全链路评估任务",
responses={
202: {
"description": "任务已成功排队,立即返回 job_id。",
"content": {
"application/json": {
"example": {
"job_id": "a1b2c3d4e5f6",
"job_name": "siemens-ct-eval-2026",
"status": "queued",
}
}
},
},
422: {"description": "请求参数校验失败docs_path 等必填字段缺失或格式错误)。"},
},
)
def submit_pipeline_job(request: PipelineJobRequest) -> PipelineJobResponse:
"""提交一个「解析文档 → 生成题库 → RAGAS 评估 → 输出报告」全链路任务。
任务在后台线程中异步执行立即返回 `job_id`
通过 `GET /api/pipeline/jobs/{job_id}` 轮询 `status` / `phase` / `logs`
**Pipeline 执行阶段**
1. `parsing_documents` 调用阿里云 DocMind 解析每份 PDF
2. `generating_questions` LLM 从文档片段生成草稿题库
3. `evaluating` RAGAS 在线评测打分answer_model 答题 + judge_model 评分
4. `done` 所有产物写入磁盘`status` 变为 `completed`
"""
logger.info(
"[submit_pipeline] docs_path=%s job_name=%r gen_model=%s judge=%s max_docs=%s",
request.docs_path, request.job_name, request.generation_model,
request.judge_model, request.max_documents,
)
task = pipeline_task_manager.submit(request)
logger.info("[submit_pipeline] queued job_id=%s job_name=%s", task.job_id, task.job_name)
return PipelineJobResponse(
job_id=task.job_id,
job_name=task.job_name,
status=task.status,
)
@router.get(
"/jobs/{job_id}",
response_model=PipelineJobStatus,
summary="查询任务状态",
responses={
200: {"description": "返回任务当前状态、执行阶段、日志及完成后的产物路径。"},
404: {"description": "指定 job_id 的任务不存在。"},
},
)
def get_pipeline_job(job_id: str) -> PipelineJobStatus:
"""查询一个 Pipeline 任务的当前状态、执行阶段、实时日志和结果。
**轮询建议** 35 秒查询一次直到 `status` `completed` `failed`
`result` 字段在任务完成后填充包含
- `scores_csv` 每道题目逐项评分
- `summary_md` 评估摘要 Markdown
- `dataset_csv` 生成的题库 CSV
- `source_chunks_jsonl` 文档片段索引
"""
status = pipeline_task_manager.get(job_id)
if status is None:
logger.warning("[get_pipeline_job] not found job_id=%s", job_id)
raise HTTPException(status_code=404, detail=f"Pipeline job not found: {job_id}")
logger.debug("[get_pipeline_job] job_id=%s status=%s phase=%s", job_id, status.status, status.phase)
return status
@router.get(
"/jobs",
response_model=dict,
summary="列出所有任务",
responses={
200: {
"description": "按创建时间倒序返回本次服务器会话中所有的 Pipeline 任务。",
"content": {
"application/json": {
"example": {
"jobs": [
{
"job_id": "a1b2c3d4e5f6",
"job_name": "siemens-ct-eval",
"status": "completed",
"phase": "done",
"logs": ["[build] 17 documents parsed", "..."],
"result": {
"total_questions": 19,
"eval_run_id": "2026-06-18T...",
"scores_csv": "outputs/pipeline/.../scores.csv",
"summary_md": "outputs/pipeline/.../summary.md",
},
"error": None,
}
]
}
}
},
}
},
)
def list_pipeline_jobs() -> dict:
"""返回本次服务器会话中所有已提交的 Pipeline 任务,按创建时间倒序排列。"""
jobs = pipeline_task_manager.list_jobs()
logger.info("[list_pipeline_jobs] count=%d", len(jobs))
return {"jobs": [s.model_dump() for s in jobs]}