"""Feishu (Lark) bot-webhook notifications for the AI development workflow.

Each public ``send_*`` function builds a message payload (plain text or an
interactive card) and posts it to the webhook configured in ``webhook_url``.
"""
from collections import defaultdict
import json
import logging

import requests

from app.models import RequirementAnalysis, TestCaseResult, CodeGenerationResult

_logger = logging.getLogger(__name__)

# NOTE(review): the webhook token is committed in source; consider loading it
# from configuration or the environment instead.
webhook_url = "https://open.feishu.cn/open-apis/bot/v2/hook/0cd37406-31b8-424a-a3da-c11ce465ac29"

# Upper bound (seconds) for a single webhook call, so an unresponsive
# endpoint cannot block the caller forever.
_REQUEST_TIMEOUT_SECONDS = 10

# Emoji shown next to each known test type; unknown types fall back to "📋".
_TYPE_EMOJI = {
    "功能测试": "🧩",
    "性能测试": "⚡",
    "安全测试": "🔒",
}


def _post(data: dict):
    """POST ``data`` as JSON to the Feishu webhook.

    Args:
        data: JSON-serializable message payload.

    Raises:
        requests.RequestException: on connection failure or when the request
            exceeds ``_REQUEST_TIMEOUT_SECONDS``.
    """
    # Encode explicitly: the payload keeps non-ASCII characters
    # (ensure_ascii=False), and some HTTP stacks encode a ``str`` body as
    # Latin-1, which fails on Chinese text and emoji.
    body = json.dumps(data, ensure_ascii=False).encode("utf-8")
    response = requests.post(
        webhook_url,
        headers={"Content-Type": "application/json"},
        data=body,
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    # Notifications are best-effort: report a rejected call, do not raise.
    if not response.ok:
        _logger.warning(
            "Feishu webhook returned HTTP %s: %s",
            response.status_code,
            response.text[:200],
        )


def _make_card(title: str, color: str, elements: list) -> dict:
    """Wrap ``elements`` in an interactive-card message payload.

    Args:
        title: Plain-text header title.
        color: Header ``template`` value (e.g. ``"blue"``, ``"green"``).
        elements: Card body elements.

    Returns:
        The complete message dict with ``msg_type`` ``"interactive"``.
    """
    return {
        "msg_type": "interactive",
        "card": {
            "config": {"wide_screen_mode": True},
            "header": {
                "title": {"tag": "plain_text", "content": title},
                "template": color,
            },
            "elements": elements,
        },
    }


def send_message_to_feishu(message: str):
    """Send ``message`` to the webhook as a plain text message."""
    _post({"msg_type": "text", "content": {"text": message}})


def _stage_column(background_style: str, content: str) -> dict:
    """Build one weighted column holding a single markdown element."""
    return {
        "tag": "column",
        "width": "weighted",
        "weight": 1,
        "background_style": background_style,
        "elements": [{"tag": "markdown", "content": content}],
    }


def _arrow_column() -> dict:
    """Build the narrow auto-width column that shows an arrow between stages."""
    return {
        "tag": "column",
        "width": "auto",
        "elements": [{"tag": "markdown", "content": "\n\n\n\n**→**"}],
    }


def send_workflow_start(requirement: str):
    """Send the "workflow started" card for a newly received requirement.

    The card echoes the requirement text and shows the three pipeline stages
    (requirement analysis, test case generation, code generation) as columns
    separated by arrows.

    Args:
        requirement: The raw requirement text to display.
    """
    elements = [
        {
            "tag": "markdown",
            "content": f"收到新需求:\n\n**{requirement}**",
        },
        {"tag": "hr"},
        {
            "tag": "markdown",
            "content": "🤖 **AI 将按以下流程自动处理,请稍候...**",
        },
        {
            "tag": "column_set",
            "flex_mode": "stretch",
            "background_style": "default",
            "columns": [
                _stage_column(
                    "blue-300",
                    "\n **📌 需求分析**\n\n - 功能需求拆解\n - 非功能需求识别\n - 验收标准制定\n - 边界情况梳理\n ",
                ),
                _arrow_column(),
                _stage_column(
                    "sunflower-300",
                    "\n **🧪 测试用例生成**\n\n - 功能测试用例\n - 性能测试用例\n - 安全测试用例\n - 测试策略规划\n - 测试覆盖计划\n ",
                ),
                _arrow_column(),
                _stage_column(
                    "green-300",
                    "\n **💻 代码生成**\n\n - 实现思路\n - 业务代码\n - 单元测试代码\n - 测试执行报告\n ",
                ),
            ],
        },
    ]
    _post(_make_card("🚀 AI 全自动开发流程启动", "purple", elements))


def _bullets(items) -> str:
    """Render ``items`` as a markdown bullet list, one ``- item`` per line."""
    return "\n".join(f"- {item}" for item in items)


def send_requirement_result(requirement_analysis: RequirementAnalysis):
    """Send the requirement-analysis result as a single card.

    Args:
        requirement_analysis: Mapping-style object indexed with the keys
            ``summary``, ``functional_requirements``,
            ``non_functional_requirements``, ``edge_cases`` and
            ``acceptance_criteria``; every key except ``summary`` is iterated
            as a list of items.

    Raises:
        KeyError: if one of those keys is missing.
    """
    elements = [
        {"tag": "markdown", "content": f"**📎 需求总结**\n\n{requirement_analysis['summary']}"},
        {"tag": "hr"},
        {"tag": "markdown", "content": "**🧩 功能需求**\n\n" + _bullets(requirement_analysis["functional_requirements"])},
        {"tag": "hr"},
        {"tag": "markdown", "content": "**🚀 非功能需求**\n\n" + _bullets(requirement_analysis["non_functional_requirements"])},
        {"tag": "hr"},
        {"tag": "markdown", "content": "**⚠️ 边界情况**\n\n" + _bullets(requirement_analysis["edge_cases"])},
        {"tag": "hr"},
        {"tag": "markdown", "content": "**✅ 验收标准**\n\n" + _bullets(requirement_analysis["acceptance_criteria"])},
    ]
    _post(_make_card("📌 需求分析结果", "blue", elements))


def send_generate_code(code_result: CodeGenerationResult):
    """Send the code-generation result as four separate cards.

    The cards are, in order: implementation notes, business code, unit-test
    code, and a unit-test execution summary.

    Args:
        code_result: Mapping-style object indexed with the keys
            ``implementation_notes``, ``java_code``, ``unit_tests``,
            ``unit_tests_count`` and ``passed_tests_count``.

    Raises:
        KeyError: if one of those keys is missing.
    """
    # Implementation notes
    _post(_make_card(
        "📌 代码生成结果 — 实现思路",
        "green",
        [{"tag": "markdown", "content": code_result["implementation_notes"]}],
    ))

    # Business code
    _post(_make_card(
        "💻 代码生成结果 — 业务代码",
        "green",
        [{"tag": "markdown", "content": f"```java\n{code_result['java_code']}\n```"}],
    ))

    # Unit-test code
    _post(_make_card(
        "🧪 代码生成结果 — 单元测试代码",
        "green",
        [{"tag": "markdown", "content": f"```java\n{code_result['unit_tests']}\n```"}],
    ))

    # Unit-test execution summary
    try:
        total = int(code_result["unit_tests_count"])
        passed = int(code_result["passed_tests_count"])
        failed = total - passed
        # Avoid dividing by zero when no tests were reported.
        rate = f"{passed / total * 100:.0f}%" if total > 0 else "—"
        status = "✅ 全部通过" if failed == 0 else f"⚠️ {failed} 个未通过"
        failed_display = f"{failed} ❌" if failed > 0 else "0"
    except (ValueError, TypeError):
        # Counts are not numeric: show the raw values and dash out everything
        # that would have been derived from them.
        total = code_result["unit_tests_count"]
        passed = code_result["passed_tests_count"]
        failed_display, rate, status = "—", "—", "—"

    result_md = (
        f"**总用例数:** {total}\n\n"
        f"**通过:** {passed} ✅\n\n"
        f"**未通过:** {failed_display}\n\n"
        f"**通过率:** {rate}\n\n"
        f"---\n\n"
        f"**整体状态:** {status}"
    )
    _post(_make_card(
        "📊 代码生成结果 — 单元测试执行结果",
        "green",
        [{"tag": "markdown", "content": result_md}],
    ))


def send_test_cases(test_case: TestCaseResult):
    """Build the test-case card for ``test_case`` and post it to the webhook."""
    _post(build_full_feishu_card(test_case))


def build_full_feishu_card(data: TestCaseResult) -> dict:
    """Build the interactive card summarizing generated test cases.

    The card contains overall and per-type counts, then each test case grouped
    by ``test_type`` (in order of first appearance), then the test strategy
    and the coverage plan.

    Args:
        data: Mapping-style object read via ``.get`` for ``test_cases``,
            ``test_strategy`` and ``coverage_plan``. Each test case is read
            via ``.get`` for ``test_type``, ``test_id``, ``test_name``,
            ``precondition``, ``steps`` and ``expected_result``. A missing or
            ``None`` ``test_cases`` / ``steps`` is treated as empty.

    Returns:
        The card message dict; this function does not send it.
    """
    test_cases = data.get("test_cases") or []

    # Group by type; dict insertion order keeps first-seen ordering.
    grouped_cases = defaultdict(list)
    for case in test_cases:
        grouped_cases[case.get("test_type", "未分类")].append(case)

    elements = []

    # Statistics
    stats_lines = ["**📊 测试用例统计**\n", f"- 总用例数:**{len(test_cases)}**"]
    for t_type, cases in grouped_cases.items():
        emoji = _TYPE_EMOJI.get(t_type, "📋")
        stats_lines.append(f"- {emoji} {t_type}:**{len(cases)}**")
    elements.append({"tag": "markdown", "content": "\n".join(stats_lines)})

    # Cases, grouped by type
    for test_type, cases in grouped_cases.items():
        emoji = _TYPE_EMOJI.get(test_type, "📋")
        elements.append({"tag": "hr"})
        elements.append({"tag": "markdown", "content": f"## {emoji} {test_type}"})
        for case in cases:
            steps = case.get("steps") or []
            steps_text = "\n".join(
                f" {number}. {step}" for number, step in enumerate(steps, start=1)
            )
            case_text = (
                f"**{case.get('test_id')} · {case.get('test_name')}**\n\n"
                f"- **前置条件:** {case.get('precondition')}\n"
                f"- **执行步骤:**\n{steps_text}\n"
                f"- **期望结果:** {case.get('expected_result')}"
            )
            elements.append({"tag": "markdown", "content": case_text})

    # Test strategy
    elements.append({"tag": "hr"})
    elements.append({"tag": "markdown", "content": f"**🧠 测试策略**\n\n{data.get('test_strategy')}"})

    # Coverage plan
    elements.append({"tag": "hr"})
    elements.append({"tag": "markdown", "content": f"**🎯 覆盖计划**\n\n{data.get('coverage_plan')}"})

    return _make_card("🧪 自动化测试用例结果", "blue", elements)