This commit is contained in:
2026-03-12 17:58:15 +08:00
parent cd67c6dac2
commit 7931ce5070
7 changed files with 598 additions and 139 deletions

1
.gitignore vendored
View File

@@ -1,2 +1,3 @@
__pycache__ __pycache__
workspace workspace
logs

151
REALTIME_FILE_SAVING.md Normal file
View File

@@ -0,0 +1,151 @@
# 实时文件保存功能
## 功能说明
现在AutoGen SDLC 系统支持**实时文件保存**功能。在 Agent 协作过程中,每个 Agent 生成的内容会立即保存到文件系统中,而不需要等待整个工作流完成。
## 工作原理
### 修改前
```
对话开始 → PM Agent 发言 → QA Agent 发言 → Dev Agent 发言 → ... → 对话结束
遍历所有消息
批量保存文件
```
### 修改后
```
对话开始 → PM Agent 发言 → 立即保存 SRS.md
QA Agent 发言 → 立即保存 test_sample.py
Dev Agent 发言 → 立即保存 src_sample.py (或其他文件)
...
对话结束 (所有文件已实时保存完成)
```
## 技术实现
1. **回调机制**: 使用 AutoGen 的 `register_reply` 方法注册回调函数
2. **实时检测**: 每次 Agent 发言后立即检测内容并保存文件
3. **文件识别**: 根据 Agent 名称和内容类型自动判断文件类型
- PM_Agent → SRS.md (需求文档)
- QA_Agent → test_sample.py (测试用例)
- Dev_Agent → src_sample.py 或指定文件名的代码文件
## 使用方法
### 方法 1: 运行示例
```bash
# 设置 API Key
$env:DASHSCOPE_API_KEY="your_api_key"
# 运行实时文件保存示例
python usage_examples.py
```
示例会自动演示实时文件保存功能。
### 方法 2: 直接使用 API
```python
from autogen_sdls_system import AutoGenSDLCSystem
system = AutoGenSDLCSystem(
api_key="your_api_key",
workspace_dir="workspace" # 指定工作目录
)
result = system.run_workflow(
"我需要一个电池健康状态预测 API",
max_round=15
)
# 查看实时保存的文件
print("生成的文件:", result["saved_files"])
```
## 文件命名规则
### PM Agent
- **文件名**: `SRS.md`
- **触发条件**: 内容包含"需求"、"SRS"或"软件需求规格说明书"
- **内容格式**: Markdown 格式的 Software Requirements Specification
### QA Agent
- **文件名**: `test_sample.py`
- **触发条件**: 内容包含"test"、"测试"或"def test_"
- **内容格式**: Python 测试代码Pytest 风格)
### Dev Agent
- **带文件名的代码块**:
```python
# File: my_module.py
def my_function():
pass
```
会保存为 `my_module.py`
- **不带文件名的代码块**: 保存为 `src_sample.py`
- **触发条件**: 内容包含"def "、"class "或"import "
## 实时性保证
每个文件在以下时机立即保存:
1. Agent 生成回复内容
2. 回调函数被触发
3. 解析内容并提取代码/文档
4. 写入文件系统
5. 输出日志:`💾 [实时保存] <文件路径>`
整个过程在毫秒级完成,用户可以在控制台中实时看到文件保存的提示。
## 优势
✅ **即时可见**: 无需等待整个流程完成,立即可见生成的文件
✅ **过程追踪**: 可以观察每个 Agent 的贡献和输出顺序
✅ **错误恢复**: 如果中途中断,已生成的文件不会丢失
✅ **调试方便**: 可以实时检查每个阶段的输出质量
✅ **人机协同**: 人工可以在流程进行中审查已生成的文件
## 注意事项
⚠️ **文件覆盖**: 如果同一个 Agent 多次生成同类型文件,后生成的会覆盖先生成的
⚠️ **工作目录**: 确保工作目录有写权限
⚠️ **API 响应**: 某些模型响应可能包含多个代码块,只会保存第一个匹配的
## 示例输出
```
🚀 开始对话...
PM_Agent 正在分析需求...
💾 [实时保存] workspace\SRS.md
QA_Agent 正在设计测试...
💾 [实时保存] workspace\test_sample.py
Dev_Agent 正在编写代码...
💾 [实时保存] workspace\src_sample.py
User_Proxy 正在执行测试...
Orchestrator 正在汇总结果...
✅ 工作流完成!
📁 已保存 3 个文件:
✓ workspace\SRS.md
✓ workspace\test_sample.py
✓ workspace\src_sample.py
```
## 相关文件
- `autogen_sdls_system.py`: 核心实现文件
- `usage_examples.py`: 使用示例(包含示例 6
- `utils/callback_handler.py`: 回调处理器

View File

@@ -7,6 +7,8 @@ import sys
from pathlib import Path from pathlib import Path
from typing import Dict, Any, Optional, List from typing import Dict, Any, Optional, List
import json import json
import re
from datetime import datetime
# 添加项目根目录到路径 # 添加项目根目录到路径
sys.path.insert(0, str(Path(__file__).parent)) sys.path.insert(0, str(Path(__file__).parent))
@@ -23,6 +25,81 @@ from utils.logger import get_logger
from utils.callback_handler import get_callback_handler from utils.callback_handler import get_callback_handler
def extract_code(content):
"""从 Markdown 代码块中提取纯代码"""
if "```python" in content:
parts = content.split("```python")
if len(parts) > 1:
code = parts[1].split("```")[0].strip()
return code
elif "```" in content:
parts = content.split("```")
if len(parts) > 1:
code = parts[1].strip()
return code
return content
def save_file_from_content(content: str, agent_name: str, workspace_dir: Path) -> List[str]:
"""
根据 Agent 的内容实时保存文件
Args:
content: Agent 的输出内容
agent_name: Agent 名称
workspace_dir: 工作目录
Returns:
保存的文件路径列表
"""
saved_files = []
# PM Agent 生成 SRS
if agent_name == "PM_Agent" and ("需求" in content or "SRS" in content or "软件需求规格说明书" in content):
file = workspace_dir / "SRS.md"
with open(file, "w", encoding="utf-8") as f:
f.write(f"# 软件需求规格说明书\n\n生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write(content)
saved_files.append(str(file))
print(f"💾 [实时保存] {file}")
# QA Agent 生成测试
elif agent_name == "QA_Agent" and ("test" in content.lower() or "测试" in content or "def test_" in content):
file = workspace_dir / "test_sample.py"
code = extract_code(content)
if code: # 确保有代码内容才保存
with open(file, "w", encoding="utf-8") as f:
f.write(f"# 测试用例\n\n{code}")
saved_files.append(str(file))
print(f"💾 [实时保存] {file}")
# Dev Agent 生成代码 - 支持多个文件
elif agent_name == "Dev_Agent" and ("def " in content or "class " in content or "import " in content):
# 尝试提取带文件名的代码块
code_blocks = re.findall(r'```python\s*\n#(?:\s*File:|\s*filename:)?\s*([^\n]+)\n(.*?)```', content, re.DOTALL)
if code_blocks:
# 保存多个文件
for filename, code_content in code_blocks:
filename = filename.strip()
file_path = workspace_dir / filename
with open(file_path, "w", encoding="utf-8") as f:
f.write(f"# 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n{code_content}")
saved_files.append(str(file_path))
print(f"💾 [实时保存] {file_path}")
else:
# 没有文件名标记,保存为 src_sample.py
file = workspace_dir / "src_sample.py"
code = extract_code(content)
if code: # 确保有代码内容才保存
with open(file, "w", encoding="utf-8") as f:
f.write(f"# 源代码\n\n{code}")
saved_files.append(str(file))
print(f"💾 [实时保存] {file}")
return saved_files
class AutoGenSDLCSystem: class AutoGenSDLCSystem:
"""AutoGen SDLC 多智能体协同系统""" """AutoGen SDLC 多智能体协同系统"""
@@ -71,6 +148,42 @@ class AutoGenSDLCSystem:
self.groupchat = None self.groupchat = None
self.manager = None self.manager = None
# 记录已保存的文件
self.saved_files = []
def _on_new_message(self, sender, message, **kwargs):
"""
新消息回调函数 - 实时保存文件
Args:
sender: 发送者
message: 消息内容
**kwargs: 其他参数
"""
try:
agent_name = getattr(sender, 'name', 'Unknown')
content = message.get('content', '') if isinstance(message, dict) else str(message)
# 实时保存文件
saved_files = save_file_from_content(content, agent_name, self.workspace_dir)
# 如果有文件保存,记录并通知回调
if saved_files:
self.saved_files.extend(saved_files)
for file_path in saved_files:
# 确定文件类型
if file_path.endswith('.md'):
file_type = 'markdown'
elif file_path.endswith('.py'):
file_type = 'python'
else:
file_type = 'other'
# 触发回调
self.callback_handler.on_file_created(agent_name, file_path, file_type)
except Exception as e:
print(f"⚠️ 实时保存文件时出错:{e}")
def _create_agents(self): def _create_agents(self):
"""创建所有 Agent""" """创建所有 Agent"""
# PM Agent # PM Agent
@@ -113,7 +226,7 @@ class AutoGenSDLCSystem:
self.user_proxy = UserProxyAgent( self.user_proxy = UserProxyAgent(
name="User_Proxy", name="User_Proxy",
human_input_mode="NEVER", # 修复Web 环境不支持 TERMINAL human_input_mode="NEVER", # 修复Web 环境不支持 TERMINAL
max_consecutive_auto_reply=0, max_consecutive_auto_reply=10, # 允许连续自动回复,避免提前终止
code_execution_config={ code_execution_config={
"work_dir": str(self.workspace_dir), "work_dir": str(self.workspace_dir),
"use_docker": False, "use_docker": False,
@@ -167,6 +280,9 @@ class AutoGenSDLCSystem:
# 创建 GroupChat # 创建 GroupChat
self.create_groupchat(max_round) self.create_groupchat(max_round)
# 重置已保存文件列表
self.saved_files = []
# 构建初始消息 # 构建初始消息
initial_message = f""" initial_message = f"""
请启动完整的 SDLC 流程,开发以下功能: 请启动完整的 SDLC 流程,开发以下功能:
@@ -188,7 +304,17 @@ class AutoGenSDLCSystem:
""" """
try: try:
# 启动对话 # 启动对话 - 使用回调函数实时处理消息
print("\n🚀 开始对话...\n")
# 为每个 Agent 注册实时文件保存回调
for agent in [self.pm_agent, self.qa_agent, self.dev_agent, self.orchestrator]:
agent.register_reply(
self._on_new_message,
reply_func_total_count=1,
position=0 # 在最前面执行,确保实时保存
)
chat_result = self.user_proxy.initiate_chat( chat_result = self.user_proxy.initiate_chat(
self.manager, self.manager,
message=initial_message, message=initial_message,
@@ -215,7 +341,8 @@ class AutoGenSDLCSystem:
"success": True, "success": True,
"summary": chat_result.summary if hasattr(chat_result, 'summary') else "工作流完成", "summary": chat_result.summary if hasattr(chat_result, 'summary') else "工作流完成",
"messages": self.groupchat.messages, "messages": self.groupchat.messages,
"workspace": str(self.workspace_dir) "workspace": str(self.workspace_dir),
"saved_files": self.saved_files
} }
except Exception as e: except Exception as e:
@@ -225,56 +352,3 @@ class AutoGenSDLCSystem:
"error": str(e), "error": str(e),
"messages": self.groupchat.messages if self.groupchat else [] "messages": self.groupchat.messages if self.groupchat else []
} }
def export_conversation(self, output_path: Optional[str] = None) -> str:
"""导出对话历史"""
return self.logger.export_to_json(output_path)
def export_report(self, output_path: Optional[str] = None) -> str:
"""导出 Markdown 格式报告"""
return self.logger.export_to_markdown(output_path)
def main():
"""主函数 - 演示模式"""
print("=" * 60)
print("AutoGen SDLC 多智能体协同系统")
print("=" * 60)
# 检查 API Key
api_key = os.getenv("DASHSCOPE_API_KEY")
if not api_key:
print("\n❌ 错误:未设置 DASHSCOPE_API_KEY 环境变量")
print("请运行export DASHSCOPE_API_KEY='your_api_key'")
return
# 创建系统实例
system = AutoGenSDLCSystem(api_key=api_key)
# 演示用例
demo_requirement = "我需要一个电池健康状态 (SOH) 预测 API能够接收电池的电压、电流、温度数据输出健康度百分比"
print(f"\n📋 演示需求:{demo_requirement}")
print("\n🚀 启动 SDLC 工作流...\n")
# 运行工作流
result = system.run_workflow(demo_requirement, max_round=15)
# 输出结果
print("\n" + "=" * 60)
if result["success"]:
print("✅ 工作流成功完成!")
print(f"📄 摘要:{result['summary'][:200]}...")
print(f"📂 工作目录:{result['workspace']}")
else:
print(f"❌ 工作流失败:{result.get('error', '未知错误')}")
# 导出报告
report_path = system.export_report()
print(f"📊 对话报告已导出:{report_path}")
print("\n" + "=" * 60)
if __name__ == "__main__":
main()

View File

@@ -106,7 +106,7 @@ QA_PROMPT = """你是一名资深测试工程师,专注于自动化测试和 T
- 保存为 workspace/test_*.py 文件 - 保存为 workspace/test_*.py 文件
""" """
DEV_PROMPT = """你是一名资深软件工程师,专注于汽车嵌入式 C++/Python 开发。 DEV_PROMPT = """你是一名资深软件工程师,专注于汽车嵌入式 C++/Python/Html 开发。
你的任务是根据 SRS 和测试用例编写: 你的任务是根据 SRS 和测试用例编写:
1. 核心业务逻辑代码 1. 核心业务逻辑代码
@@ -120,7 +120,7 @@ DEV_PROMPT = """你是一名资深软件工程师,专注于汽车嵌入式 C++
- 保持代码简洁、可读、可维护 - 保持代码简洁、可读、可维护
输出格式要求: 输出格式要求:
- 源文件命名为 src_<feature>.py 或 <feature>.cpp - 源文件 src_*, 自己决定文件类型
- 包含完整的 docstring - 包含完整的 docstring
- 添加类型注解 - 添加类型注解
- 保存为 workspace/ 目录下的相应文件 - 保存为 workspace/ 目录下的相应文件

79
demo_realtime_saving.py Normal file
View File

@@ -0,0 +1,79 @@
"""
实时文件保存演示脚本
展示如何在对话过程中实时创建文件
"""
import os
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from autogen_sdls_system import AutoGenSDLCSystem
def demo_realtime_saving():
"""演示实时文件保存功能"""
print("=" * 70)
print("🚀 AutoGen SDLC - 实时文件保存演示")
print("=" * 70)
# 检查 API Key
api_key = os.getenv("DASHSCOPE_API_KEY")
if not api_key:
print("\n❌ 错误:未设置 DASHSCOPE_API_KEY 环境变量")
print("请运行:$env:DASHSCOPE_API_KEY='your_api_key' (PowerShell)")
print("export DASHSCOPE_API_KEY='your_api_key' (Linux/Mac)")
return
# 创建系统实例,使用独立的工作目录
workspace = "workspace_realtime_demo"
print(f"\n📂 工作目录:{Path(workspace).absolute()}")
system = AutoGenSDLCSystem(
api_key=api_key,
workspace_dir=workspace
)
# 简单需求用于快速演示
requirement = """
我需要一个 Python 工具函数库,包含以下功能:
1. factorial(n): 计算阶乘
2. is_prime(n): 判断质数
3. fibonacci(n): 生成斐波那契数列前 n 项
要求:每个函数都有类型注解和完整文档字符串
"""
print(f"\n📋 需求:{requirement}")
print("\n💡 提示:您会看到文件在对话过程中被实时创建...\n")
# 运行工作流
result = system.run_workflow(requirement, max_round=15)
# 显示结果
print("\n" + "=" * 70)
if result["success"]:
print("✅ 工作流成功完成!")
print(f"\n📄 摘要:\n{result['summary'][:500]}...")
# 显示生成的文件
print(f"\n📁 已保存 {len(result['saved_files'])} 个文件:")
for file in result["saved_files"]:
exists = "" if Path(file).exists() else ""
print(f" {exists} {file}")
# 显示工作目录内容
print(f"\n📂 工作目录内容:")
workspace_path = Path(workspace)
if workspace_path.exists():
for item in workspace_path.iterdir():
if item.is_file():
size = f"({item.stat().st_size} bytes)"
print(f" 📄 {item.name} {size}")
else:
print(f"❌ 工作流失败:{result.get('error', '未知错误')}")
print("\n" + "=" * 70)
if __name__ == "__main__":
demo_realtime_saving()

View File

@@ -4,13 +4,15 @@ Streamlit 实时 Agent 协作平台
功能: 功能:
1. 实时展示每个 Agent 的状态和动作 1. 实时展示每个 Agent 的状态和动作
2. 自动保存生成的文件到 workspace/ 2. 自动保存生成的文件到 workspace/
3. 简单稳定的代码结构 3. Agent 自主决定对话对象(动态发言顺序)
4. 实时更新对话流和 Agent 状态
""" """
import streamlit as st import streamlit as st
import os import os
from pathlib import Path from pathlib import Path
from datetime import datetime from datetime import datetime
import time import time
import re
try: try:
from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
@@ -23,17 +25,18 @@ import sys
sys.path.insert(0, str(Path(__file__).parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent))
from config.llm_config import get_llm_config, PM_PROMPT, QA_PROMPT, DEV_PROMPT, ORCH_PROMPT from config.llm_config import get_llm_config, PM_PROMPT, QA_PROMPT, DEV_PROMPT, ORCH_PROMPT
from utils.callback_handler import get_callback_handler
# 页面配置 # 页面配置
st.set_page_config(page_title="多 Agent 协作平台", page_icon="🤖", layout="wide") st.set_page_config(page_title="多 Agent 协作平台", page_icon="🤖", layout="wide")
# Agent 配置 # Agent 配置
AGENTS = { AGENTS = {
"PM_Agent": {"name": "产品经理", "avatar": "📋", "color": "blue"}, "PM_Agent": {"name": "产品经理", "avatar": "📋", "color": "blue", "desc": "需求分析与 SRS 生成"},
"QA_Agent": {"name": "测试工程师", "avatar": "", "color": "green"}, "QA_Agent": {"name": "测试工程师", "avatar": "", "color": "green", "desc": "测试用例设计"},
"Dev_Agent": {"name": "开发工程师", "avatar": "💻", "color": "orange"}, "Dev_Agent": {"name": "开发工程师", "avatar": "💻", "color": "orange", "desc": "代码实现"},
"Orchestrator": {"name": "协调器", "avatar": "🎯", "color": "purple"}, "Orchestrator": {"name": "协调器", "avatar": "🎯", "color": "purple", "desc": "流程协调与验证"},
"User_Proxy": {"name": "用户代理", "avatar": "👤", "color": "gray"} "User_Proxy": {"name": "用户代理", "avatar": "👤", "color": "gray", "desc": "测试执行"}
} }
def init_state(): def init_state():
@@ -46,6 +49,14 @@ def init_state():
st.session_state.current_agent = None st.session_state.current_agent = None
if "agent_counts" not in st.session_state: if "agent_counts" not in st.session_state:
st.session_state.agent_counts = {k: 0 for k in AGENTS} st.session_state.agent_counts = {k: 0 for k in AGENTS}
if "agent_status" not in st.session_state:
st.session_state.agent_status = {k: "⚪ 等待中" for k in AGENTS}
if "current_task" not in st.session_state:
st.session_state.current_task = {k: "" for k in AGENTS}
if "saved_files" not in st.session_state:
st.session_state.saved_files = []
if "conversation_history" not in st.session_state:
st.session_state.conversation_history = []
def add_message(agent, content, task=""): def add_message(agent, content, task=""):
"""添加消息""" """添加消息"""
@@ -59,13 +70,28 @@ def add_message(agent, content, task=""):
st.session_state.agent_counts[agent] = st.session_state.agent_counts.get(agent, 0) + 1 st.session_state.agent_counts[agent] = st.session_state.agent_counts.get(agent, 0) + 1
st.session_state.current_agent = agent st.session_state.current_agent = agent
# 更新 Agent 状态
for ag in AGENTS:
if ag == agent:
st.session_state.agent_status[ag] = "🟢 发言中"
st.session_state.current_task[ag] = task
else:
st.session_state.agent_status[ag] = "⚪ 等待中"
# 添加到对话历史(用于展示完整的对话流)
st.session_state.conversation_history.append(msg)
def show_agent_status(): def show_agent_status():
"""显示 Agent 状态""" """显示 Agent 状态"""
st.subheader("🎯 Agent 实时状态")
cols = st.columns(len(AGENTS)) cols = st.columns(len(AGENTS))
for i, (agent_key, info) in enumerate(AGENTS.items()): for i, (agent_key, info) in enumerate(AGENTS.items()):
with cols[i]: with cols[i]:
is_active = st.session_state.current_agent == agent_key is_active = st.session_state.current_agent == agent_key
count = st.session_state.agent_counts.get(agent_key, 0) count = st.session_state.agent_counts.get(agent_key, 0)
status = st.session_state.agent_status.get(agent_key, "⚪ 等待中")
current_task = st.session_state.current_task.get(agent_key, "")
border_color = info["color"] border_color = info["color"]
bg_color = "#e8f5e9" if is_active else "white" bg_color = "#e8f5e9" if is_active else "white"
@@ -77,15 +103,18 @@ def show_agent_status():
border: 3px {"solid" if is_active else "dashed"} {border_color}; border: 3px {"solid" if is_active else "dashed"} {border_color};
background: {bg_color}; background: {bg_color};
text-align: center; text-align: center;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
'> '>
<div style='font-size: 2.5rem;'>{info["avatar"]}</div> <div style='font-size: 2.5rem;'>{info["avatar"]}</div>
<div style='font-weight: bold; margin: 5px 0;'>{info["name"]}</div> <div style='font-weight: bold; margin: 5px 0; font-size: 1.1rem;'>{info["name"]}</div>
<div style='font-size: 0.8rem; color: #666;'> <div style='font-size: 0.75rem; color: #666; margin-bottom: 5px;'>{info["desc"]}</div>
{"🟢 发言中" if is_active else "⚪ 等待中"} <div style='font-size: 0.85rem; color: #2e7d32; margin: 5px 0;'>
{status}
</div> </div>
<div style='font-size: 0.8rem; color: #999; margin-top: 5px;'> <div style='font-size: 0.75rem; color: #999; margin-top: 5px;'>
💬 {count} 条消息 💬 {count} 条消息
</div> </div>
{f"<div style='font-size: 0.7rem; color: #1976d2; margin-top: 5px; font-style: italic;'>📋 {current_task}</div>" if current_task else ""}
</div> </div>
""", unsafe_allow_html=True) """, unsafe_allow_html=True)
@@ -93,17 +122,41 @@ def show_chat():
"""显示对话流""" """显示对话流"""
st.subheader("💬 Agent 对话流") st.subheader("💬 Agent 对话流")
if not st.session_state.messages: if not st.session_state.conversation_history:
st.info("👈 暂无对话,请在下方输入需求并启动") st.info("👈 暂无对话,请在下方输入需求并启动")
return return
for msg in st.session_state.messages: # 使用容器展示对话,支持滚动
chat_container = st.container()
with chat_container:
for i, msg in enumerate(st.session_state.conversation_history):
agent = msg["agent"] agent = msg["agent"]
info = AGENTS.get(agent, {"name": "未知", "avatar": "🤖", "color": "gray"}) info = AGENTS.get(agent, {"name": "未知", "avatar": "🤖", "color": "gray"})
# 创建对话气泡
with st.chat_message(agent.lower(), avatar=info["avatar"]): with st.chat_message(agent.lower(), avatar=info["avatar"]):
st.markdown(f"**{info['name']}** *{msg['time']}* - {msg['task']}") # 显示 Agent 名称、时间和任务
st.markdown(msg["content"][:800] + ("..." if len(msg["content"]) > 800 else "")) task_info = f"- {msg['task']}" if msg['task'] else ""
st.markdown(f"**{info['name']}** *{msg['time']}* {task_info}")
# 处理长内容,提供折叠选项
content = msg["content"]
if len(content) > 1000:
# 短预览 + 展开详情
preview = content[:800] + "..."
st.markdown(preview)
with st.expander("查看完整内容"):
st.markdown(content)
else:
st.markdown(content)
# 如果是代码内容,提供语法高亮
if "```" in content:
code_blocks = re.findall(r'```(\w+)?\n(.*?)```', content, re.DOTALL)
for lang, code in code_blocks:
language = lang if lang else "python"
st.code(code, language=language)
def extract_code(content): def extract_code(content):
"""从 Markdown 代码块中提取纯代码""" """从 Markdown 代码块中提取纯代码"""
@@ -123,6 +176,40 @@ def extract_code(content):
# 没有标记,返回原内容 # 没有标记,返回原内容
return content return content
def is_code_complete(code):
"""检查代码是否完整"""
if not code or not code.strip():
return False
# 检查是否有未闭合的括号
if code.count('(') != code.count(')'):
print(f"⚠️ 括号不匹配:( {code.count('(')} vs ) {code.count(')')}")
return False
if code.count('[') != code.count(']'):
print(f"⚠️ 方括号不匹配:[ {code.count('[')} vs ] {code.count(']')}")
return False
if code.count('{') != code.count('}'):
print(f"⚠️ 花括号不匹配:{{ {code.count('{')} vs }} {code.count('}')}")
return False
# 检查是否以完整的方式结束(不是突然截断)
lines = code.strip().split('\n')
if lines:
last_line = lines[-1].rstrip()
# 如果最后一行以这些符号结尾,说明可能被截断了
if last_line.endswith(':') or last_line.endswith('\\') or last_line.endswith('(') or last_line.endswith('['):
print(f"⚠️ 代码可能截断:最后一行是 '{last_line}'")
return False
# 尝试简单的语法检查
try:
compile(code, '<string>', 'exec')
except SyntaxError as e:
print(f"⚠️ 语法错误:{e}")
return False
return True
def save_files(): def save_files():
"""保存生成的文件到 workspace/""" """保存生成的文件到 workspace/"""
workspace = Path("workspace") workspace = Path("workspace")
@@ -130,37 +217,70 @@ def save_files():
files = [] files = []
# 遍历消息,提取并保存文件 # 1. 保存 PM Agent 的 SRS 文档
for msg in st.session_state.messages: for msg in st.session_state.messages:
agent = msg["agent"] if msg["agent"] == "PM_Agent" and ("需求" in msg["content"] or "SRS" in msg["content"]):
content = msg["content"]
# PM Agent 生成 SRS
if agent == "PM_Agent" and ("需求" in content or "SRS" in content):
file = workspace / "SRS.md" file = workspace / "SRS.md"
with open(file, "w", encoding="utf-8") as f: with open(file, "w", encoding="utf-8") as f:
f.write(f"# 软件需求规格说明书\n\n生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") f.write(f"# 软件需求规格说明书\n\n生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write(content) f.write(msg["content"])
files.append(str(file)) files.append(str(file))
break # 只保存第一个
# QA Agent 生成测试 # 2. 保存 QA Agent 的测试代码
if agent == "QA_Agent" and ("test" in content.lower() or "测试" in content or "def test_" in content): for msg in st.session_state.messages:
if msg["agent"] == "QA_Agent" and ("test" in msg["content"].lower() or "测试" in msg["content"] or "def test_" in msg["content"]):
file = workspace / "test_sample.py" file = workspace / "test_sample.py"
# 提取纯代码 code = extract_code(msg["content"])
code = extract_code(content)
with open(file, "w", encoding="utf-8") as f: with open(file, "w", encoding="utf-8") as f:
f.write(f"# 测试用例\n# 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") f.write(f"# 测试用例\n# 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write(code) f.write(code)
files.append(str(file)) files.append(str(file))
break # 只保存第一个
# Dev Agent 生成代码 # 3. 保存 Dev Agent 的代码 - 收集所有消息合并处理
if agent == "Dev_Agent" and ("def " in content or "class " in content): dev_messages = [msg for msg in st.session_state.messages if msg["agent"] == "Dev_Agent"]
if dev_messages:
# 合并所有 Dev Agent 的消息内容
all_dev_content = "\n\n".join([msg["content"] for msg in dev_messages])
# 尝试提取带文件名的代码块
import re
code_blocks = re.findall(r'```python\s*\n#(?:\s*File:|\s*filename:)?\s*([^\n]+)\n(.*?)```', all_dev_content, re.DOTALL)
if code_blocks:
# 保存多个文件
for filename, code_content in code_blocks:
filename = filename.strip()
file_path = workspace / filename
code = extract_code(f"```python\n{code_content}```")
# 检查完整性
print(f"\n🔍 检查文件 {filename}...")
if not is_code_complete(code):
print(f"⚠️ 警告:{filename} 代码可能不完整,但仍会保存")
else:
print(f"{filename} 代码完整")
with open(file_path, "w", encoding="utf-8") as f:
f.write(f"# 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n{code}")
files.append(str(file_path))
else:
# 没有文件名标记,合并所有代码保存为 src_sample.py
all_code = "\n\n".join([extract_code(msg["content"]) for msg in dev_messages])
file = workspace / "src_sample.py" file = workspace / "src_sample.py"
# 提取纯代码
code = extract_code(content) # 检查完整性
print(f"\n🔍 检查文件 src_sample.py...")
if not is_code_complete(all_code):
print(f"⚠️ 警告src_sample.py 代码可能不完整,但仍会保存")
else:
print(f"✅ src_sample.py 代码完整")
with open(file, "w", encoding="utf-8") as f: with open(file, "w", encoding="utf-8") as f:
f.write(f"# 源代码\n# 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") f.write(f"# 源代码\n# 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write(code) f.write(all_code)
files.append(str(file)) files.append(str(file))
return files return files

View File

@@ -12,7 +12,7 @@ from autogen_sdls_system import AutoGenSDLCSystem
def example_1_basic_workflow(): def example_1_basic_workflow():
"""示例 1: 基本工作流 - 电池健康预测 API""" """示例 1: 基本工作流 - 自动驾驶车辆检测 API"""
print("\n" + "=" * 70) print("\n" + "=" * 70)
print("示例 1: 基本 SDLC 工作流") print("示例 1: 基本 SDLC 工作流")
print("=" * 70) print("=" * 70)
@@ -25,15 +25,18 @@ def example_1_basic_workflow():
system = AutoGenSDLCSystem(api_key=api_key) system = AutoGenSDLCSystem(api_key=api_key)
requirement = """ requirement = """
我需要一个电池健康状态 (SOH) 预测 API要求 我需要一个自动驾驶目标检测与跟踪 API要求
1. 接收电池的电压、电流、温度数据 1. 接收摄像头图像和激光雷达点云数据
2. 输出健康度百分比0-100% 2. 实现车辆、行人、骑行者的实时检测
3. 符合汽车行业标准 3. 输出目标的 3D 边界框(位置、尺寸、朝向)
4. 包含错误处理和边界情况检测 4. 支持多目标跟踪MOT输出轨迹信息
5. 符合车规级功能安全标准ISO 26262 ASIL-B
6. 响应时间 < 50ms检测精度 mAP > 85%
7. 包含异常处理(传感器失效、恶劣天气等)
""" """
print(f"\n📋 需求:{requirement}") print(f"\n📋 需求:{requirement}")
result = system.run_workflow(requirement, max_round=15) result = system.run_workflow(requirement, max_round=20)
if result["success"]: if result["success"]:
print("\n✅ 工作流完成!") print("\n✅ 工作流完成!")
@@ -92,7 +95,7 @@ def example_3_direct_agent_call():
pm_agent = ProductManagerAgent(llm_config=llm_config) pm_agent = ProductManagerAgent(llm_config=llm_config)
# 简单需求 # 简单需求
requirement = "开发一个车载蓝牙连接管理模块" requirement = "开发一个车载摄像头图像识别模块,支持车道线检测和交通标志识别"
print(f"\n📋 需求:{requirement}") print(f"\n📋 需求:{requirement}")
print("\n🤖 PM Agent 正在生成 SRS...\n") print("\n🤖 PM Agent 正在生成 SRS...\n")
@@ -126,13 +129,16 @@ def example_4_test_generation():
# 简单的 SRS 片段 # 简单的 SRS 片段
srs_sample = """ srs_sample = """
【功能性需求】 【功能性需求】
FR-001: 系统应能接收电池电压数据(范围 0-5V FR-001: 系统应能接收摄像头 RGB 图像(分辨率 1920x1080
FR-002: 系统应能计算 SOH 百分比(范围 0-100% FR-002: 系统应能检测车辆、行人、骑行者三类目标
FR-003: 系统应能检测异常数据并报错 FR-003: 系统应能输出 3D 边界框x,y,z,long,width,height,yaw
FR-004: 系统应能处理夜间和雨雪天气场景
【非功能性需求】 【非功能性需求】
NFR-001: 响应时间 < 100ms NFR-001: 检测延迟 < 50ms
NFR-002: 符合 MISRA-C 规范 NFR-002: 检测精度 mAP@0.5 > 85%
NFR-003: 符合 ISO 26262 ASIL-B 功能安全标准
NFR-004: 支持 -40°C~85°C 工作温度范围
""" """
print("\n📋 SRS 样本:") print("\n📋 SRS 样本:")
@@ -167,15 +173,25 @@ def example_5_code_generation():
# SRS 和测试样本 # SRS 和测试样本
srs_sample = """ srs_sample = """
FR-001: 实现 SOH 计算函数 FR-001: 实现点云预处理函数(去噪、下采样、坐标变换)
FR-002: 输入验证(电压 0-5V温度 -20~60°C FR-002: 实现 3D 目标检测函数(基于 PointPillars 或 VoxelNet
FR-003: 输出限制在 0-100% FR-003: 输入验证(点云格式 Nx5包含 x,y,z,intensity,timestamp
FR-004: 输出限制(边界框参数归一化,置信度 0-1
""" """
test_sample = """ test_sample = """
def test_soh_calculation(): def test_pointcloud_preprocessing():
assert calculate_soh(3.7, 2.0, 25)["soh"] > 0 raw_points = np.random.rand(10000, 5).astype(np.float32)
assert calculate_soh(0, 0, 25)["soh"] == 0 processed = preprocess_pointcloud(raw_points)
assert processed.shape[0] < raw_points.shape[0] # 下采样后点数减少
assert not np.isnan(processed).any() # 无 NaN
def test_3d_detection():
points = np.random.rand(5000, 5).astype(np.float32)
boxes, scores = detect_objects_3d(points)
assert len(boxes) == len(scores)
assert all(0 <= s <= 1 for s in scores)
assert boxes.shape[1] == 7 # (x,y,z,l,w,h,yaw)
""" """
print("\n📋 需求和测试样本已准备") print("\n📋 需求和测试样本已准备")
@@ -189,35 +205,52 @@ def test_soh_calculation():
print(f"\n❌ 生成失败:{e}") print(f"\n❌ 生成失败:{e}")
def run_all_examples(): def example_6_realtime_file_saving():
"""运行所有示例""" """示例 6: 实时文件保存演示"""
print("\n" + "=" * 70) print("\n" + "=" * 70)
print("🚀 AutoGen SDLC 系统 - 使用示例合集") print("示例 6: 实时文件保存 - 生成的文件会立即创建")
print("=" * 70) print("=" * 70)
examples = [ api_key = os.getenv("DASHSCOPE_API_KEY")
("直接调用 PM Agent", example_3_direct_agent_call), if not api_key:
("QA Agent 生成测试", example_4_test_generation), print("❌ 请设置 DASHSCOPE_API_KEY 环境变量")
("Dev Agent 生成代码", example_5_code_generation), return
("自定义 Agent 配置", example_2_custom_agents),
("完整工作流", example_1_basic_workflow),
]
for name, func in examples: system = AutoGenSDLCSystem(api_key=api_key, workspace_dir="workspace_realtime_demo")
print(f"\n\n{'='*70}")
print(f"▶️ 运行示例:{name}")
print('='*70)
try: requirement = """
func() 我需要一个简单的 Python 工具函数库,要求:
except Exception as e: 1. 包含一个计算阶乘的函数 factorial(n)
print(f"\n❌ 示例执行失败:{e}") 2. 包含一个判断质数的函数 is_prime(n)
3. 包含一个生成斐波那契数列的函数 fibonacci(n)
4. 每个函数都要有完整的文档字符串和类型注解
5. 符合 PEP8 规范
"""
input("\n按 Enter 键继续下一个示例...") print(f"\n📋 需求:{requirement}")
print("\n💡 提示:在对话过程中,您会看到文件被实时保存到 workspace_realtime_demo 目录\n")
print("\n" + "=" * 70) result = system.run_workflow(requirement, max_round=15)
print("✅ 所有示例运行完成!")
print("=" * 70) if result["success"]:
print("\n✅ 工作流完成!")
print(f"📄 摘要:{result['summary'][:300]}...")
# 显示生成的文件
print(f"\n📁 已保存 {len(result['saved_files'])} 个文件:")
for file in result["saved_files"]:
print(f"{file}")
# 验证文件是否真的存在
print("\n🔍 验证文件是否存在...")
import os
for file in result["saved_files"]:
if os.path.exists(file):
print(f"{file} (存在)")
else:
print(f"{file} (不存在)")
else:
print(f"\n❌ 工作流失败:{result.get('error')}")
if __name__ == "__main__": if __name__ == "__main__":
@@ -225,6 +258,7 @@ if __name__ == "__main__":
# example_3_direct_agent_call() # example_3_direct_agent_call()
# example_4_test_generation() # example_4_test_generation()
# example_5_code_generation() # example_5_code_generation()
# example_6_realtime_file_saving() # 新增:实时文件保存演示
# 运行所有示例 # 运行所有示例
run_all_examples() example_1_basic_workflow()