Files
autogen/autogen_sdls_system.py

354 lines
12 KiB
Python
Raw Permalink Normal View History

2026-03-12 13:27:03 +08:00
"""
AutoGen SDLC 多智能体协同系统 - 主程序入口
实现端到端软件交付的自动化流程
"""
import os
import sys
from pathlib import Path
from typing import Dict, Any, Optional, List
import json
2026-03-12 17:58:15 +08:00
import re
from datetime import datetime
2026-03-12 13:27:03 +08:00
# 添加项目根目录到路径
sys.path.insert(0, str(Path(__file__).parent))
from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
from config.llm_config import (
get_llm_config,
PM_PROMPT,
QA_PROMPT,
DEV_PROMPT,
ORCH_PROMPT
)
from utils.logger import get_logger
from utils.callback_handler import get_callback_handler
2026-03-12 17:58:15 +08:00
def extract_code(content):
"""从 Markdown 代码块中提取纯代码"""
if "```python" in content:
parts = content.split("```python")
if len(parts) > 1:
code = parts[1].split("```")[0].strip()
return code
elif "```" in content:
parts = content.split("```")
if len(parts) > 1:
code = parts[1].strip()
return code
return content
def save_file_from_content(content: str, agent_name: str, workspace_dir: Path) -> List[str]:
"""
根据 Agent 的内容实时保存文件
Args:
content: Agent 的输出内容
agent_name: Agent 名称
workspace_dir: 工作目录
Returns:
保存的文件路径列表
"""
saved_files = []
# PM Agent 生成 SRS
if agent_name == "PM_Agent" and ("需求" in content or "SRS" in content or "软件需求规格说明书" in content):
file = workspace_dir / "SRS.md"
with open(file, "w", encoding="utf-8") as f:
f.write(f"# 软件需求规格说明书\n\n生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write(content)
saved_files.append(str(file))
print(f"💾 [实时保存] {file}")
# QA Agent 生成测试
elif agent_name == "QA_Agent" and ("test" in content.lower() or "测试" in content or "def test_" in content):
file = workspace_dir / "test_sample.py"
code = extract_code(content)
if code: # 确保有代码内容才保存
with open(file, "w", encoding="utf-8") as f:
f.write(f"# 测试用例\n\n{code}")
saved_files.append(str(file))
print(f"💾 [实时保存] {file}")
# Dev Agent 生成代码 - 支持多个文件
elif agent_name == "Dev_Agent" and ("def " in content or "class " in content or "import " in content):
# 尝试提取带文件名的代码块
code_blocks = re.findall(r'```python\s*\n#(?:\s*File:|\s*filename:)?\s*([^\n]+)\n(.*?)```', content, re.DOTALL)
if code_blocks:
# 保存多个文件
for filename, code_content in code_blocks:
filename = filename.strip()
file_path = workspace_dir / filename
with open(file_path, "w", encoding="utf-8") as f:
f.write(f"# 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n{code_content}")
saved_files.append(str(file_path))
print(f"💾 [实时保存] {file_path}")
else:
# 没有文件名标记,保存为 src_sample.py
file = workspace_dir / "src_sample.py"
code = extract_code(content)
if code: # 确保有代码内容才保存
with open(file, "w", encoding="utf-8") as f:
f.write(f"# 源代码\n\n{code}")
saved_files.append(str(file))
print(f"💾 [实时保存] {file}")
return saved_files
2026-03-12 13:27:03 +08:00
class AutoGenSDLCSystem:
"""AutoGen SDLC 多智能体协同系统"""
def __init__(
self,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
model: str = "qwen3.5-flash",
workspace_dir: str = "workspace"
):
"""
初始化 SDLC 系统
Args:
api_key: API Key默认从环境变量读取
base_url: API Base URL
model: 模型名称
workspace_dir: 工作目录
"""
# 配置 LLM
self.api_key = api_key or os.getenv("DASHSCOPE_API_KEY", "")
self.base_url = base_url or "https://dashscope.aliyuncs.com/compatible-mode/v1"
self.model = model
if not self.api_key:
raise ValueError("请设置 DASHSCOPE_API_KEY 环境变量或传入 api_key 参数")
self.llm_config = get_llm_config(
model=model,
api_key=self.api_key,
base_url=self.base_url
)
# 初始化日志和回调
self.logger = get_logger()
self.callback_handler = get_callback_handler()
# 创建工作目录
self.workspace_dir = Path(workspace_dir)
self.workspace_dir.mkdir(parents=True, exist_ok=True)
# 创建 Agent
self._create_agents()
# 创建 GroupChat
self.groupchat = None
self.manager = None
2026-03-12 17:58:15 +08:00
# 记录已保存的文件
self.saved_files = []
2026-03-12 13:27:03 +08:00
2026-03-12 17:58:15 +08:00
def _on_new_message(self, sender, message, **kwargs):
"""
新消息回调函数 - 实时保存文件
Args:
sender: 发送者
message: 消息内容
**kwargs: 其他参数
"""
try:
agent_name = getattr(sender, 'name', 'Unknown')
content = message.get('content', '') if isinstance(message, dict) else str(message)
# 实时保存文件
saved_files = save_file_from_content(content, agent_name, self.workspace_dir)
# 如果有文件保存,记录并通知回调
if saved_files:
self.saved_files.extend(saved_files)
for file_path in saved_files:
# 确定文件类型
if file_path.endswith('.md'):
file_type = 'markdown'
elif file_path.endswith('.py'):
file_type = 'python'
else:
file_type = 'other'
# 触发回调
self.callback_handler.on_file_created(agent_name, file_path, file_type)
except Exception as e:
print(f"⚠️ 实时保存文件时出错:{e}")
2026-03-12 13:27:03 +08:00
def _create_agents(self):
"""创建所有 Agent"""
# PM Agent
self.pm_agent = AssistantAgent(
name="PM_Agent",
system_message=PM_PROMPT,
llm_config=self.llm_config,
description="资深软件产品经理,负责需求分析和 SRS 生成",
human_input_mode="NEVER"
)
# QA Agent
self.qa_agent = AssistantAgent(
name="QA_Agent",
system_message=QA_PROMPT,
llm_config=self.llm_config,
description="资深测试工程师,负责测试用例设计",
human_input_mode="NEVER"
)
# Dev Agent
self.dev_agent = AssistantAgent(
name="Dev_Agent",
system_message=DEV_PROMPT,
llm_config=self.llm_config,
description="资深软件工程师,负责代码实现",
human_input_mode="NEVER"
)
# Orchestrator Agent
self.orchestrator = AssistantAgent(
name="Orchestrator",
system_message=ORCH_PROMPT,
llm_config=self.llm_config,
description="多智能体协调器,负责流程控制和验证",
human_input_mode="NEVER"
)
# User Proxy用于执行代码
self.user_proxy = UserProxyAgent(
name="User_Proxy",
human_input_mode="NEVER", # 修复Web 环境不支持 TERMINAL
2026-03-12 17:58:15 +08:00
max_consecutive_auto_reply=10, # 允许连续自动回复,避免提前终止
2026-03-12 13:27:03 +08:00
code_execution_config={
"work_dir": str(self.workspace_dir),
"use_docker": False,
},
is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE")
)
self.logger.log_event("agents_created", "所有 Agent 已创建", {
"agents": ["PM_Agent", "QA_Agent", "Dev_Agent", "Orchestrator", "User_Proxy"]
})
def create_groupchat(self, max_round: int = 20):
"""
创建 GroupChat
Args:
max_round: 最大对话轮数
"""
self.groupchat = GroupChat(
agents=[self.pm_agent, self.qa_agent, self.dev_agent,
self.orchestrator, self.user_proxy],
messages=[],
max_round=max_round,
speaker_selection_method="round_robin" # 轮流发言确保流程可控
)
self.manager = GroupChatManager(
groupchat=self.groupchat,
llm_config=self.llm_config
)
self.logger.log_event("groupchat_created", "GroupChat 已创建", {
"max_round": max_round
})
def run_workflow(self, user_requirement: str, max_round: int = 20) -> Dict[str, Any]:
"""
运行完整的 SDLC 工作流
Args:
user_requirement: 用户需求描述
max_round: 最大对话轮数
Returns:
工作流结果
"""
self.logger.log_event("workflow_started", "SDLC 工作流启动", {
"requirement": user_requirement
})
# 创建 GroupChat
self.create_groupchat(max_round)
2026-03-12 17:58:15 +08:00
# 重置已保存文件列表
self.saved_files = []
2026-03-12 13:27:03 +08:00
# 构建初始消息
initial_message = f"""
请启动完整的 SDLC 流程开发以下功能
用户需求
{user_requirement}
工作流程
1. PM_Agent: 分析需求生成 SRS 文档
2. QA_Agent: 根据 SRS 设计测试用例
3. Dev_Agent: 根据 SRS 和测试用例编写代码
4. User_Proxy: 执行测试验证
5. Orchestrator: 汇总结果并生成最终报告
请各 Agent 按顺序协作完成每个步骤完成后Orchestrator 进行验证
如果测试失败Dev_Agent 需要修复代码直到测试通过
开始工作
"""
try:
2026-03-12 17:58:15 +08:00
# 启动对话 - 使用回调函数实时处理消息
print("\n🚀 开始对话...\n")
# 为每个 Agent 注册实时文件保存回调
for agent in [self.pm_agent, self.qa_agent, self.dev_agent, self.orchestrator]:
agent.register_reply(
self._on_new_message,
reply_func_total_count=1,
position=0 # 在最前面执行,确保实时保存
)
2026-03-12 13:27:03 +08:00
chat_result = self.user_proxy.initiate_chat(
self.manager,
message=initial_message,
max_turns=max_round,
summary_method="reflection_with_llm"
)
# 记录结果
self.logger.log_event(
"workflow_completed",
"SDLC 工作流完成",
{"chat_summary": chat_result.summary if hasattr(chat_result, 'summary') else "完成"}
)
# 导出对话历史
for msg in self.groupchat.messages:
self.logger.log_message(
agent_name=msg.get("name", "Unknown"),
message=msg.get("content", ""),
role=msg.get("role", "assistant")
)
return {
"success": True,
"summary": chat_result.summary if hasattr(chat_result, 'summary') else "工作流完成",
"messages": self.groupchat.messages,
2026-03-12 17:58:15 +08:00
"workspace": str(self.workspace_dir),
"saved_files": self.saved_files
2026-03-12 13:27:03 +08:00
}
except Exception as e:
self.logger.log_event("workflow_error", f"工作流执行出错:{str(e)}")
return {
"success": False,
"error": str(e),
"messages": self.groupchat.messages if self.groupchat else []
2026-03-12 17:58:15 +08:00
}