This commit is contained in:
ZhuJW
2026-03-13 20:53:44 +08:00
parent da6abea48b
commit 8584821f36
6 changed files with 223 additions and 112 deletions

View File

@@ -22,7 +22,6 @@ def create_dev_agent() -> Agent:
1. 快速理解需求并设计合理的系统架构
2. 编写清晰、可维护、高性能的代码
3. 遵循 TDD测试驱动开发实践
4. 遵循博世研发规范中的编码标准
你的代码实现必须通过所有测试用例,并满足性能和安全要求。""",
verbose=True,

View File

@@ -22,7 +22,6 @@ def create_pm_agent() -> Agent:
1. 快速理解用户业务场景和核心痛点
2. 识别功能性需求和非功能性需求
3. 定义清晰的验收标准 (Acceptance Criteria)
4. 遵循博世研发规范,确保需求的可追溯性和可验证性
你的输出将作为测试和开发团队的输入,务必保证准确性和完整性。""",
verbose=True,
@@ -75,7 +74,6 @@ def create_pm_task(requirement: str) -> Task:
## 5. 约束条件
- 5.1 技术约束
- 5.2 业务约束
- 5.3 合规约束(博世研发规范)
【注意事项】
- 使用清晰、无歧义的语言

View File

@@ -22,7 +22,6 @@ def create_qa_agent() -> Agent:
1. 基于需求文档设计全面的测试策略
2. 编写高质量的 Pytest 自动化测试脚本
3. 覆盖单元测试、集成测试、端到端测试
4. 遵循博世研发规范中的测试要求
你的测试用例将作为开发实现的验证标准,务必保证覆盖率和可执行性。""",
verbose=True,

90
main.py
View File

@@ -6,6 +6,8 @@ SDLC Agent Demo - FastAPI 主服务(异步版本)
import json
import uuid
import asyncio
import threading
import queue
from typing import Dict, Optional, AsyncGenerator
from datetime import datetime
from fastapi import FastAPI, HTTPException
@@ -50,7 +52,7 @@ class TaskManager:
def __init__(self):
self.tasks: Dict[str, Dict] = {}
self.task_queues: Dict[str, asyncio.Queue] = {}
self.task_events: Dict[str, list] = {} # 存储任务的所有事件
def create_task(self, requirement: str) -> str:
"""创建新任务"""
@@ -62,8 +64,8 @@ class TaskManager:
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat()
}
# 创建异步队列用于 SSE 推送
self.task_queues[task_id] = asyncio.Queue()
# 创建事件列表用于存储所有事件
self.task_events[task_id] = []
return task_id
def update_task_status(self, task_id: str, status: str):
@@ -72,22 +74,27 @@ class TaskManager:
self.tasks[task_id]["status"] = status
self.tasks[task_id]["updated_at"] = datetime.now().isoformat()
async def send_event(self, task_id: str, event: dict):
"""发送事件到队列"""
if task_id in self.task_queues:
await self.task_queues[task_id].put(event)
def add_event(self, task_id: str, event: dict):
"""添加事件到任务"""
if task_id in self.task_events:
self.task_events[task_id].append(event)
async def get_event(self, task_id: str, timeout: float = 60.0) -> Optional[dict]:
"""从队列获取事件"""
if task_id in self.task_queues:
try:
return await asyncio.wait_for(
self.task_queues[task_id].get(),
timeout=timeout
)
except asyncio.TimeoutError:
return None
return None
def get_events(self, task_id: str, last_index: int = 0) -> Dict:
"""获取任务的新事件"""
if task_id not in self.task_events:
return {"events": [], "has_more": False}
events = self.task_events[task_id]
new_events = events[last_index:]
task = self.tasks.get(task_id, {})
has_more = task.get("status") not in ["completed", "failed", "pending"]
return {
"events": new_events,
"has_more": has_more,
"status": task.get("status", "unknown")
}
def get_task(self, task_id: str) -> Optional[Dict]:
"""获取任务信息"""
@@ -102,7 +109,7 @@ task_manager = TaskManager()
@app.post("/api/v1/sdlc/start", response_model=Dict[str, str])
async def start_sdlc_process(request: StartRequest):
"""
启动 SDLC 流程(异步执行)
启动 SDLC 流程(使用线程池异步执行)
"""
# 验证配置
try:
@@ -113,8 +120,9 @@ async def start_sdlc_process(request: StartRequest):
# 创建任务
task_id = task_manager.create_task(request.requirement)
# 异步执行 SDLC 流程
asyncio.create_task(execute_sdlc_flow(task_id, request.requirement))
# 在线程池中异步执行 SDLC 流程
loop = asyncio.get_event_loop()
loop.run_in_executor(executor, execute_sdlc_flow, task_id, request.requirement)
return {
"task_id": task_id,
@@ -122,9 +130,16 @@ async def start_sdlc_process(request: StartRequest):
}
async def execute_sdlc_flow(task_id: str, requirement: str):
import threading
import asyncio
from concurrent.futures import ThreadPoolExecutor
# 线程池
executor = ThreadPoolExecutor(max_workers=5)
def execute_sdlc_flow(task_id: str, requirement: str):
"""
异步执行 SDLC 流程(使用 asyncio.to_thread 运行同步生成器)
在线程池中执行 SDLC 流程,将事件保存到任务列表
Args:
task_id: 任务 ID
@@ -133,8 +148,8 @@ async def execute_sdlc_flow(task_id: str, requirement: str):
task_manager.update_task_status(task_id, "processing")
try:
# 先发送一个任务启动事件
await task_manager.send_event(task_id, {
# 添加启动事件
task_manager.add_event(task_id, {
"event": "task_started",
"data": {
"status": "starting",
@@ -143,13 +158,13 @@ async def execute_sdlc_flow(task_id: str, requirement: str):
}
})
# 在线程池中执行同步生成器
# 直接执行 CrewAI (同步阻塞)
crew = SDLCCrew()
for event in crew.execute(requirement):
# 发送事件到队列
await task_manager.send_event(task_id, event)
# 添加事件到任务
task_manager.add_event(task_id, event)
# 如果是最终结果或错误,更新状态
# 更新状态
event_type = event.get("event", "")
if event_type == "final_result":
task_manager.update_task_status(task_id, "completed")
@@ -158,7 +173,7 @@ async def execute_sdlc_flow(task_id: str, requirement: str):
except Exception as e:
task_manager.update_task_status(task_id, "failed")
await task_manager.send_event(task_id, {
task_manager.add_event(task_id, {
"event": "error",
"data": {
"error": str(e),
@@ -167,6 +182,19 @@ async def execute_sdlc_flow(task_id: str, requirement: str):
})
@app.get("/api/v1/sdlc/poll/{task_id}")
async def poll_task_events(task_id: str, last_index: int = 0):
"""
轮询任务事件(替代 SSE
Args:
task_id: 任务 ID
last_index: 最后已知的 event 索引
"""
result = task_manager.get_events(task_id, last_index)
return result
@app.get("/api/v1/sdlc/stream/{task_id}")
async def stream_task_progress(task_id: str):
"""
@@ -316,7 +344,7 @@ if __name__ == "__main__":
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
port=8080,
reload=False,
log_level="info"
)

View File

@@ -399,8 +399,8 @@
// 初始化阶段
this.initStages();
// 连接 SSE
this.connectSSE(data.task_id);
// 开始轮询任务事件
this.connectPolling(data.task_id);
} catch (error) {
console.error('启动失败:', error);
@@ -423,88 +423,118 @@
},
/**
* 连接 SSE
* 轮询任务事件(替代 SSE
*/
connectSSE(taskId) {
const url = `/api/v1/sdlc/stream/${taskId}`;
connectPolling(taskId) {
this.connectionStatus = 'connecting';
this.addLog('system', 'SSE', `连接到${url}`);
this.addLog('system', 'POLL', `开始轮询任务${taskId}`);
this.eventSource = new EventSource(url);
let lastIndex = 0;
let pollCount = 0;
const maxPolls = 600; // 最多轮询 600 次 (10 分钟)
// 连接成功
this.eventSource.onopen = () => {
this.connectionStatus = 'connected';
this.addLog('system', 'SSE', '连接成功');
const poll = () => {
if (pollCount >= maxPolls) {
this.addLog('system', 'POLL', '轮询超时');
this.isProcessing = false;
this.connectionStatus = 'disconnected';
return;
}
fetch(`/api/v1/sdlc/poll/${taskId}?last_index=${lastIndex}`)
.then(res => res.json())
.then(data => {
const { events, has_more, status } = data;
// 处理新事件
events.forEach(event => {
lastIndex++;
this.handleEvent(event);
});
// 检查是否继续轮询
if (status === 'completed' || status === 'failed') {
this.isProcessing = false;
this.connectionStatus = 'disconnected';
this.addLog('system', 'POLL', `任务完成,状态:${status}`);
return;
}
if (has_more || events.length > 0) {
pollCount++;
setTimeout(poll, 500); // 每 500ms 轮询一次
} else if (status === 'processing') {
pollCount++;
setTimeout(poll, 1000); // 无新事件时 1 秒后再试
}
})
.catch(err => {
console.error('轮询失败:', err);
this.addLog('error', 'POLL', err.message);
pollCount++;
setTimeout(poll, 2000);
});
};
// PM 阶段
this.eventSource.addEventListener('pm_start', (event) => {
const data = JSON.parse(event.data);
this.updateStageStatus('pm', 'processing');
this.addLog('pm_start', 'PM Agent', '开始需求分析...');
});
// 开始轮询
setTimeout(poll, 500);
},
this.eventSource.addEventListener('pm_complete', (event) => {
const data = JSON.parse(event.data);
this.updateStageStatus('pm', 'completed');
this.addLog('pm_complete', 'PM Agent', '需求分析完成');
this.addResult('📋 软件需求规格说明书 (SRS)', data.content, data.timestamp);
});
/**
* 处理单个事件
*/
handleEvent(event) {
const eventType = event.event;
const data = event.data;
// QA 阶段
this.eventSource.addEventListener('qa_start', (event) => {
const data = JSON.parse(event.data);
this.updateStageStatus('qa', 'processing');
this.addLog('qa_start', 'QA Agent', '开始测试用例设计...');
});
switch(eventType) {
case 'task_started':
this.addLog('task_started', 'System', data.message || '任务已启动');
break;
this.eventSource.addEventListener('qa_complete', (event) => {
const data = JSON.parse(event.data);
this.updateStageStatus('qa', 'completed');
this.addLog('qa_complete', 'QA Agent', '测试用例设计完成');
this.addResult('🧪 测试方案与用例', data.content, data.timestamp);
});
case 'pm_start':
this.updateStageStatus('pm', 'processing');
this.addLog('pm_start', 'PM Agent', '开始需求分析...');
break;
// Dev 阶段
this.eventSource.addEventListener('dev_start', (event) => {
const data = JSON.parse(event.data);
this.updateStageStatus('dev', 'processing');
this.addLog('dev_start', 'Dev Agent', '开始代码实现...');
});
case 'pm_complete':
this.updateStageStatus('pm', 'completed');
this.addLog('pm_complete', 'PM Agent', '需求分析完成');
this.addResult('📋 软件需求规格说明书 (SRS)', data.content, data.timestamp);
break;
this.eventSource.addEventListener('dev_complete', (event) => {
const data = JSON.parse(event.data);
this.updateStageStatus('dev', 'completed');
this.addLog('dev_complete', 'Dev Agent', '代码实现完成');
this.addResult('💻 代码实现', data.content, data.timestamp);
});
case 'qa_start':
this.updateStageStatus('qa', 'processing');
this.addLog('qa_start', 'QA Agent', '开始测试用例设计...');
break;
// 最终结果
this.eventSource.addEventListener('final_result', (event) => {
const data = JSON.parse(event.data);
this.updateStageStatus('final', 'completed');
this.addLog('final_result', 'System', 'SDLC 流程完成');
this.isProcessing = false;
this.connectionStatus = 'disconnected';
});
case 'qa_complete':
this.updateStageStatus('qa', 'completed');
this.addLog('qa_complete', 'QA Agent', '测试用例设计完成');
this.addResult('🧪 测试方案与用例', data.content, data.timestamp);
break;
// 错误处理
this.eventSource.addEventListener('error', (event) => {
const data = JSON.parse(event.data);
this.addLog('error', 'Error', data.error || '未知错误');
this.isProcessing = false;
this.connectionStatus = 'disconnected';
alert(`执行错误:${data.error}`);
});
case 'dev_start':
this.updateStageStatus('dev', 'processing');
this.addLog('dev_start', 'Dev Agent', '开始代码实现...');
break;
// 连接错误
this.eventSource.onerror = () => {
this.addLog('system', 'SSE', '连接断开');
this.connectionStatus = 'disconnected';
this.eventSource.close();
};
case 'dev_complete':
this.updateStageStatus('dev', 'completed');
this.addLog('dev_complete', 'Dev Agent', '代码实现完成');
this.addResult('💻 代码实现', data.content, data.timestamp);
break;
case 'final_result':
this.updateStageStatus('final', 'completed');
this.addLog('final_result', 'System', 'SDLC 流程完成');
break;
case 'error':
this.addLog('error', 'Error', data.error || '未知错误');
alert(`执行错误:${data.error}`);
break;
}
},
/**

57
test_sse.py Normal file
View File

@@ -0,0 +1,57 @@
"""
SSE 测试脚本 - 验证事件流是否正确发送
"""
import requests
import json
import time
import sys
# 设置控制台输出编码
sys.stdout.reconfigure(encoding='utf-8')
# 启动任务
response = requests.post(
'http://localhost:8080/api/v1/sdlc/start',
json={'requirement': '测试需求:做一个简单的计算器'},
headers={'Content-Type': 'application/json'}
)
if response.status_code != 200:
print(f"Start failed: {response.status_code} - {response.text}")
exit(1)
task_data = response.json()
task_id = task_data['task_id']
print(f"Task started: {task_id}")
print("=" * 60)
# 连接 SSE
sse_url = f'http://localhost:8080/api/v1/sdlc/stream/{task_id}'
print(f"Connecting SSE: {sse_url}")
print("=" * 60)
with requests.get(sse_url, stream=True, timeout=300) as sse_response:
for line in sse_response.iter_lines():
if line:
line_str = line.decode('utf-8')
print(line_str)
# 解析事件
if line_str.startswith('event:'):
event_type = line_str[6:].strip()
print(f" -> Event type: {event_type}")
if line_str.startswith('data:'):
try:
data = json.loads(line_str[5:].strip())
print(f" -> Data: {json.dumps(data, indent=2, ensure_ascii=False)}")
except:
pass
# 如果是结束事件,退出
if 'event: final_result' in line_str or 'event: error' in line_str:
print("=" * 60)
print("Task completed!")
break
print("Test completed")