""" AutoGen SDLC 多智能体协同系统 - 主程序入口 实现端到端软件交付的自动化流程 """ import os import sys from pathlib import Path from typing import Dict, Any, Optional, List import json import re from datetime import datetime # 添加项目根目录到路径 sys.path.insert(0, str(Path(__file__).parent)) from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager from config.llm_config import ( get_llm_config, PM_PROMPT, QA_PROMPT, DEV_PROMPT, ORCH_PROMPT ) from utils.logger import get_logger from utils.callback_handler import get_callback_handler def extract_code(content): """从 Markdown 代码块中提取纯代码""" if "```python" in content: parts = content.split("```python") if len(parts) > 1: code = parts[1].split("```")[0].strip() return code elif "```" in content: parts = content.split("```") if len(parts) > 1: code = parts[1].strip() return code return content def save_file_from_content(content: str, agent_name: str, workspace_dir: Path) -> List[str]: """ 根据 Agent 的内容实时保存文件 Args: content: Agent 的输出内容 agent_name: Agent 名称 workspace_dir: 工作目录 Returns: 保存的文件路径列表 """ saved_files = [] # PM Agent 生成 SRS if agent_name == "PM_Agent" and ("需求" in content or "SRS" in content or "软件需求规格说明书" in content): file = workspace_dir / "SRS.md" with open(file, "w", encoding="utf-8") as f: f.write(f"# 软件需求规格说明书\n\n生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") f.write(content) saved_files.append(str(file)) print(f"💾 [实时保存] {file}") # QA Agent 生成测试 elif agent_name == "QA_Agent" and ("test" in content.lower() or "测试" in content or "def test_" in content): file = workspace_dir / "test_sample.py" code = extract_code(content) if code: # 确保有代码内容才保存 with open(file, "w", encoding="utf-8") as f: f.write(f"# 测试用例\n\n{code}") saved_files.append(str(file)) print(f"💾 [实时保存] {file}") # Dev Agent 生成代码 - 支持多个文件 elif agent_name == "Dev_Agent" and ("def " in content or "class " in content or "import " in content): # 尝试提取带文件名的代码块 code_blocks = re.findall(r'```python\s*\n#(?:\s*File:|\s*filename:)?\s*([^\n]+)\n(.*?)```', content, re.DOTALL) if code_blocks: # 保存多个文件 for filename, code_content in code_blocks: filename = filename.strip() file_path = workspace_dir / filename with open(file_path, "w", encoding="utf-8") as f: f.write(f"# 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n{code_content}") saved_files.append(str(file_path)) print(f"💾 [实时保存] {file_path}") else: # 没有文件名标记,保存为 src_sample.py file = workspace_dir / "src_sample.py" code = extract_code(content) if code: # 确保有代码内容才保存 with open(file, "w", encoding="utf-8") as f: f.write(f"# 源代码\n\n{code}") saved_files.append(str(file)) print(f"💾 [实时保存] {file}") return saved_files class AutoGenSDLCSystem: """AutoGen SDLC 多智能体协同系统""" def __init__( self, api_key: Optional[str] = None, base_url: Optional[str] = None, model: str = "qwen3.5-flash", workspace_dir: str = "workspace" ): """ 初始化 SDLC 系统 Args: api_key: API Key,默认从环境变量读取 base_url: API Base URL model: 模型名称 workspace_dir: 工作目录 """ # 配置 LLM self.api_key = api_key or os.getenv("DASHSCOPE_API_KEY", "") self.base_url = base_url or "https://dashscope.aliyuncs.com/compatible-mode/v1" self.model = model if not self.api_key: raise ValueError("请设置 DASHSCOPE_API_KEY 环境变量或传入 api_key 参数") self.llm_config = get_llm_config( model=model, api_key=self.api_key, base_url=self.base_url ) # 初始化日志和回调 self.logger = get_logger() self.callback_handler = get_callback_handler() # 创建工作目录 self.workspace_dir = Path(workspace_dir) self.workspace_dir.mkdir(parents=True, exist_ok=True) # 创建 Agent self._create_agents() # 创建 GroupChat self.groupchat = None self.manager = None # 记录已保存的文件 self.saved_files = [] def _on_new_message(self, sender, message, **kwargs): """ 新消息回调函数 - 实时保存文件 Args: sender: 发送者 message: 消息内容 **kwargs: 其他参数 """ try: agent_name = getattr(sender, 'name', 'Unknown') content = message.get('content', '') if isinstance(message, dict) else str(message) # 实时保存文件 saved_files = save_file_from_content(content, agent_name, self.workspace_dir) # 如果有文件保存,记录并通知回调 if saved_files: self.saved_files.extend(saved_files) for file_path in saved_files: # 确定文件类型 if file_path.endswith('.md'): file_type = 'markdown' elif file_path.endswith('.py'): file_type = 'python' else: file_type = 'other' # 触发回调 self.callback_handler.on_file_created(agent_name, file_path, file_type) except Exception as e: print(f"⚠️ 实时保存文件时出错:{e}") def _create_agents(self): """创建所有 Agent""" # PM Agent self.pm_agent = AssistantAgent( name="PM_Agent", system_message=PM_PROMPT, llm_config=self.llm_config, description="资深软件产品经理,负责需求分析和 SRS 生成", human_input_mode="NEVER" ) # QA Agent self.qa_agent = AssistantAgent( name="QA_Agent", system_message=QA_PROMPT, llm_config=self.llm_config, description="资深测试工程师,负责测试用例设计", human_input_mode="NEVER" ) # Dev Agent self.dev_agent = AssistantAgent( name="Dev_Agent", system_message=DEV_PROMPT, llm_config=self.llm_config, description="资深软件工程师,负责代码实现", human_input_mode="NEVER" ) # Orchestrator Agent self.orchestrator = AssistantAgent( name="Orchestrator", system_message=ORCH_PROMPT, llm_config=self.llm_config, description="多智能体协调器,负责流程控制和验证", human_input_mode="NEVER" ) # User Proxy(用于执行代码) self.user_proxy = UserProxyAgent( name="User_Proxy", human_input_mode="NEVER", # 修复:Web 环境不支持 TERMINAL max_consecutive_auto_reply=10, # 允许连续自动回复,避免提前终止 code_execution_config={ "work_dir": str(self.workspace_dir), "use_docker": False, }, is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE") ) self.logger.log_event("agents_created", "所有 Agent 已创建", { "agents": ["PM_Agent", "QA_Agent", "Dev_Agent", "Orchestrator", "User_Proxy"] }) def create_groupchat(self, max_round: int = 20): """ 创建 GroupChat Args: max_round: 最大对话轮数 """ self.groupchat = GroupChat( agents=[self.pm_agent, self.qa_agent, self.dev_agent, self.orchestrator, self.user_proxy], messages=[], max_round=max_round, speaker_selection_method="round_robin" # 轮流发言确保流程可控 ) self.manager = GroupChatManager( groupchat=self.groupchat, llm_config=self.llm_config ) self.logger.log_event("groupchat_created", "GroupChat 已创建", { "max_round": max_round }) def run_workflow(self, user_requirement: str, max_round: int = 20) -> Dict[str, Any]: """ 运行完整的 SDLC 工作流 Args: user_requirement: 用户需求描述 max_round: 最大对话轮数 Returns: 工作流结果 """ self.logger.log_event("workflow_started", "SDLC 工作流启动", { "requirement": user_requirement }) # 创建 GroupChat self.create_groupchat(max_round) # 重置已保存文件列表 self.saved_files = [] # 构建初始消息 initial_message = f""" 请启动完整的 SDLC 流程,开发以下功能: 【用户需求】 {user_requirement} 【工作流程】 1. PM_Agent: 分析需求,生成 SRS 文档 2. QA_Agent: 根据 SRS 设计测试用例 3. Dev_Agent: 根据 SRS 和测试用例编写代码 4. User_Proxy: 执行测试验证 5. Orchestrator: 汇总结果并生成最终报告 请各 Agent 按顺序协作完成。每个步骤完成后,Orchestrator 进行验证。 如果测试失败,Dev_Agent 需要修复代码直到测试通过。 开始工作! """ try: # 启动对话 - 使用回调函数实时处理消息 print("\n🚀 开始对话...\n") # 为每个 Agent 注册实时文件保存回调 for agent in [self.pm_agent, self.qa_agent, self.dev_agent, self.orchestrator]: agent.register_reply( self._on_new_message, reply_func_total_count=1, position=0 # 在最前面执行,确保实时保存 ) chat_result = self.user_proxy.initiate_chat( self.manager, message=initial_message, max_turns=max_round, summary_method="reflection_with_llm" ) # 记录结果 self.logger.log_event( "workflow_completed", "SDLC 工作流完成", {"chat_summary": chat_result.summary if hasattr(chat_result, 'summary') else "完成"} ) # 导出对话历史 for msg in self.groupchat.messages: self.logger.log_message( agent_name=msg.get("name", "Unknown"), message=msg.get("content", ""), role=msg.get("role", "assistant") ) return { "success": True, "summary": chat_result.summary if hasattr(chat_result, 'summary') else "工作流完成", "messages": self.groupchat.messages, "workspace": str(self.workspace_dir), "saved_files": self.saved_files } except Exception as e: self.logger.log_event("workflow_error", f"工作流执行出错:{str(e)}") return { "success": False, "error": str(e), "messages": self.groupchat.messages if self.groupchat else [] }