Files
siemens_ragas/webapp/api/score_jobs.py

90 lines
3.2 KiB
Python
Raw Permalink Normal View History

"""Routes for async RAGAS scoring jobs (Dify fire-and-forget integration).
Dify calls POST /api/score/async gets job_id immediately (202).
Scoring runs in background, result written as a standard run artifact.
View full report at GET /api/runs/{run_id} or in the 运行列表 page.
"""
from __future__ import annotations
import logging
from fastapi import APIRouter, HTTPException
from webapp.models import AsyncScoreJobResponse, AsyncScoreJobStatus, ScoreRequest
from webapp.services.score_job_manager import score_job_manager
router = APIRouter(prefix="/api/score", tags=["score"])
logger = logging.getLogger("webapp.api.score_jobs")
@router.post(
"/async",
status_code=202,
response_model=AsyncScoreJobResponse,
summary="提交异步评分任务Dify 推荐方式)",
responses={
202: {
"description": (
"任务已排队,立即返回 job_id202 Accepted\n\n"
"评分在后台执行,完成后自动生成完整报告(含优化建议)。\n"
"通过 `GET /api/score/jobs/{job_id}` 查询状态,"
"完成后在「运行列表」页查看完整报告。"
),
"content": {
"application/json": {
"example": {"job_id": "abc123def456", "status": "queued", "run_id": None}
}
},
},
},
)
def submit_async_score(request: ScoreRequest) -> AsyncScoreJobResponse:
"""提交异步 RAGAS 评分任务,立即返回 job_id。
**适合 Dify 工作流**HTTP 节点无需等待评分完成无超时风险
工作流立即继续评分结果在 RAGAS 平台运行列表中查看
评分完成后自动生成
- 各指标得分`scores.csv`
- 摘要报告`summary.md`
- LLM 优化建议`optimization_advice.md`
"""
logger.info(
"[score_async] submit metrics=%s has_ctx=%s has_gt=%s",
request.metrics, bool(request.contexts), bool(request.ground_truth),
)
status = score_job_manager.submit(request)
logger.info("[score_async] queued job_id=%s", status.job_id)
return AsyncScoreJobResponse(job_id=status.job_id, status=status.status)
@router.get(
"/jobs",
response_model=dict,
summary="列出所有异步评分记录",
)
def list_score_jobs() -> dict:
"""返回所有异步评分记录,按创建时间倒序排列。"""
jobs = score_job_manager.list_jobs()
logger.info("[score_jobs] list count=%d", len(jobs))
return {"jobs": [j.model_dump() for j in jobs]}
@router.get(
"/jobs/{job_id}",
response_model=AsyncScoreJobStatus,
summary="查询单个异步评分任务状态",
responses={404: {"description": "指定 job_id 的评分任务不存在。"}},
)
def get_score_job(job_id: str) -> AsyncScoreJobStatus:
"""查询单个异步评分任务的状态和结果。
`status` `completed` `run_id` 字段包含对应的运行 ID
可通过 `GET /api/runs/{run_id}` 获取完整评分报告
"""
status = score_job_manager.get(job_id)
if status is None:
raise HTTPException(status_code=404, detail=f"Score job not found: {job_id}")
return status