Files
demo2-qwen/app/message.py

274 lines
7.5 KiB
Python
Raw Normal View History

2026-03-04 13:56:08 +08:00
import json
import os
from collections import defaultdict

import requests

from app.models import RequirementAnalysis, TestCaseResult, CodeGenerationResult
# Feishu bot webhook that every sender in this module posts to.
# The token at the end of the URL acts as a credential, so a deployment can
# supply its own endpoint through the FEISHU_WEBHOOK_URL environment variable.
# The original hard-coded URL is kept as the fallback (also used when the
# variable is set but empty) so existing setups keep working unchanged.
webhook_url = os.environ.get("FEISHU_WEBHOOK_URL") or (
    "https://open.feishu.cn/open-apis/bot/v2/hook/0cd37406-31b8-424a-a3da-c11ce465ac29"
)
def send_message_to_feishu(message: str, timeout: float = 10.0) -> None:
    """Post a plain-text message to the Feishu webhook.

    Args:
        message: Text placed in the payload's ``content.text`` field.
        timeout: Seconds ``requests`` waits for the webhook before giving up.
            Without a timeout an unresponsive endpoint would block the
            caller indefinitely.

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed, including when ``timeout`` is exceeded.
    """
    payload = {
        "msg_type": "text",
        "content": {
            "text": message,
        },
    }
    # The response is intentionally not inspected, matching the original
    # fire-and-forget behaviour.
    requests.post(
        webhook_url,
        headers={"Content-Type": "application/json"},
        data=json.dumps(payload),
        timeout=timeout,
    )
def send_requirement_result(
    requirement_analysis: RequirementAnalysis,
    timeout: float = 10.0,
) -> None:
    """Post a requirement-analysis result to the Feishu webhook as a blue card.

    The card shows the summary followed by four bullet-list sections:
    functional requirements, non-functional requirements, edge cases and
    acceptance criteria.

    Args:
        requirement_analysis: Subscriptable result read with the keys
            ``summary``, ``functional_requirements``,
            ``non_functional_requirements``, ``edge_cases`` and
            ``acceptance_criteria``; the last four are iterated item by item.
        timeout: Seconds ``requests`` waits for the webhook before giving up.
            Without a timeout an unresponsive endpoint would block the
            caller indefinitely.

    Raises:
        KeyError: If ``requirement_analysis`` is a dict missing one of the
            keys above (raised before anything is sent).
        requests.exceptions.RequestException: If the request cannot be
            completed, including when ``timeout`` is exceeded.
    """
    # (markdown heading, source key) for each bullet-list section, in the
    # order the sections appear on the card.
    list_sections = (
        ("### 🧩 功能需求", "functional_requirements"),
        ("### 🚀 非功能需求", "non_functional_requirements"),
        ("### ⚠️ 边界情况", "edge_cases"),
        ("### ✅ 验收标准", "acceptance_criteria"),
    )

    elements = [
        {
            "tag": "markdown",
            "content": f"---\n**📎 总结：**\n\n{requirement_analysis['summary']}",
        },
    ]
    for heading, key in list_sections:
        bullets = "\n".join(f"- {item}" for item in requirement_analysis[key])
        elements.append({
            "tag": "markdown",
            "content": f"{heading}\n\n{bullets}",
        })

    payload = {
        "msg_type": "interactive",
        "card": {
            "config": {
                "wide_screen_mode": True,
            },
            "header": {
                "title": {
                    "tag": "plain_text",
                    "content": "📌 需求分析结果",
                },
                "template": "blue",
            },
            "elements": elements,
        },
    }
    requests.post(
        webhook_url,
        headers={"Content-Type": "application/json"},
        data=json.dumps(payload),
        timeout=timeout,
    )
def _code_generation_card(title: str, content) -> dict:
    """Return a green interactive-card payload holding one markdown element."""
    return {
        "msg_type": "interactive",
        "card": {
            "config": {
                "wide_screen_mode": True,
            },
            "header": {
                "title": {
                    "tag": "plain_text",
                    "content": title,
                },
                "template": "green",
            },
            "elements": [
                {
                    "tag": "markdown",
                    "content": content,
                },
            ],
        },
    }


def send_generate_code(
    code_result: CodeGenerationResult,
    timeout: float = 10.0,
) -> None:
    """Post a code-generation result to the Feishu webhook as four green cards.

    The cards are sent in this order: implementation notes, business code,
    test code, and the unit-test execution statistics.

    Args:
        code_result: Subscriptable result read with the keys
            ``implementation_notes``, ``java_code``, ``unit_tests``,
            ``unit_tests_count`` and ``passed_tests_count``.
        timeout: Seconds ``requests`` waits for the webhook on each post
            before giving up. Without a timeout an unresponsive endpoint
            would block the caller indefinitely.

    Raises:
        KeyError: If ``code_result`` is a dict missing one of the keys above.
        requests.exceptions.RequestException: If a request cannot be
            completed, including when ``timeout`` is exceeded; cards after
            the failing one are not sent.
    """
    # Build every payload before sending anything, so a missing key fails
    # up front instead of after part of the result has been posted.
    cards = [
        _code_generation_card(
            "📌 代码生成结果 - 实现思路",
            code_result['implementation_notes'],
        ),
        _code_generation_card(
            "📌 代码生成结果 - 业务代码",
            code_result['java_code'],
        ),
        _code_generation_card(
            "🚀 代码生成结果 - 测试代码",
            code_result['unit_tests'],
        ),
        _code_generation_card(
            "🚀 代码生成结果 - 单元测试执行结果",
            "\n### 📊 测试统计\n"
            f"- **总用例数** {code_result['unit_tests_count']}\n"
            f"- **通过** {code_result['passed_tests_count']}\n",
        ),
    ]
    for card in cards:
        requests.post(
            webhook_url,
            headers={"Content-Type": "application/json"},
            data=json.dumps(card),
            timeout=timeout,
        )
def send_test_cases(test_case: TestCaseResult, timeout: float = 10.0) -> None:
    """Post a test-case result to the Feishu webhook as an interactive card.

    The card payload is produced by ``build_full_feishu_card``.

    Args:
        test_case: Test-case result handed to ``build_full_feishu_card``.
        timeout: Seconds ``requests`` waits for the webhook before giving up.
            Without a timeout an unresponsive endpoint would block the
            caller indefinitely.

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed, including when ``timeout`` is exceeded.
    """
    card_payload = build_full_feishu_card(test_case)
    requests.post(
        webhook_url,
        headers={"Content-Type": "application/json"},
        data=json.dumps(card_payload),
        timeout=timeout,
    )
def _format_test_case_markdown(case) -> str:
    """Render one test case (read via ``.get``) as a markdown block."""
    # ``or []`` also covers a ``steps`` key that is present but None.
    steps_text = "\n".join(
        f" {number}. {step}"
        for number, step in enumerate(case.get("steps") or [], start=1)
    )
    return (
        f"### {case.get('test_id')} - {case.get('test_name')}\n"
        f"- **前置条件** {case.get('precondition')}\n"
        "- **执行步骤**\n"
        f"{steps_text}\n"
        f"- **期望结果** {case.get('expected_result')}\n"
    )


def build_full_feishu_card(data: TestCaseResult) -> dict:
    """Build the Feishu interactive-card payload for a test-case result.

    The card contains, in order: a statistics block (total count plus one
    count per test type), one section per test type listing every case in
    full, the test strategy, and the coverage plan.

    Args:
        data: Mapping read via ``.get`` with the keys ``test_cases``,
            ``test_strategy`` and ``coverage_plan``. Each test case is read
            via ``.get`` with the keys ``test_type``, ``test_id``,
            ``test_name``, ``precondition``, ``steps`` and
            ``expected_result``. A missing or empty ``test_type`` is shown
            as "未分类"; missing ``test_cases`` or ``steps`` are treated as
            empty.

    Returns:
        A dict with ``msg_type`` set to ``"interactive"`` and a ``card``
        holding the blue header and the list of markdown elements.
    """
    test_cases = data.get("test_cases") or []

    # Group cases by type. Dict insertion order keeps the types in
    # first-seen order, and the per-type count is just the group length.
    grouped_cases = defaultdict(list)
    for case in test_cases:
        grouped_cases[case.get("test_type") or "未分类"].append(case)

    # Statistics block. Each per-type line separates label and count the
    # same way the total line does (previously they were fused together).
    stats_lines = [
        "### 📊 测试统计",
        "",
        f"- 总用例数：**{len(test_cases)}**",
    ]
    stats_lines.extend(
        f"- {test_type}：**{len(cases)}**"
        for test_type, cases in grouped_cases.items()
    )
    elements = [{
        "tag": "markdown",
        "content": "\n".join(stats_lines) + "\n",
    }]

    # One heading per test type, followed by every case of that type.
    for test_type, cases in grouped_cases.items():
        elements.append({
            "tag": "markdown",
            "content": f"---\n## 🗂 {test_type}",
        })
        elements.extend(
            {"tag": "markdown", "content": _format_test_case_markdown(case)}
            for case in cases
        )

    # Test strategy
    elements.append({
        "tag": "markdown",
        "content": f"---\n### 🧠 测试策略\n\n{data.get('test_strategy')}",
    })
    # Coverage plan
    elements.append({
        "tag": "markdown",
        "content": f"---\n### 🎯 覆盖计划\n\n{data.get('coverage_plan')}",
    })

    return {
        "msg_type": "interactive",
        "card": {
            "config": {
                "wide_screen_mode": True,
            },
            "header": {
                "title": {
                    "tag": "plain_text",
                    "content": "🧪 自动化测试用例结果",
                },
                "template": "blue",
            },
            "elements": elements,
        },
    }