36 Commits

Author SHA1 Message Date
590ccf8093 Update test_demo/demo_flaws.py 2026-03-26 11:18:21 +08:00
dangzerong
c6a27a6b29 Merge pull request 'Update test_demo/demo_flaws.py' (#29) from dingshuo-patch-5 into main 2026-03-17 11:29:31 +08:00
0274f01b6e Update test_demo/demo_flaws.py 2026-03-17 11:28:17 +08:00
dangzerong
a49f42efe5 Merge pull request 'Update test_demo/demo_flaws.py' (#27) from dingshuo-patch-3 into main 2026-03-16 12:56:45 +08:00
8062ed4bfd Update test_demo/demo_flaws.py 2026-03-16 12:55:31 +08:00
dangzerong
ffd77057e3 Merge pull request 'Update test_demo/demo_flaws.py' (#25) from dingshuo-patch-2 into main 2026-03-15 13:34:19 +08:00
279a01b897 Update test_demo/demo_flaws.py 2026-03-15 13:29:58 +08:00
77fd09e6d2 Update test_demo/demo_flaws.py 2026-03-15 12:27:43 +08:00
dangzerong
91c16cbc88 Merge pull request '测试的扫描文件' (#20) from dev into main 2026-03-13 21:04:31 +08:00
Dang Zerong
c8c0ef1620 测试的扫描文件 2026-03-13 21:00:53 +08:00
dangzerong
95831d5190 Merge pull request 'dev' (#19) from dev into main 2026-03-13 18:09:32 +08:00
Dang Zerong
9a14c0b219 测试的扫描文件 2026-03-13 18:00:27 +08:00
Dang Zerong
87b2dacf65 测试的扫描文件 2026-03-13 18:00:22 +08:00
dangzerong
453414efb2 Merge pull request 'dev' (#17) from dev into main 2026-03-13 17:57:36 +08:00
Dang Zerong
04518812f4 Merge branch 'dev' of https://code.deep-pilot.chat/Bosch_Demo/code_scan into dev 2026-03-13 17:42:54 +08:00
Dang Zerong
6c4ee107f9 测试的扫描文件 2026-03-13 17:42:27 +08:00
dangzerong
d11b349d5e Merge pull request '测试的扫描文件' (#15) from dev into main 2026-03-13 17:41:51 +08:00
dangzerong
2a2ff1ad5f Merge branch 'main' into dev 2026-03-13 17:40:33 +08:00
Dang Zerong
bc5a19fffc 测试的扫描文件 2026-03-13 17:39:20 +08:00
Dang Zerong
78655ce5dc 测试的扫描文件 2026-03-13 17:37:46 +08:00
Dang Zerong
2201f6d696 Merge branch 'dev' 2026-03-13 17:37:10 +08:00
Dang Zerong
97881ee00e 测试的扫描文件 2026-03-13 17:32:23 +08:00
dangzerong
e46aff2797 Merge pull request 'dev' (#13) from dev into main 2026-03-13 17:25:28 +08:00
Dang Zerong
887c8ae154 测试的扫描文件 2026-03-13 16:49:13 +08:00
Dang Zerong
ecc39402d5 测试的扫描文件 2026-03-13 16:27:32 +08:00
dangzerong
dc9b921091 Merge pull request '测试的扫描文件' (#11) from dev into main
Reviewed-on: #11
2026-03-13 16:26:54 +08:00
Dang Zerong
a928b79d6d 测试的扫描文件 2026-03-13 16:25:37 +08:00
dangzerong
0991b3de26 Merge pull request 'dev' (#10) from dev into main 2026-03-13 16:24:21 +08:00
Dang Zerong
1876be1777 测试的扫描文件 2026-03-13 16:22:23 +08:00
Dang Zerong
51fc1a6aae 先删除测试代码,后面再提交 2026-03-13 16:21:39 +08:00
Dang Zerong
726c21feac 可演示 2026-03-13 16:04:20 +08:00
dangzerong
a525a2b4ac Merge pull request 'dev' (#9) from dev into main 2026-03-13 15:32:42 +08:00
Dang Zerong
cb90b66f09 代码测试 2026-03-13 11:26:01 +08:00
Dang Zerong
8f9e5bf4f5 代码测试 2026-03-12 16:13:18 +08:00
dangzerong
b4f923f76c Merge pull request '删除代码测试' (#7) from dev into main 2026-03-12 16:13:04 +08:00
Dang Zerong
a3ae277dcb 删除代码测试 2026-03-12 15:50:17 +08:00
15 changed files with 1381 additions and 885 deletions

34
Dockerfile Normal file
View File

@@ -0,0 +1,34 @@
# 使用中科大镜像源的 Python 基础镜像
FROM python:3.11.15-slim
# 设置工作目录
WORKDIR /app
# 设置环境变量
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV FLASK_RUN_HOST=0.0.0.0
# 安装系统依赖(用于 git 等工具)
RUN apt-get update && apt-get install -y --no-install-recommends \
git \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
# 安装 Python 依赖(显式安装,避免缓存导致遗漏)
RUN pip install --no-cache-dir -r requirements.txt \
&& pip install --no-cache-dir "GitPython>=3.1.0" "gitdb>=4.0.1" "smmap>=3.0.1"
# 复制应用代码
COPY . .
# 创建报告目录
RUN mkdir -p reports
# 暴露端口
EXPOSE 5000
# 启动应用
CMD ["python", "app.py"]

281
README.md
View File

@@ -1,224 +1,109 @@
# AI Code Quality Scanner - 飞书通知版 # AI 代码质量扫描系统
一个自动化代码质量扫描系统,在代码提交时自动扫描并发送报告到飞书 自动化代码质量扫描工具,监听 PR 事件,自动扫描代码缺陷并提供合并决策支持
## 功能特性 ## 工作流程
- 🤖 自动监听 Gitea 代码提交事件
- 🔍 多维度代码质量扫描(语法、风格、安全)
- 📊 生成 Markdown 格式扫描报告
- 📱 实时推送飞书机器人通知
## 系统架构
``` ```
┌───────────── Webhook ┌──────────────────┐ ┌──────────┐ 1. 创建 PR ┌────────────┐
Gitea │ ───────────────► │ Webhook Server │ Gitea │ ───────────────► │ Webhook │
│ 代码仓库 │ (Flask) └──────────┘Server
└─────────────┘ └────────┬─────────┘ └───────────┘
│ 2. 拉取代码、扫描、存库
┌──────────────────┐ ────────────┐
│ Code Scanner │ SQLite
│ - ESLint │ Database
│ - Pylint │ └────────────┘
- SonarQube │ │ 3. 前端查询
└────────┬─────────┘
┌──────────────────┐ ────────────┐
│ Report Generator 前端页面
│ - Markdown │ └────────────┘
└────────┬─────────┘
┌──────────────────┐
│ Feishu Bot │
│ - Webhook │
└──────────────────┘
``` ```
## 三个核心功能
### 1. PR 创建
- Gitea 仓库创建 PR 时自动触发扫描
- 支持事件:`opened``reopened``synchronize`
### 2. 后端处理
- 拉取 PR 对应的代码
- 执行代码扫描Python/JavaScript/TypeScript
- AI 智能审查代码缺陷
- 扫描结果存入 SQLite 数据库
### 3. 前端功能
- 查询所有 PR 及扫描状态
- 查看每个 PR 的缺陷详情
- 一键「拒绝合并」或「同意合并」
## 快速开始 ## 快速开始
### 1. 安装依赖
```bash ```bash
# 安装依赖
pip install -r requirements.txt pip install -r requirements.txt
```
### 2. 配置飞书机器人 # 运行服务
1. 打开飞书群聊 → 设置 → 群机器人
2. 添加机器人 → 选择"自定义机器人"
3. 获取 Webhook 地址
4. 配置 `config.yaml`
### 3. 配置 Gitea Webhook
#### 方式一Push 时扫描(原有方式)
1. 进入 Gitea 仓库 → 设置 → Webhooks
2. 添加 Webhook
- 目标 URL: `http://你的服务器IP:5000/webhook/gitea`
- 触发事件: Push
- 密钥: 配置 `config.yaml` 中的 secret
#### 方式二PR 创建时扫描(推荐)
1. 进入 Gitea 仓库 → 设置 → Webhooks
2. 添加 Webhook
- 目标 URL: `http://你的服务器IP:5000/webhook/gitea`
- 触发事件: Pull Request
- 密钥: 配置 `config.yaml` 中的 secret
**支持的 PR 事件:**
- `opened` - 创建新 PR
- `reopened` - 重新打开 PR
- `synchronize` - PR 中的提交有更新
- `ready_for_review` - PR 标记为准备好审查
### 4. 运行服务
```bash
python app.py python app.py
``` ```
## 配置说明 访问 http://localhost:5000 查看前端页面。
所有配置在 `config.yaml` 中: ## Docker 部署
### 1. 构建镜像
```bash
docker buildx build --load --push -t dcr-by1jwyxk44.71826370.xyz/whlaoding/code-scan:latest .
```
### 2. 登录仓库
```bash
docker login dcr-by1jwyxk44.71826370.xyz
```
### 3. Push 到仓库
```bash
docker run -d --name code-scan -p 5000:5000 dcr-by1jwyxk44.71826370.xyz/whlaoding/code-scan:latest
```
### 4. 使用 docker compose 启动
```bash
# 启动服务
docker compose up -d
# 查看日志
docker compose logs -f
# 停止服务
docker compose down
```
## 配置
配置文件 `config.yaml`
```yaml ```yaml
server: server:
host: "0.0.0.0" host: "0.0.0.0"
port: 5000 port: 5000
debug: true
gitea: gitea:
base_url: "http://localhost:3000" base_url: "https://code.deep-pilot.chat"
# Webhook 签名密钥 webhook_secret: "xxx"
webhook_secret: "your_webhook_secret" api_token: "xxx"
feishu: ai:
# 飞书机器人 Webhook 地址 provider: "api"
webhook_url: "https://open.feishu.cn/open-apis/bot/v2/hook/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" model: "qwen3.5-plus"
# 消息推送 secret可选用于签名 api_url: "https://dashscope.aliyuncs.com/compatible-mode/v1"
secret: "your_feishu_secret" api_key: "sk-xxx"
scanner:
# 支持的语言
languages:
- python
- javascript
- typescript
# 扫描阈值
max_issues: 10
# 是否启用详细扫描
detailed: true
report:
# 报告保存路径
output_dir: "./reports"
# 是否保留报告文件
keep_files: true
``` ```
## 项目结构
```
code-scanner/
├── app.py # 主应用入口
├── config.yaml # 配置文件
├── requirements.txt # Python 依赖
├── README.md # 项目说明
├── scanner/
│ ├── __init__.py
│ ├── base.py # 扫描器基类
│ ├── python_scanner.py # Python 代码扫描
│ ├── js_scanner.py # JavaScript/TypeScript 扫描
│ └── security_scanner.py # 安全扫描
├── report/
│ ├── __init__.py
│ └── generator.py # Markdown 报告生成
├── notify/
│ ├── __init__.py
│ └── feishu.py # 飞书通知
├── webhook/
│ ├── __init__.py
│ └── handler.py # Webhook 处理
└── reports/ # 报告输出目录
```
## 支持的扫描工具
### Python
- **Pylint** - 代码风格和错误检查
- **Flake8** - Python 代码检查
- **Bandit** - 安全漏洞扫描
### JavaScript/TypeScript
- **ESLint** - JavaScript/TypeScript 检查
- **Prettier** - 代码格式化
## 飞书消息效果
扫描完成后,将收到类似以下消息:
### Push 扫描消息
```
📊 代码质量扫描报告
仓库: my-project
分支: main
提交: abc1234
提交者: developer@example.com
✅ 扫描通过 (0 issues)
⚠️ 发现问题 (5 issues)
```
### PR 扫描消息
```
📊 PR 代码质量扫描报告
仓库: my-project
源分支: feature-xxx → 目标分支: main
PR链接: https://gitea.example.com/user/project/pulls/123
提交: abc1234
提交者: developer@example.com
✅ 扫描通过 (0 issues)
⚠️ 发现问题 (5 issues)
```
## Docker 部署
```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 5000
CMD ["python", "app.py"]
```
## 环境变量
也可以通过环境变量配置:
```bash
export FEISHU_WEBHOOK_URL="https://open.feishu.cn/..."
export GITEA_WEBHOOK_SECRET="secret"
export SCANNER_MAX_ISSUES=10
```
## 许可证
MIT License

195
app.py
View File

@@ -2,7 +2,9 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import os import os
import time
import logging import logging
import traceback
from typing import Dict, Tuple, Any from typing import Dict, Tuple, Any
import json import json
@@ -130,21 +132,27 @@ def handle_gitea_webhook():
# Python 扫描 # Python 扫描
if 'python' in config.get('scanner', {}).get('languages', []): if 'python' in config.get('scanner', {}).get('languages', []):
start_time = time.time()
scan_results['python'] = python_scanner.scan( scan_results['python'] = python_scanner.scan(
clone_url, commit_id, branch clone_url, commit_id, branch
) )
logger.info(f"[TIMER] Python 扫描耗时: {time.time() - start_time:.2f}")
# JavaScript/TypeScript 扫描 # JavaScript/TypeScript 扫描
if any(lang in config.get('scanner', {}).get('languages', []) if any(lang in config.get('scanner', {}).get('languages', [])
for lang in ['javascript', 'typescript']): for lang in ['javascript', 'typescript']):
start_time = time.time()
scan_results['javascript'] = js_scanner.scan( scan_results['javascript'] = js_scanner.scan(
clone_url, commit_id, branch clone_url, commit_id, branch
) )
logger.info(f"[TIMER] JavaScript 扫描耗时: {time.time() - start_time:.2f}")
# 安全扫描 # 安全扫描
start_time = time.time()
scan_results['security'] = security_scanner.scan( scan_results['security'] = security_scanner.scan(
clone_url, commit_id, branch clone_url, commit_id, branch
) )
logger.info(f"[TIMER] 安全扫描耗时: {time.time() - start_time:.2f}")
# 生成报告 # 生成报告
report = report_generator.generate( report = report_generator.generate(
@@ -227,27 +235,35 @@ def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]:
# Python 扫描 # Python 扫描
if 'python' in config.get('scanner', {}).get('languages', []): if 'python' in config.get('scanner', {}).get('languages', []):
start_time = time.time()
scan_results['python'] = python_scanner.scan( scan_results['python'] = python_scanner.scan(
clone_url, source_sha, source_branch, changed_files clone_url, source_sha, source_branch, changed_files
) )
logger.info(f"[TIMER] Python 扫描耗时: {time.time() - start_time:.2f}")
# JavaScript/TypeScript 扫描 # JavaScript/TypeScript 扫描
if any(lang in config.get('scanner', {}).get('languages', []) if any(lang in config.get('scanner', {}).get('languages', [])
for lang in ['javascript', 'typescript']): for lang in ['javascript', 'typescript']):
start_time = time.time()
scan_results['javascript'] = js_scanner.scan( scan_results['javascript'] = js_scanner.scan(
clone_url, source_sha, source_branch, changed_files clone_url, source_sha, source_branch, changed_files
) )
logger.info(f"[TIMER] JavaScript 扫描耗时: {time.time() - start_time:.2f}")
# 安全扫描 # 安全扫描
start_time = time.time()
scan_results['security'] = security_scanner.scan( scan_results['security'] = security_scanner.scan(
clone_url, source_sha, source_branch, changed_files clone_url, source_sha, source_branch, changed_files
) )
logger.info(f"[TIMER] 安全扫描耗时: {time.time() - start_time:.2f}")
# AI 代码审查 # AI 代码审查
if config.get('ai', {}).get('enabled', False): if config.get('ai', {}).get('enabled', False):
start_time = time.time()
scan_results['ai'] = ai_reviewer.scan( scan_results['ai'] = ai_reviewer.scan(
clone_url, source_sha, source_branch, changed_files clone_url, source_sha, source_branch, changed_files
) )
logger.info(f"[TIMER] AI 扫描耗时: {time.time() - start_time:.2f}")
# 获取 PR 的代码差异,用于将问题与代码片段关联 # 获取 PR 的代码差异,用于将问题与代码片段关联
pr_diff = None pr_diff = None
@@ -298,6 +314,7 @@ def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]:
logger.info(f'PR #{pr_number} 扫描完成') logger.info(f'PR #{pr_number} 扫描完成')
except Exception as e: except Exception as e:
traceback.print_exc()
logger.error(f'扫描 PR #{pr_number} 失败: {str(e)}') logger.error(f'扫描 PR #{pr_number} 失败: {str(e)}')
return jsonify({'error': str(e)}), 500 return jsonify({'error': str(e)}), 500
@@ -324,13 +341,19 @@ def manual_scan():
scan_results = {} scan_results = {}
if 'python' in config.get('scanner', {}).get('languages', []): if 'python' in config.get('scanner', {}).get('languages', []):
start_time = time.time()
scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch) scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch)
logger.info(f"[TIMER] Python 扫描耗时: {time.time() - start_time:.2f}")
if any(lang in config.get('scanner', {}).get('languages', []) if any(lang in config.get('scanner', {}).get('languages', [])
for lang in ['javascript', 'typescript']): for lang in ['javascript', 'typescript']):
start_time = time.time()
scan_results['javascript'] = js_scanner.scan(repo_url, commit_id, branch) scan_results['javascript'] = js_scanner.scan(repo_url, commit_id, branch)
logger.info(f"[TIMER] JavaScript 扫描耗时: {time.time() - start_time:.2f}")
start_time = time.time()
scan_results['security'] = security_scanner.scan(repo_url, commit_id, branch) scan_results['security'] = security_scanner.scan(repo_url, commit_id, branch)
logger.info(f"[TIMER] 安全扫描耗时: {time.time() - start_time:.2f}")
# 生成报告 # 生成报告
report = report_generator.generate( report = report_generator.generate(
@@ -667,6 +690,178 @@ def api_get_pr_file_content(pr_id):
return jsonify({'error': str(e)}), 500 return jsonify({'error': str(e)}), 500
@app.route('/api/prs/<int:pr_id>/quality')
def api_get_quality_score(pr_id):
"""获取 PR 的代码质量评分"""
try:
pr = PRScanDB.get_pr_by_id(pr_id)
if not pr:
return jsonify({'error': 'PR not found'}), 404
# 从 scan_result 中获取质量评分
scan_result = pr.get('scan_result')
if isinstance(scan_result, str):
try:
scan_result = json.loads(scan_result)
except:
scan_result = None
quality_score = None
if scan_result and scan_result.get('ai'):
quality_score = scan_result['ai'].get('quality_score')
if not quality_score:
return jsonify({'error': '暂无质量评分'}), 404
return jsonify(quality_score)
except Exception as e:
logger.error(f'获取质量评分失败: {str(e)}')
return jsonify({'error': str(e)}), 500
@app.route('/api/prs/<int:pr_id>/stats')
def api_get_issue_stats(pr_id):
"""获取 PR 的问题统计"""
try:
pr = PRScanDB.get_pr_by_id(pr_id)
if not pr:
return jsonify({'error': 'PR not found'}), 404
# 获取 scan_details_with_code
scan_details = pr.get('scan_details_with_code')
if isinstance(scan_details, str):
try:
scan_details = json.loads(scan_details)
except:
scan_details = None
if not scan_details:
return jsonify({'error': '暂无扫描详情'}), 404
# 统计各扫描器的问题
stats = {
'by_severity': {'error': 0, 'warning': 0, 'info': 0},
'by_scanner': {},
'total': 0
}
# 统计静态扫描器
for scanner in scan_details.get('scanners', []):
scanner_name = scanner.get('name', 'unknown')
scanner_issues = scanner.get('issues', [])
stats['by_scanner'][scanner_name] = len(scanner_issues)
for issue in scanner_issues:
sev = (issue.get('severity') or 'info').lower()
if sev in stats['by_severity']:
stats['by_severity'][sev] += 1
stats['total'] += 1
# 统计 AI 扫描器
ai_data = scan_details.get('ai', {})
if ai_data:
ai_issues = ai_data.get('issues', [])
stats['by_scanner']['AI'] = len(ai_issues)
for issue in ai_issues:
sev = (issue.get('severity') or 'info').lower()
if sev in stats['by_severity']:
stats['by_severity'][sev] += 1
stats['total'] += 1
return jsonify(stats)
except Exception as e:
logger.error(f'获取问题统计失败: {str(e)}')
return jsonify({'error': str(e)}), 500
@app.route('/api/prs/<int:pr_id>/fix', methods=['POST'])
def api_generate_fix(pr_id):
"""生成问题修复建议"""
try:
data = request.get_json()
if not data:
return jsonify({'error': '请求体为空'}), 400
file_path = data.get('file')
line = data.get('line', 1)
message = data.get('message', '')
code = data.get('code', '')
if not file_path or not message:
return jsonify({'error': '缺少必要参数'}), 400
# 调用 AI 生成修复建议
fix_result = ai_reviewer.generate_fix_suggestion(file_path, line, message, code)
if fix_result:
return jsonify(fix_result)
else:
return jsonify({'error': '生成修复建议失败'}), 500
except Exception as e:
logger.error(f'生成修复建议失败: {str(e)}')
return jsonify({'error': str(e)}), 500
@app.route('/api/prs/history')
def api_get_pr_history():
"""获取 PR 扫描历史趋势"""
try:
limit = request.args.get('limit', 20, type=int)
repo_name = request.args.get('repo_name', '')
# 获取 PR 列表
prs = PRScanDB.get_all_prs(status='completed')
if repo_name:
prs = [p for p in prs if p.get('repo_name') == repo_name]
# 只取最近的 N 个
prs = prs[:limit]
# 构建趋势数据
history = []
for pr in reversed(prs): # 从旧到新
issues_count = pr.get('issues_count', 0)
# 从 scan_result 中各扫描器汇总 error/warning 数量
scan_result = pr.get('scan_result')
if isinstance(scan_result, str):
try:
scan_result = json.loads(scan_result)
except:
scan_result = None
error_count = 0
warning_count = 0
if scan_result and isinstance(scan_result, dict):
# 遍历各扫描器,汇总 error 和 warning
for scanner_name, scanner_result in scan_result.items():
if isinstance(scanner_result, dict):
summary = scanner_result.get('summary', {})
if isinstance(summary, dict):
error_count += summary.get('error', 0)
warning_count += summary.get('warning', 0)
history.append({
'pr_id': pr.get('id'),
'pr_number': pr.get('pr_number'),
'repo_name': pr.get('repo_name'),
'title': pr.get('pr_title', ''),
'author': pr.get('author', ''),
'created_at': pr.get('created_at', ''),
'issues_count': issues_count,
'error_count': error_count,
'warning_count': warning_count,
'total_issues': error_count + warning_count,
'state': pr.get('state', '')
})
return jsonify(history)
except Exception as e:
logger.error(f'获取历史趋势失败: {str(e)}')
return jsonify({'error': str(e)}), 500
@app.route('/api/prs/<int:pr_id>/merge', methods=['POST']) @app.route('/api/prs/<int:pr_id>/merge', methods=['POST'])
def api_merge_pr(pr_id): def api_merge_pr(pr_id):
"""合并 PR""" """合并 PR"""

View File

@@ -49,14 +49,13 @@ ai:
# AI 审查器配置 # AI 审查器配置
# 支持: "ollama" (本地) 或 "api" (在线API) # 支持: "ollama" (本地) 或 "api" (在线API)
provider: "api" provider: "api"
# 模型名称(硅基流动可用模型)- Qwen 最强语言模型 # 模型名称(阿里云通义千问)
model: "deepseek-ai/DeepSeek-V3.2" model: "qwen3.5-plus"
# API 地址 # API 地址
# 硅基流动: https://api.siliconflow.cn/v1 api_url: "https://dashscope.aliyuncs.com/compatible-mode/v1"
api_url: "https://api.siliconflow.cn/v1"
# API 密钥 # API 密钥
api_key: "sk-cqxhnsxdxaalxlykfkjksyinjftdyejnblmgkfxmhwmmvdyu" api_key: "sk-616332b2afa94699b4572d0fe6ac370a"
# 是否启用 AI 审查 # 是否启用 AI 审查
enabled: true enabled: true
# 每次审查的最大代码行数 # 每次审查的最大代码行数
max_lines: 200 max_lines: 100

32
db.py
View File

@@ -7,12 +7,17 @@
import sqlite3 import sqlite3
import json import json
import os import os
from datetime import datetime from datetime import datetime, timezone, timedelta
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data', 'pr_scans.db') DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data', 'pr_scans.db')
def get_cst_now():
"""获取当前中国时区时间 (UTC+8)"""
return datetime.now(timezone(timedelta(hours=8))).strftime('%Y-%m-%d %H:%M:%S')
def get_db_connection(): def get_db_connection():
"""获取数据库连接""" """获取数据库连接"""
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
@@ -110,6 +115,7 @@ class PRScanDB:
if existing: if existing:
# 更新现有记录 # 更新现有记录
cst_time = get_cst_now()
cursor.execute(''' cursor.execute('''
UPDATE pr_scans SET UPDATE pr_scans SET
pr_title = ?, pr_title = ?,
@@ -123,7 +129,7 @@ class PRScanDB:
security_issues = ?, security_issues = ?,
ai_review = ?, ai_review = ?,
report_path = ?, report_path = ?,
updated_at = CURRENT_TIMESTAMP updated_at = ?
WHERE repo_name = ? AND pr_number = ? WHERE repo_name = ? AND pr_number = ?
''', ( ''', (
pr_info.get('pr_title'), pr_info.get('pr_title'),
@@ -137,19 +143,22 @@ class PRScanDB:
security_issues, security_issues,
json.dumps(scan_results.get('ai', {}), ensure_ascii=False), json.dumps(scan_results.get('ai', {}), ensure_ascii=False),
report_path, report_path,
cst_time,
pr_info.get('repo_name'), pr_info.get('repo_name'),
pr_info.get('pr_number') pr_info.get('pr_number')
)) ))
scan_id = existing['id'] scan_id = existing['id']
else: else:
# 插入新记录 # 插入新记录
cst_time = get_cst_now()
cursor.execute(''' cursor.execute('''
INSERT INTO pr_scans ( INSERT INTO pr_scans (
pr_number, repo_name, pr_title, pr_url, pr_number, repo_name, pr_title, pr_url,
source_branch, target_branch, author, source_branch, target_branch, author,
state, scan_status, scan_result, scan_details_with_code, state, scan_status, scan_result, scan_details_with_code,
issues_count, security_issues, ai_review, report_path issues_count, security_issues, ai_review, report_path,
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', ( ''', (
pr_info.get('pr_number'), pr_info.get('pr_number'),
pr_info.get('repo_name'), pr_info.get('repo_name'),
@@ -165,7 +174,9 @@ class PRScanDB:
issues_count, issues_count,
security_issues, security_issues,
json.dumps(scan_results.get('ai', {}), ensure_ascii=False), json.dumps(scan_results.get('ai', {}), ensure_ascii=False),
report_path report_path,
cst_time,
cst_time
)) ))
scan_id = cursor.lastrowid scan_id = cursor.lastrowid
@@ -239,23 +250,24 @@ class PRScanDB:
"""更新 PR 状态""" """更新 PR 状态"""
conn = get_db_connection() conn = get_db_connection()
cursor = conn.cursor() cursor = conn.cursor()
cst_time = get_cst_now()
if state == 'merged': if state == 'merged':
cursor.execute(''' cursor.execute('''
UPDATE pr_scans SET UPDATE pr_scans SET
state = ?, state = ?,
merged_at = CURRENT_TIMESTAMP, merged_at = ?,
merged_by = ?, merged_by = ?,
updated_at = CURRENT_TIMESTAMP updated_at = ?
WHERE id = ? WHERE id = ?
''', (state, merged_by, scan_id)) ''', (state, cst_time, merged_by, cst_time, scan_id))
else: else:
cursor.execute(''' cursor.execute('''
UPDATE pr_scans SET UPDATE pr_scans SET
state = ?, state = ?,
updated_at = CURRENT_TIMESTAMP updated_at = ?
WHERE id = ? WHERE id = ?
''', (state, scan_id)) ''', (state, cst_time, scan_id))
conn.commit() conn.commit()
conn.close() conn.close()

9
docker-compose.yml Normal file
View File

@@ -0,0 +1,9 @@
version: "3.8"
services:
code-scan:
image: dcr-by1jwyxk44.71826370.xyz/whlaoding/code-scan:latest
container_name: code-scan
ports:
- "5000:5000"
restart: unless-stopped

View File

@@ -109,7 +109,7 @@ class GiteaClient:
timeout=30 timeout=30
) )
if response.status_code == 200: if response.status_code in (200, 201):
logger.info(f"成功关闭 PR #{pr_number}") logger.info(f"成功关闭 PR #{pr_number}")
return True return True
else: else:

24
pyproject.toml Normal file
View File

@@ -0,0 +1,24 @@
[build-system]
requires = ["setuptools>=45", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "code-scan"
version = "1.0.0"
description = "代码扫描工具"
readme = "README.md"
requires-python = ">=3.8"
dependencies = [
"flask>=2.0.0",
"pyyaml>=5.0",
"requests>=2.25.0",
"python-dotenv>=0.19.0",
"GitPython>=3.1.0",
]
[project.scripts]
code-scan = "app:main"
[tool.setuptools.packages.find]
where = ["."]
include = ["scanner*"]

View File

@@ -218,55 +218,66 @@ class ReportGenerator:
lines.append(f' - {message}') lines.append(f' - {message}')
lines.append('') lines.append('')
# AI 审查结果(单独展示 # AI 审查结果(适配新格式issues 列表
if 'ai' in scan_results: if 'ai' in scan_results:
ai_result = scan_results['ai'] ai_result = scan_results['ai']
lines.append('') lines.append('')
lines.append('## 🤖 AI 代码审查') lines.append('## 🤖 AI 代码审查')
lines.append('') lines.append('')
lines.append(ai_result.get('summary', '无 AI 审查结果'))
# 新格式:直接使用 summary
if 'summary' in ai_result:
# summary 可能是字符串或 dict
summary = ai_result.get('summary', '')
if isinstance(summary, dict):
lines.append(f"发现 {summary.get('total', 0)} 个问题,"
f"错误 {summary.get('error', 0)}"
f"警告 {summary.get('warning', 0)}"
f"提示 {summary.get('info', 0)}")
else:
lines.append(str(summary))
lines.append('') lines.append('')
reviews = ai_result.get('reviews', []) # 新格式issues 列表
if reviews: ai_issues = ai_result.get('issues', [])
for i, review in enumerate(reviews, 1): if ai_issues:
file_name = review.get('file', 'unknown') # 按文件分组
review_content = review.get('review', {}) issues_by_file = {}
for issue in ai_issues:
file_name = issue.get('file', 'unknown')
if file_name not in issues_by_file:
issues_by_file[file_name] = []
issues_by_file[file_name].append(issue)
for file_name, issues in issues_by_file.items():
lines.append(f'### 📄 {file_name}') lines.append(f'### 📄 {file_name}')
lines.append('') lines.append('')
# 优点 for i, issue in enumerate(issues[:10], 1):
advantages = review_content.get('优点', []) severity = issue.get('severity', 'Info')
if advantages: severity_emoji = {
lines.append('**✅ 代码优点:**') 'ERROR': '🔴',
for adv in advantages[:3]: 'WARNING': '🟡',
lines.append(f'- {adv}') 'INFO': ''
lines.append('') }.get(severity.upper(), '')
# 问题 line_num = issue.get('line', 0)
issues = review_content.get('问题', []) symbol = issue.get('symbol', '')
if issues: message = issue.get('message', 'No message')
lines.append('**⚠️ 需要改进:**') code_context = issue.get('code_context', '')
for issue in issues[:3]: defect_reason = issue.get('defect_reason', '')
lines.append(f'- {issue}')
lines.append('')
# 优化建议 lines.append(f'{i}. {severity_emoji} **{severity}** - 行 {line_num}')
optimizations = review_content.get('优化', []) if symbol:
if optimizations: lines.append(f' - 标识: `{symbol}`')
lines.append('**💡 优化建议:**') lines.append(f' - 问题: {message}')
for opt in optimizations[:3]: if code_context:
lines.append(f'- {opt}') lines.append(' - 代码:')
lines.append('')
# 原始回复(如果不是 JSON 格式)
raw = review_content.get('raw_review')
if raw:
lines.append('**📝 AI 原始回复:**')
lines.append('```') lines.append('```')
lines.append(raw[:500] + '...' if len(raw) > 500 else raw) lines.append(code_context)
lines.append('```') lines.append('```')
if defect_reason:
lines.append(f' - 原因: {defect_reason}')
lines.append('') lines.append('')
# 添加报告链接或下一步操作 # 添加报告链接或下一步操作

View File

@@ -2,3 +2,9 @@ flask>=2.0.0
pyyaml>=5.0 pyyaml>=5.0
requests>=2.25.0 requests>=2.25.0
python-dotenv>=0.19.0 python-dotenv>=0.19.0
GitPython>=3.1.0
gitdb>=4.0.1
smmap>=3.0.1
pylint>=2.17.0
flake8>=6.0.0
bandit>=1.7.0

View File

@@ -5,6 +5,7 @@ AI 代码审查器
使用大模型进行智能代码审查 使用大模型进行智能代码审查
""" """
import os import os
import re
import json import json
import logging import logging
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
@@ -29,7 +30,7 @@ class AIReviewer(BaseScanner):
self.config = config self.config = config
self.enabled = config.get('enabled', True) self.enabled = config.get('enabled', True)
self.provider = config.get('provider', 'ollama') self.provider = config.get('provider', 'api')
self.model = config.get('model', 'llama3') self.model = config.get('model', 'llama3')
self.api_url = config.get('api_url', 'http://localhost:11434') self.api_url = config.get('api_url', 'http://localhost:11434')
self.api_key = config.get('api_key', '') self.api_key = config.get('api_key', '')
@@ -73,69 +74,169 @@ class AIReviewer(BaseScanner):
changed_files: 可选的变更文件列表(来自 PR changed_files: 可选的变更文件列表(来自 PR
Returns: Returns:
审查结果 审查结果(与 python_scanner.py 兼容的格式)
""" """
if not self.enabled: result = {
return {
'enabled': False,
'tool': 'AI Code Reviewer', 'tool': 'AI Code Reviewer',
'reviews': [], 'language': language,
'summary': 'AI 审查已禁用' 'status': 'success',
'issues': [],
'summary': {
'total': 0,
'error': 0,
'warning': 0,
'info': 0
},
'files_scanned': 0
} }
if not self.enabled:
result['status'] = 'disabled'
result['summary'] = 'AI 审查已禁用'
return result
try: try:
# 如果没有传入 clone_dir需要克隆 # 如果没有传入 clone_dir需要克隆
if not clone_dir and repo_url: if not clone_dir and repo_url:
clone_dir = self.clone_repo(repo_url, commit_id, branch) clone_dir = self.clone_repo(repo_url, commit_id, branch)
if not clone_dir or not os.path.exists(clone_dir): if not clone_dir or not os.path.exists(clone_dir):
return { result['status'] = 'error'
'enabled': True, result['error'] = '无法获取代码目录'
'tool': 'AI Code Reviewer', return result
'reviews': [],
'summary': '无法获取代码目录'
}
# 获取要审查的代码文件 # 获取要审查的代码文件
files = self._get_code_files(clone_dir, language, changed_files) files = self._get_code_files(clone_dir, language, changed_files)
if not files: if not files:
return { result['summary'] = '未找到可审查的代码文件'
'enabled': True, return result
'tool': 'AI Code Reviewer',
'reviews': [],
'summary': '未找到可审查的代码文件'
}
# 对每个文件进行 AI 审查 # 对每个文件进行 AI 审查
all_reviews = [] all_issues = []
for file_path in files[:5]: # 限制最多审查 5 个文件 for file_path in files[:5]: # 限制最多审查 5 个文件
review = self._review_file(file_path, language, clone_dir) review = self._review_file(file_path, language, clone_dir)
if review: if review and review.get('issues'):
all_reviews.append(review) all_issues.extend(review['issues'])
# 生成总结 result['issues'] = all_issues[:self.max_issues] if self.detailed else all_issues
summary = self._generate_summary(all_reviews) result['summary'] = self._calculate_summary(all_issues)
result['files_scanned'] = len(files[:5])
result['clone_dir'] = clone_dir
return { # 生成质量评分
'enabled': True, result['quality_score'] = self._calculate_quality_score(all_issues, files[:5])
'tool': 'AI Code Reviewer',
'reviews': all_reviews, return result
'summary': summary,
'files_reviewed': len(all_reviews),
'clone_dir': clone_dir # 返回 clone_dir 用于后续清理
}
except Exception as e: except Exception as e:
logger.error(f'AI 审查失败: {str(e)}') logger.error(f'AI 审查失败: {str(e)}')
return { result['status'] = 'error'
'enabled': True, result['error'] = str(e)
'tool': 'AI Code Reviewer', return result
'error': str(e),
'reviews': [], def _calculate_summary(self, issues: List[Dict]) -> Dict[str, int]:
'summary': f'AI 审查出错: {str(e)}' """计算问题摘要"""
summary = {
'total': len(issues),
'error': 0,
'warning': 0,
'info': 0
} }
for issue in issues:
severity = issue.get('severity', '').lower()
if severity in ['error', 'critical', 'fatal']:
summary['error'] += 1
elif severity in ['warning', 'moderate']:
summary['warning'] += 1
else:
summary['info'] += 1
return summary
def _calculate_quality_score(self, issues: List[Dict], files: List[str]) -> Dict[str, Any]:
"""
计算代码质量评分
返回:总分(0-100)及各维度评分
"""
if not files:
return {'total': 100, 'maintainability': 100, 'security': 100, 'readability': 100, 'best_practices': 100}
# 统计问题
error_count = sum(1 for i in issues if i.get('severity', '').lower() in ['error', 'critical'])
warning_count = sum(1 for i in issues if i.get('severity', '').lower() == 'warning')
info_count = sum(1 for i in issues if i.get('severity', '').lower() == 'info')
# 分类统计
security_keywords = ['sql injection', 'xss', 'csrf', 'password', 'secret', 'token', '权限', '注入', '认证']
security_issues = sum(1 for i in issues if any(k in (i.get('message', '') + i.get('symbol', '')).lower() for k in security_keywords))
# 计算各维度分数
# 可维护性:基于错误和警告数量
issue_weight = error_count * 5 + warning_count * 2 + info_count * 0.5
maintainability = max(0, 100 - issue_weight)
# 安全性:基于安全问题
security_score = max(0, 100 - security_issues * 15)
# 可读性:基于 info 级别问题(风格类)
readability = max(0, 100 - info_count * 3)
# 最佳实践:基于 warning 级别
best_practices = max(0, 100 - warning_count * 5)
# 总分:加权平均
total = int((maintainability * 0.3 + security_score * 0.35 + readability * 0.15 + best_practices * 0.2))
return {
'total': total,
'maintainability': maintainability,
'security': security_score,
'readability': readability,
'best_practices': best_practices,
'details': {
'error_count': error_count,
'warning_count': warning_count,
'info_count': info_count,
'security_issues': security_issues
}
}
def generate_fix_suggestion(self, file_path: str, line: int, message: str, code: str) -> Optional[str]:
"""
对指定问题生成修复建议代码
"""
prompt = f"""你是一位代码修复专家。请根据以下问题,生成修复后的代码。
问题描述:{message}
问题所在行号:{line}
原始代码:
```
{code}
```
请以 JSON 格式输出修复建议:
```json
{{
"fixed_code": "修复后的完整代码或关键片段",
"explanation": "修复说明50字以内",
"confidence": "high/medium/low 修复把握度"
}}
```
如果无法修复,请返回:{{"fixed_code": "", "explanation": "无法自动修复", "confidence": "low"}}"""
try:
response = self._call_ai(prompt)
if response and response.get('fixed_code'):
return response
except Exception as e:
logger.warning(f'生成修复建议失败: {e}')
return None
def _get_code_files(self, clone_dir: str, language: str, changed_files: Optional[List[str]] = None) -> List[str]: def _get_code_files(self, clone_dir: str, language: str, changed_files: Optional[List[str]] = None) -> List[str]:
"""获取代码文件列表""" """获取代码文件列表"""
import glob import glob
@@ -174,6 +275,8 @@ class AIReviewer(BaseScanner):
def _review_file(self, file_path: str, language: str, clone_dir: str = None) -> Optional[Dict[str, Any]]: def _review_file(self, file_path: str, language: str, clone_dir: str = None) -> Optional[Dict[str, Any]]:
"""审查单个文件""" """审查单个文件"""
issues = []
try: try:
with open(file_path, 'r', encoding='utf-8') as f: with open(file_path, 'r', encoding='utf-8') as f:
code = f.read() code = f.read()
@@ -186,22 +289,46 @@ class AIReviewer(BaseScanner):
else: else:
truncated = False truncated = False
# 构建 prompt # 给代码加行号再发给模型,便于模型返回准确行号
prompt = self._build_prompt(code, language) code_with_lines = self._code_with_line_numbers(code)
prompt = self._build_prompt(code_with_lines, language)
# 调用 AI # 调用 AI
response = self._call_ai(prompt) response = self._call_ai(prompt)
if not response: # 获取相对路径
return None
# 解析响应
rel_path = os.path.relpath(file_path, clone_dir) if (clone_dir and file_path) else file_path rel_path = os.path.relpath(file_path, clone_dir) if (clone_dir and file_path) else file_path
if not response:
return { return {
'file': rel_path, 'file': rel_path,
'path': file_path, 'path': file_path,
'truncated': truncated, 'truncated': truncated,
'review': response 'issues': []
}
# 解析 AI 响应,转换为标准 issues 格式,并校正行号
ai_issues = response.get('issues', [])
for issue in ai_issues:
self._correct_issue_line(issue, code)
issues.append({
'tool': 'ai_reviewer',
'type': issue.get('type', 'info'),
'severity': issue.get('severity', 'Info'),
'message': issue.get('message', ''),
'file': rel_path,
'line': issue.get('line', 0),
'column': issue.get('column', 0),
'symbol': issue.get('symbol', ''),
'code_context': issue.get('code_context', ''),
'defect_reason': issue.get('defect_reason', '')
})
return {
'file': rel_path,
'path': file_path,
'truncated': truncated,
'issues': issues
} }
except Exception as e: except Exception as e:
@@ -217,76 +344,171 @@ class AIReviewer(BaseScanner):
else: else:
lang_name = language lang_name = language
prompt = f"""你是一位资深的 {lang_name} 代码审查专家。请审查以下代码,并给出: prompt = f"""你是一位资深的 {lang_name} 代码审查专家。请审查以下代码,找出潜在的问题和缺陷。
1. **代码优点** - 写得好地方 请以 JSON 格式输出审查结果,必须包含以下字段:
2. **问题建议** - 需要改进的地方
3. **优化建议** - 如何让代码更好
请用中文回复,保持简洁,每个文件审查不超过 3 点建议。
以下是代码:
```{language}
{code}
```
请以 JSON 格式输出:
```json ```json
{{ {{
"优点": ["..."], "issues": [
"问题": ["..."], {{
"优化": ["..."] "line": 行号,
"column": 列号,
"message": "问题描述",
"type": "error/warning/info 之一",
"severity": "Error/Warning/Info 之一",
"symbol": "错误标识符如 unused-variable, syntax-error 等",
"code_context": "问题代码的上下文(包含问题的那行或几行代码)",
"defect_reason": "缺陷原因分析30字以内简洁描述"
}} }}
]
}}
```
注意:
1. line 和 column 是问题所在的行号和列号(从 1 开始)
2. type: error=错误, warning=警告, info=信息
3. severity: Error=严重, Warning=一般, Info=提示
4. code_context: 包含问题代码的那一行或相邻的几行
5. defect_reason: 精简描述30字以内说明问题原因和风险
如果代码没有问题,返回空数组: {{"issues": []}}
重要:以下代码每行前已标注行号(格式为 "行号|"),请根据问题实际出现的代码行,严格使用该行前的行号填写 issues 中的 line 字段,不要猜测或使用错误行号。
以下是待审查的代码(行号已标注):
```{language}
{code}
```""" ```"""
return prompt return prompt
def _code_with_line_numbers(self, code: str) -> str:
"""给代码每行前加上行号,便于模型返回准确行号"""
lines = code.split('\n')
width = len(str(len(lines)))
return '\n'.join(f'{i:>{width}}| {line}' for i, line in enumerate(lines, 1))
def _correct_issue_line(self, issue: Dict[str, Any], code: str) -> None:
"""
根据 message/symbol 在源码中搜索,尽量把 issue 的 line 校正到真实出现位置。
AI 返回的行号常不准确,通过匹配问题相关的标识符(如 'unused_module')修正行号。
"""
line = issue.get('line')
if not line or not code:
return
lines = code.split('\n')
if line < 1 or line > len(lines):
return
# 从 message 中提取被引用的标识符(如 'unused_module' -> unused_module
message = (issue.get('message') or '')
symbol = (issue.get('symbol') or '').strip()
candidates = []
if symbol:
candidates.append(symbol)
for m in re.finditer(r"['\"]([a-zA-Z_][a-zA-Z0-9_]*)['\"]", message or ''):
candidates.append(m.group(1))
# 若 message 里没有引号标识符,取首段英文/数字/下划线作为关键词
if not candidates:
first_word = re.search(r'\b([a-zA-Z_][a-zA-Z0-9_]*)\b', message)
if first_word:
candidates.append(first_word.group(1))
for token in candidates:
if not token:
continue
for i, code_line in enumerate(lines):
if token in code_line:
issue['line'] = i + 1
return
def _call_ai(self, prompt: str) -> Optional[Dict[str, Any]]: def _call_ai(self, prompt: str) -> Optional[Dict[str, Any]]:
"""调用 AI 服务""" """调用 AI 服务"""
try: try:
if self.provider == 'ollama':
return self._call_ollama(prompt)
elif self.provider == 'api':
return self._call_api(prompt) return self._call_api(prompt)
else:
logger.warning(f'未知的 AI provider: {self.provider}')
return None
except Exception as e: except Exception as e:
print("异常追踪信息:", e.__traceback__) print("异常追踪信息:", e.__traceback__)
logger.error(f'AI 调用失败: {str(e)}') logger.error(f'AI 调用失败: {str(e)}')
return None return None
def _call_ollama(self, prompt: str) -> Optional[Dict[str, Any]]: def _extract_json_obj(self, content: Any) -> Optional[Dict[str, Any]]:
"""调用 Ollama 本地模型""" """
import requests 从模型输出中尽可能提取 JSON 对象(dict)。
url = f"{self.api_url}/api/generate" 兼容场景:
payload = { - content 已经是 dict
"model": self.model, - content 是 JSON 字符串
"prompt": prompt, - content 被 ```json ... ``` 或 ``` ... ``` 包裹
"stream": False, - content 前后夹杂说明文字,只要包含一个最外层 { ... } 就尝试解析
"format": "json" """
} if content is None:
logger.debug("_extract_json_obj: content is None")
return None
response = requests.post(url, json=payload, timeout=120) # 如果已经是 dict直接返回
if isinstance(content, dict):
logger.debug("_extract_json_obj: content is already dict")
return content
if response.status_code == 200: if not isinstance(content, str):
result = response.json() content = str(content)
content = result.get('response', '')
# 尝试解析 JSON text = content.strip()
logger.debug(f"_extract_json_obj: 原始内容长度 = {len(text)}")
logger.debug(f"_extract_json_obj: 原始内容前100字符: {text[:100]}")
# 去掉代码块包裹(兼容 ```json / ``` json / ```JSON 等)
lowered = text.lower()
fence_start = lowered.find('```')
if fence_start != -1:
logger.debug(f"_extract_json_obj: 发现代码块 fence_start={fence_start}")
# 找到第一段 fence
after = text[fence_start + 3:]
after_l = after.lower()
# 如果 fence 后紧跟语言标识json 或其他),跳过这一行直到换行
newline_idx = after.find('\n')
if newline_idx != -1:
lang_header = after_l[:newline_idx].strip()
logger.debug(f"_extract_json_obj: 语言标识: {lang_header}")
body = after[newline_idx + 1:]
# 截取到下一个 fence 结束
end_idx = body.lower().find('```')
if end_idx != -1:
candidate = body[:end_idx].strip()
else:
# 没有结束 fence直接用 body 作为候选(可能是截断的 JSON
candidate = body.strip()
# 只有在确实像 json 的情况下才替换,避免误伤普通文本
if '{' in candidate and '}' in candidate:
text = candidate
logger.debug(f"_extract_json_obj: 提取代码块内容成功,长度={len(text)}")
else:
# 没有换行就按旧逻辑尽量截取
pass
# 直接解析
try: try:
# 提取 JSON 部分 obj = json.loads(text)
if '```json' in content: logger.debug("_extract_json_obj: 直接解析成功")
content = content.split('```json')[1].split('```')[0] return obj if isinstance(obj, dict) else None
elif '```' in content: except Exception as e:
content = content.split('```')[1].split('```')[0] logger.debug(f"_extract_json_obj: 直接解析失败: {e}")
return json.loads(content.strip()) # 兜底:截取最外层 { ... } 再解析
except json.JSONDecodeError: start = text.find('{')
# 如果不是 JSON直接返回文本 end = text.rfind('}')
return {'raw_review': content} logger.debug(f"_extract_json_obj: 查找大括号 start={start}, end={end}")
if start != -1 and end != -1 and end > start:
candidate = text[start:end + 1].strip()
logger.debug(f"_extract_json_obj: 候选内容长度={len(candidate)}, 前50字符: {candidate[:50]}")
try:
obj = json.loads(candidate)
logger.debug("_extract_json_obj: 兜底解析成功")
return obj if isinstance(obj, dict) else None
except Exception as e:
logger.debug(f"_extract_json_obj: 兜底解析失败: {e}")
return None
logger.warning(f'Ollama 返回错误: {response.status_code}') logger.debug("_extract_json_obj: 未能提取到有效的 JSON 对象")
return None return None
def _call_api(self, prompt: str) -> Optional[Dict[str, Any]]: def _call_api(self, prompt: str) -> Optional[Dict[str, Any]]:
@@ -317,6 +539,16 @@ class AIReviewer(BaseScanner):
"max_tokens": 1024, "max_tokens": 1024,
"temperature": 0.7 "temperature": 0.7
} }
elif 'dashscope' in self.api_url:
# 阿里云 dashscope 专用端点
url = f"{self.api_url}/chat/completions"
payload = {
"model": self.model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 1024,
"temperature": 0.7,
"stream": False # 显式关闭流式
}
else: else:
url = f"{self.api_url}/chat/completions" url = f"{self.api_url}/chat/completions"
payload = { payload = {
@@ -326,39 +558,21 @@ class AIReviewer(BaseScanner):
"temperature": 0.7 "temperature": 0.7
} }
logger.info(f"调用 API: {url}, model={self.model}")
try:
response = requests.post(url, json=payload, headers=headers, timeout=120) response = requests.post(url, json=payload, headers=headers, timeout=120)
if response.status_code == 200: if response.status_code == 200:
result = response.json() result = response.json()
content = result['choices'][0]['message']['content'] content = result['choices'][0]['message']['content']
logger.info(f"API 返回内容长度: {len(content) if content else 0}")
parsed = self._extract_json_obj(content)
return parsed
try: logger.warning(f'API 返回错误: {response.status_code}, {response.text[:200]}')
if '```json' in content:
content = content.split('```json')[1].split('```')[0]
elif '```' in content:
content = content.split('```')[1].split('```')[0]
return json.loads(content.strip())
except json.JSONDecodeError:
return {'raw_review': content}
logger.warning(f'API 返回错误: {response.status_code}')
return None return None
def _generate_summary(self, reviews: List[Dict[str, Any]]) -> str: except Exception as e:
"""生成审查总结""" logger.warning(f'API 调用失败: {e}')
if not reviews: return None
return '未找到需要审查的代码'
total_issues = sum(
len(r.get('review', {}).get('问题', [])) +
len(r.get('review', {}).get('优化', []))
for r in reviews
)
files_count = len(reviews)
if total_issues == 0:
return f'✅ AI 审查通过!审查了 {files_count} 个文件,未发现问题'
return f'🤖 AI 审查了 {files_count} 个文件,发现 {total_issues} 个改进建议'

View File

@@ -135,18 +135,15 @@ def merge_issues_with_code(scan_results: Dict[str, Any], diff: str) -> Dict[str,
def convert_ai_reviews_to_issues(ai_result: Dict[str, Any], parser: Optional[DiffParser] = None) -> List[Dict[str, Any]]: def convert_ai_reviews_to_issues(ai_result: Dict[str, Any], parser: Optional[DiffParser] = None) -> List[Dict[str, Any]]:
"""将 AI 审查结果转换为问题格式""" """将 AI 审查结果issues 格式)转换为统一问题格式"""
issues = [] issues = []
ai_issues = ai_result.get('issues', [])
reviews = ai_result.get('reviews', []) for issue in ai_issues:
for review in reviews: file_path = issue.get('file', '')
file_path = review.get('file', '') if not file_path:
review_data = review.get('review', {})
if not review_data:
continue continue
# 获取文件内容作为代码上下文
code_context = None code_context = None
if parser: if parser:
matched_path = None matched_path = None
@@ -154,50 +151,27 @@ def convert_ai_reviews_to_issues(ai_result: Dict[str, Any], parser: Optional[Dif
if file_path.endswith(path) or path.endswith(file_path) or file_path in path: if file_path.endswith(path) or path.endswith(file_path) or file_path in path:
matched_path = path matched_path = path
break break
if matched_path: if matched_path:
chunk = parser.get_file_content(matched_path) chunk = parser.get_file_content(matched_path)
if chunk and chunk.new_content: if chunk and chunk.new_content:
lines = chunk.new_content.split('\n')[:10] lines = chunk.new_content.split('\n')[:10]
code_context = { code_context = {
'file': matched_path, 'file': matched_path,
'line': 1, 'line': issue.get('line', 1),
'preview': '\n'.join(lines), 'preview': '\n'.join(lines),
'has_more': len(chunk.new_content.split('\n')) > 10 'has_more': len(chunk.new_content.split('\n')) > 10
} }
# 处理优点(不作为问题显示) sev = issue.get('severity', 'warning')
advantages = review_data.get('优点', []) sev = sev.lower() if isinstance(sev, str) else 'warning'
# 处理问题
problems = review_data.get('问题', [])
for idx, problem in enumerate(problems):
issues.append({ issues.append({
'file': file_path, 'file': file_path,
'line': 1, # AI 审查不返回具体行号 'line': issue.get('line', 1),
'severity': 'warning', 'severity': sev,
'message': f'[AI 建议] {problem}', 'message': issue.get('message', ''),
'category': 'ai', 'category': 'ai',
'code_context': code_context, 'code_context': code_context,
'review_data': { 'defect_reason': issue.get('defect_reason', '')
'type': '问题',
'content': problem
}
})
# 处理优化建议
optimizations = review_data.get('优化', [])
for optimization in optimizations:
issues.append({
'file': file_path,
'line': 1,
'severity': 'info',
'message': f'[AI 优化] {optimization}',
'category': 'ai',
'code_context': code_context,
'review_data': {
'type': '优化',
'content': optimization
}
}) })
return issues return issues

218
test.py
View File

@@ -1,218 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import logging
os.environ.setdefault('FLASK_RUN_HOST', '0.0.0.0')
from flask import Flask, request, jsonify
import yaml
from webhook.handler import GiteaWebhookHandler
from scanner.python_scanner import PythonScanner
from scanner.js_scanner import JavaScriptScanner
from scanner.security_scanner import SecurityScanner
from report.generator import ReportGenerator
from notify.feishu import FeishuNotifier
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 加载配置
def load_config():
"""加载配置文件"""
config_path = os.path.join(os.path.dirname(__file__), 'config.yaml')
with open(config_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
# 全局配置
config = load_config()
# 初始化应用
app = Flask(__name__)
app.config['SECRET_KEY'] = config.get('server', {}).get('secret_key', 'dev-secret-key')
# 初始化组件
webhook_handler = GiteaWebhookHandler(config['gitea'])
python_scanner = PythonScanner(config.get('scanner', {}))
js_scanner = JavaScriptScanner(config.get('scanner', {}))
security_scanner = SecurityScanner(config.get('scanner', {}))
report_generator = ReportGenerator(config.get('report', {}))
feishu_notifier = FeishuNotifier(config['feishu'])
@app.route('/')
def index():
"""健康检查接口"""
return jsonify({
'status': 'ok',
'service': 'AI Code Quality Scanner',
'version': '1.0.0'
})
@app.route('/webhook/gitea', methods=['POST'])
def handle_gitea_webhook():
"""处理 Gitea Webhook 请求"""
try:
# 验证签名
signature = request.headers.get('X-Gitea-Signature')
if signature:
if not webhook_handler.verify_signature(
request.data,
signature,
config['gitea']['webhook_secret']
):
logger.warning('Webhook 签名验证失败')
return jsonify({'error': 'Invalid signature'}), 401
# 解析 Webhook payload
payload = request.json
if not payload:
return jsonify({'error': 'No payload'}), 400
event_type = request.headers.get('X-Gitea-Event', 'push')
logger.info(f'收到 Gitea Webhook 事件: {event_type}')
# 只处理 push 事件
if event_type != 'push':
return jsonify({'message': 'Event ignored'}), 200
# 提取提交信息
commits = payload.get('commits', [])
if not commits:
return jsonify({'message': 'No commits'}), 200
repo = payload.get('repository', {})
repo_name = repo.get('full_name', 'unknown')
branch = payload.get('ref', '').replace('refs/heads/', '')
pusher = payload.get('pusher', {}).get('name', 'unknown')
logger.info(f'处理仓库 {repo_name}{len(commits)} 个提交')
# 处理每个提交
for commit in commits:
commit_id = commit.get('id', '')[:8]
commit_message = commit.get('message', '')
author = commit.get('author', {}).get('name', 'unknown')
logger.info(f'扫描提交 {commit_id}: {commit_message}')
try:
# 获取仓库 URL
clone_url = repo.get('clone_url')
if not clone_url:
# 尝试从 web_url 构建
web_url = repo.get('web_url', '')
if web_url:
clone_url = web_url.replace('http://', 'http://').replace('https://', 'https://')
clone_url = clone_url.rstrip('/') + '.git'
# 执行代码扫描
scan_results = {}
# Python 扫描
if 'python' in config.get('scanner', {}).get('languages', []):
scan_results['python'] = python_scanner.scan(
clone_url, commit_id, branch
)
# JavaScript/TypeScript 扫描
if any(lang in config.get('scanner', {}).get('languages', [])
for lang in ['javascript', 'typescript']):
scan_results['javascript'] = js_scanner.scan(
clone_url, commit_id, branch
)
# 安全扫描
scan_results['security'] = security_scanner.scan(
clone_url, commit_id, branch
)
# 生成报告
report = report_generator.generate(
repo_name=repo_name,
branch=branch,
commit_id=commit_id,
commit_message=commit_message,
author=author,
scan_results=scan_results
)
# 发送飞书通知
feishu_notifier.send_report(report)
logger.info(f'提交 {commit_id} 扫描完成')
except Exception as e:
logger.error(f'扫描提交 {commit_id} 失败: {str(e)}')
# 继续处理其他提交
continue
return jsonify({'status': 'ok', 'message': 'Scan completed'}), 200
except Exception as e:
logger.error(f'处理 Webhook 失败: {str(e)}', exc_info=True)
return jsonify({'error': str(e)}), 500
@app.route('/scan/manual', methods=['POST'])
def manual_scan():
"""手动触发扫描接口"""
try:
data = request.json
repo_url = data.get('repo_url')
branch = data.get('branch', 'main')
commit_id = data.get('commit_id')
if not repo_url:
return jsonify({'error': 'repo_url is required'}), 400
# 执行扫描
scan_results = {}
if 'python' in config.get('scanner', {}).get('languages', []):
scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch)
if any(lang in config.get('scanner', {}).get('languages', [])
for lang in ['javascript', 'typescript']):
scan_results['javascript'] = js_scanner.scan(repo_url, commit_id, branch)
scan_results['security'] = security_scanner.scan(repo_url, commit_id, branch)
# 生成报告
report = report_generator.generate(
repo_name=repo_url.split('/')[-1].replace('.git', ''),
branch=branch,
commit_id=commit_id or 'manual',
commit_message='Manual scan',
author='manual',
scan_results=scan_results
)
# 发送飞书通知
feishu_notifier.send_report(report)
return jsonify({
'status': 'ok',
'report': report
}), 200
except Exception as e:
logger.error(f'手动扫描失败: {str(e)}', exc_info=True)
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
# 强制监听所有网络接口
host = "0.0.0.0"
port = config.get('server', {}).get('port', 5000)
debug = config.get('server', {}).get('debug', True)
logger.info(f'启动服务: {host}:{port}')
app.run(host=host, port=port, debug=debug)

View File

@@ -19,10 +19,36 @@ import collections as col # 使用了 col 但 flake8 可能检测
# 缺陷2: 未使用的变量 # 缺陷2: 未使用的变量
def unused_variable_demo(): # def unused_variable_demo():
"""演示未使用的变量""" # """演示未使用的变量"""
result = calculate() # result 未被使用 # result = calculate() # result 未被使用
print("Function executed") # print("Function executed")
# 缺陷8: 行太长(风格问题)
def long_line():
"""这是一行非常非常非常非常非常非常非常非常非常非常非常非常长的代码超过了 120 个字符的限制"""
# 缺陷9: 缺少空格
def missing_spaces():
"""缺少必要空格"""
x=1+2
y=3*99
if x==1:
print(x)
# 缺陷1: 未使用的导入
import unused_module # 未使用
import collections as col # 使用了 col 但 flake8 可能检测
# 缺陷2: 未使用的变量
# def unused_variable_demo():
# """演示未使用的变量"""
# result = calculate() # result 未被使用
# print("Function executed")
def calculate(): def calculate():
@@ -41,227 +67,3 @@ def use_before_define():
"""在定义前使用变量""" """在定义前使用变量"""
print(before_var) # before_var 在下面才定义 print(before_var) # before_var 在下面才定义
before_var = 100 before_var = 100
# 缺陷5: 硬编码密码(安全问题)
def connect_database():
"""连接数据库"""
password = "admin123" # 硬编码密码
username = "root"
return f"Connecting with {username}:{password}"
# 缺陷6: 使用 eval安全问题
def unsafe_eval():
"""危险使用 eval"""
user_input = "os.system('ls')"
result = eval(user_input) # 危险!
return result
# 缺陷7: 使用 pickle 反序列化(安全问题)
def unsafe_pickle():
"""不安全的 pickle 反序列化"""
data = b"..." # 模拟恶意数据
obj = pickle.loads(data) # 危险!
# 缺陷8: 行太长(风格问题)
def long_line():
"""这是一行非常非常非常非常非常非常非常非常非常非常非常非常长的代码超过了 120 个字符的限制"""
# 缺陷9: 缺少空格
def missing_spaces():
"""缺少必要空格"""
x=1+2
y=3*4
if x==1:
print(x)
# 缺陷10: 多余空格
def extra_spaces():
"""多余空格"""
x = 1
y = 2
# 缺陷11: 未捕获的异常
def unhandled_exception():
"""捕获异常后未处理"""
try:
result = 10 / 0
except ZeroDivisionError:
pass # 捕获但未处理
# 缺陷12: 过于宽泛的异常
def broad_exception():
"""捕获所有异常"""
try:
data = json.loads('{"key": "value"}')
except Exception:
pass
# 缺陷13: 裸 except 子句
def bare_except():
"""使用裸 except"""
try:
x = int("abc")
except:
pass
# 缺陷14: 重复代码
def duplicate_code():
"""重复代码示例"""
a = 1
b = 2
c = a + b
print(c)
a = 3
b = 4
c = a + b
print(c)
# 缺陷15: 变量名与内置函数冲突
def shadow_builtin():
"""变量名覆盖内置函数"""
list = [1, 2, 3] # 覆盖内置 list
dict = {} # 覆盖内置 dict
str = "hello" # 覆盖内置 str
return list, dict, str
# 缺陷16: 不必要的 pass
def unnecessary_pass():
"""不必要的 pass"""
if True:
pass # 可以直接删除
# 缺陷17: 使用 + 进行字符串拼接(推荐用 join
def string_concat():
"""低效字符串拼接"""
result = ""
for i in range(100):
result = result + str(i)
return result
# 缺陷18: 在循环中修改集合
def modify_during_iteration():
"""在迭代时修改列表"""
items = [1, 2, 3, 4, 5]
for item in items:
if item % 2 == 0:
items.remove(item) # 在迭代时修改
# 缺陷19: 全局变量
global_counter = 0 # 全局变量
def increment():
global global_counter # 依赖全局变量
global_counter += 1
# 缺陷20: 魔法数字
def calculate_price():
"""使用魔法数字"""
price = 100
tax = price * 1.1 # 1.1 是什么?
discount = price * 0.9
return tax, discount
# 缺陷21: 函数参数过多
def bad_function(a, b, c, d, e, f, g, h):
"""参数过多的函数"""
return a + b + c + d + e + f + g + h
# 缺陷22: 空函数体
def empty_function():
"""空函数应该使用 pass 或文档字符串"""
pass
# 缺陷23: 使用 time.sleep 测试
def bad_sleep():
"""生产代码中使用 time.sleep"""
import time
time.sleep(5) # 阻塞
# 缺陷24: 注释掉的代码
def commented_code():
# print("This is commented out")
pass
# 缺陷25: TODO/FIXME 注释
def todo_comment():
# TODO: Implement this
# FIXME: This is broken
pass
# 缺陷26: 导入顺序错误(应先标准库,再第三方,本地)
import sys # 标准库
import flask # 第三方
from . import local # 本地
# 缺陷27: 不必要的列表推导式
def unnecessary_list_comp():
"""不必要的列表推导式"""
result = [x for x in range(10)] # 可简化为 list(range(10))
return result
# 缺陷28: 条件表达式中的赋值
def assignment_in_condition():
"""在条件中赋值(不推荐)"""
if (x := get_value()) > 0: # 海象运算符但可能难以阅读
print(x)
def get_value():
return 5
# 缺陷29: 比较布尔值
def compare_bool():
"""与布尔值比较"""
flag = True
if flag == True: # 应直接用 if flag:
print("yes")
# 缺陷30: 使用 hasattr/getattr 而非异常处理
def use_hasattr():
"""滥用 hasattr"""
class Foo:
pass
obj = Foo()
if hasattr(obj, 'bar'): # 可直接用 try/except
print(obj.bar)
# 主函数入口
def main():
"""主函数"""
connect_database()
unsafe_eval()
unsafe_pickle()
print("Demo executed")
if __name__ == "__main__":
main()

View File

@@ -6,6 +6,7 @@
<title>PR 扫描管理平台</title> <title>PR 扫描管理平台</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet"> <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap-icons@1.11.0/font/bootstrap-icons.css"> <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap-icons@1.11.0/font/bootstrap-icons.css">
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.0/dist/chart.umd.min.js"></script>
<style> <style>
body { background-color: #f5f7fa; } body { background-color: #f5f7fa; }
/* Diff 语法高亮 */ /* Diff 语法高亮 */
@@ -94,24 +95,11 @@
<i class="bi bi-speedometer2 me-2"></i>概览 <i class="bi bi-speedometer2 me-2"></i>概览
</a> </a>
</li> </li>
<li class="nav-item">
<a class="nav-link" href="#" onclick="showPage('prs')">
<i class="bi bi-git me-2"></i>PR 列表
</a>
</li>
<li class="nav-item">
<a class="nav-link" href="#" onclick="showPage('settings')">
<i class="bi bi-gear me-2"></i>设置
</a>
</li>
</ul> </ul>
<div class="mt-5 p-3" style="background: rgba(255,255,255,0.1); border-radius: 8px;"> <ul class="nav flex-column mt-3">
<small>系统状态</small> </ul>
<div class="mt-2">
<span class="text-success"><i class="bi bi-check-circle-fill"></i> 服务正常</span>
</div>
</div>
</div> </div>
<!-- 主内容区 --> <!-- 主内容区 -->
@@ -176,6 +164,69 @@
</div> </div>
</div> </div>
</div> </div>
<!-- 历史趋势:每个 PR 固定等距,新 PR 在右侧追加,前面 PR 位置不变,可横向滚动 -->
<h5 class="mb-3 mt-4">问题趋势</h5>
<div class="card">
<div class="card-body p-2">
<div id="trend-chart-wrapper" style="overflow-x: auto; overflow-y: hidden;">
<div id="trend-chart-container" style="height: 220px;">
<canvas id="trend-chart"></canvas>
</div>
</div>
<div id="trend-loading" class="text-center py-3 text-muted">加载趋势数据中...</div>
</div>
</div>
<div class="row mt-3">
<div class="col-md-6">
<div class="card mb-3">
<div class="card-header py-2">问题趋势统计</div>
<div class="card-body p-2" id="ai-trend-stats">
<div class="text-muted text-center py-2">暂无数据</div>
</div>
</div>
</div>
<div class="col-md-6">
<div class="card mb-3">
<div class="card-header py-2">改进建议</div>
<div class="card-body p-2">
<ul class="list-unstyled mb-0 small" id="ai-trend-tips">
<li><i class="bi bi-check-circle text-success me-2"></i>持续关注代码质量</li>
<li><i class="bi bi-check-circle text-success me-2"></i>减少警告数量</li>
<li><i class="bi bi-check-circle text-success me-2"></i>遵循最佳实践</li>
</ul>
</div>
</div>
</div>
</div>
<!-- 问题分布统计 -->
<h5 class="mb-3 mt-4">问题分布统计</h5>
<div class="row mb-3">
<div class="col-md-6">
<div class="card h-100">
<div class="card-header py-2">按严重程度分布</div>
<div class="card-body p-2">
<canvas id="severity-chart" style="max-height: 180px;"></canvas>
</div>
</div>
</div>
<div class="col-md-6">
<div class="card h-100">
<div class="card-header py-2">按扫描器分布</div>
<div class="card-body p-2">
<canvas id="scanner-chart" style="max-height: 180px;"></canvas>
</div>
</div>
</div>
</div>
<div class="card mb-4">
<div class="card-header py-2">问题类型排行</div>
<div class="card-body p-2" id="issue-types-list">
<div class="text-muted text-center py-2">暂无数据</div>
</div>
</div>
</div> </div>
<!-- PR 列表页面 --> <!-- PR 列表页面 -->
@@ -217,6 +268,10 @@
</div> </div>
<!-- 设置页面 --> <!-- 设置页面 -->
</div>
</div>
<div id="page-settings" style="display:none;"> <div id="page-settings" style="display:none;">
<h2 class="mb-4">设置</h2> <h2 class="mb-4">设置</h2>
<div class="card"> <div class="card">
@@ -266,6 +321,122 @@
<p><strong>安全漏洞:</strong> <span id="detail-security"></span></p> <p><strong>安全漏洞:</strong> <span id="detail-security"></span></p>
</div> </div>
</div> </div>
<!-- AI 审查功能区:质量评分 + 问题统计 + 修复建议 -->
<ul class="nav nav-tabs mb-3" id="aiReviewTabs" role="tablist">
<li class="nav-item" role="presentation">
<button class="nav-link active" id="quality-tab" data-bs-toggle="tab" data-bs-target="#quality-panel" type="button" role="tab">
<i class="bi bi-star"></i> 质量评分
</button>
</li>
<li class="nav-item" role="presentation">
<button class="nav-link" id="stats-tab" data-bs-toggle="tab" data-bs-target="#stats-panel" type="button" role="tab">
<i class="bi bi-bar-chart"></i> 问题统计
</button>
</li>
<li class="nav-item" role="presentation">
<button class="nav-link" id="fix-tab" data-bs-toggle="tab" data-bs-target="#fix-panel" type="button" role="tab">
<i class="bi bi-tools"></i> AI 修复建议
</button>
</li>
</ul>
<div class="tab-content" id="aiReviewTabContent">
<!-- 质量评分面板 -->
<div class="tab-pane fade show active" id="quality-panel" role="tabpanel">
<div id="quality-score-loading" class="text-center py-4 text-muted">加载中...</div>
<div id="quality-score-content" style="display: none;">
<div class="row text-center mb-4">
<div class="col">
<div class="display-1 fw-bold" id="qs-total">--</div>
<div class="text-muted">综合评分</div>
</div>
</div>
<div class="row text-center">
<div class="col">
<div class="h4 fw-bold" id="qs-security">--</div>
<small class="text-muted">安全性</small>
</div>
<div class="col">
<div class="h4 fw-bold" id="qs-maintain">--</div>
<small class="text-muted">可维护性</small>
</div>
<div class="col">
<div class="h4 fw-bold" id="qs-readability">--</div>
<small class="text-muted">可读性</small>
</div>
<div class="col">
<div class="h4 fw-bold" id="qs-best">--</div>
<small class="text-muted">最佳实践</small>
</div>
</div>
<div class="mt-3 text-center">
<small class="text-muted" id="qs-details"></small>
</div>
</div>
</div>
<!-- 问题统计面板 -->
<div class="tab-pane fade" id="stats-panel" role="tabpanel">
<div id="stats-loading" class="text-center py-4 text-muted">加载中...</div>
<div id="stats-content" style="display: none;">
<div class="row mb-3">
<div class="col-md-4">
<div class="card text-center bg-danger text-white">
<div class="card-body">
<div class="display-6 fw-bold" id="stat-error">0</div>
<small>严重问题</small>
</div>
</div>
</div>
<div class="col-md-4">
<div class="card text-center bg-warning">
<div class="card-body">
<div class="display-6 fw-bold" id="stat-warning">0</div>
<small>警告</small>
</div>
</div>
</div>
<div class="col-md-4">
<div class="card text-center bg-info text-white">
<div class="card-body">
<div class="display-6 fw-bold" id="stat-info">0</div>
<small>提示</small>
</div>
</div>
</div>
</div>
<div class="row">
<div class="col">
<h6>按扫描器分布</h6>
<ul class="list-group" id="stat-scanner-list"></ul>
</div>
</div>
</div>
</div>
<!-- AI 修复建议面板 -->
<div class="tab-pane fade" id="fix-panel" role="tabpanel">
<div class="alert alert-info">
<i class="bi bi-info-circle"></i> 点击问题列表中的 <strong>生成修复</strong> 按钮AI 将为您生成修复代码。
</div>
<div id="fix-result-loading" class="text-center py-3 text-muted" style="display: none;">AI 正在生成修复建议...</div>
<div id="fix-result-content" style="display: none;">
<div class="card">
<div class="card-header bg-success text-white">
<i class="bi bi-check-circle"></i> 修复建议
</div>
<div class="card-body">
<h6>修复说明</h6>
<p id="fix-explanation" class="text-muted"></p>
<h6>修复后代码</h6>
<pre id="fix-code" class="bg-dark text-light p-3 rounded" style="overflow-x: auto;"></pre>
<div class="mt-2">
<span class="badge bg-secondary" id="fix-confidence"></span>
</div>
</div>
</div>
</div>
</div>
</div>
<!-- 仅保留文件 Tab左侧文件树 + 右侧完整文件内容,最右侧为问题标注 --> <!-- 仅保留文件 Tab左侧文件树 + 右侧完整文件内容,最右侧为问题标注 -->
<div class="mt-3"> <div class="mt-3">
<div class="pr-detail-file-layout"> <div class="pr-detail-file-layout">
@@ -371,11 +542,268 @@
const recentPRs = prs.slice(0, 5); const recentPRs = prs.slice(0, 5);
const tbody = document.querySelector('#recent-prs-table tbody'); const tbody = document.querySelector('#recent-prs-table tbody');
tbody.innerHTML = recentPRs.map(pr => createPRRow(pr)).join(''); tbody.innerHTML = recentPRs.map(pr => createPRRow(pr)).join('');
// 加载历史趋势
loadHistoryTrend();
// 加载问题分布统计
loadAIStats();
} catch (e) { } catch (e) {
console.error('加载数据失败:', e); console.error('加载数据失败:', e);
} }
} }
// 加载历史趋势图表
let trendChart = null;
async function loadHistoryTrend() {
const loadingEl = document.getElementById('trend-loading');
const canvasEl = document.getElementById('trend-chart');
if (!loadingEl || !canvasEl) return;
try {
const response = await fetch('/api/prs/history?limit=15');
if (!response.ok) throw new Error('暂无数据');
const history = await response.json();
if (!history || history.length === 0) {
loadingEl.textContent = '暂无趋势数据';
return;
}
loadingEl.style.display = 'none';
// 固定 15 个槽位:无 PR 的槽位也画 Y 向虚线,新 PR 在右侧追加
const SLOTS = 15;
const pxPerPR = 80;
const chartWidth = SLOTS * pxPerPR;
const container = document.getElementById('trend-chart-container');
if (container) {
container.style.width = chartWidth + 'px';
container.style.minWidth = chartWidth + 'px';
}
// 第一个 PR 从 X 轴最左边开始,右侧用空位补齐到 SLOTS保证每个槽位都有竖线
const n = history.length;
const pad = Math.max(0, SLOTS - n);
const labels = history.map(p => '#' + p.pr_number).concat(Array(pad).fill(''));
const errorData = history.map(p => p.error_count || 0).concat(Array(pad).fill(null));
const warningData = history.map(p => p.warning_count || 0).concat(Array(pad).fill(null));
if (trendChart) trendChart.destroy();
trendChart = new Chart(canvasEl, {
type: 'line',
data: {
labels: labels,
datasets: [
{
label: '错误',
data: errorData,
borderColor: '#dc3545',
backgroundColor: 'rgba(220, 53, 69, 0.1)',
tension: 0.3,
fill: true,
spanGaps: false
},
{
label: '警告',
data: warningData,
borderColor: '#ffc107',
backgroundColor: 'rgba(255, 193, 7, 0.1)',
tension: 0.3,
fill: true,
spanGaps: false
}
]
},
options: {
responsive: true,
maintainAspectRatio: false,
plugins: {
legend: { position: 'top' }
},
scales: {
x: {
grid: {
display: true,
color: 'rgba(0, 0, 0, 0.12)',
lineWidth: 1,
borderDash: [4, 4]
},
ticks: { maxRotation: 0, autoSkip: false }
},
y: {
beginAtZero: true,
ticks: { stepSize: 1 },
grid: {
color: 'rgba(0, 0, 0, 0.12)',
lineWidth: 1,
borderDash: [4, 4]
}
}
}
}
});
// 概览页上的问题趋势统计(与质量趋势一致)
const totalData = history.map(p => p.total_issues || (p.error_count || 0) + (p.warning_count || 0));
const avgIssues = totalData.length ? Math.round(totalData.reduce((a, b) => a + b, 0) / totalData.length) : 0;
const maxPR = history.reduce((max, p) => {
const pTotal = p.total_issues || (p.error_count || 0) + (p.warning_count || 0);
const maxTotal = max.total_issues || (max.error_count || 0) + (max.warning_count || 0);
return pTotal > maxTotal ? p : max;
}, history[0]);
const statsEl = document.getElementById('ai-trend-stats');
if (statsEl) {
statsEl.innerHTML = `
<div class="row text-center">
<div class="col-6">
<div class="h3">${avgIssues}</div>
<small class="text-muted">平均问题数</small>
</div>
<div class="col-6">
<div class="h3">#${maxPR?.pr_number || '-'}</div>
<small class="text-muted">问题最多</small>
</div>
</div>
`;
}
} catch (e) {
loadingEl.textContent = '暂无趋势数据';
}
}
// 加载 AI 质量评分概览
async function loadAIQualityOverview() {
try {
const response = await fetch('/api/prs?state=open');
const prs = await response.json();
// 计算所有已扫描 PR 的平均评分
let totalScore = 0, count = 0;
for (const pr of prs) {
if (pr.scan_status === 'completed' && pr.scan_result) {
const sr = pr.scan_result;
if (sr.ai && sr.ai.quality_score) {
totalScore += sr.ai.quality_score.total || 0;
count++;
}
}
}
const avgScore = count > 0 ? Math.round(totalScore / count) : '--';
document.getElementById('aiq-total').textContent = avgScore;
document.getElementById('aiq-security').textContent = count > 0 ? '95+' : '--';
document.getElementById('aiq-maintain').textContent = count > 0 ? '90+' : '--';
document.getElementById('aiq-readability').textContent = count > 0 ? '88+' : '--';
// 颜色
const totalEl = document.getElementById('aiq-total');
if (avgScore >= 80) {
totalEl.parentElement.className = 'card-body bg-success text-white';
} else if (avgScore >= 60) {
totalEl.parentElement.className = 'card-body bg-warning text-dark';
} else if (typeof avgScore === 'number') {
totalEl.parentElement.className = 'card-body bg-danger text-white';
}
} catch (e) {
console.error('加载 AI 质量评分失败:', e);
}
}
// 加载 AI 问题分布统计
let severityChart = null;
let scannerChart = null;
async function loadAIStats() {
try {
// 获取所有已完成扫描的 PR
const response = await fetch('/api/prs');
const prs = await response.json();
const completedPRs = prs.filter(p => p.scan_status === 'completed');
// 汇总统计
let totalError = 0, totalWarning = 0, totalInfo = 0;
let byScanner = {};
for (const pr of completedPRs) {
const sr = pr.scan_result;
if (!sr) continue;
for (const [name, result] of Object.entries(sr)) {
if (!byScanner[name]) byScanner[name] = 0;
const issues = result?.issues || [];
byScanner[name] += issues.length;
for (const issue of issues) {
const sev = (issue.severity || 'info').toLowerCase();
if (sev === 'error' || sev === 'critical') totalError++;
else if (sev === 'warning') totalWarning++;
else totalInfo++;
}
}
}
// 绘制严重程度饼图
const sevCanvas = document.getElementById('severity-chart');
if (sevCanvas) {
if (severityChart) severityChart.destroy();
severityChart = new Chart(sevCanvas, {
type: 'doughnut',
data: {
labels: ['错误', '警告', '提示'],
datasets: [{
data: [totalError, totalWarning, totalInfo],
backgroundColor: ['#dc3545', '#ffc107', '#0dcaf0']
}]
},
options: {
responsive: true,
maintainAspectRatio: true,
plugins: { legend: { position: 'bottom', labels: { boxWidth: 12, padding: 8, font: { size: 11 } } } }
}
});
}
// 绘制扫描器分布饼图
const scanCanvas = document.getElementById('scanner-chart');
if (scanCanvas) {
const scannerNames = Object.keys(byScanner);
const scannerData = Object.values(byScanner);
const colors = ['#0d6efd', '#198754', '#dc3545', '#ffc107', '#6f42c1', '#20c997'];
if (scannerChart) scannerChart.destroy();
scannerChart = new Chart(scanCanvas, {
type: 'pie',
data: {
labels: scannerNames,
datasets: [{
data: scannerData,
backgroundColor: colors.slice(0, scannerNames.length)
}]
},
options: {
responsive: true,
maintainAspectRatio: true,
plugins: { legend: { position: 'bottom', labels: { boxWidth: 12, padding: 8, font: { size: 11 } } } }
}
});
}
// 问题类型排行
document.getElementById('issue-types-list').innerHTML = `
<table class="table table-sm table-hover mb-0">
<thead class="table-light"><tr><th>扫描器</th><th class="text-end">问题数</th></tr></thead>
<tbody>
${Object.entries(byScanner).sort((a, b) => b[1] - a[1]).map(([k, v]) => `<tr><td>${k}</td><td class="text-end"><span class="badge bg-primary">${v}</span></td></tr>`).join('')}
</tbody>
</table>
`;
} catch (e) {
console.error('加载统计失败:', e);
}
}
// 加载 PR 列表 // 加载 PR 列表
async function loadPRs() { async function loadPRs() {
try { try {
@@ -445,11 +873,132 @@
// 加载文件树(左侧树,点击文件在右侧显示完整内容+标注) // 加载文件树(左侧树,点击文件在右侧显示完整内容+标注)
loadPRFileTree(id); loadPRFileTree(id);
// 加载 AI 审查功能
loadQualityScore(id);
loadIssueStats(id);
} catch (e) { } catch (e) {
alert('加载 PR 详情失败: ' + e.message); alert('加载 PR 详情失败: ' + e.message);
} }
} }
// 加载质量评分
async function loadQualityScore(prId) {
const loadingEl = document.getElementById('quality-score-loading');
const contentEl = document.getElementById('quality-score-content');
if (!loadingEl || !contentEl) return;
loadingEl.style.display = 'block';
contentEl.style.display = 'none';
try {
const response = await fetch('/api/prs/' + prId + '/quality');
if (!response.ok) throw new Error('暂无数据');
const data = await response.json();
// 更新显示
document.getElementById('qs-total').textContent = data.total || '--';
document.getElementById('qs-security').textContent = data.security || '--';
document.getElementById('qs-maintain').textContent = data.maintainability || '--';
document.getElementById('qs-readability').textContent = data.readability || '--';
document.getElementById('qs-best').textContent = data.best_practices || '--';
// 颜色
const total = data.total || 0;
const totalEl = document.getElementById('qs-total');
if (total >= 80) totalEl.className = 'display-1 fw-bold text-success';
else if (total >= 60) totalEl.className = 'display-1 fw-bold text-warning';
else totalEl.className = 'display-1 fw-bold text-danger';
// 详情
const details = data.details || {};
document.getElementById('qs-details').textContent =
`错误: ${details.error_count || 0} | 警告: ${details.warning_count || 0} | 提示: ${details.info_count || 0}`;
loadingEl.style.display = 'none';
contentEl.style.display = 'block';
} catch (e) {
loadingEl.textContent = '暂无评分数据';
}
}
// 加载问题统计
async function loadIssueStats(prId) {
const loadingEl = document.getElementById('stats-loading');
const contentEl = document.getElementById('stats-content');
if (!loadingEl || !contentEl) return;
loadingEl.style.display = 'block';
contentEl.style.display = 'none';
try {
const response = await fetch('/api/prs/' + prId + '/stats');
if (!response.ok) throw new Error('暂无数据');
const data = await response.json();
document.getElementById('stat-error').textContent = data.by_severity?.error || 0;
document.getElementById('stat-warning').textContent = data.by_severity?.warning || 0;
document.getElementById('stat-info').textContent = data.by_severity?.info || 0;
// 按扫描器分布
const scannerList = document.getElementById('stat-scanner-list');
scannerList.innerHTML = '';
const scanners = data.by_scanner || {};
for (const [name, count] of Object.entries(scanners)) {
const li = document.createElement('li');
li.className = 'list-group-item d-flex justify-content-between align-items-center';
li.innerHTML = `${name} <span class="badge bg-primary rounded-pill">${count}</span>`;
scannerList.appendChild(li);
}
loadingEl.style.display = 'none';
contentEl.style.display = 'block';
} catch (e) {
loadingEl.textContent = '暂无统计数据';
}
}
// 生成修复建议(全局函数,供问题列表调用)
async function generateFix(filePath, line, message, code) {
const loadingEl = document.getElementById('fix-result-loading');
const contentEl = document.getElementById('fix-result-content');
if (!loadingEl || !contentEl) return;
// 切换到修复建议面板
document.getElementById('fix-tab').click();
loadingEl.style.display = 'block';
contentEl.style.display = 'none';
try {
const response = await fetch('/api/prs/' + currentPRId + '/fix', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({file: filePath, line: line, message: message, code: code})
});
if (!response.ok) throw new Error('生成失败');
const data = await response.json();
document.getElementById('fix-explanation').textContent = data.explanation || '';
document.getElementById('fix-code').textContent = data.fixed_code || '// 无修复建议';
const confBadge = document.getElementById('fix-confidence');
confBadge.textContent = data.confidence || '';
if (data.confidence === 'high') confBadge.className = 'badge bg-success';
else if (data.confidence === 'medium') confBadge.className = 'badge bg-warning';
else confBadge.className = 'badge bg-secondary';
loadingEl.style.display = 'none';
contentEl.style.display = 'block';
} catch (e) {
loadingEl.textContent = '生成修复建议失败: ' + e.message;
}
}
// 加载 PR 文件列表并渲染左侧树,点击文件在右侧显示完整内容 // 加载 PR 文件列表并渲染左侧树,点击文件在右侧显示完整内容
async function loadPRFileTree(prId) { async function loadPRFileTree(prId) {
const loadingEl = document.getElementById('pr-file-tree-loading'); const loadingEl = document.getElementById('pr-file-tree-loading');