Files
siemens_ragas/webapp/api/pipeline.py
wangwei ac410e7a5d feat: add detailed logging to all API routes and global access log middleware
Each API module now logs:
- evaluations: trigger (scenario path, task_id), status polls, list
- runs: list (count), detail (run_id, metrics, sample counts)
- scenarios: list (total, valid, error counts)
- pipeline: submit (docs_path, models, max_docs), status polls, list
- llm_profiles: CRUD ops (name, model, id), probe/test (model, ok, latency), apply (patched fields)
- score: already had per-request logging

Global middleware (webapp.access logger):
- Every API request: METHOD path -> status (latency_ms) at INFO
- Static file requests demoted to DEBUG to reduce noise

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-06-23 10:35:00 +08:00

132 lines
4.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Routes for the end-to-end pipeline API (document parse → build → eval)."""
from __future__ import annotations
import logging
from fastapi import APIRouter, HTTPException
from webapp.models import (
PipelineJobRequest,
PipelineJobResponse,
PipelineJobStatus,
)
from webapp.services.pipeline_task_manager import pipeline_task_manager
# Module logger. The explicit dotted name (instead of __name__) pins the logger
# hierarchy to "webapp.api.pipeline" independent of the import path used.
logger = logging.getLogger("webapp.api.pipeline")

# Every route declared below is mounted under /api/pipeline and grouped under
# the "pipeline" tag in the generated OpenAPI document.
router = APIRouter(tags=["pipeline"], prefix="/api/pipeline")
@router.post(
    "/jobs",
    status_code=202,
    response_model=PipelineJobResponse,
    summary="提交全链路评估任务",
    # User-facing OpenAPI text is pinned here (in Chinese, like `summary`) so the
    # docstring below can be an English developer note without changing /docs.
    # Blank lines make the phase list render as a Markdown list.
    description=(
        "提交一个「解析文档 → 生成题库 → RAGAS 评估 → 输出报告」全链路任务。\n\n"
        "任务在后台线程中异步执行,立即返回 `job_id`。\n"
        "通过 `GET /api/pipeline/jobs/{job_id}` 轮询 `status` / `phase` / `logs`。\n\n"
        "**Pipeline 执行阶段**\n\n"
        "1. `parsing_documents` — 调用阿里云 DocMind 解析每份 PDF\n"
        "2. `generating_questions` — LLM 从文档片段生成草稿题库\n"
        "3. `evaluating` — RAGAS 在线评测打分(answer_model 答题 + judge_model 评分)\n"
        "4. `done` — 所有产物写入磁盘,`status` 变为 `completed`"
    ),
    responses={
        202: {
            "description": "任务已成功排队,立即返回 job_id。",
            "content": {
                "application/json": {
                    "example": {
                        "job_id": "a1b2c3d4e5f6",
                        "job_name": "siemens-ct-eval-2026",
                        "status": "queued",
                    }
                }
            },
        },
        # Opening "(" restored; the text previously had only the closing ")".
        422: {"description": "请求参数校验失败(docs_path 等必填字段缺失或格式错误)。"},
    },
)
def submit_pipeline_job(request: PipelineJobRequest) -> PipelineJobResponse:
    """Enqueue an end-to-end "parse docs -> generate questions -> RAGAS eval -> report" job.

    The request parameters are logged, the request is handed to
    ``pipeline_task_manager.submit``, and the endpoint answers 202 immediately
    with the queued task's id, name and status. Clients then poll
    ``GET /api/pipeline/jobs/{job_id}`` for ``status`` / ``phase`` / ``logs``.

    Pipeline phases, as described to API clients:
      1. ``parsing_documents``    - each PDF is parsed with Alibaba Cloud DocMind
      2. ``generating_questions`` - an LLM drafts a question set from document chunks
      3. ``evaluating``           - online RAGAS scoring (answer_model answers,
                                    judge_model grades)
      4. ``done``                 - artifacts are written to disk and ``status``
                                    becomes ``completed``

    Args:
        request: Validated job parameters; this function reads ``docs_path``,
            ``job_name``, ``generation_model``, ``judge_model`` and
            ``max_documents`` for logging and passes the whole object on.

    Returns:
        PipelineJobResponse carrying ``job_id``, ``job_name`` and ``status``
        of the task returned by ``pipeline_task_manager.submit``.
    """
    logger.info(
        "[submit_pipeline] docs_path=%s job_name=%r gen_model=%s judge=%s max_docs=%s",
        request.docs_path,
        request.job_name,
        request.generation_model,
        request.judge_model,
        request.max_documents,
    )
    task = pipeline_task_manager.submit(request)
    logger.info("[submit_pipeline] queued job_id=%s job_name=%s", task.job_id, task.job_name)
    # Only identifying fields are echoed; progress and results are served by
    # GET /api/pipeline/jobs/{job_id}.
    return PipelineJobResponse(
        job_id=task.job_id,
        job_name=task.job_name,
        status=task.status,
    )
@router.get(
    "/jobs/{job_id}",
    response_model=PipelineJobStatus,
    summary="查询任务状态",
    # User-facing OpenAPI text pinned here so the docstring can be English.
    # The poll interval reads "3-5" (ASCII dash); the previous text showed
    # "35", which looks like a range whose dash was dropped.
    description=(
        "查询一个 Pipeline 任务的当前状态、执行阶段、实时日志和结果。\n\n"
        "**轮询建议**:每 3-5 秒查询一次,直到 `status` 为 `completed` 或 `failed`。\n\n"
        "`result` 字段在任务完成后填充,包含:\n\n"
        "- `scores_csv` — 每道题目逐项评分\n"
        "- `summary_md` — 评估摘要 Markdown\n"
        "- `dataset_csv` — 生成的题库 CSV\n"
        "- `source_chunks_jsonl` — 文档片段索引"
    ),
    responses={
        200: {"description": "返回任务当前状态、执行阶段、日志及完成后的产物路径。"},
        404: {"description": "指定 job_id 的任务不存在。"},
    },
)
def get_pipeline_job(job_id: str) -> PipelineJobStatus:
    """Return the current state of pipeline job ``job_id``.

    Clients are expected to poll this endpoint until ``status`` is
    ``completed`` or ``failed``. Once the job finishes, ``result`` is filled
    with artifact paths such as ``scores_csv``, ``summary_md``,
    ``dataset_csv`` and ``source_chunks_jsonl``.

    Args:
        job_id: Identifier returned by ``POST /api/pipeline/jobs``.

    Returns:
        The PipelineJobStatus object returned by ``pipeline_task_manager.get``.

    Raises:
        HTTPException: 404 when ``pipeline_task_manager.get`` returns ``None``.
    """
    job = pipeline_task_manager.get(job_id)
    if job is None:
        logger.warning("[get_pipeline_job] not found job_id=%s", job_id)
        raise HTTPException(status_code=404, detail=f"Pipeline job not found: {job_id}")
    # DEBUG rather than INFO: this endpoint is hit repeatedly by polling clients.
    logger.debug("[get_pipeline_job] job_id=%s status=%s phase=%s", job_id, job.status, job.phase)
    return job
@router.get(
    "/jobs",
    response_model=dict,
    summary="列出所有任务",
    # Exactly the text FastAPI would otherwise derive from the docstring, so the
    # OpenAPI page is unchanged while the docstring below is an English note.
    description="返回本次服务器会话中所有已提交的 Pipeline 任务,按创建时间倒序排列。",
    responses={
        200: {
            "description": "按创建时间倒序返回本次服务器会话中所有的 Pipeline 任务。",
            "content": {
                "application/json": {
                    "example": {
                        "jobs": [
                            {
                                "job_id": "a1b2c3d4e5f6",
                                "job_name": "siemens-ct-eval",
                                "status": "completed",
                                "phase": "done",
                                "logs": ["[build] 17 documents parsed", "..."],
                                "result": {
                                    "total_questions": 19,
                                    "eval_run_id": "2026-06-18T...",
                                    "scores_csv": "outputs/pipeline/.../scores.csv",
                                    "summary_md": "outputs/pipeline/.../summary.md",
                                },
                                "error": None,
                            }
                        ]
                    }
                }
            },
        }
    },
)
def list_pipeline_jobs() -> dict:
    """List every pipeline job known to ``pipeline_task_manager`` in this session.

    Jobs keep the order in which ``pipeline_task_manager.list_jobs()`` returns
    them (advertised to clients as newest first). Each job is serialized with
    ``model_dump()``, and the number of jobs is logged at INFO.

    Returns:
        A dict of the form ``{"jobs": [<dumped job>, ...]}``.
    """
    known_jobs = pipeline_task_manager.list_jobs()
    logger.info("[list_pipeline_jobs] count=%d", len(known_jobs))
    serialized_jobs = [job.model_dump() for job in known_jobs]
    return {"jobs": serialized_jobs}