From 17306c6814e1683a197018ca253e212d4c9f97fe Mon Sep 17 00:00:00 2001 From: Dang Zerong Date: Tue, 10 Mar 2026 17:22:07 +0800 Subject: [PATCH] init --- app.py | 8 + config.yaml | 27 ++- notify/feishu.py | 311 ++++++++++++++++++++++++++++---- report/generator.py | 76 +++++++- scanner/ai_reviewer.py | 348 ++++++++++++++++++++++++++++++++++++ scanner/base.py | 47 ++++- scanner/js_scanner.py | 4 - scanner/python_scanner.py | 5 +- scanner/security_scanner.py | 5 - 9 files changed, 769 insertions(+), 62 deletions(-) create mode 100644 scanner/ai_reviewer.py diff --git a/app.py b/app.py index 0de1f35..10653bc 100644 --- a/app.py +++ b/app.py @@ -14,6 +14,7 @@ from webhook.handler import GiteaWebhookHandler from scanner.python_scanner import PythonScanner from scanner.js_scanner import JavaScriptScanner from scanner.security_scanner import SecurityScanner +from scanner.ai_reviewer import AIReviewer from report.generator import ReportGenerator from notify.feishu import FeishuNotifier @@ -43,6 +44,7 @@ webhook_handler = GiteaWebhookHandler(config['gitea']) python_scanner = PythonScanner(config.get('scanner', {})) js_scanner = JavaScriptScanner(config.get('scanner', {})) security_scanner = SecurityScanner(config.get('scanner', {})) +ai_reviewer = AIReviewer(config.get('ai', {})) report_generator = ReportGenerator(config.get('report', {})) feishu_notifier = FeishuNotifier(config['feishu']) @@ -226,6 +228,12 @@ def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]: clone_url, source_sha, source_branch ) + # AI 代码审查 + if config.get('ai', {}).get('enabled', False): + scan_results['ai'] = ai_reviewer.scan( + clone_url, source_sha, source_branch + ) + # 生成报告 commit_message = f'PR #{pr_number}: {pr_title}' report = report_generator.generate( diff --git a/config.yaml b/config.yaml index 272251c..80f1181 100644 --- a/config.yaml +++ b/config.yaml @@ -11,9 +11,18 @@ gitea: feishu: # 飞书机器人 Webhook 地址(替换为你的实际地址) - webhook_url: "https://open.feishu.cn/open-apis/bot/v2/hook/c436570a-e6af-49a1-867d-4331c0f1cb06" + #webhook_url: "https://open.feishu.cn/open-apis/bot/v2/hook/636258bb-5f6e-40aa-aca3-10e61381325e" # 飞书消息签名密钥(可选) secret: "" + # 飞书应用配置(用于发送文件) + # 如果需要发送文件,需要在飞书开放平台创建应用并获取以下配置 + app_id: "cli_a9256d9d657b9bce" + app_secret: "4rsELdjStVuWnklxn0PLDbC0WPrSaKyN" + # 发送目标的群聊 ID(应用机器人发送文件需要群聊 ID) + # 在群聊中添加机器人后,使用 https://open.feishu.cn/document/ukTMukTMukTM/uADOwUjLwgDM14CM4ATN 获取群 ID + chat_id: "oc_313d71d460a851f31b7ddd0aca14c5b0" + # 是否在通知中附加报告文件 + attach_report_file: true scanner: # 支持的编程语言 @@ -33,3 +42,19 @@ report: output_dir: "./reports" # 是否保留报告文件 keep_files: true + +ai: + # AI 审查器配置 + # 支持: "ollama" (本地) 或 "api" (在线API) + provider: "api" + # 模型名称(硅基流动可用模型) + model: "Qwen/Qwen2.5-7B-Instruct" + # API 地址 + # 硅基流动: https://api.siliconflow.cn/v1 + api_url: "https://api.siliconflow.cn/v1" + # API 密钥 + api_key: "sk-cqxhnsxdxaalxlykfkjksyinjftdyejnblmgkfxmhwmmvdyu" + # 是否启用 AI 审查 + enabled: true + # 每次审查的最大代码行数 + max_lines: 200 \ No newline at end of file diff --git a/notify/feishu.py b/notify/feishu.py index 69a289f..f974cb5 100644 --- a/notify/feishu.py +++ b/notify/feishu.py @@ -10,8 +10,9 @@ import hashlib import hmac import base64 import logging +import os import requests -from typing import Dict, Any +from typing import Dict, Any, Optional logger = logging.getLogger(__name__) @@ -29,10 +30,113 @@ class FeishuNotifier: self.config = config self.webhook_url = config.get('webhook_url', '') self.secret = config.get('secret', '') + + # 文件上传配置 + self.app_id = config.get('app_id', '') + self.app_secret = config.get('app_secret', '') + self.chat_id = config.get('chat_id', '') + self.attach_report_file = config.get('attach_report_file', True) + + # 缓存 token + self._tenant_access_token = None + self._token_expires_at = 0 if not self.webhook_url: logger.warning('飞书 Webhook URL 未配置') + def _get_tenant_access_token(self) -> Optional[str]: + """ + 获取飞书 tenant_access_token + + Returns: + token 字符串,如果失败返回 None + """ + if not self.app_id or not self.app_secret: + return None + + # 检查缓存的 token 是否有效 + if self._tenant_access_token and time.time() < self._token_expires_at: + return self._tenant_access_token + + try: + url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal" + headers = {"Content-Type": "application/json; charset=utf-8"} + payload = { + "app_id": self.app_id, + "app_secret": self.app_secret + } + + response = requests.post(url, headers=headers, json=payload, timeout=10) + result = response.json() + + if result.get("code") == 0: + self._tenant_access_token = result.get("tenant_access_token") + # 提前 5 分钟过期 + self._token_expires_at = time.time() + result.get("expire", 7200) - 300 + return self._tenant_access_token + else: + logger.error(f"获取 tenant_access_token 失败: {result.get('msg')}") + return None + + except Exception as e: + logger.error(f"获取 tenant_access_token 异常: {str(e)}") + return None + + def _upload_file(self, file_path: str, file_name: str) -> Optional[str]: + """ + 上传文件到飞书(用于消息中发送) + + Args: + file_path: 文件本地路径 + file_name: 文件名 + + Returns: + file_key 用于发送消息,如果失败返回 None + """ + token = self._get_tenant_access_token() + if not token: + logger.error("无法获取 token,上传文件失败") + return None + + try: + # 使用 im API 上传文件 + url = "https://open.feishu.cn/open-apis/im/v1/files" + headers = { + "Authorization": f"Bearer {token}" + } + + # 读取文件 + with open(file_path, 'rb') as f: + file_content = f.read() + + # 获取文件类型 + file_ext = os.path.splitext(file_name)[1].lower() + file_type = 'stream' # 默认 + + # 构建 multipart 请求 + files = { + 'file': (file_name, file_content, 'application/octet-stream') + } + data = { + 'file_name': file_name, + 'file_type': file_type + } + + response = requests.post(url, headers=headers, files=files, data=data, timeout=60) + result = response.json() + + if result.get("code") == 0: + file_key = result.get("data", {}).get("file_key") + logger.info(f"文件上传成功: {file_name}, file_key: {file_key}") + return file_key + else: + logger.error(f"文件上传失败: {result.get('msg')}, code: {result.get('code')}") + return None + + except Exception as e: + logger.error(f"文件上传异常: {str(e)}") + return None + def send_report(self, report: Dict[str, Any]) -> bool: """ 发送扫描报告到飞书 @@ -48,47 +152,169 @@ class FeishuNotifier: return False try: - # 构建消息内容 - message = self._build_message(report) - - # 如果配置了签名,则使用签名验证 - if self.secret: - timestamp, sign = self._generate_sign() - payload = { - "timestamp": timestamp, - "sign": sign, - "msg_type": "interactive", - "card": message - } + # 上传报告文件(如果配置了) + file_key = None + if self.attach_report_file and self.app_id and self.app_secret: + report_file = report.get('report_file') + if report_file and os.path.exists(report_file): + file_name = os.path.basename(report_file) + file_key = self._upload_file(report_file, file_name) + + # 如果配置了 chat_id,使用应用机器人发送消息 + if self.chat_id and self.app_id and self.app_secret: + # 使用应用机器人 API 发送 + self._send_app_message(report, file_key) else: - payload = { - "msg_type": "interactive", - "card": message - } + # 使用 Webhook 发送 + message = self._build_message(report, file_key=file_key) + self._send_webhook_message(message) - # 发送请求 - headers = {'Content-Type': 'application/json'} - response = requests.post( - self.webhook_url, - headers=headers, - data=json.dumps(payload).encode('utf-8'), - timeout=30 - ) - - # 解析响应 - result = response.json() - - if result.get('code') == 0: - logger.info('飞书消息发送成功') - return True - else: - logger.error(f'飞书消息发送失败: {result.get("msg")}') - return False + logger.info('飞书消息发送成功') + return True except Exception as e: logger.error(f'发送飞书通知失败: {str(e)}', exc_info=True) return False + def _send_webhook_message(self, message: Dict[str, Any]) -> bool: + """使用 Webhook 发送消息""" + # 如果配置了签名,则使用签名验证 + if self.secret: + timestamp, sign = self._generate_sign() + payload = { + "timestamp": timestamp, + "sign": sign, + "msg_type": "interactive", + "card": message + } + else: + payload = { + "msg_type": "interactive", + "card": message + } + + # 发送请求 + headers = {'Content-Type': 'application/json'} + response = requests.post( + self.webhook_url, + headers=headers, + data=json.dumps(payload).encode('utf-8'), + timeout=30 + ) + + # 解析响应 + result = response.json() + + if result.get('code') == 0: + return True + else: + logger.error(f'飞书消息发送失败: {result.get("msg")}') + return False + + def _send_app_message(self, report: Dict[str, Any], file_key: str = None) -> bool: + """使用应用机器人发送消息(支持文件)""" + token = self._get_tenant_access_token() + if not token: + logger.error("无法获取 token") + return False + + # 构建消息内容 + basic_info = self._build_basic_info_text(report) + + # 构建消息元素 + elements = [] + + # 添加基本信息 + elements.append({ + "tag": "div", + "text": { + "tag": "lark_md", + "content": basic_info + } + }) + + # 添加文件 + if file_key: + elements.append({ + "tag": "file", + "file_key": file_key + }) + + # 构造消息体 + message_content = { + "title": report.get('status_text', '代码扫描报告'), + "elements": elements + } + + # 发送消息到群聊 + try: + url = "https://open.feishu.cn/open-apis/im/v1/messages" + params = { + "receive_id_type": "chat_id" + } + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json; charset=utf-8" + } + + payload = { + "receive_id": self.chat_id, + "msg_type": "interactive", + "content": json.dumps(message_content) + } + + response = requests.post(url, params=params, headers=headers, json=payload, timeout=30) + result = response.json() + + if result.get("code") == 0: + logger.info("应用机器人消息发送成功") + return True + else: + logger.error(f"应用机器人消息发送失败: {result.get('msg')}") + return False + + except Exception as e: + logger.error(f"应用机器人消息发送异常: {str(e)}") + return False + + def _build_basic_info_text(self, report: Dict[str, Any]) -> str: + """构建基本信息的文本""" + status = report.get('status', 'pass') + if status == 'pass': + status_icon = '✅' + elif status == 'fail': + status_icon = '❌' + else: + status_icon = '⚠️' + + pr_url = report.get('pr_url') + target_branch = report.get('target_branch') + + if pr_url and target_branch: + title = f"{status_icon} PR 代码质量扫描报告" + basic_info = (f"**仓库:** `{report.get('repo_name', 'unknown')}`\n" + f"**源分支:** `{report.get('branch', 'unknown')}` → **目标分支:** `{target_branch}`\n" + f"**PR链接:** [查看PR]({pr_url})\n" + f"**提交:** `{report.get('commit_id', 'unknown')}`\n" + f"**提交者:** {report.get('author', 'unknown')}") + else: + title = f"{status_icon} 代码质量扫描报告" + basic_info = (f"**仓库:** `{report.get('repo_name', 'unknown')}`\n" + f"**分支:** `{report.get('branch', 'unknown')}`\n" + f"**提交:** `{report.get('commit_id', 'unknown')}`\n" + f"**提交者:** {report.get('author', 'unknown')}") + + total_issues = report.get('total_issues', 0) + total_errors = report.get('total_errors', 0) + total_warnings = report.get('total_warnings', 0) + + info = f"{title}\n\n{basic_info}\n\n" + info += f"**扫描状态:** {report.get('status_text', 'unknown')}\n" + info += f"📊 总问题: {total_issues} | 🔴 错误: {total_errors} | 🟡 警告: {total_warnings}\n" + info += f"🕐 扫描时间: {report.get('timestamp', '')}" + + return info + def _generate_sign(self) -> tuple: """ 生成飞书签名 @@ -113,7 +339,7 @@ class FeishuNotifier: return timestamp, sign - def _build_message(self, report: Dict[str, Any]) -> Dict[str, Any]: + def _build_message(self, report: Dict[str, Any], file_key: str = None) -> Dict[str, Any]: """ 构建飞书卡片消息 @@ -143,8 +369,14 @@ class FeishuNotifier: # 获取扫描结果详情 scan_details = [] for scanner_name, result in report.get('scan_results', {}).items(): + # AI 审查的 summary 是字符串,跳过 + if scanner_name == 'ai': + continue + tool_name = result.get('tool', scanner_name) summary = result.get('summary', {}) + if not isinstance(summary, dict): + continue files_scanned = result.get('files_scanned', 0) total = summary.get('total', 0) @@ -257,6 +489,13 @@ class FeishuNotifier: } }) + # 添加报告文件附件 + if file_key: + card["elements"].append({ + "tag": "file", + "file_key": file_key + }) + return card def send_simple_message(self, title: str, content: str) -> bool: diff --git a/report/generator.py b/report/generator.py index 1b674f7..6b41972 100644 --- a/report/generator.py +++ b/report/generator.py @@ -63,7 +63,13 @@ class ReportGenerator: total_warnings = 0 for scanner_name, result in scan_results.items(): + # AI 审查的 summary 是字符串,跳过统计 + if scanner_name == 'ai': + continue + summary = result.get('summary', {}) + if not isinstance(summary, dict): + continue total_issues += summary.get('total', 0) total_errors += summary.get('error', 0) + summary.get('high', 0) total_warnings += summary.get('warning', 0) + summary.get('medium', 0) @@ -101,8 +107,10 @@ class ReportGenerator: } # 保存报告文件 + report_file = None if self.keep_files: - self._save_report(report) + report_file = self._save_report(report) + report['report_file'] = report_file return report @@ -161,6 +169,10 @@ class ReportGenerator: lines.append('') for scanner_name, result in scan_results.items(): + # 跳过 AI 审查结果(单独处理) + if scanner_name == 'ai': + continue + tool_name = result.get('tool', scanner_name) summary = result.get('summary', {}) @@ -204,6 +216,57 @@ class ReportGenerator: lines.append(f' - {message}') lines.append('') + # AI 审查结果(单独展示) + if 'ai' in scan_results: + ai_result = scan_results['ai'] + lines.append('') + lines.append('## 🤖 AI 代码审查') + lines.append('') + lines.append(ai_result.get('summary', '无 AI 审查结果')) + lines.append('') + + reviews = ai_result.get('reviews', []) + if reviews: + for i, review in enumerate(reviews, 1): + file_name = review.get('file', 'unknown') + review_content = review.get('review', {}) + + lines.append(f'### 📄 {file_name}') + lines.append('') + + # 优点 + advantages = review_content.get('优点', []) + if advantages: + lines.append('**✅ 代码优点:**') + for adv in advantages[:3]: + lines.append(f'- {adv}') + lines.append('') + + # 问题 + issues = review_content.get('问题', []) + if issues: + lines.append('**⚠️ 需要改进:**') + for issue in issues[:3]: + lines.append(f'- {issue}') + lines.append('') + + # 优化建议 + optimizations = review_content.get('优化', []) + if optimizations: + lines.append('**💡 优化建议:**') + for opt in optimizations[:3]: + lines.append(f'- {opt}') + lines.append('') + + # 原始回复(如果不是 JSON 格式) + raw = review_content.get('raw_review') + if raw: + lines.append('**📝 AI 原始回复:**') + lines.append('```') + lines.append(raw[:500] + '...' if len(raw) > 500 else raw) + lines.append('```') + lines.append('') + # 添加报告链接或下一步操作 lines.append('---') lines.append('') @@ -211,8 +274,13 @@ class ReportGenerator: return '\n'.join(lines) - def _save_report(self, report: Dict[str, Any]): - """保存报告到文件""" + def _save_report(self, report: Dict[str, Any]) -> str: + """ + 保存报告到文件 + + Returns: + 保存的文件路径 + """ try: # 生成文件名 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') @@ -225,6 +293,8 @@ class ReportGenerator: f.write(report['markdown']) logger.info(f'报告已保存: {filepath}') + + return filepath # 同时保存 JSON 格式(便于程序解析) json_filename = filename.replace('.md', '.json') diff --git a/scanner/ai_reviewer.py b/scanner/ai_reviewer.py new file mode 100644 index 0000000..2faeacf --- /dev/null +++ b/scanner/ai_reviewer.py @@ -0,0 +1,348 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +AI 代码审查器 +使用大模型进行智能代码审查 +""" +import os +import json +import logging +from typing import Dict, Any, List, Optional + +from scanner.base import BaseScanner + +logger = logging.getLogger(__name__) + + +class AIReviewer(BaseScanner): + """AI 代码审查器""" + + def __init__(self, config: Dict[str, Any]): + """ + 初始化 AI 审查器 + + Args: + config: AI 配置 + """ + # 先初始化基类 + super().__init__(config.get('scanner', {})) + + self.config = config + self.enabled = config.get('enabled', True) + self.provider = config.get('provider', 'ollama') + self.model = config.get('model', 'llama3') + self.api_url = config.get('api_url', 'http://localhost:11434') + self.api_key = config.get('api_key', '') + self.max_lines = config.get('max_lines', 200) + + if not self.enabled: + logger.info('AI 审查器已禁用') + return + + logger.info(f'AI 审查器初始化: {self.provider}/{self.model}') + + def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]: + """ + 执行代码扫描(实现抽象方法) + + Args: + repo_url: 仓库 URL + commit_id: 提交 ID + branch: 分支名 + + Returns: + 审查结果 + """ + # 调用实际的审查逻辑 + return self._do_review(repo_url=repo_url, commit_id=commit_id, branch=branch) + + def _do_review(self, clone_dir: str = None, repo_url: str = None, + commit_id: str = None, branch: str = None, + language: str = 'python') -> Dict[str, Any]: + """ + 执行 AI 代码审查 + + Args: + clone_dir: 仓库目录(如果已克隆则直接传入) + repo_url: 仓库 URL(如果未克隆则需要传入) + commit_id: 提交 ID + branch: 分支名 + language: 编程语言 + + Returns: + 审查结果 + """ + if not self.enabled: + return { + 'enabled': False, + 'tool': 'AI Code Reviewer', + 'reviews': [], + 'summary': 'AI 审查已禁用' + } + + try: + # 如果没有传入 clone_dir,需要克隆 + if not clone_dir and repo_url: + clone_dir = self.clone_repo(repo_url, commit_id, branch) + + if not clone_dir or not os.path.exists(clone_dir): + return { + 'enabled': True, + 'tool': 'AI Code Reviewer', + 'reviews': [], + 'summary': '无法获取代码目录' + } + + # 获取要审查的代码文件 + files = self._get_code_files(clone_dir, language) + + if not files: + return { + 'enabled': True, + 'tool': 'AI Code Reviewer', + 'reviews': [], + 'summary': '未找到可审查的代码文件' + } + + # 对每个文件进行 AI 审查 + all_reviews = [] + for file_path in files[:5]: # 限制最多审查 5 个文件 + review = self._review_file(file_path, language) + if review: + all_reviews.append(review) + + # 生成总结 + summary = self._generate_summary(all_reviews) + + return { + 'enabled': True, + 'tool': 'AI Code Reviewer', + 'reviews': all_reviews, + 'summary': summary, + 'files_reviewed': len(all_reviews), + 'clone_dir': clone_dir # 返回 clone_dir 用于后续清理 + } + + except Exception as e: + logger.error(f'AI 审查失败: {str(e)}') + return { + 'enabled': True, + 'tool': 'AI Code Reviewer', + 'error': str(e), + 'reviews': [], + 'summary': f'AI 审查出错: {str(e)}' + } + + def _get_code_files(self, clone_dir: str, language: str) -> List[str]: + """获取代码文件列表""" + import glob + + extensions = { + 'python': ['.py'], + 'javascript': ['.js', '.jsx'], + 'typescript': ['.ts', '.tsx'] + } + + exts = extensions.get(language, ['.py']) + files = [] + + for ext in exts: + pattern = os.path.join(clone_dir, '**', f'*{ext}') + files.extend(glob.glob(pattern, recursive=True)) + + # 过滤掉测试文件和虚拟环境 + files = [f for f in files if not any(x in f for x in [ + 'test_', '_test.', 'venv', 'node_modules', '__pycache__' + ])] + + return files[:10] # 最多 10 个文件 + + def _review_file(self, file_path: str, language: str) -> Optional[Dict[str, Any]]: + """审查单个文件""" + try: + with open(file_path, 'r', encoding='utf-8') as f: + code = f.read() + + # 限制代码行数 + lines = code.split('\n') + if len(lines) > self.max_lines: + code = '\n'.join(lines[:self.max_lines]) + truncated = True + else: + truncated = False + + # 构建 prompt + prompt = self._build_prompt(code, language) + + # 调用 AI + response = self._call_ai(prompt) + + if not response: + return None + + # 解析响应 + filename = os.path.basename(file_path) + return { + 'file': filename, + 'path': file_path, + 'truncated': truncated, + 'review': response + } + + except Exception as e: + logger.warning(f'审查文件失败 {file_path}: {str(e)}') + return None + + def _build_prompt(self, code: str, language: str) -> str: + """构建审查 prompt""" + if language == 'python': + lang_name = 'Python' + elif language in ['javascript', 'typescript']: + lang_name = 'JavaScript/TypeScript' + else: + lang_name = language + + prompt = f"""你是一位资深的 {lang_name} 代码审查专家。请审查以下代码,并给出: + +1. **代码优点** - 写得好地方 +2. **问题建议** - 需要改进的地方 +3. **优化建议** - 如何让代码更好 + +请用中文回复,保持简洁,每个文件审查不超过 3 点建议。 + +以下是代码: +```{language} +{code} +``` + +请以 JSON 格式输出: +```json +{{ + "优点": ["..."], + "问题": ["..."], + "优化": ["..."] +}} +```""" + return prompt + + def _call_ai(self, prompt: str) -> Optional[Dict[str, Any]]: + """调用 AI 服务""" + try: + if self.provider == 'ollama': + return self._call_ollama(prompt) + elif self.provider == 'api': + return self._call_api(prompt) + else: + logger.warning(f'未知的 AI provider: {self.provider}') + return None + except Exception as e: + logger.error(f'AI 调用失败: {str(e)}') + return None + + def _call_ollama(self, prompt: str) -> Optional[Dict[str, Any]]: + """调用 Ollama 本地模型""" + import requests + + url = f"{self.api_url}/api/generate" + payload = { + "model": self.model, + "prompt": prompt, + "stream": False, + "format": "json" + } + + response = requests.post(url, json=payload, timeout=120) + + if response.status_code == 200: + result = response.json() + content = result.get('response', '') + + # 尝试解析 JSON + try: + # 提取 JSON 部分 + if '```json' in content: + content = content.split('```json')[1].split('```')[0] + elif '```' in content: + content = content.split('```')[1].split('```')[0] + + return json.loads(content.strip()) + except json.JSONDecodeError: + # 如果不是 JSON,直接返回文本 + return {'raw_review': content} + + logger.warning(f'Ollama 返回错误: {response.status_code}') + return None + + def _call_api(self, prompt: str) -> Optional[Dict[str, Any]]: + """调用在线 API""" + import requests + + headers = { + 'Content-Type': 'application/json' + } + + if self.api_key: + headers['Authorization'] = f'Bearer {self.api_key}' + + # 根据 API URL 自动判断 provider + if 'siliconflow' in self.api_url: + url = f"{self.api_url}/chat/completions" + payload = { + "model": self.model, + "messages": [{"role": "user", "content": prompt}], + "max_tokens": 1024, + "temperature": 0.7 + } + elif 'deepseek' in self.api_url: + url = f"{self.api_url}/chat/completions" + payload = { + "model": self.model, + "messages": [{"role": "user", "content": prompt}], + "max_tokens": 1024, + "temperature": 0.7 + } + else: + url = f"{self.api_url}/chat/completions" + payload = { + "model": self.model, + "messages": [{"role": "user", "content": prompt}], + "max_tokens": 1024, + "temperature": 0.7 + } + + response = requests.post(url, json=payload, headers=headers, timeout=120) + + if response.status_code == 200: + result = response.json() + content = result['choices'][0]['message']['content'] + + try: + if '```json' in content: + content = content.split('```json')[1].split('```')[0] + elif '```' in content: + content = content.split('```')[1].split('```')[0] + + return json.loads(content.strip()) + except json.JSONDecodeError: + return {'raw_review': content} + + logger.warning(f'API 返回错误: {response.status_code}') + return None + + def _generate_summary(self, reviews: List[Dict[str, Any]]) -> str: + """生成审查总结""" + if not reviews: + return '未找到需要审查的代码' + + total_issues = sum( + len(r.get('review', {}).get('问题', [])) + + len(r.get('review', {}).get('优化', [])) + for r in reviews + ) + + files_count = len(reviews) + + if total_issues == 0: + return f'✅ AI 审查通过!审查了 {files_count} 个文件,未发现问题' + + return f'🤖 AI 审查了 {files_count} 个文件,发现 {total_issues} 个改进建议' diff --git a/scanner/base.py b/scanner/base.py index 0a93395..7528661 100644 --- a/scanner/base.py +++ b/scanner/base.py @@ -52,9 +52,12 @@ class BaseScanner(ABC): repo_name = repo_url.split('/')[-1].replace('.git', '') commit_hash = commit_id or branch clone_dir = os.path.join(self.temp_dir, f"{repo_name}_{commit_hash}") - # 如果目录已存在,先删除 + + # 如果目录已存在,先删除(带重试机制) if os.path.exists(clone_dir): - shutil.rmtree(clone_dir) + self.cleanup(clone_dir) + + repo = None try: logger.info(f'克隆仓库: {repo_url}') # 克隆仓库(浅克隆,只获取最新提交) @@ -64,26 +67,52 @@ class BaseScanner(ABC): depth=1, branch=branch ) + # 如果指定了 commit_id,切换到该提交 if commit_id: repo.git.checkout(commit_id) + logger.info(f'仓库克隆成功: {clone_dir}') return clone_dir except Exception as e: logger.error(f'克隆仓库失败: {str(e)}') raise + finally: + # 显式关闭 Repo 对象以释放文件句柄(特别是 Windows) + if repo is not None: + repo.close() def cleanup(self, clone_dir: str): """ - 清理临时目录 + 清理临时目录(带重试机制,处理 Windows 权限问题) Args: clone_dir: 克隆的目录路径 """ - try: - if os.path.exists(clone_dir): - shutil.rmtree(clone_dir) - logger.info(f'清理临时目录: {clone_dir}') - except Exception as e: - logger.warning(f'清理临时目录失败: {str(e)}') + import time + import stat + + def handle_remove_readonly(func, path, exc_info): + """处理只读文件的删除问题(Windows)""" + # 添加写权限并重试 + os.chmod(path, stat.S_IWRITE) + func(path) + + max_retries = 3 + retry_delay = 1 # 秒 + + for attempt in range(max_retries): + try: + if os.path.exists(clone_dir): + # Windows 上使用 onerror 回调处理只读文件 + shutil.rmtree(clone_dir, onerror=handle_remove_readonly) + logger.info(f'清理临时目录: {clone_dir}') + return # 成功清理,直接返回 + except Exception as e: + if attempt < max_retries - 1: + logger.warning(f'清理临时目录失败,{retry_delay}秒后重试: {str(e)}') + time.sleep(retry_delay) + retry_delay *= 2 # 指数退避 + else: + logger.warning(f'清理临时目录失败(已重试{max_retries}次): {str(e)}') def run_command(self, cmd: List[str], cwd: str, timeout: int = 300) -> Dict[str, Any]: """ 运行命令并返回结果 diff --git a/scanner/js_scanner.py b/scanner/js_scanner.py index d290e26..128c818 100644 --- a/scanner/js_scanner.py +++ b/scanner/js_scanner.py @@ -72,10 +72,6 @@ class JavaScriptScanner(BaseScanner): result['status'] = 'error' result['error'] = str(e) - finally: - # 清理临时目录 - if clone_dir: - self.cleanup(clone_dir) return result diff --git a/scanner/python_scanner.py b/scanner/python_scanner.py index c2293c7..30c829c 100644 --- a/scanner/python_scanner.py +++ b/scanner/python_scanner.py @@ -80,10 +80,7 @@ class PythonScanner(BaseScanner): result['status'] = 'error' result['error'] = str(e) - finally: - # 清理临时目录 - if clone_dir: - self.cleanup(clone_dir) + return result diff --git a/scanner/security_scanner.py b/scanner/security_scanner.py index 0a68a4f..de515e4 100644 --- a/scanner/security_scanner.py +++ b/scanner/security_scanner.py @@ -86,11 +86,6 @@ class SecurityScanner(BaseScanner): result['status'] = 'error' result['error'] = str(e) - finally: - # 清理临时目录 - if clone_dir: - self.cleanup(clone_dir) - return result def _run_bandit(self, cwd: str, files: List[str]) -> Dict[str, Any]: