From 97881ee00eb3c8ea5629cf6636295aaa45e01188 Mon Sep 17 00:00:00 2001 From: Dang Zerong Date: Fri, 13 Mar 2026 17:32:23 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=8B=E8=AF=95=E7=9A=84=E6=89=AB=E6=8F=8F?= =?UTF-8?q?=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 285 ++++++++++++----------------------------- app.py | 21 +++ config.yaml | 2 +- scanner/ai_reviewer.py | 76 +++++------ 4 files changed, 138 insertions(+), 246 deletions(-) diff --git a/README.md b/README.md index 4731168..d80ccbe 100644 --- a/README.md +++ b/README.md @@ -1,224 +1,109 @@ -# AI Code Quality Scanner - 飞书通知版 +# AI 代码质量扫描系统 -一个自动化代码质量扫描系统,在代码提交时自动扫描并发送报告到飞书。 +自动化代码质量扫描工具,监听 PR 事件,自动扫描代码缺陷并提供合并决策支持。 -## 功能特性 - -- 🤖 自动监听 Gitea 代码提交事件 -- 🔍 多维度代码质量扫描(语法、风格、安全) -- 📊 生成 Markdown 格式扫描报告 -- 📱 实时推送飞书机器人通知 - -## 系统架构 +## 工作流程 ``` -┌─────────────┐ Webhook ┌──────────────────┐ -│ Gitea │ ───────────────► │ Webhook Server │ -│ 代码仓库 │ │ (Flask) │ -└─────────────┘ └────────┬─────────┘ - │ - ▼ - ┌──────────────────┐ - │ Code Scanner │ - │ - ESLint │ - │ - Pylint │ - │ - SonarQube │ - └────────┬─────────┘ - │ - ▼ - ┌──────────────────┐ - │ Report Generator│ - │ - Markdown │ - └────────┬─────────┘ - │ - ▼ - ┌──────────────────┐ - │ Feishu Bot │ - │ - Webhook │ - └──────────────────┘ +┌──────────┐ 1. 创建 PR ┌────────────┐ +│ Gitea │ ───────────────► │ Webhook │ +└──────────┘ │ Server │ + └─────┬──────┘ + │ 2. 拉取代码、扫描、存库 + ▼ + ┌────────────┐ + │ SQLite │ + │ Database │ + └────────────┘ + │ 3. 前端查询 + ▼ + ┌────────────┐ + │ 前端页面 │ + └────────────┘ ``` +## 三个核心功能 + +### 1. PR 创建 + +- Gitea 仓库创建 PR 时自动触发扫描 +- 支持事件:`opened`、`reopened`、`synchronize` + +### 2. 后端处理 + +- 拉取 PR 对应的代码 +- 执行代码扫描(Python/JavaScript/TypeScript) +- AI 智能审查代码缺陷 +- 扫描结果存入 SQLite 数据库 + +### 3. 前端功能 + +- 查询所有 PR 及扫描状态 +- 查看每个 PR 的缺陷详情 +- 一键「拒绝合并」或「同意合并」 + ## 快速开始 -### 1. 安装依赖 - ```bash +# 安装依赖 pip install -r requirements.txt -``` -### 2. 配置飞书机器人 - -1. 打开飞书群聊 → 设置 → 群机器人 -2. 添加机器人 → 选择"自定义机器人" -3. 获取 Webhook 地址 -4. 配置 `config.yaml` - -### 3. 配置 Gitea Webhook - -#### 方式一:Push 时扫描(原有方式) - -1. 进入 Gitea 仓库 → 设置 → Webhooks -2. 添加 Webhook: - - 目标 URL: `http://你的服务器IP:5000/webhook/gitea` - - 触发事件: Push - - 密钥: 配置 `config.yaml` 中的 secret - -#### 方式二:PR 创建时扫描(推荐) - -1. 进入 Gitea 仓库 → 设置 → Webhooks -2. 添加 Webhook: - - 目标 URL: `http://你的服务器IP:5000/webhook/gitea` - - 触发事件: Pull Request - - 密钥: 配置 `config.yaml` 中的 secret - -**支持的 PR 事件:** -- `opened` - 创建新 PR -- `reopened` - 重新打开 PR -- `synchronize` - PR 中的提交有更新 -- `ready_for_review` - PR 标记为准备好审查 - -### 4. 运行服务 - -```bash +# 运行服务 python app.py ``` -## 配置说明 +访问 http://localhost:5000 查看前端页面。 -所有配置在 `config.yaml` 中: +## Docker 部署 + +### 1. 构建镜像 + +```bash +docker build -t dcr-by1jwyxk44.71826370.xyz/whlaoding/code-scan:latest . +``` + +### 2. 登录仓库 + +```bash +docker login dcr-by1jwyxk44.71826370.xyz +``` + +### 3. Push 到仓库 + +```bash +docker push dcr-by1jwyxk44.71826370.xyz/whlaoding/code-scan:latest +``` + +### 4. 使用 docker compose 启动 + +```bash +# 启动服务 +docker compose up -d + +# 查看日志 +docker compose logs -f + +# 停止服务 +docker compose down +``` + +## 配置 + +配置文件 `config.yaml`: ```yaml server: host: "0.0.0.0" port: 5000 - debug: true gitea: - base_url: "http://localhost:3000" - # Webhook 签名密钥 - webhook_secret: "your_webhook_secret" + base_url: "https://code.deep-pilot.chat" + webhook_secret: "xxx" + api_token: "xxx" -feishu: - # 飞书机器人 Webhook 地址 - webhook_url: "https://open.feishu.cn/open-apis/bot/v2/hook/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" - # 消息推送 secret(可选,用于签名) - secret: "your_feishu_secret" - -scanner: - # 支持的语言 - languages: - - python - - javascript - - typescript - # 扫描阈值 - max_issues: 10 - # 是否启用详细扫描 - detailed: true - -report: - # 报告保存路径 - output_dir: "./reports" - # 是否保留报告文件 - keep_files: true +ai: + provider: "api" + model: "qwen3.5-plus" + api_url: "https://dashscope.aliyuncs.com/compatible-mode/v1" + api_key: "sk-xxx" ``` - -## 项目结构 - -``` -code-scanner/ -├── app.py # 主应用入口 -├── config.yaml # 配置文件 -├── requirements.txt # Python 依赖 -├── README.md # 项目说明 -├── scanner/ -│ ├── __init__.py -│ ├── base.py # 扫描器基类 -│ ├── python_scanner.py # Python 代码扫描 -│ ├── js_scanner.py # JavaScript/TypeScript 扫描 -│ └── security_scanner.py # 安全扫描 -├── report/ -│ ├── __init__.py -│ └── generator.py # Markdown 报告生成 -├── notify/ -│ ├── __init__.py -│ └── feishu.py # 飞书通知 -├── webhook/ -│ ├── __init__.py -│ └── handler.py # Webhook 处理 -└── reports/ # 报告输出目录 -``` - -## 支持的扫描工具 - -### Python -- **Pylint** - 代码风格和错误检查 -- **Flake8** - Python 代码检查 -- **Bandit** - 安全漏洞扫描 - -### JavaScript/TypeScript -- **ESLint** - JavaScript/TypeScript 检查 -- **Prettier** - 代码格式化 - -## 飞书消息效果 - -扫描完成后,将收到类似以下消息: - -### Push 扫描消息 - -``` -📊 代码质量扫描报告 - -仓库: my-project -分支: main -提交: abc1234 -提交者: developer@example.com - -✅ 扫描通过 (0 issues) -或 -⚠️ 发现问题 (5 issues) -``` - -### PR 扫描消息 - -``` -📊 PR 代码质量扫描报告 - -仓库: my-project -源分支: feature-xxx → 目标分支: main -PR链接: https://gitea.example.com/user/project/pulls/123 -提交: abc1234 -提交者: developer@example.com - -✅ 扫描通过 (0 issues) -或 -⚠️ 发现问题 (5 issues) -``` - -## Docker 部署 - -```dockerfile -FROM python:3.11-slim - -WORKDIR /app -COPY requirements.txt . -RUN pip install -r requirements.txt - -COPY . . -EXPOSE 5000 - -CMD ["python", "app.py"] -``` - -## 环境变量 - -也可以通过环境变量配置: - -```bash -export FEISHU_WEBHOOK_URL="https://open.feishu.cn/..." -export GITEA_WEBHOOK_SECRET="secret" -export SCANNER_MAX_ISSUES=10 -``` - -## 许可证 - -MIT License diff --git a/app.py b/app.py index c5f7eb6..7919fad 100644 --- a/app.py +++ b/app.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- import os +import time import logging import traceback from typing import Dict, Tuple, Any @@ -131,21 +132,27 @@ def handle_gitea_webhook(): # Python 扫描 if 'python' in config.get('scanner', {}).get('languages', []): + start_time = time.time() scan_results['python'] = python_scanner.scan( clone_url, commit_id, branch ) + logger.info(f"[TIMER] Python 扫描耗时: {time.time() - start_time:.2f}秒") # JavaScript/TypeScript 扫描 if any(lang in config.get('scanner', {}).get('languages', []) for lang in ['javascript', 'typescript']): + start_time = time.time() scan_results['javascript'] = js_scanner.scan( clone_url, commit_id, branch ) + logger.info(f"[TIMER] JavaScript 扫描耗时: {time.time() - start_time:.2f}秒") # 安全扫描 + start_time = time.time() scan_results['security'] = security_scanner.scan( clone_url, commit_id, branch ) + logger.info(f"[TIMER] 安全扫描耗时: {time.time() - start_time:.2f}秒") # 生成报告 report = report_generator.generate( @@ -228,27 +235,35 @@ def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]: # Python 扫描 if 'python' in config.get('scanner', {}).get('languages', []): + start_time = time.time() scan_results['python'] = python_scanner.scan( clone_url, source_sha, source_branch, changed_files ) + logger.info(f"[TIMER] Python 扫描耗时: {time.time() - start_time:.2f}秒") # JavaScript/TypeScript 扫描 if any(lang in config.get('scanner', {}).get('languages', []) for lang in ['javascript', 'typescript']): + start_time = time.time() scan_results['javascript'] = js_scanner.scan( clone_url, source_sha, source_branch, changed_files ) + logger.info(f"[TIMER] JavaScript 扫描耗时: {time.time() - start_time:.2f}秒") # 安全扫描 + start_time = time.time() scan_results['security'] = security_scanner.scan( clone_url, source_sha, source_branch, changed_files ) + logger.info(f"[TIMER] 安全扫描耗时: {time.time() - start_time:.2f}秒") # AI 代码审查 if config.get('ai', {}).get('enabled', False): + start_time = time.time() scan_results['ai'] = ai_reviewer.scan( clone_url, source_sha, source_branch, changed_files ) + logger.info(f"[TIMER] AI 扫描耗时: {time.time() - start_time:.2f}秒") # 获取 PR 的代码差异,用于将问题与代码片段关联 pr_diff = None @@ -326,13 +341,19 @@ def manual_scan(): scan_results = {} if 'python' in config.get('scanner', {}).get('languages', []): + start_time = time.time() scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch) + logger.info(f"[TIMER] Python 扫描耗时: {time.time() - start_time:.2f}秒") if any(lang in config.get('scanner', {}).get('languages', []) for lang in ['javascript', 'typescript']): + start_time = time.time() scan_results['javascript'] = js_scanner.scan(repo_url, commit_id, branch) + logger.info(f"[TIMER] JavaScript 扫描耗时: {time.time() - start_time:.2f}秒") + start_time = time.time() scan_results['security'] = security_scanner.scan(repo_url, commit_id, branch) + logger.info(f"[TIMER] 安全扫描耗时: {time.time() - start_time:.2f}秒") # 生成报告 report = report_generator.generate( diff --git a/config.yaml b/config.yaml index 49dd481..0953002 100644 --- a/config.yaml +++ b/config.yaml @@ -58,4 +58,4 @@ ai: # 是否启用 AI 审查 enabled: true # 每次审查的最大代码行数 - max_lines: 200 \ No newline at end of file + max_lines: 100 \ No newline at end of file diff --git a/scanner/ai_reviewer.py b/scanner/ai_reviewer.py index 171abfa..f80281c 100644 --- a/scanner/ai_reviewer.py +++ b/scanner/ai_reviewer.py @@ -30,7 +30,7 @@ class AIReviewer(BaseScanner): self.config = config self.enabled = config.get('enabled', True) - self.provider = config.get('provider', 'ollama') + self.provider = config.get('provider', 'api') self.model = config.get('model', 'llama3') self.api_url = config.get('api_url', 'http://localhost:11434') self.api_key = config.get('api_key', '') @@ -424,13 +424,7 @@ class AIReviewer(BaseScanner): def _call_ai(self, prompt: str) -> Optional[Dict[str, Any]]: """调用 AI 服务""" try: - if self.provider == 'ollama': - return self._call_ollama(prompt) - elif self.provider == 'api': - return self._call_api(prompt) - else: - logger.warning(f'未知的 AI provider: {self.provider}') - return None + return self._call_api(prompt) except Exception as e: print("异常追踪信息:", e.__traceback__) logger.error(f'AI 调用失败: {str(e)}') @@ -517,32 +511,6 @@ class AIReviewer(BaseScanner): logger.debug("_extract_json_obj: 未能提取到有效的 JSON 对象") return None - def _call_ollama(self, prompt: str) -> Optional[Dict[str, Any]]: - """调用 Ollama 本地模型""" - import requests - - url = f"{self.api_url}/api/generate" - payload = { - "model": self.model, - "prompt": prompt, - "stream": False, - "format": "json" - } - - logger.info(f"调用 Ollama: {url}, model={self.model}") - response = requests.post(url, json=payload, timeout=120) - - if response.status_code == 200: - result = response.json() - content = result.get('response', '') - logger.info(f"Ollama 返回内容长度: {len(content) if content else 0}") - logger.debug(f"Ollama 返回内容预览: {content[:200] if content else 'empty'}") - parsed = self._extract_json_obj(content) - return parsed - - logger.warning(f'Ollama 返回错误: {response.status_code}') - return None - def _call_api(self, prompt: str) -> Optional[Dict[str, Any]]: """调用在线 API""" import requests @@ -560,7 +528,7 @@ class AIReviewer(BaseScanner): payload = { "model": self.model, "messages": [{"role": "user", "content": prompt}], - "max_tokens": 1024*5, + "max_tokens": 1024, "temperature": 0.7 } elif 'deepseek' in self.api_url: @@ -568,25 +536,43 @@ class AIReviewer(BaseScanner): payload = { "model": self.model, "messages": [{"role": "user", "content": prompt}], - "max_tokens": 1024*5, + "max_tokens": 1024, "temperature": 0.7 } + elif 'dashscope' in self.api_url: + # 阿里云 dashscope 专用端点 + url = f"{self.api_url}/chat/completions" + payload = { + "model": self.model, + "messages": [{"role": "user", "content": prompt}], + "max_tokens": 1024, + "temperature": 0.7, + "stream": False # 显式关闭流式 + } else: url = f"{self.api_url}/chat/completions" payload = { "model": self.model, "messages": [{"role": "user", "content": prompt}], - "max_tokens": 1024*5, + "max_tokens": 1024, "temperature": 0.7 } - response = requests.post(url, json=payload, headers=headers, timeout=120) + logger.info(f"调用 API: {url}, model={self.model}") - if response.status_code == 200: - result = response.json() - content = result['choices'][0]['message']['content'] - parsed = self._extract_json_obj(content) - return parsed + try: + response = requests.post(url, json=payload, headers=headers, timeout=120) - logger.warning(f'API 返回错误: {response.status_code}') - return None + if response.status_code == 200: + result = response.json() + content = result['choices'][0]['message']['content'] + logger.info(f"API 返回内容长度: {len(content) if content else 0}") + parsed = self._extract_json_obj(content) + return parsed + + logger.warning(f'API 返回错误: {response.status_code}, {response.text[:200]}') + return None + + except Exception as e: + logger.warning(f'API 调用失败: {e}') + return None