Files
siemens_ragas/webapp/server.py

174 lines
7.4 KiB
Python
Raw Permalink Normal View History

"""FastAPI application factory for the RAGAS evaluation console.
The app mounts three JSON API routers and serves the single-page static
frontend. It imports rag_eval only lazily (inside the task manager worker), so
the server starts even when the evaluation dependencies are not yet installed.
"""
from __future__ import annotations
import logging
import time
from pathlib import Path
from fastapi import FastAPI, Request
from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import RequestValidationError
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from webapp.api import evaluations, llm_profiles, pipeline, runs, scenarios, score, score_jobs, session_score_jobs
STATIC_DIR = Path(__file__).resolve().parent / "static"
logger = logging.getLogger("webapp.server")
access_logger = logging.getLogger("webapp.access")
# OpenAPI tag metadata — controls the grouping and descriptions in /docs.
OPENAPI_TAGS = [
{
"name": "pipeline",
"description": (
"**全链路评估 Pipeline API**\n\n"
"一次调用完成「解析文档 → 生成题库 → RAGAS 评估 → 输出报告」全流程。\n\n"
"**使用流程**\n"
"1. `POST /api/pipeline/jobs` 提交任务,立即拿到 `job_id`。\n"
"2. `GET /api/pipeline/jobs/{job_id}` 轮询 `status` / `phase` / `logs`。\n"
"3. 当 `status=completed` 时,`result` 字段包含所有产物路径。\n\n"
"**Pipeline 阶段**\n"
"| phase | 说明 |\n"
"|-------|------|\n"
"| `parsing_documents` | 调用阿里云 DocMind 解析每份 PDF |\n"
"| `generating_questions` | LLM 从文档片段生成草稿题库 |\n"
"| `evaluating` | RAGAS 在线评测打分 |\n"
"| `done` | 所有产物写入磁盘,任务完成 |"
),
},
{
"name": "evaluations",
"description": (
"**单场景评估 API**\n\n"
"基于已有 YAML 场景文件触发评估任务,并查询任务状态与日志。"
),
},
{
"name": "llm-profiles",
"description": (
"**LLM 配置管理 API**\n\n"
"增删改查已保存的 LLM 连接配置模型名称、Base URL、API Key"
"支持连通性测试;可将配置一键写入场景 YAML 文件。"
),
},
{
"name": "runs",
"description": "**评估运行列表 API**\n\n查询历史评估运行记录及详细报告数据。",
},
{
"name": "scenarios",
"description": "**场景文件 API**\n\n扫描并列出 `scenarios/` 目录下所有可用的 YAML 场景文件。",
},
{
"name": "score",
"description": (
"**实时评分 API同步** — `POST /api/score`\n\n"
"**异步评分 APIDify 推荐)** — `POST /api/score/async`\n\n"
"异步方式立即返回 job_id202评分在后台执行完成后自动生成完整报告含优化建议"
"在「运行列表」页查看。\n\n"
"**Session 批量评分 API** — `POST /api/score/session_async`\n\n"
"适合 Dify 循环节点批量评估:同一 `session_id` 的多次调用合并为一个报告,"
2026-06-27 14:31:45 +08:00
"每次调用新增一个样本行,指标均值和优化建议增量更新。\n\n"
"**Session 模式调用流程**\n"
"1. `POST /api/score/session_async` 提交一条问答评分请求。\n"
"2. 用 `GET /api/score/session/jobs/{job_id}` 轮询单次调用状态。\n"
"3. 用 `GET /api/score/sessions/{session_id}` 查看 session 聚合状态。\n"
"4. 用 `GET /api/runs/{run_id}` 或在「运行列表」中查看完整报告。\n\n"
"通过 `GET /api/score/jobs` 列出所有异步评分记录,"
"`GET /api/score/jobs/{job_id}` 查询单个任务状态。\n\n"
"**鉴权**:若 `.env` 中配置了 `SCORE_API_TOKEN`,需携带 "
"`Authorization: Bearer <token>` 请求头。"
),
},
{
"name": "meta",
"description": "**系统 API**\n\n健康检查等基础接口。",
},
]
def create_app() -> FastAPI:
"""Build and configure the FastAPI application instance."""
app = FastAPI(
title="Siemens RAGAS 评估平台",
description=(
"西门子医疗影像 RAG 评估平台 API 文档。\n\n"
"提供以下能力:\n"
"- **Pipeline API** — 一键完成「解析文档 → 生成题库 → RAGAS 评估」全链路\n"
"- **实时评分 API** — 供 Dify 外部 Tool 调用的单题 RAGAS 评分接口\n"
"- **评估 API** — 基于 YAML 场景文件触发单次评估\n"
"- **LLM 配置 API** — 管理多个 LLM 连接配置,支持连通性测试\n"
"- **报告 API** — 查询历史运行记录与评估报告\n\n"
"> **快速开始**:调用 `POST /api/pipeline/jobs` 传入 PDF 文件夹路径即可启动完整评估流程。"
),
version="0.3.0",
openapi_tags=OPENAPI_TAGS,
)
app.include_router(runs.router)
app.include_router(scenarios.router)
app.include_router(evaluations.router)
app.include_router(llm_profiles.router)
app.include_router(pipeline.router)
app.include_router(score.router)
app.include_router(score_jobs.router)
app.include_router(session_score_jobs.router)
@app.middleware("http")
async def access_log_middleware(request: Request, call_next):
"""Log every API request with method, path, status code and latency.
Static file requests are logged at DEBUG level to keep the console clean.
"""
t0 = time.monotonic()
response = await call_next(request)
latency_ms = int((time.monotonic() - t0) * 1000)
path = request.url.path
is_static = path.startswith("/static/") or path in ("/", "/favicon.ico")
msg = "%s %s%d (%dms)", request.method, path, response.status_code, latency_ms
if is_static:
access_logger.debug(*msg)
else:
access_logger.info(*msg)
return response
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError) -> JSONResponse:
"""Log full validation error detail to help diagnose 422 responses."""
errors = jsonable_encoder(exc.errors())
logger.warning(
"[422] validation error url=%s content_type=%s errors=%s",
request.url.path,
request.headers.get("content-type", ""),
errors,
)
return JSONResponse(
status_code=422,
content={"detail": errors},
)
@app.get("/api/health", tags=["meta"])
def health() -> dict[str, str]:
"""Report basic liveness so the UI can confirm the server is reachable."""
return {"status": "ok"}
@app.get("/", include_in_schema=False)
def index() -> FileResponse:
"""Serve the single-page console entry document."""
return FileResponse(STATIC_DIR / "index.html")
# Serve CSS/JS assets under /static while keeping API routes at /api.
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
return app
app = create_app()