From 7931ce5070bb1dcbc2daff0ecc4a83c249c1f64e Mon Sep 17 00:00:00 2001 From: ZhuJW <1421267742@qq.com> Date: Thu, 12 Mar 2026 17:58:15 +0800 Subject: [PATCH] fix --- .gitignore | 3 +- REALTIME_FILE_SAVING.md | 151 +++++++++++++++++++++++++++++++ autogen_sdls_system.py | 188 +++++++++++++++++++++++++++------------ config/llm_config.py | 4 +- demo_realtime_saving.py | 79 +++++++++++++++++ frontend/app.py | 190 ++++++++++++++++++++++++++++++++-------- usage_examples.py | 122 ++++++++++++++++---------- 7 files changed, 598 insertions(+), 139 deletions(-) create mode 100644 REALTIME_FILE_SAVING.md create mode 100644 demo_realtime_saving.py diff --git a/.gitignore b/.gitignore index ff1f7ce..cacb502 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ __pycache__ -workspace \ No newline at end of file +workspace +logs \ No newline at end of file diff --git a/REALTIME_FILE_SAVING.md b/REALTIME_FILE_SAVING.md new file mode 100644 index 0000000..b38071b --- /dev/null +++ b/REALTIME_FILE_SAVING.md @@ -0,0 +1,151 @@ +# 实时文件保存功能 + +## 功能说明 + +现在,AutoGen SDLC 系统支持**实时文件保存**功能。在 Agent 协作过程中,每个 Agent 生成的内容会立即保存到文件系统中,而不需要等待整个工作流完成。 + +## 工作原理 + +### 修改前 +``` +对话开始 → PM Agent 发言 → QA Agent 发言 → Dev Agent 发言 → ... → 对话结束 + ↓ + 遍历所有消息 + ↓ + 批量保存文件 +``` + +### 修改后 +``` +对话开始 → PM Agent 发言 → 立即保存 SRS.md + ↓ + QA Agent 发言 → 立即保存 test_sample.py + ↓ + Dev Agent 发言 → 立即保存 src_sample.py (或其他文件) + ↓ + ... + ↓ + 对话结束 (所有文件已实时保存完成) +``` + +## 技术实现 + +1. **回调机制**: 使用 AutoGen 的 `register_reply` 方法注册回调函数 +2. **实时检测**: 每次 Agent 发言后立即检测内容并保存文件 +3. **文件识别**: 根据 Agent 名称和内容类型自动判断文件类型 + - PM_Agent → SRS.md (需求文档) + - QA_Agent → test_sample.py (测试用例) + - Dev_Agent → src_sample.py 或指定文件名的代码文件 + +## 使用方法 + +### 方法 1: 运行示例 + +```bash +# 设置 API Key +$env:DASHSCOPE_API_KEY="your_api_key" + +# 运行实时文件保存示例 +python usage_examples.py +``` + +示例会自动演示实时文件保存功能。 + +### 方法 2: 直接使用 API + +```python +from autogen_sdls_system import AutoGenSDLCSystem + +system = AutoGenSDLCSystem( + api_key="your_api_key", + workspace_dir="workspace" # 指定工作目录 +) + +result = system.run_workflow( + "我需要一个电池健康状态预测 API", + max_round=15 +) + +# 查看实时保存的文件 +print("生成的文件:", result["saved_files"]) +``` + +## 文件命名规则 + +### PM Agent +- **文件名**: `SRS.md` +- **触发条件**: 内容包含"需求"、"SRS"或"软件需求规格说明书" +- **内容格式**: Markdown 格式的 Software Requirements Specification + +### QA Agent +- **文件名**: `test_sample.py` +- **触发条件**: 内容包含"test"、"测试"或"def test_" +- **内容格式**: Python 测试代码(Pytest 风格) + +### Dev Agent +- **带文件名的代码块**: + ```python + # File: my_module.py + def my_function(): + pass + ``` + 会保存为 `my_module.py` + +- **不带文件名的代码块**: 保存为 `src_sample.py` +- **触发条件**: 内容包含"def "、"class "或"import " + +## 实时性保证 + +每个文件在以下时机立即保存: +1. Agent 生成回复内容 +2. 回调函数被触发 +3. 解析内容并提取代码/文档 +4. 写入文件系统 +5. 输出日志:`💾 [实时保存] <文件路径>` + +整个过程在毫秒级完成,用户可以在控制台中实时看到文件保存的提示。 + +## 优势 + +✅ **即时可见**: 无需等待整个流程完成,立即可见生成的文件 +✅ **过程追踪**: 可以观察每个 Agent 的贡献和输出顺序 +✅ **错误恢复**: 如果中途中断,已生成的文件不会丢失 +✅ **调试方便**: 可以实时检查每个阶段的输出质量 +✅ **人机协同**: 人工可以在流程进行中审查已生成的文件 + +## 注意事项 + +⚠️ **文件覆盖**: 如果同一个 Agent 多次生成同类型文件,后生成的会覆盖先生成的 +⚠️ **工作目录**: 确保工作目录有写权限 +⚠️ **API 响应**: 某些模型响应可能包含多个代码块,只会保存第一个匹配的 + +## 示例输出 + +``` +🚀 开始对话... + +PM_Agent 正在分析需求... +💾 [实时保存] workspace\SRS.md + +QA_Agent 正在设计测试... +💾 [实时保存] workspace\test_sample.py + +Dev_Agent 正在编写代码... +💾 [实时保存] workspace\src_sample.py + +User_Proxy 正在执行测试... + +Orchestrator 正在汇总结果... + +✅ 工作流完成! +📁 已保存 3 个文件: + ✓ workspace\SRS.md + ✓ workspace\test_sample.py + ✓ workspace\src_sample.py +``` + +## 相关文件 + +- `autogen_sdls_system.py`: 核心实现文件 +- `usage_examples.py`: 使用示例(包含示例 6) +- `utils/callback_handler.py`: 回调处理器 diff --git a/autogen_sdls_system.py b/autogen_sdls_system.py index b327fcd..c196b51 100644 --- a/autogen_sdls_system.py +++ b/autogen_sdls_system.py @@ -7,6 +7,8 @@ import sys from pathlib import Path from typing import Dict, Any, Optional, List import json +import re +from datetime import datetime # 添加项目根目录到路径 sys.path.insert(0, str(Path(__file__).parent)) @@ -23,6 +25,81 @@ from utils.logger import get_logger from utils.callback_handler import get_callback_handler +def extract_code(content): + """从 Markdown 代码块中提取纯代码""" + if "```python" in content: + parts = content.split("```python") + if len(parts) > 1: + code = parts[1].split("```")[0].strip() + return code + elif "```" in content: + parts = content.split("```") + if len(parts) > 1: + code = parts[1].strip() + return code + return content + + +def save_file_from_content(content: str, agent_name: str, workspace_dir: Path) -> List[str]: + """ + 根据 Agent 的内容实时保存文件 + + Args: + content: Agent 的输出内容 + agent_name: Agent 名称 + workspace_dir: 工作目录 + + Returns: + 保存的文件路径列表 + """ + saved_files = [] + + # PM Agent 生成 SRS + if agent_name == "PM_Agent" and ("需求" in content or "SRS" in content or "软件需求规格说明书" in content): + file = workspace_dir / "SRS.md" + with open(file, "w", encoding="utf-8") as f: + f.write(f"# 软件需求规格说明书\n\n生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") + f.write(content) + saved_files.append(str(file)) + print(f"💾 [实时保存] {file}") + + # QA Agent 生成测试 + elif agent_name == "QA_Agent" and ("test" in content.lower() or "测试" in content or "def test_" in content): + file = workspace_dir / "test_sample.py" + code = extract_code(content) + if code: # 确保有代码内容才保存 + with open(file, "w", encoding="utf-8") as f: + f.write(f"# 测试用例\n\n{code}") + saved_files.append(str(file)) + print(f"💾 [实时保存] {file}") + + # Dev Agent 生成代码 - 支持多个文件 + elif agent_name == "Dev_Agent" and ("def " in content or "class " in content or "import " in content): + # 尝试提取带文件名的代码块 + code_blocks = re.findall(r'```python\s*\n#(?:\s*File:|\s*filename:)?\s*([^\n]+)\n(.*?)```', content, re.DOTALL) + + if code_blocks: + # 保存多个文件 + for filename, code_content in code_blocks: + filename = filename.strip() + file_path = workspace_dir / filename + with open(file_path, "w", encoding="utf-8") as f: + f.write(f"# 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n{code_content}") + saved_files.append(str(file_path)) + print(f"💾 [实时保存] {file_path}") + else: + # 没有文件名标记,保存为 src_sample.py + file = workspace_dir / "src_sample.py" + code = extract_code(content) + if code: # 确保有代码内容才保存 + with open(file, "w", encoding="utf-8") as f: + f.write(f"# 源代码\n\n{code}") + saved_files.append(str(file)) + print(f"💾 [实时保存] {file}") + + return saved_files + + class AutoGenSDLCSystem: """AutoGen SDLC 多智能体协同系统""" @@ -70,7 +147,43 @@ class AutoGenSDLCSystem: # 创建 GroupChat self.groupchat = None self.manager = None + + # 记录已保存的文件 + self.saved_files = [] + def _on_new_message(self, sender, message, **kwargs): + """ + 新消息回调函数 - 实时保存文件 + + Args: + sender: 发送者 + message: 消息内容 + **kwargs: 其他参数 + """ + try: + agent_name = getattr(sender, 'name', 'Unknown') + content = message.get('content', '') if isinstance(message, dict) else str(message) + + # 实时保存文件 + saved_files = save_file_from_content(content, agent_name, self.workspace_dir) + + # 如果有文件保存,记录并通知回调 + if saved_files: + self.saved_files.extend(saved_files) + for file_path in saved_files: + # 确定文件类型 + if file_path.endswith('.md'): + file_type = 'markdown' + elif file_path.endswith('.py'): + file_type = 'python' + else: + file_type = 'other' + + # 触发回调 + self.callback_handler.on_file_created(agent_name, file_path, file_type) + except Exception as e: + print(f"⚠️ 实时保存文件时出错:{e}") + def _create_agents(self): """创建所有 Agent""" # PM Agent @@ -113,7 +226,7 @@ class AutoGenSDLCSystem: self.user_proxy = UserProxyAgent( name="User_Proxy", human_input_mode="NEVER", # 修复:Web 环境不支持 TERMINAL - max_consecutive_auto_reply=0, + max_consecutive_auto_reply=10, # 允许连续自动回复,避免提前终止 code_execution_config={ "work_dir": str(self.workspace_dir), "use_docker": False, @@ -167,6 +280,9 @@ class AutoGenSDLCSystem: # 创建 GroupChat self.create_groupchat(max_round) + # 重置已保存文件列表 + self.saved_files = [] + # 构建初始消息 initial_message = f""" 请启动完整的 SDLC 流程,开发以下功能: @@ -188,7 +304,17 @@ class AutoGenSDLCSystem: """ try: - # 启动对话 + # 启动对话 - 使用回调函数实时处理消息 + print("\n🚀 开始对话...\n") + + # 为每个 Agent 注册实时文件保存回调 + for agent in [self.pm_agent, self.qa_agent, self.dev_agent, self.orchestrator]: + agent.register_reply( + self._on_new_message, + reply_func_total_count=1, + position=0 # 在最前面执行,确保实时保存 + ) + chat_result = self.user_proxy.initiate_chat( self.manager, message=initial_message, @@ -215,7 +341,8 @@ class AutoGenSDLCSystem: "success": True, "summary": chat_result.summary if hasattr(chat_result, 'summary') else "工作流完成", "messages": self.groupchat.messages, - "workspace": str(self.workspace_dir) + "workspace": str(self.workspace_dir), + "saved_files": self.saved_files } except Exception as e: @@ -224,57 +351,4 @@ class AutoGenSDLCSystem: "success": False, "error": str(e), "messages": self.groupchat.messages if self.groupchat else [] - } - - def export_conversation(self, output_path: Optional[str] = None) -> str: - """导出对话历史""" - return self.logger.export_to_json(output_path) - - def export_report(self, output_path: Optional[str] = None) -> str: - """导出 Markdown 格式报告""" - return self.logger.export_to_markdown(output_path) - - -def main(): - """主函数 - 演示模式""" - print("=" * 60) - print("AutoGen SDLC 多智能体协同系统") - print("=" * 60) - - # 检查 API Key - api_key = os.getenv("DASHSCOPE_API_KEY") - if not api_key: - print("\n❌ 错误:未设置 DASHSCOPE_API_KEY 环境变量") - print("请运行:export DASHSCOPE_API_KEY='your_api_key'") - return - - # 创建系统实例 - system = AutoGenSDLCSystem(api_key=api_key) - - # 演示用例 - demo_requirement = "我需要一个电池健康状态 (SOH) 预测 API,能够接收电池的电压、电流、温度数据,输出健康度百分比" - - print(f"\n📋 演示需求:{demo_requirement}") - print("\n🚀 启动 SDLC 工作流...\n") - - # 运行工作流 - result = system.run_workflow(demo_requirement, max_round=15) - - # 输出结果 - print("\n" + "=" * 60) - if result["success"]: - print("✅ 工作流成功完成!") - print(f"📄 摘要:{result['summary'][:200]}...") - print(f"📂 工作目录:{result['workspace']}") - else: - print(f"❌ 工作流失败:{result.get('error', '未知错误')}") - - # 导出报告 - report_path = system.export_report() - print(f"📊 对话报告已导出:{report_path}") - - print("\n" + "=" * 60) - - -if __name__ == "__main__": - main() + } \ No newline at end of file diff --git a/config/llm_config.py b/config/llm_config.py index 3f0786f..a52aed0 100644 --- a/config/llm_config.py +++ b/config/llm_config.py @@ -106,7 +106,7 @@ QA_PROMPT = """你是一名资深测试工程师,专注于自动化测试和 T - 保存为 workspace/test_*.py 文件 """ -DEV_PROMPT = """你是一名资深软件工程师,专注于汽车嵌入式 C++/Python 开发。 +DEV_PROMPT = """你是一名资深软件工程师,专注于汽车嵌入式 C++/Python/Html 开发。 你的任务是根据 SRS 和测试用例编写: 1. 核心业务逻辑代码 @@ -120,7 +120,7 @@ DEV_PROMPT = """你是一名资深软件工程师,专注于汽车嵌入式 C++ - 保持代码简洁、可读、可维护 输出格式要求: -- 源文件命名为 src_.py 或 .cpp +- 源文件 src_*, 自己决定文件类型 - 包含完整的 docstring - 添加类型注解 - 保存为 workspace/ 目录下的相应文件 diff --git a/demo_realtime_saving.py b/demo_realtime_saving.py new file mode 100644 index 0000000..619de4f --- /dev/null +++ b/demo_realtime_saving.py @@ -0,0 +1,79 @@ +""" +实时文件保存演示脚本 +展示如何在对话过程中实时创建文件 +""" +import os +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent)) + +from autogen_sdls_system import AutoGenSDLCSystem + + +def demo_realtime_saving(): + """演示实时文件保存功能""" + print("=" * 70) + print("🚀 AutoGen SDLC - 实时文件保存演示") + print("=" * 70) + + # 检查 API Key + api_key = os.getenv("DASHSCOPE_API_KEY") + if not api_key: + print("\n❌ 错误:未设置 DASHSCOPE_API_KEY 环境变量") + print("请运行:$env:DASHSCOPE_API_KEY='your_api_key' (PowerShell)") + print("或:export DASHSCOPE_API_KEY='your_api_key' (Linux/Mac)") + return + + # 创建系统实例,使用独立的工作目录 + workspace = "workspace_realtime_demo" + print(f"\n📂 工作目录:{Path(workspace).absolute()}") + + system = AutoGenSDLCSystem( + api_key=api_key, + workspace_dir=workspace + ) + + # 简单需求用于快速演示 + requirement = """ +我需要一个 Python 工具函数库,包含以下功能: +1. factorial(n): 计算阶乘 +2. is_prime(n): 判断质数 +3. fibonacci(n): 生成斐波那契数列前 n 项 +要求:每个函数都有类型注解和完整文档字符串 +""" + + print(f"\n📋 需求:{requirement}") + print("\n💡 提示:您会看到文件在对话过程中被实时创建...\n") + + # 运行工作流 + result = system.run_workflow(requirement, max_round=15) + + # 显示结果 + print("\n" + "=" * 70) + if result["success"]: + print("✅ 工作流成功完成!") + print(f"\n📄 摘要:\n{result['summary'][:500]}...") + + # 显示生成的文件 + print(f"\n📁 已保存 {len(result['saved_files'])} 个文件:") + for file in result["saved_files"]: + exists = "✓" if Path(file).exists() else "✗" + print(f" {exists} {file}") + + # 显示工作目录内容 + print(f"\n📂 工作目录内容:") + workspace_path = Path(workspace) + if workspace_path.exists(): + for item in workspace_path.iterdir(): + if item.is_file(): + size = f"({item.stat().st_size} bytes)" + print(f" 📄 {item.name} {size}") + else: + print(f"❌ 工作流失败:{result.get('error', '未知错误')}") + + print("\n" + "=" * 70) + + +if __name__ == "__main__": + demo_realtime_saving() diff --git a/frontend/app.py b/frontend/app.py index 7c61042..d2f515d 100644 --- a/frontend/app.py +++ b/frontend/app.py @@ -4,13 +4,15 @@ Streamlit 实时 Agent 协作平台 功能: 1. 实时展示每个 Agent 的状态和动作 2. 自动保存生成的文件到 workspace/ -3. 简单稳定的代码结构 +3. Agent 自主决定对话对象(动态发言顺序) +4. 实时更新对话流和 Agent 状态 """ import streamlit as st import os from pathlib import Path from datetime import datetime import time +import re try: from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager @@ -23,17 +25,18 @@ import sys sys.path.insert(0, str(Path(__file__).parent.parent)) from config.llm_config import get_llm_config, PM_PROMPT, QA_PROMPT, DEV_PROMPT, ORCH_PROMPT +from utils.callback_handler import get_callback_handler # 页面配置 st.set_page_config(page_title="多 Agent 协作平台", page_icon="🤖", layout="wide") # Agent 配置 AGENTS = { - "PM_Agent": {"name": "产品经理", "avatar": "📋", "color": "blue"}, - "QA_Agent": {"name": "测试工程师", "avatar": "✅", "color": "green"}, - "Dev_Agent": {"name": "开发工程师", "avatar": "💻", "color": "orange"}, - "Orchestrator": {"name": "协调器", "avatar": "🎯", "color": "purple"}, - "User_Proxy": {"name": "用户代理", "avatar": "👤", "color": "gray"} + "PM_Agent": {"name": "产品经理", "avatar": "📋", "color": "blue", "desc": "需求分析与 SRS 生成"}, + "QA_Agent": {"name": "测试工程师", "avatar": "✅", "color": "green", "desc": "测试用例设计"}, + "Dev_Agent": {"name": "开发工程师", "avatar": "💻", "color": "orange", "desc": "代码实现"}, + "Orchestrator": {"name": "协调器", "avatar": "🎯", "color": "purple", "desc": "流程协调与验证"}, + "User_Proxy": {"name": "用户代理", "avatar": "👤", "color": "gray", "desc": "测试执行"} } def init_state(): @@ -46,6 +49,14 @@ def init_state(): st.session_state.current_agent = None if "agent_counts" not in st.session_state: st.session_state.agent_counts = {k: 0 for k in AGENTS} + if "agent_status" not in st.session_state: + st.session_state.agent_status = {k: "⚪ 等待中" for k in AGENTS} + if "current_task" not in st.session_state: + st.session_state.current_task = {k: "" for k in AGENTS} + if "saved_files" not in st.session_state: + st.session_state.saved_files = [] + if "conversation_history" not in st.session_state: + st.session_state.conversation_history = [] def add_message(agent, content, task=""): """添加消息""" @@ -58,14 +69,29 @@ def add_message(agent, content, task=""): st.session_state.messages.append(msg) st.session_state.agent_counts[agent] = st.session_state.agent_counts.get(agent, 0) + 1 st.session_state.current_agent = agent + + # 更新 Agent 状态 + for ag in AGENTS: + if ag == agent: + st.session_state.agent_status[ag] = "🟢 发言中" + st.session_state.current_task[ag] = task + else: + st.session_state.agent_status[ag] = "⚪ 等待中" + + # 添加到对话历史(用于展示完整的对话流) + st.session_state.conversation_history.append(msg) def show_agent_status(): """显示 Agent 状态""" + st.subheader("🎯 Agent 实时状态") cols = st.columns(len(AGENTS)) + for i, (agent_key, info) in enumerate(AGENTS.items()): with cols[i]: is_active = st.session_state.current_agent == agent_key count = st.session_state.agent_counts.get(agent_key, 0) + status = st.session_state.agent_status.get(agent_key, "⚪ 等待中") + current_task = st.session_state.current_task.get(agent_key, "") border_color = info["color"] bg_color = "#e8f5e9" if is_active else "white" @@ -77,15 +103,18 @@ def show_agent_status(): border: 3px {"solid" if is_active else "dashed"} {border_color}; background: {bg_color}; text-align: center; + box-shadow: 0 2px 4px rgba(0,0,0,0.1); '>
{info["avatar"]}
-
{info["name"]}
-
- {"🟢 发言中" if is_active else "⚪ 等待中"} +
{info["name"]}
+
{info["desc"]}
+
+ {status}
-
+
💬 {count} 条消息
+ {f"
📋 {current_task}
" if current_task else ""}
""", unsafe_allow_html=True) @@ -93,17 +122,41 @@ def show_chat(): """显示对话流""" st.subheader("💬 Agent 对话流") - if not st.session_state.messages: + if not st.session_state.conversation_history: st.info("👈 暂无对话,请在下方输入需求并启动") return - for msg in st.session_state.messages: - agent = msg["agent"] - info = AGENTS.get(agent, {"name": "未知", "avatar": "🤖", "color": "gray"}) - - with st.chat_message(agent.lower(), avatar=info["avatar"]): - st.markdown(f"**{info['name']}** *{msg['time']}* - {msg['task']}") - st.markdown(msg["content"][:800] + ("..." if len(msg["content"]) > 800 else "")) + # 使用容器展示对话,支持滚动 + chat_container = st.container() + + with chat_container: + for i, msg in enumerate(st.session_state.conversation_history): + agent = msg["agent"] + info = AGENTS.get(agent, {"name": "未知", "avatar": "🤖", "color": "gray"}) + + # 创建对话气泡 + with st.chat_message(agent.lower(), avatar=info["avatar"]): + # 显示 Agent 名称、时间和任务 + task_info = f"- {msg['task']}" if msg['task'] else "" + st.markdown(f"**{info['name']}** *{msg['time']}* {task_info}") + + # 处理长内容,提供折叠选项 + content = msg["content"] + if len(content) > 1000: + # 短预览 + 展开详情 + preview = content[:800] + "..." + st.markdown(preview) + with st.expander("查看完整内容"): + st.markdown(content) + else: + st.markdown(content) + + # 如果是代码内容,提供语法高亮 + if "```" in content: + code_blocks = re.findall(r'```(\w+)?\n(.*?)```', content, re.DOTALL) + for lang, code in code_blocks: + language = lang if lang else "python" + st.code(code, language=language) def extract_code(content): """从 Markdown 代码块中提取纯代码""" @@ -123,6 +176,40 @@ def extract_code(content): # 没有标记,返回原内容 return content +def is_code_complete(code): + """检查代码是否完整""" + if not code or not code.strip(): + return False + + # 检查是否有未闭合的括号 + if code.count('(') != code.count(')'): + print(f"⚠️ 括号不匹配:( {code.count('(')} vs ) {code.count(')')}") + return False + if code.count('[') != code.count(']'): + print(f"⚠️ 方括号不匹配:[ {code.count('[')} vs ] {code.count(']')}") + return False + if code.count('{') != code.count('}'): + print(f"⚠️ 花括号不匹配:{{ {code.count('{')} vs }} {code.count('}')}") + return False + + # 检查是否以完整的方式结束(不是突然截断) + lines = code.strip().split('\n') + if lines: + last_line = lines[-1].rstrip() + # 如果最后一行以这些符号结尾,说明可能被截断了 + if last_line.endswith(':') or last_line.endswith('\\') or last_line.endswith('(') or last_line.endswith('['): + print(f"⚠️ 代码可能截断:最后一行是 '{last_line}'") + return False + + # 尝试简单的语法检查 + try: + compile(code, '', 'exec') + except SyntaxError as e: + print(f"⚠️ 语法错误:{e}") + return False + + return True + def save_files(): """保存生成的文件到 workspace/""" workspace = Path("workspace") @@ -130,37 +217,70 @@ def save_files(): files = [] - # 遍历消息,提取并保存文件 + # 1. 保存 PM Agent 的 SRS 文档 for msg in st.session_state.messages: - agent = msg["agent"] - content = msg["content"] - - # PM Agent 生成 SRS - if agent == "PM_Agent" and ("需求" in content or "SRS" in content): + if msg["agent"] == "PM_Agent" and ("需求" in msg["content"] or "SRS" in msg["content"]): file = workspace / "SRS.md" with open(file, "w", encoding="utf-8") as f: f.write(f"# 软件需求规格说明书\n\n生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") - f.write(content) + f.write(msg["content"]) files.append(str(file)) - - # QA Agent 生成测试 - if agent == "QA_Agent" and ("test" in content.lower() or "测试" in content or "def test_" in content): + break # 只保存第一个 + + # 2. 保存 QA Agent 的测试代码 + for msg in st.session_state.messages: + if msg["agent"] == "QA_Agent" and ("test" in msg["content"].lower() or "测试" in msg["content"] or "def test_" in msg["content"]): file = workspace / "test_sample.py" - # 提取纯代码 - code = extract_code(content) + code = extract_code(msg["content"]) with open(file, "w", encoding="utf-8") as f: f.write(f"# 测试用例\n# 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") f.write(code) files.append(str(file)) + break # 只保存第一个 + + # 3. 保存 Dev Agent 的代码 - 收集所有消息合并处理 + dev_messages = [msg for msg in st.session_state.messages if msg["agent"] == "Dev_Agent"] + + if dev_messages: + # 合并所有 Dev Agent 的消息内容 + all_dev_content = "\n\n".join([msg["content"] for msg in dev_messages]) - # Dev Agent 生成代码 - if agent == "Dev_Agent" and ("def " in content or "class " in content): + # 尝试提取带文件名的代码块 + import re + code_blocks = re.findall(r'```python\s*\n#(?:\s*File:|\s*filename:)?\s*([^\n]+)\n(.*?)```', all_dev_content, re.DOTALL) + + if code_blocks: + # 保存多个文件 + for filename, code_content in code_blocks: + filename = filename.strip() + file_path = workspace / filename + code = extract_code(f"```python\n{code_content}```") + + # 检查完整性 + print(f"\n🔍 检查文件 {filename}...") + if not is_code_complete(code): + print(f"⚠️ 警告:{filename} 代码可能不完整,但仍会保存") + else: + print(f"✅ {filename} 代码完整") + + with open(file_path, "w", encoding="utf-8") as f: + f.write(f"# 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n{code}") + files.append(str(file_path)) + else: + # 没有文件名标记,合并所有代码保存为 src_sample.py + all_code = "\n\n".join([extract_code(msg["content"]) for msg in dev_messages]) file = workspace / "src_sample.py" - # 提取纯代码 - code = extract_code(content) + + # 检查完整性 + print(f"\n🔍 检查文件 src_sample.py...") + if not is_code_complete(all_code): + print(f"⚠️ 警告:src_sample.py 代码可能不完整,但仍会保存") + else: + print(f"✅ src_sample.py 代码完整") + with open(file, "w", encoding="utf-8") as f: f.write(f"# 源代码\n# 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") - f.write(code) + f.write(all_code) files.append(str(file)) return files diff --git a/usage_examples.py b/usage_examples.py index 2b73593..2ad4014 100644 --- a/usage_examples.py +++ b/usage_examples.py @@ -12,7 +12,7 @@ from autogen_sdls_system import AutoGenSDLCSystem def example_1_basic_workflow(): - """示例 1: 基本工作流 - 电池健康预测 API""" + """示例 1: 基本工作流 - 自动驾驶车辆检测 API""" print("\n" + "=" * 70) print("示例 1: 基本 SDLC 工作流") print("=" * 70) @@ -25,15 +25,18 @@ def example_1_basic_workflow(): system = AutoGenSDLCSystem(api_key=api_key) requirement = """ -我需要一个电池健康状态 (SOH) 预测 API,要求: -1. 接收电池的电压、电流、温度数据 -2. 输出健康度百分比(0-100%) -3. 符合汽车行业标准 -4. 包含错误处理和边界情况检测 +我需要一个自动驾驶目标检测与跟踪 API,要求: +1. 接收摄像头图像和激光雷达点云数据 +2. 实现车辆、行人、骑行者的实时检测 +3. 输出目标的 3D 边界框(位置、尺寸、朝向) +4. 支持多目标跟踪(MOT),输出轨迹信息 +5. 符合车规级功能安全标准(ISO 26262 ASIL-B) +6. 响应时间 < 50ms,检测精度 mAP > 85% +7. 包含异常处理(传感器失效、恶劣天气等) """ print(f"\n📋 需求:{requirement}") - result = system.run_workflow(requirement, max_round=15) + result = system.run_workflow(requirement, max_round=20) if result["success"]: print("\n✅ 工作流完成!") @@ -92,7 +95,7 @@ def example_3_direct_agent_call(): pm_agent = ProductManagerAgent(llm_config=llm_config) # 简单需求 - requirement = "开发一个车载蓝牙连接管理模块" + requirement = "开发一个车载摄像头图像识别模块,支持车道线检测和交通标志识别" print(f"\n📋 需求:{requirement}") print("\n🤖 PM Agent 正在生成 SRS...\n") @@ -126,13 +129,16 @@ def example_4_test_generation(): # 简单的 SRS 片段 srs_sample = """ 【功能性需求】 -FR-001: 系统应能接收电池电压数据(范围 0-5V) -FR-002: 系统应能计算 SOH 百分比(范围 0-100%) -FR-003: 系统应能检测异常数据并报错 +FR-001: 系统应能接收摄像头 RGB 图像(分辨率 1920x1080) +FR-002: 系统应能检测车辆、行人、骑行者三类目标 +FR-003: 系统应能输出 3D 边界框(x,y,z,long,width,height,yaw) +FR-004: 系统应能处理夜间和雨雪天气场景 【非功能性需求】 -NFR-001: 响应时间 < 100ms -NFR-002: 符合 MISRA-C 规范 +NFR-001: 检测延迟 < 50ms +NFR-002: 检测精度 mAP@0.5 > 85% +NFR-003: 符合 ISO 26262 ASIL-B 功能安全标准 +NFR-004: 支持 -40°C~85°C 工作温度范围 """ print("\n📋 SRS 样本:") @@ -167,15 +173,25 @@ def example_5_code_generation(): # SRS 和测试样本 srs_sample = """ -FR-001: 实现 SOH 计算函数 -FR-002: 输入验证(电压 0-5V,温度 -20~60°C) -FR-003: 输出限制在 0-100% +FR-001: 实现点云预处理函数(去噪、下采样、坐标变换) +FR-002: 实现 3D 目标检测函数(基于 PointPillars 或 VoxelNet) +FR-003: 输入验证(点云格式 Nx5,包含 x,y,z,intensity,timestamp) +FR-004: 输出限制(边界框参数归一化,置信度 0-1) """ test_sample = """ -def test_soh_calculation(): - assert calculate_soh(3.7, 2.0, 25)["soh"] > 0 - assert calculate_soh(0, 0, 25)["soh"] == 0 +def test_pointcloud_preprocessing(): + raw_points = np.random.rand(10000, 5).astype(np.float32) + processed = preprocess_pointcloud(raw_points) + assert processed.shape[0] < raw_points.shape[0] # 下采样后点数减少 + assert not np.isnan(processed).any() # 无 NaN + +def test_3d_detection(): + points = np.random.rand(5000, 5).astype(np.float32) + boxes, scores = detect_objects_3d(points) + assert len(boxes) == len(scores) + assert all(0 <= s <= 1 for s in scores) + assert boxes.shape[1] == 7 # (x,y,z,l,w,h,yaw) """ print("\n📋 需求和测试样本已准备") @@ -189,35 +205,52 @@ def test_soh_calculation(): print(f"\n❌ 生成失败:{e}") -def run_all_examples(): - """运行所有示例""" +def example_6_realtime_file_saving(): + """示例 6: 实时文件保存演示""" print("\n" + "=" * 70) - print("🚀 AutoGen SDLC 系统 - 使用示例合集") + print("示例 6: 实时文件保存 - 生成的文件会立即创建") print("=" * 70) - examples = [ - ("直接调用 PM Agent", example_3_direct_agent_call), - ("QA Agent 生成测试", example_4_test_generation), - ("Dev Agent 生成代码", example_5_code_generation), - ("自定义 Agent 配置", example_2_custom_agents), - ("完整工作流", example_1_basic_workflow), - ] + api_key = os.getenv("DASHSCOPE_API_KEY") + if not api_key: + print("❌ 请设置 DASHSCOPE_API_KEY 环境变量") + return - for name, func in examples: - print(f"\n\n{'='*70}") - print(f"▶️ 运行示例:{name}") - print('='*70) - - try: - func() - except Exception as e: - print(f"\n❌ 示例执行失败:{e}") - - input("\n按 Enter 键继续下一个示例...") + system = AutoGenSDLCSystem(api_key=api_key, workspace_dir="workspace_realtime_demo") - print("\n" + "=" * 70) - print("✅ 所有示例运行完成!") - print("=" * 70) + requirement = """ +我需要一个简单的 Python 工具函数库,要求: +1. 包含一个计算阶乘的函数 factorial(n) +2. 包含一个判断质数的函数 is_prime(n) +3. 包含一个生成斐波那契数列的函数 fibonacci(n) +4. 每个函数都要有完整的文档字符串和类型注解 +5. 符合 PEP8 规范 +""" + + print(f"\n📋 需求:{requirement}") + print("\n💡 提示:在对话过程中,您会看到文件被实时保存到 workspace_realtime_demo 目录\n") + + result = system.run_workflow(requirement, max_round=15) + + if result["success"]: + print("\n✅ 工作流完成!") + print(f"📄 摘要:{result['summary'][:300]}...") + + # 显示生成的文件 + print(f"\n📁 已保存 {len(result['saved_files'])} 个文件:") + for file in result["saved_files"]: + print(f" ✓ {file}") + + # 验证文件是否真的存在 + print("\n🔍 验证文件是否存在...") + import os + for file in result["saved_files"]: + if os.path.exists(file): + print(f" ✅ {file} (存在)") + else: + print(f" ❌ {file} (不存在)") + else: + print(f"\n❌ 工作流失败:{result.get('error')}") if __name__ == "__main__": @@ -225,6 +258,7 @@ if __name__ == "__main__": # example_3_direct_agent_call() # example_4_test_generation() # example_5_code_generation() + # example_6_realtime_file_saving() # 新增:实时文件保存演示 # 运行所有示例 - run_all_examples() + example_1_basic_workflow()