Files
autogen/autogen_self_healing_demo.py

385 lines
12 KiB
Python
Raw Normal View History

2026-03-12 13:27:03 +08:00
"""
AutoGen 自愈演示 - 展示测试失败自动修复测试通过的完整闭环
用于 Workshop 演示场景
"""
import os
import sys
from pathlib import Path
from typing import Dict, Any
import time
sys.path.insert(0, str(Path(__file__).parent))
from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
from config.llm_config import get_llm_config, DEV_PROMPT, QA_PROMPT
class SelfHealingDemo:
"""自愈功能演示类"""
def __init__(self, api_key: str, model: str = "qwen3.5-flash"):
"""
初始化演示
Args:
api_key: API Key
model: 模型名称
"""
self.api_key = api_key
self.model = model
self.llm_config = get_llm_config(model=model, api_key=api_key)
self.workspace_dir = Path("workspace")
self.workspace_dir.mkdir(exist_ok=True)
print("=" * 70)
print("🔄 AutoGen 自愈功能演示")
print("=" * 70)
def create_intentional_bug(self) -> str:
"""创建一个有 bug 的代码示例"""
buggy_code = '''"""
电池健康状态 (SOH) 计算模块 - 包含故意引入的 bug
"""
from typing import Dict, Any
def calculate_soh(voltage: float, current: float, temperature: float) -> Dict[str, Any]:
"""
计算电池健康状态
Args:
voltage: 电压 (V)
current: 电流 (A)
temperature: 温度 (°C)
Returns:
包含 SOH 百分比的字典
"""
# BUG 1: 除零错误 - 没有检查 voltage 是否为 0
nominal_voltage = 3.7 # 标称电压
voltage_ratio = voltage / nominal_voltage # 可能除零
# BUG 2: 温度范围未验证
# 应该检查 temperature 是否在合理范围内 (-20 到 60)
temp_factor = 1.0 - (temperature - 25) * 0.005
# BUG 3: 电流方向未考虑(充电/放电)
# 充电和放电时的 SOH 计算应该不同
current_load_factor = 1.0 - abs(current) * 0.01
# 计算 SOH
soh = voltage_ratio * temp_factor * current_load_factor * 100
# BUG 4: 没有限制 SOH 在 0-100 范围内
return {"soh": soh, "status": "healthy"}
def validate_sensor_data(voltage: float, current: float, temperature: float) -> bool:
"""
验证传感器数据有效性
BUG 5: 逻辑错误 - 应该返回 False 当数据无效时
"""
# 错误的逻辑:数据超出范围时返回 True
if voltage < 0 or voltage > 5:
return True # 应该是 False
if current < -10 or current > 10:
return True # 应该是 False
if temperature < -20 or temperature > 60:
return True # 应该是 False
return False # 应该是 True
if __name__ == "__main__":
# 测试
result = calculate_soh(3.8, 2.0, 30)
print(f"SOH: {result['soh']:.2f}%")
'''
code_file = self.workspace_dir / "src_battery_health_buggy.py"
with open(code_file, 'w', encoding='utf-8') as f:
f.write(buggy_code)
print(f"\n📝 已创建包含 bug 的代码:{code_file}")
return str(code_file)
def create_test_cases(self) -> str:
"""创建测试用例"""
test_code = '''"""
电池健康状态测试用例
"""
import pytest
import sys
from pathlib import Path
# 导入被测试模块buggy 版本)
sys.path.insert(0, str(Path(__file__).parent))
from src_battery_health_buggy import calculate_soh, validate_sensor_data
class TestCalculateSOH:
"""测试 SOH 计算功能"""
def test_normal_operation(self):
"""测试正常工作情况"""
result = calculate_soh(voltage=3.8, current=2.0, temperature=30)
assert "soh" in result
assert 0 <= result["soh"] <= 100, "SOH 应该在 0-100 范围内"
assert result["status"] == "healthy"
def test_zero_voltage(self):
"""测试零电压情况(应该触发除零保护)"""
# 这个测试会暴露 BUG 1
result = calculate_soh(voltage=0, current=0, temperature=25)
assert result["soh"] == 0, "零电压时 SOH 应为 0"
def test_extreme_temperature(self):
"""测试极端温度"""
# 这个测试会暴露 BUG 2
result_high = calculate_soh(voltage=3.8, current=1.0, temperature=70)
result_low = calculate_soh(voltage=3.8, current=1.0, temperature=-30)
# 极端温度应该被检测并处理
assert result_high["status"] != "healthy", "高温应标记为异常"
assert result_low["status"] != "healthy", "低温应标记为异常"
def test_high_current(self):
"""测试大电流情况"""
result = calculate_soh(voltage=3.8, current=15.0, temperature=25)
# 过大电流应该被检测
assert result["soh"] >= 0, "SOH 不应为负值"
assert result["soh"] <= 100, "SOH 不应超过 100%"
class TestValidateSensorData:
"""测试传感器数据验证"""
def test_valid_data(self):
"""测试有效数据"""
result = validate_sensor_data(voltage=3.8, current=2.0, temperature=30)
assert result is True, "有效数据应返回 True"
def test_invalid_voltage(self):
"""测试无效电压"""
result = validate_sensor_data(voltage=6.0, current=2.0, temperature=30)
assert result is False, "电压过高应返回 False"
def test_invalid_current(self):
"""测试无效电流"""
result = validate_sensor_data(voltage=3.8, current=15.0, temperature=30)
assert result is False, "电流过大应返回 False"
def test_invalid_temperature(self):
"""测试无效温度"""
result = validate_sensor_data(voltage=3.8, current=2.0, temperature=80)
assert result is False, "温度过高应返回 False"
if __name__ == "__main__":
pytest.main([__file__, "-v"])
'''
test_file = self.workspace_dir / "test_battery_health.py"
with open(test_file, 'w', encoding='utf-8') as f:
f.write(test_code)
print(f"✅ 已创建测试用例:{test_file}")
return str(test_file)
def run_tests(self) -> Dict[str, Any]:
"""运行测试并收集错误日志"""
import subprocess
test_file = self.workspace_dir / "test_battery_health.py"
print("\n🧪 正在执行测试...")
result = subprocess.run(
[sys.executable, "-m", "pytest", str(test_file), "-v", "--tb=short"],
capture_output=True,
text=True,
timeout=60,
cwd=str(self.workspace_dir)
)
print("\n" + "=" * 70)
print("测试结果:")
print("=" * 70)
print(result.stdout)
if result.stderr:
print("错误输出:")
print(result.stderr)
return {
"success": result.returncode == 0,
"stdout": result.stdout,
"stderr": result.stderr,
"returncode": result.returncode
}
def fix_bugs_with_agent(self, error_log: str) -> str:
"""使用 Agent 自动修复 bug"""
print("\n🔧 启动 Dev_Agent 进行自动修复...")
dev_agent = AssistantAgent(
name="Dev_Fix_Agent",
system_message="""你是一名资深软件工程师,擅长调试和修复代码。
你的任务是分析测试失败日志找出代码中的 bug 并修复
1. 分析每个测试失败的原因
2. 定位代码中的具体 bug
3. 提供完整的修复后代码
4. 确保所有测试能够通过""",
llm_config=self.llm_config,
human_input_mode="NEVER"
)
# 读取原始代码
buggy_file = self.workspace_dir / "src_battery_health_buggy.py"
with open(buggy_file, 'r', encoding='utf-8') as f:
buggy_code = f.read()
prompt = f"""
请修复以下代码中的 bug
原始代码
{buggy_code}
测试失败日志
{error_log[:2000]} # 限制长度
已知 bug 列表
1. 除零错误 - 没有检查 voltage 是否为 0
2. 温度范围未验证
3. 电流方向未考虑
4. SOH 未限制在 0-100 范围
5. validate_sensor_data 逻辑反转
请输出完整的修复后代码保存为 src_battery_health_fixed.py
"""
response = dev_agent.generate_reply(
messages=[{"role": "user", "content": prompt}]
)
fixed_code = response if isinstance(response, str) else str(response)
# 提取代码块
if "```python" in fixed_code:
fixed_code = fixed_code.split("```python")[1].split("```")[0]
# 保存修复后的代码
fixed_file = self.workspace_dir / "src_battery_health_fixed.py"
with open(fixed_file, 'w', encoding='utf-8') as f:
f.write(fixed_code)
print(f"✅ 已生成修复后的代码:{fixed_file}")
return str(fixed_file)
def verify_fix(self) -> Dict[str, Any]:
"""验证修复后的代码"""
print("\n✅ 验证修复结果...")
# 修改测试文件导入固定的版本
test_file = self.workspace_dir / "test_battery_health_fixed.py"
original_test = self.workspace_dir / "test_battery_health.py"
with open(original_test, 'r', encoding='utf-8') as f:
test_content = f.read()
# 替换导入
test_content = test_content.replace(
"from src_battery_health_buggy import",
"from src_battery_health_fixed import"
)
with open(test_file, 'w', encoding='utf-8') as f:
f.write(test_content)
# 运行测试
import subprocess
result = subprocess.run(
[sys.executable, "-m", "pytest", str(test_file), "-v", "--tb=short"],
capture_output=True,
text=True,
timeout=60,
cwd=str(self.workspace_dir)
)
print("\n" + "=" * 70)
print("修复后测试结果:")
print("=" * 70)
print(result.stdout)
return {
"success": result.returncode == 0,
"stdout": result.stdout,
"stderr": result.stderr
}
def run_demo(self):
"""运行完整演示"""
print("\n🎬 开始自愈演示流程...\n")
# 步骤 1: 创建有 bug 的代码
print("步骤 1: 创建包含 bug 的代码")
self.create_intentional_bug()
# 步骤 2: 创建测试用例
print("\n步骤 2: 创建测试用例")
self.create_test_cases()
# 步骤 3: 运行测试(预期失败)
print("\n步骤 3: 运行测试(预期会失败)")
test_result = self.run_tests()
if not test_result["success"]:
print("\n❌ 测试失败(符合预期),开始自动修复...")
# 步骤 4: 使用 Agent 修复 bug
print("\n步骤 4: AI 自动修复 bug")
self.fix_bugs_with_agent(test_result["stdout"] + "\n" + test_result["stderr"])
# 步骤 5: 验证修复
print("\n步骤 5: 验证修复结果")
verify_result = self.verify_fix()
if verify_result["success"]:
print("\n" + "=" * 70)
print("✅ 自愈演示成功!")
print("=" * 70)
print("\n演示亮点:")
print("1. ✅ 自动检测测试失败")
print("2. ✅ AI 分析错误原因")
print("3. ✅ 自动生成修复代码")
print("4. ✅ 自动验证修复结果")
print("\n这展示了 TDD + AI 的强大能力测试驱动开发AI 辅助修复!")
else:
print("\n⚠️ 修复未完全成功,这是正常现象(复杂 bug 可能需要多轮修复)")
else:
print("\n⚠️ 测试意外通过,可能是 bug 不够明显")
def main():
"""主函数"""
api_key = os.getenv("DASHSCOPE_API_KEY")
if not api_key:
print("❌ 错误:请设置 DASHSCOPE_API_KEY 环境变量")
return
demo = SelfHealingDemo(api_key=api_key)
demo.run_demo()
if __name__ == "__main__":
main()