24 Commits

Author SHA1 Message Date
Dang Zerong
1fcac9877c 修改 qwen3-max 2026-03-15 13:43:09 +08:00
Dang Zerong
668f30252d 测试的扫描文件 2026-03-15 13:41:37 +08:00
Dang Zerong
c8c0ef1620 测试的扫描文件 2026-03-13 21:00:53 +08:00
Dang Zerong
9a14c0b219 测试的扫描文件 2026-03-13 18:00:27 +08:00
Dang Zerong
87b2dacf65 测试的扫描文件 2026-03-13 18:00:22 +08:00
Dang Zerong
04518812f4 Merge branch 'dev' of https://code.deep-pilot.chat/Bosch_Demo/code_scan into dev 2026-03-13 17:42:54 +08:00
Dang Zerong
6c4ee107f9 测试的扫描文件 2026-03-13 17:42:27 +08:00
dangzerong
2a2ff1ad5f Merge branch 'main' into dev 2026-03-13 17:40:33 +08:00
Dang Zerong
bc5a19fffc 测试的扫描文件 2026-03-13 17:39:20 +08:00
Dang Zerong
78655ce5dc 测试的扫描文件 2026-03-13 17:37:46 +08:00
Dang Zerong
2201f6d696 Merge branch 'dev' 2026-03-13 17:37:10 +08:00
Dang Zerong
97881ee00e 测试的扫描文件 2026-03-13 17:32:23 +08:00
dangzerong
e46aff2797 Merge pull request 'dev' (#13) from dev into main 2026-03-13 17:25:28 +08:00
Dang Zerong
887c8ae154 测试的扫描文件 2026-03-13 16:49:13 +08:00
Dang Zerong
ecc39402d5 测试的扫描文件 2026-03-13 16:27:32 +08:00
dangzerong
dc9b921091 Merge pull request '测试的扫描文件' (#11) from dev into main
Reviewed-on: #11
2026-03-13 16:26:54 +08:00
Dang Zerong
a928b79d6d 测试的扫描文件 2026-03-13 16:25:37 +08:00
dangzerong
0991b3de26 Merge pull request 'dev' (#10) from dev into main 2026-03-13 16:24:21 +08:00
Dang Zerong
1876be1777 测试的扫描文件 2026-03-13 16:22:23 +08:00
Dang Zerong
51fc1a6aae 先删除测试代码,后面再提交 2026-03-13 16:21:39 +08:00
Dang Zerong
726c21feac 可演示 2026-03-13 16:04:20 +08:00
dangzerong
a525a2b4ac Merge pull request 'dev' (#9) from dev into main 2026-03-13 15:32:42 +08:00
Dang Zerong
cb90b66f09 代码测试 2026-03-13 11:26:01 +08:00
Dang Zerong
8f9e5bf4f5 代码测试 2026-03-12 16:13:18 +08:00
15 changed files with 1432 additions and 657 deletions

34
Dockerfile Normal file
View File

@@ -0,0 +1,34 @@
# 使用中科大镜像源的 Python 基础镜像
FROM python:3.11.15-slim
# 设置工作目录
WORKDIR /app
# 设置环境变量
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV FLASK_RUN_HOST=0.0.0.0
# 安装系统依赖(用于 git 等工具)
RUN apt-get update && apt-get install -y --no-install-recommends \
git \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
# 安装 Python 依赖(显式安装,避免缓存导致遗漏)
RUN pip install --no-cache-dir -r requirements.txt \
&& pip install --no-cache-dir "GitPython>=3.1.0" "gitdb>=4.0.1" "smmap>=3.0.1"
# 复制应用代码
COPY . .
# 创建报告目录
RUN mkdir -p reports
# 暴露端口
EXPOSE 5000
# 启动应用
CMD ["python", "app.py"]

285
README.md
View File

@@ -1,224 +1,109 @@
# AI Code Quality Scanner - 飞书通知版 # AI 代码质量扫描系统
一个自动化代码质量扫描系统,在代码提交时自动扫描并发送报告到飞书 自动化代码质量扫描工具,监听 PR 事件,自动扫描代码缺陷并提供合并决策支持
## 功能特性 ## 工作流程
- 🤖 自动监听 Gitea 代码提交事件
- 🔍 多维度代码质量扫描(语法、风格、安全)
- 📊 生成 Markdown 格式扫描报告
- 📱 实时推送飞书机器人通知
## 系统架构
``` ```
┌───────────── Webhook ┌──────────────────┐ ┌──────────┐ 1. 创建 PR ┌────────────┐
Gitea │ ───────────────► │ Webhook Server │ Gitea │ ───────────────► │ Webhook │
│ 代码仓库 │ (Flask) └──────────┘Server
└─────────────┘ └────────┬─────────┘ └───────────┘
│ 2. 拉取代码、扫描、存库
┌──────────────────┐ ────────────┐
│ Code Scanner │ SQLite
│ - ESLint │ Database
│ - Pylint │ └────────────┘
- SonarQube │ │ 3. 前端查询
└────────┬─────────┘
┌────────────┐
│ 前端页面
┌────────────────── ────────────
│ Report Generator│
│ - Markdown │
└────────┬─────────┘
┌──────────────────┐
│ Feishu Bot │
│ - Webhook │
└──────────────────┘
``` ```
## 三个核心功能
### 1. PR 创建
- Gitea 仓库创建 PR 时自动触发扫描
- 支持事件:`opened``reopened``synchronize`
### 2. 后端处理
- 拉取 PR 对应的代码
- 执行代码扫描Python/JavaScript/TypeScript
- AI 智能审查代码缺陷
- 扫描结果存入 SQLite 数据库
### 3. 前端功能
- 查询所有 PR 及扫描状态
- 查看每个 PR 的缺陷详情
- 一键「拒绝合并」或「同意合并」
## 快速开始 ## 快速开始
### 1. 安装依赖
```bash ```bash
# 安装依赖
pip install -r requirements.txt pip install -r requirements.txt
```
### 2. 配置飞书机器人 # 运行服务
1. 打开飞书群聊 → 设置 → 群机器人
2. 添加机器人 → 选择"自定义机器人"
3. 获取 Webhook 地址
4. 配置 `config.yaml`
### 3. 配置 Gitea Webhook
#### 方式一Push 时扫描(原有方式)
1. 进入 Gitea 仓库 → 设置 → Webhooks
2. 添加 Webhook
- 目标 URL: `http://你的服务器IP:5000/webhook/gitea`
- 触发事件: Push
- 密钥: 配置 `config.yaml` 中的 secret
#### 方式二PR 创建时扫描(推荐)
1. 进入 Gitea 仓库 → 设置 → Webhooks
2. 添加 Webhook
- 目标 URL: `http://你的服务器IP:5000/webhook/gitea`
- 触发事件: Pull Request
- 密钥: 配置 `config.yaml` 中的 secret
**支持的 PR 事件:**
- `opened` - 创建新 PR
- `reopened` - 重新打开 PR
- `synchronize` - PR 中的提交有更新
- `ready_for_review` - PR 标记为准备好审查
### 4. 运行服务
```bash
python app.py python app.py
``` ```
## 配置说明 访问 http://localhost:5000 查看前端页面。
所有配置在 `config.yaml` 中: ## Docker 部署
### 1. 构建镜像
```bash
docker buildx build --load --push -t dcr-by1jwyxk44.71826370.xyz/whlaoding/code-scan:latest .
```
### 2. 登录仓库
```bash
docker login dcr-by1jwyxk44.71826370.xyz
```
### 3. Push 到仓库
```bash
docker run -d --name code-scan -p 5000:5000 dcr-by1jwyxk44.71826370.xyz/whlaoding/code-scan:latest
```
### 4. 使用 docker compose 启动
```bash
# 启动服务
docker compose up -d
# 查看日志
docker compose logs -f
# 停止服务
docker compose down
```
## 配置
配置文件 `config.yaml`
```yaml ```yaml
server: server:
host: "0.0.0.0" host: "0.0.0.0"
port: 5000 port: 5000
debug: true
gitea: gitea:
base_url: "http://localhost:3000" base_url: "https://code.deep-pilot.chat"
# Webhook 签名密钥 webhook_secret: "xxx"
webhook_secret: "your_webhook_secret" api_token: "xxx"
feishu: ai:
# 飞书机器人 Webhook 地址 provider: "api"
webhook_url: "https://open.feishu.cn/open-apis/bot/v2/hook/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" model: "qwen3.5-plus"
# 消息推送 secret可选用于签名 api_url: "https://dashscope.aliyuncs.com/compatible-mode/v1"
secret: "your_feishu_secret" api_key: "sk-xxx"
scanner:
# 支持的语言
languages:
- python
- javascript
- typescript
# 扫描阈值
max_issues: 10
# 是否启用详细扫描
detailed: true
report:
# 报告保存路径
output_dir: "./reports"
# 是否保留报告文件
keep_files: true
``` ```
## 项目结构
```
code-scanner/
├── app.py # 主应用入口
├── config.yaml # 配置文件
├── requirements.txt # Python 依赖
├── README.md # 项目说明
├── scanner/
│ ├── __init__.py
│ ├── base.py # 扫描器基类
│ ├── python_scanner.py # Python 代码扫描
│ ├── js_scanner.py # JavaScript/TypeScript 扫描
│ └── security_scanner.py # 安全扫描
├── report/
│ ├── __init__.py
│ └── generator.py # Markdown 报告生成
├── notify/
│ ├── __init__.py
│ └── feishu.py # 飞书通知
├── webhook/
│ ├── __init__.py
│ └── handler.py # Webhook 处理
└── reports/ # 报告输出目录
```
## 支持的扫描工具
### Python
- **Pylint** - 代码风格和错误检查
- **Flake8** - Python 代码检查
- **Bandit** - 安全漏洞扫描
### JavaScript/TypeScript
- **ESLint** - JavaScript/TypeScript 检查
- **Prettier** - 代码格式化
## 飞书消息效果
扫描完成后,将收到类似以下消息:
### Push 扫描消息
```
📊 代码质量扫描报告
仓库: my-project
分支: main
提交: abc1234
提交者: developer@example.com
✅ 扫描通过 (0 issues)
⚠️ 发现问题 (5 issues)
```
### PR 扫描消息
```
📊 PR 代码质量扫描报告
仓库: my-project
源分支: feature-xxx → 目标分支: main
PR链接: https://gitea.example.com/user/project/pulls/123
提交: abc1234
提交者: developer@example.com
✅ 扫描通过 (0 issues)
⚠️ 发现问题 (5 issues)
```
## Docker 部署
```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 5000
CMD ["python", "app.py"]
```
## 环境变量
也可以通过环境变量配置:
```bash
export FEISHU_WEBHOOK_URL="https://open.feishu.cn/..."
export GITEA_WEBHOOK_SECRET="secret"
export SCANNER_MAX_ISSUES=10
```
## 许可证
MIT License

195
app.py
View File

@@ -2,7 +2,9 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import os import os
import time
import logging import logging
import traceback
from typing import Dict, Tuple, Any from typing import Dict, Tuple, Any
import json import json
@@ -130,21 +132,27 @@ def handle_gitea_webhook():
# Python 扫描 # Python 扫描
if 'python' in config.get('scanner', {}).get('languages', []): if 'python' in config.get('scanner', {}).get('languages', []):
start_time = time.time()
scan_results['python'] = python_scanner.scan( scan_results['python'] = python_scanner.scan(
clone_url, commit_id, branch clone_url, commit_id, branch
) )
logger.info(f"[TIMER] Python 扫描耗时: {time.time() - start_time:.2f}")
# JavaScript/TypeScript 扫描 # JavaScript/TypeScript 扫描
if any(lang in config.get('scanner', {}).get('languages', []) if any(lang in config.get('scanner', {}).get('languages', [])
for lang in ['javascript', 'typescript']): for lang in ['javascript', 'typescript']):
start_time = time.time()
scan_results['javascript'] = js_scanner.scan( scan_results['javascript'] = js_scanner.scan(
clone_url, commit_id, branch clone_url, commit_id, branch
) )
logger.info(f"[TIMER] JavaScript 扫描耗时: {time.time() - start_time:.2f}")
# 安全扫描 # 安全扫描
start_time = time.time()
scan_results['security'] = security_scanner.scan( scan_results['security'] = security_scanner.scan(
clone_url, commit_id, branch clone_url, commit_id, branch
) )
logger.info(f"[TIMER] 安全扫描耗时: {time.time() - start_time:.2f}")
# 生成报告 # 生成报告
report = report_generator.generate( report = report_generator.generate(
@@ -227,27 +235,35 @@ def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]:
# Python 扫描 # Python 扫描
if 'python' in config.get('scanner', {}).get('languages', []): if 'python' in config.get('scanner', {}).get('languages', []):
start_time = time.time()
scan_results['python'] = python_scanner.scan( scan_results['python'] = python_scanner.scan(
clone_url, source_sha, source_branch, changed_files clone_url, source_sha, source_branch, changed_files
) )
logger.info(f"[TIMER] Python 扫描耗时: {time.time() - start_time:.2f}")
# JavaScript/TypeScript 扫描 # JavaScript/TypeScript 扫描
if any(lang in config.get('scanner', {}).get('languages', []) if any(lang in config.get('scanner', {}).get('languages', [])
for lang in ['javascript', 'typescript']): for lang in ['javascript', 'typescript']):
start_time = time.time()
scan_results['javascript'] = js_scanner.scan( scan_results['javascript'] = js_scanner.scan(
clone_url, source_sha, source_branch, changed_files clone_url, source_sha, source_branch, changed_files
) )
logger.info(f"[TIMER] JavaScript 扫描耗时: {time.time() - start_time:.2f}")
# 安全扫描 # 安全扫描
start_time = time.time()
scan_results['security'] = security_scanner.scan( scan_results['security'] = security_scanner.scan(
clone_url, source_sha, source_branch, changed_files clone_url, source_sha, source_branch, changed_files
) )
logger.info(f"[TIMER] 安全扫描耗时: {time.time() - start_time:.2f}")
# AI 代码审查 # AI 代码审查
if config.get('ai', {}).get('enabled', False): if config.get('ai', {}).get('enabled', False):
start_time = time.time()
scan_results['ai'] = ai_reviewer.scan( scan_results['ai'] = ai_reviewer.scan(
clone_url, source_sha, source_branch, changed_files clone_url, source_sha, source_branch, changed_files
) )
logger.info(f"[TIMER] AI 扫描耗时: {time.time() - start_time:.2f}")
# 获取 PR 的代码差异,用于将问题与代码片段关联 # 获取 PR 的代码差异,用于将问题与代码片段关联
pr_diff = None pr_diff = None
@@ -298,6 +314,7 @@ def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]:
logger.info(f'PR #{pr_number} 扫描完成') logger.info(f'PR #{pr_number} 扫描完成')
except Exception as e: except Exception as e:
traceback.print_exc()
logger.error(f'扫描 PR #{pr_number} 失败: {str(e)}') logger.error(f'扫描 PR #{pr_number} 失败: {str(e)}')
return jsonify({'error': str(e)}), 500 return jsonify({'error': str(e)}), 500
@@ -324,13 +341,19 @@ def manual_scan():
scan_results = {} scan_results = {}
if 'python' in config.get('scanner', {}).get('languages', []): if 'python' in config.get('scanner', {}).get('languages', []):
start_time = time.time()
scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch) scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch)
logger.info(f"[TIMER] Python 扫描耗时: {time.time() - start_time:.2f}")
if any(lang in config.get('scanner', {}).get('languages', []) if any(lang in config.get('scanner', {}).get('languages', [])
for lang in ['javascript', 'typescript']): for lang in ['javascript', 'typescript']):
start_time = time.time()
scan_results['javascript'] = js_scanner.scan(repo_url, commit_id, branch) scan_results['javascript'] = js_scanner.scan(repo_url, commit_id, branch)
logger.info(f"[TIMER] JavaScript 扫描耗时: {time.time() - start_time:.2f}")
start_time = time.time()
scan_results['security'] = security_scanner.scan(repo_url, commit_id, branch) scan_results['security'] = security_scanner.scan(repo_url, commit_id, branch)
logger.info(f"[TIMER] 安全扫描耗时: {time.time() - start_time:.2f}")
# 生成报告 # 生成报告
report = report_generator.generate( report = report_generator.generate(
@@ -667,6 +690,178 @@ def api_get_pr_file_content(pr_id):
return jsonify({'error': str(e)}), 500 return jsonify({'error': str(e)}), 500
@app.route('/api/prs/<int:pr_id>/quality')
def api_get_quality_score(pr_id):
"""获取 PR 的代码质量评分"""
try:
pr = PRScanDB.get_pr_by_id(pr_id)
if not pr:
return jsonify({'error': 'PR not found'}), 404
# 从 scan_result 中获取质量评分
scan_result = pr.get('scan_result')
if isinstance(scan_result, str):
try:
scan_result = json.loads(scan_result)
except:
scan_result = None
quality_score = None
if scan_result and scan_result.get('ai'):
quality_score = scan_result['ai'].get('quality_score')
if not quality_score:
return jsonify({'error': '暂无质量评分'}), 404
return jsonify(quality_score)
except Exception as e:
logger.error(f'获取质量评分失败: {str(e)}')
return jsonify({'error': str(e)}), 500
@app.route('/api/prs/<int:pr_id>/stats')
def api_get_issue_stats(pr_id):
"""获取 PR 的问题统计"""
try:
pr = PRScanDB.get_pr_by_id(pr_id)
if not pr:
return jsonify({'error': 'PR not found'}), 404
# 获取 scan_details_with_code
scan_details = pr.get('scan_details_with_code')
if isinstance(scan_details, str):
try:
scan_details = json.loads(scan_details)
except:
scan_details = None
if not scan_details:
return jsonify({'error': '暂无扫描详情'}), 404
# 统计各扫描器的问题
stats = {
'by_severity': {'error': 0, 'warning': 0, 'info': 0},
'by_scanner': {},
'total': 0
}
# 统计静态扫描器
for scanner in scan_details.get('scanners', []):
scanner_name = scanner.get('name', 'unknown')
scanner_issues = scanner.get('issues', [])
stats['by_scanner'][scanner_name] = len(scanner_issues)
for issue in scanner_issues:
sev = (issue.get('severity') or 'info').lower()
if sev in stats['by_severity']:
stats['by_severity'][sev] += 1
stats['total'] += 1
# 统计 AI 扫描器
ai_data = scan_details.get('ai', {})
if ai_data:
ai_issues = ai_data.get('issues', [])
stats['by_scanner']['AI'] = len(ai_issues)
for issue in ai_issues:
sev = (issue.get('severity') or 'info').lower()
if sev in stats['by_severity']:
stats['by_severity'][sev] += 1
stats['total'] += 1
return jsonify(stats)
except Exception as e:
logger.error(f'获取问题统计失败: {str(e)}')
return jsonify({'error': str(e)}), 500
@app.route('/api/prs/<int:pr_id>/fix', methods=['POST'])
def api_generate_fix(pr_id):
"""生成问题修复建议"""
try:
data = request.get_json()
if not data:
return jsonify({'error': '请求体为空'}), 400
file_path = data.get('file')
line = data.get('line', 1)
message = data.get('message', '')
code = data.get('code', '')
if not file_path or not message:
return jsonify({'error': '缺少必要参数'}), 400
# 调用 AI 生成修复建议
fix_result = ai_reviewer.generate_fix_suggestion(file_path, line, message, code)
if fix_result:
return jsonify(fix_result)
else:
return jsonify({'error': '生成修复建议失败'}), 500
except Exception as e:
logger.error(f'生成修复建议失败: {str(e)}')
return jsonify({'error': str(e)}), 500
@app.route('/api/prs/history')
def api_get_pr_history():
"""获取 PR 扫描历史趋势"""
try:
limit = request.args.get('limit', 20, type=int)
repo_name = request.args.get('repo_name', '')
# 获取 PR 列表
prs = PRScanDB.get_all_prs(status='completed')
if repo_name:
prs = [p for p in prs if p.get('repo_name') == repo_name]
# 只取最近的 N 个
prs = prs[:limit]
# 构建趋势数据
history = []
for pr in reversed(prs): # 从旧到新
issues_count = pr.get('issues_count', 0)
# 从 scan_result 中各扫描器汇总 error/warning 数量
scan_result = pr.get('scan_result')
if isinstance(scan_result, str):
try:
scan_result = json.loads(scan_result)
except:
scan_result = None
error_count = 0
warning_count = 0
if scan_result and isinstance(scan_result, dict):
# 遍历各扫描器,汇总 error 和 warning
for scanner_name, scanner_result in scan_result.items():
if isinstance(scanner_result, dict):
summary = scanner_result.get('summary', {})
if isinstance(summary, dict):
error_count += summary.get('error', 0)
warning_count += summary.get('warning', 0)
history.append({
'pr_id': pr.get('id'),
'pr_number': pr.get('pr_number'),
'repo_name': pr.get('repo_name'),
'title': pr.get('pr_title', ''),
'author': pr.get('author', ''),
'created_at': pr.get('created_at', ''),
'issues_count': issues_count,
'error_count': error_count,
'warning_count': warning_count,
'total_issues': error_count + warning_count,
'state': pr.get('state', '')
})
return jsonify(history)
except Exception as e:
logger.error(f'获取历史趋势失败: {str(e)}')
return jsonify({'error': str(e)}), 500
@app.route('/api/prs/<int:pr_id>/merge', methods=['POST']) @app.route('/api/prs/<int:pr_id>/merge', methods=['POST'])
def api_merge_pr(pr_id): def api_merge_pr(pr_id):
"""合并 PR""" """合并 PR"""

View File

@@ -49,14 +49,13 @@ ai:
# AI 审查器配置 # AI 审查器配置
# 支持: "ollama" (本地) 或 "api" (在线API) # 支持: "ollama" (本地) 或 "api" (在线API)
provider: "api" provider: "api"
# 模型名称(硅基流动可用模型)- Qwen 最强语言模型 # 模型名称(阿里云通义千问)
model: "deepseek-ai/DeepSeek-V3.2" model: "qwen3-max"
# API 地址 # API 地址
# 硅基流动: https://api.siliconflow.cn/v1 api_url: "https://dashscope.aliyuncs.com/compatible-mode/v1"
api_url: "https://api.siliconflow.cn/v1"
# API 密钥 # API 密钥
api_key: "sk-cqxhnsxdxaalxlykfkjksyinjftdyejnblmgkfxmhwmmvdyu" api_key: "sk-616332b2afa94699b4572d0fe6ac370a"
# 是否启用 AI 审查 # 是否启用 AI 审查
enabled: true enabled: true
# 每次审查的最大代码行数 # 每次审查的最大代码行数
max_lines: 200 max_lines: 100

32
db.py
View File

@@ -7,12 +7,17 @@
import sqlite3 import sqlite3
import json import json
import os import os
from datetime import datetime from datetime import datetime, timezone, timedelta
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data', 'pr_scans.db') DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data', 'pr_scans.db')
def get_cst_now():
"""获取当前中国时区时间 (UTC+8)"""
return datetime.now(timezone(timedelta(hours=8))).strftime('%Y-%m-%d %H:%M:%S')
def get_db_connection(): def get_db_connection():
"""获取数据库连接""" """获取数据库连接"""
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
@@ -110,6 +115,7 @@ class PRScanDB:
if existing: if existing:
# 更新现有记录 # 更新现有记录
cst_time = get_cst_now()
cursor.execute(''' cursor.execute('''
UPDATE pr_scans SET UPDATE pr_scans SET
pr_title = ?, pr_title = ?,
@@ -123,7 +129,7 @@ class PRScanDB:
security_issues = ?, security_issues = ?,
ai_review = ?, ai_review = ?,
report_path = ?, report_path = ?,
updated_at = CURRENT_TIMESTAMP updated_at = ?
WHERE repo_name = ? AND pr_number = ? WHERE repo_name = ? AND pr_number = ?
''', ( ''', (
pr_info.get('pr_title'), pr_info.get('pr_title'),
@@ -137,19 +143,22 @@ class PRScanDB:
security_issues, security_issues,
json.dumps(scan_results.get('ai', {}), ensure_ascii=False), json.dumps(scan_results.get('ai', {}), ensure_ascii=False),
report_path, report_path,
cst_time,
pr_info.get('repo_name'), pr_info.get('repo_name'),
pr_info.get('pr_number') pr_info.get('pr_number')
)) ))
scan_id = existing['id'] scan_id = existing['id']
else: else:
# 插入新记录 # 插入新记录
cst_time = get_cst_now()
cursor.execute(''' cursor.execute('''
INSERT INTO pr_scans ( INSERT INTO pr_scans (
pr_number, repo_name, pr_title, pr_url, pr_number, repo_name, pr_title, pr_url,
source_branch, target_branch, author, source_branch, target_branch, author,
state, scan_status, scan_result, scan_details_with_code, state, scan_status, scan_result, scan_details_with_code,
issues_count, security_issues, ai_review, report_path issues_count, security_issues, ai_review, report_path,
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', ( ''', (
pr_info.get('pr_number'), pr_info.get('pr_number'),
pr_info.get('repo_name'), pr_info.get('repo_name'),
@@ -165,7 +174,9 @@ class PRScanDB:
issues_count, issues_count,
security_issues, security_issues,
json.dumps(scan_results.get('ai', {}), ensure_ascii=False), json.dumps(scan_results.get('ai', {}), ensure_ascii=False),
report_path report_path,
cst_time,
cst_time
)) ))
scan_id = cursor.lastrowid scan_id = cursor.lastrowid
@@ -239,23 +250,24 @@ class PRScanDB:
"""更新 PR 状态""" """更新 PR 状态"""
conn = get_db_connection() conn = get_db_connection()
cursor = conn.cursor() cursor = conn.cursor()
cst_time = get_cst_now()
if state == 'merged': if state == 'merged':
cursor.execute(''' cursor.execute('''
UPDATE pr_scans SET UPDATE pr_scans SET
state = ?, state = ?,
merged_at = CURRENT_TIMESTAMP, merged_at = ?,
merged_by = ?, merged_by = ?,
updated_at = CURRENT_TIMESTAMP updated_at = ?
WHERE id = ? WHERE id = ?
''', (state, merged_by, scan_id)) ''', (state, cst_time, merged_by, cst_time, scan_id))
else: else:
cursor.execute(''' cursor.execute('''
UPDATE pr_scans SET UPDATE pr_scans SET
state = ?, state = ?,
updated_at = CURRENT_TIMESTAMP updated_at = ?
WHERE id = ? WHERE id = ?
''', (state, scan_id)) ''', (state, cst_time, scan_id))
conn.commit() conn.commit()
conn.close() conn.close()

9
docker-compose.yml Normal file
View File

@@ -0,0 +1,9 @@
version: "3.8"
services:
code-scan:
image: dcr-by1jwyxk44.71826370.xyz/whlaoding/code-scan:latest
container_name: code-scan
ports:
- "5000:5000"
restart: unless-stopped

View File

@@ -109,7 +109,7 @@ class GiteaClient:
timeout=30 timeout=30
) )
if response.status_code == 200: if response.status_code in (200, 201):
logger.info(f"成功关闭 PR #{pr_number}") logger.info(f"成功关闭 PR #{pr_number}")
return True return True
else: else:

24
pyproject.toml Normal file
View File

@@ -0,0 +1,24 @@
[build-system]
requires = ["setuptools>=45", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "code-scan"
version = "1.0.0"
description = "代码扫描工具"
readme = "README.md"
requires-python = ">=3.8"
dependencies = [
"flask>=2.0.0",
"pyyaml>=5.0",
"requests>=2.25.0",
"python-dotenv>=0.19.0",
"GitPython>=3.1.0",
]
[project.scripts]
code-scan = "app:main"
[tool.setuptools.packages.find]
where = ["."]
include = ["scanner*"]

View File

@@ -218,55 +218,66 @@ class ReportGenerator:
lines.append(f' - {message}') lines.append(f' - {message}')
lines.append('') lines.append('')
# AI 审查结果(单独展示 # AI 审查结果(适配新格式issues 列表
if 'ai' in scan_results: if 'ai' in scan_results:
ai_result = scan_results['ai'] ai_result = scan_results['ai']
lines.append('') lines.append('')
lines.append('## 🤖 AI 代码审查') lines.append('## 🤖 AI 代码审查')
lines.append('') lines.append('')
lines.append(ai_result.get('summary', '无 AI 审查结果'))
# 新格式:直接使用 summary
if 'summary' in ai_result:
# summary 可能是字符串或 dict
summary = ai_result.get('summary', '')
if isinstance(summary, dict):
lines.append(f"发现 {summary.get('total', 0)} 个问题,"
f"错误 {summary.get('error', 0)}"
f"警告 {summary.get('warning', 0)}"
f"提示 {summary.get('info', 0)}")
else:
lines.append(str(summary))
lines.append('') lines.append('')
reviews = ai_result.get('reviews', []) # 新格式issues 列表
if reviews: ai_issues = ai_result.get('issues', [])
for i, review in enumerate(reviews, 1): if ai_issues:
file_name = review.get('file', 'unknown') # 按文件分组
review_content = review.get('review', {}) issues_by_file = {}
for issue in ai_issues:
file_name = issue.get('file', 'unknown')
if file_name not in issues_by_file:
issues_by_file[file_name] = []
issues_by_file[file_name].append(issue)
for file_name, issues in issues_by_file.items():
lines.append(f'### 📄 {file_name}') lines.append(f'### 📄 {file_name}')
lines.append('') lines.append('')
# 优点 for i, issue in enumerate(issues[:10], 1):
advantages = review_content.get('优点', []) severity = issue.get('severity', 'Info')
if advantages: severity_emoji = {
lines.append('**✅ 代码优点:**') 'ERROR': '🔴',
for adv in advantages[:3]: 'WARNING': '🟡',
lines.append(f'- {adv}') 'INFO': ''
lines.append('') }.get(severity.upper(), '')
# 问题 line_num = issue.get('line', 0)
issues = review_content.get('问题', []) symbol = issue.get('symbol', '')
if issues: message = issue.get('message', 'No message')
lines.append('**⚠️ 需要改进:**') code_context = issue.get('code_context', '')
for issue in issues[:3]: defect_reason = issue.get('defect_reason', '')
lines.append(f'- {issue}')
lines.append('')
# 优化建议 lines.append(f'{i}. {severity_emoji} **{severity}** - 行 {line_num}')
optimizations = review_content.get('优化', []) if symbol:
if optimizations: lines.append(f' - 标识: `{symbol}`')
lines.append('**💡 优化建议:**') lines.append(f' - 问题: {message}')
for opt in optimizations[:3]: if code_context:
lines.append(f'- {opt}') lines.append(' - 代码:')
lines.append('') lines.append('```')
lines.append(code_context)
# 原始回复(如果不是 JSON 格式) lines.append('```')
raw = review_content.get('raw_review') if defect_reason:
if raw: lines.append(f' - 原因: {defect_reason}')
lines.append('**📝 AI 原始回复:**')
lines.append('```')
lines.append(raw[:500] + '...' if len(raw) > 500 else raw)
lines.append('```')
lines.append('') lines.append('')
# 添加报告链接或下一步操作 # 添加报告链接或下一步操作

View File

@@ -2,3 +2,9 @@ flask>=2.0.0
pyyaml>=5.0 pyyaml>=5.0
requests>=2.25.0 requests>=2.25.0
python-dotenv>=0.19.0 python-dotenv>=0.19.0
GitPython>=3.1.0
gitdb>=4.0.1
smmap>=3.0.1
pylint>=2.17.0
flake8>=6.0.0
bandit>=1.7.0

View File

@@ -5,6 +5,7 @@ AI 代码审查器
使用大模型进行智能代码审查 使用大模型进行智能代码审查
""" """
import os import os
import re
import json import json
import logging import logging
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
@@ -29,7 +30,7 @@ class AIReviewer(BaseScanner):
self.config = config self.config = config
self.enabled = config.get('enabled', True) self.enabled = config.get('enabled', True)
self.provider = config.get('provider', 'ollama') self.provider = config.get('provider', 'api')
self.model = config.get('model', 'llama3') self.model = config.get('model', 'llama3')
self.api_url = config.get('api_url', 'http://localhost:11434') self.api_url = config.get('api_url', 'http://localhost:11434')
self.api_key = config.get('api_key', '') self.api_key = config.get('api_key', '')
@@ -73,15 +74,26 @@ class AIReviewer(BaseScanner):
changed_files: 可选的变更文件列表(来自 PR changed_files: 可选的变更文件列表(来自 PR
Returns: Returns:
审查结果 审查结果(与 python_scanner.py 兼容的格式)
""" """
result = {
'tool': 'AI Code Reviewer',
'language': language,
'status': 'success',
'issues': [],
'summary': {
'total': 0,
'error': 0,
'warning': 0,
'info': 0
},
'files_scanned': 0
}
if not self.enabled: if not self.enabled:
return { result['status'] = 'disabled'
'enabled': False, result['summary'] = 'AI 审查已禁用'
'tool': 'AI Code Reviewer', return result
'reviews': [],
'summary': 'AI 审查已禁用'
}
try: try:
# 如果没有传入 clone_dir需要克隆 # 如果没有传入 clone_dir需要克隆
@@ -89,52 +101,141 @@ class AIReviewer(BaseScanner):
clone_dir = self.clone_repo(repo_url, commit_id, branch) clone_dir = self.clone_repo(repo_url, commit_id, branch)
if not clone_dir or not os.path.exists(clone_dir): if not clone_dir or not os.path.exists(clone_dir):
return { result['status'] = 'error'
'enabled': True, result['error'] = '无法获取代码目录'
'tool': 'AI Code Reviewer', return result
'reviews': [],
'summary': '无法获取代码目录'
}
# 获取要审查的代码文件 # 获取要审查的代码文件
files = self._get_code_files(clone_dir, language, changed_files) files = self._get_code_files(clone_dir, language, changed_files)
if not files: if not files:
return { result['summary'] = '未找到可审查的代码文件'
'enabled': True, return result
'tool': 'AI Code Reviewer',
'reviews': [],
'summary': '未找到可审查的代码文件'
}
# 对每个文件进行 AI 审查 # 对每个文件进行 AI 审查
all_reviews = [] all_issues = []
for file_path in files[:5]: # 限制最多审查 5 个文件 for file_path in files[:5]: # 限制最多审查 5 个文件
review = self._review_file(file_path, language, clone_dir) review = self._review_file(file_path, language, clone_dir)
if review: if review and review.get('issues'):
all_reviews.append(review) all_issues.extend(review['issues'])
# 生成总结 result['issues'] = all_issues[:self.max_issues] if self.detailed else all_issues
summary = self._generate_summary(all_reviews) result['summary'] = self._calculate_summary(all_issues)
result['files_scanned'] = len(files[:5])
result['clone_dir'] = clone_dir
return { # 生成质量评分
'enabled': True, result['quality_score'] = self._calculate_quality_score(all_issues, files[:5])
'tool': 'AI Code Reviewer',
'reviews': all_reviews, return result
'summary': summary,
'files_reviewed': len(all_reviews),
'clone_dir': clone_dir # 返回 clone_dir 用于后续清理
}
except Exception as e: except Exception as e:
logger.error(f'AI 审查失败: {str(e)}') logger.error(f'AI 审查失败: {str(e)}')
return { result['status'] = 'error'
'enabled': True, result['error'] = str(e)
'tool': 'AI Code Reviewer', return result
'error': str(e),
'reviews': [], def _calculate_summary(self, issues: List[Dict]) -> Dict[str, int]:
'summary': f'AI 审查出错: {str(e)}' """计算问题摘要"""
summary = {
'total': len(issues),
'error': 0,
'warning': 0,
'info': 0
}
for issue in issues:
severity = issue.get('severity', '').lower()
if severity in ['error', 'critical', 'fatal']:
summary['error'] += 1
elif severity in ['warning', 'moderate']:
summary['warning'] += 1
else:
summary['info'] += 1
return summary
def _calculate_quality_score(self, issues: List[Dict], files: List[str]) -> Dict[str, Any]:
"""
计算代码质量评分
返回:总分(0-100)及各维度评分
"""
if not files:
return {'total': 100, 'maintainability': 100, 'security': 100, 'readability': 100, 'best_practices': 100}
# 统计问题
error_count = sum(1 for i in issues if i.get('severity', '').lower() in ['error', 'critical'])
warning_count = sum(1 for i in issues if i.get('severity', '').lower() == 'warning')
info_count = sum(1 for i in issues if i.get('severity', '').lower() == 'info')
# 分类统计
security_keywords = ['sql injection', 'xss', 'csrf', 'password', 'secret', 'token', '权限', '注入', '认证']
security_issues = sum(1 for i in issues if any(k in (i.get('message', '') + i.get('symbol', '')).lower() for k in security_keywords))
# 计算各维度分数
# 可维护性:基于错误和警告数量
issue_weight = error_count * 5 + warning_count * 2 + info_count * 0.5
maintainability = max(0, 100 - issue_weight)
# 安全性:基于安全问题
security_score = max(0, 100 - security_issues * 15)
# 可读性:基于 info 级别问题(风格类)
readability = max(0, 100 - info_count * 3)
# 最佳实践:基于 warning 级别
best_practices = max(0, 100 - warning_count * 5)
# 总分:加权平均
total = int((maintainability * 0.3 + security_score * 0.35 + readability * 0.15 + best_practices * 0.2))
return {
'total': total,
'maintainability': maintainability,
'security': security_score,
'readability': readability,
'best_practices': best_practices,
'details': {
'error_count': error_count,
'warning_count': warning_count,
'info_count': info_count,
'security_issues': security_issues
} }
}
def generate_fix_suggestion(self, file_path: str, line: int, message: str, code: str) -> Optional[str]:
"""
对指定问题生成修复建议代码
"""
prompt = f"""你是一位代码修复专家。请根据以下问题,生成修复后的代码。
问题描述:{message}
问题所在行号:{line}
原始代码:
```
{code}
```
请以 JSON 格式输出修复建议:
```json
{{
"fixed_code": "修复后的完整代码或关键片段",
"explanation": "修复说明50字以内",
"confidence": "high/medium/low 修复把握度"
}}
```
如果无法修复,请返回:{{"fixed_code": "", "explanation": "无法自动修复", "confidence": "low"}}"""
try:
response = self._call_ai(prompt)
if response and response.get('fixed_code'):
return response
except Exception as e:
logger.warning(f'生成修复建议失败: {e}')
return None
def _get_code_files(self, clone_dir: str, language: str, changed_files: Optional[List[str]] = None) -> List[str]: def _get_code_files(self, clone_dir: str, language: str, changed_files: Optional[List[str]] = None) -> List[str]:
"""获取代码文件列表""" """获取代码文件列表"""
@@ -174,6 +275,8 @@ class AIReviewer(BaseScanner):
def _review_file(self, file_path: str, language: str, clone_dir: str = None) -> Optional[Dict[str, Any]]: def _review_file(self, file_path: str, language: str, clone_dir: str = None) -> Optional[Dict[str, Any]]:
"""审查单个文件""" """审查单个文件"""
issues = []
try: try:
with open(file_path, 'r', encoding='utf-8') as f: with open(file_path, 'r', encoding='utf-8') as f:
code = f.read() code = f.read()
@@ -186,22 +289,46 @@ class AIReviewer(BaseScanner):
else: else:
truncated = False truncated = False
# 构建 prompt # 给代码加行号再发给模型,便于模型返回准确行号
prompt = self._build_prompt(code, language) code_with_lines = self._code_with_line_numbers(code)
prompt = self._build_prompt(code_with_lines, language)
# 调用 AI # 调用 AI
response = self._call_ai(prompt) response = self._call_ai(prompt)
if not response: # 获取相对路径
return None
# 解析响应
rel_path = os.path.relpath(file_path, clone_dir) if (clone_dir and file_path) else file_path rel_path = os.path.relpath(file_path, clone_dir) if (clone_dir and file_path) else file_path
if not response:
return {
'file': rel_path,
'path': file_path,
'truncated': truncated,
'issues': []
}
# 解析 AI 响应,转换为标准 issues 格式,并校正行号
ai_issues = response.get('issues', [])
for issue in ai_issues:
self._correct_issue_line(issue, code)
issues.append({
'tool': 'ai_reviewer',
'type': issue.get('type', 'info'),
'severity': issue.get('severity', 'Info'),
'message': issue.get('message', ''),
'file': rel_path,
'line': issue.get('line', 0),
'column': issue.get('column', 0),
'symbol': issue.get('symbol', ''),
'code_context': issue.get('code_context', ''),
'defect_reason': issue.get('defect_reason', '')
})
return { return {
'file': rel_path, 'file': rel_path,
'path': file_path, 'path': file_path,
'truncated': truncated, 'truncated': truncated,
'review': response 'issues': issues
} }
except Exception as e: except Exception as e:
@@ -217,76 +344,171 @@ class AIReviewer(BaseScanner):
else: else:
lang_name = language lang_name = language
prompt = f"""你是一位资深的 {lang_name} 代码审查专家。请审查以下代码,并给出: prompt = f"""你是一位资深的 {lang_name} 代码审查专家。请审查以下代码,找出潜在的问题和缺陷。
1. **代码优点** - 写得好地方 请以 JSON 格式输出审查结果,必须包含以下字段:
2. **问题建议** - 需要改进的地方
3. **优化建议** - 如何让代码更好
请用中文回复,保持简洁,每个文件审查不超过 3 点建议。
以下是代码:
```{language}
{code}
```
请以 JSON 格式输出:
```json ```json
{{ {{
"优点": ["..."], "issues": [
"问题": ["..."], {{
"优化": ["..."] "line": 行号,
"column": 列号,
"message": "问题描述",
"type": "error/warning/info 之一",
"severity": "Error/Warning/Info 之一",
"symbol": "错误标识符如 unused-variable, syntax-error 等",
"code_context": "问题代码的上下文(包含问题的那行或几行代码)",
"defect_reason": "缺陷原因分析30字以内简洁描述"
}}
]
}} }}
```
注意:
1. line 和 column 是问题所在的行号和列号(从 1 开始)
2. type: error=错误, warning=警告, info=信息
3. severity: Error=严重, Warning=一般, Info=提示
4. code_context: 包含问题代码的那一行或相邻的几行
5. defect_reason: 精简描述30字以内说明问题原因和风险
如果代码没有问题,返回空数组: {{"issues": []}}
重要:以下代码每行前已标注行号(格式为 "行号|"),请根据问题实际出现的代码行,严格使用该行前的行号填写 issues 中的 line 字段,不要猜测或使用错误行号。
以下是待审查的代码(行号已标注):
```{language}
{code}
```""" ```"""
return prompt return prompt
def _code_with_line_numbers(self, code: str) -> str:
"""给代码每行前加上行号,便于模型返回准确行号"""
lines = code.split('\n')
width = len(str(len(lines)))
return '\n'.join(f'{i:>{width}}| {line}' for i, line in enumerate(lines, 1))
def _correct_issue_line(self, issue: Dict[str, Any], code: str) -> None:
"""
根据 message/symbol 在源码中搜索,尽量把 issue 的 line 校正到真实出现位置。
AI 返回的行号常不准确,通过匹配问题相关的标识符(如 'unused_module')修正行号。
"""
line = issue.get('line')
if not line or not code:
return
lines = code.split('\n')
if line < 1 or line > len(lines):
return
# 从 message 中提取被引用的标识符(如 'unused_module' -> unused_module
message = (issue.get('message') or '')
symbol = (issue.get('symbol') or '').strip()
candidates = []
if symbol:
candidates.append(symbol)
for m in re.finditer(r"['\"]([a-zA-Z_][a-zA-Z0-9_]*)['\"]", message or ''):
candidates.append(m.group(1))
# 若 message 里没有引号标识符,取首段英文/数字/下划线作为关键词
if not candidates:
first_word = re.search(r'\b([a-zA-Z_][a-zA-Z0-9_]*)\b', message)
if first_word:
candidates.append(first_word.group(1))
for token in candidates:
if not token:
continue
for i, code_line in enumerate(lines):
if token in code_line:
issue['line'] = i + 1
return
def _call_ai(self, prompt: str) -> Optional[Dict[str, Any]]: def _call_ai(self, prompt: str) -> Optional[Dict[str, Any]]:
"""调用 AI 服务""" """调用 AI 服务"""
try: try:
if self.provider == 'ollama': return self._call_api(prompt)
return self._call_ollama(prompt)
elif self.provider == 'api':
return self._call_api(prompt)
else:
logger.warning(f'未知的 AI provider: {self.provider}')
return None
except Exception as e: except Exception as e:
print("异常追踪信息:", e.__traceback__) print("异常追踪信息:", e.__traceback__)
logger.error(f'AI 调用失败: {str(e)}') logger.error(f'AI 调用失败: {str(e)}')
return None return None
def _call_ollama(self, prompt: str) -> Optional[Dict[str, Any]]: def _extract_json_obj(self, content: Any) -> Optional[Dict[str, Any]]:
"""调用 Ollama 本地模型""" """
import requests 从模型输出中尽可能提取 JSON 对象(dict)。
url = f"{self.api_url}/api/generate" 兼容场景:
payload = { - content 已经是 dict
"model": self.model, - content 是 JSON 字符串
"prompt": prompt, - content 被 ```json ... ``` 或 ``` ... ``` 包裹
"stream": False, - content 前后夹杂说明文字,只要包含一个最外层 { ... } 就尝试解析
"format": "json" """
} if content is None:
logger.debug("_extract_json_obj: content is None")
return None
response = requests.post(url, json=payload, timeout=120) # 如果已经是 dict直接返回
if isinstance(content, dict):
logger.debug("_extract_json_obj: content is already dict")
return content
if response.status_code == 200: if not isinstance(content, str):
result = response.json() content = str(content)
content = result.get('response', '')
# 尝试解析 JSON text = content.strip()
logger.debug(f"_extract_json_obj: 原始内容长度 = {len(text)}")
logger.debug(f"_extract_json_obj: 原始内容前100字符: {text[:100]}")
# 去掉代码块包裹(兼容 ```json / ``` json / ```JSON 等)
lowered = text.lower()
fence_start = lowered.find('```')
if fence_start != -1:
logger.debug(f"_extract_json_obj: 发现代码块 fence_start={fence_start}")
# 找到第一段 fence
after = text[fence_start + 3:]
after_l = after.lower()
# 如果 fence 后紧跟语言标识json 或其他),跳过这一行直到换行
newline_idx = after.find('\n')
if newline_idx != -1:
lang_header = after_l[:newline_idx].strip()
logger.debug(f"_extract_json_obj: 语言标识: {lang_header}")
body = after[newline_idx + 1:]
# 截取到下一个 fence 结束
end_idx = body.lower().find('```')
if end_idx != -1:
candidate = body[:end_idx].strip()
else:
# 没有结束 fence直接用 body 作为候选(可能是截断的 JSON
candidate = body.strip()
# 只有在确实像 json 的情况下才替换,避免误伤普通文本
if '{' in candidate and '}' in candidate:
text = candidate
logger.debug(f"_extract_json_obj: 提取代码块内容成功,长度={len(text)}")
else:
# 没有换行就按旧逻辑尽量截取
pass
# 直接解析
try:
obj = json.loads(text)
logger.debug("_extract_json_obj: 直接解析成功")
return obj if isinstance(obj, dict) else None
except Exception as e:
logger.debug(f"_extract_json_obj: 直接解析失败: {e}")
# 兜底:截取最外层 { ... } 再解析
start = text.find('{')
end = text.rfind('}')
logger.debug(f"_extract_json_obj: 查找大括号 start={start}, end={end}")
if start != -1 and end != -1 and end > start:
candidate = text[start:end + 1].strip()
logger.debug(f"_extract_json_obj: 候选内容长度={len(candidate)}, 前50字符: {candidate[:50]}")
try: try:
# 提取 JSON 部分 obj = json.loads(candidate)
if '```json' in content: logger.debug("_extract_json_obj: 兜底解析成功")
content = content.split('```json')[1].split('```')[0] return obj if isinstance(obj, dict) else None
elif '```' in content: except Exception as e:
content = content.split('```')[1].split('```')[0] logger.debug(f"_extract_json_obj: 兜底解析失败: {e}")
return None
return json.loads(content.strip()) logger.debug("_extract_json_obj: 未能提取到有效的 JSON 对象")
except json.JSONDecodeError:
# 如果不是 JSON直接返回文本
return {'raw_review': content}
logger.warning(f'Ollama 返回错误: {response.status_code}')
return None return None
def _call_api(self, prompt: str) -> Optional[Dict[str, Any]]: def _call_api(self, prompt: str) -> Optional[Dict[str, Any]]:
@@ -317,6 +539,16 @@ class AIReviewer(BaseScanner):
"max_tokens": 1024, "max_tokens": 1024,
"temperature": 0.7 "temperature": 0.7
} }
elif 'dashscope' in self.api_url:
# 阿里云 dashscope 专用端点
url = f"{self.api_url}/chat/completions"
payload = {
"model": self.model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 1024,
"temperature": 0.7,
"stream": False # 显式关闭流式
}
else: else:
url = f"{self.api_url}/chat/completions" url = f"{self.api_url}/chat/completions"
payload = { payload = {
@@ -326,39 +558,21 @@ class AIReviewer(BaseScanner):
"temperature": 0.7 "temperature": 0.7
} }
response = requests.post(url, json=payload, headers=headers, timeout=120) logger.info(f"调用 API: {url}, model={self.model}")
if response.status_code == 200: try:
result = response.json() response = requests.post(url, json=payload, headers=headers, timeout=120)
content = result['choices'][0]['message']['content']
try: if response.status_code == 200:
if '```json' in content: result = response.json()
content = content.split('```json')[1].split('```')[0] content = result['choices'][0]['message']['content']
elif '```' in content: logger.info(f"API 返回内容长度: {len(content) if content else 0}")
content = content.split('```')[1].split('```')[0] parsed = self._extract_json_obj(content)
return parsed
return json.loads(content.strip()) logger.warning(f'API 返回错误: {response.status_code}, {response.text[:200]}')
except json.JSONDecodeError: return None
return {'raw_review': content}
logger.warning(f'API 返回错误: {response.status_code}') except Exception as e:
return None logger.warning(f'API 调用失败: {e}')
return None
def _generate_summary(self, reviews: List[Dict[str, Any]]) -> str:
"""生成审查总结"""
if not reviews:
return '未找到需要审查的代码'
total_issues = sum(
len(r.get('review', {}).get('问题', [])) +
len(r.get('review', {}).get('优化', []))
for r in reviews
)
files_count = len(reviews)
if total_issues == 0:
return f'✅ AI 审查通过!审查了 {files_count} 个文件,未发现问题'
return f'🤖 AI 审查了 {files_count} 个文件,发现 {total_issues} 个改进建议'

View File

@@ -135,18 +135,15 @@ def merge_issues_with_code(scan_results: Dict[str, Any], diff: str) -> Dict[str,
def convert_ai_reviews_to_issues(ai_result: Dict[str, Any], parser: Optional[DiffParser] = None) -> List[Dict[str, Any]]: def convert_ai_reviews_to_issues(ai_result: Dict[str, Any], parser: Optional[DiffParser] = None) -> List[Dict[str, Any]]:
"""将 AI 审查结果转换为问题格式""" """将 AI 审查结果issues 格式)转换为统一问题格式"""
issues = [] issues = []
ai_issues = ai_result.get('issues', [])
reviews = ai_result.get('reviews', []) for issue in ai_issues:
for review in reviews: file_path = issue.get('file', '')
file_path = review.get('file', '') if not file_path:
review_data = review.get('review', {})
if not review_data:
continue continue
# 获取文件内容作为代码上下文
code_context = None code_context = None
if parser: if parser:
matched_path = None matched_path = None
@@ -154,51 +151,28 @@ def convert_ai_reviews_to_issues(ai_result: Dict[str, Any], parser: Optional[Dif
if file_path.endswith(path) or path.endswith(file_path) or file_path in path: if file_path.endswith(path) or path.endswith(file_path) or file_path in path:
matched_path = path matched_path = path
break break
if matched_path: if matched_path:
chunk = parser.get_file_content(matched_path) chunk = parser.get_file_content(matched_path)
if chunk and chunk.new_content: if chunk and chunk.new_content:
lines = chunk.new_content.split('\n')[:10] lines = chunk.new_content.split('\n')[:10]
code_context = { code_context = {
'file': matched_path, 'file': matched_path,
'line': 1, 'line': issue.get('line', 1),
'preview': '\n'.join(lines), 'preview': '\n'.join(lines),
'has_more': len(chunk.new_content.split('\n')) > 10 'has_more': len(chunk.new_content.split('\n')) > 10
} }
# 处理优点(不作为问题显示) sev = issue.get('severity', 'warning')
advantages = review_data.get('优点', []) sev = sev.lower() if isinstance(sev, str) else 'warning'
# 处理问题 issues.append({
problems = review_data.get('问题', []) 'file': file_path,
for idx, problem in enumerate(problems): 'line': issue.get('line', 1),
issues.append({ 'severity': sev,
'file': file_path, 'message': issue.get('message', ''),
'line': 1, # AI 审查不返回具体行号 'category': 'ai',
'severity': 'warning', 'code_context': code_context,
'message': f'[AI 建议] {problem}', 'defect_reason': issue.get('defect_reason', '')
'category': 'ai', })
'code_context': code_context,
'review_data': {
'type': '问题',
'content': problem
}
})
# 处理优化建议
optimizations = review_data.get('优化', [])
for optimization in optimizations:
issues.append({
'file': file_path,
'line': 1,
'severity': 'info',
'message': f'[AI 优化] {optimization}',
'category': 'ai',
'code_context': code_context,
'review_data': {
'type': '优化',
'content': optimization
}
})
return issues return issues

218
test.py
View File

@@ -1,218 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import logging
os.environ.setdefault('FLASK_RUN_HOST', '0.0.0.0')
from flask import Flask, request, jsonify
import yaml
from webhook.handler import GiteaWebhookHandler
from scanner.python_scanner import PythonScanner
from scanner.js_scanner import JavaScriptScanner
from scanner.security_scanner import SecurityScanner
from report.generator import ReportGenerator
from notify.feishu import FeishuNotifier
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 加载配置
def load_config():
"""加载配置文件"""
config_path = os.path.join(os.path.dirname(__file__), 'config.yaml')
with open(config_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
# 全局配置
config = load_config()
# 初始化应用
app = Flask(__name__)
app.config['SECRET_KEY'] = config.get('server', {}).get('secret_key', 'dev-secret-key')
# 初始化组件
webhook_handler = GiteaWebhookHandler(config['gitea'])
python_scanner = PythonScanner(config.get('scanner', {}))
js_scanner = JavaScriptScanner(config.get('scanner', {}))
security_scanner = SecurityScanner(config.get('scanner', {}))
report_generator = ReportGenerator(config.get('report', {}))
feishu_notifier = FeishuNotifier(config['feishu'])
@app.route('/')
def index():
"""健康检查接口"""
return jsonify({
'status': 'ok',
'service': 'AI Code Quality Scanner',
'version': '1.0.0'
})
@app.route('/webhook/gitea', methods=['POST'])
def handle_gitea_webhook():
"""处理 Gitea Webhook 请求"""
try:
# 验证签名
signature = request.headers.get('X-Gitea-Signature')
if signature:
if not webhook_handler.verify_signature(
request.data,
signature,
config['gitea']['webhook_secret']
):
logger.warning('Webhook 签名验证失败')
return jsonify({'error': 'Invalid signature'}), 401
# 解析 Webhook payload
payload = request.json
if not payload:
return jsonify({'error': 'No payload'}), 400
event_type = request.headers.get('X-Gitea-Event', 'push')
logger.info(f'收到 Gitea Webhook 事件: {event_type}')
# 只处理 push 事件
if event_type != 'push':
return jsonify({'message': 'Event ignored'}), 200
# 提取提交信息
commits = payload.get('commits', [])
if not commits:
return jsonify({'message': 'No commits'}), 200
repo = payload.get('repository', {})
repo_name = repo.get('full_name', 'unknown')
branch = payload.get('ref', '').replace('refs/heads/', '')
pusher = payload.get('pusher', {}).get('name', 'unknown')
logger.info(f'处理仓库 {repo_name}{len(commits)} 个提交')
# 处理每个提交
for commit in commits:
commit_id = commit.get('id', '')[:8]
commit_message = commit.get('message', '')
author = commit.get('author', {}).get('name', 'unknown')
logger.info(f'扫描提交 {commit_id}: {commit_message}')
try:
# 获取仓库 URL
clone_url = repo.get('clone_url')
if not clone_url:
# 尝试从 web_url 构建
web_url = repo.get('web_url', '')
if web_url:
clone_url = web_url.replace('http://', 'http://').replace('https://', 'https://')
clone_url = clone_url.rstrip('/') + '.git'
# 执行代码扫描
scan_results = {}
# Python 扫描
if 'python' in config.get('scanner', {}).get('languages', []):
scan_results['python'] = python_scanner.scan(
clone_url, commit_id, branch
)
# JavaScript/TypeScript 扫描
if any(lang in config.get('scanner', {}).get('languages', [])
for lang in ['javascript', 'typescript']):
scan_results['javascript'] = js_scanner.scan(
clone_url, commit_id, branch
)
# 安全扫描
scan_results['security'] = security_scanner.scan(
clone_url, commit_id, branch
)
# 生成报告
report = report_generator.generate(
repo_name=repo_name,
branch=branch,
commit_id=commit_id,
commit_message=commit_message,
author=author,
scan_results=scan_results
)
# 发送飞书通知
feishu_notifier.send_report(report)
logger.info(f'提交 {commit_id} 扫描完成')
except Exception as e:
logger.error(f'扫描提交 {commit_id} 失败: {str(e)}')
# 继续处理其他提交
continue
return jsonify({'status': 'ok', 'message': 'Scan completed'}), 200
except Exception as e:
logger.error(f'处理 Webhook 失败: {str(e)}', exc_info=True)
return jsonify({'error': str(e)}), 500
@app.route('/scan/manual', methods=['POST'])
def manual_scan():
"""手动触发扫描接口"""
try:
data = request.json
repo_url = data.get('repo_url')
branch = data.get('branch', 'main')
commit_id = data.get('commit_id')
if not repo_url:
return jsonify({'error': 'repo_url is required'}), 400
# 执行扫描
scan_results = {}
if 'python' in config.get('scanner', {}).get('languages', []):
scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch)
if any(lang in config.get('scanner', {}).get('languages', [])
for lang in ['javascript', 'typescript']):
scan_results['javascript'] = js_scanner.scan(repo_url, commit_id, branch)
scan_results['security'] = security_scanner.scan(repo_url, commit_id, branch)
# 生成报告
report = report_generator.generate(
repo_name=repo_url.split('/')[-1].replace('.git', ''),
branch=branch,
commit_id=commit_id or 'manual',
commit_message='Manual scan',
author='manual',
scan_results=scan_results
)
# 发送飞书通知
feishu_notifier.send_report(report)
return jsonify({
'status': 'ok',
'report': report
}), 200
except Exception as e:
logger.error(f'手动扫描失败: {str(e)}', exc_info=True)
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
# 强制监听所有网络接口
host = "0.0.0.0"
port = config.get('server', {}).get('port', 5000)
debug = config.get('server', {}).get('debug', True)
logger.info(f'启动服务: {host}:{port}')
app.run(host=host, port=port, debug=debug)

View File

@@ -0,0 +1,81 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试文件:包含常见代码缺陷,用于验证扫描器
"""
import os
import sys
import json
import pickle
import subprocess
from ast import parse
from typing import List, Dict
# 缺陷1: 未使用的导入
import unused_module # 未使用
import collections as col # 使用了 col 但 flake8 可能检测
# 缺陷2: 未使用的变量
def unused_variable_demo():
"""演示未使用的变量"""
result = calculate() # result 未被使用
print("Function executed")
# 缺陷8: 行太长(风格问题)
def long_line():
"""这是一行非常非常非常非常非常非常非常非常非常非常非常非常长的代码超过了 120 个字符的限制"""
# 缺陷9: 缺少空格
def missing_spaces():
"""缺少必要空格"""
x=1+2
y=3*4
if x==1:
print(x)
# 缺陷1: 未使用的导入
import unused_module # 未使用
import collections as col # 使用了 col 但 flake8 可能检测
# 缺陷2: 未使用的变量
def unused_variable_demo():
"""演示未使用的变量"""
result = calculate() # result 未被使用
print("Function executed")
def calculate():
"""计算并返回结果"""
return 42
# 缺陷3: 未定义的变量
def undefined_variable_demo():
"""演示未定义的变量"""
print(undefined_var) # undefined_var 未定义
# 缺陷4: 变量在定义前使用
def use_before_define():
"""在定义前使用变量"""
print(before_var) # before_var 在下面才定义
before_var = 100
# 缺陷2: 未使用的变量
def unused_variable_demo():
"""演示未使用的变量"""
result = calculate() # result 未被使用
print("Function executed")
# 缺陷8: 行太长(风格问题)
def long_line():
"""这是一行非常非常非常非常非常非常非常非常非常非常非常非常长的代码超过了 120 个字符的限制"""

View File

@@ -6,6 +6,7 @@
<title>PR 扫描管理平台</title> <title>PR 扫描管理平台</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet"> <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap-icons@1.11.0/font/bootstrap-icons.css"> <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap-icons@1.11.0/font/bootstrap-icons.css">
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.0/dist/chart.umd.min.js"></script>
<style> <style>
body { background-color: #f5f7fa; } body { background-color: #f5f7fa; }
/* Diff 语法高亮 */ /* Diff 语法高亮 */
@@ -94,24 +95,11 @@
<i class="bi bi-speedometer2 me-2"></i>概览 <i class="bi bi-speedometer2 me-2"></i>概览
</a> </a>
</li> </li>
<li class="nav-item">
<a class="nav-link" href="#" onclick="showPage('prs')">
<i class="bi bi-git me-2"></i>PR 列表
</a>
</li>
<li class="nav-item">
<a class="nav-link" href="#" onclick="showPage('settings')">
<i class="bi bi-gear me-2"></i>设置
</a>
</li>
</ul> </ul>
<div class="mt-5 p-3" style="background: rgba(255,255,255,0.1); border-radius: 8px;"> <ul class="nav flex-column mt-3">
<small>系统状态</small> </ul>
<div class="mt-2">
<span class="text-success"><i class="bi bi-check-circle-fill"></i> 服务正常</span>
</div>
</div>
</div> </div>
<!-- 主内容区 --> <!-- 主内容区 -->
@@ -176,6 +164,69 @@
</div> </div>
</div> </div>
</div> </div>
<!-- 历史趋势:每个 PR 固定等距,新 PR 在右侧追加,前面 PR 位置不变,可横向滚动 -->
<h5 class="mb-3 mt-4">问题趋势</h5>
<div class="card">
<div class="card-body p-2">
<div id="trend-chart-wrapper" style="overflow-x: auto; overflow-y: hidden;">
<div id="trend-chart-container" style="height: 220px;">
<canvas id="trend-chart"></canvas>
</div>
</div>
<div id="trend-loading" class="text-center py-3 text-muted">加载趋势数据中...</div>
</div>
</div>
<div class="row mt-3">
<div class="col-md-6">
<div class="card mb-3">
<div class="card-header py-2">问题趋势统计</div>
<div class="card-body p-2" id="ai-trend-stats">
<div class="text-muted text-center py-2">暂无数据</div>
</div>
</div>
</div>
<div class="col-md-6">
<div class="card mb-3">
<div class="card-header py-2">改进建议</div>
<div class="card-body p-2">
<ul class="list-unstyled mb-0 small" id="ai-trend-tips">
<li><i class="bi bi-check-circle text-success me-2"></i>持续关注代码质量</li>
<li><i class="bi bi-check-circle text-success me-2"></i>减少警告数量</li>
<li><i class="bi bi-check-circle text-success me-2"></i>遵循最佳实践</li>
</ul>
</div>
</div>
</div>
</div>
<!-- 问题分布统计 -->
<h5 class="mb-3 mt-4">问题分布统计</h5>
<div class="row mb-3">
<div class="col-md-6">
<div class="card h-100">
<div class="card-header py-2">按严重程度分布</div>
<div class="card-body p-2">
<canvas id="severity-chart" style="max-height: 180px;"></canvas>
</div>
</div>
</div>
<div class="col-md-6">
<div class="card h-100">
<div class="card-header py-2">按扫描器分布</div>
<div class="card-body p-2">
<canvas id="scanner-chart" style="max-height: 180px;"></canvas>
</div>
</div>
</div>
</div>
<div class="card mb-4">
<div class="card-header py-2">问题类型排行</div>
<div class="card-body p-2" id="issue-types-list">
<div class="text-muted text-center py-2">暂无数据</div>
</div>
</div>
</div> </div>
<!-- PR 列表页面 --> <!-- PR 列表页面 -->
@@ -217,6 +268,10 @@
</div> </div>
<!-- 设置页面 --> <!-- 设置页面 -->
</div>
</div>
<div id="page-settings" style="display:none;"> <div id="page-settings" style="display:none;">
<h2 class="mb-4">设置</h2> <h2 class="mb-4">设置</h2>
<div class="card"> <div class="card">
@@ -266,6 +321,122 @@
<p><strong>安全漏洞:</strong> <span id="detail-security"></span></p> <p><strong>安全漏洞:</strong> <span id="detail-security"></span></p>
</div> </div>
</div> </div>
<!-- AI 审查功能区:质量评分 + 问题统计 + 修复建议 -->
<ul class="nav nav-tabs mb-3" id="aiReviewTabs" role="tablist">
<li class="nav-item" role="presentation">
<button class="nav-link active" id="quality-tab" data-bs-toggle="tab" data-bs-target="#quality-panel" type="button" role="tab">
<i class="bi bi-star"></i> 质量评分
</button>
</li>
<li class="nav-item" role="presentation">
<button class="nav-link" id="stats-tab" data-bs-toggle="tab" data-bs-target="#stats-panel" type="button" role="tab">
<i class="bi bi-bar-chart"></i> 问题统计
</button>
</li>
<li class="nav-item" role="presentation">
<button class="nav-link" id="fix-tab" data-bs-toggle="tab" data-bs-target="#fix-panel" type="button" role="tab">
<i class="bi bi-tools"></i> AI 修复建议
</button>
</li>
</ul>
<div class="tab-content" id="aiReviewTabContent">
<!-- 质量评分面板 -->
<div class="tab-pane fade show active" id="quality-panel" role="tabpanel">
<div id="quality-score-loading" class="text-center py-4 text-muted">加载中...</div>
<div id="quality-score-content" style="display: none;">
<div class="row text-center mb-4">
<div class="col">
<div class="display-1 fw-bold" id="qs-total">--</div>
<div class="text-muted">综合评分</div>
</div>
</div>
<div class="row text-center">
<div class="col">
<div class="h4 fw-bold" id="qs-security">--</div>
<small class="text-muted">安全性</small>
</div>
<div class="col">
<div class="h4 fw-bold" id="qs-maintain">--</div>
<small class="text-muted">可维护性</small>
</div>
<div class="col">
<div class="h4 fw-bold" id="qs-readability">--</div>
<small class="text-muted">可读性</small>
</div>
<div class="col">
<div class="h4 fw-bold" id="qs-best">--</div>
<small class="text-muted">最佳实践</small>
</div>
</div>
<div class="mt-3 text-center">
<small class="text-muted" id="qs-details"></small>
</div>
</div>
</div>
<!-- 问题统计面板 -->
<div class="tab-pane fade" id="stats-panel" role="tabpanel">
<div id="stats-loading" class="text-center py-4 text-muted">加载中...</div>
<div id="stats-content" style="display: none;">
<div class="row mb-3">
<div class="col-md-4">
<div class="card text-center bg-danger text-white">
<div class="card-body">
<div class="display-6 fw-bold" id="stat-error">0</div>
<small>严重问题</small>
</div>
</div>
</div>
<div class="col-md-4">
<div class="card text-center bg-warning">
<div class="card-body">
<div class="display-6 fw-bold" id="stat-warning">0</div>
<small>警告</small>
</div>
</div>
</div>
<div class="col-md-4">
<div class="card text-center bg-info text-white">
<div class="card-body">
<div class="display-6 fw-bold" id="stat-info">0</div>
<small>提示</small>
</div>
</div>
</div>
</div>
<div class="row">
<div class="col">
<h6>按扫描器分布</h6>
<ul class="list-group" id="stat-scanner-list"></ul>
</div>
</div>
</div>
</div>
<!-- AI 修复建议面板 -->
<div class="tab-pane fade" id="fix-panel" role="tabpanel">
<div class="alert alert-info">
<i class="bi bi-info-circle"></i> 点击问题列表中的 <strong>生成修复</strong> 按钮AI 将为您生成修复代码。
</div>
<div id="fix-result-loading" class="text-center py-3 text-muted" style="display: none;">AI 正在生成修复建议...</div>
<div id="fix-result-content" style="display: none;">
<div class="card">
<div class="card-header bg-success text-white">
<i class="bi bi-check-circle"></i> 修复建议
</div>
<div class="card-body">
<h6>修复说明</h6>
<p id="fix-explanation" class="text-muted"></p>
<h6>修复后代码</h6>
<pre id="fix-code" class="bg-dark text-light p-3 rounded" style="overflow-x: auto;"></pre>
<div class="mt-2">
<span class="badge bg-secondary" id="fix-confidence"></span>
</div>
</div>
</div>
</div>
</div>
</div>
<!-- 仅保留文件 Tab左侧文件树 + 右侧完整文件内容,最右侧为问题标注 --> <!-- 仅保留文件 Tab左侧文件树 + 右侧完整文件内容,最右侧为问题标注 -->
<div class="mt-3"> <div class="mt-3">
<div class="pr-detail-file-layout"> <div class="pr-detail-file-layout">
@@ -371,11 +542,268 @@
const recentPRs = prs.slice(0, 5); const recentPRs = prs.slice(0, 5);
const tbody = document.querySelector('#recent-prs-table tbody'); const tbody = document.querySelector('#recent-prs-table tbody');
tbody.innerHTML = recentPRs.map(pr => createPRRow(pr)).join(''); tbody.innerHTML = recentPRs.map(pr => createPRRow(pr)).join('');
// 加载历史趋势
loadHistoryTrend();
// 加载问题分布统计
loadAIStats();
} catch (e) { } catch (e) {
console.error('加载数据失败:', e); console.error('加载数据失败:', e);
} }
} }
// 加载历史趋势图表
let trendChart = null;
async function loadHistoryTrend() {
const loadingEl = document.getElementById('trend-loading');
const canvasEl = document.getElementById('trend-chart');
if (!loadingEl || !canvasEl) return;
try {
const response = await fetch('/api/prs/history?limit=15');
if (!response.ok) throw new Error('暂无数据');
const history = await response.json();
if (!history || history.length === 0) {
loadingEl.textContent = '暂无趋势数据';
return;
}
loadingEl.style.display = 'none';
// 固定 15 个槽位:无 PR 的槽位也画 Y 向虚线,新 PR 在右侧追加
const SLOTS = 15;
const pxPerPR = 80;
const chartWidth = SLOTS * pxPerPR;
const container = document.getElementById('trend-chart-container');
if (container) {
container.style.width = chartWidth + 'px';
container.style.minWidth = chartWidth + 'px';
}
// 第一个 PR 从 X 轴最左边开始,右侧用空位补齐到 SLOTS保证每个槽位都有竖线
const n = history.length;
const pad = Math.max(0, SLOTS - n);
const labels = history.map(p => '#' + p.pr_number).concat(Array(pad).fill(''));
const errorData = history.map(p => p.error_count || 0).concat(Array(pad).fill(null));
const warningData = history.map(p => p.warning_count || 0).concat(Array(pad).fill(null));
if (trendChart) trendChart.destroy();
trendChart = new Chart(canvasEl, {
type: 'line',
data: {
labels: labels,
datasets: [
{
label: '错误',
data: errorData,
borderColor: '#dc3545',
backgroundColor: 'rgba(220, 53, 69, 0.1)',
tension: 0.3,
fill: true,
spanGaps: false
},
{
label: '警告',
data: warningData,
borderColor: '#ffc107',
backgroundColor: 'rgba(255, 193, 7, 0.1)',
tension: 0.3,
fill: true,
spanGaps: false
}
]
},
options: {
responsive: true,
maintainAspectRatio: false,
plugins: {
legend: { position: 'top' }
},
scales: {
x: {
grid: {
display: true,
color: 'rgba(0, 0, 0, 0.12)',
lineWidth: 1,
borderDash: [4, 4]
},
ticks: { maxRotation: 0, autoSkip: false }
},
y: {
beginAtZero: true,
ticks: { stepSize: 1 },
grid: {
color: 'rgba(0, 0, 0, 0.12)',
lineWidth: 1,
borderDash: [4, 4]
}
}
}
}
});
// 概览页上的问题趋势统计(与质量趋势一致)
const totalData = history.map(p => p.total_issues || (p.error_count || 0) + (p.warning_count || 0));
const avgIssues = totalData.length ? Math.round(totalData.reduce((a, b) => a + b, 0) / totalData.length) : 0;
const maxPR = history.reduce((max, p) => {
const pTotal = p.total_issues || (p.error_count || 0) + (p.warning_count || 0);
const maxTotal = max.total_issues || (max.error_count || 0) + (max.warning_count || 0);
return pTotal > maxTotal ? p : max;
}, history[0]);
const statsEl = document.getElementById('ai-trend-stats');
if (statsEl) {
statsEl.innerHTML = `
<div class="row text-center">
<div class="col-6">
<div class="h3">${avgIssues}</div>
<small class="text-muted">平均问题数</small>
</div>
<div class="col-6">
<div class="h3">#${maxPR?.pr_number || '-'}</div>
<small class="text-muted">问题最多</small>
</div>
</div>
`;
}
} catch (e) {
loadingEl.textContent = '暂无趋势数据';
}
}
// 加载 AI 质量评分概览
async function loadAIQualityOverview() {
try {
const response = await fetch('/api/prs?state=open');
const prs = await response.json();
// 计算所有已扫描 PR 的平均评分
let totalScore = 0, count = 0;
for (const pr of prs) {
if (pr.scan_status === 'completed' && pr.scan_result) {
const sr = pr.scan_result;
if (sr.ai && sr.ai.quality_score) {
totalScore += sr.ai.quality_score.total || 0;
count++;
}
}
}
const avgScore = count > 0 ? Math.round(totalScore / count) : '--';
document.getElementById('aiq-total').textContent = avgScore;
document.getElementById('aiq-security').textContent = count > 0 ? '95+' : '--';
document.getElementById('aiq-maintain').textContent = count > 0 ? '90+' : '--';
document.getElementById('aiq-readability').textContent = count > 0 ? '88+' : '--';
// 颜色
const totalEl = document.getElementById('aiq-total');
if (avgScore >= 80) {
totalEl.parentElement.className = 'card-body bg-success text-white';
} else if (avgScore >= 60) {
totalEl.parentElement.className = 'card-body bg-warning text-dark';
} else if (typeof avgScore === 'number') {
totalEl.parentElement.className = 'card-body bg-danger text-white';
}
} catch (e) {
console.error('加载 AI 质量评分失败:', e);
}
}
// 加载 AI 问题分布统计
let severityChart = null;
let scannerChart = null;
async function loadAIStats() {
try {
// 获取所有已完成扫描的 PR
const response = await fetch('/api/prs');
const prs = await response.json();
const completedPRs = prs.filter(p => p.scan_status === 'completed');
// 汇总统计
let totalError = 0, totalWarning = 0, totalInfo = 0;
let byScanner = {};
for (const pr of completedPRs) {
const sr = pr.scan_result;
if (!sr) continue;
for (const [name, result] of Object.entries(sr)) {
if (!byScanner[name]) byScanner[name] = 0;
const issues = result?.issues || [];
byScanner[name] += issues.length;
for (const issue of issues) {
const sev = (issue.severity || 'info').toLowerCase();
if (sev === 'error' || sev === 'critical') totalError++;
else if (sev === 'warning') totalWarning++;
else totalInfo++;
}
}
}
// 绘制严重程度饼图
const sevCanvas = document.getElementById('severity-chart');
if (sevCanvas) {
if (severityChart) severityChart.destroy();
severityChart = new Chart(sevCanvas, {
type: 'doughnut',
data: {
labels: ['错误', '警告', '提示'],
datasets: [{
data: [totalError, totalWarning, totalInfo],
backgroundColor: ['#dc3545', '#ffc107', '#0dcaf0']
}]
},
options: {
responsive: true,
maintainAspectRatio: true,
plugins: { legend: { position: 'bottom', labels: { boxWidth: 12, padding: 8, font: { size: 11 } } } }
}
});
}
// 绘制扫描器分布饼图
const scanCanvas = document.getElementById('scanner-chart');
if (scanCanvas) {
const scannerNames = Object.keys(byScanner);
const scannerData = Object.values(byScanner);
const colors = ['#0d6efd', '#198754', '#dc3545', '#ffc107', '#6f42c1', '#20c997'];
if (scannerChart) scannerChart.destroy();
scannerChart = new Chart(scanCanvas, {
type: 'pie',
data: {
labels: scannerNames,
datasets: [{
data: scannerData,
backgroundColor: colors.slice(0, scannerNames.length)
}]
},
options: {
responsive: true,
maintainAspectRatio: true,
plugins: { legend: { position: 'bottom', labels: { boxWidth: 12, padding: 8, font: { size: 11 } } } }
}
});
}
// 问题类型排行
document.getElementById('issue-types-list').innerHTML = `
<table class="table table-sm table-hover mb-0">
<thead class="table-light"><tr><th>扫描器</th><th class="text-end">问题数</th></tr></thead>
<tbody>
${Object.entries(byScanner).sort((a, b) => b[1] - a[1]).map(([k, v]) => `<tr><td>${k}</td><td class="text-end"><span class="badge bg-primary">${v}</span></td></tr>`).join('')}
</tbody>
</table>
`;
} catch (e) {
console.error('加载统计失败:', e);
}
}
// 加载 PR 列表 // 加载 PR 列表
async function loadPRs() { async function loadPRs() {
try { try {
@@ -445,11 +873,132 @@
// 加载文件树(左侧树,点击文件在右侧显示完整内容+标注) // 加载文件树(左侧树,点击文件在右侧显示完整内容+标注)
loadPRFileTree(id); loadPRFileTree(id);
// 加载 AI 审查功能
loadQualityScore(id);
loadIssueStats(id);
} catch (e) { } catch (e) {
alert('加载 PR 详情失败: ' + e.message); alert('加载 PR 详情失败: ' + e.message);
} }
} }
// 加载质量评分
async function loadQualityScore(prId) {
const loadingEl = document.getElementById('quality-score-loading');
const contentEl = document.getElementById('quality-score-content');
if (!loadingEl || !contentEl) return;
loadingEl.style.display = 'block';
contentEl.style.display = 'none';
try {
const response = await fetch('/api/prs/' + prId + '/quality');
if (!response.ok) throw new Error('暂无数据');
const data = await response.json();
// 更新显示
document.getElementById('qs-total').textContent = data.total || '--';
document.getElementById('qs-security').textContent = data.security || '--';
document.getElementById('qs-maintain').textContent = data.maintainability || '--';
document.getElementById('qs-readability').textContent = data.readability || '--';
document.getElementById('qs-best').textContent = data.best_practices || '--';
// 颜色
const total = data.total || 0;
const totalEl = document.getElementById('qs-total');
if (total >= 80) totalEl.className = 'display-1 fw-bold text-success';
else if (total >= 60) totalEl.className = 'display-1 fw-bold text-warning';
else totalEl.className = 'display-1 fw-bold text-danger';
// 详情
const details = data.details || {};
document.getElementById('qs-details').textContent =
`错误: ${details.error_count || 0} | 警告: ${details.warning_count || 0} | 提示: ${details.info_count || 0}`;
loadingEl.style.display = 'none';
contentEl.style.display = 'block';
} catch (e) {
loadingEl.textContent = '暂无评分数据';
}
}
// 加载问题统计
async function loadIssueStats(prId) {
const loadingEl = document.getElementById('stats-loading');
const contentEl = document.getElementById('stats-content');
if (!loadingEl || !contentEl) return;
loadingEl.style.display = 'block';
contentEl.style.display = 'none';
try {
const response = await fetch('/api/prs/' + prId + '/stats');
if (!response.ok) throw new Error('暂无数据');
const data = await response.json();
document.getElementById('stat-error').textContent = data.by_severity?.error || 0;
document.getElementById('stat-warning').textContent = data.by_severity?.warning || 0;
document.getElementById('stat-info').textContent = data.by_severity?.info || 0;
// 按扫描器分布
const scannerList = document.getElementById('stat-scanner-list');
scannerList.innerHTML = '';
const scanners = data.by_scanner || {};
for (const [name, count] of Object.entries(scanners)) {
const li = document.createElement('li');
li.className = 'list-group-item d-flex justify-content-between align-items-center';
li.innerHTML = `${name} <span class="badge bg-primary rounded-pill">${count}</span>`;
scannerList.appendChild(li);
}
loadingEl.style.display = 'none';
contentEl.style.display = 'block';
} catch (e) {
loadingEl.textContent = '暂无统计数据';
}
}
// 生成修复建议(全局函数,供问题列表调用)
async function generateFix(filePath, line, message, code) {
const loadingEl = document.getElementById('fix-result-loading');
const contentEl = document.getElementById('fix-result-content');
if (!loadingEl || !contentEl) return;
// 切换到修复建议面板
document.getElementById('fix-tab').click();
loadingEl.style.display = 'block';
contentEl.style.display = 'none';
try {
const response = await fetch('/api/prs/' + currentPRId + '/fix', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({file: filePath, line: line, message: message, code: code})
});
if (!response.ok) throw new Error('生成失败');
const data = await response.json();
document.getElementById('fix-explanation').textContent = data.explanation || '';
document.getElementById('fix-code').textContent = data.fixed_code || '// 无修复建议';
const confBadge = document.getElementById('fix-confidence');
confBadge.textContent = data.confidence || '';
if (data.confidence === 'high') confBadge.className = 'badge bg-success';
else if (data.confidence === 'medium') confBadge.className = 'badge bg-warning';
else confBadge.className = 'badge bg-secondary';
loadingEl.style.display = 'none';
contentEl.style.display = 'block';
} catch (e) {
loadingEl.textContent = '生成修复建议失败: ' + e.message;
}
}
// 加载 PR 文件列表并渲染左侧树,点击文件在右侧显示完整内容 // 加载 PR 文件列表并渲染左侧树,点击文件在右侧显示完整内容
async function loadPRFileTree(prId) { async function loadPRFileTree(prId) {
const loadingEl = document.getElementById('pr-file-tree-loading'); const loadingEl = document.getElementById('pr-file-tree-loading');