Files
demo2-qwen/app/message.py

217 lines
7.8 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from collections import defaultdict
import requests
import json
from app.models import RequirementAnalysis, TestCaseResult, CodeGenerationResult
# Feishu custom-bot webhook endpoint; every message built in this module is
# POSTed to this URL.
# NOTE(review): the hook token is hard-coded in source, so anyone who can read
# the repository can post to this bot. Consider loading it from configuration
# and rotating the token — confirm with the owner.
webhook_url = "https://open.feishu.cn/open-apis/bot/v2/hook/0cd37406-31b8-424a-a3da-c11ce465ac29"
# Icon placed in front of each test type in the test-case card. Lookups use
# `.get(...)` with a generic fallback, so unknown types are still rendered.
# NOTE(review): the "性能测试" entry is an empty string, so that type is shown
# without an icon — possibly an emoji that was lost in an edit; confirm.
_TYPE_EMOJI = {
    "功能测试": "🧩",
    "性能测试": "",
    "安全测试": "🔒",
}
def _post(data: dict, timeout: float = 10.0) -> None:
    """POST *data* to the Feishu webhook as a UTF-8 encoded JSON body.

    Args:
        data: JSON-serializable message payload.
        timeout: Seconds to wait for the webhook before giving up. Without a
            timeout a stalled endpoint would block the caller indefinitely.

    Raises:
        requests.RequestException: Propagated unchanged on connection
            failures or when the timeout expires.
    """
    # The payload keeps non-ASCII characters (ensure_ascii=False). A ``str``
    # body is left for the HTTP stack to encode, and http.client / urllib3 1.x
    # use ISO-8859-1 for that, which fails on Chinese text. Encoding here makes
    # the wire format explicit and independent of the installed versions.
    body = json.dumps(data, ensure_ascii=False).encode("utf-8")
    requests.post(
        webhook_url,
        headers={"Content-Type": "application/json; charset=utf-8"},
        data=body,
        timeout=timeout,
    )
def _make_card(title: str, color: str, elements: list) -> dict:
return {
"msg_type": "interactive",
"card": {
"config": {"wide_screen_mode": True},
"header": {
"title": {"tag": "plain_text", "content": title},
"template": color
},
"elements": elements
}
}
def send_message_to_feishu(message: str):
    """Send *message* to the Feishu webhook as a plain text message."""
    text_content = {"text": message}
    _post({"msg_type": "text", "content": text_content})
def send_workflow_start(requirement: str):
    """Post the "workflow started" card for a newly received requirement.

    The card echoes *requirement* and shows the three processing stages
    (requirement analysis, test-case generation, code generation) as
    coloured columns separated by arrow columns.
    """

    def stage_column(background: str, text: str) -> dict:
        # One equally weighted, coloured column describing a pipeline stage.
        return {
            "tag": "column",
            "width": "weighted",
            "weight": 1,
            "background_style": background,
            "elements": [{
                "tag": "markdown",
                "content": text
            }]
        }

    def arrow_column() -> dict:
        # Narrow spacer column holding the arrow between two stages.
        return {
            "tag": "column",
            "width": "auto",
            "elements": [{"tag": "markdown", "content": "\n\n\n\n**→**"}]
        }

    intro = {
        "tag": "markdown",
        "content": f"收到新需求:\n\n**{requirement}**"
    }
    notice = {
        "tag": "markdown",
        "content": "🤖 **AI 将按以下流程自动处理,请稍候...**"
    }
    pipeline = {
        "tag": "column_set",
        "flex_mode": "stretch",
        "background_style": "default",
        "columns": [
            stage_column(
                "blue-300",
                "\n **📌 需求分析**\n\n - 功能需求拆解\n - 非功能需求识别\n - 验收标准制定\n - 边界情况梳理\n "
            ),
            arrow_column(),
            stage_column(
                "sunflower-300",
                "\n **🧪 测试用例生成**\n\n - 功能测试用例\n - 性能测试用例\n - 安全测试用例\n - 测试策略规划\n - 测试覆盖计划\n "
            ),
            arrow_column(),
            stage_column(
                "green-300",
                "\n **💻 代码生成**\n\n - 实现思路\n - 业务代码\n - 单元测试代码\n - 测试执行报告\n "
            ),
        ]
    }
    card_body = [intro, {"tag": "hr"}, notice, pipeline]
    _post(_make_card("🚀 AI 全自动开发流程启动", "purple", card_body))
def send_requirement_result(requirement_analysis: RequirementAnalysis):
    """Post the requirement-analysis card.

    The card shows the summary followed by four bullet-list sections
    (functional requirements, non-functional requirements, edge cases and
    acceptance criteria), each preceded by a horizontal rule.
    """
    # (section heading, key in requirement_analysis), in display order.
    bullet_sections = (
        ("**🧩 功能需求**\n\n", "functional_requirements"),
        ("**🚀 非功能需求**\n\n", "non_functional_requirements"),
        ("**⚠️ 边界情况**\n\n", "edge_cases"),
        ("**✅ 验收标准**\n\n", "acceptance_criteria"),
    )

    card_body = [
        {"tag": "markdown", "content": f"**📎 需求总结**\n\n{requirement_analysis['summary']}"},
    ]
    for heading, key in bullet_sections:
        bullets = "\n".join([f"- {entry}" for entry in requirement_analysis[key]])
        card_body.append({"tag": "hr"})
        card_body.append({"tag": "markdown", "content": heading + bullets})

    _post(_make_card("📌 需求分析结果", "blue", card_body))
def send_generate_code(code_result: CodeGenerationResult):
    """Post the code-generation results as four separate green cards.

    In order: implementation notes, business code, unit-test code (the last
    two wrapped in a ``java`` code fence) and a unit-test execution summary.
    When the test counts cannot be converted to integers, the raw values are
    shown and the derived fields (failed, pass rate, status) are left blank.
    """
    # (card title, key in code_result, wrap in a java code fence?)
    content_cards = (
        ("📌 代码生成结果 — 实现思路", "implementation_notes", False),  # implementation notes
        ("💻 代码生成结果 — 业务代码", "java_code", True),  # business code
        ("🧪 代码生成结果 — 单元测试代码", "unit_tests", True),  # unit-test code
    )
    for card_title, key, fenced in content_cards:
        # Each value is read right before its card is posted, so cards
        # already sent stay sent if a later key is missing.
        text = code_result[key]
        if fenced:
            text = f"```java\n{text}\n```"
        _post(_make_card(card_title, "green", [{"tag": "markdown", "content": text}]))

    # Unit-test execution summary.
    try:
        total = int(code_result["unit_tests_count"])
        passed = int(code_result["passed_tests_count"])
    except (ValueError, TypeError):
        # Counts are not numeric: show them verbatim, blank out derived fields.
        total = code_result["unit_tests_count"]
        passed = code_result["passed_tests_count"]
        failed_display = rate = status = ""
    else:
        failed = total - passed
        if total > 0:
            rate = f"{passed / total * 100:.0f}%"
        else:
            rate = ""
        if failed == 0:
            status = "✅ 全部通过"
        else:
            status = f"⚠️ {failed} 个未通过"
        failed_display = str(failed) if failed > 0 else "0"

    summary_parts = [
        f"**总用例数:** {total}\n\n",
        f"**通过:** {passed}\n\n",
        f"**未通过:** {failed_display}\n\n",
        f"**通过率:** {rate}\n\n",
        f"---\n\n",
        f"**整体状态:** {status}",
    ]
    result_md = "".join(summary_parts)
    _post(_make_card(
        "📊 代码生成结果 — 单元测试执行结果", "green",
        [{"tag": "markdown", "content": result_md}]
    ))
def send_test_cases(test_case: TestCaseResult):
    """Render *test_case* as a Feishu card and post it to the webhook."""
    card = build_full_feishu_card(test_case)
    _post(card)
def build_full_feishu_card(data: TestCaseResult) -> dict:
    """Build the interactive card that presents the generated test cases.

    Layout: a statistics block (total and per-type counts), then one section
    per test type listing every case (id, name, precondition, numbered steps,
    expected result), then the test strategy and the coverage plan.

    Args:
        data: Mapping read via ``.get`` for ``test_cases``, ``test_strategy``
            and ``coverage_plan``. Each case is read via ``.get`` for
            ``test_type``, ``test_id``, ``test_name``, ``precondition``,
            ``steps`` and ``expected_result``.

    Returns:
        The card message dict produced by ``_make_card``.
    """
    # `or []` also covers an explicit None value, not only a missing key.
    test_cases = data.get("test_cases") or []

    # Dicts keep insertion order, so types appear in first-seen order.
    grouped_cases = defaultdict(list)
    for case in test_cases:
        grouped_cases[case.get("test_type", "未分类")].append(case)

    def icon_for(test_type):
        # Unknown types and empty mapping entries both get the generic icon.
        return _TYPE_EMOJI.get(test_type) or "📋"

    # Statistics block.
    stats_lines = ["**📊 测试用例统计**\n", f"- 总用例数:**{len(test_cases)}**"]
    for test_type, cases in grouped_cases.items():
        # Same "name:**count**" shape as the total line above.
        stats_lines.append(f"- {icon_for(test_type)} {test_type}:**{len(cases)}**")
    elements = [{"tag": "markdown", "content": "\n".join(stats_lines)}]

    # One section per test type.
    for test_type, cases in grouped_cases.items():
        elements.append({"tag": "hr"})
        elements.append({"tag": "markdown", "content": f"## {icon_for(test_type)} {test_type}"})
        for case in cases:
            steps = case.get("steps") or []
            steps_text = "\n".join(f" {i + 1}. {s}" for i, s in enumerate(steps))
            case_text = (
                f"**{case.get('test_id')} · {case.get('test_name')}**\n\n"
                f"- **前置条件:** {case.get('precondition')}\n"
                f"- **执行步骤:**\n{steps_text}\n"
                f"- **期望结果:** {case.get('expected_result')}"
            )
            elements.append({"tag": "markdown", "content": case_text})

    # Test strategy.
    elements.append({"tag": "hr"})
    elements.append({"tag": "markdown", "content": f"**🧠 测试策略**\n\n{data.get('test_strategy')}"})
    # Coverage plan.
    elements.append({"tag": "hr"})
    elements.append({"tag": "markdown", "content": f"**🎯 覆盖计划**\n\n{data.get('coverage_plan')}"})
    return _make_card("🧪 自动化测试用例结果", "blue", elements)