adjust prompt and feishu message style

This commit is contained in:
2026-03-06 15:47:35 +08:00
parent d63aa74721
commit 40f0d7078b
2 changed files with 194 additions and 238 deletions

View File

@@ -7,189 +7,164 @@ from app.models import RequirementAnalysis, TestCaseResult, CodeGenerationResult
webhook_url = "https://open.feishu.cn/open-apis/bot/v2/hook/0cd37406-31b8-424a-a3da-c11ce465ac29"
def send_message_to_feishu(message: str):
data = {
"msg_type": "text",
"content": {
"text": message
}
}
_TYPE_EMOJI = {
"功能测试": "🧩",
"性能测试": "",
"安全测试": "🔒",
}
def _post(data: dict):
requests.post(
webhook_url,
headers={"Content-Type": "application/json"},
data=json.dumps(data)
data=json.dumps(data, ensure_ascii=False)
)
def _make_card(title: str, color: str, elements: list) -> dict:
return {
"msg_type": "interactive",
"card": {
"config": {"wide_screen_mode": True},
"header": {
"title": {"tag": "plain_text", "content": title},
"template": color
},
"elements": elements
}
}
def send_message_to_feishu(message: str):
_post({"msg_type": "text", "content": {"text": message}})
def send_workflow_start(requirement: str):
elements = [
{
"tag": "markdown",
"content": f"收到新需求:\n\n**{requirement}**"
},
{"tag": "hr"},
{
"tag": "markdown",
"content": "🤖 **AI 将按以下流程自动处理,请稍候...**"
},
{
"tag": "column_set",
"flex_mode": "stretch",
"background_style": "default",
"columns": [
{
"tag": "column",
"width": "weighted",
"weight": 1,
"background_style": "blue-300",
"elements": [{
"tag": "markdown",
"content": "\n **📌 需求分析**\n\n - 功能需求拆解\n - 非功能需求识别\n - 验收标准制定\n - 边界情况梳理\n "
}]
},
{
"tag": "column",
"width": "auto",
"elements": [{"tag": "markdown", "content": "\n\n\n\n**→**"}]
},
{
"tag": "column",
"width": "weighted",
"weight": 1,
"background_style": "sunflower-300",
"elements": [{
"tag": "markdown",
"content": "\n **🧪 测试用例生成**\n\n - 功能测试用例\n - 性能测试用例\n - 安全测试用例\n - 测试策略规划\n - 测试覆盖计划\n "
}]
},
{
"tag": "column",
"width": "auto",
"elements": [{"tag": "markdown", "content": "\n\n\n\n**→**"}]
},
{
"tag": "column",
"width": "weighted",
"weight": 1,
"background_style": "green-300",
"elements": [{
"tag": "markdown",
"content": "\n **💻 代码生成**\n\n - 实现思路\n - 业务代码\n - 单元测试代码\n - 测试执行报告\n "
}]
}
]
}
]
_post(_make_card("🚀 AI 全自动开发流程启动", "purple", elements))
def send_requirement_result(requirement_analysis: RequirementAnalysis):
data = {
"msg_type": "interactive",
"card": {
"config": {
"wide_screen_mode": True
},
"header": {
"title": {
"tag": "plain_text",
"content": "📌 需求分析结果"
},
"template": "blue"
},
"elements": [
{
"tag": "markdown",
"content": f"---\n**📎 总结:**\n\n{requirement_analysis['summary']}"
},
{
"tag": "markdown",
"content": f"### 🧩 功能需求\n\n{chr(10).join([f'- {req}' for req in requirement_analysis['functional_requirements']])}"
},
{
"tag": "markdown",
"content": f"### 🚀 非功能需求\n\n{chr(10).join([f'- {req}' for req in requirement_analysis['non_functional_requirements']])}"
},
{
"tag": "markdown",
"content": f"### ⚠️ 边界情况\n\n{chr(10).join([f'- {case}' for case in requirement_analysis['edge_cases']])}"
},
{
"tag": "markdown",
"content": f"### ✅ 验收标准\n\n{chr(10).join([f'- {criteria}' for criteria in requirement_analysis['acceptance_criteria']])}"
}
]
}
}
elements = [
{"tag": "markdown", "content": f"**📎 需求总结**\n\n{requirement_analysis['summary']}"},
{"tag": "hr"},
{"tag": "markdown", "content": "**🧩 功能需求**\n\n" + "\n".join(f"- {r}" for r in requirement_analysis["functional_requirements"])},
{"tag": "hr"},
{"tag": "markdown", "content": "**🚀 非功能需求**\n\n" + "\n".join(f"- {r}" for r in requirement_analysis["non_functional_requirements"])},
{"tag": "hr"},
{"tag": "markdown", "content": "**⚠️ 边界情况**\n\n" + "\n".join(f"- {c}" for c in requirement_analysis["edge_cases"])},
{"tag": "hr"},
{"tag": "markdown", "content": "**✅ 验收标准**\n\n" + "\n".join(f"- {c}" for c in requirement_analysis["acceptance_criteria"])},
]
_post(_make_card("📌 需求分析结果", "blue", elements))
requests.post(
webhook_url,
headers={"Content-Type": "application/json"},
data=json.dumps(data)
)
def send_generate_code(code_result: CodeGenerationResult):
implementation_notes_data = {
"msg_type": "interactive",
"card": {
"config": {
"wide_screen_mode": True
},
"header": {
"title": {
"tag": "plain_text",
"content": "📌 代码生成结果 - 实现思路"
},
"template": "green"
},
"elements": [
{
"tag": "markdown",
"content": code_result['implementation_notes']
}
]
}
}
# 实现思路
_post(_make_card(
"📌 代码生成结果 — 实现思路", "green",
[{"tag": "markdown", "content": code_result["implementation_notes"]}]
))
java_code_data = {
"msg_type": "interactive",
"card": {
"config": {
"wide_screen_mode": True
},
"header": {
"title": {
"tag": "plain_text",
"content": "📌 代码生成结果 - 业务代码"
},
"template": "green"
},
"elements": [
{
"tag": "markdown",
"content": code_result['java_code']
}
]
}
}
# 业务代码
_post(_make_card(
"💻 代码生成结果 — 业务代码", "green",
[{"tag": "markdown", "content": f"```java\n{code_result['java_code']}\n```"}]
))
unit_tests_data = {
"msg_type": "interactive",
"card": {
"config": {
"wide_screen_mode": True
},
"header": {
"title": {
"tag": "plain_text",
"content": "🚀 代码生成结果 - 测试代码"
},
"template": "green"
},
"elements": [
{
"tag": "markdown",
"content": code_result['unit_tests']
}
]
}
}
# 单元测试代码
_post(_make_card(
"🧪 代码生成结果 — 单元测试代码", "green",
[{"tag": "markdown", "content": f"```java\n{code_result['unit_tests']}\n```"}]
))
passed_tests_data = {
"msg_type": "interactive",
"card": {
"config": {
"wide_screen_mode": True
},
"header": {
"title": {
"tag": "plain_text",
"content": "🚀 代码生成结果 - 单元测试执行结果"
},
"template": "green"
},
"elements": [
{
"tag": "markdown",
"content": f"""
### 📊 测试统计
# 单元测试执行结果
try:
total = int(code_result["unit_tests_count"])
passed = int(code_result["passed_tests_count"])
failed = total - passed
rate = f"{passed / total * 100:.0f}%" if total > 0 else ""
status = "✅ 全部通过" if failed == 0 else f"⚠️ {failed} 个未通过"
failed_display = f"{failed}" if failed > 0 else "0"
except (ValueError, TypeError):
total = code_result["unit_tests_count"]
passed = code_result["passed_tests_count"]
failed_display, rate, status = "", "", ""
- **总用例数:** {code_result['unit_tests_count']}
- **通过** {code_result['passed_tests_count']}
"""
}
]
}
}
requests.post(
webhook_url,
headers={"Content-Type": "application/json"},
data=json.dumps(implementation_notes_data)
result_md = (
f"**总用例数** {total}\n\n"
f"**通过:** {passed}\n\n"
f"**未通过:** {failed_display}\n\n"
f"**通过率:** {rate}\n\n"
f"---\n\n"
f"**整体状态:** {status}"
)
_post(_make_card(
"📊 代码生成结果 — 单元测试执行结果", "green",
[{"tag": "markdown", "content": result_md}]
))
requests.post(
webhook_url,
headers={"Content-Type": "application/json"},
data=json.dumps(java_code_data)
)
requests.post(
webhook_url,
headers={"Content-Type": "application/json"},
data=json.dumps(unit_tests_data)
)
requests.post(
webhook_url,
headers={"Content-Type": "application/json"},
data=json.dumps(passed_tests_data)
)
def send_test_cases(test_case: TestCaseResult):
requests.post(
webhook_url,
headers={"Content-Type": "application/json"},
data=json.dumps(build_full_feishu_card(test_case))
)
_post(build_full_feishu_card(test_case))
def build_full_feishu_card(data: TestCaseResult) -> dict:
@@ -198,76 +173,44 @@ def build_full_feishu_card(data: TestCaseResult) -> dict:
type_counter = defaultdict(int)
grouped_cases = defaultdict(list)
# 分组 + 统计
for case in test_cases:
t_type = case.get("test_type", "未分类")
type_counter[t_type] += 1
grouped_cases[t_type].append(case)
total = len(test_cases)
elements = []
# 📊 统计
stats_text = "### 📊 测试统计\n\n"
stats_text += f"- 总用例数:**{total}**\n"
for k, v in type_counter.items():
stats_text += f"- {k}{v}\n"
# 测试统计
stats_lines = [f"**📊 测试用例统计**\n", f"- 总用例数:**{total}**"]
for t_type, count in type_counter.items():
emoji = _TYPE_EMOJI.get(t_type, "📋")
stats_lines.append(f"- {emoji} {t_type}**{count}**")
elements.append({"tag": "markdown", "content": "\n".join(stats_lines)})
elements.append({
"tag": "markdown",
"content": stats_text
})
# 🗂 分组 + 全量展示
# 分组展示
for test_type, cases in grouped_cases.items():
elements.append({
"tag": "markdown",
"content": f"---\n## 🗂 {test_type}"
})
emoji = _TYPE_EMOJI.get(test_type, "📋")
elements.append({"tag": "hr"})
elements.append({"tag": "markdown", "content": f"## {emoji} {test_type}"})
for case in cases:
steps_text = "\n".join([f" {i+1}. {s}" for i, s in enumerate(case.get("steps", []))])
steps_text = "\n".join(f" {i + 1}. {s}" for i, s in enumerate(case.get("steps", [])))
case_text = (
f"**{case.get('test_id')} · {case.get('test_name')}**\n\n"
f"- **前置条件:** {case.get('precondition')}\n"
f"- **执行步骤:**\n{steps_text}\n"
f"- **期望结果:** {case.get('expected_result')}"
)
elements.append({"tag": "markdown", "content": case_text})
case_text = f"""### {case.get('test_id')} - {case.get('test_name')}
# 测试策略
elements.append({"tag": "hr"})
elements.append({"tag": "markdown", "content": f"**🧠 测试策略**\n\n{data.get('test_strategy')}"})
- **前置条件:** {case.get('precondition')}
- **执行步骤:**
{steps_text}
- **期望结果:** {case.get('expected_result')}
"""
# 覆盖计划
elements.append({"tag": "hr"})
elements.append({"tag": "markdown", "content": f"**🎯 覆盖计划**\n\n{data.get('coverage_plan')}"})
elements.append({
"tag": "markdown",
"content": case_text
})
# 🧠 测试策略
elements.append({
"tag": "markdown",
"content": f"---\n### 🧠 测试策略\n\n{data.get('test_strategy')}"
})
# 🎯 覆盖计划
elements.append({
"tag": "markdown",
"content": f"---\n### 🎯 覆盖计划\n\n{data.get('coverage_plan')}"
})
return {
"msg_type": "interactive",
"card": {
"config": {
"wide_screen_mode": True
},
"header": {
"title": {
"tag": "plain_text",
"content": "🧪 自动化测试用例结果"
},
"template": "blue"
},
"elements": elements
}
}
return _make_card("🧪 自动化测试用例结果", "blue", elements)