""" SDLC Crew - CrewAI 编排逻辑 (纯同步版本) 负责协调 PM、QA、Dev 三个智能体按顺序执行 """ from typing import Dict, Any, Optional, Generator from datetime import datetime from crewai import Crew from agents.pm_agent import create_pm_agent, create_pm_task from agents.qa_agent import create_qa_agent, create_qa_task from agents.dev_agent import create_dev_agent, create_dev_task class SDLCCrew: """ SDLC 多智能体协同编排类 实现从需求分析→测试用例→代码实现的完整流程 """ def __init__(self): """初始化 SDLC Crew""" self.pm_agent = create_pm_agent() self.qa_agent = create_qa_agent() self.dev_agent = create_dev_agent() # 存储各阶段结果 self.srs_document: Optional[str] = None self.test_plan: Optional[str] = None self.implementation: Optional[str] = None def execute(self, requirement: str) -> Generator[Dict[str, Any], None, None]: """ 执行完整的 SDLC 流程(纯同步生成器) Args: requirement: 用户需求描述 Yields: Dict[str, Any]: 各阶段的 SSE事件 """ try: # ========== 阶段 1: PM Agent - 需求分析 ========== pm_task = create_pm_task(requirement) pm_crew = Crew( agents=[self.pm_agent], tasks=[pm_task], verbose=True ) # 发送开始事件 yield { "event": "pm_start", "data": { "stage": "需求分析", "status": "started", "timestamp": datetime.now().isoformat() } } # 执行 Crew 任务 result = pm_crew.kickoff() content = result.raw if hasattr(result, 'raw') else str(result) # 发送完成事件 yield { "event": "pm_complete", "data": { "stage": "需求分析", "content": content, "status": "completed", "timestamp": datetime.now().isoformat() } } self.srs_document = content # ========== 阶段 2: QA Agent - 测试用例设计 ========== qa_task = create_qa_task(self.srs_document) qa_crew = Crew( agents=[self.qa_agent], tasks=[qa_task], verbose=True ) # 发送开始事件 yield { "event": "qa_start", "data": { "stage": "测试用例设计", "status": "started", "timestamp": datetime.now().isoformat() } } # 执行 Crew 任务 result = qa_crew.kickoff() content = result.raw if hasattr(result, 'raw') else str(result) # 发送完成事件 yield { "event": "qa_complete", "data": { "stage": "测试用例设计", "content": content, "status": "completed", "timestamp": datetime.now().isoformat() } } self.test_plan = content # ========== 阶段 3: Dev Agent - 代码实现 ========== dev_task = create_dev_task(self.srs_document, self.test_plan) dev_crew = Crew( agents=[self.dev_agent], tasks=[dev_task], verbose=True ) # 发送开始事件 yield { "event": "dev_start", "data": { "stage": "代码实现", "status": "started", "timestamp": datetime.now().isoformat() } } # 执行 Crew 任务 result = dev_crew.kickoff() content = result.raw if hasattr(result, 'raw') else str(result) # 发送完成事件 yield { "event": "dev_complete", "data": { "stage": "代码实现", "content": content, "status": "completed", "timestamp": datetime.now().isoformat() } } self.implementation = content # ========== 最终结果汇总 ========== yield { "event": "final_result", "data": { "stage": "交付完成", "status": "success", "timestamp": datetime.now().isoformat(), "summary": { "srs_length": len(self.srs_document) if self.srs_document else 0, "test_plan_length": len(self.test_plan) if self.test_plan else 0, "implementation_length": len(self.implementation) if self.implementation else 0 }, "deliverables": { "srs_document": self.srs_document, "test_plan": self.test_plan, "implementation": self.implementation } } } except Exception as e: # 错误处理 yield { "event": "error", "data": { "stage": "系统错误", "error": str(e), "timestamp": datetime.now().isoformat() } } # 别名,方便调用 execute_sync = execute