diff --git a/config/__init__.py b/config/__init__.py
new file mode 100644
index 0000000..bbee05c
--- /dev/null
+++ b/config/__init__.py
@@ -0,0 +1,10 @@
+from .llm_config import get_llm_config, PM_PROMPT, QA_PROMPT, DEV_PROMPT, ORCH_PROMPT
+
+
+__all__ = [
+ "get_llm_config",
+ "PM_PROMPT",
+ "QA_PROMPT",
+ "DEV_PROMPT",
+ "ORCH_PROMPT",
+]
diff --git a/frontend/app.py b/frontend/app.py
new file mode 100644
index 0000000..9cf81c1
--- /dev/null
+++ b/frontend/app.py
@@ -0,0 +1,280 @@
+# -*- coding: utf-8 -*-
+"""
+Streamlit 实时 Agent 协作平台
+功能:
+1. 实时展示每个 Agent 的状态和动作
+2. 自动保存生成的文件到 workspace/
+3. 简单稳定的代码结构
+"""
+import streamlit as st
+import os
+from pathlib import Path
+from datetime import datetime
+import time
+
+try:
+ from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
+ AUTOGEN_AVAILABLE = True
+except ImportError:
+ AUTOGEN_AVAILABLE = False
+
+# 添加项目根目录到路径
+import sys
+sys.path.insert(0, str(Path(__file__).parent.parent))
+
+from config.llm_config import get_llm_config, PM_PROMPT, QA_PROMPT, DEV_PROMPT, ORCH_PROMPT
+
+# 页面配置
+st.set_page_config(page_title="多 Agent 协作平台", page_icon="🤖", layout="wide")
+
+# Agent 配置
+AGENTS = {
+ "PM_Agent": {"name": "产品经理", "avatar": "📋", "color": "blue"},
+ "QA_Agent": {"name": "测试工程师", "avatar": "✅", "color": "green"},
+ "Dev_Agent": {"name": "开发工程师", "avatar": "💻", "color": "orange"},
+ "Orchestrator": {"name": "协调器", "avatar": "🎯", "color": "purple"},
+ "User_Proxy": {"name": "用户代理", "avatar": "👤", "color": "gray"}
+}
+
+def init_state():
+ """初始化 session state"""
+ if "messages" not in st.session_state:
+ st.session_state.messages = []
+ if "running" not in st.session_state:
+ st.session_state.running = False
+ if "current_agent" not in st.session_state:
+ st.session_state.current_agent = None
+ if "agent_counts" not in st.session_state:
+ st.session_state.agent_counts = {k: 0 for k in AGENTS}
+
+def add_message(agent, content, task=""):
+ """添加消息"""
+ msg = {
+ "agent": agent,
+ "content": content,
+ "task": task,
+ "time": datetime.now().strftime("%H:%M:%S")
+ }
+ st.session_state.messages.append(msg)
+ st.session_state.agent_counts[agent] = st.session_state.agent_counts.get(agent, 0) + 1
+ st.session_state.current_agent = agent
+
+def show_agent_status():
+ """显示 Agent 状态"""
+ cols = st.columns(len(AGENTS))
+ for i, (agent_key, info) in enumerate(AGENTS.items()):
+ with cols[i]:
+ is_active = st.session_state.current_agent == agent_key
+ count = st.session_state.agent_counts.get(agent_key, 0)
+
+ border_color = info["color"]
+ bg_color = "#e8f5e9" if is_active else "white"
+
+ st.markdown(f"""
+
+
{info["avatar"]}
+
{info["name"]}
+
+ {"🟢 发言中" if is_active else "⚪ 等待中"}
+
+
+ 💬 {count} 条消息
+
+
+ """, unsafe_allow_html=True)
+
+def show_chat():
+ """显示对话流"""
+ st.subheader("💬 Agent 对话流")
+
+ if not st.session_state.messages:
+ st.info("👈 暂无对话,请在下方输入需求并启动")
+ return
+
+ for msg in st.session_state.messages:
+ agent = msg["agent"]
+ info = AGENTS.get(agent, {"name": "未知", "avatar": "🤖", "color": "gray"})
+
+ with st.chat_message(agent.lower(), avatar=info["avatar"]):
+ st.markdown(f"**{info['name']}** *{msg['time']}* - {msg['task']}")
+ st.markdown(msg["content"][:800] + ("..." if len(msg["content"]) > 800 else ""))
+
+def save_files():
+ """保存生成的文件到 workspace/"""
+ workspace = Path("workspace")
+ workspace.mkdir(exist_ok=True)
+
+ files = []
+
+ # 遍历消息,提取并保存文件
+ for msg in st.session_state.messages:
+ agent = msg["agent"]
+ content = msg["content"]
+
+ # PM Agent 生成 SRS
+ if agent == "PM_Agent" and ("需求" in content or "SRS" in content):
+ file = workspace / "SRS.md"
+ with open(file, "w", encoding="utf-8") as f:
+ f.write(f"# 软件需求规格说明书\n\n生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n{content}")
+ files.append(str(file))
+
+ # QA Agent 生成测试
+ if agent == "QA_Agent" and ("test" in content.lower() or "测试" in content):
+ file = workspace / "test_sample.py"
+ with open(file, "w", encoding="utf-8") as f:
+ f.write(f"# 测试用例\n# 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n{content}")
+ files.append(str(file))
+
+ # Dev Agent 生成代码
+ if agent == "Dev_Agent" and ("def " in content or "class " in content):
+ file = workspace / "src_sample.py"
+ with open(file, "w", encoding="utf-8") as f:
+ f.write(f"# 源代码\n# 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n{content}")
+ files.append(str(file))
+
+ return files
+
+def main():
+ st.title("🤖 多 Agent 协作平台")
+ st.markdown("**实时展示 Agent 状态 · 自动生成文件**")
+
+ init_state()
+
+ # 侧边栏
+ with st.sidebar:
+ st.title("⚙️ 配置")
+
+ api_key = st.text_input("API Key", type="password", value=os.getenv("DASHSCOPE_API_KEY", ""))
+ model = st.selectbox("模型", ["qwen3.5-flash", "qwen-max", "qwen-plus"], index=0)
+ max_round = st.slider("最大轮数", 5, 30, 15)
+
+ st.divider()
+
+ if st.button("▶️ 启动工作流", type="primary", use_container_width=True):
+ if not api_key:
+ st.error("请先设置 API Key")
+ elif not AUTOGEN_AVAILABLE:
+ st.error("请先安装 AutoGen")
+ else:
+ run_workflow(api_key, model, max_round)
+
+ st.divider()
+
+ if st.button("🗑️ 清空对话", use_container_width=True):
+ st.session_state.messages = []
+ st.session_state.current_agent = None
+ st.session_state.agent_counts = {k: 0 for k in AGENTS}
+ st.rerun()
+
+ # 显示生成的文件
+ st.divider()
+ st.subheader("📁 生成的文件")
+ workspace = Path("workspace")
+ if workspace.exists():
+ for file in workspace.glob("*"):
+ if file.is_file():
+ st.caption(f"📄 {file.name}")
+
+ # 主界面
+ show_agent_status()
+ st.divider()
+ show_chat()
+
+ # 输入框
+ st.divider()
+ if user_input := st.chat_input("输入需求..."):
+ add_message("User_Proxy", user_input, "提出需求")
+ st.rerun()
+
+def run_workflow(api_key, model, max_round):
+ """运行工作流"""
+ st.session_state.running = True
+ st.session_state.messages = []
+ st.session_state.agent_counts = {k: 0 for k in AGENTS}
+
+ progress = st.empty()
+ progress.info("🚀 启动工作流...")
+
+ try:
+ # 获取需求
+ user_msgs = [m for m in st.session_state.messages if m["agent"] == "User_Proxy"]
+ requirement = user_msgs[-1]["content"] if user_msgs else "开发一个电池健康预测 API"
+
+ # 创建 Agent
+ llm_config = get_llm_config(model=model, api_key=api_key)
+
+ pm = AssistantAgent("PM_Agent", system_message=PM_PROMPT, llm_config=llm_config, human_input_mode="NEVER")
+ qa = AssistantAgent("QA_Agent", system_message=QA_PROMPT, llm_config=llm_config, human_input_mode="NEVER")
+ dev = AssistantAgent("Dev_Agent", system_message=DEV_PROMPT, llm_config=llm_config, human_input_mode="NEVER")
+ orch = AssistantAgent("Orchestrator", system_message=ORCH_PROMPT, llm_config=llm_config, human_input_mode="NEVER")
+ user = UserProxyAgent("User_Proxy", human_input_mode="NEVER", max_consecutive_auto_reply=0,
+ code_execution_config={"work_dir": "workspace", "use_docker": False})
+
+ # 创建 GroupChat
+ groupchat = GroupChat(
+ agents=[pm, qa, dev, orch, user],
+ messages=[],
+ max_round=max_round,
+ speaker_selection_method="round_robin"
+ )
+ manager = GroupChatManager(groupchat=groupchat, llm_config=llm_config)
+
+ # 初始消息
+ initial_msg = f"""请启动完整的 SDLC 流程:
+
+【用户需求】{requirement}
+
+【流程】
+1. PM_Agent → SRS 文档
+2. QA_Agent → 测试用例
+3. Dev_Agent → 编写代码
+4. User_Proxy → 执行测试
+5. Orchestrator → 汇总
+
+开始协作!"""
+
+ # 执行对话
+ with st.spinner("💬 Agent 们正在协作中..."):
+ chat_result = user.initiate_chat(manager, message=initial_msg, max_turns=max_round)
+
+ # 记录所有对话
+ task_map = {
+ "PM_Agent": "需求分析",
+ "QA_Agent": "测试设计",
+ "Dev_Agent": "代码实现",
+ "Orchestrator": "流程协调",
+ "User_Proxy": "测试执行"
+ }
+
+ for msg in groupchat.messages:
+ agent = msg.get("name", "Unknown")
+ content = msg.get("content", "")
+ task = task_map.get(agent, "工作中")
+ add_message(agent, content, task)
+
+ # 保存文件
+ progress.info("💾 正在保存文件...")
+ files = save_files()
+
+ if files:
+ progress.success(f"✅ 完成!已保存 {len(files)} 个文件到 workspace/")
+ else:
+ progress.success("✅ 工作流完成!")
+
+ st.session_state.running = False
+ st.rerun()
+
+ except Exception as e:
+ st.session_state.running = False
+ progress.error(f"❌ 错误:{e}")
+ import traceback
+ st.error(traceback.format_exc())
+
+if __name__ == "__main__":
+ main()
diff --git a/frontend/streamlit_app.py b/frontend/streamlit_app.py
deleted file mode 100644
index 5246027..0000000
--- a/frontend/streamlit_app.py
+++ /dev/null
@@ -1,442 +0,0 @@
-"""
-Streamlit 前端 - 多智能体实时聊天界面
-提供可视化的 Agent 协同工作展示和人机交互
-"""
-import streamlit as st
-import os
-import sys
-from pathlib import Path
-from datetime import datetime
-import time
-from typing import Dict, Any, List
-
-# 添加项目根目录到路径
-sys.path.insert(0, str(Path(__file__).parent.parent))
-
-from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
-from config.llm_config import (
- get_llm_config,
- PM_PROMPT,
- QA_PROMPT,
- DEV_PROMPT,
- ORCH_PROMPT
-)
-
-
-# 页面配置
-st.set_page_config(
- page_title="AutoGen SDLC 多智能体系统",
- page_icon="🤖",
- layout="wide",
- initial_sidebar_state="expanded"
-)
-
-# 自定义 CSS 样式
-st.markdown("""
-
-""", unsafe_allow_html=True)
-
-
-def init_session_state():
- """初始化 session state"""
- if "messages" not in st.session_state:
- st.session_state.messages = []
- if "is_running" not in st.session_state:
- st.session_state.is_running = False
- if "workflow_result" not in st.session_state:
- st.session_state.workflow_result = None
- if "conversation_history" not in st.session_state:
- st.session_state.conversation_history = []
- if "current_step" not in st.session_state:
- st.session_state.current_step = 0
-
-
-def create_agents(api_key: str, base_url: str, model: str):
- """创建所有 Agent"""
- llm_config = get_llm_config(model=model, api_key=api_key, base_url=base_url)
-
- pm_agent = AssistantAgent(
- name="PM_Agent",
- system_message=PM_PROMPT,
- llm_config=llm_config,
- description="资深软件产品经理",
- human_input_mode="NEVER"
- )
-
- qa_agent = AssistantAgent(
- name="QA_Agent",
- system_message=QA_PROMPT,
- llm_config=llm_config,
- description="资深测试工程师",
- human_input_mode="NEVER"
- )
-
- dev_agent = AssistantAgent(
- name="Dev_Agent",
- system_message=DEV_PROMPT,
- llm_config=llm_config,
- description="资深软件工程师",
- human_input_mode="NEVER"
- )
-
- orchestrator = AssistantAgent(
- name="Orchestrator",
- system_message=ORCH_PROMPT,
- llm_config=llm_config,
- description="多智能体协调器",
- human_input_mode="NEVER"
- )
-
- user_proxy = UserProxyAgent(
- name="User_Proxy",
- human_input_mode="NEVER", # 修复:Web 环境不支持 TERMINAL
- max_consecutive_auto_reply=0,
- code_execution_config={
- "work_dir": "workspace",
- "use_docker": False,
- },
- is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE")
- )
-
- return pm_agent, qa_agent, dev_agent, orchestrator, user_proxy, llm_config
-
-
-def get_agent_color(agent_name: str) -> str:
- """根据 Agent 名称返回颜色类"""
- color_map = {
- "PM_Agent": "pm-agent",
- "QA_Agent": "qa-agent",
- "Dev_Agent": "dev-agent",
- "Orchestrator": "orchestrator",
- "User_Proxy": "user",
- "system": "system"
- }
- return color_map.get(agent_name, "system")
-
-
-def display_message(agent_name: str, message: str, timestamp: str):
- """显示单条消息"""
- color_class = get_agent_color(agent_name)
-
- # 头像映射
- avatar_map = {
- "PM_Agent": "📋",
- "QA_Agent": "✅",
- "Dev_Agent": "💻",
- "Orchestrator": "🎯",
- "User_Proxy": "👤",
- "system": "⚙️"
- }
-
- avatar = avatar_map.get(agent_name, "🤖")
-
- with st.chat_message(name=agent_name.lower().replace("_", ""), avatar=avatar):
- st.markdown(f"**{avatar} {agent_name}** *({timestamp})*")
- st.markdown(f"{message}
",
- unsafe_allow_html=True)
-
-
-def export_conversation(messages: List[Dict]) -> str:
- """导出对话为 Markdown"""
- md_content = "# AutoGen SDLC 对话历史\n\n"
- md_content += f"**导出时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
- md_content += "---\n\n"
-
- for msg in messages:
- timestamp = msg.get("timestamp", "")[:19].replace('T', ' ')
- agent = msg.get("agent_name", "Unknown")
- content = msg.get("message", "")
-
- md_content += f"### [{timestamp}] {agent}\n\n"
- md_content += f"{content}\n\n"
- md_content += "---\n\n"
-
- return md_content
-
-
-def main():
- """主应用"""
- init_session_state()
-
- # ===== 左侧边栏 - 配置区域 =====
- with st.sidebar:
- st.title("⚙️ 配置")
-
- # API 配置
- st.subheader("模型配置")
- api_key = st.text_input(
- "API Key",
- type="password",
- value=os.getenv("DASHSCOPE_API_KEY", ""),
- help="阿里云 DashScope API Key"
- )
-
- base_url = st.text_input(
- "Base URL",
- value="https://dashscope.aliyuncs.com/compatible-mode/v1",
- help="API Base URL"
- )
-
- model = st.selectbox(
- "模型选择",
- options=["qwen3.5-flash", "qwen-max", "qwen-plus", "qwen-turbo"],
- index=0,
- help="选择使用的 Qwen 模型"
- )
-
- # Agent 参数
- st.subheader("Agent 参数")
- max_round = st.slider(
- "最大对话轮数",
- min_value=5,
- max_value=50,
- value=15,
- step=1
- )
-
- temperature = st.slider(
- "温度参数",
- min_value=0.0,
- max_value=1.0,
- value=0.7,
- step=0.1
- )
-
- st.divider()
-
- # 控制按钮
- st.subheader("控制")
- col1, col2 = st.columns(2)
-
- with col1:
- start_btn = st.button("▶️ 启动", use_container_width=True)
- with col2:
- stop_btn = st.button("⏸️ 暂停", use_container_width=True)
-
- st.divider()
-
- # 导出选项
- st.subheader("导出")
- if st.button("📄 导出 JSON", use_container_width=True):
- if st.session_state.messages:
- import json
- json_str = json.dumps(st.session_state.messages, ensure_ascii=False, indent=2)
- st.download_button(
- label="下载 JSON",
- data=json_str,
- file_name=f"conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
- mime="application/json"
- )
-
- if st.button("📝 导出 Markdown", use_container_width=True):
- if st.session_state.messages:
- md_content = export_conversation(st.session_state.messages)
- st.download_button(
- label="下载 Markdown",
- data=md_content,
- file_name=f"conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md",
- mime="text/markdown"
- )
-
- # ===== 主区域 - 聊天展示 =====
- st.title("🤖 AutoGen SDLC 多智能体协同系统")
- st.markdown("**基于 AutoGen + Qwen3.5-flash 的端到端软件交付系统**")
-
- # 状态指示器
- status_col1, status_col2, status_col3 = st.columns(3)
- with status_col1:
- st.metric("对话轮数", len(st.session_state.messages))
- with status_col2:
- status = "🟢 运行中" if st.session_state.is_running else "🔴 已停止"
- st.metric("系统状态", status)
- with status_col3:
- result_status = "✅ 成功" if st.session_state.workflow_result and st.session_state.workflow_result.get("success") else "⏳ 未开始"
- st.metric("工作流状态", result_status)
-
- st.divider()
-
- # 聊天历史展示区
- chat_container = st.container()
- with chat_container:
- if not st.session_state.messages:
- st.info("👈 请在左侧配置后,输入需求并点击启动按钮开始工作流")
- else:
- for msg in st.session_state.messages:
- display_message(
- agent_name=msg.get("agent_name", "Unknown"),
- message=msg.get("message", ""),
- timestamp=msg.get("timestamp", "")
- )
-
- # ===== 右侧边栏 - 进度和日志 =====
- with st.sidebar:
- st.divider()
-
- # 工作流进度
- st.subheader("📊 工作流进度")
-
- workflow_steps = [
- "需求分析",
- "测试设计",
- "代码实现",
- "测试执行",
- "最终验证"
- ]
-
- current_step = st.session_state.get("current_step", 0)
- for i, step in enumerate(workflow_steps):
- if i < current_step:
- st.success(f"✅ {step}")
- elif i == current_step:
- st.info(f"🔄 {step}")
- else:
- st.write(f"⚪ {step}")
-
- # ===== 用户输入区域 =====
- st.divider()
-
- # 用户需求输入
- user_input = st.chat_input("请输入您的软件需求...")
-
- if user_input:
- # 添加用户消息到历史
- user_msg = {
- "timestamp": datetime.now().isoformat(),
- "agent_name": "User",
- "role": "user",
- "message": user_input
- }
- st.session_state.messages.append(user_msg)
- st.session_state.conversation_history.append(user_msg)
-
- # 如果正在运行,添加到队列等待处理
- if st.session_state.is_running:
- # 这里会触发工作流执行
- pass
-
- # ===== 工作流执行逻辑 =====
- if start_btn:
- if not api_key:
- st.error("请先设置 API Key!")
- st.stop()
-
- # 获取最后一条用户消息
- user_messages = [m for m in st.session_state.messages if m.get("role") == "user"]
- if not user_messages:
- st.warning("请先输入需求!")
- st.stop()
-
- latest_requirement = user_messages[-1]["message"]
-
- # 设置运行状态
- st.session_state.is_running = True
- st.session_state.current_step = 0
-
- # 创建占位符用于实时更新
- status_placeholder = st.empty()
- message_placeholder = st.empty()
-
- with status_placeholder:
- st.info("🚀 正在启动 SDLC 工作流...")
-
- try:
- # 创建 Agent
- pm_agent, qa_agent, dev_agent, orchestrator, user_proxy, llm_config = create_agents(
- api_key=api_key,
- base_url=base_url,
- model=model
- )
-
- # 创建 GroupChat
- groupchat = GroupChat(
- agents=[pm_agent, qa_agent, dev_agent, orchestrator, user_proxy],
- messages=[],
- max_round=max_round,
- speaker_selection_method="round_robin"
- )
-
- manager = GroupChatManager(groupchat=groupchat, llm_config=llm_config)
-
- # 构建初始消息
- initial_message = f"""
-请启动完整的 SDLC 流程:
-
-【用户需求】
-{latest_requirement}
-
-【工作流程】
-1. PM_Agent → 生成 SRS
-2. QA_Agent → 生成测试用例
-3. Dev_Agent → 编写代码
-4. User_Proxy → 执行测试
-5. Orchestrator → 汇总报告
-
-开始协作!
-"""
-
- # 记录第一条消息
- first_msg = {
- "timestamp": datetime.now().isoformat(),
- "agent_name": "Orchestrator",
- "role": "assistant",
- "message": f"🎯 启动 SDLC 工作流,需求:{latest_requirement[:100]}..."
- }
- st.session_state.messages.append(first_msg)
- st.session_state.current_step = 0
-
- # 启动对话(简化版本,实际应该用异步)
- with message_placeholder:
- st.info("💬 Agent 们正在协作中,请稍候...")
-
- # 这里简化处理,实际应该使用异步回调
- chat_result = user_proxy.initiate_chat(
- manager,
- message=initial_message,
- max_turns=max_round
- )
-
- # 记录结果
- for msg in groupchat.messages:
- chat_msg = {
- "timestamp": datetime.now().isoformat(),
- "agent_name": msg.get("name", "Unknown"),
- "role": msg.get("role", "assistant"),
- "message": msg.get("content", "")
- }
- st.session_state.messages.append(chat_msg)
-
- # 更新状态
- st.session_state.workflow_result = {
- "success": True,
- "summary": chat_result.summary if hasattr(chat_result, 'summary') else "完成"
- }
- st.session_state.current_step = 5 # 完成所有步骤
- st.session_state.is_running = False
-
- # 刷新显示
- st.rerun()
-
- except Exception as e:
- st.session_state.is_running = False
- st.error(f"❌ 错误:{str(e)}")
- st.session_state.workflow_result = {"success": False, "error": str(e)}
-
- if stop_btn:
- st.session_state.is_running = False
- st.info("⏸️ 工作流已暂停")
-
-
-if __name__ == "__main__":
- main()
diff --git a/frontend/streamlit_app_v2.py b/frontend/streamlit_app_v2.py
deleted file mode 100644
index da8c9d1..0000000
--- a/frontend/streamlit_app_v2.py
+++ /dev/null
@@ -1,725 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Streamlit 前端 v2 - 增强版多智能体实时协作可视化界面
-功能:
-1. Agent 协作流程图(桑基图)
-2. 实时状态监控面板
-3. 分屏对话展示
-4. 生成的文件预览
-5. 时间线视图
-"""
-import streamlit as st
-import os
-import sys
-from pathlib import Path
-from datetime import datetime
-import time
-from typing import Dict, Any, List
-import json
-
-# 添加项目根目录到路径
-sys.path.insert(0, str(Path(__file__).parent.parent))
-
-try:
- from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
- AUTOGEN_AVAILABLE = True
-except ImportError:
- AUTOGEN_AVAILABLE = False
-
-from config.llm_config import (
- get_llm_config,
- PM_PROMPT,
- QA_PROMPT,
- DEV_PROMPT,
- ORCH_PROMPT
-)
-
-
-# 页面配置
-st.set_page_config(
- page_title="AutoGen SDLC 多智能体协作平台",
- page_icon="🤖",
- layout="wide",
- initial_sidebar_state="expanded"
-)
-
-# 自定义 CSS 样式
-st.markdown("""
-
-""", unsafe_allow_html=True)
-
-
-# Agent 配置信息
-AGENT_CONFIG = {
- "PM_Agent": {
- "name": "产品经理",
- "avatar": "📋",
- "color": "#2196f3",
- "description": "需求分析与 SRS 生成",
- "icon": "📊"
- },
- "QA_Agent": {
- "name": "测试工程师",
- "avatar": "✅",
- "color": "#4caf50",
- "description": "测试用例设计与 TDD",
- "icon": "🧪"
- },
- "Dev_Agent": {
- "name": "开发工程师",
- "avatar": "💻",
- "color": "#ff9800",
- "description": "代码实现与修复",
- "icon": "⚙️"
- },
- "Orchestrator": {
- "name": "协调器",
- "avatar": "🎯",
- "color": "#9c27b0",
- "description": "流程调度与验证",
- "icon": "🎭"
- },
- "User_Proxy": {
- "name": "用户代理",
- "avatar": "👤",
- "color": "#795548",
- "description": "人机交互接口",
- "icon": "🔌"
- }
-}
-
-
-def init_session_state():
- """初始化 session state"""
- if "messages" not in st.session_state:
- st.session_state.messages = []
- if "is_running" not in st.session_state:
- st.session_state.is_running = False
- if "workflow_result" not in st.session_state:
- st.session_state.workflow_result = None
- if "conversation_history" not in st.session_state:
- st.session_state.conversation_history = []
- if "current_step" not in st.session_state:
- st.session_state.current_step = 0
- if "agent_stats" not in st.session_state:
- st.session_state.agent_stats = {
- "PM_Agent": {"messages": 0, "last_active": None},
- "QA_Agent": {"messages": 0, "last_active": None},
- "Dev_Agent": {"messages": 0, "last_active": None},
- "Orchestrator": {"messages": 0, "last_active": None},
- "User_Proxy": {"messages": 0, "last_active": None}
- }
- if "generated_files" not in st.session_state:
- st.session_state.generated_files = []
- if "active_agent" not in st.session_state:
- st.session_state.active_agent = None
-
-
-def create_agents(api_key: str, base_url: str, model: str):
- """创建所有 Agent"""
- llm_config = get_llm_config(model=model, api_key=api_key, base_url=base_url)
-
- pm_agent = AssistantAgent(
- name="PM_Agent",
- system_message=PM_PROMPT,
- llm_config=llm_config,
- description="资深软件产品经理",
- human_input_mode="NEVER"
- )
-
- qa_agent = AssistantAgent(
- name="QA_Agent",
- system_message=QA_PROMPT,
- llm_config=llm_config,
- description="资深测试工程师",
- human_input_mode="NEVER"
- )
-
- dev_agent = AssistantAgent(
- name="Dev_Agent",
- system_message=DEV_PROMPT,
- llm_config=llm_config,
- description="资深软件工程师",
- human_input_mode="NEVER"
- )
-
- orchestrator = AssistantAgent(
- name="Orchestrator",
- system_message=ORCH_PROMPT,
- llm_config=llm_config,
- description="多智能体协调器",
- human_input_mode="NEVER"
- )
-
- user_proxy = UserProxyAgent(
- name="User_Proxy",
- human_input_mode="NEVER", # 修复:Web 环境不支持 TERMINAL
- max_consecutive_auto_reply=0,
- code_execution_config={
- "work_dir": "workspace",
- "use_docker": False,
- },
- is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE")
- )
-
- return pm_agent, qa_agent, dev_agent, orchestrator, user_proxy, llm_config
-
-
-def display_agent_status_panel():
- """显示 Agent 状态面板"""
- st.subheader("🎭 Agent 实时状态")
-
- cols = st.columns(5)
- agent_names = list(AGENT_CONFIG.keys())
-
- for i, (agent_name, config) in enumerate(AGENT_CONFIG.items()):
- with cols[i]:
- stats = st.session_state.agent_stats.get(agent_name, {})
- msg_count = stats.get("messages", 0)
- last_active = stats.get("last_active")
-
- is_active = st.session_state.active_agent == agent_name
-
- status_color = config["color"]
- if is_active:
- status_html = f''
- else:
- status_html = f''
-
- st.markdown(f"""
-
-
{config['avatar']}
-
{config['name']}
-
消息:{msg_count}
-
{status_html}{'工作中' if is_active else '等待中'}
-
- """, unsafe_allow_html=True)
-
-
-def display_workflow_diagram():
- """显示工作流程图"""
- st.subheader("🔄 工作流程图")
-
- # 使用 Mermaid 绘制流程图
- mermaid_code = """
- ```mermaid
- graph LR
- A[👤 用户需求] --> B[📋 PM Agent]
- B --> C[📄 SRS 文档]
- C --> D[✅ QA Agent]
- D --> E[🧪 测试用例]
- E --> F[💻 Dev Agent]
- F --> G[💾 源代码]
- G --> H[🔧 自动测试]
- H -->|失败 | F
- H -->|通过 | I[🎯 Orchestrator]
- I --> J[✅ 最终报告]
-
- style B fill:#e3f2fd,stroke:#2196f3,stroke-width:3px
- style D fill:#e8f5e9,stroke:#4caf50,stroke-width:3px
- style F fill:#fff3e0,stroke:#ff9800,stroke-width:3px
- style I fill:#f3e5f5,stroke:#9c27b0,stroke-width:3px
- ```
- """
- st.markdown(mermaid_code)
-
-
-def display_message(agent_name: str, message: str, timestamp: str, index: int):
- """显示单条消息(增强版)"""
- config = AGENT_CONFIG.get(agent_name, {"name": agent_name, "avatar": "🤖", "color": "#999"})
-
- # 折叠长消息
- if len(message) > 500:
- with st.expander(f"{config['avatar']} {config['name']} - {timestamp[:16]}"):
- st.markdown(message)
- else:
- with st.chat_message(name=agent_name.lower().replace("_", ""), avatar=config['avatar']):
- st.markdown(f"**{config['avatar']} {config['name']}** *({timestamp[:16]})*")
- st.markdown(message)
-
-
-def display_timeline():
- """显示时间线视图"""
- st.subheader("⏱️ 执行时间线")
-
- if not st.session_state.messages:
- st.info("暂无执行记录")
- return
-
- # 按时间分组显示关键事件
- timeline_data = []
- for msg in st.session_state.messages:
- agent = msg.get("agent_name", "Unknown")
- timestamp = msg.get("timestamp", "")[:19].replace('T', ' ')
- message = msg.get("message", "")[:100]
-
- if agent in AGENT_CONFIG:
- timeline_data.append({
- "time": timestamp,
- "agent": agent,
- "message": message,
- "icon": AGENT_CONFIG[agent]["avatar"]
- })
-
- # 倒序显示(最新的在前)
- for item in reversed(timeline_data[-10:]): # 只显示最近 10 条
- with st.container():
- st.markdown(f"""
-
- {item['icon']} {item['time']}
- {AGENT_CONFIG.get(item['agent'], {}).get('name', item['agent'])}
- {item['message']}...
-
- """, unsafe_allow_html=True)
-
-
-def display_generated_files():
- """显示生成的文件"""
- st.subheader("📁 生成的文件")
-
- workspace_dir = Path("workspace")
- if not workspace_dir.exists():
- st.info("工作目录为空")
- return
-
- files = list(workspace_dir.glob("*"))
-
- if not files:
- st.info("暂无生成的文件")
- return
-
- for file_path in files:
- if file_path.is_file():
- file_size = file_path.stat().st_size
- file_type = file_path.suffix
-
- icon_map = {
- ".py": "🐍",
- ".md": "📝",
- ".txt": "📄",
- ".json": "📊"
- }
-
- icon = icon_map.get(file_type, "📎")
-
- with st.container():
- st.markdown(f"""
-
-
-
- {icon}
- {file_path.name}
-
- {file_size:,} bytes
-
-
-
-
- """, unsafe_allow_html=True)
-
- # 文件内容预览
- if st.button(f"👁️ 预览 {file_path.name}", key=f"preview_{file_path.name}"):
- try:
- with open(file_path, 'r', encoding='utf-8') as f:
- content = f.read()
- st.code(content[:2000] + ("..." if len(content) > 2000 else ""))
- except Exception as e:
- st.error(f"读取失败:{e}")
-
-
-def display_chat_interface():
- """显示聊天界面"""
- st.subheader("💬 Agent 对话实录")
-
- if not st.session_state.messages:
- st.info("👈 暂无对话,请在下方输入需求并启动工作流")
- else:
- # 使用容器显示消息
- chat_container = st.container()
- with chat_container:
- for idx, msg in enumerate(st.session_state.messages):
- display_message(
- agent_name=msg.get("agent_name", "Unknown"),
- message=msg.get("message", ""),
- timestamp=msg.get("timestamp", ""),
- index=idx
- )
-
-
-def export_conversation(messages: List[Dict]) -> str:
- """导出对话为 Markdown"""
- md_content = "# AutoGen SDLC 对话历史\n\n"
- md_content += f"**导出时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
- md_content += "---\n\n"
-
- for msg in messages:
- timestamp = msg.get("timestamp", "")[:19].replace('T', ' ')
- agent = msg.get("agent_name", "Unknown")
- content = msg.get("message", "")
-
- config = AGENT_CONFIG.get(agent, {"avatar": "🤖", "name": agent})
-
- md_content += f"### {config['avatar']} [{timestamp}] {config['name']}\n\n"
- md_content += f"{content}\n\n"
- md_content += "---\n\n"
-
- return md_content
-
-
-def main():
- """主应用"""
- init_session_state()
-
- # ===== 顶部标题区 =====
- st.title("🤖 AutoGen SDLC 多智能体协作平台")
- st.markdown("**基于 AutoGen + Qwen3.5-flash 的端到端软件交付系统 · 实时可视化 Agent 协作**")
-
- # ===== 侧边栏 - 配置区域 =====
- with st.sidebar:
- st.title("⚙️ 控制中心")
-
- # API 配置
- st.subheader("🔑 模型配置")
- api_key = st.text_input(
- "API Key",
- type="password",
- value=os.getenv("DASHSCOPE_API_KEY", ""),
- help="阿里云 DashScope API Key"
- )
-
- base_url = st.text_input(
- "Base URL",
- value="https://dashscope.aliyuncs.com/compatible-mode/v1",
- help="API Base URL"
- )
-
- model = st.selectbox(
- "模型选择",
- options=["qwen3.5-flash", "qwen-max", "qwen-plus", "qwen-turbo"],
- index=0,
- help="选择使用的 Qwen 模型"
- )
-
- # Agent 参数
- st.subheader("🎛️ Agent 参数")
- max_round = st.slider(
- "最大对话轮数",
- min_value=5,
- max_value=50,
- value=20,
- step=1
- )
-
- temperature = st.slider(
- "温度参数",
- min_value=0.0,
- max_value=1.0,
- value=0.7,
- step=0.1
- )
-
- st.divider()
-
- # 控制按钮
- st.subheader("🎮 流程控制")
- col1, col2 = st.columns(2)
-
- with col1:
- start_btn = st.button("▶️ 启动", use_container_width=True, type="primary")
- with col2:
- stop_btn = st.button("⏸️ 暂停", use_container_width=True)
-
- st.divider()
-
- # 导出选项
- st.subheader("💾 数据导出")
- if st.button("📄 导出 JSON", use_container_width=True):
- if st.session_state.messages:
- json_str = json.dumps(st.session_state.messages, ensure_ascii=False, indent=2)
- st.download_button(
- label="下载 JSON",
- data=json_str,
- file_name=f"conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
- mime="application/json"
- )
-
- if st.button("📝 导出 Markdown", use_container_width=True):
- if st.session_state.messages:
- md_content = export_conversation(st.session_state.messages)
- st.download_button(
- label="下载 Markdown",
- data=md_content,
- file_name=f"conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md",
- mime="text/markdown"
- )
-
- # ===== 主区域 - 三栏布局 =====
- # 第一行:状态指标
- status_col1, status_col2, status_col3, status_col4 = st.columns(4)
-
- with status_col1:
- total_msgs = len(st.session_state.messages)
- st.metric("💬 总消息数", total_msgs)
-
- with status_col2:
- status = "🟢 运行中" if st.session_state.is_running else "🔴 已停止"
- status_class = "running" if st.session_state.is_running else "stopped"
- st.markdown(f"""
-
- {status}
-
- """, unsafe_allow_html=True)
-
- with status_col3:
- result_status = "✅ 成功" if st.session_state.workflow_result and st.session_state.workflow_result.get("success") else "⏳ 未开始"
- st.metric("📊 工作流状态", result_status)
-
- with status_col4:
- files_count = len(list(Path("workspace").glob("*"))) if Path("workspace").exists() else 0
- st.metric("📁 生成文件数", files_count)
-
- st.divider()
-
- # 第二行:左侧 - Agent 状态 + 流程图,右侧 - 时间线
- left_col, right_col = st.columns([2, 1])
-
- with left_col:
- display_agent_status_panel()
- st.divider()
- display_workflow_diagram()
-
- with right_col:
- display_timeline()
-
- st.divider()
-
- # 第三行:聊天界面
- display_chat_interface()
-
- # ===== 底部 - 用户需求输入 =====
- st.divider()
-
- # 用户输入
- user_input = st.chat_input("💡 请输入您的软件需求,例如:我需要一个电池健康状态预测 API...")
-
- if user_input:
- # 添加用户消息
- user_msg = {
- "timestamp": datetime.now().isoformat(),
- "agent_name": "User_Proxy",
- "role": "user",
- "message": user_input
- }
- st.session_state.messages.append(user_msg)
- st.session_state.conversation_history.append(user_msg)
-
- # 更新统计
- st.session_state.agent_stats["User_Proxy"]["messages"] += 1
- st.session_state.agent_stats["User_Proxy"]["last_active"] = datetime.now().isoformat()
-
- # 刷新显示
- st.rerun()
-
- # ===== 工作流执行逻辑 =====
- if start_btn:
- if not api_key:
- st.error("请先设置 API Key!")
- st.stop()
-
- if not AUTOGEN_AVAILABLE:
- st.error("请先安装 AutoGen: pip install pyautogen")
- st.stop()
-
- # 获取最后一条用户消息
- user_messages = [m for m in st.session_state.messages if m.get("role") == "user"]
- if not user_messages:
- st.warning("请先输入需求!")
- st.stop()
-
- latest_requirement = user_messages[-1]["message"]
-
- # 设置运行状态
- st.session_state.is_running = True
- st.session_state.current_step = 0
-
- # 创建进度占位符
- progress_placeholder = st.empty()
-
- with progress_placeholder:
- st.info("🚀 正在启动 SDLC 工作流,请稍候...")
-
- try:
- # 创建 Agent
- pm_agent, qa_agent, dev_agent, orchestrator, user_proxy, llm_config = create_agents(
- api_key=api_key,
- base_url=base_url,
- model=model
- )
-
- # 创建 GroupChat
- groupchat = GroupChat(
- agents=[pm_agent, qa_agent, dev_agent, orchestrator, user_proxy],
- messages=[],
- max_round=max_round,
- speaker_selection_method="round_robin"
- )
-
- manager = GroupChatManager(groupchat=groupchat, llm_config=llm_config)
-
- # 构建初始消息
- initial_message = f"""
-请启动完整的 SDLC 流程:
-
-【用户需求】
-{latest_requirement}
-
-【工作流程】
-1. PM_Agent → 生成 SRS
-2. QA_Agent → 生成测试用例
-3. Dev_Agent → 编写代码
-4. User_Proxy → 执行测试
-5. Orchestrator → 汇总报告
-
-开始协作!
-"""
-
- # 记录第一条消息
- first_msg = {
- "timestamp": datetime.now().isoformat(),
- "agent_name": "Orchestrator",
- "role": "assistant",
- "message": f"🎯 启动 SDLC 工作流,需求:{latest_requirement[:100]}..."
- }
- st.session_state.messages.append(first_msg)
-
- # 更新统计
- st.session_state.agent_stats["Orchestrator"]["messages"] += 1
- st.session_state.active_agent = "Orchestrator"
-
- # 执行对话
- chat_result = user_proxy.initiate_chat(
- manager,
- message=initial_message,
- max_turns=max_round
- )
-
- # 记录所有对话
- for msg in groupchat.messages:
- chat_msg = {
- "timestamp": datetime.now().isoformat(),
- "agent_name": msg.get("name", "Unknown"),
- "role": msg.get("role", "assistant"),
- "message": msg.get("content", "")
- }
-
- # 避免重复添加
- if chat_msg not in st.session_state.messages:
- st.session_state.messages.append(chat_msg)
-
- # 更新统计
- agent_name = msg.get("name", "Unknown")
- if agent_name in st.session_state.agent_stats:
- st.session_state.agent_stats[agent_name]["messages"] += 1
- st.session_state.agent_stats[agent_name]["last_active"] = datetime.now().isoformat()
-
- # 扫描生成的文件
- if Path("workspace").exists():
- files = list(Path("workspace").glob("*"))
- st.session_state.generated_files = [str(f) for f in files if f.is_file()]
-
- # 更新状态
- st.session_state.workflow_result = {
- "success": True,
- "summary": chat_result.summary if hasattr(chat_result, 'summary') else "完成"
- }
- st.session_state.current_step = 5
- st.session_state.is_running = False
- st.session_state.active_agent = None
-
- progress_placeholder.success("✅ SDLC 工作流完成!")
-
- # 刷新显示
- st.rerun()
-
- except Exception as e:
- st.session_state.is_running = False
- st.session_state.active_agent = None
- st.error(f"❌ 错误:{str(e)}")
- st.session_state.workflow_result = {"success": False, "error": str(e)}
-
- if stop_btn:
- st.session_state.is_running = False
- st.session_state.active_agent = None
- st.info("⏸️ 工作流已暂停")
-
-
-if __name__ == "__main__":
- main()
diff --git a/frontend/streamlit_app_v3.py b/frontend/streamlit_app_v3.py
deleted file mode 100644
index a90c22a..0000000
--- a/frontend/streamlit_app_v3.py
+++ /dev/null
@@ -1,756 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Streamlit 前端 v3 - Agent 对话流可视化版本
-核心特性:
-1. 清晰展示每个 Agent 当前在做什么(任务标签)
-2. Agent 之间的对话流(类似聊天室)
-3. 实时活动指示器(高亮当前发言的 Agent)
-4. 对话历史完整记录
-"""
-import streamlit as st
-import os
-import sys
-from pathlib import Path
-from datetime import datetime
-import time
-from typing import Dict, Any, List
-import json
-
-# 添加项目根目录到路径
-sys.path.insert(0, str(Path(__file__).parent.parent))
-
-try:
- from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
- AUTOGEN_AVAILABLE = True
-except ImportError:
- AUTOGEN_AVAILABLE = False
-
-from config.llm_config import (
- get_llm_config,
- PM_PROMPT,
- QA_PROMPT,
- DEV_PROMPT,
- ORCH_PROMPT
-)
-
-
-# 页面配置
-st.set_page_config(
- page_title="AutoGen SDLC - Agent 对话流",
- page_icon="💬",
- layout="wide",
- initial_sidebar_state="expanded"
-)
-
-# 自定义 CSS 样式 - 突出对话流
-st.markdown("""
-
-""", unsafe_allow_html=True)
-
-
-# Agent 配置
-AGENT_CONFIG = {
- "PM_Agent": {
- "name": "产品经理",
- "avatar": "📋",
- "color": "#2196f3",
- "bubble_class": "pm-bubble",
- "task_class": "pm-task",
- "default_task": "待命"
- },
- "QA_Agent": {
- "name": "测试工程师",
- "avatar": "✅",
- "color": "#4caf50",
- "bubble_class": "qa-bubble",
- "task_class": "qa-task",
- "default_task": "待命"
- },
- "Dev_Agent": {
- "name": "开发工程师",
- "avatar": "💻",
- "color": "#ff9800",
- "bubble_class": "dev-bubble",
- "task_class": "dev-task",
- "default_task": "待命"
- },
- "Orchestrator": {
- "name": "协调器",
- "avatar": "🎯",
- "color": "#9c27b0",
- "bubble_class": "orchestrator-bubble",
- "task_class": "orchestrator-task",
- "default_task": "待命"
- },
- "User_Proxy": {
- "name": "用户代理",
- "avatar": "👤",
- "color": "#795548",
- "bubble_class": "user-bubble",
- "task_class": "user-task",
- "default_task": "待命"
- }
-}
-
-
-def init_session_state():
- """初始化 session state"""
- if "messages" not in st.session_state:
- st.session_state.messages = []
- if "is_running" not in st.session_state:
- st.session_state.is_running = False
- if "current_agent" not in st.session_state:
- st.session_state.current_agent = None
- if "agent_tasks" not in st.session_state:
- st.session_state.agent_tasks = {
- "PM_Agent": "待命",
- "QA_Agent": "待命",
- "Dev_Agent": "待命",
- "Orchestrator": "待命",
- "User_Proxy": "待命"
- }
- if "agent_stats" not in st.session_state:
- st.session_state.agent_stats = {agent: 0 for agent in AGENT_CONFIG}
-
-
-def display_agent_status_row():
- """显示 Agent 状态行 - 突出当前活动的 Agent"""
- st.subheader("🎭 Agent 实时状态")
-
- cols = st.columns(5)
-
- for i, (agent_key, config) in enumerate(AGENT_CONFIG.items()):
- with cols[i]:
- is_active = st.session_state.current_agent == agent_key
- current_task = st.session_state.agent_tasks.get(agent_key, "待命")
- msg_count = st.session_state.agent_stats.get(agent_key, 0)
-
- status_class = "active" if is_active else ""
- indicator_class = "status-active" if is_active else "status-inactive"
-
- st.markdown(f"""
-
-
{config['avatar']}
-
-
- {config['name']}
-
-
- 📝 {current_task}
-
-
- 💬 {msg_count} 条消息
-
-
- """, unsafe_allow_html=True)
-
-
-def display_active_agent_banner():
- """显示当前活动 Agent 的横幅"""
- if st.session_state.current_agent:
- config = AGENT_CONFIG.get(st.session_state.current_agent, {})
- st.markdown(f"""
-
- {config.get('avatar', '🤖')} 当前发言:{config.get('name', 'Unknown')} - {st.session_state.agent_tasks.get(st.session_state.current_agent, '工作中...')}
-
- """, unsafe_allow_html=True)
-
-
-def display_chat_flow():
- """显示 Agent 对话流 - 核心功能"""
- st.subheader("💬 Agent 对话流")
-
- if not st.session_state.messages:
- st.info("👈 暂无对话,请启动工作流")
- return
-
- # 使用容器显示对话流
- chat_container = st.container()
-
- for idx, msg in enumerate(st.session_state.messages):
- agent_key = msg.get("agent_key", "Unknown")
- config = AGENT_CONFIG.get(agent_key, {"name": "未知", "avatar": "🤖", "bubble_class": ""})
- task_class = AGENT_CONFIG.get(agent_key, {}).get("task_class", "")
-
- timestamp = msg.get("timestamp", "")
- if timestamp:
- time_str = datetime.fromisoformat(timestamp).strftime("%H:%M:%S")
- else:
- time_str = datetime.now().strftime("%H:%M:%S")
-
- content = msg.get("content", "")
- task = msg.get("task", "工作中")
-
- # 显示对话气泡
- st.markdown(f"""
-
- """, unsafe_allow_html=True)
-
- # 显示对话流指示器(下一条消息的箭头)
- if idx < len(st.session_state.messages) - 1:
- next_msg = st.session_state.messages[idx + 1]
- next_agent = next_msg.get("agent_key", "")
- next_config = AGENT_CONFIG.get(next_agent, {})
-
- if next_agent != agent_key:
- st.markdown(f"""
-
-
-
⬇️
-
- 传递给 {next_config.get('name', 'Unknown')}
-
-
- """, unsafe_allow_html=True)
-
-
-def save_generated_files():
- """从对话中提取并保存生成的文件到 workspace 目录"""
- workspace_dir = Path("workspace")
- workspace_dir.mkdir(parents=True, exist_ok=True)
-
- files_saved = []
-
- # 遍历所有消息,提取文件内容
- for msg in st.session_state.messages:
- content = msg.get("content", "")
- agent_key = msg.get("agent_key", "")
-
- # PM Agent 生成 SRS.md
- if agent_key == "PM_Agent" and ("SRS" in content or "软件需求规格" in content):
- srs_file = workspace_dir / "SRS.md"
- # 提取 SRS 内容(查找包含 FR- 或 NFR- 的段落)
- srs_content = content
- if "功能性需求" in content:
- srs_content = content[content.find("功能性需求"):]
- with open(srs_file, 'w', encoding='utf-8') as f:
- f.write(f"# 软件需求规格说明书 (SRS)\n\n")
- f.write(f"**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
- f.write(srs_content)
- files_saved.append(str(srs_file))
-
- # QA Agent 生成测试文件
- if agent_key == "QA_Agent" and ("test_" in content or "def test_" in content):
- test_file = workspace_dir / "test_battery_health.py"
- # 提取 Python 代码
- if "```python" in content:
- code = content.split("```python")[1].split("```")[0]
- else:
- code = content
- with open(test_file, 'w', encoding='utf-8') as f:
- f.write(f'"""\n电池健康状态测试用例\n生成时间:{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\n"""\n\n')
- f.write(code)
- files_saved.append(str(test_file))
-
- # Dev Agent 生成源代码
- if agent_key == "Dev_Agent" and ("def " in content or "class " in content):
- src_file = workspace_dir / "src_battery_health.py"
- # 提取 Python 代码
- if "```python" in content:
- code = content.split("```python")[1].split("```")[0]
- else:
- code = content
- with open(src_file, 'w', encoding='utf-8') as f:
- f.write(f'"""\n电池健康状态计算模块\n生成时间:{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\n"""\n\n')
- f.write(code)
- files_saved.append(str(src_file))
-
- # Orchestrator 生成最终报告
- if agent_key == "Orchestrator" and ("完成" in content and "SDLC" in content):
- report_file = workspace_dir / "FINAL_REPORT.md"
- with open(report_file, 'w', encoding='utf-8') as f:
- f.write(f"# SDLC 项目最终报告\n\n")
- f.write(f"**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
- f.write("## 项目概述\n\n")
- f.write("基于 AutoGen 多智能体系统完成的软件交付项目。\n\n")
- f.write("## 生成文件列表\n\n")
- for i, saved_file in enumerate(files_saved, 1):
- f.write(f"{i}. {saved_file}\n")
- f.write(f"\n## 项目总结\n\n{content}\n")
- files_saved.append(str(report_file))
-
- return files_saved
-
-
-def create_agents(api_key: str, base_url: str, model: str):
- """创建所有 Agent"""
- llm_config = get_llm_config(model=model, api_key=api_key, base_url=base_url)
-
- pm_agent = AssistantAgent(
- name="PM_Agent",
- system_message=PM_PROMPT,
- llm_config=llm_config,
- description="资深软件产品经理",
- human_input_mode="NEVER"
- )
-
- qa_agent = AssistantAgent(
- name="QA_Agent",
- system_message=QA_PROMPT,
- llm_config=llm_config,
- description="资深测试工程师",
- human_input_mode="NEVER"
- )
-
- dev_agent = AssistantAgent(
- name="Dev_Agent",
- system_message=DEV_PROMPT,
- llm_config=llm_config,
- description="资深软件工程师",
- human_input_mode="NEVER"
- )
-
- orchestrator = AssistantAgent(
- name="Orchestrator",
- system_message=ORCH_PROMPT,
- llm_config=llm_config,
- description="多智能体协调器",
- human_input_mode="NEVER"
- )
-
- user_proxy = UserProxyAgent(
- name="User_Proxy",
- human_input_mode="NEVER",
- max_consecutive_auto_reply=0,
- code_execution_config={
- "work_dir": "workspace",
- "use_docker": False,
- },
- is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE")
- )
-
- return pm_agent, qa_agent, dev_agent, orchestrator, user_proxy, llm_config
-
-
-def add_message(agent_key: str, content: str, task: str = "工作中"):
- """添加消息到对话流"""
- msg = {
- "agent_key": agent_key,
- "content": content,
- "timestamp": datetime.now().isoformat(),
- "task": task
- }
- st.session_state.messages.append(msg)
-
- # 更新统计
- st.session_state.agent_stats[agent_key] = st.session_state.agent_stats.get(agent_key, 0) + 1
-
- # 更新当前 Agent
- st.session_state.current_agent = agent_key
-
- # 更新任务
- st.session_state.agent_tasks[agent_key] = task
-
-
-def main():
- """主应用"""
- init_session_state()
-
- # 标题
- st.title("💬 AutoGen SDLC - Agent 对话流可视化")
- st.markdown("**清晰展示每个 Agent 在做什么 · 实时追踪 Agent 之间的对话交互**")
-
- # 侧边栏配置
- with st.sidebar:
- st.title("⚙️ 控制中心")
-
- api_key = st.text_input("API Key", type="password", value=os.getenv("DASHSCOPE_API_KEY", ""))
- base_url = st.text_input("Base URL", value="https://dashscope.aliyuncs.com/compatible-mode/v1")
- model = st.selectbox("模型选择", ["qwen3.5-flash", "qwen-max", "qwen-plus", "qwen-turbo"], index=0)
-
- max_round = st.slider("最大对话轮数", 5, 50, 20)
-
- st.divider()
-
- col1, col2 = st.columns(2)
- with col1:
- start_btn = st.button("▶️ 启动对话流", type="primary", use_container_width=True)
- with col2:
- stop_btn = st.button("⏸️ 暂停", use_container_width=True)
-
- st.divider()
-
- if st.button("🗑️ 清空对话", use_container_width=True):
- st.session_state.messages = []
- st.session_state.current_agent = None
- st.session_state.agent_tasks = {k: "待命" for k in AGENT_CONFIG}
- st.session_state.agent_stats = {k: 0 for k in AGENT_CONFIG}
- st.rerun()
-
- if st.button("📥 导出对话", use_container_width=True):
- if st.session_state.messages:
- json_str = json.dumps(st.session_state.messages, ensure_ascii=False, indent=2)
- st.download_button(
- label="下载 JSON",
- data=json_str,
- file_name=f"agent_chat_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
- mime="application/json"
- )
-
- st.divider()
-
- # 显示生成的文件
- st.subheader("📁 生成的文件")
- workspace_dir = Path("workspace")
- if workspace_dir.exists():
- files = list(workspace_dir.glob("*"))
- if files:
- for file in files:
- if file.is_file():
- with st.expander(f"📄 {file.name}"):
- try:
- content = file.read_text(encoding='utf-8')
- st.code(content[:500] + ("..." if len(content) > 500 else ""))
- st.download_button(
- label="⬇️ 下载",
- data=content,
- file_name=file.name,
- mime="text/plain",
- key=f"download_{file.name}"
- )
- except Exception as e:
- st.error(f"读取失败:{e}")
- else:
- st.info("工作目录为空,请先运行工作流")
- else:
- st.info("工作目录不存在")
-
- # 主界面
- display_agent_status_row()
- st.divider()
-
- display_active_agent_banner()
-
- display_chat_flow()
-
- # 用户输入
- st.divider()
- user_input = st.chat_input("💡 输入需求,或点击启动按钮开始...")
-
- if user_input:
- add_message("User_Proxy", user_input, "提出需求")
- st.rerun()
-
- # 启动工作流
- if start_btn:
- if not api_key:
- st.error("请先设置 API Key")
- st.stop()
-
- if not AUTOGEN_AVAILABLE:
- st.error("请先安装 AutoGen: pip install pyautogen")
- st.stop()
-
- # 获取需求
- user_msgs = [m for m in st.session_state.messages if m.get("agent_key") == "User_Proxy"]
- if not user_msgs:
- add_message("User_Proxy", "请开发一个电池健康状态预测 API", "提出需求")
-
- latest_requirement = user_msgs[-1]["content"] if user_msgs else "请开发一个电池健康状态预测 API"
-
- st.session_state.is_running = True
-
- # 进度提示
- progress_placeholder = st.empty()
- progress_placeholder.info("🚀 启动 SDLC 工作流,Agent 开始协作...")
-
- try:
- # 创建 Agent
- pm_agent, qa_agent, dev_agent, orchestrator, user_proxy, llm_config = create_agents(
- api_key=api_key,
- base_url=base_url,
- model=model
- )
-
- # 添加 Orchestrator 启动消息
- add_message("Orchestrator", f"🚀 启动 SDLC 工作流!用户需求:{latest_requirement[:100]}...", "启动流程")
-
- # 创建 GroupChat
- groupchat = GroupChat(
- agents=[pm_agent, qa_agent, dev_agent, orchestrator, user_proxy],
- messages=[],
- max_round=max_round,
- speaker_selection_method="round_robin"
- )
-
- manager = GroupChatManager(groupchat=groupchat, llm_config=llm_config)
-
- # 初始消息
- initial_message = f"""
-请启动完整的 SDLC 流程:
-
-【用户需求】
-{latest_requirement}
-
-【工作流程】
-1. PM_Agent → 生成 SRS 文档
-2. QA_Agent → 生成测试用例
-3. Dev_Agent → 编写代码
-4. User_Proxy → 执行测试
-5. Orchestrator → 汇总报告
-
-开始协作!每个步骤完成后请明确说明。
-"""
-
- # 执行对话
- with st.spinner("💬 Agent 们正在协作中,请稍候..."):
- chat_result = user_proxy.initiate_chat(
- manager,
- message=initial_message,
- max_turns=max_round
- )
-
- # 记录所有对话
- for msg in groupchat.messages:
- agent_name = msg.get("name", "Unknown")
- content = msg.get("content", "")
-
- # 推断任务
- task_map = {
- "PM_Agent": "需求分析",
- "QA_Agent": "测试设计",
- "Dev_Agent": "代码实现",
- "Orchestrator": "流程协调",
- "User_Proxy": "测试执行"
- }
- task = task_map.get(agent_name, "工作中")
-
- add_message(agent_name, content, task)
-
- # 完成
- add_message("Orchestrator", "✅ SDLC 流程完成!所有任务已完成。", "总结完成")
-
- # 保存生成的文件
- progress_placeholder.info("💾 正在保存生成的文件...")
- saved_files = save_generated_files()
-
- if saved_files:
- progress_placeholder.success(f"✅ SDLC 工作流完成!已保存 {len(saved_files)} 个文件到 workspace/ 目录")
- else:
- progress_placeholder.success("✅ SDLC 工作流完成!查看上方的对话流了解详情。")
-
- st.session_state.is_running = False
-
- st.rerun()
-
- except Exception as e:
- st.session_state.is_running = False
- st.session_state.current_agent = None
- progress_placeholder.error(f"❌ 错误:{str(e)}")
- st.error("请检查 API Key 和网络连接")
-
- if stop_btn:
- st.session_state.is_running = False
- st.session_state.current_agent = None
- st.info("⏸️ 工作流已暂停")
-
-
-if __name__ == "__main__":
- main()