Files
code_scan/notify/feishu.py
Dang Zerong 17306c6814 init
2026-03-10 17:22:07 +08:00

546 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
飞书机器人通知器
发送代码质量扫描报告到飞书
"""
import json
import time
import hashlib
import hmac
import base64
import logging
import os
import requests
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)
class FeishuNotifier:
    """Send code-quality scan reports to Feishu (Lark).

    Two delivery routes are supported:

    * Custom-bot webhook: needs ``webhook_url`` and, when the bot has
      signature verification enabled, ``secret``.
    * App bot through the IM OpenAPI: needs ``app_id``, ``app_secret`` and
      ``chat_id``. Only this route can deliver the report file, which is
      uploaded first and then sent as a separate ``file`` message.
    """

    # Base URL of the Feishu OpenAPI endpoints used by the app-bot route.
    _OPEN_API_BASE = "https://open.feishu.cn/open-apis"

    # Marker placed in front of an issue line, keyed by upper-cased severity.
    _SEVERITY_EMOJI = {
        'HIGH': '🔴',
        'MEDIUM': '🟡',
        'LOW': '🔵',
        'ERROR': '🔴',
        'WARNING': '🟡',
    }

    # Limits applied when listing issues in the webhook card.
    _MAX_ISSUES_PER_SCANNER = 3
    _MAX_ISSUES_TOTAL = 5
    _MAX_ISSUE_MESSAGE_LEN = 50

    def __init__(self, config: Dict[str, Any]):
        """Read the notifier settings from ``config``.

        Args:
            config: Feishu settings. Keys read here: ``webhook_url``,
                ``secret``, ``app_id``, ``app_secret``, ``chat_id`` and
                ``attach_report_file`` (defaults to ``True``).
        """
        self.config = config
        self.webhook_url = config.get('webhook_url', '')
        self.secret = config.get('secret', '')
        # Settings for the app-bot route (file upload + IM messages).
        self.app_id = config.get('app_id', '')
        self.app_secret = config.get('app_secret', '')
        self.chat_id = config.get('chat_id', '')
        self.attach_report_file = config.get('attach_report_file', True)
        # Cached tenant_access_token and the epoch second it stops being used.
        self._tenant_access_token = None
        self._token_expires_at = 0
        if not self.webhook_url:
            logger.warning('飞书 Webhook URL 未配置')

    # ------------------------------------------------------------------
    # App-bot (OpenAPI) helpers
    # ------------------------------------------------------------------
    def _get_tenant_access_token(self) -> Optional[str]:
        """Return a tenant_access_token, fetching a new one when needed.

        Returns:
            The token, or ``None`` when ``app_id``/``app_secret`` are not
            configured or the request fails (the failure is logged).
        """
        if not self.app_id or not self.app_secret:
            return None
        # Reuse the cached token while it is still considered valid.
        if self._tenant_access_token and time.time() < self._token_expires_at:
            return self._tenant_access_token
        try:
            url = f"{self._OPEN_API_BASE}/auth/v3/tenant_access_token/internal"
            headers = {"Content-Type": "application/json; charset=utf-8"}
            payload = {
                "app_id": self.app_id,
                "app_secret": self.app_secret,
            }
            response = requests.post(url, headers=headers, json=payload, timeout=10)
            result = response.json()
            if result.get("code") != 0:
                logger.error(f"获取 tenant_access_token 失败: {result.get('msg')}")
                return None
            self._tenant_access_token = result.get("tenant_access_token")
            # Stop using the token 5 minutes before the reported expiry.
            self._token_expires_at = time.time() + result.get("expire", 7200) - 300
            return self._tenant_access_token
        except Exception as e:
            logger.error(f"获取 tenant_access_token 异常: {str(e)}")
            return None

    def _upload_file(self, file_path: str, file_name: str) -> Optional[str]:
        """Upload a local file through the IM file API.

        Args:
            file_path: Path of the file on disk.
            file_name: Name to give the uploaded file.

        Returns:
            The ``file_key`` to reference in a ``file`` message, or ``None``
            when no token is available or the upload fails (logged).
        """
        token = self._get_tenant_access_token()
        if not token:
            logger.error("无法获取 token上传文件失败")
            return None
        try:
            url = f"{self._OPEN_API_BASE}/im/v1/files"
            headers = {"Authorization": f"Bearer {token}"}
            with open(file_path, 'rb') as f:
                file_content = f.read()
            # multipart/form-data request; the file is always sent as the
            # generic 'stream' type.
            files = {
                'file': (file_name, file_content, 'application/octet-stream'),
            }
            data = {
                'file_name': file_name,
                'file_type': 'stream',
            }
            response = requests.post(url, headers=headers, files=files, data=data, timeout=60)
            result = response.json()
            if result.get("code") != 0:
                logger.error(f"文件上传失败: {result.get('msg')}, code: {result.get('code')}")
                return None
            file_key = result.get("data", {}).get("file_key")
            logger.info(f"文件上传成功: {file_name}, file_key: {file_key}")
            return file_key
        except Exception as e:
            logger.error(f"文件上传异常: {str(e)}")
            return None

    def _post_app_message(self, token: str, msg_type: str, content: Dict[str, Any]) -> bool:
        """Send one IM message to ``self.chat_id`` as the app bot.

        Args:
            token: tenant_access_token used for authorization.
            msg_type: Feishu message type, e.g. ``interactive`` or ``file``.
            content: Message content; it is JSON-encoded into the ``content``
                string field of the request.

        Returns:
            ``True`` when the API answers with ``code == 0``; otherwise
            ``False`` after logging the error or exception.
        """
        try:
            response = requests.post(
                f"{self._OPEN_API_BASE}/im/v1/messages",
                params={"receive_id_type": "chat_id"},
                headers={
                    "Authorization": f"Bearer {token}",
                    "Content-Type": "application/json; charset=utf-8",
                },
                json={
                    "receive_id": self.chat_id,
                    "msg_type": msg_type,
                    "content": json.dumps(content),
                },
                timeout=30,
            )
            result = response.json()
        except Exception as e:
            logger.error(f"应用机器人消息发送异常: {str(e)}")
            return False
        if result.get("code") == 0:
            return True
        logger.error(f"应用机器人消息发送失败: {result.get('msg')}")
        return False

    def _send_app_message(self, report: Dict[str, Any], file_key: Optional[str] = None) -> bool:
        """Send the report summary (and optionally the file) as the app bot.

        The summary goes out as an interactive card. A card cannot embed a
        file, so when ``file_key`` is given the file is sent afterwards as a
        separate ``file`` message.

        Args:
            report: Report data.
            file_key: Key returned by ``_upload_file``; ``None`` to skip the
                attachment.

        Returns:
            ``True`` when the summary card was delivered. Sending the
            attachment is best effort: its failure is logged but does not
            change the result.
        """
        token = self._get_tenant_access_token()
        if not token:
            logger.error("无法获取 token")
            return False
        theme_color, _ = self._status_style(report.get('status', 'pass'))
        card = {
            "header": {
                "title": {
                    "tag": "plain_text",
                    "content": report.get('status_text', '代码扫描报告'),
                },
                "template": theme_color,
            },
            "elements": [self._md_div(self._build_basic_info_text(report))],
        }
        if not self._post_app_message(token, "interactive", card):
            return False
        logger.info("应用机器人消息发送成功")
        if file_key:
            # Errors are logged inside _post_app_message.
            self._post_app_message(token, "file", {"file_key": file_key})
        return True

    # ------------------------------------------------------------------
    # Webhook (custom bot) helpers
    # ------------------------------------------------------------------
    def _generate_sign(self) -> tuple:
        """Compute the custom-bot request signature.

        Returns:
            ``(timestamp, sign)``: the current epoch second as a string and
            the Base64-encoded HMAC-SHA256 signature for it.
        """
        timestamp = str(int(time.time()))
        string_to_sign = '{}\n{}'.format(timestamp, self.secret)
        # The Feishu scheme uses "timestamp\nsecret" as the HMAC *key* and an
        # empty message; this is intentional, do not swap the arguments.
        hmac_code = hmac.new(
            string_to_sign.encode('utf-8'),
            digestmod=hashlib.sha256
        ).digest()
        sign = base64.b64encode(hmac_code).decode('utf-8')
        return timestamp, sign

    def _post_webhook(self, payload: Dict[str, Any]) -> bool:
        """POST ``payload`` to the webhook, signing it when a secret is set.

        Args:
            payload: Message body without ``timestamp``/``sign``.

        Returns:
            ``True`` when Feishu answers with ``code == 0``; ``False`` (after
            logging the returned message) otherwise.

        Raises:
            Exception: Network and JSON decoding errors are not caught here;
                callers handle them.
        """
        if self.secret:
            timestamp, sign = self._generate_sign()
            payload = {"timestamp": timestamp, "sign": sign, **payload}
        response = requests.post(
            self.webhook_url,
            headers={'Content-Type': 'application/json'},
            data=json.dumps(payload).encode('utf-8'),
            timeout=30
        )
        result = response.json()
        if result.get('code') == 0:
            return True
        logger.error(f'飞书消息发送失败: {result.get("msg")}')
        return False

    def _send_webhook_message(self, message: Dict[str, Any]) -> bool:
        """Send an interactive card through the webhook.

        Args:
            message: Card structure as produced by ``_build_message``.

        Returns:
            ``True`` when Feishu accepted the message.
        """
        return self._post_webhook({"msg_type": "interactive", "card": message})

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def send_report(self, report: Dict[str, Any]) -> bool:
        """Send a scan report to Feishu.

        The app-bot route is used when ``chat_id``, ``app_id`` and
        ``app_secret`` are all configured; otherwise the webhook is used.
        The report file (``report['report_file']``) is uploaded only on the
        app-bot route, because a webhook message cannot carry a file.

        Args:
            report: Report data.

        Returns:
            ``True`` only when Feishu accepted the message; ``False`` when the
            webhook URL is missing, the send was rejected, or an exception
            occurred (logged).
        """
        # NOTE(review): the webhook URL is required even on the app-bot route,
        # which never uses it; kept as-is for backward compatibility.
        if not self.webhook_url:
            logger.error('飞书 Webhook URL 未配置')
            return False
        try:
            use_app_bot = bool(self.chat_id and self.app_id and self.app_secret)
            if use_app_bot:
                file_key = None
                if self.attach_report_file:
                    report_file = report.get('report_file')
                    if report_file and os.path.exists(report_file):
                        file_key = self._upload_file(report_file, os.path.basename(report_file))
                sent = self._send_app_message(report, file_key)
            else:
                sent = self._send_webhook_message(self._build_message(report))
            # The senders log their own failures; only success is logged here.
            if sent:
                logger.info('飞书消息发送成功')
            return bool(sent)
        except Exception as e:
            logger.error(f'发送飞书通知失败: {str(e)}', exc_info=True)
            return False

    def send_simple_message(self, title: str, content: str) -> bool:
        """Send a plain text message through the webhook.

        Args:
            title: First line of the message.
            content: Text placed after the title.

        Returns:
            ``True`` when Feishu accepted the message, ``False`` otherwise.
        """
        if not self.webhook_url:
            logger.error('飞书 Webhook URL 未配置')
            return False
        try:
            payload = {
                "msg_type": "text",
                "content": {
                    "text": f"{title}\n{content}"
                }
            }
            # Goes through the shared helper so the request is signed when a
            # secret is configured, like the card messages.
            sent = self._post_webhook(payload)
            if sent:
                logger.info('飞书消息发送成功')
            return sent
        except Exception as e:
            logger.error(f'发送飞书通知失败: {str(e)}')
            return False

    # ------------------------------------------------------------------
    # Message construction
    # ------------------------------------------------------------------
    @staticmethod
    def _md_div(content: str) -> Dict[str, Any]:
        """Wrap ``content`` in a card ``div`` element rendered as lark_md."""
        return {
            "tag": "div",
            "text": {
                "tag": "lark_md",
                "content": content
            }
        }

    @staticmethod
    def _status_style(status: Any) -> tuple:
        """Map a report status to ``(card header colour, title icon)``."""
        if status == 'pass':
            return 'green', '✅'
        if status == 'fail':
            return 'red', '❌'
        return 'orange', '⚠️'

    @staticmethod
    def _build_title_and_basic_info(report: Dict[str, Any], status_icon: str) -> tuple:
        """Build ``(title, basic_info)`` for a PR scan or a push scan.

        A report counts as a PR scan when both ``pr_url`` and
        ``target_branch`` are set.
        """
        repo_name = report.get('repo_name', 'unknown')
        branch = report.get('branch', 'unknown')
        commit_id = report.get('commit_id', 'unknown')
        author = report.get('author', 'unknown')
        pr_url = report.get('pr_url')
        target_branch = report.get('target_branch')
        if pr_url and target_branch:
            title = f"{status_icon} PR 代码质量扫描报告"
            lines = [
                f"**仓库:** `{repo_name}`",
                f"**源分支:** `{branch}` → **目标分支:** `{target_branch}`",
                f"**PR链接:** [查看PR]({pr_url})",
                f"**提交:** `{commit_id}`",
                f"**提交者:** {author}",
            ]
        else:
            title = f"{status_icon} 代码质量扫描报告"
            lines = [
                f"**仓库:** `{repo_name}`",
                f"**分支:** `{branch}`",
                f"**提交:** `{commit_id}`",
                f"**提交者:** {author}",
            ]
        return title, "\n".join(lines)

    @staticmethod
    def _collect_scan_details(scan_results: Dict[str, Any]) -> list:
        """Return one summary line per scanner, skipping the ``ai`` entry.

        Entries whose result or ``summary`` is not a dict are skipped too.
        """
        details = []
        for scanner_name, result in scan_results.items():
            if scanner_name == 'ai' or not isinstance(result, dict):
                continue
            summary = result.get('summary', {})
            if not isinstance(summary, dict):
                continue
            tool_name = result.get('tool', scanner_name)
            files_scanned = result.get('files_scanned', 0)
            total = summary.get('total', 0) or 0
            if total > 0:
                details.append(f"{tool_name}: 扫描 {files_scanned} 个文件,发现 {total} 个问题")
            else:
                details.append(f"{tool_name}: 扫描 {files_scanned} 个文件,无问题")
        return details

    def _format_top_issues(self, scan_results: Dict[str, Any]) -> str:
        """Format the issue list shown in the card.

        Takes at most ``_MAX_ISSUES_PER_SCANNER`` issues from each scanner and
        lists at most ``_MAX_ISSUES_TOTAL`` of them overall.

        Returns:
            The lark_md text, or an empty string when there are no issues.
        """
        picked = []
        for result in scan_results.values():
            if not isinstance(result, dict):
                continue
            picked.extend((result.get('issues') or [])[:self._MAX_ISSUES_PER_SCANNER])
        if not picked:
            return ''
        lines = ["**主要问题:**"]
        for index, issue in enumerate(picked[:self._MAX_ISSUES_TOTAL], 1):
            # severity/message may be present but None; fall back to the
            # defaults instead of failing on .upper() / slicing.
            severity = str(issue.get('severity') or 'Unknown')
            emoji = self._SEVERITY_EMOJI.get(severity.upper(), '')
            marker = f"{emoji} " if emoji else ''
            file_path = issue.get('file', 'unknown')
            line_num = issue.get('line', 0)
            message = str(issue.get('message') or 'No message')[:self._MAX_ISSUE_MESSAGE_LEN]
            lines.append(f"{index}. {marker}`{file_path}:{line_num}` - {message}")
        return "\n".join(lines) + "\n"

    def _build_basic_info_text(self, report: Dict[str, Any]) -> str:
        """Build the lark_md summary text used by the app-bot card.

        Args:
            report: Report data.

        Returns:
            Title, repository/branch/commit details, scan status, issue
            counters and scan time, joined into one string.
        """
        _, status_icon = self._status_style(report.get('status', 'pass'))
        title, basic_info = self._build_title_and_basic_info(report, status_icon)
        total_issues = report.get('total_issues', 0)
        total_errors = report.get('total_errors', 0)
        total_warnings = report.get('total_warnings', 0)
        return "".join([
            f"{title}\n\n{basic_info}\n\n",
            f"**扫描状态:** {report.get('status_text', 'unknown')}\n",
            f"📊 总问题: {total_issues} | 🔴 错误: {total_errors} | 🟡 警告: {total_warnings}\n",
            f"🕐 扫描时间: {report.get('timestamp', '')}",
        ])

    def _build_message(self, report: Dict[str, Any], file_key: Optional[str] = None) -> Dict[str, Any]:
        """Build the interactive card sent through the webhook.

        Args:
            report: Report data.
            file_key: Accepted for backward compatibility and ignored: a card
                has no file component, so no attachment element is emitted.

        Returns:
            The card structure (``header`` + ``elements``).
        """
        theme_color, status_icon = self._status_style(report.get('status', 'pass'))
        title, basic_info = self._build_title_and_basic_info(report, status_icon)
        total_issues = report.get('total_issues', 0)
        total_errors = report.get('total_errors', 0)
        total_warnings = report.get('total_warnings', 0)
        scan_results = report.get('scan_results') or {}

        elements = [
            self._md_div(basic_info),
            self._md_div(
                f"**扫描状态:** {report.get('status_text', 'unknown')}\n"
                f"📊 总问题: {total_issues} | "
                f"🔴 错误: {total_errors} | "
                f"🟡 警告: {total_warnings}"
            ),
        ]
        scan_details = self._collect_scan_details(scan_results)
        if scan_details:
            elements.append(self._md_div(
                "**扫描详情:**\n" + "\n".join(f"- {d}" for d in scan_details)
            ))
        issues_text = self._format_top_issues(scan_results)
        if issues_text:
            elements.append(self._md_div(issues_text))
        elements.append(self._md_div(f"🕐 扫描时间: {report.get('timestamp', '')}"))

        return {
            "header": {
                "title": {
                    "tag": "plain_text",
                    "content": title
                },
                "template": theme_color
            },
            "elements": elements
        }