Files
autogen/autogen_sdls_system.py
2026-03-12 17:58:15 +08:00

354 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
AutoGen SDLC 多智能体协同系统 - 主程序入口
实现端到端软件交付的自动化流程
"""
import os
import sys
from pathlib import Path
from typing import Dict, Any, Optional, List
import json
import re
from datetime import datetime
# 添加项目根目录到路径
sys.path.insert(0, str(Path(__file__).parent))
from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
from config.llm_config import (
get_llm_config,
PM_PROMPT,
QA_PROMPT,
DEV_PROMPT,
ORCH_PROMPT
)
from utils.logger import get_logger
from utils.callback_handler import get_callback_handler
def extract_code(content):
"""从 Markdown 代码块中提取纯代码"""
if "```python" in content:
parts = content.split("```python")
if len(parts) > 1:
code = parts[1].split("```")[0].strip()
return code
elif "```" in content:
parts = content.split("```")
if len(parts) > 1:
code = parts[1].strip()
return code
return content
def save_file_from_content(content: str, agent_name: str, workspace_dir: Path) -> List[str]:
"""
根据 Agent 的内容实时保存文件
Args:
content: Agent 的输出内容
agent_name: Agent 名称
workspace_dir: 工作目录
Returns:
保存的文件路径列表
"""
saved_files = []
# PM Agent 生成 SRS
if agent_name == "PM_Agent" and ("需求" in content or "SRS" in content or "软件需求规格说明书" in content):
file = workspace_dir / "SRS.md"
with open(file, "w", encoding="utf-8") as f:
f.write(f"# 软件需求规格说明书\n\n生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write(content)
saved_files.append(str(file))
print(f"💾 [实时保存] {file}")
# QA Agent 生成测试
elif agent_name == "QA_Agent" and ("test" in content.lower() or "测试" in content or "def test_" in content):
file = workspace_dir / "test_sample.py"
code = extract_code(content)
if code: # 确保有代码内容才保存
with open(file, "w", encoding="utf-8") as f:
f.write(f"# 测试用例\n\n{code}")
saved_files.append(str(file))
print(f"💾 [实时保存] {file}")
# Dev Agent 生成代码 - 支持多个文件
elif agent_name == "Dev_Agent" and ("def " in content or "class " in content or "import " in content):
# 尝试提取带文件名的代码块
code_blocks = re.findall(r'```python\s*\n#(?:\s*File:|\s*filename:)?\s*([^\n]+)\n(.*?)```', content, re.DOTALL)
if code_blocks:
# 保存多个文件
for filename, code_content in code_blocks:
filename = filename.strip()
file_path = workspace_dir / filename
with open(file_path, "w", encoding="utf-8") as f:
f.write(f"# 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n{code_content}")
saved_files.append(str(file_path))
print(f"💾 [实时保存] {file_path}")
else:
# 没有文件名标记,保存为 src_sample.py
file = workspace_dir / "src_sample.py"
code = extract_code(content)
if code: # 确保有代码内容才保存
with open(file, "w", encoding="utf-8") as f:
f.write(f"# 源代码\n\n{code}")
saved_files.append(str(file))
print(f"💾 [实时保存] {file}")
return saved_files
class AutoGenSDLCSystem:
"""AutoGen SDLC 多智能体协同系统"""
def __init__(
self,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
model: str = "qwen3.5-flash",
workspace_dir: str = "workspace"
):
"""
初始化 SDLC 系统
Args:
api_key: API Key默认从环境变量读取
base_url: API Base URL
model: 模型名称
workspace_dir: 工作目录
"""
# 配置 LLM
self.api_key = api_key or os.getenv("DASHSCOPE_API_KEY", "")
self.base_url = base_url or "https://dashscope.aliyuncs.com/compatible-mode/v1"
self.model = model
if not self.api_key:
raise ValueError("请设置 DASHSCOPE_API_KEY 环境变量或传入 api_key 参数")
self.llm_config = get_llm_config(
model=model,
api_key=self.api_key,
base_url=self.base_url
)
# 初始化日志和回调
self.logger = get_logger()
self.callback_handler = get_callback_handler()
# 创建工作目录
self.workspace_dir = Path(workspace_dir)
self.workspace_dir.mkdir(parents=True, exist_ok=True)
# 创建 Agent
self._create_agents()
# 创建 GroupChat
self.groupchat = None
self.manager = None
# 记录已保存的文件
self.saved_files = []
def _on_new_message(self, sender, message, **kwargs):
"""
新消息回调函数 - 实时保存文件
Args:
sender: 发送者
message: 消息内容
**kwargs: 其他参数
"""
try:
agent_name = getattr(sender, 'name', 'Unknown')
content = message.get('content', '') if isinstance(message, dict) else str(message)
# 实时保存文件
saved_files = save_file_from_content(content, agent_name, self.workspace_dir)
# 如果有文件保存,记录并通知回调
if saved_files:
self.saved_files.extend(saved_files)
for file_path in saved_files:
# 确定文件类型
if file_path.endswith('.md'):
file_type = 'markdown'
elif file_path.endswith('.py'):
file_type = 'python'
else:
file_type = 'other'
# 触发回调
self.callback_handler.on_file_created(agent_name, file_path, file_type)
except Exception as e:
print(f"⚠️ 实时保存文件时出错:{e}")
def _create_agents(self):
"""创建所有 Agent"""
# PM Agent
self.pm_agent = AssistantAgent(
name="PM_Agent",
system_message=PM_PROMPT,
llm_config=self.llm_config,
description="资深软件产品经理,负责需求分析和 SRS 生成",
human_input_mode="NEVER"
)
# QA Agent
self.qa_agent = AssistantAgent(
name="QA_Agent",
system_message=QA_PROMPT,
llm_config=self.llm_config,
description="资深测试工程师,负责测试用例设计",
human_input_mode="NEVER"
)
# Dev Agent
self.dev_agent = AssistantAgent(
name="Dev_Agent",
system_message=DEV_PROMPT,
llm_config=self.llm_config,
description="资深软件工程师,负责代码实现",
human_input_mode="NEVER"
)
# Orchestrator Agent
self.orchestrator = AssistantAgent(
name="Orchestrator",
system_message=ORCH_PROMPT,
llm_config=self.llm_config,
description="多智能体协调器,负责流程控制和验证",
human_input_mode="NEVER"
)
# User Proxy用于执行代码
self.user_proxy = UserProxyAgent(
name="User_Proxy",
human_input_mode="NEVER", # 修复Web 环境不支持 TERMINAL
max_consecutive_auto_reply=10, # 允许连续自动回复,避免提前终止
code_execution_config={
"work_dir": str(self.workspace_dir),
"use_docker": False,
},
is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE")
)
self.logger.log_event("agents_created", "所有 Agent 已创建", {
"agents": ["PM_Agent", "QA_Agent", "Dev_Agent", "Orchestrator", "User_Proxy"]
})
def create_groupchat(self, max_round: int = 20):
"""
创建 GroupChat
Args:
max_round: 最大对话轮数
"""
self.groupchat = GroupChat(
agents=[self.pm_agent, self.qa_agent, self.dev_agent,
self.orchestrator, self.user_proxy],
messages=[],
max_round=max_round,
speaker_selection_method="round_robin" # 轮流发言确保流程可控
)
self.manager = GroupChatManager(
groupchat=self.groupchat,
llm_config=self.llm_config
)
self.logger.log_event("groupchat_created", "GroupChat 已创建", {
"max_round": max_round
})
def run_workflow(self, user_requirement: str, max_round: int = 20) -> Dict[str, Any]:
"""
运行完整的 SDLC 工作流
Args:
user_requirement: 用户需求描述
max_round: 最大对话轮数
Returns:
工作流结果
"""
self.logger.log_event("workflow_started", "SDLC 工作流启动", {
"requirement": user_requirement
})
# 创建 GroupChat
self.create_groupchat(max_round)
# 重置已保存文件列表
self.saved_files = []
# 构建初始消息
initial_message = f"""
请启动完整的 SDLC 流程,开发以下功能:
【用户需求】
{user_requirement}
【工作流程】
1. PM_Agent: 分析需求,生成 SRS 文档
2. QA_Agent: 根据 SRS 设计测试用例
3. Dev_Agent: 根据 SRS 和测试用例编写代码
4. User_Proxy: 执行测试验证
5. Orchestrator: 汇总结果并生成最终报告
请各 Agent 按顺序协作完成。每个步骤完成后Orchestrator 进行验证。
如果测试失败Dev_Agent 需要修复代码直到测试通过。
开始工作!
"""
try:
# 启动对话 - 使用回调函数实时处理消息
print("\n🚀 开始对话...\n")
# 为每个 Agent 注册实时文件保存回调
for agent in [self.pm_agent, self.qa_agent, self.dev_agent, self.orchestrator]:
agent.register_reply(
self._on_new_message,
reply_func_total_count=1,
position=0 # 在最前面执行,确保实时保存
)
chat_result = self.user_proxy.initiate_chat(
self.manager,
message=initial_message,
max_turns=max_round,
summary_method="reflection_with_llm"
)
# 记录结果
self.logger.log_event(
"workflow_completed",
"SDLC 工作流完成",
{"chat_summary": chat_result.summary if hasattr(chat_result, 'summary') else "完成"}
)
# 导出对话历史
for msg in self.groupchat.messages:
self.logger.log_message(
agent_name=msg.get("name", "Unknown"),
message=msg.get("content", ""),
role=msg.get("role", "assistant")
)
return {
"success": True,
"summary": chat_result.summary if hasattr(chat_result, 'summary') else "工作流完成",
"messages": self.groupchat.messages,
"workspace": str(self.workspace_dir),
"saved_files": self.saved_files
}
except Exception as e:
self.logger.log_event("workflow_error", f"工作流执行出错:{str(e)}")
return {
"success": False,
"error": str(e),
"messages": self.groupchat.messages if self.groupchat else []
}