init #3

Merged
dangzerong merged 1 commits from dev into main 2026-03-11 09:28:48 +08:00
9 changed files with 769 additions and 62 deletions

8
app.py
View File

@@ -14,6 +14,7 @@ from webhook.handler import GiteaWebhookHandler
from scanner.python_scanner import PythonScanner from scanner.python_scanner import PythonScanner
from scanner.js_scanner import JavaScriptScanner from scanner.js_scanner import JavaScriptScanner
from scanner.security_scanner import SecurityScanner from scanner.security_scanner import SecurityScanner
from scanner.ai_reviewer import AIReviewer
from report.generator import ReportGenerator from report.generator import ReportGenerator
from notify.feishu import FeishuNotifier from notify.feishu import FeishuNotifier
@@ -43,6 +44,7 @@ webhook_handler = GiteaWebhookHandler(config['gitea'])
python_scanner = PythonScanner(config.get('scanner', {})) python_scanner = PythonScanner(config.get('scanner', {}))
js_scanner = JavaScriptScanner(config.get('scanner', {})) js_scanner = JavaScriptScanner(config.get('scanner', {}))
security_scanner = SecurityScanner(config.get('scanner', {})) security_scanner = SecurityScanner(config.get('scanner', {}))
ai_reviewer = AIReviewer(config.get('ai', {}))
report_generator = ReportGenerator(config.get('report', {})) report_generator = ReportGenerator(config.get('report', {}))
feishu_notifier = FeishuNotifier(config['feishu']) feishu_notifier = FeishuNotifier(config['feishu'])
@@ -226,6 +228,12 @@ def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]:
clone_url, source_sha, source_branch clone_url, source_sha, source_branch
) )
# AI 代码审查
if config.get('ai', {}).get('enabled', False):
scan_results['ai'] = ai_reviewer.scan(
clone_url, source_sha, source_branch
)
# 生成报告 # 生成报告
commit_message = f'PR #{pr_number}: {pr_title}' commit_message = f'PR #{pr_number}: {pr_title}'
report = report_generator.generate( report = report_generator.generate(

View File

@@ -11,9 +11,18 @@ gitea:
feishu: feishu:
# 飞书机器人 Webhook 地址(替换为你的实际地址) # 飞书机器人 Webhook 地址(替换为你的实际地址)
webhook_url: "https://open.feishu.cn/open-apis/bot/v2/hook/c436570a-e6af-49a1-867d-4331c0f1cb06" #webhook_url: "https://open.feishu.cn/open-apis/bot/v2/hook/636258bb-5f6e-40aa-aca3-10e61381325e"
# 飞书消息签名密钥(可选) # 飞书消息签名密钥(可选)
secret: "" secret: ""
# 飞书应用配置(用于发送文件)
# 如果需要发送文件,需要在飞书开放平台创建应用并获取以下配置
app_id: "cli_a9256d9d657b9bce"
app_secret: "4rsELdjStVuWnklxn0PLDbC0WPrSaKyN"
# 发送目标的群聊 ID应用机器人发送文件需要群聊 ID
# 在群聊中添加机器人后,使用 https://open.feishu.cn/document/ukTMukTMukTM/uADOwUjLwgDM14CM4ATN 获取群 ID
chat_id: "oc_313d71d460a851f31b7ddd0aca14c5b0"
# 是否在通知中附加报告文件
attach_report_file: true
scanner: scanner:
# 支持的编程语言 # 支持的编程语言
@@ -33,3 +42,19 @@ report:
output_dir: "./reports" output_dir: "./reports"
# 是否保留报告文件 # 是否保留报告文件
keep_files: true keep_files: true
ai:
# AI 审查器配置
# 支持: "ollama" (本地) 或 "api" (在线API)
provider: "api"
# 模型名称(硅基流动可用模型)
model: "Qwen/Qwen2.5-7B-Instruct"
# API 地址
# 硅基流动: https://api.siliconflow.cn/v1
api_url: "https://api.siliconflow.cn/v1"
# API 密钥
api_key: "sk-cqxhnsxdxaalxlykfkjksyinjftdyejnblmgkfxmhwmmvdyu"
# 是否启用 AI 审查
enabled: true
# 每次审查的最大代码行数
max_lines: 200

View File

@@ -10,8 +10,9 @@ import hashlib
import hmac import hmac
import base64 import base64
import logging import logging
import os
import requests import requests
from typing import Dict, Any from typing import Dict, Any, Optional
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -29,10 +30,113 @@ class FeishuNotifier:
self.config = config self.config = config
self.webhook_url = config.get('webhook_url', '') self.webhook_url = config.get('webhook_url', '')
self.secret = config.get('secret', '') self.secret = config.get('secret', '')
# 文件上传配置
self.app_id = config.get('app_id', '')
self.app_secret = config.get('app_secret', '')
self.chat_id = config.get('chat_id', '')
self.attach_report_file = config.get('attach_report_file', True)
# 缓存 token
self._tenant_access_token = None
self._token_expires_at = 0
if not self.webhook_url: if not self.webhook_url:
logger.warning('飞书 Webhook URL 未配置') logger.warning('飞书 Webhook URL 未配置')
def _get_tenant_access_token(self) -> Optional[str]:
"""
获取飞书 tenant_access_token
Returns:
token 字符串,如果失败返回 None
"""
if not self.app_id or not self.app_secret:
return None
# 检查缓存的 token 是否有效
if self._tenant_access_token and time.time() < self._token_expires_at:
return self._tenant_access_token
try:
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
headers = {"Content-Type": "application/json; charset=utf-8"}
payload = {
"app_id": self.app_id,
"app_secret": self.app_secret
}
response = requests.post(url, headers=headers, json=payload, timeout=10)
result = response.json()
if result.get("code") == 0:
self._tenant_access_token = result.get("tenant_access_token")
# 提前 5 分钟过期
self._token_expires_at = time.time() + result.get("expire", 7200) - 300
return self._tenant_access_token
else:
logger.error(f"获取 tenant_access_token 失败: {result.get('msg')}")
return None
except Exception as e:
logger.error(f"获取 tenant_access_token 异常: {str(e)}")
return None
def _upload_file(self, file_path: str, file_name: str) -> Optional[str]:
"""
上传文件到飞书(用于消息中发送)
Args:
file_path: 文件本地路径
file_name: 文件名
Returns:
file_key 用于发送消息,如果失败返回 None
"""
token = self._get_tenant_access_token()
if not token:
logger.error("无法获取 token上传文件失败")
return None
try:
# 使用 im API 上传文件
url = "https://open.feishu.cn/open-apis/im/v1/files"
headers = {
"Authorization": f"Bearer {token}"
}
# 读取文件
with open(file_path, 'rb') as f:
file_content = f.read()
# 获取文件类型
file_ext = os.path.splitext(file_name)[1].lower()
file_type = 'stream' # 默认
# 构建 multipart 请求
files = {
'file': (file_name, file_content, 'application/octet-stream')
}
data = {
'file_name': file_name,
'file_type': file_type
}
response = requests.post(url, headers=headers, files=files, data=data, timeout=60)
result = response.json()
if result.get("code") == 0:
file_key = result.get("data", {}).get("file_key")
logger.info(f"文件上传成功: {file_name}, file_key: {file_key}")
return file_key
else:
logger.error(f"文件上传失败: {result.get('msg')}, code: {result.get('code')}")
return None
except Exception as e:
logger.error(f"文件上传异常: {str(e)}")
return None
def send_report(self, report: Dict[str, Any]) -> bool: def send_report(self, report: Dict[str, Any]) -> bool:
""" """
发送扫描报告到飞书 发送扫描报告到飞书
@@ -48,47 +152,169 @@ class FeishuNotifier:
return False return False
try: try:
# 构建消息内容 # 上传报告文件(如果配置了)
message = self._build_message(report) file_key = None
if self.attach_report_file and self.app_id and self.app_secret:
# 如果配置了签名,则使用签名验证 report_file = report.get('report_file')
if self.secret: if report_file and os.path.exists(report_file):
timestamp, sign = self._generate_sign() file_name = os.path.basename(report_file)
payload = { file_key = self._upload_file(report_file, file_name)
"timestamp": timestamp,
"sign": sign, # 如果配置了 chat_id使用应用机器人发送消息
"msg_type": "interactive", if self.chat_id and self.app_id and self.app_secret:
"card": message # 使用应用机器人 API 发送
} self._send_app_message(report, file_key)
else: else:
payload = { # 使用 Webhook 发送
"msg_type": "interactive", message = self._build_message(report, file_key=file_key)
"card": message self._send_webhook_message(message)
}
# 发送请求 logger.info('飞书消息发送成功')
headers = {'Content-Type': 'application/json'} return True
response = requests.post(
self.webhook_url,
headers=headers,
data=json.dumps(payload).encode('utf-8'),
timeout=30
)
# 解析响应
result = response.json()
if result.get('code') == 0:
logger.info('飞书消息发送成功')
return True
else:
logger.error(f'飞书消息发送失败: {result.get("msg")}')
return False
except Exception as e: except Exception as e:
logger.error(f'发送飞书通知失败: {str(e)}', exc_info=True) logger.error(f'发送飞书通知失败: {str(e)}', exc_info=True)
return False return False
def _send_webhook_message(self, message: Dict[str, Any]) -> bool:
"""使用 Webhook 发送消息"""
# 如果配置了签名,则使用签名验证
if self.secret:
timestamp, sign = self._generate_sign()
payload = {
"timestamp": timestamp,
"sign": sign,
"msg_type": "interactive",
"card": message
}
else:
payload = {
"msg_type": "interactive",
"card": message
}
# 发送请求
headers = {'Content-Type': 'application/json'}
response = requests.post(
self.webhook_url,
headers=headers,
data=json.dumps(payload).encode('utf-8'),
timeout=30
)
# 解析响应
result = response.json()
if result.get('code') == 0:
return True
else:
logger.error(f'飞书消息发送失败: {result.get("msg")}')
return False
def _send_app_message(self, report: Dict[str, Any], file_key: str = None) -> bool:
"""使用应用机器人发送消息(支持文件)"""
token = self._get_tenant_access_token()
if not token:
logger.error("无法获取 token")
return False
# 构建消息内容
basic_info = self._build_basic_info_text(report)
# 构建消息元素
elements = []
# 添加基本信息
elements.append({
"tag": "div",
"text": {
"tag": "lark_md",
"content": basic_info
}
})
# 添加文件
if file_key:
elements.append({
"tag": "file",
"file_key": file_key
})
# 构造消息体
message_content = {
"title": report.get('status_text', '代码扫描报告'),
"elements": elements
}
# 发送消息到群聊
try:
url = "https://open.feishu.cn/open-apis/im/v1/messages"
params = {
"receive_id_type": "chat_id"
}
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json; charset=utf-8"
}
payload = {
"receive_id": self.chat_id,
"msg_type": "interactive",
"content": json.dumps(message_content)
}
response = requests.post(url, params=params, headers=headers, json=payload, timeout=30)
result = response.json()
if result.get("code") == 0:
logger.info("应用机器人消息发送成功")
return True
else:
logger.error(f"应用机器人消息发送失败: {result.get('msg')}")
return False
except Exception as e:
logger.error(f"应用机器人消息发送异常: {str(e)}")
return False
def _build_basic_info_text(self, report: Dict[str, Any]) -> str:
"""构建基本信息的文本"""
status = report.get('status', 'pass')
if status == 'pass':
status_icon = ''
elif status == 'fail':
status_icon = ''
else:
status_icon = '⚠️'
pr_url = report.get('pr_url')
target_branch = report.get('target_branch')
if pr_url and target_branch:
title = f"{status_icon} PR 代码质量扫描报告"
basic_info = (f"**仓库:** `{report.get('repo_name', 'unknown')}`\n"
f"**源分支:** `{report.get('branch', 'unknown')}` → **目标分支:** `{target_branch}`\n"
f"**PR链接:** [查看PR]({pr_url})\n"
f"**提交:** `{report.get('commit_id', 'unknown')}`\n"
f"**提交者:** {report.get('author', 'unknown')}")
else:
title = f"{status_icon} 代码质量扫描报告"
basic_info = (f"**仓库:** `{report.get('repo_name', 'unknown')}`\n"
f"**分支:** `{report.get('branch', 'unknown')}`\n"
f"**提交:** `{report.get('commit_id', 'unknown')}`\n"
f"**提交者:** {report.get('author', 'unknown')}")
total_issues = report.get('total_issues', 0)
total_errors = report.get('total_errors', 0)
total_warnings = report.get('total_warnings', 0)
info = f"{title}\n\n{basic_info}\n\n"
info += f"**扫描状态:** {report.get('status_text', 'unknown')}\n"
info += f"📊 总问题: {total_issues} | 🔴 错误: {total_errors} | 🟡 警告: {total_warnings}\n"
info += f"🕐 扫描时间: {report.get('timestamp', '')}"
return info
def _generate_sign(self) -> tuple: def _generate_sign(self) -> tuple:
""" """
生成飞书签名 生成飞书签名
@@ -113,7 +339,7 @@ class FeishuNotifier:
return timestamp, sign return timestamp, sign
def _build_message(self, report: Dict[str, Any]) -> Dict[str, Any]: def _build_message(self, report: Dict[str, Any], file_key: str = None) -> Dict[str, Any]:
""" """
构建飞书卡片消息 构建飞书卡片消息
@@ -143,8 +369,14 @@ class FeishuNotifier:
# 获取扫描结果详情 # 获取扫描结果详情
scan_details = [] scan_details = []
for scanner_name, result in report.get('scan_results', {}).items(): for scanner_name, result in report.get('scan_results', {}).items():
# AI 审查的 summary 是字符串,跳过
if scanner_name == 'ai':
continue
tool_name = result.get('tool', scanner_name) tool_name = result.get('tool', scanner_name)
summary = result.get('summary', {}) summary = result.get('summary', {})
if not isinstance(summary, dict):
continue
files_scanned = result.get('files_scanned', 0) files_scanned = result.get('files_scanned', 0)
total = summary.get('total', 0) total = summary.get('total', 0)
@@ -257,6 +489,13 @@ class FeishuNotifier:
} }
}) })
# 添加报告文件附件
if file_key:
card["elements"].append({
"tag": "file",
"file_key": file_key
})
return card return card
def send_simple_message(self, title: str, content: str) -> bool: def send_simple_message(self, title: str, content: str) -> bool:

View File

@@ -63,7 +63,13 @@ class ReportGenerator:
total_warnings = 0 total_warnings = 0
for scanner_name, result in scan_results.items(): for scanner_name, result in scan_results.items():
# AI 审查的 summary 是字符串,跳过统计
if scanner_name == 'ai':
continue
summary = result.get('summary', {}) summary = result.get('summary', {})
if not isinstance(summary, dict):
continue
total_issues += summary.get('total', 0) total_issues += summary.get('total', 0)
total_errors += summary.get('error', 0) + summary.get('high', 0) total_errors += summary.get('error', 0) + summary.get('high', 0)
total_warnings += summary.get('warning', 0) + summary.get('medium', 0) total_warnings += summary.get('warning', 0) + summary.get('medium', 0)
@@ -101,8 +107,10 @@ class ReportGenerator:
} }
# 保存报告文件 # 保存报告文件
report_file = None
if self.keep_files: if self.keep_files:
self._save_report(report) report_file = self._save_report(report)
report['report_file'] = report_file
return report return report
@@ -161,6 +169,10 @@ class ReportGenerator:
lines.append('') lines.append('')
for scanner_name, result in scan_results.items(): for scanner_name, result in scan_results.items():
# 跳过 AI 审查结果(单独处理)
if scanner_name == 'ai':
continue
tool_name = result.get('tool', scanner_name) tool_name = result.get('tool', scanner_name)
summary = result.get('summary', {}) summary = result.get('summary', {})
@@ -204,6 +216,57 @@ class ReportGenerator:
lines.append(f' - {message}') lines.append(f' - {message}')
lines.append('') lines.append('')
# AI 审查结果(单独展示)
if 'ai' in scan_results:
ai_result = scan_results['ai']
lines.append('')
lines.append('## 🤖 AI 代码审查')
lines.append('')
lines.append(ai_result.get('summary', '无 AI 审查结果'))
lines.append('')
reviews = ai_result.get('reviews', [])
if reviews:
for i, review in enumerate(reviews, 1):
file_name = review.get('file', 'unknown')
review_content = review.get('review', {})
lines.append(f'### 📄 {file_name}')
lines.append('')
# 优点
advantages = review_content.get('优点', [])
if advantages:
lines.append('**✅ 代码优点:**')
for adv in advantages[:3]:
lines.append(f'- {adv}')
lines.append('')
# 问题
issues = review_content.get('问题', [])
if issues:
lines.append('**⚠️ 需要改进:**')
for issue in issues[:3]:
lines.append(f'- {issue}')
lines.append('')
# 优化建议
optimizations = review_content.get('优化', [])
if optimizations:
lines.append('**💡 优化建议:**')
for opt in optimizations[:3]:
lines.append(f'- {opt}')
lines.append('')
# 原始回复(如果不是 JSON 格式)
raw = review_content.get('raw_review')
if raw:
lines.append('**📝 AI 原始回复:**')
lines.append('```')
lines.append(raw[:500] + '...' if len(raw) > 500 else raw)
lines.append('```')
lines.append('')
# 添加报告链接或下一步操作 # 添加报告链接或下一步操作
lines.append('---') lines.append('---')
lines.append('') lines.append('')
@@ -211,8 +274,13 @@ class ReportGenerator:
return '\n'.join(lines) return '\n'.join(lines)
def _save_report(self, report: Dict[str, Any]): def _save_report(self, report: Dict[str, Any]) -> str:
"""保存报告到文件""" """
保存报告到文件
Returns:
保存的文件路径
"""
try: try:
# 生成文件名 # 生成文件名
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
@@ -225,6 +293,8 @@ class ReportGenerator:
f.write(report['markdown']) f.write(report['markdown'])
logger.info(f'报告已保存: {filepath}') logger.info(f'报告已保存: {filepath}')
return filepath
# 同时保存 JSON 格式(便于程序解析) # 同时保存 JSON 格式(便于程序解析)
json_filename = filename.replace('.md', '.json') json_filename = filename.replace('.md', '.json')

348
scanner/ai_reviewer.py Normal file
View File

@@ -0,0 +1,348 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AI 代码审查器
使用大模型进行智能代码审查
"""
import os
import json
import logging
from typing import Dict, Any, List, Optional
from scanner.base import BaseScanner
logger = logging.getLogger(__name__)
class AIReviewer(BaseScanner):
"""AI 代码审查器"""
def __init__(self, config: Dict[str, Any]):
"""
初始化 AI 审查器
Args:
config: AI 配置
"""
# 先初始化基类
super().__init__(config.get('scanner', {}))
self.config = config
self.enabled = config.get('enabled', True)
self.provider = config.get('provider', 'ollama')
self.model = config.get('model', 'llama3')
self.api_url = config.get('api_url', 'http://localhost:11434')
self.api_key = config.get('api_key', '')
self.max_lines = config.get('max_lines', 200)
if not self.enabled:
logger.info('AI 审查器已禁用')
return
logger.info(f'AI 审查器初始化: {self.provider}/{self.model}')
def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]:
"""
执行代码扫描(实现抽象方法)
Args:
repo_url: 仓库 URL
commit_id: 提交 ID
branch: 分支名
Returns:
审查结果
"""
# 调用实际的审查逻辑
return self._do_review(repo_url=repo_url, commit_id=commit_id, branch=branch)
def _do_review(self, clone_dir: str = None, repo_url: str = None,
commit_id: str = None, branch: str = None,
language: str = 'python') -> Dict[str, Any]:
"""
执行 AI 代码审查
Args:
clone_dir: 仓库目录(如果已克隆则直接传入)
repo_url: 仓库 URL如果未克隆则需要传入
commit_id: 提交 ID
branch: 分支名
language: 编程语言
Returns:
审查结果
"""
if not self.enabled:
return {
'enabled': False,
'tool': 'AI Code Reviewer',
'reviews': [],
'summary': 'AI 审查已禁用'
}
try:
# 如果没有传入 clone_dir需要克隆
if not clone_dir and repo_url:
clone_dir = self.clone_repo(repo_url, commit_id, branch)
if not clone_dir or not os.path.exists(clone_dir):
return {
'enabled': True,
'tool': 'AI Code Reviewer',
'reviews': [],
'summary': '无法获取代码目录'
}
# 获取要审查的代码文件
files = self._get_code_files(clone_dir, language)
if not files:
return {
'enabled': True,
'tool': 'AI Code Reviewer',
'reviews': [],
'summary': '未找到可审查的代码文件'
}
# 对每个文件进行 AI 审查
all_reviews = []
for file_path in files[:5]: # 限制最多审查 5 个文件
review = self._review_file(file_path, language)
if review:
all_reviews.append(review)
# 生成总结
summary = self._generate_summary(all_reviews)
return {
'enabled': True,
'tool': 'AI Code Reviewer',
'reviews': all_reviews,
'summary': summary,
'files_reviewed': len(all_reviews),
'clone_dir': clone_dir # 返回 clone_dir 用于后续清理
}
except Exception as e:
logger.error(f'AI 审查失败: {str(e)}')
return {
'enabled': True,
'tool': 'AI Code Reviewer',
'error': str(e),
'reviews': [],
'summary': f'AI 审查出错: {str(e)}'
}
def _get_code_files(self, clone_dir: str, language: str) -> List[str]:
"""获取代码文件列表"""
import glob
extensions = {
'python': ['.py'],
'javascript': ['.js', '.jsx'],
'typescript': ['.ts', '.tsx']
}
exts = extensions.get(language, ['.py'])
files = []
for ext in exts:
pattern = os.path.join(clone_dir, '**', f'*{ext}')
files.extend(glob.glob(pattern, recursive=True))
# 过滤掉测试文件和虚拟环境
files = [f for f in files if not any(x in f for x in [
'test_', '_test.', 'venv', 'node_modules', '__pycache__'
])]
return files[:10] # 最多 10 个文件
def _review_file(self, file_path: str, language: str) -> Optional[Dict[str, Any]]:
"""审查单个文件"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
code = f.read()
# 限制代码行数
lines = code.split('\n')
if len(lines) > self.max_lines:
code = '\n'.join(lines[:self.max_lines])
truncated = True
else:
truncated = False
# 构建 prompt
prompt = self._build_prompt(code, language)
# 调用 AI
response = self._call_ai(prompt)
if not response:
return None
# 解析响应
filename = os.path.basename(file_path)
return {
'file': filename,
'path': file_path,
'truncated': truncated,
'review': response
}
except Exception as e:
logger.warning(f'审查文件失败 {file_path}: {str(e)}')
return None
def _build_prompt(self, code: str, language: str) -> str:
"""构建审查 prompt"""
if language == 'python':
lang_name = 'Python'
elif language in ['javascript', 'typescript']:
lang_name = 'JavaScript/TypeScript'
else:
lang_name = language
prompt = f"""你是一位资深的 {lang_name} 代码审查专家。请审查以下代码,并给出:
1. **代码优点** - 写得好地方
2. **问题建议** - 需要改进的地方
3. **优化建议** - 如何让代码更好
请用中文回复,保持简洁,每个文件审查不超过 3 点建议。
以下是代码:
```{language}
{code}
```
请以 JSON 格式输出:
```json
{{
"优点": ["..."],
"问题": ["..."],
"优化": ["..."]
}}
```"""
return prompt
def _call_ai(self, prompt: str) -> Optional[Dict[str, Any]]:
"""调用 AI 服务"""
try:
if self.provider == 'ollama':
return self._call_ollama(prompt)
elif self.provider == 'api':
return self._call_api(prompt)
else:
logger.warning(f'未知的 AI provider: {self.provider}')
return None
except Exception as e:
logger.error(f'AI 调用失败: {str(e)}')
return None
def _call_ollama(self, prompt: str) -> Optional[Dict[str, Any]]:
"""调用 Ollama 本地模型"""
import requests
url = f"{self.api_url}/api/generate"
payload = {
"model": self.model,
"prompt": prompt,
"stream": False,
"format": "json"
}
response = requests.post(url, json=payload, timeout=120)
if response.status_code == 200:
result = response.json()
content = result.get('response', '')
# 尝试解析 JSON
try:
# 提取 JSON 部分
if '```json' in content:
content = content.split('```json')[1].split('```')[0]
elif '```' in content:
content = content.split('```')[1].split('```')[0]
return json.loads(content.strip())
except json.JSONDecodeError:
# 如果不是 JSON直接返回文本
return {'raw_review': content}
logger.warning(f'Ollama 返回错误: {response.status_code}')
return None
def _call_api(self, prompt: str) -> Optional[Dict[str, Any]]:
"""调用在线 API"""
import requests
headers = {
'Content-Type': 'application/json'
}
if self.api_key:
headers['Authorization'] = f'Bearer {self.api_key}'
# 根据 API URL 自动判断 provider
if 'siliconflow' in self.api_url:
url = f"{self.api_url}/chat/completions"
payload = {
"model": self.model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 1024,
"temperature": 0.7
}
elif 'deepseek' in self.api_url:
url = f"{self.api_url}/chat/completions"
payload = {
"model": self.model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 1024,
"temperature": 0.7
}
else:
url = f"{self.api_url}/chat/completions"
payload = {
"model": self.model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 1024,
"temperature": 0.7
}
response = requests.post(url, json=payload, headers=headers, timeout=120)
if response.status_code == 200:
result = response.json()
content = result['choices'][0]['message']['content']
try:
if '```json' in content:
content = content.split('```json')[1].split('```')[0]
elif '```' in content:
content = content.split('```')[1].split('```')[0]
return json.loads(content.strip())
except json.JSONDecodeError:
return {'raw_review': content}
logger.warning(f'API 返回错误: {response.status_code}')
return None
def _generate_summary(self, reviews: List[Dict[str, Any]]) -> str:
"""生成审查总结"""
if not reviews:
return '未找到需要审查的代码'
total_issues = sum(
len(r.get('review', {}).get('问题', [])) +
len(r.get('review', {}).get('优化', []))
for r in reviews
)
files_count = len(reviews)
if total_issues == 0:
return f'✅ AI 审查通过!审查了 {files_count} 个文件,未发现问题'
return f'🤖 AI 审查了 {files_count} 个文件,发现 {total_issues} 个改进建议'

View File

@@ -52,9 +52,12 @@ class BaseScanner(ABC):
repo_name = repo_url.split('/')[-1].replace('.git', '') repo_name = repo_url.split('/')[-1].replace('.git', '')
commit_hash = commit_id or branch commit_hash = commit_id or branch
clone_dir = os.path.join(self.temp_dir, f"{repo_name}_{commit_hash}") clone_dir = os.path.join(self.temp_dir, f"{repo_name}_{commit_hash}")
# 如果目录已存在,先删除
# 如果目录已存在,先删除(带重试机制)
if os.path.exists(clone_dir): if os.path.exists(clone_dir):
shutil.rmtree(clone_dir) self.cleanup(clone_dir)
repo = None
try: try:
logger.info(f'克隆仓库: {repo_url}') logger.info(f'克隆仓库: {repo_url}')
# 克隆仓库(浅克隆,只获取最新提交) # 克隆仓库(浅克隆,只获取最新提交)
@@ -64,26 +67,52 @@ class BaseScanner(ABC):
depth=1, depth=1,
branch=branch branch=branch
) )
# 如果指定了 commit_id切换到该提交 # 如果指定了 commit_id切换到该提交
if commit_id: if commit_id:
repo.git.checkout(commit_id) repo.git.checkout(commit_id)
logger.info(f'仓库克隆成功: {clone_dir}') logger.info(f'仓库克隆成功: {clone_dir}')
return clone_dir return clone_dir
except Exception as e: except Exception as e:
logger.error(f'克隆仓库失败: {str(e)}') logger.error(f'克隆仓库失败: {str(e)}')
raise raise
finally:
# 显式关闭 Repo 对象以释放文件句柄(特别是 Windows
if repo is not None:
repo.close()
def cleanup(self, clone_dir: str): def cleanup(self, clone_dir: str):
""" """
清理临时目录 清理临时目录(带重试机制,处理 Windows 权限问题)
Args: Args:
clone_dir: 克隆的目录路径 clone_dir: 克隆的目录路径
""" """
try: import time
if os.path.exists(clone_dir): import stat
shutil.rmtree(clone_dir)
logger.info(f'清理临时目录: {clone_dir}') def handle_remove_readonly(func, path, exc_info):
except Exception as e: """处理只读文件的删除问题Windows"""
logger.warning(f'清理临时目录失败: {str(e)}') # 添加写权限并重试
os.chmod(path, stat.S_IWRITE)
func(path)
max_retries = 3
retry_delay = 1 # 秒
for attempt in range(max_retries):
try:
if os.path.exists(clone_dir):
# Windows 上使用 onerror 回调处理只读文件
shutil.rmtree(clone_dir, onerror=handle_remove_readonly)
logger.info(f'清理临时目录: {clone_dir}')
return # 成功清理,直接返回
except Exception as e:
if attempt < max_retries - 1:
logger.warning(f'清理临时目录失败,{retry_delay}秒后重试: {str(e)}')
time.sleep(retry_delay)
retry_delay *= 2 # 指数退避
else:
logger.warning(f'清理临时目录失败(已重试{max_retries}次): {str(e)}')
def run_command(self, cmd: List[str], cwd: str, timeout: int = 300) -> Dict[str, Any]: def run_command(self, cmd: List[str], cwd: str, timeout: int = 300) -> Dict[str, Any]:
""" """
运行命令并返回结果 运行命令并返回结果

View File

@@ -72,10 +72,6 @@ class JavaScriptScanner(BaseScanner):
result['status'] = 'error' result['status'] = 'error'
result['error'] = str(e) result['error'] = str(e)
finally:
# 清理临时目录
if clone_dir:
self.cleanup(clone_dir)
return result return result

View File

@@ -80,10 +80,7 @@ class PythonScanner(BaseScanner):
result['status'] = 'error' result['status'] = 'error'
result['error'] = str(e) result['error'] = str(e)
finally:
# 清理临时目录
if clone_dir:
self.cleanup(clone_dir)
return result return result

View File

@@ -86,11 +86,6 @@ class SecurityScanner(BaseScanner):
result['status'] = 'error' result['status'] = 'error'
result['error'] = str(e) result['error'] = str(e)
finally:
# 清理临时目录
if clone_dir:
self.cleanup(clone_dir)
return result return result
def _run_bandit(self, cwd: str, files: List[str]) -> Dict[str, Any]: def _run_bandit(self, cwd: str, files: List[str]) -> Dict[str, Any]: