Files
crewai/main.py
2026-03-13 21:09:44 +08:00

232 lines
6.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
SDLC Agent Demo - FastAPI 主服务(轮询版本)
多智能体端到端软件交付协同系统
"""
import json
import uuid
import asyncio
import threading
from typing import Dict, Optional, AsyncGenerator
from datetime import datetime
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, RedirectResponse, JSONResponse
from pydantic import BaseModel, Field
import uvicorn
from concurrent.futures import ThreadPoolExecutor
from crews.sdlc_crew import SDLCCrew
from models.qwen_config import get_qwen_config
# ========== FastAPI 应用初始化 ==========
app = FastAPI(
title="SDLC Agent Demo",
description="多智能体端到端软件交付协同系统 - 基于 CrewAI + Qwen3.5-flash",
version="1.0.0"
)
# 添加 CORS 中间件
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 挂载静态文件目录
app.mount("/static", StaticFiles(directory="static"), name="static")
# ========== 数据模型 ==========
class StartRequest(BaseModel):
"""启动请求模型"""
requirement: str = Field(..., description="用户需求描述", min_length=10)
# ========== 任务管理(内存存储) ==========
class TaskManager:
"""任务管理器 - 负责任务状态持久化"""
def __init__(self):
self.tasks: Dict[str, Dict] = {}
self.task_events: Dict[str, list] = {} # 存储任务的所有事件
def create_task(self, requirement: str) -> str:
"""创建新任务"""
task_id = str(uuid.uuid4())
self.tasks[task_id] = {
"task_id": task_id,
"requirement": requirement,
"status": "pending",
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat()
}
# 创建事件列表用于存储所有事件
self.task_events[task_id] = []
return task_id
def update_task_status(self, task_id: str, status: str):
"""更新任务状态"""
if task_id in self.tasks:
self.tasks[task_id]["status"] = status
self.tasks[task_id]["updated_at"] = datetime.now().isoformat()
def add_event(self, task_id: str, event: dict):
"""添加事件到任务"""
if task_id in self.task_events:
self.task_events[task_id].append(event)
def get_events(self, task_id: str, last_index: int = 0) -> Dict:
"""获取任务的新事件"""
if task_id not in self.task_events:
return {"events": [], "has_more": False}
events = self.task_events[task_id]
new_events = events[last_index:]
task = self.tasks.get(task_id, {})
has_more = task.get("status") not in ["completed", "failed", "pending"]
return {
"events": new_events,
"has_more": has_more,
"status": task.get("status", "unknown")
}
def get_task(self, task_id: str) -> Optional[Dict]:
"""获取任务信息"""
return self.tasks.get(task_id)
# 全局任务管理器
task_manager = TaskManager()
# 线程池
executor = ThreadPoolExecutor(max_workers=5)
# ========== API 端点 ==========
@app.post("/api/v1/sdlc/start", response_model=Dict[str, str])
async def start_sdlc_process(request: StartRequest):
"""
启动 SDLC 流程(使用线程池异步执行)
"""
# 验证配置
try:
get_qwen_config()
except ValueError as e:
raise HTTPException(status_code=500, detail=str(e))
# 创建任务
task_id = task_manager.create_task(request.requirement)
# 在线程池中异步执行 SDLC 流程
loop = asyncio.get_event_loop()
loop.run_in_executor(executor, execute_sdlc_flow, task_id, request.requirement)
return {
"task_id": task_id,
"status": "processing"
}
def execute_sdlc_flow(task_id: str, requirement: str):
"""
在线程池中执行 SDLC 流程,将事件保存到任务列表
Args:
task_id: 任务 ID
requirement: 用户需求
"""
task_manager.update_task_status(task_id, "processing")
try:
# 添加启动事件
task_manager.add_event(task_id, {
"event": "task_started",
"data": {
"status": "starting",
"message": "SDLC 流程已启动",
"timestamp": datetime.now().isoformat()
}
})
# 直接执行 CrewAI (同步阻塞)
crew = SDLCCrew()
for event in crew.execute(requirement):
# 添加事件到任务
task_manager.add_event(task_id, event)
# 更新状态
event_type = event.get("event", "")
if event_type == "final_result":
task_manager.update_task_status(task_id, "completed")
elif event_type == "error":
task_manager.update_task_status(task_id, "failed")
except Exception as e:
task_manager.update_task_status(task_id, "failed")
task_manager.add_event(task_id, {
"event": "error",
"data": {
"error": str(e),
"timestamp": datetime.now().isoformat()
}
})
@app.get("/api/v1/sdlc/poll/{task_id}")
async def poll_task_events(task_id: str, last_index: int = 0):
"""
轮询任务事件(替代 SSE
Args:
task_id: 任务 ID
last_index: 最后已知的 event 索引
"""
result = task_manager.get_events(task_id, last_index)
return result
@app.get("/api/v1/sdlc/status/{task_id}")
def get_task_status(task_id: str):
"""
获取任务状态(非流式)
"""
task = task_manager.get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return {
"task_id": task["task_id"],
"status": task["status"],
"created_at": task["created_at"],
"updated_at": task["updated_at"]
}
@app.get("/")
def root():
"""根路径重定向到测试页面"""
return RedirectResponse(url="/static/index.html")
@app.get("/health")
def health_check():
"""健康检查端点"""
return {"status": "healthy", "version": "1.0.0"}
# ========== 主程序入口 ==========
if __name__ == "__main__":
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8080,
reload=False,
log_level="info"
)