This commit is contained in:
2026-03-12 15:04:03 +08:00
parent cf46f77224
commit c8aef9319b
5 changed files with 290 additions and 1923 deletions

10
config/__init__.py Normal file
View File

@@ -0,0 +1,10 @@
from .llm_config import get_llm_config, PM_PROMPT, QA_PROMPT, DEV_PROMPT, ORCH_PROMPT
__all__ = [
"get_llm_config",
"PM_PROMPT",
"QA_PROMPT",
"DEV_PROMPT",
"ORCH_PROMPT",
]

280
frontend/app.py Normal file
View File

@@ -0,0 +1,280 @@
# -*- coding: utf-8 -*-
"""
Streamlit 实时 Agent 协作平台
功能:
1. 实时展示每个 Agent 的状态和动作
2. 自动保存生成的文件到 workspace/
3. 简单稳定的代码结构
"""
import streamlit as st
import os
from pathlib import Path
from datetime import datetime
import time
try:
from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
AUTOGEN_AVAILABLE = True
except ImportError:
AUTOGEN_AVAILABLE = False
# 添加项目根目录到路径
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from config.llm_config import get_llm_config, PM_PROMPT, QA_PROMPT, DEV_PROMPT, ORCH_PROMPT
# 页面配置
st.set_page_config(page_title="多 Agent 协作平台", page_icon="🤖", layout="wide")
# Agent 配置
AGENTS = {
"PM_Agent": {"name": "产品经理", "avatar": "📋", "color": "blue"},
"QA_Agent": {"name": "测试工程师", "avatar": "", "color": "green"},
"Dev_Agent": {"name": "开发工程师", "avatar": "💻", "color": "orange"},
"Orchestrator": {"name": "协调器", "avatar": "🎯", "color": "purple"},
"User_Proxy": {"name": "用户代理", "avatar": "👤", "color": "gray"}
}
def init_state():
"""初始化 session state"""
if "messages" not in st.session_state:
st.session_state.messages = []
if "running" not in st.session_state:
st.session_state.running = False
if "current_agent" not in st.session_state:
st.session_state.current_agent = None
if "agent_counts" not in st.session_state:
st.session_state.agent_counts = {k: 0 for k in AGENTS}
def add_message(agent, content, task=""):
"""添加消息"""
msg = {
"agent": agent,
"content": content,
"task": task,
"time": datetime.now().strftime("%H:%M:%S")
}
st.session_state.messages.append(msg)
st.session_state.agent_counts[agent] = st.session_state.agent_counts.get(agent, 0) + 1
st.session_state.current_agent = agent
def show_agent_status():
"""显示 Agent 状态"""
cols = st.columns(len(AGENTS))
for i, (agent_key, info) in enumerate(AGENTS.items()):
with cols[i]:
is_active = st.session_state.current_agent == agent_key
count = st.session_state.agent_counts.get(agent_key, 0)
border_color = info["color"]
bg_color = "#e8f5e9" if is_active else "white"
st.markdown(f"""
<div style='
padding: 15px;
border-radius: 10px;
border: 3px {"solid" if is_active else "dashed"} {border_color};
background: {bg_color};
text-align: center;
'>
<div style='font-size: 2.5rem;'>{info["avatar"]}</div>
<div style='font-weight: bold; margin: 5px 0;'>{info["name"]}</div>
<div style='font-size: 0.8rem; color: #666;'>
{"🟢 发言中" if is_active else "⚪ 等待中"}
</div>
<div style='font-size: 0.8rem; color: #999; margin-top: 5px;'>
💬 {count} 条消息
</div>
</div>
""", unsafe_allow_html=True)
def show_chat():
"""显示对话流"""
st.subheader("💬 Agent 对话流")
if not st.session_state.messages:
st.info("👈 暂无对话,请在下方输入需求并启动")
return
for msg in st.session_state.messages:
agent = msg["agent"]
info = AGENTS.get(agent, {"name": "未知", "avatar": "🤖", "color": "gray"})
with st.chat_message(agent.lower(), avatar=info["avatar"]):
st.markdown(f"**{info['name']}** *{msg['time']}* - {msg['task']}")
st.markdown(msg["content"][:800] + ("..." if len(msg["content"]) > 800 else ""))
def save_files():
"""保存生成的文件到 workspace/"""
workspace = Path("workspace")
workspace.mkdir(exist_ok=True)
files = []
# 遍历消息,提取并保存文件
for msg in st.session_state.messages:
agent = msg["agent"]
content = msg["content"]
# PM Agent 生成 SRS
if agent == "PM_Agent" and ("需求" in content or "SRS" in content):
file = workspace / "SRS.md"
with open(file, "w", encoding="utf-8") as f:
f.write(f"# 软件需求规格说明书\n\n生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n{content}")
files.append(str(file))
# QA Agent 生成测试
if agent == "QA_Agent" and ("test" in content.lower() or "测试" in content):
file = workspace / "test_sample.py"
with open(file, "w", encoding="utf-8") as f:
f.write(f"# 测试用例\n# 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n{content}")
files.append(str(file))
# Dev Agent 生成代码
if agent == "Dev_Agent" and ("def " in content or "class " in content):
file = workspace / "src_sample.py"
with open(file, "w", encoding="utf-8") as f:
f.write(f"# 源代码\n# 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n{content}")
files.append(str(file))
return files
def main():
st.title("🤖 多 Agent 协作平台")
st.markdown("**实时展示 Agent 状态 · 自动生成文件**")
init_state()
# 侧边栏
with st.sidebar:
st.title("⚙️ 配置")
api_key = st.text_input("API Key", type="password", value=os.getenv("DASHSCOPE_API_KEY", ""))
model = st.selectbox("模型", ["qwen3.5-flash", "qwen-max", "qwen-plus"], index=0)
max_round = st.slider("最大轮数", 5, 30, 15)
st.divider()
if st.button("▶️ 启动工作流", type="primary", use_container_width=True):
if not api_key:
st.error("请先设置 API Key")
elif not AUTOGEN_AVAILABLE:
st.error("请先安装 AutoGen")
else:
run_workflow(api_key, model, max_round)
st.divider()
if st.button("🗑️ 清空对话", use_container_width=True):
st.session_state.messages = []
st.session_state.current_agent = None
st.session_state.agent_counts = {k: 0 for k in AGENTS}
st.rerun()
# 显示生成的文件
st.divider()
st.subheader("📁 生成的文件")
workspace = Path("workspace")
if workspace.exists():
for file in workspace.glob("*"):
if file.is_file():
st.caption(f"📄 {file.name}")
# 主界面
show_agent_status()
st.divider()
show_chat()
# 输入框
st.divider()
if user_input := st.chat_input("输入需求..."):
add_message("User_Proxy", user_input, "提出需求")
st.rerun()
def run_workflow(api_key, model, max_round):
"""运行工作流"""
st.session_state.running = True
st.session_state.messages = []
st.session_state.agent_counts = {k: 0 for k in AGENTS}
progress = st.empty()
progress.info("🚀 启动工作流...")
try:
# 获取需求
user_msgs = [m for m in st.session_state.messages if m["agent"] == "User_Proxy"]
requirement = user_msgs[-1]["content"] if user_msgs else "开发一个电池健康预测 API"
# 创建 Agent
llm_config = get_llm_config(model=model, api_key=api_key)
pm = AssistantAgent("PM_Agent", system_message=PM_PROMPT, llm_config=llm_config, human_input_mode="NEVER")
qa = AssistantAgent("QA_Agent", system_message=QA_PROMPT, llm_config=llm_config, human_input_mode="NEVER")
dev = AssistantAgent("Dev_Agent", system_message=DEV_PROMPT, llm_config=llm_config, human_input_mode="NEVER")
orch = AssistantAgent("Orchestrator", system_message=ORCH_PROMPT, llm_config=llm_config, human_input_mode="NEVER")
user = UserProxyAgent("User_Proxy", human_input_mode="NEVER", max_consecutive_auto_reply=0,
code_execution_config={"work_dir": "workspace", "use_docker": False})
# 创建 GroupChat
groupchat = GroupChat(
agents=[pm, qa, dev, orch, user],
messages=[],
max_round=max_round,
speaker_selection_method="round_robin"
)
manager = GroupChatManager(groupchat=groupchat, llm_config=llm_config)
# 初始消息
initial_msg = f"""请启动完整的 SDLC 流程:
【用户需求】{requirement}
【流程】
1. PM_Agent → SRS 文档
2. QA_Agent → 测试用例
3. Dev_Agent → 编写代码
4. User_Proxy → 执行测试
5. Orchestrator → 汇总
开始协作!"""
# 执行对话
with st.spinner("💬 Agent 们正在协作中..."):
chat_result = user.initiate_chat(manager, message=initial_msg, max_turns=max_round)
# 记录所有对话
task_map = {
"PM_Agent": "需求分析",
"QA_Agent": "测试设计",
"Dev_Agent": "代码实现",
"Orchestrator": "流程协调",
"User_Proxy": "测试执行"
}
for msg in groupchat.messages:
agent = msg.get("name", "Unknown")
content = msg.get("content", "")
task = task_map.get(agent, "工作中")
add_message(agent, content, task)
# 保存文件
progress.info("💾 正在保存文件...")
files = save_files()
if files:
progress.success(f"✅ 完成!已保存 {len(files)} 个文件到 workspace/")
else:
progress.success("✅ 工作流完成!")
st.session_state.running = False
st.rerun()
except Exception as e:
st.session_state.running = False
progress.error(f"❌ 错误:{e}")
import traceback
st.error(traceback.format_exc())
if __name__ == "__main__":
main()

View File

@@ -1,442 +0,0 @@
"""
Streamlit 前端 - 多智能体实时聊天界面
提供可视化的 Agent 协同工作展示和人机交互
"""
import streamlit as st
import os
import sys
from pathlib import Path
from datetime import datetime
import time
from typing import Dict, Any, List
# 添加项目根目录到路径
sys.path.insert(0, str(Path(__file__).parent.parent))
from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
from config.llm_config import (
get_llm_config,
PM_PROMPT,
QA_PROMPT,
DEV_PROMPT,
ORCH_PROMPT
)
# 页面配置
st.set_page_config(
page_title="AutoGen SDLC 多智能体系统",
page_icon="🤖",
layout="wide",
initial_sidebar_state="expanded"
)
# 自定义 CSS 样式
st.markdown("""
<style>
.agent-message {
padding: 1rem;
border-radius: 0.5rem;
margin-bottom: 1rem;
}
.pm-agent { background-color: #e3f2fd; border-left: 4px solid #2196f3; }
.qa-agent { background-color: #e8f5e9; border-left: 4px solid #4caf50; }
.dev-agent { background-color: #fff3e0; border-left: 4px solid #ff9800; }
.orchestrator { background-color: #f3e5f5; border-left: 4px solid #9c27b0; }
.system { background-color: #fce4ec; border-left: 4px solid #e91e63; }
.user { background-color: #efebe9; border-left: 4px solid #795548; }
</style>
""", unsafe_allow_html=True)
def init_session_state():
"""初始化 session state"""
if "messages" not in st.session_state:
st.session_state.messages = []
if "is_running" not in st.session_state:
st.session_state.is_running = False
if "workflow_result" not in st.session_state:
st.session_state.workflow_result = None
if "conversation_history" not in st.session_state:
st.session_state.conversation_history = []
if "current_step" not in st.session_state:
st.session_state.current_step = 0
def create_agents(api_key: str, base_url: str, model: str):
"""创建所有 Agent"""
llm_config = get_llm_config(model=model, api_key=api_key, base_url=base_url)
pm_agent = AssistantAgent(
name="PM_Agent",
system_message=PM_PROMPT,
llm_config=llm_config,
description="资深软件产品经理",
human_input_mode="NEVER"
)
qa_agent = AssistantAgent(
name="QA_Agent",
system_message=QA_PROMPT,
llm_config=llm_config,
description="资深测试工程师",
human_input_mode="NEVER"
)
dev_agent = AssistantAgent(
name="Dev_Agent",
system_message=DEV_PROMPT,
llm_config=llm_config,
description="资深软件工程师",
human_input_mode="NEVER"
)
orchestrator = AssistantAgent(
name="Orchestrator",
system_message=ORCH_PROMPT,
llm_config=llm_config,
description="多智能体协调器",
human_input_mode="NEVER"
)
user_proxy = UserProxyAgent(
name="User_Proxy",
human_input_mode="NEVER", # 修复Web 环境不支持 TERMINAL
max_consecutive_auto_reply=0,
code_execution_config={
"work_dir": "workspace",
"use_docker": False,
},
is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE")
)
return pm_agent, qa_agent, dev_agent, orchestrator, user_proxy, llm_config
def get_agent_color(agent_name: str) -> str:
"""根据 Agent 名称返回颜色类"""
color_map = {
"PM_Agent": "pm-agent",
"QA_Agent": "qa-agent",
"Dev_Agent": "dev-agent",
"Orchestrator": "orchestrator",
"User_Proxy": "user",
"system": "system"
}
return color_map.get(agent_name, "system")
def display_message(agent_name: str, message: str, timestamp: str):
"""显示单条消息"""
color_class = get_agent_color(agent_name)
# 头像映射
avatar_map = {
"PM_Agent": "📋",
"QA_Agent": "",
"Dev_Agent": "💻",
"Orchestrator": "🎯",
"User_Proxy": "👤",
"system": "⚙️"
}
avatar = avatar_map.get(agent_name, "🤖")
with st.chat_message(name=agent_name.lower().replace("_", ""), avatar=avatar):
st.markdown(f"**{avatar} {agent_name}** *({timestamp})*")
st.markdown(f"<div class='agent-message {color_class}'>{message}</div>",
unsafe_allow_html=True)
def export_conversation(messages: List[Dict]) -> str:
"""导出对话为 Markdown"""
md_content = "# AutoGen SDLC 对话历史\n\n"
md_content += f"**导出时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
md_content += "---\n\n"
for msg in messages:
timestamp = msg.get("timestamp", "")[:19].replace('T', ' ')
agent = msg.get("agent_name", "Unknown")
content = msg.get("message", "")
md_content += f"### [{timestamp}] {agent}\n\n"
md_content += f"{content}\n\n"
md_content += "---\n\n"
return md_content
def main():
"""主应用"""
init_session_state()
# ===== 左侧边栏 - 配置区域 =====
with st.sidebar:
st.title("⚙️ 配置")
# API 配置
st.subheader("模型配置")
api_key = st.text_input(
"API Key",
type="password",
value=os.getenv("DASHSCOPE_API_KEY", ""),
help="阿里云 DashScope API Key"
)
base_url = st.text_input(
"Base URL",
value="https://dashscope.aliyuncs.com/compatible-mode/v1",
help="API Base URL"
)
model = st.selectbox(
"模型选择",
options=["qwen3.5-flash", "qwen-max", "qwen-plus", "qwen-turbo"],
index=0,
help="选择使用的 Qwen 模型"
)
# Agent 参数
st.subheader("Agent 参数")
max_round = st.slider(
"最大对话轮数",
min_value=5,
max_value=50,
value=15,
step=1
)
temperature = st.slider(
"温度参数",
min_value=0.0,
max_value=1.0,
value=0.7,
step=0.1
)
st.divider()
# 控制按钮
st.subheader("控制")
col1, col2 = st.columns(2)
with col1:
start_btn = st.button("▶️ 启动", use_container_width=True)
with col2:
stop_btn = st.button("⏸️ 暂停", use_container_width=True)
st.divider()
# 导出选项
st.subheader("导出")
if st.button("📄 导出 JSON", use_container_width=True):
if st.session_state.messages:
import json
json_str = json.dumps(st.session_state.messages, ensure_ascii=False, indent=2)
st.download_button(
label="下载 JSON",
data=json_str,
file_name=f"conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
mime="application/json"
)
if st.button("📝 导出 Markdown", use_container_width=True):
if st.session_state.messages:
md_content = export_conversation(st.session_state.messages)
st.download_button(
label="下载 Markdown",
data=md_content,
file_name=f"conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md",
mime="text/markdown"
)
# ===== 主区域 - 聊天展示 =====
st.title("🤖 AutoGen SDLC 多智能体协同系统")
st.markdown("**基于 AutoGen + Qwen3.5-flash 的端到端软件交付系统**")
# 状态指示器
status_col1, status_col2, status_col3 = st.columns(3)
with status_col1:
st.metric("对话轮数", len(st.session_state.messages))
with status_col2:
status = "🟢 运行中" if st.session_state.is_running else "🔴 已停止"
st.metric("系统状态", status)
with status_col3:
result_status = "✅ 成功" if st.session_state.workflow_result and st.session_state.workflow_result.get("success") else "⏳ 未开始"
st.metric("工作流状态", result_status)
st.divider()
# 聊天历史展示区
chat_container = st.container()
with chat_container:
if not st.session_state.messages:
st.info("👈 请在左侧配置后,输入需求并点击启动按钮开始工作流")
else:
for msg in st.session_state.messages:
display_message(
agent_name=msg.get("agent_name", "Unknown"),
message=msg.get("message", ""),
timestamp=msg.get("timestamp", "")
)
# ===== 右侧边栏 - 进度和日志 =====
with st.sidebar:
st.divider()
# 工作流进度
st.subheader("📊 工作流进度")
workflow_steps = [
"需求分析",
"测试设计",
"代码实现",
"测试执行",
"最终验证"
]
current_step = st.session_state.get("current_step", 0)
for i, step in enumerate(workflow_steps):
if i < current_step:
st.success(f"{step}")
elif i == current_step:
st.info(f"🔄 {step}")
else:
st.write(f"{step}")
# ===== 用户输入区域 =====
st.divider()
# 用户需求输入
user_input = st.chat_input("请输入您的软件需求...")
if user_input:
# 添加用户消息到历史
user_msg = {
"timestamp": datetime.now().isoformat(),
"agent_name": "User",
"role": "user",
"message": user_input
}
st.session_state.messages.append(user_msg)
st.session_state.conversation_history.append(user_msg)
# 如果正在运行,添加到队列等待处理
if st.session_state.is_running:
# 这里会触发工作流执行
pass
# ===== 工作流执行逻辑 =====
if start_btn:
if not api_key:
st.error("请先设置 API Key")
st.stop()
# 获取最后一条用户消息
user_messages = [m for m in st.session_state.messages if m.get("role") == "user"]
if not user_messages:
st.warning("请先输入需求!")
st.stop()
latest_requirement = user_messages[-1]["message"]
# 设置运行状态
st.session_state.is_running = True
st.session_state.current_step = 0
# 创建占位符用于实时更新
status_placeholder = st.empty()
message_placeholder = st.empty()
with status_placeholder:
st.info("🚀 正在启动 SDLC 工作流...")
try:
# 创建 Agent
pm_agent, qa_agent, dev_agent, orchestrator, user_proxy, llm_config = create_agents(
api_key=api_key,
base_url=base_url,
model=model
)
# 创建 GroupChat
groupchat = GroupChat(
agents=[pm_agent, qa_agent, dev_agent, orchestrator, user_proxy],
messages=[],
max_round=max_round,
speaker_selection_method="round_robin"
)
manager = GroupChatManager(groupchat=groupchat, llm_config=llm_config)
# 构建初始消息
initial_message = f"""
请启动完整的 SDLC 流程:
【用户需求】
{latest_requirement}
【工作流程】
1. PM_Agent → 生成 SRS
2. QA_Agent → 生成测试用例
3. Dev_Agent → 编写代码
4. User_Proxy → 执行测试
5. Orchestrator → 汇总报告
开始协作!
"""
# 记录第一条消息
first_msg = {
"timestamp": datetime.now().isoformat(),
"agent_name": "Orchestrator",
"role": "assistant",
"message": f"🎯 启动 SDLC 工作流,需求:{latest_requirement[:100]}..."
}
st.session_state.messages.append(first_msg)
st.session_state.current_step = 0
# 启动对话(简化版本,实际应该用异步)
with message_placeholder:
st.info("💬 Agent 们正在协作中,请稍候...")
# 这里简化处理,实际应该使用异步回调
chat_result = user_proxy.initiate_chat(
manager,
message=initial_message,
max_turns=max_round
)
# 记录结果
for msg in groupchat.messages:
chat_msg = {
"timestamp": datetime.now().isoformat(),
"agent_name": msg.get("name", "Unknown"),
"role": msg.get("role", "assistant"),
"message": msg.get("content", "")
}
st.session_state.messages.append(chat_msg)
# 更新状态
st.session_state.workflow_result = {
"success": True,
"summary": chat_result.summary if hasattr(chat_result, 'summary') else "完成"
}
st.session_state.current_step = 5 # 完成所有步骤
st.session_state.is_running = False
# 刷新显示
st.rerun()
except Exception as e:
st.session_state.is_running = False
st.error(f"❌ 错误:{str(e)}")
st.session_state.workflow_result = {"success": False, "error": str(e)}
if stop_btn:
st.session_state.is_running = False
st.info("⏸️ 工作流已暂停")
if __name__ == "__main__":
main()

View File

@@ -1,725 +0,0 @@
# -*- coding: utf-8 -*-
"""
Streamlit 前端 v2 - 增强版多智能体实时协作可视化界面
功能:
1. Agent 协作流程图(桑基图)
2. 实时状态监控面板
3. 分屏对话展示
4. 生成的文件预览
5. 时间线视图
"""
import streamlit as st
import os
import sys
from pathlib import Path
from datetime import datetime
import time
from typing import Dict, Any, List
import json
# 添加项目根目录到路径
sys.path.insert(0, str(Path(__file__).parent.parent))
try:
from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
AUTOGEN_AVAILABLE = True
except ImportError:
AUTOGEN_AVAILABLE = False
from config.llm_config import (
get_llm_config,
PM_PROMPT,
QA_PROMPT,
DEV_PROMPT,
ORCH_PROMPT
)
# 页面配置
st.set_page_config(
page_title="AutoGen SDLC 多智能体协作平台",
page_icon="🤖",
layout="wide",
initial_sidebar_state="expanded"
)
# 自定义 CSS 样式
st.markdown("""
<style>
/* Agent 消息样式 */
.agent-message {
padding: 1rem;
border-radius: 0.5rem;
margin-bottom: 1rem;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.pm-agent { background: linear-gradient(135deg, #e3f2fd 0%, #bbdefb 100%); border-left: 5px solid #2196f3; }
.qa-agent { background: linear-gradient(135deg, #e8f5e9 0%, #c8e6c9 100%); border-left: 5px solid #4caf50; }
.dev-agent { background: linear-gradient(135deg, #fff3e0 0%, #ffe0b2 100%); border-left: 5px solid #ff9800; }
.orchestrator { background: linear-gradient(135deg, #f3e5f5 0%, #e1bee7 100%); border-left: 5px solid #9c27b0; }
.system { background: linear-gradient(135deg, #fce4ec 0%, #f8bbd9 100%); border-left: 5px solid #e91e63; }
.user { background: linear-gradient(135deg, #efebe9 0%, #d7ccc8 100%); border-left: 5px solid #795548; }
/* 状态卡片 */
.status-card {
padding: 1.5rem;
border-radius: 0.5rem;
text-align: center;
color: white;
font-weight: bold;
}
.status-running { background: linear-gradient(135deg, #4caf50, #8bc34a); }
.status-stopped { background: linear-gradient(135deg, #f44336, #e91e63); }
.status-pending { background: linear-gradient(135deg, #ff9800, #ffc107); }
/* Agent 状态指示器 */
.agent-status-indicator {
display: inline-block;
width: 12px;
height: 12px;
border-radius: 50%;
margin-right: 8px;
animation: pulse 2s infinite;
}
@keyframes pulse {
0% { opacity: 1; }
50% { opacity: 0.5; }
100% { opacity: 1; }
}
/* 文件卡片 */
.file-card {
padding: 1rem;
border: 1px solid #ddd;
border-radius: 0.5rem;
margin-bottom: 0.5rem;
background: #f9f9f9;
}
/* 时间线 */
.timeline-item {
padding: 1rem;
border-left: 3px solid #2196f3;
margin-left: 1rem;
position: relative;
}
.timeline-item::before {
content: '';
position: absolute;
left: -8px;
top: 1rem;
width: 13px;
height: 13px;
border-radius: 50%;
background: #2196f3;
}
</style>
""", unsafe_allow_html=True)
# Agent 配置信息
AGENT_CONFIG = {
"PM_Agent": {
"name": "产品经理",
"avatar": "📋",
"color": "#2196f3",
"description": "需求分析与 SRS 生成",
"icon": "📊"
},
"QA_Agent": {
"name": "测试工程师",
"avatar": "",
"color": "#4caf50",
"description": "测试用例设计与 TDD",
"icon": "🧪"
},
"Dev_Agent": {
"name": "开发工程师",
"avatar": "💻",
"color": "#ff9800",
"description": "代码实现与修复",
"icon": "⚙️"
},
"Orchestrator": {
"name": "协调器",
"avatar": "🎯",
"color": "#9c27b0",
"description": "流程调度与验证",
"icon": "🎭"
},
"User_Proxy": {
"name": "用户代理",
"avatar": "👤",
"color": "#795548",
"description": "人机交互接口",
"icon": "🔌"
}
}
def init_session_state():
"""初始化 session state"""
if "messages" not in st.session_state:
st.session_state.messages = []
if "is_running" not in st.session_state:
st.session_state.is_running = False
if "workflow_result" not in st.session_state:
st.session_state.workflow_result = None
if "conversation_history" not in st.session_state:
st.session_state.conversation_history = []
if "current_step" not in st.session_state:
st.session_state.current_step = 0
if "agent_stats" not in st.session_state:
st.session_state.agent_stats = {
"PM_Agent": {"messages": 0, "last_active": None},
"QA_Agent": {"messages": 0, "last_active": None},
"Dev_Agent": {"messages": 0, "last_active": None},
"Orchestrator": {"messages": 0, "last_active": None},
"User_Proxy": {"messages": 0, "last_active": None}
}
if "generated_files" not in st.session_state:
st.session_state.generated_files = []
if "active_agent" not in st.session_state:
st.session_state.active_agent = None
def create_agents(api_key: str, base_url: str, model: str):
"""创建所有 Agent"""
llm_config = get_llm_config(model=model, api_key=api_key, base_url=base_url)
pm_agent = AssistantAgent(
name="PM_Agent",
system_message=PM_PROMPT,
llm_config=llm_config,
description="资深软件产品经理",
human_input_mode="NEVER"
)
qa_agent = AssistantAgent(
name="QA_Agent",
system_message=QA_PROMPT,
llm_config=llm_config,
description="资深测试工程师",
human_input_mode="NEVER"
)
dev_agent = AssistantAgent(
name="Dev_Agent",
system_message=DEV_PROMPT,
llm_config=llm_config,
description="资深软件工程师",
human_input_mode="NEVER"
)
orchestrator = AssistantAgent(
name="Orchestrator",
system_message=ORCH_PROMPT,
llm_config=llm_config,
description="多智能体协调器",
human_input_mode="NEVER"
)
user_proxy = UserProxyAgent(
name="User_Proxy",
human_input_mode="NEVER", # 修复Web 环境不支持 TERMINAL
max_consecutive_auto_reply=0,
code_execution_config={
"work_dir": "workspace",
"use_docker": False,
},
is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE")
)
return pm_agent, qa_agent, dev_agent, orchestrator, user_proxy, llm_config
def display_agent_status_panel():
"""显示 Agent 状态面板"""
st.subheader("🎭 Agent 实时状态")
cols = st.columns(5)
agent_names = list(AGENT_CONFIG.keys())
for i, (agent_name, config) in enumerate(AGENT_CONFIG.items()):
with cols[i]:
stats = st.session_state.agent_stats.get(agent_name, {})
msg_count = stats.get("messages", 0)
last_active = stats.get("last_active")
is_active = st.session_state.active_agent == agent_name
status_color = config["color"]
if is_active:
status_html = f'<span class="agent-status-indicator" style="background: {status_color}; animation: pulse 1s infinite;"></span>'
else:
status_html = f'<span style="display:inline-block;width:12px;height:12px;border-radius:50%;background:{status_color};opacity:0.3;"></span>'
st.markdown(f"""
<div style="text-align:center;padding:1rem;border:2px {'solid' if is_active else 'dashed'} {status_color};border-radius:0.5rem;">
<div style="font-size:2rem;">{config['avatar']}</div>
<div style="font-weight:bold;color:{status_color};">{config['name']}</div>
<div style="font-size:0.8rem;color:#666;">消息:{msg_count}</div>
<div style="font-size:0.7rem;color:#999;">{status_html}{'工作中' if is_active else '等待中'}</div>
</div>
""", unsafe_allow_html=True)
def display_workflow_diagram():
"""显示工作流程图"""
st.subheader("🔄 工作流程图")
# 使用 Mermaid 绘制流程图
mermaid_code = """
```mermaid
graph LR
A[👤 用户需求] --> B[📋 PM Agent]
B --> C[📄 SRS 文档]
C --> D[✅ QA Agent]
D --> E[🧪 测试用例]
E --> F[💻 Dev Agent]
F --> G[💾 源代码]
G --> H[🔧 自动测试]
H -->|失败 | F
H -->|通过 | I[🎯 Orchestrator]
I --> J[✅ 最终报告]
style B fill:#e3f2fd,stroke:#2196f3,stroke-width:3px
style D fill:#e8f5e9,stroke:#4caf50,stroke-width:3px
style F fill:#fff3e0,stroke:#ff9800,stroke-width:3px
style I fill:#f3e5f5,stroke:#9c27b0,stroke-width:3px
```
"""
st.markdown(mermaid_code)
def display_message(agent_name: str, message: str, timestamp: str, index: int):
"""显示单条消息(增强版)"""
config = AGENT_CONFIG.get(agent_name, {"name": agent_name, "avatar": "🤖", "color": "#999"})
# 折叠长消息
if len(message) > 500:
with st.expander(f"{config['avatar']} {config['name']} - {timestamp[:16]}"):
st.markdown(message)
else:
with st.chat_message(name=agent_name.lower().replace("_", ""), avatar=config['avatar']):
st.markdown(f"**{config['avatar']} {config['name']}** *({timestamp[:16]})*")
st.markdown(message)
def display_timeline():
"""显示时间线视图"""
st.subheader("⏱️ 执行时间线")
if not st.session_state.messages:
st.info("暂无执行记录")
return
# 按时间分组显示关键事件
timeline_data = []
for msg in st.session_state.messages:
agent = msg.get("agent_name", "Unknown")
timestamp = msg.get("timestamp", "")[:19].replace('T', ' ')
message = msg.get("message", "")[:100]
if agent in AGENT_CONFIG:
timeline_data.append({
"time": timestamp,
"agent": agent,
"message": message,
"icon": AGENT_CONFIG[agent]["avatar"]
})
# 倒序显示(最新的在前)
for item in reversed(timeline_data[-10:]): # 只显示最近 10 条
with st.container():
st.markdown(f"""
<div class="timeline-item">
<strong>{item['icon']} {item['time']}</strong><br>
<span style="color:#666;">{AGENT_CONFIG.get(item['agent'], {}).get('name', item['agent'])}</span><br>
{item['message']}...
</div>
""", unsafe_allow_html=True)
def display_generated_files():
"""显示生成的文件"""
st.subheader("📁 生成的文件")
workspace_dir = Path("workspace")
if not workspace_dir.exists():
st.info("工作目录为空")
return
files = list(workspace_dir.glob("*"))
if not files:
st.info("暂无生成的文件")
return
for file_path in files:
if file_path.is_file():
file_size = file_path.stat().st_size
file_type = file_path.suffix
icon_map = {
".py": "🐍",
".md": "📝",
".txt": "📄",
".json": "📊"
}
icon = icon_map.get(file_type, "📎")
with st.container():
st.markdown(f"""
<div class="file-card">
<div style="display:flex;justify-content:space-between;align-items:center;">
<div>
<span style="font-size:1.5rem;">{icon}</span>
<strong>{file_path.name}</strong>
<span style="color:#666;font-size:0.9rem;margin-left:1rem;">
{file_size:,} bytes
</span>
</div>
</div>
</div>
""", unsafe_allow_html=True)
# 文件内容预览
if st.button(f"👁️ 预览 {file_path.name}", key=f"preview_{file_path.name}"):
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
st.code(content[:2000] + ("..." if len(content) > 2000 else ""))
except Exception as e:
st.error(f"读取失败:{e}")
def display_chat_interface():
"""显示聊天界面"""
st.subheader("💬 Agent 对话实录")
if not st.session_state.messages:
st.info("👈 暂无对话,请在下方输入需求并启动工作流")
else:
# 使用容器显示消息
chat_container = st.container()
with chat_container:
for idx, msg in enumerate(st.session_state.messages):
display_message(
agent_name=msg.get("agent_name", "Unknown"),
message=msg.get("message", ""),
timestamp=msg.get("timestamp", ""),
index=idx
)
def export_conversation(messages: List[Dict]) -> str:
"""导出对话为 Markdown"""
md_content = "# AutoGen SDLC 对话历史\n\n"
md_content += f"**导出时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
md_content += "---\n\n"
for msg in messages:
timestamp = msg.get("timestamp", "")[:19].replace('T', ' ')
agent = msg.get("agent_name", "Unknown")
content = msg.get("message", "")
config = AGENT_CONFIG.get(agent, {"avatar": "🤖", "name": agent})
md_content += f"### {config['avatar']} [{timestamp}] {config['name']}\n\n"
md_content += f"{content}\n\n"
md_content += "---\n\n"
return md_content
def main():
"""主应用"""
init_session_state()
# ===== 顶部标题区 =====
st.title("🤖 AutoGen SDLC 多智能体协作平台")
st.markdown("**基于 AutoGen + Qwen3.5-flash 的端到端软件交付系统 · 实时可视化 Agent 协作**")
# ===== 侧边栏 - 配置区域 =====
with st.sidebar:
st.title("⚙️ 控制中心")
# API 配置
st.subheader("🔑 模型配置")
api_key = st.text_input(
"API Key",
type="password",
value=os.getenv("DASHSCOPE_API_KEY", ""),
help="阿里云 DashScope API Key"
)
base_url = st.text_input(
"Base URL",
value="https://dashscope.aliyuncs.com/compatible-mode/v1",
help="API Base URL"
)
model = st.selectbox(
"模型选择",
options=["qwen3.5-flash", "qwen-max", "qwen-plus", "qwen-turbo"],
index=0,
help="选择使用的 Qwen 模型"
)
# Agent 参数
st.subheader("🎛️ Agent 参数")
max_round = st.slider(
"最大对话轮数",
min_value=5,
max_value=50,
value=20,
step=1
)
temperature = st.slider(
"温度参数",
min_value=0.0,
max_value=1.0,
value=0.7,
step=0.1
)
st.divider()
# 控制按钮
st.subheader("🎮 流程控制")
col1, col2 = st.columns(2)
with col1:
start_btn = st.button("▶️ 启动", use_container_width=True, type="primary")
with col2:
stop_btn = st.button("⏸️ 暂停", use_container_width=True)
st.divider()
# 导出选项
st.subheader("💾 数据导出")
if st.button("📄 导出 JSON", use_container_width=True):
if st.session_state.messages:
json_str = json.dumps(st.session_state.messages, ensure_ascii=False, indent=2)
st.download_button(
label="下载 JSON",
data=json_str,
file_name=f"conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
mime="application/json"
)
if st.button("📝 导出 Markdown", use_container_width=True):
if st.session_state.messages:
md_content = export_conversation(st.session_state.messages)
st.download_button(
label="下载 Markdown",
data=md_content,
file_name=f"conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md",
mime="text/markdown"
)
# ===== 主区域 - 三栏布局 =====
# 第一行:状态指标
status_col1, status_col2, status_col3, status_col4 = st.columns(4)
with status_col1:
total_msgs = len(st.session_state.messages)
st.metric("💬 总消息数", total_msgs)
with status_col2:
status = "🟢 运行中" if st.session_state.is_running else "🔴 已停止"
status_class = "running" if st.session_state.is_running else "stopped"
st.markdown(f"""
<div class="status-card status-{status_class}">
{status}
</div>
""", unsafe_allow_html=True)
with status_col3:
result_status = "✅ 成功" if st.session_state.workflow_result and st.session_state.workflow_result.get("success") else "⏳ 未开始"
st.metric("📊 工作流状态", result_status)
with status_col4:
files_count = len(list(Path("workspace").glob("*"))) if Path("workspace").exists() else 0
st.metric("📁 生成文件数", files_count)
st.divider()
# 第二行:左侧 - Agent 状态 + 流程图,右侧 - 时间线
left_col, right_col = st.columns([2, 1])
with left_col:
display_agent_status_panel()
st.divider()
display_workflow_diagram()
with right_col:
display_timeline()
st.divider()
# 第三行:聊天界面
display_chat_interface()
# ===== 底部 - 用户需求输入 =====
st.divider()
# 用户输入
user_input = st.chat_input("💡 请输入您的软件需求,例如:我需要一个电池健康状态预测 API...")
if user_input:
# 添加用户消息
user_msg = {
"timestamp": datetime.now().isoformat(),
"agent_name": "User_Proxy",
"role": "user",
"message": user_input
}
st.session_state.messages.append(user_msg)
st.session_state.conversation_history.append(user_msg)
# 更新统计
st.session_state.agent_stats["User_Proxy"]["messages"] += 1
st.session_state.agent_stats["User_Proxy"]["last_active"] = datetime.now().isoformat()
# 刷新显示
st.rerun()
# ===== 工作流执行逻辑 =====
if start_btn:
if not api_key:
st.error("请先设置 API Key")
st.stop()
if not AUTOGEN_AVAILABLE:
st.error("请先安装 AutoGen: pip install pyautogen")
st.stop()
# 获取最后一条用户消息
user_messages = [m for m in st.session_state.messages if m.get("role") == "user"]
if not user_messages:
st.warning("请先输入需求!")
st.stop()
latest_requirement = user_messages[-1]["message"]
# 设置运行状态
st.session_state.is_running = True
st.session_state.current_step = 0
# 创建进度占位符
progress_placeholder = st.empty()
with progress_placeholder:
st.info("🚀 正在启动 SDLC 工作流,请稍候...")
try:
# 创建 Agent
pm_agent, qa_agent, dev_agent, orchestrator, user_proxy, llm_config = create_agents(
api_key=api_key,
base_url=base_url,
model=model
)
# 创建 GroupChat
groupchat = GroupChat(
agents=[pm_agent, qa_agent, dev_agent, orchestrator, user_proxy],
messages=[],
max_round=max_round,
speaker_selection_method="round_robin"
)
manager = GroupChatManager(groupchat=groupchat, llm_config=llm_config)
# 构建初始消息
initial_message = f"""
请启动完整的 SDLC 流程:
【用户需求】
{latest_requirement}
【工作流程】
1. PM_Agent → 生成 SRS
2. QA_Agent → 生成测试用例
3. Dev_Agent → 编写代码
4. User_Proxy → 执行测试
5. Orchestrator → 汇总报告
开始协作!
"""
# 记录第一条消息
first_msg = {
"timestamp": datetime.now().isoformat(),
"agent_name": "Orchestrator",
"role": "assistant",
"message": f"🎯 启动 SDLC 工作流,需求:{latest_requirement[:100]}..."
}
st.session_state.messages.append(first_msg)
# 更新统计
st.session_state.agent_stats["Orchestrator"]["messages"] += 1
st.session_state.active_agent = "Orchestrator"
# 执行对话
chat_result = user_proxy.initiate_chat(
manager,
message=initial_message,
max_turns=max_round
)
# 记录所有对话
for msg in groupchat.messages:
chat_msg = {
"timestamp": datetime.now().isoformat(),
"agent_name": msg.get("name", "Unknown"),
"role": msg.get("role", "assistant"),
"message": msg.get("content", "")
}
# 避免重复添加
if chat_msg not in st.session_state.messages:
st.session_state.messages.append(chat_msg)
# 更新统计
agent_name = msg.get("name", "Unknown")
if agent_name in st.session_state.agent_stats:
st.session_state.agent_stats[agent_name]["messages"] += 1
st.session_state.agent_stats[agent_name]["last_active"] = datetime.now().isoformat()
# 扫描生成的文件
if Path("workspace").exists():
files = list(Path("workspace").glob("*"))
st.session_state.generated_files = [str(f) for f in files if f.is_file()]
# 更新状态
st.session_state.workflow_result = {
"success": True,
"summary": chat_result.summary if hasattr(chat_result, 'summary') else "完成"
}
st.session_state.current_step = 5
st.session_state.is_running = False
st.session_state.active_agent = None
progress_placeholder.success("✅ SDLC 工作流完成!")
# 刷新显示
st.rerun()
except Exception as e:
st.session_state.is_running = False
st.session_state.active_agent = None
st.error(f"❌ 错误:{str(e)}")
st.session_state.workflow_result = {"success": False, "error": str(e)}
if stop_btn:
st.session_state.is_running = False
st.session_state.active_agent = None
st.info("⏸️ 工作流已暂停")
if __name__ == "__main__":
main()

View File

@@ -1,756 +0,0 @@
# -*- coding: utf-8 -*-
"""
Streamlit 前端 v3 - Agent 对话流可视化版本
核心特性:
1. 清晰展示每个 Agent 当前在做什么(任务标签)
2. Agent 之间的对话流(类似聊天室)
3. 实时活动指示器(高亮当前发言的 Agent
4. 对话历史完整记录
"""
import streamlit as st
import os
import sys
from pathlib import Path
from datetime import datetime
import time
from typing import Dict, Any, List
import json
# 添加项目根目录到路径
sys.path.insert(0, str(Path(__file__).parent.parent))
try:
from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
AUTOGEN_AVAILABLE = True
except ImportError:
AUTOGEN_AVAILABLE = False
from config.llm_config import (
get_llm_config,
PM_PROMPT,
QA_PROMPT,
DEV_PROMPT,
ORCH_PROMPT
)
# 页面配置
st.set_page_config(
page_title="AutoGen SDLC - Agent 对话流",
page_icon="💬",
layout="wide",
initial_sidebar_state="expanded"
)
# 自定义 CSS 样式 - 突出对话流
st.markdown("""
<style>
/* 主对话区域 */
.chat-flow-container {
background: #f0f2f6;
padding: 20px;
border-radius: 10px;
margin-bottom: 20px;
}
/* Agent 对话气泡 */
.agent-chat-bubble {
padding: 15px 20px;
margin: 10px 0;
border-radius: 15px;
max-width: 85%;
position: relative;
animation: bubble-in 0.3s ease;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}
@keyframes bubble-in {
from {
opacity: 0;
transform: translateY(10px);
}
to {
opacity: 1;
transform: translateY(0);
}
}
/* 不同 Agent 的气泡样式 */
.pm-bubble {
background: linear-gradient(135deg, #e3f2fd 0%, #bbdefb 100%);
border: 2px solid #2196f3;
margin-left: 0;
}
.qa-bubble {
background: linear-gradient(135deg, #e8f5e9 0%, #c8e6c9 100%);
border: 2px solid #4caf50;
margin-left: 0;
}
.dev-bubble {
background: linear-gradient(135deg, #fff3e0 0%, #ffe0b2 100%);
border: 2px solid #ff9800;
margin-left: 0;
}
.orchestrator-bubble {
background: linear-gradient(135deg, #f3e5f5 0%, #e1bee7 100%);
border: 2px solid #9c27b0;
margin-left: 0;
}
.user-bubble {
background: linear-gradient(135deg, #efebe9 0%, #d7ccc8 100%);
border: 2px solid #795548;
margin-left: auto;
}
/* Agent 头像和名称 */
.agent-chat-header {
display: flex;
align-items: center;
margin-bottom: 8px;
font-weight: bold;
}
.agent-chat-avatar {
font-size: 1.5rem;
margin-right: 8px;
}
.agent-chat-name {
font-size: 1rem;
color: #333;
}
.agent-chat-time {
margin-left: auto;
font-size: 0.75rem;
color: #666;
}
/* 任务标签 */
.task-badge {
display: inline-block;
padding: 3px 10px;
border-radius: 12px;
font-size: 0.75rem;
font-weight: bold;
margin-left: 8px;
color: white;
}
.pm-task { background: #2196f3; }
.qa-task { background: #4caf50; }
.dev-task { background: #ff9800; }
.orchestrator-task { background: #9c27b0; }
.user-task { background: #795548; }
/* Agent 状态卡片 */
.agent-status-card {
padding: 15px;
border-radius: 10px;
text-align: center;
border: 3px solid #e0e0e0;
transition: all 0.3s ease;
background: white;
}
.agent-status-card.active {
border-color: #4caf50;
background: linear-gradient(135deg, #e8f5e9 0%, #c8e6c9 100%);
animation: agent-pulse 1.5s infinite;
}
@keyframes agent-pulse {
0%, 100% { transform: scale(1); }
50% { transform: scale(1.02); }
}
.agent-status-avatar {
font-size: 2.5rem;
margin-bottom: 5px;
}
.agent-status-name {
font-weight: bold;
font-size: 0.9rem;
margin-bottom: 5px;
}
.agent-status-task {
font-size: 0.75rem;
color: #666;
min-height: 20px;
}
.agent-status-indicator {
display: inline-block;
width: 8px;
height: 8px;
border-radius: 50%;
margin-right: 5px;
}
.status-active {
background: #4caf50;
animation: dot-pulse 1s infinite;
}
.status-waiting {
background: #ffc107;
}
.status-inactive {
background: #e0e0e0;
}
@keyframes dot-pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.3; }
}
/* 对话流指示器 */
.chat-flow-indicator {
display: flex;
align-items: center;
margin: 10px 0;
padding: 10px;
background: white;
border-radius: 8px;
}
.flow-arrow {
font-size: 1.5rem;
color: #2196f3;
margin: 0 10px;
}
/* 当前活动提示 */
.active-agent-banner {
background: linear-gradient(135deg, #4caf50, #8bc34a);
color: white;
padding: 10px 20px;
border-radius: 8px;
text-align: center;
font-weight: bold;
margin-bottom: 20px;
animation: banner-pulse 2s infinite;
}
@keyframes banner-pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.8; }
}
</style>
""", unsafe_allow_html=True)
# Agent 配置
AGENT_CONFIG = {
"PM_Agent": {
"name": "产品经理",
"avatar": "📋",
"color": "#2196f3",
"bubble_class": "pm-bubble",
"task_class": "pm-task",
"default_task": "待命"
},
"QA_Agent": {
"name": "测试工程师",
"avatar": "",
"color": "#4caf50",
"bubble_class": "qa-bubble",
"task_class": "qa-task",
"default_task": "待命"
},
"Dev_Agent": {
"name": "开发工程师",
"avatar": "💻",
"color": "#ff9800",
"bubble_class": "dev-bubble",
"task_class": "dev-task",
"default_task": "待命"
},
"Orchestrator": {
"name": "协调器",
"avatar": "🎯",
"color": "#9c27b0",
"bubble_class": "orchestrator-bubble",
"task_class": "orchestrator-task",
"default_task": "待命"
},
"User_Proxy": {
"name": "用户代理",
"avatar": "👤",
"color": "#795548",
"bubble_class": "user-bubble",
"task_class": "user-task",
"default_task": "待命"
}
}
def init_session_state():
"""初始化 session state"""
if "messages" not in st.session_state:
st.session_state.messages = []
if "is_running" not in st.session_state:
st.session_state.is_running = False
if "current_agent" not in st.session_state:
st.session_state.current_agent = None
if "agent_tasks" not in st.session_state:
st.session_state.agent_tasks = {
"PM_Agent": "待命",
"QA_Agent": "待命",
"Dev_Agent": "待命",
"Orchestrator": "待命",
"User_Proxy": "待命"
}
if "agent_stats" not in st.session_state:
st.session_state.agent_stats = {agent: 0 for agent in AGENT_CONFIG}
def display_agent_status_row():
"""显示 Agent 状态行 - 突出当前活动的 Agent"""
st.subheader("🎭 Agent 实时状态")
cols = st.columns(5)
for i, (agent_key, config) in enumerate(AGENT_CONFIG.items()):
with cols[i]:
is_active = st.session_state.current_agent == agent_key
current_task = st.session_state.agent_tasks.get(agent_key, "待命")
msg_count = st.session_state.agent_stats.get(agent_key, 0)
status_class = "active" if is_active else ""
indicator_class = "status-active" if is_active else "status-inactive"
st.markdown(f"""
<div class="agent-status-card {status_class}">
<div class="agent-status-avatar">{config['avatar']}</div>
<div class="agent-status-name">
<span class="agent-status-indicator {indicator_class}"></span>
{config['name']}
</div>
<div class="agent-status-task">
📝 {current_task}
</div>
<div style="font-size:0.7rem;color:#999;margin-top:5px;">
💬 {msg_count} 条消息
</div>
</div>
""", unsafe_allow_html=True)
def display_active_agent_banner():
"""显示当前活动 Agent 的横幅"""
if st.session_state.current_agent:
config = AGENT_CONFIG.get(st.session_state.current_agent, {})
st.markdown(f"""
<div class="active-agent-banner">
{config.get('avatar', '🤖')} 当前发言:{config.get('name', 'Unknown')} - {st.session_state.agent_tasks.get(st.session_state.current_agent, '工作中...')}
</div>
""", unsafe_allow_html=True)
def display_chat_flow():
"""显示 Agent 对话流 - 核心功能"""
st.subheader("💬 Agent 对话流")
if not st.session_state.messages:
st.info("👈 暂无对话,请启动工作流")
return
# 使用容器显示对话流
chat_container = st.container()
for idx, msg in enumerate(st.session_state.messages):
agent_key = msg.get("agent_key", "Unknown")
config = AGENT_CONFIG.get(agent_key, {"name": "未知", "avatar": "🤖", "bubble_class": ""})
task_class = AGENT_CONFIG.get(agent_key, {}).get("task_class", "")
timestamp = msg.get("timestamp", "")
if timestamp:
time_str = datetime.fromisoformat(timestamp).strftime("%H:%M:%S")
else:
time_str = datetime.now().strftime("%H:%M:%S")
content = msg.get("content", "")
task = msg.get("task", "工作中")
# 显示对话气泡
st.markdown(f"""
<div class="agent-chat-bubble {config['bubble_class']}">
<div class="agent-chat-header">
<span class="agent-chat-avatar">{config['avatar']}</span>
<span class="agent-chat-name">{config['name']}</span>
<span class="task-badge {task_class}">{task}</span>
<span class="agent-chat-time">{time_str}</span>
</div>
<div style="color:#333;line-height:1.6;">{content}</div>
</div>
""", unsafe_allow_html=True)
# 显示对话流指示器(下一条消息的箭头)
if idx < len(st.session_state.messages) - 1:
next_msg = st.session_state.messages[idx + 1]
next_agent = next_msg.get("agent_key", "")
next_config = AGENT_CONFIG.get(next_agent, {})
if next_agent != agent_key:
st.markdown(f"""
<div class="chat-flow-indicator">
<div style="flex:1;"></div>
<div class="flow-arrow">⬇️</div>
<div style="flex:1;text-align:center;color:#666;font-size:0.8rem;">
传递给 {next_config.get('name', 'Unknown')}
</div>
</div>
""", unsafe_allow_html=True)
def save_generated_files():
"""从对话中提取并保存生成的文件到 workspace 目录"""
workspace_dir = Path("workspace")
workspace_dir.mkdir(parents=True, exist_ok=True)
files_saved = []
# 遍历所有消息,提取文件内容
for msg in st.session_state.messages:
content = msg.get("content", "")
agent_key = msg.get("agent_key", "")
# PM Agent 生成 SRS.md
if agent_key == "PM_Agent" and ("SRS" in content or "软件需求规格" in content):
srs_file = workspace_dir / "SRS.md"
# 提取 SRS 内容(查找包含 FR- 或 NFR- 的段落)
srs_content = content
if "功能性需求" in content:
srs_content = content[content.find("功能性需求"):]
with open(srs_file, 'w', encoding='utf-8') as f:
f.write(f"# 软件需求规格说明书 (SRS)\n\n")
f.write(f"**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write(srs_content)
files_saved.append(str(srs_file))
# QA Agent 生成测试文件
if agent_key == "QA_Agent" and ("test_" in content or "def test_" in content):
test_file = workspace_dir / "test_battery_health.py"
# 提取 Python 代码
if "```python" in content:
code = content.split("```python")[1].split("```")[0]
else:
code = content
with open(test_file, 'w', encoding='utf-8') as f:
f.write(f'"""\n电池健康状态测试用例\n生成时间:{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\n"""\n\n')
f.write(code)
files_saved.append(str(test_file))
# Dev Agent 生成源代码
if agent_key == "Dev_Agent" and ("def " in content or "class " in content):
src_file = workspace_dir / "src_battery_health.py"
# 提取 Python 代码
if "```python" in content:
code = content.split("```python")[1].split("```")[0]
else:
code = content
with open(src_file, 'w', encoding='utf-8') as f:
f.write(f'"""\n电池健康状态计算模块\n生成时间:{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\n"""\n\n')
f.write(code)
files_saved.append(str(src_file))
# Orchestrator 生成最终报告
if agent_key == "Orchestrator" and ("完成" in content and "SDLC" in content):
report_file = workspace_dir / "FINAL_REPORT.md"
with open(report_file, 'w', encoding='utf-8') as f:
f.write(f"# SDLC 项目最终报告\n\n")
f.write(f"**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write("## 项目概述\n\n")
f.write("基于 AutoGen 多智能体系统完成的软件交付项目。\n\n")
f.write("## 生成文件列表\n\n")
for i, saved_file in enumerate(files_saved, 1):
f.write(f"{i}. {saved_file}\n")
f.write(f"\n## 项目总结\n\n{content}\n")
files_saved.append(str(report_file))
return files_saved
def create_agents(api_key: str, base_url: str, model: str):
"""创建所有 Agent"""
llm_config = get_llm_config(model=model, api_key=api_key, base_url=base_url)
pm_agent = AssistantAgent(
name="PM_Agent",
system_message=PM_PROMPT,
llm_config=llm_config,
description="资深软件产品经理",
human_input_mode="NEVER"
)
qa_agent = AssistantAgent(
name="QA_Agent",
system_message=QA_PROMPT,
llm_config=llm_config,
description="资深测试工程师",
human_input_mode="NEVER"
)
dev_agent = AssistantAgent(
name="Dev_Agent",
system_message=DEV_PROMPT,
llm_config=llm_config,
description="资深软件工程师",
human_input_mode="NEVER"
)
orchestrator = AssistantAgent(
name="Orchestrator",
system_message=ORCH_PROMPT,
llm_config=llm_config,
description="多智能体协调器",
human_input_mode="NEVER"
)
user_proxy = UserProxyAgent(
name="User_Proxy",
human_input_mode="NEVER",
max_consecutive_auto_reply=0,
code_execution_config={
"work_dir": "workspace",
"use_docker": False,
},
is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE")
)
return pm_agent, qa_agent, dev_agent, orchestrator, user_proxy, llm_config
def add_message(agent_key: str, content: str, task: str = "工作中"):
"""添加消息到对话流"""
msg = {
"agent_key": agent_key,
"content": content,
"timestamp": datetime.now().isoformat(),
"task": task
}
st.session_state.messages.append(msg)
# 更新统计
st.session_state.agent_stats[agent_key] = st.session_state.agent_stats.get(agent_key, 0) + 1
# 更新当前 Agent
st.session_state.current_agent = agent_key
# 更新任务
st.session_state.agent_tasks[agent_key] = task
def main():
"""主应用"""
init_session_state()
# 标题
st.title("💬 AutoGen SDLC - Agent 对话流可视化")
st.markdown("**清晰展示每个 Agent 在做什么 · 实时追踪 Agent 之间的对话交互**")
# 侧边栏配置
with st.sidebar:
st.title("⚙️ 控制中心")
api_key = st.text_input("API Key", type="password", value=os.getenv("DASHSCOPE_API_KEY", ""))
base_url = st.text_input("Base URL", value="https://dashscope.aliyuncs.com/compatible-mode/v1")
model = st.selectbox("模型选择", ["qwen3.5-flash", "qwen-max", "qwen-plus", "qwen-turbo"], index=0)
max_round = st.slider("最大对话轮数", 5, 50, 20)
st.divider()
col1, col2 = st.columns(2)
with col1:
start_btn = st.button("▶️ 启动对话流", type="primary", use_container_width=True)
with col2:
stop_btn = st.button("⏸️ 暂停", use_container_width=True)
st.divider()
if st.button("🗑️ 清空对话", use_container_width=True):
st.session_state.messages = []
st.session_state.current_agent = None
st.session_state.agent_tasks = {k: "待命" for k in AGENT_CONFIG}
st.session_state.agent_stats = {k: 0 for k in AGENT_CONFIG}
st.rerun()
if st.button("📥 导出对话", use_container_width=True):
if st.session_state.messages:
json_str = json.dumps(st.session_state.messages, ensure_ascii=False, indent=2)
st.download_button(
label="下载 JSON",
data=json_str,
file_name=f"agent_chat_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
mime="application/json"
)
st.divider()
# 显示生成的文件
st.subheader("📁 生成的文件")
workspace_dir = Path("workspace")
if workspace_dir.exists():
files = list(workspace_dir.glob("*"))
if files:
for file in files:
if file.is_file():
with st.expander(f"📄 {file.name}"):
try:
content = file.read_text(encoding='utf-8')
st.code(content[:500] + ("..." if len(content) > 500 else ""))
st.download_button(
label="⬇️ 下载",
data=content,
file_name=file.name,
mime="text/plain",
key=f"download_{file.name}"
)
except Exception as e:
st.error(f"读取失败:{e}")
else:
st.info("工作目录为空,请先运行工作流")
else:
st.info("工作目录不存在")
# 主界面
display_agent_status_row()
st.divider()
display_active_agent_banner()
display_chat_flow()
# 用户输入
st.divider()
user_input = st.chat_input("💡 输入需求,或点击启动按钮开始...")
if user_input:
add_message("User_Proxy", user_input, "提出需求")
st.rerun()
# 启动工作流
if start_btn:
if not api_key:
st.error("请先设置 API Key")
st.stop()
if not AUTOGEN_AVAILABLE:
st.error("请先安装 AutoGen: pip install pyautogen")
st.stop()
# 获取需求
user_msgs = [m for m in st.session_state.messages if m.get("agent_key") == "User_Proxy"]
if not user_msgs:
add_message("User_Proxy", "请开发一个电池健康状态预测 API", "提出需求")
latest_requirement = user_msgs[-1]["content"] if user_msgs else "请开发一个电池健康状态预测 API"
st.session_state.is_running = True
# 进度提示
progress_placeholder = st.empty()
progress_placeholder.info("🚀 启动 SDLC 工作流Agent 开始协作...")
try:
# 创建 Agent
pm_agent, qa_agent, dev_agent, orchestrator, user_proxy, llm_config = create_agents(
api_key=api_key,
base_url=base_url,
model=model
)
# 添加 Orchestrator 启动消息
add_message("Orchestrator", f"🚀 启动 SDLC 工作流!用户需求:{latest_requirement[:100]}...", "启动流程")
# 创建 GroupChat
groupchat = GroupChat(
agents=[pm_agent, qa_agent, dev_agent, orchestrator, user_proxy],
messages=[],
max_round=max_round,
speaker_selection_method="round_robin"
)
manager = GroupChatManager(groupchat=groupchat, llm_config=llm_config)
# 初始消息
initial_message = f"""
请启动完整的 SDLC 流程:
【用户需求】
{latest_requirement}
【工作流程】
1. PM_Agent → 生成 SRS 文档
2. QA_Agent → 生成测试用例
3. Dev_Agent → 编写代码
4. User_Proxy → 执行测试
5. Orchestrator → 汇总报告
开始协作!每个步骤完成后请明确说明。
"""
# 执行对话
with st.spinner("💬 Agent 们正在协作中,请稍候..."):
chat_result = user_proxy.initiate_chat(
manager,
message=initial_message,
max_turns=max_round
)
# 记录所有对话
for msg in groupchat.messages:
agent_name = msg.get("name", "Unknown")
content = msg.get("content", "")
# 推断任务
task_map = {
"PM_Agent": "需求分析",
"QA_Agent": "测试设计",
"Dev_Agent": "代码实现",
"Orchestrator": "流程协调",
"User_Proxy": "测试执行"
}
task = task_map.get(agent_name, "工作中")
add_message(agent_name, content, task)
# 完成
add_message("Orchestrator", "✅ SDLC 流程完成!所有任务已完成。", "总结完成")
# 保存生成的文件
progress_placeholder.info("💾 正在保存生成的文件...")
saved_files = save_generated_files()
if saved_files:
progress_placeholder.success(f"✅ SDLC 工作流完成!已保存 {len(saved_files)} 个文件到 workspace/ 目录")
else:
progress_placeholder.success("✅ SDLC 工作流完成!查看上方的对话流了解详情。")
st.session_state.is_running = False
st.rerun()
except Exception as e:
st.session_state.is_running = False
st.session_state.current_agent = None
progress_placeholder.error(f"❌ 错误:{str(e)}")
st.error("请检查 API Key 和网络连接")
if stop_btn:
st.session_state.is_running = False
st.session_state.current_agent = None
st.info("⏸️ 工作流已暂停")
if __name__ == "__main__":
main()