diff --git a/agents/dev_agent.py b/agents/dev_agent.py index 1eea06b..083c421 100644 --- a/agents/dev_agent.py +++ b/agents/dev_agent.py @@ -22,7 +22,6 @@ def create_dev_agent() -> Agent: 1. 快速理解需求并设计合理的系统架构 2. 编写清晰、可维护、高性能的代码 3. 遵循 TDD(测试驱动开发)实践 - 4. 遵循博世研发规范中的编码标准 你的代码实现必须通过所有测试用例,并满足性能和安全要求。""", verbose=True, diff --git a/agents/pm_agent.py b/agents/pm_agent.py index 38e8e27..be4bed5 100644 --- a/agents/pm_agent.py +++ b/agents/pm_agent.py @@ -22,7 +22,6 @@ def create_pm_agent() -> Agent: 1. 快速理解用户业务场景和核心痛点 2. 识别功能性需求和非功能性需求 3. 定义清晰的验收标准 (Acceptance Criteria) - 4. 遵循博世研发规范,确保需求的可追溯性和可验证性 你的输出将作为测试和开发团队的输入,务必保证准确性和完整性。""", verbose=True, @@ -75,7 +74,6 @@ def create_pm_task(requirement: str) -> Task: ## 5. 约束条件 - 5.1 技术约束 - 5.2 业务约束 - - 5.3 合规约束(博世研发规范) 【注意事项】 - 使用清晰、无歧义的语言 diff --git a/agents/qa_agent.py b/agents/qa_agent.py index ed51e51..8f9a144 100644 --- a/agents/qa_agent.py +++ b/agents/qa_agent.py @@ -22,7 +22,6 @@ def create_qa_agent() -> Agent: 1. 基于需求文档设计全面的测试策略 2. 编写高质量的 Pytest 自动化测试脚本 3. 覆盖单元测试、集成测试、端到端测试 - 4. 遵循博世研发规范中的测试要求 你的测试用例将作为开发实现的验证标准,务必保证覆盖率和可执行性。""", verbose=True, diff --git a/main.py b/main.py index 499638e..d6604c8 100644 --- a/main.py +++ b/main.py @@ -6,6 +6,8 @@ SDLC Agent Demo - FastAPI 主服务(异步版本) import json import uuid import asyncio +import threading +import queue from typing import Dict, Optional, AsyncGenerator from datetime import datetime from fastapi import FastAPI, HTTPException @@ -50,7 +52,7 @@ class TaskManager: def __init__(self): self.tasks: Dict[str, Dict] = {} - self.task_queues: Dict[str, asyncio.Queue] = {} + self.task_events: Dict[str, list] = {} # 存储任务的所有事件 def create_task(self, requirement: str) -> str: """创建新任务""" @@ -62,8 +64,8 @@ class TaskManager: "created_at": datetime.now().isoformat(), "updated_at": datetime.now().isoformat() } - # 创建异步队列用于 SSE 推送 - self.task_queues[task_id] = asyncio.Queue() + # 创建事件列表用于存储所有事件 + self.task_events[task_id] = [] return task_id def update_task_status(self, task_id: str, status: str): @@ -72,22 +74,27 @@ class TaskManager: self.tasks[task_id]["status"] = status self.tasks[task_id]["updated_at"] = datetime.now().isoformat() - async def send_event(self, task_id: str, event: dict): - """发送事件到队列""" - if task_id in self.task_queues: - await self.task_queues[task_id].put(event) + def add_event(self, task_id: str, event: dict): + """添加事件到任务""" + if task_id in self.task_events: + self.task_events[task_id].append(event) - async def get_event(self, task_id: str, timeout: float = 60.0) -> Optional[dict]: - """从队列获取事件""" - if task_id in self.task_queues: - try: - return await asyncio.wait_for( - self.task_queues[task_id].get(), - timeout=timeout - ) - except asyncio.TimeoutError: - return None - return None + def get_events(self, task_id: str, last_index: int = 0) -> Dict: + """获取任务的新事件""" + if task_id not in self.task_events: + return {"events": [], "has_more": False} + + events = self.task_events[task_id] + new_events = events[last_index:] + + task = self.tasks.get(task_id, {}) + has_more = task.get("status") not in ["completed", "failed", "pending"] + + return { + "events": new_events, + "has_more": has_more, + "status": task.get("status", "unknown") + } def get_task(self, task_id: str) -> Optional[Dict]: """获取任务信息""" @@ -102,7 +109,7 @@ task_manager = TaskManager() @app.post("/api/v1/sdlc/start", response_model=Dict[str, str]) async def start_sdlc_process(request: StartRequest): """ - 启动 SDLC 流程(异步执行) + 启动 SDLC 流程(使用线程池异步执行) """ # 验证配置 try: @@ -113,8 +120,9 @@ async def start_sdlc_process(request: StartRequest): # 创建任务 task_id = task_manager.create_task(request.requirement) - # 异步执行 SDLC 流程 - asyncio.create_task(execute_sdlc_flow(task_id, request.requirement)) + # 在线程池中异步执行 SDLC 流程 + loop = asyncio.get_event_loop() + loop.run_in_executor(executor, execute_sdlc_flow, task_id, request.requirement) return { "task_id": task_id, @@ -122,9 +130,16 @@ async def start_sdlc_process(request: StartRequest): } -async def execute_sdlc_flow(task_id: str, requirement: str): +import threading +import asyncio +from concurrent.futures import ThreadPoolExecutor + +# 线程池 +executor = ThreadPoolExecutor(max_workers=5) + +def execute_sdlc_flow(task_id: str, requirement: str): """ - 异步执行 SDLC 流程(使用 asyncio.to_thread 运行同步生成器) + 在线程池中执行 SDLC 流程,将事件保存到任务列表 Args: task_id: 任务 ID @@ -133,8 +148,8 @@ async def execute_sdlc_flow(task_id: str, requirement: str): task_manager.update_task_status(task_id, "processing") try: - # 先发送一个任务启动事件 - await task_manager.send_event(task_id, { + # 添加启动事件 + task_manager.add_event(task_id, { "event": "task_started", "data": { "status": "starting", @@ -143,13 +158,13 @@ async def execute_sdlc_flow(task_id: str, requirement: str): } }) - # 在线程池中执行同步生成器 + # 直接执行 CrewAI (同步阻塞) crew = SDLCCrew() for event in crew.execute(requirement): - # 发送事件到队列 - await task_manager.send_event(task_id, event) + # 添加事件到任务 + task_manager.add_event(task_id, event) - # 如果是最终结果或错误,更新状态 + # 更新状态 event_type = event.get("event", "") if event_type == "final_result": task_manager.update_task_status(task_id, "completed") @@ -158,7 +173,7 @@ async def execute_sdlc_flow(task_id: str, requirement: str): except Exception as e: task_manager.update_task_status(task_id, "failed") - await task_manager.send_event(task_id, { + task_manager.add_event(task_id, { "event": "error", "data": { "error": str(e), @@ -167,6 +182,19 @@ async def execute_sdlc_flow(task_id: str, requirement: str): }) +@app.get("/api/v1/sdlc/poll/{task_id}") +async def poll_task_events(task_id: str, last_index: int = 0): + """ + 轮询任务事件(替代 SSE) + + Args: + task_id: 任务 ID + last_index: 最后已知的 event 索引 + """ + result = task_manager.get_events(task_id, last_index) + return result + + @app.get("/api/v1/sdlc/stream/{task_id}") async def stream_task_progress(task_id: str): """ @@ -316,7 +344,7 @@ if __name__ == "__main__": uvicorn.run( "main:app", host="0.0.0.0", - port=8000, + port=8080, reload=False, log_level="info" ) diff --git a/static/index.html b/static/index.html index 410c84a..21a5318 100644 --- a/static/index.html +++ b/static/index.html @@ -399,8 +399,8 @@ // 初始化阶段 this.initStages(); - // 连接 SSE - this.connectSSE(data.task_id); + // 开始轮询任务事件 + this.connectPolling(data.task_id); } catch (error) { console.error('启动失败:', error); @@ -423,88 +423,118 @@ }, /** - * 连接 SSE + * 轮询任务事件(替代 SSE) */ - connectSSE(taskId) { - const url = `/api/v1/sdlc/stream/${taskId}`; - + connectPolling(taskId) { this.connectionStatus = 'connecting'; - this.addLog('system', 'SSE', `连接到:${url}`); + this.addLog('system', 'POLL', `开始轮询任务:${taskId}`); - this.eventSource = new EventSource(url); + let lastIndex = 0; + let pollCount = 0; + const maxPolls = 600; // 最多轮询 600 次 (10 分钟) - // 连接成功 - this.eventSource.onopen = () => { - this.connectionStatus = 'connected'; - this.addLog('system', 'SSE', '连接成功'); + const poll = () => { + if (pollCount >= maxPolls) { + this.addLog('system', 'POLL', '轮询超时'); + this.isProcessing = false; + this.connectionStatus = 'disconnected'; + return; + } + + fetch(`/api/v1/sdlc/poll/${taskId}?last_index=${lastIndex}`) + .then(res => res.json()) + .then(data => { + const { events, has_more, status } = data; + + // 处理新事件 + events.forEach(event => { + lastIndex++; + this.handleEvent(event); + }); + + // 检查是否继续轮询 + if (status === 'completed' || status === 'failed') { + this.isProcessing = false; + this.connectionStatus = 'disconnected'; + this.addLog('system', 'POLL', `任务完成,状态:${status}`); + return; + } + + if (has_more || events.length > 0) { + pollCount++; + setTimeout(poll, 500); // 每 500ms 轮询一次 + } else if (status === 'processing') { + pollCount++; + setTimeout(poll, 1000); // 无新事件时 1 秒后再试 + } + }) + .catch(err => { + console.error('轮询失败:', err); + this.addLog('error', 'POLL', err.message); + pollCount++; + setTimeout(poll, 2000); + }); }; - // PM 阶段 - this.eventSource.addEventListener('pm_start', (event) => { - const data = JSON.parse(event.data); - this.updateStageStatus('pm', 'processing'); - this.addLog('pm_start', 'PM Agent', '开始需求分析...'); - }); + // 开始轮询 + setTimeout(poll, 500); + }, + + /** + * 处理单个事件 + */ + handleEvent(event) { + const eventType = event.event; + const data = event.data; - this.eventSource.addEventListener('pm_complete', (event) => { - const data = JSON.parse(event.data); - this.updateStageStatus('pm', 'completed'); - this.addLog('pm_complete', 'PM Agent', '需求分析完成'); - this.addResult('📋 软件需求规格说明书 (SRS)', data.content, data.timestamp); - }); - - // QA 阶段 - this.eventSource.addEventListener('qa_start', (event) => { - const data = JSON.parse(event.data); - this.updateStageStatus('qa', 'processing'); - this.addLog('qa_start', 'QA Agent', '开始测试用例设计...'); - }); - - this.eventSource.addEventListener('qa_complete', (event) => { - const data = JSON.parse(event.data); - this.updateStageStatus('qa', 'completed'); - this.addLog('qa_complete', 'QA Agent', '测试用例设计完成'); - this.addResult('🧪 测试方案与用例', data.content, data.timestamp); - }); - - // Dev 阶段 - this.eventSource.addEventListener('dev_start', (event) => { - const data = JSON.parse(event.data); - this.updateStageStatus('dev', 'processing'); - this.addLog('dev_start', 'Dev Agent', '开始代码实现...'); - }); - - this.eventSource.addEventListener('dev_complete', (event) => { - const data = JSON.parse(event.data); - this.updateStageStatus('dev', 'completed'); - this.addLog('dev_complete', 'Dev Agent', '代码实现完成'); - this.addResult('💻 代码实现', data.content, data.timestamp); - }); - - // 最终结果 - this.eventSource.addEventListener('final_result', (event) => { - const data = JSON.parse(event.data); - this.updateStageStatus('final', 'completed'); - this.addLog('final_result', 'System', 'SDLC 流程完成'); - this.isProcessing = false; - this.connectionStatus = 'disconnected'; - }); - - // 错误处理 - this.eventSource.addEventListener('error', (event) => { - const data = JSON.parse(event.data); - this.addLog('error', 'Error', data.error || '未知错误'); - this.isProcessing = false; - this.connectionStatus = 'disconnected'; - alert(`执行错误:${data.error}`); - }); - - // 连接错误 - this.eventSource.onerror = () => { - this.addLog('system', 'SSE', '连接断开'); - this.connectionStatus = 'disconnected'; - this.eventSource.close(); - }; + switch(eventType) { + case 'task_started': + this.addLog('task_started', 'System', data.message || '任务已启动'); + break; + + case 'pm_start': + this.updateStageStatus('pm', 'processing'); + this.addLog('pm_start', 'PM Agent', '开始需求分析...'); + break; + + case 'pm_complete': + this.updateStageStatus('pm', 'completed'); + this.addLog('pm_complete', 'PM Agent', '需求分析完成'); + this.addResult('📋 软件需求规格说明书 (SRS)', data.content, data.timestamp); + break; + + case 'qa_start': + this.updateStageStatus('qa', 'processing'); + this.addLog('qa_start', 'QA Agent', '开始测试用例设计...'); + break; + + case 'qa_complete': + this.updateStageStatus('qa', 'completed'); + this.addLog('qa_complete', 'QA Agent', '测试用例设计完成'); + this.addResult('🧪 测试方案与用例', data.content, data.timestamp); + break; + + case 'dev_start': + this.updateStageStatus('dev', 'processing'); + this.addLog('dev_start', 'Dev Agent', '开始代码实现...'); + break; + + case 'dev_complete': + this.updateStageStatus('dev', 'completed'); + this.addLog('dev_complete', 'Dev Agent', '代码实现完成'); + this.addResult('💻 代码实现', data.content, data.timestamp); + break; + + case 'final_result': + this.updateStageStatus('final', 'completed'); + this.addLog('final_result', 'System', 'SDLC 流程完成'); + break; + + case 'error': + this.addLog('error', 'Error', data.error || '未知错误'); + alert(`执行错误:${data.error}`); + break; + } }, /** diff --git a/test_sse.py b/test_sse.py new file mode 100644 index 0000000..e0becca --- /dev/null +++ b/test_sse.py @@ -0,0 +1,57 @@ +""" +SSE 测试脚本 - 验证事件流是否正确发送 +""" +import requests +import json +import time +import sys + +# 设置控制台输出编码 +sys.stdout.reconfigure(encoding='utf-8') + +# 启动任务 +response = requests.post( + 'http://localhost:8080/api/v1/sdlc/start', + json={'requirement': '测试需求:做一个简单的计算器'}, + headers={'Content-Type': 'application/json'} +) + +if response.status_code != 200: + print(f"Start failed: {response.status_code} - {response.text}") + exit(1) + +task_data = response.json() +task_id = task_data['task_id'] +print(f"Task started: {task_id}") +print("=" * 60) + +# 连接 SSE +sse_url = f'http://localhost:8080/api/v1/sdlc/stream/{task_id}' +print(f"Connecting SSE: {sse_url}") +print("=" * 60) + +with requests.get(sse_url, stream=True, timeout=300) as sse_response: + for line in sse_response.iter_lines(): + if line: + line_str = line.decode('utf-8') + print(line_str) + + # 解析事件 + if line_str.startswith('event:'): + event_type = line_str[6:].strip() + print(f" -> Event type: {event_type}") + + if line_str.startswith('data:'): + try: + data = json.loads(line_str[5:].strip()) + print(f" -> Data: {json.dumps(data, indent=2, ensure_ascii=False)}") + except: + pass + + # 如果是结束事件,退出 + if 'event: final_result' in line_str or 'event: error' in line_str: + print("=" * 60) + print("Task completed!") + break + +print("Test completed")