第一次提交

This commit is contained in:
2026-03-12 13:27:03 +08:00
commit 27fba7a7cc
26 changed files with 6323 additions and 0 deletions

16
agents/__init__.py Normal file
View File

@@ -0,0 +1,16 @@
"""Agent 模块初始化文件"""
from .pm_agent import ProductManagerAgent, create_pm_agent
from .qa_agent import QAAgent, create_qa_agent
from .dev_agent import DevAgent, create_dev_agent
from .orchestrator import OrchestratorAgent, create_orchestrator_agent
__all__ = [
"ProductManagerAgent",
"create_pm_agent",
"QAAgent",
"create_qa_agent",
"DevAgent",
"create_dev_agent",
"OrchestratorAgent",
"create_orchestrator_agent"
]

256
agents/dev_agent.py Normal file
View File

@@ -0,0 +1,256 @@
"""
Dev Agent - 开发工程师智能体
负责代码实现和测试驱动开发
"""
from autogen import AssistantAgent
from typing import Dict, Any, Optional, List
import os
from pathlib import Path
from config.llm_config import get_agent_llm_config, DEV_PROMPT
class DevAgent:
"""开发工程师 Agent负责编写高质量代码"""
def __init__(self, llm_config: Optional[Dict] = None):
"""
初始化 Dev Agent
Args:
llm_config: LLM 配置
"""
self.llm_config = llm_config or get_agent_llm_config("Dev_Agent")
self.agent = AssistantAgent(
name="Dev_Agent",
system_message=DEV_PROMPT,
llm_config=self.llm_config,
description="资深软件工程师,专注于汽车嵌入式 C++/Python 开发",
human_input_mode="NEVER"
)
self.workspace_dir = Path("workspace")
self.workspace_dir.mkdir(exist_ok=True)
# 代码历史用于迭代修复
self.code_history: List[str] = []
def generate_code(self, srs_content: str, test_code: str) -> str:
"""
根据 SRS 和测试用例生成实现代码
Args:
srs_content: SRS 文档内容
test_code: 测试代码
Returns:
生成的源代码内容
"""
prompt = f"""
请根据以下 SRS 和测试用例编写实现代码:
【SRS 需求】
{self._truncate(srs_content, 2500)}
【测试用例】
{self._truncate(test_code, 2000)}
请编写:
1. 完整的实现代码
2. 符合 MISRA-C/PEP8 规范
3. 包含详细的 docstring 和类型注解
4. 确保所有测试用例通过
输出 Python 代码,保存为 src_battery_health.py。
"""
response = self.agent.generate_reply(
messages=[{"role": "user", "content": prompt}]
)
code_content = response if isinstance(response, str) else str(response)
# 提取代码块(如果包含 Markdown 标记)
code_content = self._extract_code(code_content)
# 保存到文件
code_file = self.workspace_dir / "src_battery_health.py"
with open(code_file, 'w', encoding='utf-8') as f:
f.write(code_content)
self.code_history.append(code_content)
print(f"✅ 源代码已生成:{code_file}")
return code_content
def fix_code(self, srs_content: str, test_code: str, error_log: str) -> str:
"""
根据测试错误修复代码
Args:
srs_content: SRS 文档
test_code: 测试代码
error_log: 测试失败日志
Returns:
修复后的代码
"""
previous_code = self.code_history[-1] if self.code_history else ""
prompt = f"""
代码测试失败,请根据错误日志修复代码:
【SRS 需求】
{self._truncate(srs_content, 1500)}
【测试代码】
{self._truncate(test_code, 1500)}
【之前的代码】
{self._truncate(previous_code, 2000)}
【错误日志】
{self._truncate(error_log, 1500)}
请分析错误原因并修复代码,确保:
1. 所有测试用例通过
2. 保持代码质量
3. 不引入新的问题
输出完整的修复后代码。
"""
response = self.agent.generate_reply(
messages=[{"role": "user", "content": prompt}]
)
fixed_code = response if isinstance(response, str) else str(response)
fixed_code = self._extract_code(fixed_code)
# 更新文件
code_file = self.workspace_dir / "src_battery_health.py"
with open(code_file, 'w', encoding='utf-8') as f:
f.write(fixed_code)
self.code_history.append(fixed_code)
print(f"✅ 代码已修复:{code_file}")
return fixed_code
def optimize_code(self, code: str, optimization_goal: str) -> str:
"""
优化现有代码
Args:
code: 原始代码
optimization_goal: 优化目标(性能、可读性等)
Returns:
优化后的代码
"""
prompt = f"""
请优化以下代码:
【原始代码】
{code}
【优化目标】
{optimization_goal}
请保持功能不变,提升代码质量。
"""
response = self.agent.generate_reply(
messages=[{"role": "user", "content": prompt}]
)
optimized_code = response if isinstance(response, str) else str(response)
optimized_code = self._extract_code(optimized_code)
return optimized_code
def add_documentation(self, code: str) -> str:
"""
为代码添加完整文档
Args:
code: 代码
Returns:
带完整文档的代码
"""
prompt = f"""
请为以下代码添加完整的文档:
【代码】
{code}
请添加:
1. 模块级 docstring
2. 类 docstring
3. 函数 docstring包含参数说明和返回值
4. 类型注解
5. 关键步骤的注释
保持代码功能不变。
"""
response = self.agent.generate_reply(
messages=[{"role": "user", "content": prompt}]
)
documented_code = response if isinstance(response, str) else str(response)
documented_code = self._extract_code(documented_code)
return documented_code
def _extract_code(self, text: str) -> str:
"""从文本中提取代码(移除 Markdown 标记)"""
lines = text.split('\n')
code_lines = []
in_code_block = False
for line in lines:
if line.strip().startswith('```python'):
in_code_block = True
continue
elif line.strip().startswith('```'):
in_code_block = False
continue
if in_code_block or not any(line.strip().startswith(x) for x in ['```']):
code_lines.append(line)
# 如果没有找到代码块,返回原文
if not code_lines:
return text
return '\n'.join(code_lines)
def _truncate(self, text: str, max_length: int) -> str:
"""截断文本"""
if len(text) <= max_length:
return text
return text[:max_length] + "... [内容已截断]"
def create_dev_agent(llm_config: Optional[Dict] = None) -> AssistantAgent:
"""
创建 Dev AgentAutoGen 原生格式)
Args:
llm_config: LLM 配置
Returns:
AutoGen AssistantAgent 实例
"""
config = llm_config or get_agent_llm_config("Dev_Agent")
agent = AssistantAgent(
name="Dev_Agent",
system_message=DEV_PROMPT,
llm_config=config,
description="资深软件工程师",
human_input_mode="NEVER"
)
return agent

292
agents/orchestrator.py Normal file
View File

@@ -0,0 +1,292 @@
"""
Orchestrator Agent - 协调器智能体
负责流程调度与最终验证
"""
from autogen import AssistantAgent
from typing import Dict, Any, Optional, List
import os
from pathlib import Path
from config.llm_config import get_agent_llm_config, ORCH_PROMPT
class OrchestratorAgent:
"""协调器 Agent负责多智能体协同和流程控制"""
def __init__(self, llm_config: Optional[Dict] = None):
"""
初始化 Orchestrator Agent
Args:
llm_config: LLM 配置
"""
self.llm_config = llm_config or get_agent_llm_config("Orchestrator")
self.agent = AssistantAgent(
name="Orchestrator",
system_message=ORCH_PROMPT,
llm_config=self.llm_config,
description="多智能体系统协调器",
human_input_mode="NEVER"
)
self.workspace_dir = Path("workspace")
self.workspace_dir.mkdir(exist_ok=True)
# 流程状态
self.workflow_state: Dict[str, Any] = {
"current_step": 0,
"total_steps": 5,
"status": "pending",
"artifacts": {}
}
def start_workflow(self, user_requirement: str) -> Dict[str, Any]:
"""
启动完整的工作流程
Args:
user_requirement: 用户需求
Returns:
工作流状态
"""
self.workflow_state = {
"current_step": 1,
"total_steps": 5,
"status": "in_progress",
"user_requirement": user_requirement,
"artifacts": {}
}
return self.workflow_state
def validate_srs(self, srs_content: str) -> Dict[str, Any]:
"""
验证 SRS 文档的完整性
Args:
srs_content: SRS 文档内容
Returns:
验证结果
"""
prompt = f"""
请验证以下 SRS 文档的完整性:
{self._truncate(srs_content, 3000)}
检查清单:
1. ✅ 包含功能性需求列表
2. ✅ 包含非功能性需求
3. ✅ 包含验收标准
4. ✅ 包含风险分析
5. ✅ 需求具有唯一 ID
请输出验证报告,指出缺失或不完整的部分。
"""
response = self.agent.generate_reply(
messages=[{"role": "user", "content": prompt}]
)
validation_report = response if isinstance(response, str) else str(response)
# 保存验证报告
report_file = self.workspace_dir / "srs_validation.md"
with open(report_file, 'w', encoding='utf-8') as f:
f.write(validation_report)
return {
"valid": "" in validation_report and "" not in validation_report,
"report": validation_report,
"file": str(report_file)
}
def validate_tests(self, test_code: str, srs_content: str) -> Dict[str, Any]:
"""
验证测试用例的覆盖率
Args:
test_code: 测试代码
srs_content: SRS 文档
Returns:
验证结果
"""
prompt = f"""
请验证测试用例是否覆盖了所有 SRS 需求:
【SRS 需求】
{self._truncate(srs_content, 2000)}
【测试代码】
{self._truncate(test_code, 2000)}
检查清单:
1. ✅ 每个功能需求都有对应测试
2. ✅ 包含边界情况测试
3. ✅ 包含异常场景测试
4. ✅ 测试可执行且独立
5. ✅ 遵循 TDD 原则
请输出验证报告。
"""
response = self.agent.generate_reply(
messages=[{"role": "user", "content": prompt}]
)
validation_report = response if isinstance(response, str) else str(response)
return {
"valid": "" in validation_report,
"report": validation_report
}
def validate_code(self, code: str, srs_content: str, test_result: Dict) -> Dict[str, Any]:
"""
验证代码质量
Args:
code: 源代码
srs_content: SRS 文档
test_result: 测试结果
Returns:
验证结果
"""
prompt = f"""
请验证代码质量:
【SRS 需求】
{self._truncate(srs_content, 1500)}
【代码】
{self._truncate(code, 2000)}
【测试结果】
{test_result}
检查清单:
1. ✅ 实现所有功能需求
2. ✅ 通过所有测试用例
3. ✅ 代码符合规范MISRA-C/PEP8
4. ✅ 包含完整文档
5. ✅ 无安全漏洞
6. ✅ 性能满足要求
请输出代码质量验证报告。
"""
response = self.agent.generate_reply(
messages=[{"role": "user", "content": prompt}]
)
validation_report = response if isinstance(response, str) else str(response)
return {
"valid": test_result.get("success", False) and "" in validation_report,
"report": validation_report
}
def generate_final_report(self) -> str:
"""
生成最终项目总结报告
Returns:
最终报告内容
"""
prompt = """
请生成项目最终总结报告,包含:
1. 项目概述
2. 交付物清单:
- SRS 文档
- 测试用例
- 源代码
3. 质量指标:
- 测试覆盖率
- 代码质量评分
4. 合规性说明:
- ISO 26262
- MISRA-C
- ASPICE
5. 后续建议
请基于 workspace 目录下的所有文件生成完整报告。
"""
response = self.agent.generate_reply(
messages=[{"role": "user", "content": prompt}]
)
final_report = response if isinstance(response, str) else str(response)
# 保存最终报告
report_file = self.workspace_dir / "FINAL_REPORT.md"
with open(report_file, 'w', encoding='utf-8') as f:
f.write(final_report)
print(f"✅ 最终报告已生成:{report_file}")
return final_report
def request_human_approval(
self,
approval_type: str,
description: str,
data: Dict[str, Any]
) -> bool:
"""
请求人工确认(需要前端配合实现)
Args:
approval_type: 确认类型
description: 确认描述
data: 相关数据
Returns:
用户是否批准
"""
# 这里只是标记,实际的前端交互由 Streamlit 处理
print(f"\n⚠️ 需要人工确认:{description}")
print(f"类型:{approval_type}")
print(f"数据:{data}\n")
# 在命令行模式下,可以简单询问用户
# 在 GUI 模式下,这会触发前端弹窗
try:
response = input("是否批准?(y/n): ").lower()
return response == 'y'
except:
# 非交互模式下默认批准
return True
def _truncate(self, text: str, max_length: int) -> str:
"""截断文本"""
if len(text) <= max_length:
return text
return text[:max_length] + "... [内容已截断]"
def create_orchestrator_agent(llm_config: Optional[Dict] = None) -> AssistantAgent:
"""
创建 Orchestrator AgentAutoGen 原生格式)
Args:
llm_config: LLM 配置
Returns:
AutoGen AssistantAgent 实例
"""
config = llm_config or get_agent_llm_config("Orchestrator")
agent = AssistantAgent(
name="Orchestrator",
system_message=ORCH_PROMPT,
llm_config=config,
description="多智能体系统协调器",
human_input_mode="NEVER"
)
return agent

139
agents/pm_agent.py Normal file
View File

@@ -0,0 +1,139 @@
"""
PM Agent - 产品经理智能体
负责需求消歧与规格生成
"""
from autogen import AssistantAgent
from typing import Dict, Any, Optional
import os
from pathlib import Path
from config.llm_config import get_agent_llm_config, PM_PROMPT
class ProductManagerAgent:
"""产品经理 Agent负责生成软件需求规格说明书"""
def __init__(self, llm_config: Optional[Dict] = None):
"""
初始化 PM Agent
Args:
llm_config: LLM 配置,为 None 时使用默认配置
"""
self.llm_config = llm_config or get_agent_llm_config("PM_Agent")
# 创建 AutoGen AssistantAgent
self.agent = AssistantAgent(
name="PM_Agent",
system_message=PM_PROMPT,
llm_config=self.llm_config,
description="资深软件产品经理,专注于汽车嵌入式系统领域",
human_input_mode="NEVER" # 全自动模式
)
self.workspace_dir = Path("workspace")
self.workspace_dir.mkdir(exist_ok=True)
def generate_srs(self, user_requirement: str) -> str:
"""
根据用户需求生成 SRS 文档
Args:
user_requirement: 用户输入的原始需求
Returns:
生成的 SRS 文档内容
"""
prompt = f"""
请根据以下用户需求生成完整的《软件需求规格说明书 (SRS)》:
用户需求:{user_requirement}
请确保输出包含:
1. 文档标题和版本信息
2. 功能性需求列表FR-001, FR-002...
3. 非功能性需求NFR-001, NFR-002...
4. 验收标准AC-001, AC-002...
5. 潜在风险与边缘情况
请以 Markdown 格式输出完整文档。
"""
# 调用 Agent 生成 SRS
response = self.agent.generate_reply(
messages=[{"role": "user", "content": prompt}]
)
srs_content = response if isinstance(response, str) else str(response)
# 保存到文件
srs_file = self.workspace_dir / "SRS.md"
with open(srs_file, 'w', encoding='utf-8') as f:
f.write(srs_content)
print(f"✅ SRS 文档已生成:{srs_file}")
return srs_content
def refine_requirements(
self,
original_srs: str,
feedback: str
) -> str:
"""
根据反馈优化需求
Args:
original_srs: 原始 SRS 文档
feedback: 反馈意见
Returns:
优化后的 SRS 文档
"""
prompt = f"""
请根据以下反馈优化现有的 SRS 文档:
原始 SRS:
{original_srs[:2000]}... # 限制长度避免超出上下文
反馈意见:
{feedback}
请输出优化后的完整 SRS 文档。
"""
response = self.agent.generate_reply(
messages=[{"role": "user", "content": prompt}]
)
refined_srs = response if isinstance(response, str) else str(response)
# 更新文件
srs_file = self.workspace_dir / "SRS.md"
with open(srs_file, 'w', encoding='utf-8') as f:
f.write(refined_srs)
print(f"✅ SRS 文档已更新:{srs_file}")
return refined_srs
def create_pm_agent(llm_config: Optional[Dict] = None) -> AssistantAgent:
"""
创建 PM AgentAutoGen 原生格式)
Args:
llm_config: LLM 配置
Returns:
AutoGen AssistantAgent 实例
"""
config = llm_config or get_agent_llm_config("PM_Agent")
agent = AssistantAgent(
name="PM_Agent",
system_message=PM_PROMPT,
llm_config=config,
description="资深软件产品经理",
human_input_mode="NEVER"
)
return agent

221
agents/qa_agent.py Normal file
View File

@@ -0,0 +1,221 @@
"""
QA Agent - 测试工程师智能体
负责测试用例创建和 TDD 实践
"""
from autogen import AssistantAgent
from typing import Dict, Any, Optional
import os
from pathlib import Path
from config.llm_config import get_agent_llm_config, QA_PROMPT
class QAAgent:
"""测试工程师 Agent负责生成测试用例和测试脚本"""
def __init__(self, llm_config: Optional[Dict] = None):
"""
初始化 QA Agent
Args:
llm_config: LLM 配置
"""
self.llm_config = llm_config or get_agent_llm_config("QA_Agent")
self.agent = AssistantAgent(
name="QA_Agent",
system_message=QA_PROMPT,
llm_config=self.llm_config,
description="资深测试工程师,专注于自动化测试和 TDD 实践",
human_input_mode="NEVER"
)
self.workspace_dir = Path("workspace")
self.workspace_dir.mkdir(exist_ok=True)
def generate_test_cases(self, srs_content: str) -> str:
"""
根据 SRS 生成测试用例
Args:
srs_content: SRS 文档内容
Returns:
生成的测试用例内容
"""
prompt = f"""
请根据以下 SRS 文档生成完整的测试用例:
{self._truncate(srs_content, 3000)}
请生成:
1. Pytest 测试脚本(包含完整的测试函数)
2. BDD 风格的测试场景描述
3. 测试数据准备
4. 预期结果验证
确保遵循 TDD 原则,测试先于代码存在。
"""
response = self.agent.generate_reply(
messages=[{"role": "user", "content": prompt}]
)
test_content = response if isinstance(response, str) else str(response)
# 保存测试文件
test_file = self.workspace_dir / "test_battery_health.py"
with open(test_file, 'w', encoding='utf-8') as f:
f.write(test_content)
print(f"✅ 测试用例已生成:{test_file}")
return test_content
def create_bdd_scenarios(self, srs_content: str) -> str:
"""
创建 BDD 风格的测试场景
Args:
srs_content: SRS 文档内容
Returns:
BDD 测试场景描述
"""
prompt = f"""
请根据 SRS 创建 BDD (Behavior-Driven Development) 测试场景:
{self._truncate(srs_content, 2000)}
请使用 Given-When-Then 格式描述每个测试场景:
- Feature: 功能描述
- Scenario: 场景描述
- Given: 前置条件
- When: 操作
- Then: 预期结果
输出为 Markdown 格式。
"""
response = self.agent.generate_reply(
messages=[{"role": "user", "content": prompt}]
)
bdd_content = response if isinstance(response, str) else str(response)
# 保存 BDD 场景文件
bdd_file = self.workspace_dir / "bdd_scenarios.md"
with open(bdd_file, 'w', encoding='utf-8') as f:
f.write(bdd_content)
print(f"✅ BDD 场景已生成:{bdd_file}")
return bdd_content
def analyze_test_coverage(self, test_code: str, srs_content: str) -> Dict[str, Any]:
"""
分析测试覆盖率
Args:
test_code: 测试代码
srs_content: SRS 文档
Returns:
覆盖率分析报告
"""
prompt = f"""
请分析以下测试代码对 SRS 需求的覆盖情况:
SRS 需求:
{self._truncate(srs_content, 1500)}
测试代码:
{self._truncate(test_code, 2000)}
请输出:
1. 已覆盖的需求列表
2. 未覆盖的需求列表
3. 覆盖率百分比
4. 改进建议
"""
response = self.agent.generate_reply(
messages=[{"role": "user", "content": prompt}]
)
coverage_report = response if isinstance(response, str) else str(response)
# 保存报告
report_file = self.workspace_dir / "coverage_report.md"
with open(report_file, 'w', encoding='utf-8') as f:
f.write(coverage_report)
return {"report": coverage_report, "file": str(report_file)}
def _truncate(self, text: str, max_length: int) -> str:
"""截断文本以避免超出上下文限制"""
if len(text) <= max_length:
return text
return text[:max_length] + "... [内容已截断]"
def run_tests(self, test_file_pattern: str = "test_*.py") -> Dict[str, Any]:
"""
执行测试(需要实际运行 pytest
Args:
test_file_pattern: 测试文件模式
Returns:
测试结果字典
"""
import subprocess
import sys
try:
# 使用 pytest 执行测试
result = subprocess.run(
[sys.executable, "-m", "pytest",
str(self.workspace_dir / test_file_pattern),
"-v", "--tb=short"],
capture_output=True,
text=True,
timeout=60
)
return {
"success": result.returncode == 0,
"stdout": result.stdout,
"stderr": result.stderr,
"returncode": result.returncode
}
except subprocess.TimeoutExpired:
return {
"success": False,
"error": "测试执行超时"
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def create_qa_agent(llm_config: Optional[Dict] = None) -> AssistantAgent:
"""
创建 QA AgentAutoGen 原生格式)
Args:
llm_config: LLM 配置
Returns:
AutoGen AssistantAgent 实例
"""
config = llm_config or get_agent_llm_config("QA_Agent")
agent = AssistantAgent(
name="QA_Agent",
system_message=QA_PROMPT,
llm_config=config,
description="资深测试工程师",
human_input_mode="NEVER"
)
return agent