Scan files for testing

This commit is contained in:
Dang Zerong
2026-03-13 17:32:23 +08:00
parent 887c8ae154
commit 97881ee00e
4 changed files with 138 additions and 246 deletions

281
README.md

@@ -1,224 +1,109 @@
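# AI Code Quality Scanner - Feishu Notification Edition # AI Code Quality Scanning System
An automated code quality scanning system that scans code on commit and sends the report to Feishu An automated code quality scanning tool that listens for PR events, scans code for defects, and supports merge decisions
## Features ## Workflow
- 🤖 Automatically listens for Gitea code push events
- 🔍 Multi-dimensional code quality scanning (syntax, style, security)
- 📊 Generates scan reports in Markdown format
- 📱 Pushes real-time notifications through a Feishu bot
## System Architecture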
``` ```
┌───────────── Webhook ┌──────────────────┐ ┌──────────┐ 1. Create PR ┌────────────┐
Gitea │ ───────────────► │ Webhook Server │ Gitea │ ───────────────► │ Webhook │
│ Code repository │ (Flask) └──────────┘Server
└─────────────┘ └────────┬─────────┘ └───────────┘
│ 2. Fetch code, scan, store results
┌──────────────────┐ ────────────┐
│ Code Scanner │ SQLite
│ - ESLint │ Database
│ - Pylint │ └────────────┘
- SonarQube │ │ 3. Frontend query
└────────┬─────────┘
┌──────────────────┐ ────────────┐
│ Report Generator Frontend page
│ - Markdown │ └────────────┘
└────────┬─────────┘
┌──────────────────┐
│ Feishu Bot │
│ - Webhook │
└──────────────────┘
``` ```
## Three Core Features
### 1. PR Creation
- A scan is triggered automatically when a PR is created in a Gitea repository
- Supported events: `opened`, `reopened`, `synchronize`
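The trigger rule in this section can be sketched as a small filter. This is a minimal illustration, not the project's actual handler: the function name `should_scan` and the `pull_request` event-type string are assumptions, and the action names are copied from the README text.

```python
# Hypothetical helper: decide whether a webhook delivery should start a scan.
# The action names come from the README; everything else is illustrative.
SUPPORTED_PR_ACTIONS = frozenset({"opened", "reopened", "synchronize"})


def should_scan(event_type: str, payload: dict) -> bool:
    """Return True only for pull-request events whose action is supported."""
    if event_type != "pull_request":
        return False
    # A missing "action" key yields None, which is never in the set.
    return payload.get("action") in SUPPORTED_PR_ACTIONS
```

Keeping the allowed actions in one set makes it easy to see which events start a scan and to extend the list later.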
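### 2. Backend Processing
- Fetches the code for the PR
- Runs code scans (Python/JavaScript/TypeScript)
- AI-assisted review of code defects
- Stores scan results in a SQLite database
### 3. Frontend
- Lists all PRs and their scan status
- Shows defect details for each PR
- One-click "Reject merge" or "Approve merge"
## Quick Start ## Quick Start
### 1. Install dependencies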
```bash ```bash
# Install dependencies
pip install -r requirements.txt pip install -r requirements.txt
```
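### 2. Configure the Feishu bot # Run the service
1. Open the Feishu group chat → Settings → Group bots
2. Add a bot → choose "Custom bot"
3. Get the Webhook URL
4. Configure `config.yaml`
### 3. Configure the Gitea Webhook
#### Option 1: Scan on push (original method)
1. Go to the Gitea repository → Settings → Webhooks
2. Add a Webhook
- Target URL: `http://YOUR_SERVER_IP:5000/webhook/gitea`
- Trigger event: Push
- Secret: the secret configured in `config.yaml`
#### Option 2: Scan on PR creation (recommended)
1. Go to the Gitea repository → Settings → Webhooks
2. Add a Webhook
- Target URL: `http://YOUR_SERVER_IP:5000/webhook/gitea`
- Trigger event: Pull Request
- Secret: the secret configured in `config.yaml`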
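The webhook secret is what lets the server check that a delivery really came from Gitea. A minimal sketch of that check follows, assuming the signature arrives as a hex HMAC-SHA256 digest of the raw request body (Gitea sends it in the `X-Gitea-Signature` header). The function name `verify_signature` is hypothetical and is not taken from this repository.

```python
import hashlib
import hmac


def verify_signature(secret: str, body: bytes, signature_header: str) -> bool:
    """Compare the received hex digest with one computed from the raw body."""
    expected = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    received = (signature_header or "").encode("utf-8")
    # compare_digest runs in constant time, so it does not leak how many
    # leading characters of the signature matched.
    return hmac.compare_digest(expected.encode("utf-8"), received)
```

The check has to run on the raw bytes of the request, before any JSON parsing, because re-serialising the payload can change the byte sequence and therefore the digest.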
**Supported PR events:**
- `opened` - a new PR is created
- `reopened` - a PR is reopened
- `synchronize` - commits in the PR were updated
- `ready_for_review` - the PR is marked as ready for review
### 4. Run the service
```bash
python app.py python app.py
``` ```
## Configuration Reference Visit http://localhost:5000 to view the frontend page.
All configuration lives in `config.yaml`: ## Docker Deployment
### 1. Build the image
```bash
docker build -t dcr-by1jwyxk44.71826370.xyz/whlaoding/code-scan:latest .
```
### 2. Log in to the registry
```bash
docker login dcr-by1jwyxk44.71826370.xyz
```
### 3. Push to the registry
```bash
docker push dcr-by1jwyxk44.71826370.xyz/whlaoding/code-scan:latest
```
### 4. Start with docker compose
```bash
# Start the service
docker compose up -d
# View logs
docker compose logs -f
# Stop the service
docker compose down
```
## Configuration
Configuration file `config.yaml`:
```yaml ```yaml
server: server:
host: "0.0.0.0" host: "0.0.0.0"
port: 5000 port: 5000
debug: true
gitea: gitea:
base_url: "http://localhost:3000" base_url: "https://code.deep-pilot.chat"
# Webhook signing secret webhook_secret: "xxx"
webhook_secret: "your_webhook_secret" api_token: "xxx"
feishu: ai:
# Feishu bot Webhook URL provider: "api"
webhook_url: "https://open.feishu.cn/open-apis/bot/v2/hook/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" model: "qwen3.5-plus"
# Message push secret (optional, used for signing) api_url: "https://dashscope.aliyuncs.com/compatible-mode/v1"
secret: "your_feishu_secret" api_key: "sk-xxx"
scanner:
# Supported languages
languages:
- python
- javascript
- typescript
# Scan threshold
max_issues: 10
# Whether to enable detailed scanning
detailed: true
report:
# Report output path
output_dir: "./reports"
# Whether to keep report files
keep_files: true
``` ```
## Project Structure
```
code-scanner/
├── app.py # Main application entry point
├── config.yaml # Configuration file
├── requirements.txt # Python dependencies
├── README.md # Project documentation
├── scanner/
│ ├── __init__.py
│ ├── base.py # Scanner base class
│ ├── python_scanner.py # Python code scanning
│ ├── js_scanner.py # JavaScript/TypeScript scanning
│ └── security_scanner.py # Security scanning
├── report/
│ ├── __init__.py
│ └── generator.py # Markdown report generation
├── notify/
│ ├── __init__.py
│ └── feishu.py # Feishu notifications
├── webhook/
│ ├── __init__.py
│ └── handler.py # Webhook handling
└── reports/ # Report output directory
```
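## Supported Scanning Tools
### Python
- **Pylint** - Code style and error checking
- **Flake8** - Python code linting
- **Bandit** - Security vulnerability scanning
### JavaScript/TypeScript
- **ESLint** - JavaScript/TypeScript linting
- **Prettier** - Code formatting
## Feishu Message Example
After a scan completes, you will receive a message similar to the following:
### Push scan message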
```
📊 Code Quality Scan Report
Repository: my-project
Branch: main
Commit: abc1234
Committer: developer@example.com
✅ Scan passed (0 issues)
⚠️ Issues found (5 issues)
```
### PR scan message
```
📊 PR Code Quality Scan Report
Repository: my-project
Source branch: feature-xxx → Target branch: main
PR link: https://gitea.example.com/user/project/pulls/123
Commit: abc1234
Committer: developer@example.com
✅ Scan passed (0 issues)
⚠️ Issues found (5 issues)
```
## Docker Deployment
```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 5000
CMD ["python", "app.py"]
```
## Environment Variables
Configuration can also be provided through environment variables:
```bash
export FEISHU_WEBHOOK_URL="https://open.feishu.cn/..."
export GITEA_WEBHOOK_SECRET="secret"
export SCANNER_MAX_ISSUES=10
```
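One way such overrides could be layered on top of `config.yaml` is sketched below. The variable names are the ones listed in the removed README section; the mapping to config keys, the precedence (environment wins over the file), and the helper name `apply_env_overrides` are assumptions for illustration, since the diff does not show how the project reads these variables.

```python
import os
from typing import Any, Dict, Mapping, Optional

# Environment variable -> (config section, key, converter).
# The section/key mapping is an assumption based on the config.yaml layout.
ENV_OVERRIDES = {
    "FEISHU_WEBHOOK_URL": ("feishu", "webhook_url", str),
    "GITEA_WEBHOOK_SECRET": ("gitea", "webhook_secret", str),
    "SCANNER_MAX_ISSUES": ("scanner", "max_issues", int),
}


def apply_env_overrides(config: Dict[str, Any],
                        environ: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Return a copy of config with any set environment variables applied."""
    environ = os.environ if environ is None else environ
    # Copy each section so the caller's dictionary is left untouched.
    merged = {section: dict(values) for section, values in config.items()}
    for name, (section, key, convert) in ENV_OVERRIDES.items():
        if name in environ:
            merged.setdefault(section, {})[key] = convert(environ[name])
    return merged
```

Passing `environ` explicitly keeps the function easy to test without touching the real process environment.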
## License
MIT License

21
app.py

@@ -2,6 +2,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import os import os
import time
import logging import logging
import traceback import traceback
from typing import Dict, Tuple, Any from typing import Dict, Tuple, Any
@@ -131,21 +132,27 @@ def handle_gitea_webhook():
# Python scan # Python scan
if 'python' in config.get('scanner', {}).get('languages', []): if 'python' in config.get('scanner', {}).get('languages', []):
start_time = time.time()
scan_results['python'] = python_scanner.scan( scan_results['python'] = python_scanner.scan(
clone_url, commit_id, branch clone_url, commit_id, branch
) )
logger.info(f"[TIMER] Python scan elapsed: {time.time() - start_time:.2f}")
# JavaScript/TypeScript scan # JavaScript/TypeScript scan
if any(lang in config.get('scanner', {}).get('languages', []) if any(lang in config.get('scanner', {}).get('languages', [])
for lang in ['javascript', 'typescript']): for lang in ['javascript', 'typescript']):
start_time = time.time()
scan_results['javascript'] = js_scanner.scan( scan_results['javascript'] = js_scanner.scan(
clone_url, commit_id, branch clone_url, commit_id, branch
) )
logger.info(f"[TIMER] JavaScript scan elapsed: {time.time() - start_time:.2f}")
# Security scan # Security scan
start_time = time.time()
scan_results['security'] = security_scanner.scan( scan_results['security'] = security_scanner.scan(
clone_url, commit_id, branch clone_url, commit_id, branch
) )
logger.info(f"[TIMER] Security scan elapsed: {time.time() - start_time:.2f}")
# Generate report # Generate report
report = report_generator.generate( report = report_generator.generate(
@@ -228,27 +235,35 @@ def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]:
# Python scan # Python scan
if 'python' in config.get('scanner', {}).get('languages', []): if 'python' in config.get('scanner', {}).get('languages', []):
start_time = time.time()
scan_results['python'] = python_scanner.scan( scan_results['python'] = python_scanner.scan(
clone_url, source_sha, source_branch, changed_files clone_url, source_sha, source_branch, changed_files
) )
logger.info(f"[TIMER] Python scan elapsed: {time.time() - start_time:.2f}")
# JavaScript/TypeScript scan # JavaScript/TypeScript scan
if any(lang in config.get('scanner', {}).get('languages', []) if any(lang in config.get('scanner', {}).get('languages', [])
for lang in ['javascript', 'typescript']): for lang in ['javascript', 'typescript']):
start_time = time.time()
scan_results['javascript'] = js_scanner.scan( scan_results['javascript'] = js_scanner.scan(
clone_url, source_sha, source_branch, changed_files clone_url, source_sha, source_branch, changed_files
) )
logger.info(f"[TIMER] JavaScript scan elapsed: {time.time() - start_time:.2f}")
# Security scan # Security scan
start_time = time.time()
scan_results['security'] = security_scanner.scan( scan_results['security'] = security_scanner.scan(
clone_url, source_sha, source_branch, changed_files clone_url, source_sha, source_branch, changed_files
) )
logger.info(f"[TIMER] Security scan elapsed: {time.time() - start_time:.2f}")
# AI code review # AI code review
if config.get('ai', {}).get('enabled', False): if config.get('ai', {}).get('enabled', False):
start_time = time.time()
scan_results['ai'] = ai_reviewer.scan( scan_results['ai'] = ai_reviewer.scan(
clone_url, source_sha, source_branch, changed_files clone_url, source_sha, source_branch, changed_files
) )
logger.info(f"[TIMER] AI scan elapsed: {time.time() - start_time:.2f}")
# Fetch the PR's code diff so issues can be linked to code snippets # Fetch the PR's code diff so issues can be linked to code snippets
pr_diff = None pr_diff = None
@@ -326,13 +341,19 @@ def manual_scan():
scan_results = {} scan_results = {}
if 'python' in config.get('scanner', {}).get('languages', []): if 'python' in config.get('scanner', {}).get('languages', []):
start_time = time.time()
scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch) scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch)
logger.info(f"[TIMER] Python scan elapsed: {time.time() - start_time:.2f}")
if any(lang in config.get('scanner', {}).get('languages', []) if any(lang in config.get('scanner', {}).get('languages', [])
for lang in ['javascript', 'typescript']): for lang in ['javascript', 'typescript']):
start_time = time.time()
scan_results['javascript'] = js_scanner.scan(repo_url, commit_id, branch) scan_results['javascript'] = js_scanner.scan(repo_url, commit_id, branch)
logger.info(f"[TIMER] JavaScript scan elapsed: {time.time() - start_time:.2f}")
start_time = time.time()
scan_results['security'] = security_scanner.scan(repo_url, commit_id, branch) scan_results['security'] = security_scanner.scan(repo_url, commit_id, branch)
logger.info(f"[TIMER] Security scan elapsed: {time.time() - start_time:.2f}")
# Generate report # Generate report
report = report_generator.generate( report = report_generator.generate(
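The hunks above repeat the same `start_time = time.time()` / `logger.info(...)` pair around every scanner call. A possible way to factor that pattern out is a small context manager. This is only a sketch: the name `timed` and the `log` parameter are hypothetical and do not appear in the commit, and it uses `time.perf_counter()` instead of `time.time()` because a monotonic clock is the usual choice for measuring durations.

```python
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger(__name__)


@contextmanager
def timed(label, log=logger.info):
    """Log how long the wrapped block took, even if it raises."""
    start = time.perf_counter()
    try:
        yield
    finally:
        log(f"[TIMER] {label}: {time.perf_counter() - start:.2f}s")


# Usage sketch (scanner objects as in app.py):
# with timed("Python scan"):
#     scan_results['python'] = python_scanner.scan(clone_url, commit_id, branch)
```

Because the log call sits in a `finally` block, a scan that raises still reports how long it ran before failing.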

View File

@@ -58,4 +58,4 @@ ai:
# Whether to enable AI review # Whether to enable AI review
enabled: true enabled: true
# Maximum number of code lines per review # Maximum number of code lines per review
max_lines: 200 max_lines: 100

View File

@@ -30,7 +30,7 @@ class AIReviewer(BaseScanner):
self.config = config self.config = config
self.enabled = config.get('enabled', True) self.enabled = config.get('enabled', True)
self.provider = config.get('provider', 'ollama') self.provider = config.get('provider', 'api')
self.model = config.get('model', 'llama3') self.model = config.get('model', 'llama3')
self.api_url = config.get('api_url', 'http://localhost:11434') self.api_url = config.get('api_url', 'http://localhost:11434')
self.api_key = config.get('api_key', '') self.api_key = config.get('api_key', '')
@@ -424,13 +424,7 @@ class AIReviewer(BaseScanner):
def _call_ai(self, prompt: str) -> Optional[Dict[str, Any]]: def _call_ai(self, prompt: str) -> Optional[Dict[str, Any]]:
"""Call the AI service""" """Call the AI service"""
try: try:
if self.provider == 'ollama':
return self._call_ollama(prompt)
elif self.provider == 'api':
return self._call_api(prompt) return self._call_api(prompt)
else:
logger.warning(f'Unknown AI provider: {self.provider}')
return None
except Exception as e: except Exception as e:
print("Exception traceback info:", e.__traceback__) print("Exception traceback info:", e.__traceback__)
logger.error(f'AI call failed: {str(e)}') logger.error(f'AI call failed: {str(e)}')
@@ -517,32 +511,6 @@ class AIReviewer(BaseScanner):
logger.debug("_extract_json_obj: failed to extract a valid JSON object") logger.debug("_extract_json_obj: failed to extract a valid JSON object")
return None return None
def _call_ollama(self, prompt: str) -> Optional[Dict[str, Any]]:
"""Call the local Ollama model"""
import requests
url = f"{self.api_url}/api/generate"
payload = {
"model": self.model,
"prompt": prompt,
"stream": False,
"format": "json"
}
logger.info(f"Calling Ollama: {url}, model={self.model}")
response = requests.post(url, json=payload, timeout=120)
if response.status_code == 200:
result = response.json()
content = result.get('response', '')
logger.info(f"Ollama response content length: {len(content) if content else 0}")
logger.debug(f"Ollama response content preview: {content[:200] if content else 'empty'}")
parsed = self._extract_json_obj(content)
return parsed
logger.warning(f'Ollama returned an error: {response.status_code}')
return None
def _call_api(self, prompt: str) -> Optional[Dict[str, Any]]: def _call_api(self, prompt: str) -> Optional[Dict[str, Any]]:
"""Call the online API""" """Call the online API"""
import requests import requests
@@ -560,7 +528,7 @@ class AIReviewer(BaseScanner):
payload = { payload = {
"model": self.model, "model": self.model,
"messages": [{"role": "user", "content": prompt}], "messages": [{"role": "user", "content": prompt}],
"max_tokens": 1024*5, "max_tokens": 1024,
"temperature": 0.7 "temperature": 0.7
} }
elif 'deepseek' in self.api_url: elif 'deepseek' in self.api_url:
@@ -568,25 +536,43 @@ class AIReviewer(BaseScanner):
payload = { payload = {
"model": self.model, "model": self.model,
"messages": [{"role": "user", "content": prompt}], "messages": [{"role": "user", "content": prompt}],
"max_tokens": 1024*5, "max_tokens": 1024,
"temperature": 0.7 "temperature": 0.7
} }
elif 'dashscope' in self.api_url:
# Dedicated endpoint for Alibaba Cloud DashScope
url = f"{self.api_url}/chat/completions"
payload = {
"model": self.model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 1024,
"temperature": 0.7,
"stream": False # Explicitly disable streaming
}
else: else:
url = f"{self.api_url}/chat/completions" url = f"{self.api_url}/chat/completions"
payload = { payload = {
"model": self.model, "model": self.model,
"messages": [{"role": "user", "content": prompt}], "messages": [{"role": "user", "content": prompt}],
"max_tokens": 1024*5, "max_tokens": 1024,
"temperature": 0.7 "temperature": 0.7
} }
logger.info(f"Calling API: {url}, model={self.model}")
try:
response = requests.post(url, json=payload, headers=headers, timeout=120) response = requests.post(url, json=payload, headers=headers, timeout=120)
if response.status_code == 200: if response.status_code == 200:
result = response.json() result = response.json()
content = result['choices'][0]['message']['content'] content = result['choices'][0]['message']['content']
logger.info(f"API response content length: {len(content) if content else 0}")
parsed = self._extract_json_obj(content) parsed = self._extract_json_obj(content)
return parsed return parsed
logger.warning(f'API returned an error: {response.status_code}') logger.warning(f'API returned an error: {response.status_code}, {response.text[:200]}')
return None
except Exception as e:
logger.warning(f'API call failed: {e}')
return None return None
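The DashScope branch and the default branch in this hunk build almost identical payloads, differing only in the explicit `"stream": False`. One possible consolidation is sketched below. It covers only those two branches, whose URL construction is visible in the hunk; the helper name `build_chat_request` and the `max_tokens` parameter are hypothetical and not part of the commit.

```python
from typing import Any, Dict, Tuple


def build_chat_request(api_url: str, model: str, prompt: str,
                       max_tokens: int = 1024) -> Tuple[str, Dict[str, Any]]:
    """Build the URL and JSON body for a chat-completions style request."""
    url = f"{api_url}/chat/completions"
    payload: Dict[str, Any] = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": max_tokens,
        "temperature": 0.7,
    }
    if "dashscope" in api_url:
        # Mirrors the DashScope branch in the diff: turn streaming off explicitly.
        payload["stream"] = False
    return url, payload
```

A helper like this keeps the shared fields in one place, so a later change to `max_tokens` or `temperature` would not need to be repeated in every branch.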