265 lines
8.6 KiB
Python
265 lines
8.6 KiB
Python
"""
|
||
使用示例 - 展示如何使用 AutoGen SDLC 系统
|
||
包含多个实际场景的演示
|
||
"""
|
||
import os
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
sys.path.insert(0, str(Path(__file__).parent))
|
||
|
||
from autogen_sdls_system import AutoGenSDLCSystem
|
||
|
||
|
||
def example_1_basic_workflow():
|
||
"""示例 1: 基本工作流 - 自动驾驶车辆检测 API"""
|
||
print("\n" + "=" * 70)
|
||
print("示例 1: 基本 SDLC 工作流")
|
||
print("=" * 70)
|
||
|
||
api_key = os.getenv("DASHSCOPE_API_KEY")
|
||
if not api_key:
|
||
print("❌ 请设置 DASHSCOPE_API_KEY 环境变量")
|
||
return
|
||
|
||
system = AutoGenSDLCSystem(api_key=api_key)
|
||
|
||
requirement = """
|
||
我需要一个自动驾驶目标检测与跟踪 API,要求:
|
||
1. 接收摄像头图像和激光雷达点云数据
|
||
2. 实现车辆、行人、骑行者的实时检测
|
||
3. 输出目标的 3D 边界框(位置、尺寸、朝向)
|
||
4. 支持多目标跟踪(MOT),输出轨迹信息
|
||
5. 符合车规级功能安全标准(ISO 26262 ASIL-B)
|
||
6. 响应时间 < 50ms,检测精度 mAP > 85%
|
||
7. 包含异常处理(传感器失效、恶劣天气等)
|
||
"""
|
||
|
||
print(f"\n📋 需求:{requirement}")
|
||
result = system.run_workflow(requirement, max_round=20)
|
||
|
||
if result["success"]:
|
||
print("\n✅ 工作流完成!")
|
||
print(f"📄 摘要:{result['summary'][:300]}...")
|
||
else:
|
||
print(f"\n❌ 工作流失败:{result.get('error')}")
|
||
|
||
|
||
def example_2_custom_agents():
|
||
"""示例 2: 自定义 Agent 配置"""
|
||
print("\n" + "=" * 70)
|
||
print("示例 2: 自定义 Agent 配置")
|
||
print("=" * 70)
|
||
|
||
from config.llm_config import get_llm_config
|
||
from agents import create_pm_agent, create_qa_agent
|
||
|
||
api_key = os.getenv("DASHSCOPE_API_KEY")
|
||
if not api_key:
|
||
print("❌ 请设置 DASHSCOPE_API_KEY 环境变量")
|
||
return
|
||
|
||
# 使用不同的模型配置
|
||
llm_config = get_llm_config(
|
||
model="qwen3.5-flash",
|
||
api_key=api_key,
|
||
temperature=0.5, # 降低温度使输出更稳定
|
||
max_tokens=3000
|
||
)
|
||
|
||
# 创建自定义 Agent
|
||
pm_agent = create_pm_agent(llm_config=llm_config)
|
||
qa_agent = create_qa_agent(llm_config=llm_config)
|
||
|
||
print("✅ 已创建自定义配置的 Agent")
|
||
print(f" PM Agent: {pm_agent.name}")
|
||
print(f" QA Agent: {qa_agent.name}")
|
||
|
||
|
||
def example_3_direct_agent_call():
|
||
"""示例 3: 直接调用单个 Agent"""
|
||
print("\n" + "=" * 70)
|
||
print("示例 3: 直接调用 PM Agent 生成 SRS")
|
||
print("=" * 70)
|
||
|
||
from agents import ProductManagerAgent
|
||
from config.llm_config import get_agent_llm_config
|
||
|
||
api_key = os.getenv("DASHSCOPE_API_KEY")
|
||
if not api_key:
|
||
print("❌ 请设置 DASHSCOPE_API_KEY 环境变量")
|
||
return
|
||
|
||
# 创建 PM Agent
|
||
llm_config = get_agent_llm_config("PM_Agent", api_key=api_key)
|
||
pm_agent = ProductManagerAgent(llm_config=llm_config)
|
||
|
||
# 简单需求
|
||
requirement = "开发一个车载摄像头图像识别模块,支持车道线检测和交通标志识别"
|
||
|
||
print(f"\n📋 需求:{requirement}")
|
||
print("\n🤖 PM Agent 正在生成 SRS...\n")
|
||
|
||
try:
|
||
srs = pm_agent.generate_srs(requirement)
|
||
print("\n✅ SRS 生成成功!")
|
||
print(f"📄 前 500 字符预览:\n{srs[:500]}...")
|
||
except Exception as e:
|
||
print(f"\n❌ 生成失败:{e}")
|
||
|
||
|
||
def example_4_test_generation():
|
||
"""示例 4: QA Agent 生成测试用例"""
|
||
print("\n" + "=" * 70)
|
||
print("示例 4: QA Agent 生成测试用例")
|
||
print("=" * 70)
|
||
|
||
from agents import QAAgent
|
||
from config.llm_config import get_agent_llm_config
|
||
|
||
api_key = os.getenv("DASHSCOPE_API_KEY")
|
||
if not api_key:
|
||
print("❌ 请设置 DASHSCOPE_API_KEY 环境变量")
|
||
return
|
||
|
||
# 创建 QA Agent
|
||
llm_config = get_agent_llm_config("QA_Agent", api_key=api_key)
|
||
qa_agent = QAAgent(llm_config=llm_config)
|
||
|
||
# 简单的 SRS 片段
|
||
srs_sample = """
|
||
【功能性需求】
|
||
FR-001: 系统应能接收摄像头 RGB 图像(分辨率 1920x1080)
|
||
FR-002: 系统应能检测车辆、行人、骑行者三类目标
|
||
FR-003: 系统应能输出 3D 边界框(x,y,z,long,width,height,yaw)
|
||
FR-004: 系统应能处理夜间和雨雪天气场景
|
||
|
||
【非功能性需求】
|
||
NFR-001: 检测延迟 < 50ms
|
||
NFR-002: 检测精度 mAP@0.5 > 85%
|
||
NFR-003: 符合 ISO 26262 ASIL-B 功能安全标准
|
||
NFR-004: 支持 -40°C~85°C 工作温度范围
|
||
"""
|
||
|
||
print("\n📋 SRS 样本:")
|
||
print(srs_sample)
|
||
print("\n🤖 QA Agent 正在生成测试用例...\n")
|
||
|
||
try:
|
||
test_cases = qa_agent.generate_test_cases(srs_sample)
|
||
print("\n✅ 测试用例生成成功!")
|
||
print(f"📄 预览:\n{test_cases[:500]}...")
|
||
except Exception as e:
|
||
print(f"\n❌ 生成失败:{e}")
|
||
|
||
|
||
def example_5_code_generation():
|
||
"""示例 5: Dev Agent 生成代码"""
|
||
print("\n" + "=" * 70)
|
||
print("示例 5: Dev Agent 生成代码")
|
||
print("=" * 70)
|
||
|
||
from agents import DevAgent
|
||
from config.llm_config import get_agent_llm_config
|
||
|
||
api_key = os.getenv("DASHSCOPE_API_KEY")
|
||
if not api_key:
|
||
print("❌ 请设置 DASHSCOPE_API_KEY 环境变量")
|
||
return
|
||
|
||
# 创建 Dev Agent
|
||
llm_config = get_agent_llm_config("Dev_Agent", api_key=api_key)
|
||
dev_agent = DevAgent(llm_config=llm_config)
|
||
|
||
# SRS 和测试样本
|
||
srs_sample = """
|
||
FR-001: 实现点云预处理函数(去噪、下采样、坐标变换)
|
||
FR-002: 实现 3D 目标检测函数(基于 PointPillars 或 VoxelNet)
|
||
FR-003: 输入验证(点云格式 Nx5,包含 x,y,z,intensity,timestamp)
|
||
FR-004: 输出限制(边界框参数归一化,置信度 0-1)
|
||
"""
|
||
|
||
test_sample = """
|
||
def test_pointcloud_preprocessing():
|
||
raw_points = np.random.rand(10000, 5).astype(np.float32)
|
||
processed = preprocess_pointcloud(raw_points)
|
||
assert processed.shape[0] < raw_points.shape[0] # 下采样后点数减少
|
||
assert not np.isnan(processed).any() # 无 NaN
|
||
|
||
def test_3d_detection():
|
||
points = np.random.rand(5000, 5).astype(np.float32)
|
||
boxes, scores = detect_objects_3d(points)
|
||
assert len(boxes) == len(scores)
|
||
assert all(0 <= s <= 1 for s in scores)
|
||
assert boxes.shape[1] == 7 # (x,y,z,l,w,h,yaw)
|
||
"""
|
||
|
||
print("\n📋 需求和测试样本已准备")
|
||
print("\n🤖 Dev Agent 正在生成代码...\n")
|
||
|
||
try:
|
||
code = dev_agent.generate_code(srs_sample, test_sample)
|
||
print("\n✅ 代码生成成功!")
|
||
print(f"📄 代码预览:\n{code[:500]}...")
|
||
except Exception as e:
|
||
print(f"\n❌ 生成失败:{e}")
|
||
|
||
|
||
def example_6_realtime_file_saving():
|
||
"""示例 6: 实时文件保存演示"""
|
||
print("\n" + "=" * 70)
|
||
print("示例 6: 实时文件保存 - 生成的文件会立即创建")
|
||
print("=" * 70)
|
||
|
||
api_key = os.getenv("DASHSCOPE_API_KEY")
|
||
if not api_key:
|
||
print("❌ 请设置 DASHSCOPE_API_KEY 环境变量")
|
||
return
|
||
|
||
system = AutoGenSDLCSystem(api_key=api_key, workspace_dir="workspace_realtime_demo")
|
||
|
||
requirement = """
|
||
我需要一个简单的 Python 工具函数库,要求:
|
||
1. 包含一个计算阶乘的函数 factorial(n)
|
||
2. 包含一个判断质数的函数 is_prime(n)
|
||
3. 包含一个生成斐波那契数列的函数 fibonacci(n)
|
||
4. 每个函数都要有完整的文档字符串和类型注解
|
||
5. 符合 PEP8 规范
|
||
"""
|
||
|
||
print(f"\n📋 需求:{requirement}")
|
||
print("\n💡 提示:在对话过程中,您会看到文件被实时保存到 workspace_realtime_demo 目录\n")
|
||
|
||
result = system.run_workflow(requirement, max_round=15)
|
||
|
||
if result["success"]:
|
||
print("\n✅ 工作流完成!")
|
||
print(f"📄 摘要:{result['summary'][:300]}...")
|
||
|
||
# 显示生成的文件
|
||
print(f"\n📁 已保存 {len(result['saved_files'])} 个文件:")
|
||
for file in result["saved_files"]:
|
||
print(f" ✓ {file}")
|
||
|
||
# 验证文件是否真的存在
|
||
print("\n🔍 验证文件是否存在...")
|
||
import os
|
||
for file in result["saved_files"]:
|
||
if os.path.exists(file):
|
||
print(f" ✅ {file} (存在)")
|
||
else:
|
||
print(f" ❌ {file} (不存在)")
|
||
else:
|
||
print(f"\n❌ 工作流失败:{result.get('error')}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 可以选择运行单个示例或全部示例
|
||
# example_3_direct_agent_call()
|
||
# example_4_test_generation()
|
||
# example_5_code_generation()
|
||
# example_6_realtime_file_saving() # 新增:实时文件保存演示
|
||
|
||
# 运行所有示例
|
||
example_1_basic_workflow()
|