40 Commits

| Author | SHA1 | Message | Date |
|---|---|---|---|
| | 8062ed4bfd | Update test_demo/demo_flaws.py | 2026-03-16 12:55:31 +08:00 |
| dangzerong | ffd77057e3 | Merge pull request 'Update test_demo/demo_flaws.py' (#25) from dingshuo-patch-2 into main | 2026-03-15 13:34:19 +08:00 |
| | 279a01b897 | Update test_demo/demo_flaws.py | 2026-03-15 13:29:58 +08:00 |
| | 77fd09e6d2 | Update test_demo/demo_flaws.py | 2026-03-15 12:27:43 +08:00 |
| dangzerong | 91c16cbc88 | Merge pull request 'Test scan files' (#20) from dev into main | 2026-03-13 21:04:31 +08:00 |
| Dang Zerong | c8c0ef1620 | Test scan files | 2026-03-13 21:00:53 +08:00 |
| dangzerong | 95831d5190 | Merge pull request 'dev' (#19) from dev into main | 2026-03-13 18:09:32 +08:00 |
| Dang Zerong | 9a14c0b219 | Test scan files | 2026-03-13 18:00:27 +08:00 |
| Dang Zerong | 87b2dacf65 | Test scan files | 2026-03-13 18:00:22 +08:00 |
| dangzerong | 453414efb2 | Merge pull request 'dev' (#17) from dev into main | 2026-03-13 17:57:36 +08:00 |
| Dang Zerong | 04518812f4 | Merge branch 'dev' of https://code.deep-pilot.chat/Bosch_Demo/code_scan into dev | 2026-03-13 17:42:54 +08:00 |
| Dang Zerong | 6c4ee107f9 | Test scan files | 2026-03-13 17:42:27 +08:00 |
| dangzerong | d11b349d5e | Merge pull request 'Test scan files' (#15) from dev into main | 2026-03-13 17:41:51 +08:00 |
| dangzerong | 2a2ff1ad5f | Merge branch 'main' into dev | 2026-03-13 17:40:33 +08:00 |
| Dang Zerong | bc5a19fffc | Test scan files | 2026-03-13 17:39:20 +08:00 |
| Dang Zerong | 78655ce5dc | Test scan files | 2026-03-13 17:37:46 +08:00 |
| Dang Zerong | 2201f6d696 | Merge branch 'dev' | 2026-03-13 17:37:10 +08:00 |
| Dang Zerong | 97881ee00e | Test scan files | 2026-03-13 17:32:23 +08:00 |
| dangzerong | e46aff2797 | Merge pull request 'dev' (#13) from dev into main | 2026-03-13 17:25:28 +08:00 |
| Dang Zerong | 887c8ae154 | Test scan files | 2026-03-13 16:49:13 +08:00 |
| Dang Zerong | ecc39402d5 | Test scan files | 2026-03-13 16:27:32 +08:00 |
| dangzerong | dc9b921091 | Merge pull request 'Test scan files' (#11) from dev into main (Reviewed-on: #11) | 2026-03-13 16:26:54 +08:00 |
| Dang Zerong | a928b79d6d | Test scan files | 2026-03-13 16:25:37 +08:00 |
| dangzerong | 0991b3de26 | Merge pull request 'dev' (#10) from dev into main | 2026-03-13 16:24:21 +08:00 |
| Dang Zerong | 1876be1777 | Test scan files | 2026-03-13 16:22:23 +08:00 |
| Dang Zerong | 51fc1a6aae | Remove the test code for now; commit it later | 2026-03-13 16:21:39 +08:00 |
| Dang Zerong | 726c21feac | Ready for demo | 2026-03-13 16:04:20 +08:00 |
| dangzerong | a525a2b4ac | Merge pull request 'dev' (#9) from dev into main | 2026-03-13 15:32:42 +08:00 |
| Dang Zerong | cb90b66f09 | Code test | 2026-03-13 11:26:01 +08:00 |
| Dang Zerong | 8f9e5bf4f5 | Code test | 2026-03-12 16:13:18 +08:00 |
| dangzerong | b4f923f76c | Merge pull request 'Remove code test' (#7) from dev into main | 2026-03-12 16:13:04 +08:00 |
| Dang Zerong | a3ae277dcb | Remove code test | 2026-03-12 15:50:17 +08:00 |
| dangzerong | a772afb2df | Merge pull request 'add web' (#6) from dev into main | 2026-03-12 15:41:58 +08:00 |
| Dang Zerong | 027cf50759 | add web | 2026-03-12 14:42:23 +08:00 |
| Dang Zerong | 9ae55407fc | add web | 2026-03-11 21:19:23 +08:00 |
| dangzerong | 99398f190b | Merge pull request 'dev' (#5) from dev into main | 2026-03-11 21:18:57 +08:00 |
| Dang Zerong | 14680f053e | add web | 2026-03-11 21:16:47 +08:00 |
| Dang Zerong | 459a8cb295 | add web | 2026-03-11 12:30:45 +08:00 |
| dangzerong | 6c6befbaea | Merge pull request 'init' (#3) from dev into main | 2026-03-11 09:28:48 +08:00 |
| Dang Zerong | 17306c6814 | init | 2026-03-10 17:22:07 +08:00 |
22 changed files with 4698 additions and 319 deletions

Dockerfile (new file, 34 lines)

@@ -0,0 +1,34 @@
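# Python base image (official slim variant)
FROM python:3.11.15-slim
# Set the working directory
WORKDIR /app
# Environment variables: no .pyc files, unbuffered output, Flask listens on all interfaces
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV FLASK_RUN_HOST=0.0.0.0
# Install system dependencies (git, which GitPython needs)
RUN apt-get update && apt-get install -y --no-install-recommends \
    git \
    && rm -rf /var/lib/apt/lists/*
# Copy the dependency list first so the install layer can be cached
COPY requirements.txt .
# Install Python dependencies (GitPython and its dependencies are installed explicitly so caching cannot cause them to be skipped)
RUN pip install --no-cache-dir -r requirements.txt \
    && pip install --no-cache-dir "GitPython>=3.1.0" "gitdb>=4.0.1" "smmap>=3.0.1"
# Copy the application code
COPY . .
# Create the report directory
RUN mkdir -p reports
# Expose the application port
EXPOSE 5000
# Start the application
CMD ["python", "app.py"]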

README.md (new file, 109 lines)

@@ -0,0 +1,109 @@
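# AI Code Quality Scanning System
An automated code quality scanner. It listens for PR events, scans the changed code for defects, and supports the decision to merge or reject.
## Workflow
```
┌──────────┐   1. Create PR   ┌────────────┐
│  Gitea   │ ───────────────► │  Webhook   │
└──────────┘                  │  Server    │
                              └─────┬──────┘
                                    │ 2. Fetch code, scan, store results
                                    ▼
                              ┌────────────┐
                              │  SQLite    │
                              │  Database  │
                              └─────┬──────┘
                                    │ 3. Frontend queries
                                    ▼
                              ┌────────────┐
                              │  Frontend  │
                              └────────────┘
```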
## Core Features
### 1. PR Creation
- Creating a PR in a Gitea repository automatically triggers a scan
- Supported events: `opened`, `reopened`, `synchronize`
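The README does not show how incoming webhooks are authenticated or filtered, and `webhook/handler.py` is not part of this diff. The following is a minimal sketch. It assumes Gitea's standard webhook headers: `X-Gitea-Event`, and `X-Gitea-Signature` carrying a hex HMAC-SHA256 of the raw request body keyed with `webhook_secret`. The function names and the exact action set are hypothetical. Gitea's own payload is believed to report pushes to a PR branch as `synchronized`, so the sketch accepts both spellings.

```python
import hashlib
import hmac
import json
from typing import Optional

# Hypothetical action set: the README's list, plus Gitea's likely "synchronized" spelling
SCAN_ACTIONS = {"opened", "reopened", "synchronize", "synchronized"}


def verify_signature(secret: str, body: bytes, signature_header: Optional[str]) -> bool:
    """Check the hex HMAC-SHA256 digest sent in the X-Gitea-Signature header."""
    if not signature_header:
        return False
    expected = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    # compare_digest runs in constant time, so it does not leak how many characters matched
    return hmac.compare_digest(expected.encode(), signature_header.strip().encode())


def should_scan(event_header: Optional[str], body: bytes) -> bool:
    """Return True for pull_request events whose action should trigger a scan."""
    if event_header != "pull_request":
        return False
    payload = json.loads(body)
    return payload.get("action") in SCAN_ACTIONS
```

In a Flask handler, pass `request.get_data()` (the raw bytes, before JSON parsing) as `body`, together with `request.headers.get("X-Gitea-Signature")` and `request.headers.get("X-Gitea-Event")`.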
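### 2. Backend Processing
- Fetch the code for the PR
- Run code scans (Python/JavaScript/TypeScript)
- Review the code for defects with AI
- Store the scan results in the SQLite database
### 3. Frontend Features
- List all PRs and their scan status
- View the defect details of each PR
- One-click "Reject merge" or "Approve merge"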
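## Quick Start
```bash
# Install dependencies
pip install -r requirements.txt
# Run the service
python app.py
```
The service listens on http://localhost:5000. The management dashboard is served at http://localhost:5000/dashboard.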
## Docker Deployment
### 1. Log in to the registry
```bash
docker login dcr-by1jwyxk44.71826370.xyz
```
### 2. Build and push the image
```bash
docker buildx build --load --push -t dcr-by1jwyxk44.71826370.xyz/whlaoding/code-scan:latest .
```
### 3. Run the container
```bash
docker run -d --name code-scan -p 5000:5000 dcr-by1jwyxk44.71826370.xyz/whlaoding/code-scan:latest
```
### 4. Start with docker compose
```bash
# Start the service
docker compose up -d
# Follow the logs
docker compose logs -f
# Stop the service
docker compose down
```
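## Configuration
Configuration file: `config.yaml`
```yaml
server:
  host: "0.0.0.0"
  port: 5000
gitea:
  base_url: "https://code.deep-pilot.chat"
  webhook_secret: "xxx"
  api_token: "xxx"
ai:
  provider: "api"
  model: "qwen3.5-plus"
  api_url: "https://dashscope.aliyuncs.com/compatible-mode/v1"
  api_key: "sk-xxx"
  enabled: true  # AI review runs only when this is true (app.py defaults it to false)
```
The `feishu` section is also required, because `app.py` reads `config['feishu']` directly at startup.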

app.py (modified, 707 lines changed)

@@ -2,20 +2,27 @@
 # -*- coding: utf-8 -*-
 import os
+import time
 import logging
+import traceback
 from typing import Dict, Tuple, Any
+import json
 os.environ.setdefault('FLASK_RUN_HOST', '0.0.0.0')
-from flask import Flask, request, jsonify
+from flask import Flask, request, jsonify, send_from_directory
 import yaml
 from webhook.handler import GiteaWebhookHandler
 from scanner.python_scanner import PythonScanner
 from scanner.js_scanner import JavaScriptScanner
 from scanner.security_scanner import SecurityScanner
+from scanner.ai_reviewer import AIReviewer
+from scanner.diff_parser import merge_issues_with_code
 from report.generator import ReportGenerator
 from notify.feishu import FeishuNotifier
+from gitea_client import GiteaClient
+from db import PRScanDB
 # Configure logging
 logging.basicConfig(
@@ -43,8 +50,10 @@ webhook_handler = GiteaWebhookHandler(config['gitea'])
 python_scanner = PythonScanner(config.get('scanner', {}))
 js_scanner = JavaScriptScanner(config.get('scanner', {}))
 security_scanner = SecurityScanner(config.get('scanner', {}))
+ai_reviewer = AIReviewer(config.get('ai', {}))
 report_generator = ReportGenerator(config.get('report', {}))
 feishu_notifier = FeishuNotifier(config['feishu'])
+gitea_client = GiteaClient(config['gitea'])
 @app.route('/')
@@ -123,21 +132,27 @@ def handle_gitea_webhook():
         # Python scan
         if 'python' in config.get('scanner', {}).get('languages', []):
+            start_time = time.time()
             scan_results['python'] = python_scanner.scan(
                 clone_url, commit_id, branch
             )
+            logger.info(f"[TIMER] Python scan took {time.time() - start_time:.2f}s")
         # JavaScript/TypeScript scan
         if any(lang in config.get('scanner', {}).get('languages', [])
                for lang in ['javascript', 'typescript']):
+            start_time = time.time()
             scan_results['javascript'] = js_scanner.scan(
                 clone_url, commit_id, branch
             )
+            logger.info(f"[TIMER] JavaScript scan took {time.time() - start_time:.2f}s")
         # Security scan
+        start_time = time.time()
         scan_results['security'] = security_scanner.scan(
             clone_url, commit_id, branch
         )
+        logger.info(f"[TIMER] Security scan took {time.time() - start_time:.2f}s")
         # Generate report
         report = report_generator.generate(
@@ -169,12 +184,6 @@ def handle_gitea_webhook():
 def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]:
     """
     Handle a Pull Request event
-    Args:
-        payload: Webhook payload
-    Returns:
-        JSON response and status code
     """
     try:
         # Parse the PR event
@@ -205,26 +214,73 @@ def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]:
         if web_url:
             clone_url = web_url.rstrip('/') + '.git'
+        # Get the list of files changed in the PR
+        changed_files = []
+        try:
+            if '/' in repo_name:
+                repo_owner, repo_name_only = repo_name.split('/', 1)
+            else:
+                repo_owner = 'Bosch_Demo'
+                repo_name_only = repo_name
+            pr_files = gitea_client.get_pull_request_files(repo_owner, repo_name_only, pr_number)
+            if pr_files:
+                changed_files = [f.get('filename', '') for f in pr_files if f.get('filename')]
+                logger.info(f"Changed files in PR #{pr_number}: {changed_files}")
+        except Exception as e:
+            logger.warning(f"Failed to get the PR file list: {e}")
         # Run code scans
         scan_results = {}
         # Python scan
         if 'python' in config.get('scanner', {}).get('languages', []):
+            start_time = time.time()
             scan_results['python'] = python_scanner.scan(
-                clone_url, source_sha, source_branch
+                clone_url, source_sha, source_branch, changed_files
             )
+            logger.info(f"[TIMER] Python scan took {time.time() - start_time:.2f}s")
         # JavaScript/TypeScript scan
         if any(lang in config.get('scanner', {}).get('languages', [])
                for lang in ['javascript', 'typescript']):
+            start_time = time.time()
             scan_results['javascript'] = js_scanner.scan(
-                clone_url, source_sha, source_branch
+                clone_url, source_sha, source_branch, changed_files
             )
+            logger.info(f"[TIMER] JavaScript scan took {time.time() - start_time:.2f}s")
         # Security scan
+        start_time = time.time()
         scan_results['security'] = security_scanner.scan(
-            clone_url, source_sha, source_branch
+            clone_url, source_sha, source_branch, changed_files
         )
+        logger.info(f"[TIMER] Security scan took {time.time() - start_time:.2f}s")
+        # AI code review
+        if config.get('ai', {}).get('enabled', False):
+            start_time = time.time()
+            scan_results['ai'] = ai_reviewer.scan(
+                clone_url, source_sha, source_branch, changed_files
+            )
+            logger.info(f"[TIMER] AI scan took {time.time() - start_time:.2f}s")
+        # Fetch the PR diff so that issues can be linked to code snippets
+        pr_diff = None
+        try:
+            pr_diff = gitea_client.get_pull_request_diff(repo_owner, repo_name_only, pr_number)
+            logger.info(f"Fetched diff for PR #{pr_number}, length: {len(pr_diff) if pr_diff else 0}")
+        except Exception as e:
+            logger.warning(f"Failed to fetch PR diff: {e}")
+        # Link issues to code snippets
+        scan_details_with_code = merge_issues_with_code(scan_results, pr_diff or '')
+        logger.info(f"[DEBUG] scan_results keys: {list(scan_results.keys())}")
+        for k, v in scan_results.items():
+            if isinstance(v, dict):
+                issues_cnt = len(v.get('issues', []))
+                logger.info(f"[DEBUG] scan_results['{k}'] issues count: {issues_cnt}")
+        logger.info(f"[DEBUG] scan_details_with_code scanners: {[s.get('name') for s in scan_details_with_code.get('scanners', [])] if scan_details_with_code else 'None'}")
         # Generate report
         commit_message = f'PR #{pr_number}: {pr_title}'
@@ -236,15 +292,29 @@ def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]:
             author=author,
             scan_results=scan_results,
             pr_url=pr_url,
-            target_branch=target_branch
+            target_branch=target_branch,
+            pr_number=pr_number
         )
         # Send Feishu notification
         feishu_notifier.send_report(report)
+        # Save the scan results to the database
+        pr_info_for_db = {
+            'repo_name': repo_name,
+            'pr_number': pr_number,
+            'pr_title': pr_title,
+            'pr_url': pr_url,
+            'source_branch': source_branch,
+            'target_branch': target_branch,
+            'author': author
+        }
+        PRScanDB.save_pr_scan(pr_info_for_db, scan_results, report.get('file_path'), scan_details_with_code)
         logger.info(f'PR #{pr_number} scan completed')
     except Exception as e:
+        traceback.print_exc()
         logger.error(f'Failed to scan PR #{pr_number}: {str(e)}')
         return jsonify({'error': str(e)}), 500
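`merge_issues_with_code` lives in `scanner/diff_parser.py`, which this page does not show. One common way to attach a code snippet to an issue's `(file, line)` is to index the new side of the unified diff by line number. The following is a sketch of that approach. It assumes git-style diff text, such as the `.diff` output that `get_pull_request_diff` presumably returns. `new_side_lines` is a hypothetical helper, not the project's function.

```python
import re
from typing import Dict, Optional

# Matches "@@ -12,7 +12,9 @@" and captures the first line number on the new side
HUNK_RE = re.compile(r"^@@ -\d+(?:,\d+)? \+(\d+)(?:,\d+)? @@")


def new_side_lines(diff_text: str) -> Dict[str, Dict[int, str]]:
    """Map each file in a git-style unified diff to {new-file line number: text}.

    Only lines present in the new version are kept: context (' ') and added ('+').
    """
    result: Dict[str, Dict[int, str]] = {}
    current: Optional[Dict[int, str]] = None  # line map of the file being read
    in_header = False  # True between "diff --git" and the first hunk header
    line_no = 0
    for raw in diff_text.splitlines():
        if raw.startswith("diff --git "):
            in_header, current = True, None
            continue
        hunk = HUNK_RE.match(raw)
        if hunk:
            in_header = False
            line_no = int(hunk.group(1))
            continue
        if in_header:
            # "+++ b/path" names the new file; "+++ /dev/null" means it was deleted
            if raw.startswith("+++ "):
                path = raw[4:].strip()
                if path != "/dev/null":
                    current = result.setdefault(path[2:] if path.startswith("b/") else path, {})
            continue
        if current is None:
            continue
        if raw.startswith(("+", " ")):
            current[line_no] = raw[1:]
            line_no += 1
        elif raw == "":
            # some tools strip the single space that marks an empty context line
            current[line_no] = ""
            line_no += 1
        # '-' lines exist only in the old file; '\' marks "No newline at end of file"
    return result
```

With this index, `new_side_lines(pr_diff or '').get(issue['file'], {}).get(issue['line'])` returns the line an issue points at. It returns `None` when that line lies outside the changed hunks.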
@@ -271,13 +341,19 @@ def manual_scan():
         scan_results = {}
         if 'python' in config.get('scanner', {}).get('languages', []):
+            start_time = time.time()
             scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch)
+            logger.info(f"[TIMER] Python scan took {time.time() - start_time:.2f}s")
         if any(lang in config.get('scanner', {}).get('languages', [])
                for lang in ['javascript', 'typescript']):
+            start_time = time.time()
             scan_results['javascript'] = js_scanner.scan(repo_url, commit_id, branch)
+            logger.info(f"[TIMER] JavaScript scan took {time.time() - start_time:.2f}s")
+        start_time = time.time()
         scan_results['security'] = security_scanner.scan(repo_url, commit_id, branch)
+        logger.info(f"[TIMER] Security scan took {time.time() - start_time:.2f}s")
         # Generate report
         report = report_generator.generate(
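This commit repeats the same `start_time` / `logger.info` pair around every scanner call. A context manager can factor that pattern out. The following is a minimal sketch; `timed` is a hypothetical helper that does not exist in the repository. It uses `time.perf_counter`, which is monotonic, instead of `time.time`. It also logs the elapsed time when the wrapped scan raises.

```python
import logging
import time
from contextlib import contextmanager
from typing import Iterator

logger = logging.getLogger(__name__)


@contextmanager
def timed(label: str) -> Iterator[None]:
    """Log how long the wrapped block took, even if it raises."""
    start = time.perf_counter()  # monotonic clock, unaffected by wall-clock changes
    try:
        yield
    finally:
        logger.info("[TIMER] %s took %.2fs", label, time.perf_counter() - start)
```

Each scanner call then shrinks to `with timed("Python scan"): scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch)`.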
@@ -298,10 +374,617 @@ def manual_scan():
         }), 200
     except Exception as e:
-        logger.error(f'Manual scan failed: {str(e)}', exc_info=True)
+        logger.error(f'Manual scan failed: {str(e)}')
         return jsonify({'error': str(e)}), 500
+@app.route('/feishu/card_action', methods=['POST'])
+def handle_feishu_card_action():
+    """Handle button clicks on Feishu interactive cards"""
+    try:
+        payload = request.json
+        logger.info(f'Received Feishu card callback: {payload}')
+        # Handle the URL verification request
+        challenge = payload.get('challenge')
+        if challenge:
+            logger.info('Handling URL verification request')
+            return jsonify({'challenge': challenge}), 200
+        # Parse the callback data
+        action_data = payload.get('action', {})
+        if not action_data:
+            action_data = payload.get('value', {})
+        action_type = action_data.get('action')
+        owner = action_data.get('owner')
+        repo = action_data.get('repo')
+        pr_number = action_data.get('pr_number')
+        pr_url = action_data.get('pr_url')
+        if not all([action_type, owner, repo, pr_number]):
+            logger.error('Card callback data is incomplete')
+            return jsonify({'error': 'Missing required parameters'}), 400
+        logger.info(f'Executing action: {action_type}, PR: {owner}/{repo}#{pr_number}')
+        # Execute the requested action
+        if action_type == 'merge':
+            success = gitea_client.merge_pull_request(
+                owner=owner,
+                repo=repo,
+                pr_number=int(pr_number),
+                merge_message=f'Merged PR #{pr_number} via the Feishu bot'
+            )
+            result_message = '✅ **PR merged**' if success else '❌ **Merge failed**'
+        elif action_type == 'close':
+            success = gitea_client.close_pull_request(
+                owner=owner,
+                repo=repo,
+                pr_number=int(pr_number)
+            )
+            result_message = '✅ **PR closed (merge cancelled)**' if success else '❌ **Close failed**'
+        else:
+            result_message = f'⚠️ **Unknown action: {action_type}**'
+        # Send the action result to Feishu
+        result_text = f"{result_message}\n\n**PR:** {owner}/{repo}#{pr_number}\n**Link:** [View PR]({pr_url})"
+        feishu_notifier.send_simple_message('PR action result', result_text)
+        return jsonify({'status': 'ok', 'message': result_message}), 200
+    except Exception as e:
+        logger.error(f'Failed to handle Feishu card callback: {str(e)}', exc_info=True)
+        return jsonify({'error': str(e)}), 500
+@app.route('/feishu/webhook', methods=['POST'])
+def handle_feishu_webhook():
+    """Handle verification callbacks from the Feishu Open Platform"""
+    try:
+        payload = request.json
+        # Handle the verification request
+        challenge = payload.get('challenge')
+        if challenge:
+            return jsonify({'challenge': challenge}), 200
+        # Handle message events
+        event_type = payload.get('type')
+        if event_type == 'url_verification':
+            return jsonify({'challenge': payload.get('challenge')}), 200
+        logger.info(f'Received Feishu event: {event_type}')
+        return jsonify({'status': 'ok'}), 200
+    except Exception as e:
+        logger.error(f'Failed to handle Feishu webhook: {str(e)}')
+        return jsonify({'error': str(e)}), 500
+# ============================================
+# Scan management platform API
+# ============================================
+@app.route('/api/prs')
+def api_get_prs():
+    """List all PRs"""
+    try:
+        state = request.args.get('state')
+        prs = PRScanDB.get_all_prs(state=state)
+        # Decode JSON-encoded string fields into objects
+        for pr in prs:
+            if pr.get('scan_result') and isinstance(pr['scan_result'], str):
+                try:
+                    pr['scan_result'] = json.loads(pr['scan_result'])
+                except json.JSONDecodeError:
+                    pass
+            if pr.get('ai_review') and isinstance(pr['ai_review'], str):
+                try:
+                    pr['ai_review'] = json.loads(pr['ai_review'])
+                except json.JSONDecodeError:
+                    pass
+        return jsonify(prs)
+    except Exception as e:
+        logger.error(f'Failed to list PRs: {str(e)}')
+        return jsonify({'error': str(e)}), 500
+@app.route('/api/prs/<int:pr_id>')
+def api_get_pr(pr_id):
+    """Get the details of a single PR"""
+    try:
+        pr = PRScanDB.get_pr_by_id(pr_id)
+        if not pr:
+            return jsonify({'error': 'PR not found'}), 404
+        # Decode JSON fields
+        if pr.get('scan_result') and isinstance(pr['scan_result'], str):
+            try:
+                pr['scan_result'] = json.loads(pr['scan_result'])
+            except json.JSONDecodeError:
+                pass
+        if pr.get('ai_review') and isinstance(pr['ai_review'], str):
+            try:
+                pr['ai_review'] = json.loads(pr['ai_review'])
+            except json.JSONDecodeError:
+                pass
+        # Return the scan details together with their code snippets
+        if pr.get('scan_details_with_code') and isinstance(pr['scan_details_with_code'], str):
+            try:
+                pr['scan_details_with_code'] = json.loads(pr['scan_details_with_code'])
+            except json.JSONDecodeError:
+                pass
+        return jsonify(pr)
+    except Exception as e:
+        logger.error(f'Failed to get PR details: {str(e)}')
+        return jsonify({'error': str(e)}), 500
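`api_get_prs`, `api_get_pr`, and several endpoints further down repeat the same `isinstance(..., str)` / `json.loads` block for each JSON column. A small helper could replace those blocks. The following sketch uses the hypothetical names `decode_json_fields` and `JSON_FIELDS`; the column names come from the `pr_scans` table.

```python
import json
from typing import Any, Dict, Iterable

# JSON-encoded TEXT columns of the pr_scans table
JSON_FIELDS = ("scan_result", "ai_review", "scan_details_with_code")


def decode_json_fields(record: Dict[str, Any], fields: Iterable[str] = JSON_FIELDS) -> Dict[str, Any]:
    """Decode JSON-encoded string columns in place; leave malformed values untouched."""
    for name in fields:
        value = record.get(name)
        if isinstance(value, str) and value:
            try:
                record[name] = json.loads(value)
            except json.JSONDecodeError:
                pass  # keep the raw string, as the endpoints do today
    return record
```

`api_get_pr` would then reduce to `return jsonify(decode_json_fields(pr))`. In `api_get_prs`, the call would be applied to each row of `prs`.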
+@app.route('/api/prs/<int:pr_id>/diff')
+def api_get_pr_diff(pr_id):
+    """Get the code diff of a PR"""
+    try:
+        pr = PRScanDB.get_pr_by_id(pr_id)
+        if not pr:
+            return jsonify({'error': 'PR not found'}), 404
+        repo_name = pr.get('repo_name', '')
+        pr_number = pr.get('pr_number', 0)
+        if not repo_name or not pr_number:
+            return jsonify({'error': 'Incomplete PR information'}), 400
+        # Parse owner and repo
+        if '/' in repo_name:
+            owner, repo = repo_name.split('/', 1)
+        else:
+            owner = 'Bosch_Demo'  # default owner
+            repo = repo_name
+        logger.info(f"Fetching diff for PR #{pr_number} ({owner}/{repo})")
+        # Fetch the diff
+        diff = gitea_client.get_pull_request_diff(owner, repo, pr_number)
+        if diff is None:
+            return jsonify({'error': 'Failed to fetch diff'}), 500
+        return jsonify({
+            'diff': diff,
+            'pr_number': pr_number,
+            'repo_name': repo_name
+        })
+    except Exception as e:
+        logger.error(f'Failed to fetch PR diff: {str(e)}')
+        return jsonify({'error': str(e)}), 500
+@app.route('/api/prs/<int:pr_id>/files')
+def api_get_pr_files(pr_id):
+    """Get the list of files changed in a PR (for the tree view on the left)"""
+    try:
+        pr = PRScanDB.get_pr_by_id(pr_id)
+        if not pr:
+            return jsonify({'error': 'PR not found'}), 404
+        repo_name = pr.get('repo_name', '')
+        pr_number = pr.get('pr_number', 0)
+        if not repo_name or not pr_number:
+            return jsonify({'error': 'Incomplete PR information'}), 400
+        if '/' in repo_name:
+            owner, repo = repo_name.split('/', 1)
+        else:
+            owner, repo = 'Bosch_Demo', repo_name
+        files = gitea_client.get_pull_request_files(owner, repo, pr_number)
+        if files is None:
+            return jsonify({'error': 'Failed to fetch file list'}), 500
+        return jsonify({'files': files, 'repo_name': repo_name})
+    except Exception as e:
+        logger.error(f'Failed to fetch PR file list: {str(e)}')
+        return jsonify({'error': str(e)}), 500
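The same owner/repo parsing appears in `handle_pull_request` and in each `/api/prs/<id>/...` endpoint. A shared helper would keep the fallback owner and the split behaviour consistent across all of them. The following is a sketch with a hypothetical `split_repo_name`. The `'Bosch_Demo'` default is the fallback that the diff and file endpoints use.

```python
from typing import Tuple

DEFAULT_OWNER = "Bosch_Demo"  # fallback owner used by the diff/file endpoints


def split_repo_name(repo_name: str, default_owner: str = DEFAULT_OWNER) -> Tuple[str, str]:
    """Split "owner/repo" into (owner, repo); a bare repo name gets the default owner."""
    if "/" in repo_name:
        # maxsplit=1: extra slashes stay in the repo part instead of raising ValueError
        owner, repo = repo_name.split("/", 1)
        return owner, repo
    return default_owner, repo_name
```

Usage: `owner, repo = split_repo_name(pr['repo_name'])`. With `maxsplit=1`, an unexpected extra slash stays in the repo part instead of raising `ValueError`, which a plain `split('/')` would do.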
+@app.route('/api/prs/<int:pr_id>/file')
+def api_get_pr_file_content(pr_id):
+    """Get the full content of a file in a PR, as it is on the source branch"""
+    try:
+        path = request.args.get('path')
+        if not path:
+            return jsonify({'error': 'Missing path parameter'}), 400
+        pr = PRScanDB.get_pr_by_id(pr_id)
+        if not pr:
+            return jsonify({'error': 'PR not found'}), 404
+        repo_name = pr.get('repo_name', '')
+        pr_number = pr.get('pr_number', 0)
+        if not repo_name or not pr_number:
+            return jsonify({'error': 'Incomplete PR information'}), 400
+        if '/' in repo_name:
+            owner, repo = repo_name.split('/', 1)
+        else:
+            owner, repo = 'Bosch_Demo', repo_name
+        pr_info = gitea_client.get_pull_request(owner, repo, pr_number)
+        if not pr_info:
+            return jsonify({'error': 'Failed to fetch PR information'}), 500
+        head_ref = pr_info.get('head', {}).get('ref') or pr_info.get('head_branch') or pr.get('source_branch')
+        if not head_ref:
+            return jsonify({'error': 'Cannot determine the source branch'}), 400
+        content = gitea_client.get_file_contents(owner, repo, path, head_ref)
+        if content is None:
+            return jsonify({'error': 'File does not exist or cannot be read'}), 404
+        # Collect the scan issues for this file (scanned when the PR was created and stored in scan_details_with_code)
+        scan_issues = []
+        path_norm = path.replace('\\', '/').strip()
+        logger.info(f"[DEBUG] Requested file: path_norm={path_norm}")
+        scan_details = pr.get('scan_details_with_code')
+        if isinstance(scan_details, str):
+            try:
+                scan_details = json.loads(scan_details)
+            except Exception:
+                scan_details = None
+        if scan_details:
+            logger.info(f"[DEBUG] scan_details keys: {list(scan_details.keys()) if isinstance(scan_details, dict) else 'not dict'}")
+            if scan_details.get('scanners'):
+                logger.info(f"[DEBUG] scanners count: {len(scan_details['scanners'])}")
+                for scanner in scan_details['scanners']:
+                    scanner_name = scanner.get('name', '')
+                    issues_count = len(scanner.get('issues', []))
+                    logger.info(f"[DEBUG] scanner={scanner_name}, issues_count={issues_count}")
+                    # Log the file of the first few issues for inspection
+                    for idx, issue in enumerate(scanner.get('issues', [])[:3]):
+                        logger.info(f"[DEBUG] issue[{idx}] file={issue.get('file')}, line={issue.get('line')}")
+        if scan_details and scan_details.get('scanners'):
+            for scanner in scan_details['scanners']:
+                for issue in scanner.get('issues', []):
+                    issue_file = (issue.get('file') or '').replace('\\', '/').strip()
+                    if not issue_file:
+                        continue
+                    # Match: exact equality, or one path is a suffix of the other (handles basenames and full paths)
+                    if path_norm == issue_file or path_norm.endswith(issue_file) or issue_file.endswith(path_norm):
+                        logger.info(f"[DEBUG] Match: issue_file={issue_file}, path_norm={path_norm}")
+                        sev = (issue.get('severity') or 'info')
+                        if isinstance(sev, str):
+                            sev = sev.lower()
+                        scanner_name = scanner.get('name', '')
+                        scanner_display = {'python': 'Python', 'javascript': 'JavaScript', 'security': 'Security'}.get(scanner_name, scanner_name)
+                        scan_issues.append({
+                            'scanner': scanner_display,
+                            'severity': sev,
+                            'line': int(issue.get('line') or 0),
+                            'message': (issue.get('message') or issue.get('description') or '').strip(),
+                            'code_context': issue.get('code_context')
+                        })
+        logger.info(f"[DEBUG] Final scan_issues count: {len(scan_issues)}")
+        # Collect the AI review results
+        ai_issues = []
+        if scan_details and scan_details.get('ai'):
+            ai_data = scan_details['ai']
+            for issue in ai_data.get('issues', []):
+                issue_file = (issue.get('file') or '').replace('\\', '/').strip()
+                if not issue_file:
+                    continue
+                # Match: exact equality, or one path is a suffix of the other
+                if path_norm == issue_file or path_norm.endswith(issue_file) or issue_file.endswith(path_norm):
+                    ai_issues.append({
+                        'scanner': 'AI',
+                        'severity': issue.get('severity', 'info'),
+                        'line': int(issue.get('line') or 1),
+                        'message': issue.get('message', ''),
+                        'category': 'ai',
+                        'code_context': issue.get('code_context')
+                    })
+        logger.info(f"[DEBUG] AI issues count: {len(ai_issues)}")
+        # Combine static-scan issues and AI issues
+        all_issues = scan_issues + ai_issues
+        return jsonify({'path': path, 'content': content, 'scan_issues': all_issues})
+    except Exception as e:
+        logger.error(f'Failed to get file content: {str(e)}')
+        return jsonify({'error': str(e)}), 500
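The handler matches files with `endswith` in both directions. That check also pairs `scanner/myapp.py` with an issue reported for `app.py`. A stricter sketch requires the shorter path to sit on a `/` boundary of the longer one, while still accepting basenames and absolute clone paths. `paths_match` and `_normalize` are hypothetical helpers. Two files that share a basename can still collide.

```python
def _normalize(path: str) -> str:
    """Use forward slashes and drop a leading './' and surrounding slashes."""
    p = path.replace("\\", "/").strip()
    while p.startswith("./"):
        p = p[2:]
    return p.strip("/")


def paths_match(requested: str, reported: str) -> bool:
    """True if the paths are equal, or the shorter one is a '/'-aligned suffix of the longer."""
    a, b = _normalize(requested), _normalize(reported)
    if not a or not b:
        return False
    if a == b:
        return True
    longer, shorter = (a, b) if len(a) > len(b) else (b, a)
    return longer.endswith("/" + shorter)
```

Inside the two matching loops, `if paths_match(path, issue.get('file') or ''):` would replace the three-way `endswith` condition.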
+@app.route('/api/prs/<int:pr_id>/quality')
+def api_get_quality_score(pr_id):
+    """Get the code quality score of a PR"""
+    try:
+        pr = PRScanDB.get_pr_by_id(pr_id)
+        if not pr:
+            return jsonify({'error': 'PR not found'}), 404
+        # Read the quality score from scan_result
+        scan_result = pr.get('scan_result')
+        if isinstance(scan_result, str):
+            try:
+                scan_result = json.loads(scan_result)
+            except json.JSONDecodeError:
+                scan_result = None
+        quality_score = None
+        if scan_result and scan_result.get('ai'):
+            quality_score = scan_result['ai'].get('quality_score')
+        if not quality_score:
+            return jsonify({'error': 'No quality score available'}), 404
+        return jsonify(quality_score)
+    except Exception as e:
+        logger.error(f'Failed to get quality score: {str(e)}')
+        return jsonify({'error': str(e)}), 500
+@app.route('/api/prs/<int:pr_id>/stats')
+def api_get_issue_stats(pr_id):
+    """Get the issue statistics of a PR"""
+    try:
+        pr = PRScanDB.get_pr_by_id(pr_id)
+        if not pr:
+            return jsonify({'error': 'PR not found'}), 404
+        # Load scan_details_with_code
+        scan_details = pr.get('scan_details_with_code')
+        if isinstance(scan_details, str):
+            try:
+                scan_details = json.loads(scan_details)
+            except json.JSONDecodeError:
+                scan_details = None
+        if not scan_details:
+            return jsonify({'error': 'No scan details available'}), 404
+        # Count issues per scanner
+        stats = {
+            'by_severity': {'error': 0, 'warning': 0, 'info': 0},
+            'by_scanner': {},
+            'total': 0
+        }
+        # Static scanners
+        for scanner in scan_details.get('scanners', []):
+            scanner_name = scanner.get('name', 'unknown')
+            scanner_issues = scanner.get('issues', [])
+            stats['by_scanner'][scanner_name] = len(scanner_issues)
+            for issue in scanner_issues:
+                sev = (issue.get('severity') or 'info').lower()
+                if sev in stats['by_severity']:
+                    stats['by_severity'][sev] += 1
+                stats['total'] += 1
+        # AI scanner
+        ai_data = scan_details.get('ai', {})
+        if ai_data:
+            ai_issues = ai_data.get('issues', [])
+            stats['by_scanner']['AI'] = len(ai_issues)
+            for issue in ai_issues:
+                sev = (issue.get('severity') or 'info').lower()
+                if sev in stats['by_severity']:
+                    stats['by_severity'][sev] += 1
+                stats['total'] += 1
+        return jsonify(stats)
+    except Exception as e:
+        logger.error(f'Failed to get issue statistics: {str(e)}')
+        return jsonify({'error': str(e)}), 500
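`api_get_issue_stats` counts the static scanners and the AI block with two near-identical loops. Treating the AI issues as one more scanner lets a single pass handle both. The following sketch uses a hypothetical `count_issues` helper and `collections.Counter`. Only `error`, `warning`, and `info` are broken out per severity. `total` counts every issue, including other labels such as `high`; that is a choice made for this sketch.

```python
from collections import Counter
from typing import Any, Dict, Iterable

SEVERITIES = ("error", "warning", "info")  # levels the dashboard reports


def count_issues(scanners: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Count issues per scanner and per known severity level."""
    by_scanner: Dict[str, int] = {}
    severities: Counter = Counter()
    total = 0
    for scanner in scanners:
        issues = scanner.get("issues", [])
        by_scanner[scanner.get("name", "unknown")] = len(issues)
        total += len(issues)
        # a missing severity is treated as "info", as in the endpoint
        severities.update((i.get("severity") or "info").lower() for i in issues)
    by_severity = {s: severities[s] for s in SEVERITIES}
    return {"by_severity": by_severity, "by_scanner": by_scanner, "total": total}
```

Usage: `count_issues(scan_details.get('scanners', []) + [{'name': 'AI', 'issues': scan_details.get('ai', {}).get('issues', [])}])`.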
+@app.route('/api/prs/<int:pr_id>/fix', methods=['POST'])
+def api_generate_fix(pr_id):
+    """Generate a fix suggestion for an issue"""
+    try:
+        data = request.get_json()
+        if not data:
+            return jsonify({'error': 'Empty request body'}), 400
+        file_path = data.get('file')
+        line = data.get('line', 1)
+        message = data.get('message', '')
+        code = data.get('code', '')
+        if not file_path or not message:
+            return jsonify({'error': 'Missing required parameters'}), 400
+        # Ask the AI reviewer for a fix suggestion
+        fix_result = ai_reviewer.generate_fix_suggestion(file_path, line, message, code)
+        if fix_result:
+            return jsonify(fix_result)
+        else:
+            return jsonify({'error': 'Failed to generate fix suggestion'}), 500
+    except Exception as e:
+        logger.error(f'Failed to generate fix suggestion: {str(e)}')
+        return jsonify({'error': str(e)}), 500
+@app.route('/api/prs/history')
+def api_get_pr_history():
+    """Get the PR scan history trend"""
+    try:
+        limit = request.args.get('limit', 20, type=int)
+        repo_name = request.args.get('repo_name', '')
+        # Fetch the PR list
+        prs = PRScanDB.get_all_prs(status='completed')
+        if repo_name:
+            prs = [p for p in prs if p.get('repo_name') == repo_name]
+        # Keep only the N most recent
+        prs = prs[:limit]
+        # Build the trend data
+        history = []
+        for pr in reversed(prs):  # oldest to newest
+            issues_count = pr.get('issues_count', 0)
+            # Sum the error/warning counts of all scanners in scan_result
+            scan_result = pr.get('scan_result')
+            if isinstance(scan_result, str):
+                try:
+                    scan_result = json.loads(scan_result)
+                except json.JSONDecodeError:
+                    scan_result = None
+            error_count = 0
+            warning_count = 0
+            if scan_result and isinstance(scan_result, dict):
+                # Walk the scanners and accumulate their error and warning counts
+                for scanner_name, scanner_result in scan_result.items():
+                    if isinstance(scanner_result, dict):
+                        summary = scanner_result.get('summary', {})
+                        if isinstance(summary, dict):
+                            error_count += summary.get('error', 0)
+                            warning_count += summary.get('warning', 0)
+            history.append({
+                'pr_id': pr.get('id'),
+                'pr_number': pr.get('pr_number'),
+                'repo_name': pr.get('repo_name'),
+                'title': pr.get('pr_title', ''),
+                'author': pr.get('author', ''),
+                'created_at': pr.get('created_at', ''),
+                'issues_count': issues_count,
+                'error_count': error_count,
+                'warning_count': warning_count,
+                'total_issues': error_count + warning_count,
+                'state': pr.get('state', '')
+            })
+        return jsonify(history)
+    except Exception as e:
+        logger.error(f'Failed to get history trend: {str(e)}')
+        return jsonify({'error': str(e)}), 500
+@app.route('/api/prs/<int:pr_id>/merge', methods=['POST'])
+def api_merge_pr(pr_id):
+    """Merge a PR"""
+    try:
+        pr = PRScanDB.get_pr_by_id(pr_id)
+        if not pr:
+            return jsonify({'success': False, 'message': 'PR not found'}), 404
+        logger.info(f"Merge PR - database record: {pr}")
+        if pr['state'] != 'open':
+            return jsonify({'success': False, 'message': 'PR state is not open'}), 400
+        # Parse the repository name
+        repo_name = pr['repo_name']
+        logger.info(f"Repository name: {repo_name}")
+        if '/' in repo_name:
+            owner, repo = repo_name.split('/', 1)
+        else:
+            owner = 'Bosch_Demo'  # same default owner as the other endpoints
+            repo = repo_name
+        logger.info(f"owner: {owner}, repo: {repo}, pr_number: {pr['pr_number']}")
+        # Check the PR state on Gitea first
+        pr_info = gitea_client.get_pull_request(owner, repo, pr['pr_number'])
+        if not pr_info:
+            return jsonify({'success': False, 'message': 'Cannot fetch PR information; check that the repository name is correct'}), 400
+        logger.info(f"PR info: state={pr_info.get('state')}, mergeable={pr_info.get('mergeable')}")
+        if pr_info.get('state') != 'open':
+            return jsonify({'success': False, 'message': f'PR state is {pr_info.get("state")}, not open'}), 400
+        # Merge via the Gitea API
+        success = gitea_client.merge_pull_request(
+            owner=owner,
+            repo=repo,
+            pr_number=pr['pr_number'],
+            merge_message=f'Merged PR #{pr["pr_number"]} via the management platform'
+        )
+        if success:
+            # Update the state in the database
+            PRScanDB.update_pr_state(pr_id, 'merged', merged_by='admin')
+            # Send a Feishu notification
+            result_text = f"✅ **PR merged via the management platform**\n\n**PR:** {repo_name}#{pr['pr_number']}\n**Title:** {pr['pr_title']}\n**Merged by:** Administrator"
+            feishu_notifier.send_simple_message('PR merged', result_text)
+            return jsonify({'success': True, 'message': 'PR merged'})
+        else:
+            return jsonify({'success': False, 'message': 'Merge failed'}), 500
+    except Exception as e:
+        logger.error(f'Failed to merge PR: {str(e)}')
+        return jsonify({'success': False, 'message': str(e)}), 500
+@app.route('/api/prs/<int:pr_id>/close', methods=['POST'])
+def api_close_pr(pr_id):
+    """Close a PR"""
+    try:
+        pr = PRScanDB.get_pr_by_id(pr_id)
+        if not pr:
+            return jsonify({'success': False, 'message': 'PR not found'}), 404
+        if pr['state'] != 'open':
+            return jsonify({'success': False, 'message': 'PR state is not open'}), 400
+        # Parse the repository name
+        repo_name = pr['repo_name']
+        if '/' in repo_name:
+            owner, repo = repo_name.split('/', 1)
+        else:
+            owner = 'Bosch_Demo'  # same default owner as the other endpoints
+            repo = repo_name
+        # Close via the Gitea API
+        success = gitea_client.close_pull_request(
+            owner=owner,
+            repo=repo,
+            pr_number=pr['pr_number']
+        )
+        if success:
+            # Update the state in the database
+            PRScanDB.update_pr_state(pr_id, 'closed')
+            # Send a Feishu notification
+            result_text = f"❌ **PR rejected via the management platform**\n\n**PR:** {repo_name}#{pr['pr_number']}\n**Title:** {pr['pr_title']}"
+            feishu_notifier.send_simple_message('PR rejected', result_text)
+            return jsonify({'success': True, 'message': 'PR closed'})
+        else:
+            return jsonify({'success': False, 'message': 'Close failed'}), 500
+    except Exception as e:
+        logger.error(f'Failed to close PR: {str(e)}')
+        return jsonify({'success': False, 'message': str(e)}), 500
+# ============================================
+# Scan management platform pages
+# ============================================
+# Absolute path of the web directory
+WEB_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'web')
+@app.route('/dashboard')
+def dashboard():
+    """Home page of the scan management platform"""
+    return send_from_directory(WEB_DIR, 'index.html')
+@app.route('/web/<path:filename>')
+def serve_static(filename):
+    """Serve static files"""
+    return send_from_directory(WEB_DIR, filename)
 if __name__ == '__main__':
     # Force listening on all network interfaces
     host = "0.0.0.0"


@@ -5,15 +5,26 @@ server:
 gitea:
   # Gitea server URL (adjust for your deployment)
-  base_url: "http://154.9.253.114:3000"
+  base_url: "https://code.deep-pilot.chat"
   # Gitea webhook signing secret; must match the Gitea webhook configuration
   webhook_secret: "BoschScan_2026_xxx"
+  # Gitea API token (used to merge/close PRs)
+  api_token: "8e223093b069a2e25f485360bd820e4dc255defc"
 feishu:
   # Feishu bot webhook URL (replace with your own)
-  webhook_url: "https://open.feishu.cn/open-apis/bot/v2/hook/c436570a-e6af-49a1-867d-4331c0f1cb06"
+  #webhook_url: "https://open.feishu.cn/open-apis/bot/v2/hook/636258bb-5f6e-40aa-aca3-10e61381325e"
   # Feishu message signing secret (optional)
   secret: ""
+  # Feishu app configuration (used for sending files)
+  # To send files, create an app on the Feishu Open Platform and obtain the values below
+  app_id: "cli_a9256d9d657b9bce"
+  app_secret: "4rsELdjStVuWnklxn0PLDbC0WPrSaKyN"
+  # ID of the target group chat (an app bot needs a chat ID to send files)
+  # After adding the bot to the group chat, get the chat ID as described at https://open.feishu.cn/document/ukTMukTMukTM/uADOwUjLwgDM14CM4ATN
+  chat_id: "oc_313d71d460a851f31b7ddd0aca14c5b0"
+  # Whether to attach the report file to notifications
+  attach_report_file: true
 scanner:
   # Supported programming languages
@@ -33,3 +44,18 @@ report:
   output_dir: "./reports"
   # Whether to keep report files
   keep_files: true
+
+ai:
+  # AI reviewer configuration
+  # Supported: "ollama" (local) or "api" (online API)
+  provider: "api"
+  # Model name (Alibaba Cloud Tongyi Qianwen / Qwen)
+  model: "qwen3.5-plus"
+  # API endpoint
+  api_url: "https://dashscope.aliyuncs.com/compatible-mode/v1"
+  # API key
+  api_key: "sk-616332b2afa94699b4572d0fe6ac370a"
+  # Whether to enable AI review
+  enabled: true
+  # Maximum number of code lines per review
+  max_lines: 100
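The configuration above stores `api_token`, `webhook_secret`, `app_secret`, and `api_key` as plain-text values. A common alternative is to leave those keys empty in `config.yaml` and overlay them from environment variables after loading. The sketch below shows one way to do that. It assumes the YAML has already been parsed into a plain dict (for example with `yaml.safe_load`). The environment-variable names in `SECRET_ENV_VARS` are hypothetical and are not defined anywhere in this project.

```python
import os
from typing import Any, Dict, Mapping

# Hypothetical mapping: (config section, key) -> environment variable name.
SECRET_ENV_VARS = {
    ("gitea", "api_token"): "GITEA_API_TOKEN",
    ("gitea", "webhook_secret"): "GITEA_WEBHOOK_SECRET",
    ("feishu", "app_secret"): "FEISHU_APP_SECRET",
    ("ai", "api_key"): "AI_API_KEY",
}


def apply_env_secrets(config: Dict[str, Any],
                      environ: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Return a copy of config in which any non-empty env var overrides the YAML value."""
    # Copy one level deep so the caller's dict is never mutated
    merged = {key: dict(value) if isinstance(value, dict) else value
              for key, value in config.items()}
    for (section, key), env_name in SECRET_ENV_VARS.items():
        value = environ.get(env_name)
        if value:
            # An empty YAML section (e.g. "ai:" with no children) loads as None
            if not isinstance(merged.get(section), dict):
                merged[section] = {}
            merged[section][key] = value
    return merged
```

Every non-secret setting passes through unchanged. Passing a plain dict as `environ` makes the function easy to check in isolation.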

db.py

@@ -0,0 +1,289 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Database models
Stores PR scan results and management state
"""
import sqlite3
import json
import os
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Any, Optional

DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data', 'pr_scans.db')


def get_cst_now():
    """Get the current time in China Standard Time (UTC+8)"""
    return datetime.now(timezone(timedelta(hours=8))).strftime('%Y-%m-%d %H:%M:%S')


def get_db_connection():
    """Get a database connection"""
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn

def init_db():
    """Initialize the database tables"""
    conn = get_db_connection()
    cursor = conn.cursor()

    # PR scan results table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS pr_scans (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            pr_number INTEGER NOT NULL,
            repo_name TEXT NOT NULL,
            pr_title TEXT,
            pr_url TEXT,
            source_branch TEXT,
            target_branch TEXT,
            author TEXT,
            state TEXT DEFAULT 'pending',
            scan_status TEXT DEFAULT 'pending',
            scan_result TEXT,
            scan_details_with_code TEXT,
            issues_count INTEGER DEFAULT 0,
            security_issues INTEGER DEFAULT 0,
            ai_review TEXT,
            report_path TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            merged_at TIMESTAMP,
            merged_by TEXT,
            UNIQUE(repo_name, pr_number)
        )
    ''')

    # Scan record details table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS scan_details (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            pr_scan_id INTEGER NOT NULL,
            scan_type TEXT NOT NULL,
            scan_data TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (pr_scan_id) REFERENCES pr_scans(id)
        )
    ''')

    conn.commit()
    conn.close()

class PRScanDB:
    """Database operations for PR scan results"""

    @staticmethod
    def save_pr_scan(pr_info: Dict[str, Any], scan_results: Dict[str, Any],
                     report_path: str = None, scan_details_with_code: Dict = None) -> int:
        """
        Save a PR scan result

        Args:
            pr_info: PR information
            scan_results: scan results
            report_path: path to the report file
            scan_details_with_code: scan details including code snippets

        Returns:
            ID of the scan record
        """
        conn = get_db_connection()
        cursor = conn.cursor()

        # Count issues
        issues_count = 0
        security_issues = 0
        for scan_type, result in scan_results.items():
            if isinstance(result, dict):
                if 'issues' in result:
                    issues_count += len(result.get('issues', []))
                if 'vulnerabilities' in result:
                    security_issues += len(result.get('vulnerabilities', []))

        # Check whether a record already exists
        cursor.execute(
            'SELECT id FROM pr_scans WHERE repo_name = ? AND pr_number = ?',
            (pr_info.get('repo_name'), pr_info.get('pr_number'))
        )
        existing = cursor.fetchone()

        if existing:
            # Update the existing record
            cst_time = get_cst_now()
            cursor.execute('''
                UPDATE pr_scans SET
                    pr_title = ?,
                    source_branch = ?,
                    target_branch = ?,
                    author = ?,
                    scan_status = ?,
                    scan_result = ?,
                    scan_details_with_code = ?,
                    issues_count = ?,
                    security_issues = ?,
                    ai_review = ?,
                    report_path = ?,
                    updated_at = ?
                WHERE repo_name = ? AND pr_number = ?
            ''', (
                pr_info.get('pr_title'),
                pr_info.get('source_branch'),
                pr_info.get('target_branch'),
                pr_info.get('author'),
                'completed',
                json.dumps(scan_results, ensure_ascii=False),
                json.dumps(scan_details_with_code, ensure_ascii=False) if scan_details_with_code else None,
                issues_count,
                security_issues,
                json.dumps(scan_results.get('ai', {}), ensure_ascii=False),
                report_path,
                cst_time,
                pr_info.get('repo_name'),
                pr_info.get('pr_number')
            ))
            scan_id = existing['id']
        else:
            # Insert a new record
            cst_time = get_cst_now()
            cursor.execute('''
                INSERT INTO pr_scans (
                    pr_number, repo_name, pr_title, pr_url,
                    source_branch, target_branch, author,
                    state, scan_status, scan_result, scan_details_with_code,
                    issues_count, security_issues, ai_review, report_path,
                    created_at, updated_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                pr_info.get('pr_number'),
                pr_info.get('repo_name'),
                pr_info.get('pr_title'),
                pr_info.get('pr_url'),
                pr_info.get('source_branch'),
                pr_info.get('target_branch'),
                pr_info.get('author'),
                'open',
                'completed',
                json.dumps(scan_results, ensure_ascii=False),
                json.dumps(scan_details_with_code, ensure_ascii=False) if scan_details_with_code else None,
                issues_count,
                security_issues,
                json.dumps(scan_results.get('ai', {}), ensure_ascii=False),
                report_path,
                cst_time,
                cst_time
            ))
            scan_id = cursor.lastrowid

        conn.commit()
        conn.close()
        return scan_id

    @staticmethod
    def get_all_prs(status: str = None, state: str = None) -> List[Dict[str, Any]]:
        """
        Get all PR scan records

        Args:
            status: scan status (pending/completed)
            state: PR state (open/merged/closed)

        Returns:
            list of PRs
        """
        conn = get_db_connection()
        cursor = conn.cursor()

        query = 'SELECT * FROM pr_scans WHERE 1=1'
        params = []

        if status:
            query += ' AND scan_status = ?'
            params.append(status)
        if state:
            query += ' AND state = ?'
            params.append(state)

        query += ' ORDER BY updated_at DESC'

        cursor.execute(query, params)
        rows = cursor.fetchall()
        conn.close()

        return [dict(row) for row in rows]

    @staticmethod
    def get_pr_by_id(scan_id: int) -> Optional[Dict[str, Any]]:
        """Get a PR scan record by ID"""
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM pr_scans WHERE id = ?', (scan_id,))
        row = cursor.fetchone()
        conn.close()
        return dict(row) if row else None

    @staticmethod
    def get_pr_by_number(repo_name: str, pr_number: int) -> Optional[Dict[str, Any]]:
        """Get a scan record by repository name and PR number"""
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute(
            'SELECT * FROM pr_scans WHERE repo_name = ? AND pr_number = ?',
            (repo_name, pr_number)
        )
        row = cursor.fetchone()
        conn.close()
        return dict(row) if row else None

    @staticmethod
    def update_pr_state(scan_id: int, state: str, merged_by: str = None):
        """Update the PR state"""
        conn = get_db_connection()
        cursor = conn.cursor()
        cst_time = get_cst_now()

        if state == 'merged':
            cursor.execute('''
                UPDATE pr_scans SET
                    state = ?,
                    merged_at = ?,
                    merged_by = ?,
                    updated_at = ?
                WHERE id = ?
            ''', (state, cst_time, merged_by, cst_time, scan_id))
        else:
            cursor.execute('''
                UPDATE pr_scans SET
                    state = ?,
                    updated_at = ?
                WHERE id = ?
            ''', (state, cst_time, scan_id))

        conn.commit()
        conn.close()

    @staticmethod
    def delete_pr(scan_id: int):
        """Delete a PR scan record"""
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute('DELETE FROM scan_details WHERE pr_scan_id = ?', (scan_id,))
        cursor.execute('DELETE FROM pr_scans WHERE id = ?', (scan_id,))
        conn.commit()
        conn.close()


# Initialize the database
init_db()
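`save_pr_scan` runs a SELECT and then either an UPDATE or an INSERT. That costs two round trips and can race if two webhook deliveries for the same PR arrive together: both may see "no row" and the second INSERT then hits the UNIQUE constraint. As a swapped-in alternative (not what `db.py` does), SQLite's UPSERT clause (`ON CONFLICT ... DO UPDATE`, available since SQLite 3.24) expresses the same "one row per (repo_name, pr_number)" rule in a single statement. The sketch below uses a reduced, hypothetical column set and a hypothetical `upsert_scan` helper.

```python
import sqlite3
from typing import Any, Dict

# Trimmed-down version of the pr_scans table, keeping the same UNIQUE key
SCHEMA = '''
    CREATE TABLE IF NOT EXISTS pr_scans (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        pr_number INTEGER NOT NULL,
        repo_name TEXT NOT NULL,
        pr_title TEXT,
        issues_count INTEGER DEFAULT 0,
        updated_at TIMESTAMP,
        UNIQUE(repo_name, pr_number)
    )
'''


def upsert_scan(conn: sqlite3.Connection, pr: Dict[str, Any], issues_count: int, now: str) -> int:
    """Insert or update one scan row keyed on (repo_name, pr_number); return its id."""
    conn.execute('''
        INSERT INTO pr_scans (repo_name, pr_number, pr_title, issues_count, updated_at)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(repo_name, pr_number) DO UPDATE SET
            pr_title = excluded.pr_title,
            issues_count = excluded.issues_count,
            updated_at = excluded.updated_at
    ''', (pr['repo_name'], pr['pr_number'], pr.get('pr_title'), issues_count, now))
    # lastrowid is not reliable after the UPDATE branch, so read the id back by the unique key
    row = conn.execute('SELECT id FROM pr_scans WHERE repo_name = ? AND pr_number = ?',
                       (pr['repo_name'], pr['pr_number'])).fetchone()
    conn.commit()
    return row[0]
```

The `excluded.` prefix refers to the values the INSERT attempted to write. The existing row keeps its original `id`.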

docker-compose.yml

@@ -0,0 +1,9 @@
version: "3.8"
services:
  code-scan:
    image: dcr-by1jwyxk44.71826370.xyz/whlaoding/code-scan:latest
    container_name: code-scan
    ports:
      - "5000:5000"
    restart: unless-stopped

gitea_client.py

@@ -0,0 +1,267 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Gitea API client
Used to operate on PRs (merge, close, etc.)
"""
import logging
import requests
from typing import Dict, Any, Optional, List

logger = logging.getLogger(__name__)


class GiteaClient:
    """Gitea API client"""

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the Gitea client

        Args:
            config: Gitea configuration containing base_url and api_token
        """
        self.base_url = config.get('base_url', '').rstrip('/')
        self.api_token = config.get('api_token', '')

        if not self.base_url:
            raise ValueError("Gitea base_url is not configured")
        if not self.api_token:
            raise ValueError("Gitea api_token is not configured")

    def _get_headers(self) -> Dict[str, str]:
        """Build the API request headers"""
        return {
            'Authorization': f'token {self.api_token}',
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }

    def merge_pull_request(self, owner: str, repo: str, pr_number: int,
                           merge_message: str = "",
                           merge_commit_id: str = None) -> bool:
        """
        Merge a pull request
        """
        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls/{pr_number}/merge"
        logger.info(f"Merge PR URL: {url}")

        # The Gitea API requires the "do" parameter (merge, rebase, squash)
        payload = {
            "do": "merge",
            "merge_commit_message": merge_message or f"Merge PR #{pr_number}"
        }
        if merge_commit_id:
            payload["merge_commit_id"] = merge_commit_id

        try:
            response = requests.post(
                url,
                headers=self._get_headers(),
                json=payload,
                timeout=30
            )

            logger.info(f"Merge response status code: {response.status_code}")
            logger.info(f"Merge response body: {response.text[:500]}")

            if response.status_code == 200:
                logger.info(f"Successfully merged PR #{pr_number}")
                return True
            elif response.status_code == 405:
                logger.error(f"PR #{pr_number} cannot be merged: {response.json().get('message', 'unknown reason')}")
                return False
            elif response.status_code == 422:
                logger.error(f"Failed to merge PR #{pr_number}: {response.json().get('message', 'invalid parameters')}")
                return False
            else:
                logger.error(f"Failed to merge PR #{pr_number}: {response.status_code} - {response.text}")
                return False

        except Exception as e:
            logger.error(f"Exception while merging PR #{pr_number}: {str(e)}")
            return False

    def close_pull_request(self, owner: str, repo: str, pr_number: int) -> bool:
        """
        Close a pull request

        Args:
            owner: repository owner
            repo: repository name
            pr_number: PR number

        Returns:
            whether the PR was closed successfully
        """
        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls/{pr_number}"

        payload = {
            "state": "closed"
        }

        try:
            response = requests.patch(
                url,
                headers=self._get_headers(),
                json=payload,
                timeout=30
            )

            if response.status_code in (200, 201):
                logger.info(f"Successfully closed PR #{pr_number}")
                return True
            else:
                logger.error(f"Failed to close PR #{pr_number}: {response.status_code} - {response.text}")
                return False

        except Exception as e:
            logger.error(f"Exception while closing PR #{pr_number}: {str(e)}")
            return False

    def get_pull_request(self, owner: str, repo: str, pr_number: int) -> Optional[Dict[str, Any]]:
        """
        Get pull request information

        Args:
            owner: repository owner
            repo: repository name
            pr_number: PR number

        Returns:
            dict of PR information, or None on failure
        """
        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls/{pr_number}"

        try:
            response = requests.get(
                url,
                headers=self._get_headers(),
                timeout=30
            )

            if response.status_code == 200:
                return response.json()
            else:
                logger.error(f"Failed to get PR #{pr_number}: {response.status_code}")
                return None

        except Exception as e:
            logger.error(f"Exception while getting PR #{pr_number}: {str(e)}")
            return None

    def can_merge(self, owner: str, repo: str, pr_number: int) -> bool:
        """
        Check whether the PR can be merged

        Args:
            owner: repository owner
            repo: repository name
            pr_number: PR number

        Returns:
            whether the PR can be merged
        """
        pr_info = self.get_pull_request(owner, repo, pr_number)
        if pr_info:
            return pr_info.get('mergeable', False) and pr_info.get('state') == 'open'
        return False

    def get_pull_request_diff(self, owner: str, repo: str, pr_number: int) -> Optional[str]:
        """
        Get the code diff of a pull request

        Args:
            owner: repository owner
            repo: repository name
            pr_number: PR number

        Returns:
            diff text, or None on failure
        """
        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls/{pr_number}.diff"

        try:
            response = requests.get(
                url,
                headers=self._get_headers(),
                timeout=30
            )

            if response.status_code == 200:
                logger.info(f"Successfully fetched the diff of PR #{pr_number}")
                return response.text
            else:
                logger.error(f"Failed to get the diff of PR #{pr_number}: {response.status_code}")
                return None

        except Exception as e:
            logger.error(f"Exception while getting the diff of PR #{pr_number}: {str(e)}")
            return None

    def get_pull_request_files(self, owner: str, repo: str, pr_number: int) -> Optional[List[Dict[str, Any]]]:
        """
        Get the list of files changed in the PR

        Args:
            owner: repository owner
            repo: repository name
            pr_number: PR number

        Returns:
            list of files, or None on failure
        """
        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls/{pr_number}/files"

        try:
            response = requests.get(
                url,
                headers=self._get_headers(),
                timeout=30
            )

            if response.status_code == 200:
                logger.info(f"Successfully fetched the file list of PR #{pr_number}")
                return response.json()
            else:
                logger.error(f"Failed to get the file list of PR #{pr_number}: {response.status_code}")
                return None

        except Exception as e:
            logger.error(f"Exception while getting the file list of PR #{pr_number}: {str(e)}")
            return None

    def get_file_contents(self, owner: str, repo: str, filepath: str, ref: str) -> Optional[str]:
        """
        Get the contents of a file in the repository at a given ref (branch/commit)

        Args:
            owner: repository owner
            repo: repository name
            filepath: file path
            ref: branch name or commit SHA

        Returns:
            file contents as text, or None on failure
        """
        import base64
        import urllib.parse

        encoded_path = urllib.parse.quote(filepath, safe='')
        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{encoded_path}?ref={urllib.parse.quote(ref)}"
        try:
            response = requests.get(
                url,
                headers=self._get_headers(),
                timeout=30
            )
            if response.status_code == 200:
                data = response.json()
                if data.get('encoding') == 'base64' and data.get('content'):
                    return base64.b64decode(data['content']).decode('utf-8', errors='replace')
                return None
            logger.error(f"Failed to get file {filepath}: {response.status_code}")
            return None
        except Exception as e:
            logger.error(f"Exception while getting file contents: {str(e)}")
            return None
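`get_file_contents` combines two small steps:

- percent-encoding the path into the contents-API URL;
- base64-decoding the `content` field of the JSON response.

The sketch below lifts those steps out into two hypothetical helpers, `build_contents_url` and `decode_contents`, so they can be checked without a live Gitea server. It mirrors the client's `safe=''` choice, which also encodes `/` in nested paths. Whether a given Gitea version routes `%2F` as a path separator is worth confirming against your server.

```python
import base64
import urllib.parse
from typing import Any, Dict, Optional


def build_contents_url(base_url: str, owner: str, repo: str, filepath: str, ref: str) -> str:
    """Build the contents-API URL the same way get_file_contents does."""
    # safe='' mirrors the client: '/' inside the file path is percent-encoded too
    encoded_path = urllib.parse.quote(filepath, safe='')
    return (f"{base_url.rstrip('/')}/api/v1/repos/{owner}/{repo}/contents/"
            f"{encoded_path}?ref={urllib.parse.quote(ref)}")


def decode_contents(data: Dict[str, Any]) -> Optional[str]:
    """Decode a contents-API JSON body; return None unless it carries base64 content."""
    if data.get('encoding') == 'base64' and data.get('content'):
        # b64decode discards line breaks in the payload by default
        return base64.b64decode(data['content']).decode('utf-8', errors='replace')
    return None
```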

install.sh

@@ -0,0 +1,77 @@
#!/bin/bash
# AI Code Quality Scanner installation script

echo "========================================="
echo " AI Code Quality Scanner installation script"
echo "========================================="

# Check the Python version
if ! command -v python3 &> /dev/null; then
    echo "❌ Error: Python3 not found; please install Python 3.8+ first"
    exit 1
fi

PYTHON_VERSION=$(python3 -c 'import sys; print(".".join(map(str, sys.version_info[:2])))')
echo "✅ Python version: $PYTHON_VERSION"

# Create a virtual environment (optional)
if [ ! -d "venv" ]; then
    echo "📦 Creating virtual environment..."
    python3 -m venv venv
fi

# Activate the virtual environment
source venv/bin/activate

# Install dependencies
echo "📦 Installing Python dependencies..."
pip install --upgrade pip
pip install -r requirements.txt

# Create the required directories
echo "📁 Creating required directories..."
mkdir -p reports
mkdir -p /tmp/code_scanner_clones

# Check for code scanning tools (optional)
echo "🛠️ Checking code scanning tools..."

# Pylint (Python)
if command -v pylint &> /dev/null || python -m pylint --version &> /dev/null; then
    echo " ✅ Pylint is installed"
else
    echo " ⚠️ Pylint is not installed (pip install pylint)"
fi

# Flake8 (Python)
if command -v flake8 &> /dev/null || python -m flake8 --version &> /dev/null; then
    echo " ✅ Flake8 is installed"
else
    echo " ⚠️ Flake8 is not installed (pip install flake8)"
fi

# Bandit (Python security scanning)
if command -v bandit &> /dev/null || python -m bandit --version &> /dev/null; then
    echo " ✅ Bandit is installed"
else
    echo " ⚠️ Bandit is not installed (pip install bandit)"
fi

# Node.js and npm (JavaScript scanning)
if command -v node &> /dev/null; then
    NODE_VERSION=$(node --version)
    echo " ✅ Node.js version: $NODE_VERSION"
else
    echo " ⚠️ Node.js is not installed (required for JavaScript scanning)"
fi

echo ""
echo "========================================="
echo " Installation complete!"
echo "========================================="
echo ""
echo "Next steps:"
echo "1. Edit config.yaml to configure the Feishu bot and Gitea"
echo "2. Run: python app.py"
echo "3. Configure the webhook in Gitea"
echo ""
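`install.sh` prints the detected Python version, but nothing in it actually rejects an interpreter older than the 3.8+ its error message asks for. The sketch below shows an enforcing check as a small pure function. The name `python_is_supported` is hypothetical, and how (or whether) it gets wired into `install.sh` is an assumption.

```python
import sys
from typing import Sequence, Tuple

# Minimum taken from the "Python 3.8+" requirement stated in install.sh
MIN_PYTHON: Tuple[int, int] = (3, 8)


def python_is_supported(version_info: Sequence[int] = sys.version_info,
                        minimum: Tuple[int, int] = MIN_PYTHON) -> bool:
    """True if the (major, minor) part of version_info meets the minimum."""
    # Tuple comparison is lexicographic: (3, 10) >= (3, 8) is True
    return tuple(version_info[:2]) >= minimum
```

From the shell, the equivalent one-liner would be `python3 -c "import sys; sys.exit(0 if sys.version_info >= (3, 8) else 1)" || exit 1`, placed right after the `command -v python3` check.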

@@ -10,8 +10,9 @@ import hashlib
 import hmac
 import base64
 import logging
+import os
 import requests
-from typing import Dict, Any
+from typing import Dict, Any, Optional

 logger = logging.getLogger(__name__)
@@ -30,9 +31,107 @@ class FeishuNotifier:
         self.webhook_url = config.get('webhook_url', '')
         self.secret = config.get('secret', '')

+        # File upload configuration
+        self.app_id = config.get('app_id', '')
+        self.app_secret = config.get('app_secret', '')
+        self.chat_id = config.get('chat_id', '')
+        self.attach_report_file = config.get('attach_report_file', True)
+
+        # Cached token
+        self._tenant_access_token = None
+        self._token_expires_at = 0
+
         if not self.webhook_url:
             logger.warning('Feishu webhook URL is not configured')

+    def _get_tenant_access_token(self) -> Optional[str]:
+        """
+        Get a Feishu tenant_access_token
+
+        Returns:
+            the token string, or None on failure
+        """
+        if not self.app_id or not self.app_secret:
+            return None
+
+        # Reuse the cached token while it is still valid
+        if self._tenant_access_token and time.time() < self._token_expires_at:
+            return self._tenant_access_token
+
+        try:
+            url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
+            headers = {"Content-Type": "application/json; charset=utf-8"}
+            payload = {
+                "app_id": self.app_id,
+                "app_secret": self.app_secret
+            }
+
+            response = requests.post(url, headers=headers, json=payload, timeout=10)
+            result = response.json()
+
+            if result.get("code") == 0:
+                self._tenant_access_token = result.get("tenant_access_token")
+                # Treat the token as expired 5 minutes early
+                self._token_expires_at = time.time() + result.get("expire", 7200) - 300
+                return self._tenant_access_token
+            else:
+                logger.error(f"Failed to get tenant_access_token: {result.get('msg')}")
+                return None
+        except Exception as e:
+            logger.error(f"Exception while getting tenant_access_token: {str(e)}")
+            return None
+
+    def _upload_file(self, file_path: str, file_name: str) -> Optional[str]:
+        """
+        Upload a file to Feishu
+
+        Args:
+            file_path: local file path
+            file_name: file name
+
+        Returns:
+            file_key used when sending the message, or None on failure
+        """
+        token = self._get_tenant_access_token()
+        if not token:
+            logger.error("Could not get a token; file upload failed")
+            return None
+
+        try:
+            url = "https://open.feishu.cn/open-apis/drive/v1/files/upload_all"
+            headers = {
+                "Authorization": f"Bearer {token}"
+            }
+
+            # Read the file
+            with open(file_path, 'rb') as f:
+                file_content = f.read()
+
+            # Build the multipart request
+            files = {
+                'file': (file_name, file_content, 'application/octet-stream')
+            }
+            data = {
+                'file_name': file_name,
+                'parent_node': 'root'  # root directory
+            }
+
+            response = requests.post(url, headers=headers, files=files, data=data, timeout=60)
+            result = response.json()
+
+            if result.get("code") == 0:
+                file_key = result.get("data", {}).get("file", {}).get("token")
+                logger.info(f"File uploaded successfully: {file_name}")
+                return file_key
+            else:
+                logger.error(f"File upload failed: {result.get('msg')}")
+                return None
+        except Exception as e:
+            logger.error(f"Exception during file upload: {str(e)}")
+            return None
+
     def send_report(self, report: Dict[str, Any]) -> bool:
         """
         Send the scan report to Feishu
@@ -48,47 +147,169 @@ class FeishuNotifier:
             return False

         try:
-            # Build the message content
-            message = self._build_message(report)
+            # Upload the report file (if configured)
+            file_key = None
+            if self.attach_report_file and self.app_id and self.app_secret:
+                report_file = report.get('report_file')
+                if report_file and os.path.exists(report_file):
+                    file_name = os.path.basename(report_file)
+                    file_key = self._upload_file(report_file, file_name)

-            # If a signing secret is configured, use signature verification
-            if self.secret:
-                timestamp, sign = self._generate_sign()
-                payload = {
-                    "timestamp": timestamp,
-                    "sign": sign,
-                    "msg_type": "interactive",
-                    "card": message
-                }
+            # If chat_id is configured, send the message via the app bot
+            if self.chat_id and self.app_id and self.app_secret:
+                # Send via the app bot API
+                sent = self._send_app_message(report, file_key)
             else:
-                payload = {
-                    "msg_type": "interactive",
-                    "card": message
-                }
+                # Send via the webhook
+                message = self._build_message(report, file_key=file_key)
+                sent = self._send_webhook_message(message)

-            # Send the request
-            headers = {'Content-Type': 'application/json'}
-            response = requests.post(
-                self.webhook_url,
-                headers=headers,
-                data=json.dumps(payload).encode('utf-8'),
-                timeout=30
-            )
-
-            # Parse the response
-            result = response.json()
-            if result.get('code') == 0:
-                logger.info('Feishu message sent successfully')
-                return True
-            else:
-                logger.error(f'Failed to send Feishu message: {result.get("msg")}')
-                return False
+            # Report success only if the chosen sender actually succeeded
+            if sent:
+                logger.info('Feishu message sent successfully')
+            return sent

         except Exception as e:
             logger.error(f'Failed to send Feishu notification: {str(e)}', exc_info=True)
             return False
+
+    def _send_webhook_message(self, message: Dict[str, Any]) -> bool:
+        """Send a message via the webhook"""
+        # If a signing secret is configured, use signature verification
+        if self.secret:
+            timestamp, sign = self._generate_sign()
+            payload = {
+                "timestamp": timestamp,
+                "sign": sign,
+                "msg_type": "interactive",
+                "card": message
+            }
+        else:
+            payload = {
+                "msg_type": "interactive",
+                "card": message
+            }
+
+        # Send the request
+        headers = {'Content-Type': 'application/json'}
+        response = requests.post(
+            self.webhook_url,
+            headers=headers,
+            data=json.dumps(payload).encode('utf-8'),
+            timeout=30
+        )
+
+        # Parse the response
+        result = response.json()
+        if result.get('code') == 0:
+            return True
+        else:
+            logger.error(f'Failed to send Feishu message: {result.get("msg")}')
+            return False
+
+    def _send_app_message(self, report: Dict[str, Any], file_key: str = None) -> bool:
+        """Send a message via the app bot (supports files)"""
+        token = self._get_tenant_access_token()
+        if not token:
+            logger.error("Could not get a token")
+            return False
+
+        # Build the message content
+        basic_info = self._build_basic_info_text(report)
+
+        # Build the message elements
+        elements = []
+
+        # Add the basic information
+        elements.append({
+            "tag": "div",
+            "text": {
+                "tag": "lark_md",
+                "content": basic_info
+            }
+        })
+
+        # Add the file
+        if file_key:
+            elements.append({
+                "tag": "file",
+                "file_key": file_key
+            })
+
+        # Build the message body
+        message_content = {
+            "title": report.get('status_text', 'Code scan report'),
+            "elements": elements
+        }
+
+        # Send the message to the group chat
+        try:
+            url = "https://open.feishu.cn/open-apis/im/v1/messages"
+            params = {
+                "receive_id_type": "chat_id"
+            }
+            headers = {
+                "Authorization": f"Bearer {token}",
+                "Content-Type": "application/json; charset=utf-8"
+            }
+            payload = {
+                "receive_id": self.chat_id,
+                "msg_type": "interactive",
+                "content": json.dumps(message_content)
+            }
+
+            response = requests.post(url, params=params, headers=headers, json=payload, timeout=30)
+            result = response.json()
+
+            if result.get("code") == 0:
+                logger.info("App bot message sent successfully")
+                return True
+            else:
+                logger.error(f"Failed to send app bot message: {result.get('msg')}")
+                return False
+        except Exception as e:
+            logger.error(f"Exception while sending app bot message: {str(e)}")
+            return False
+
+    def _build_basic_info_text(self, report: Dict[str, Any]) -> str:
+        """Build the basic-information text"""
+        status = report.get('status', 'pass')
+        if status == 'pass':
+            status_icon = ''
+        elif status == 'fail':
+            status_icon = ''
+        else:
+            status_icon = '⚠️'
+
+        pr_url = report.get('pr_url')
+        target_branch = report.get('target_branch')
+
+        if pr_url and target_branch:
+            title = f"{status_icon} PR code quality scan report"
+            basic_info = (f"**Repository:** `{report.get('repo_name', 'unknown')}`\n"
+                          f"**Source branch:** `{report.get('branch', 'unknown')}` → **Target branch:** `{target_branch}`\n"
+                          f"**PR link:** [View PR]({pr_url})\n"
+                          f"**Commit:** `{report.get('commit_id', 'unknown')}`\n"
+                          f"**Author:** {report.get('author', 'unknown')}")
+        else:
+            title = f"{status_icon} Code quality scan report"
+            basic_info = (f"**Repository:** `{report.get('repo_name', 'unknown')}`\n"
+                          f"**Branch:** `{report.get('branch', 'unknown')}`\n"
+                          f"**Commit:** `{report.get('commit_id', 'unknown')}`\n"
+                          f"**Author:** {report.get('author', 'unknown')}")
+
+        total_issues = report.get('total_issues', 0)
+        total_errors = report.get('total_errors', 0)
+        total_warnings = report.get('total_warnings', 0)
+
+        info = f"{title}\n\n{basic_info}\n\n"
+        info += f"**Scan status:** {report.get('status_text', 'unknown')}\n"
+        info += f"📊 Total issues: {total_issues} | 🔴 Errors: {total_errors} | 🟡 Warnings: {total_warnings}\n"
+        info += f"🕐 Scan time: {report.get('timestamp', '')}"
+        return info
+
     def _generate_sign(self) -> tuple:
         """
         Generate the Feishu signature
@@ -113,7 +334,7 @@ class FeishuNotifier:
         return timestamp, sign

-    def _build_message(self, report: Dict[str, Any]) -> Dict[str, Any]:
+    def _build_message(self, report: Dict[str, Any], file_key: str = None) -> Dict[str, Any]:
         """
         Build the Feishu card message
@@ -143,8 +364,14 @@ class FeishuNotifier:
         # Collect scan result details
         scan_details = []
         for scanner_name, result in report.get('scan_results', {}).items():
+            # The AI review summary is a string; skip it
+            if scanner_name == 'ai':
+                continue
+
             tool_name = result.get('tool', scanner_name)
             summary = result.get('summary', {})
+            if not isinstance(summary, dict):
+                continue
             files_scanned = result.get('files_scanned', 0)
             total = summary.get('total', 0)
@@ -257,6 +484,68 @@ class FeishuNotifier:
             }
         })

+        # Add PR action buttons (only for PR scans that passed)
+        if pr_url and target_branch and status == 'pass':
+            card["elements"].append({
+                "tag": "div",
+                "text": {
+                    "tag": "lark_md",
+                    "content": "**Please choose an action:**"
+                }
+            })
+
+            # Parse the repository info for the button callbacks
+            repo_full_name = report.get('repo_name', '')
+            if '/' in repo_full_name:
+                owner, repo = repo_full_name.split('/', 1)
+            else:
+                owner, repo = '', repo_full_name
+            pr_number = report.get('pr_number', 0)
+
+            card["elements"].append({
+                "tag": "action",
+                "actions": [
+                    {
+                        "tag": "button",
+                        "text": {
+                            "tag": "plain_text",
+                            "content": "✅ Approve merge"
+                        },
+                        "type": "primary",
+                        "value": {
+                            "action": "merge",
+                            "owner": owner,
+                            "repo": repo,
+                            "pr_number": pr_number,
+                            "pr_url": pr_url
+                        }
+                    },
+                    {
+                        "tag": "button",
+                        "text": {
+                            "tag": "plain_text",
+                            "content": "❌ Cancel merge"
+                        },
+                        "type": "danger",
+                        "value": {
+                            "action": "close",
+                            "owner": owner,
+                            "repo": repo,
+                            "pr_number": pr_number,
+                            "pr_url": pr_url
+                        }
+                    }
+                ]
+            })
+
+        # Attach the report file
+        if file_key:
+            card["elements"].append({
+                "tag": "file",
+                "file_key": file_key
+            })
+
         return card

     def send_simple_message(self, title: str, content: str) -> bool:
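These hunks rely on two pieces of credential plumbing that are easy to get subtly wrong.

**Webhook signing.** The body of `_generate_sign` falls outside the hunks shown. `generate_feishu_sign` below follows the signing scheme Feishu documents for custom-bot webhooks: HMAC-SHA256 keyed with `"{timestamp}\n{secret}"` over an empty message, then base64-encoded. It assumes the notifier uses that scheme.

**Token caching.** `CachedToken` reproduces the caching rule from `_get_tenant_access_token`: reuse the token until five minutes before Feishu's reported `expire`. The HTTP call and the clock are injected so the expiry arithmetic can be exercised offline. Both names are hypothetical and not part of the notifier.

```python
import base64
import hashlib
import hmac
import time
from typing import Callable, Optional, Tuple


def generate_feishu_sign(secret: str, timestamp: Optional[int] = None) -> Tuple[str, str]:
    """Return (timestamp, sign) for a signed Feishu custom-bot webhook payload."""
    if timestamp is None:
        timestamp = int(time.time())
    # The HMAC key is "timestamp\nsecret"; the signed message itself is empty
    string_to_sign = f"{timestamp}\n{secret}"
    digest = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest()
    return str(timestamp), base64.b64encode(digest).decode("utf-8")


class CachedToken:
    """Cache a token and treat it as expired `margin` seconds early (300 s in the notifier)."""

    def __init__(self, fetch: Callable[[], Optional[Tuple[str, int]]],
                 margin: int = 300, clock: Callable[[], float] = time.time):
        self._fetch = fetch      # hypothetical: returns (token, expire_seconds) or None
        self._margin = margin
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> Optional[str]:
        # Reuse the cached token while it is inside its shortened lifetime
        if self._token and self._clock() < self._expires_at:
            return self._token
        fetched = self._fetch()
        if fetched is None:
            return None
        self._token, expire = fetched
        self._expires_at = self._clock() + expire - self._margin
        return self._token
```

With Feishu's default `expire` of 7200 seconds, a token fetched at time `t` is reused until `t + 6900` and refreshed on the first call after that.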

pyproject.toml

@@ -0,0 +1,24 @@
[build-system]
requires = ["setuptools>=45", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "code-scan"
version = "1.0.0"
description = "Code scanning tool"
readme = "README.md"
requires-python = ">=3.8"
dependencies = [
    "flask>=2.0.0",
    "pyyaml>=5.0",
    "requests>=2.25.0",
    "python-dotenv>=0.19.0",
    "GitPython>=3.1.0",
]

[project.scripts]
code-scan = "app:main"

[tool.setuptools.packages.find]
where = ["."]
include = ["scanner*"]

@@ -39,7 +39,8 @@ class ReportGenerator:
         author: str,
         scan_results: Dict[str, Any],
         pr_url: str = None,
-        target_branch: str = None
+        target_branch: str = None,
+        pr_number: int = None
     ) -> Dict[str, Any]:
         """
         Generate the scan report
@@ -63,7 +64,13 @@ class ReportGenerator:
         total_warnings = 0

         for scanner_name, result in scan_results.items():
+            # The AI review summary is a string; skip it in the totals
+            if scanner_name == 'ai':
+                continue
+
             summary = result.get('summary', {})
+            if not isinstance(summary, dict):
+                continue
             total_issues += summary.get('total', 0)
             total_errors += summary.get('error', 0) + summary.get('high', 0)
             total_warnings += summary.get('warning', 0) + summary.get('medium', 0)
@@ -95,14 +102,17 @@ class ReportGenerator:
             'scan_results': scan_results,
             'pr_url': pr_url,
             'target_branch': target_branch,
+            'pr_number': pr_number,
             'markdown': self._generate_markdown(
                 repo_name, branch, commit_id, commit_message, author, scan_results, status, status_text, pr_url, target_branch
             )
         }

         # Save the report file
+        report_file = None
         if self.keep_files:
-            self._save_report(report)
+            report_file = self._save_report(report)
+        report['report_file'] = report_file

         return report
@@ -161,6 +171,10 @@ class ReportGenerator:
         lines.append('')

         for scanner_name, result in scan_results.items():
+            # Skip the AI review result (handled separately)
+            if scanner_name == 'ai':
+                continue
+
             tool_name = result.get('tool', scanner_name)
             summary = result.get('summary', {})
@@ -204,6 +218,68 @@ class ReportGenerator:
                     lines.append(f' - {message}')
             lines.append('')

+        # AI review results (adapted to the new format: an issues list)
+        if 'ai' in scan_results:
+            ai_result = scan_results['ai']
+            lines.append('')
+            lines.append('## 🤖 AI Code Review')
+            lines.append('')
+
+            # New format: use the summary directly
+            if 'summary' in ai_result:
+                # summary may be a string or a dict
+                summary = ai_result.get('summary', '')
+                if isinstance(summary, dict):
+                    lines.append(f"Found {summary.get('total', 0)} issues: "
+                                 f"errors {summary.get('error', 0)}, "
+                                 f"warnings {summary.get('warning', 0)}, "
+                                 f"info {summary.get('info', 0)}")
+                else:
+                    lines.append(str(summary))
+                lines.append('')
+
+            # New format: issues list
+            ai_issues = ai_result.get('issues', [])
+            if ai_issues:
+                # Group by file
+                issues_by_file = {}
+                for issue in ai_issues:
+                    file_name = issue.get('file', 'unknown')
+                    if file_name not in issues_by_file:
+                        issues_by_file[file_name] = []
+                    issues_by_file[file_name].append(issue)
+
+                for file_name, issues in issues_by_file.items():
+                    lines.append(f'### 📄 {file_name}')
+                    lines.append('')
+
+                    for i, issue in enumerate(issues[:10], 1):
+                        severity = issue.get('severity', 'Info')
+                        severity_emoji = {
+                            'ERROR': '🔴',
+                            'WARNING': '🟡',
+                            'INFO': ''
+                        }.get(severity.upper(), '')
+
+                        line_num = issue.get('line', 0)
+                        symbol = issue.get('symbol', '')
+                        message = issue.get('message', 'No message')
+                        code_context = issue.get('code_context', '')
+                        defect_reason = issue.get('defect_reason', '')
+
+                        lines.append(f'{i}. {severity_emoji} **{severity}** - line {line_num}')
+                        if symbol:
+                            lines.append(f' - Symbol: `{symbol}`')
+                        lines.append(f' - Issue: {message}')
+                        if code_context:
+                            lines.append(' - Code:')
+                            lines.append('```')
+                            lines.append(code_context)
+                            lines.append('```')
+                        if defect_reason:
+                            lines.append(f' - Reason: {defect_reason}')
+                        lines.append('')
+
         # Add the report link or next steps
         lines.append('---')
         lines.append('')
@@ -211,8 +287,13 @@ class ReportGenerator:
         return '\n'.join(lines)

-    def _save_report(self, report: Dict[str, Any]):
-        """Save the report to a file"""
+    def _save_report(self, report: Dict[str, Any]) -> str:
+        """
+        Save the report to a file
+
+        Returns:
+            path of the saved file
+        """
         try:
             # Build the file name
             timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
@@ -226,6 +307,8 @@ class ReportGenerator:
             logger.info(f'Report saved: {filepath}')

+            return filepath
+
             # Also save as JSON (easier for programs to parse)
             json_filename = filename.replace('.md', '.json')
             json_filepath = os.path.join(self.output_dir, json_filename)
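In the last hunk, the new `return filepath` sits above the lines that write the JSON copy. Unless the surrounding code differs from what is shown, that JSON write becomes unreachable. The sketch below shows the intended order: write the Markdown, write the JSON, then return the Markdown path. It assumes a `report` dict carrying a `markdown` key, as built above. The standalone `save_report` function and its `report_<timestamp>.md` naming are hypothetical, since the real file-name logic lies outside these hunks.

```python
import json
import os
from datetime import datetime
from typing import Any, Dict, Optional


def save_report(report: Dict[str, Any], output_dir: str) -> Optional[str]:
    """Write <name>.md and <name>.json, then return the Markdown path (None on failure)."""
    try:
        os.makedirs(output_dir, exist_ok=True)
        # Hypothetical naming scheme standing in for the real one
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"report_{timestamp}.md"
        filepath = os.path.join(output_dir, filename)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(report.get('markdown', ''))

        # The JSON copy is written before returning, so this code is reachable
        json_filepath = os.path.join(output_dir, filename.replace('.md', '.json'))
        with open(json_filepath, 'w', encoding='utf-8') as f:
            # default=str keeps non-JSON values (e.g. datetimes) from aborting the dump
            json.dump(report, f, ensure_ascii=False, indent=2, default=str)

        return filepath
    except OSError:
        return None
```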

@@ -2,3 +2,9 @@ flask>=2.0.0
 pyyaml>=5.0
 requests>=2.25.0
 python-dotenv>=0.19.0
+GitPython>=3.1.0
+gitdb>=4.0.1
+smmap>=3.0.1
+pylint>=2.17.0
+flake8>=6.0.0
+bandit>=1.7.0
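The `_calculate_quality_score` method in scanner/ai_reviewer.py, which follows, turns issue counts into four sub-scores:

- maintainability: 5 points per error, 2 per warning, 0.5 per info;
- security: 15 points per security-keyword hit;
- readability: 3 points per info;
- best practices: 5 points per warning.

Each sub-score is floored at 0. The total is the truncated weighted average with weights 0.3 / 0.35 / 0.15 / 0.2. The worked example below re-derives that formula with a hypothetical standalone `quality_score` function taking the four counts directly. The reviewer itself derives them from the issue list.

```python
from typing import Dict


def quality_score(error_count: int, warning_count: int, info_count: int,
                  security_issues: int) -> Dict[str, float]:
    """Re-derive the per-dimension and total scores from the issue counts."""
    maintainability = max(0, 100 - (error_count * 5 + warning_count * 2 + info_count * 0.5))
    security = max(0, 100 - security_issues * 15)
    readability = max(0, 100 - info_count * 3)
    best_practices = max(0, 100 - warning_count * 5)
    # Weights sum to 1.0; int() truncates toward zero
    total = int(maintainability * 0.3 + security * 0.35
                + readability * 0.15 + best_practices * 0.2)
    return {
        'total': total,
        'maintainability': maintainability,
        'security': security,
        'readability': readability,
        'best_practices': best_practices,
    }
```

For example, 1 error, 2 warnings, 3 info-level issues, and 1 security hit give:

- maintainability: 100 - 10.5 = 89.5
- security: 85
- readability: 91
- best practices: 90

The total is int(26.85 + 29.75 + 13.65 + 18.0) = int(88.25) = 88.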

scanner/ai_reviewer.py

@@ -0,0 +1,578 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AI code reviewer
Uses a large language model for intelligent code review
"""
import os
import re
import json
import logging
from typing import Dict, Any, List, Optional

from scanner.base import BaseScanner

logger = logging.getLogger(__name__)


class AIReviewer(BaseScanner):
    """AI code reviewer"""

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the AI reviewer

        Args:
            config: AI configuration
        """
        # Initialize the base class first
        super().__init__(config.get('scanner', {}))

        self.config = config
        self.enabled = config.get('enabled', True)
        self.provider = config.get('provider', 'api')
        self.model = config.get('model', 'llama3')
        self.api_url = config.get('api_url', 'http://localhost:11434')
        self.api_key = config.get('api_key', '')
        self.max_lines = config.get('max_lines', 200)

        if not self.enabled:
            logger.info('AI reviewer is disabled')
            return

        logger.info(f'AI reviewer initialized: {self.provider}/{self.model}')

    def scan(self, repo_url: str, commit_id: Optional[str], branch: str, changed_files: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Run the code scan (implements the abstract method)

        Args:
            repo_url: repository URL
            commit_id: commit ID
            branch: branch name
            changed_files: optional list of changed files (from the PR)

        Returns:
            review result
        """
        # Delegate to the actual review logic
        return self._do_review(repo_url=repo_url, commit_id=commit_id, branch=branch, changed_files=changed_files)

    def _do_review(self, clone_dir: str = None, repo_url: str = None,
                   commit_id: str = None, branch: str = None,
                   language: str = 'python',
                   changed_files: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Run the AI code review

        Args:
            clone_dir: repository directory (pass it directly if already cloned)
            repo_url: repository URL (required if not yet cloned)
            commit_id: commit ID
            branch: branch name
            language: programming language
            changed_files: optional list of changed files (from the PR)

        Returns:
            review result (in a format compatible with python_scanner.py)
        """
        result = {
            'tool': 'AI Code Reviewer',
            'language': language,
            'status': 'success',
            'issues': [],
            'summary': {
                'total': 0,
                'error': 0,
                'warning': 0,
                'info': 0
            },
            'files_scanned': 0
        }

        if not self.enabled:
            result['status'] = 'disabled'
            result['summary'] = 'AI review is disabled'
            return result

        try:
            # Clone the repository if no clone_dir was passed in
            if not clone_dir and repo_url:
                clone_dir = self.clone_repo(repo_url, commit_id, branch)

            if not clone_dir or not os.path.exists(clone_dir):
                result['status'] = 'error'
                result['error'] = 'Could not obtain the code directory'
                return result

            # Collect the code files to review
            files = self._get_code_files(clone_dir, language, changed_files)

            if not files:
                result['summary'] = 'No reviewable code files found'
                return result

            # Run the AI review on each file
            all_issues = []
            for file_path in files[:5]:  # Review at most 5 files
                review = self._review_file(file_path, language, clone_dir)
                if review and review.get('issues'):
                    all_issues.extend(review['issues'])

            result['issues'] = all_issues[:self.max_issues] if self.detailed else all_issues
            result['summary'] = self._calculate_summary(all_issues)
            result['files_scanned'] = len(files[:5])
            result['clone_dir'] = clone_dir

            # Compute the quality score
            result['quality_score'] = self._calculate_quality_score(all_issues, files[:5])

            return result

        except Exception as e:
            logger.error(f'AI review failed: {str(e)}')
            result['status'] = 'error'
            result['error'] = str(e)
            return result

    def _calculate_summary(self, issues: List[Dict]) -> Dict[str, int]:
        """Count the issues per severity bucket"""
        summary = {
            'total': len(issues),
            'error': 0,
            'warning': 0,
            'info': 0
        }
        for issue in issues:
            severity = issue.get('severity', '').lower()
            if severity in ['error', 'critical', 'fatal']:
                summary['error'] += 1
            elif severity in ['warning', 'moderate']:
                summary['warning'] += 1
            else:
                summary['info'] += 1
        return summary

    def _calculate_quality_score(self, issues: List[Dict], files: List[str]) -> Dict[str, Any]:
        """
        Compute the code quality score
        Returns: the total score (0-100) and the score for each dimension
        """
        if not files:
            return {'total': 100, 'maintainability': 100, 'security': 100, 'readability': 100, 'best_practices': 100}
        # Count issues by severity
        error_count = sum(1 for i in issues if i.get('severity', '').lower() in ['error', 'critical'])
        warning_count = sum(1 for i in issues if i.get('severity', '').lower() == 'warning')
        info_count = sum(1 for i in issues if i.get('severity', '').lower() == 'info')
        # Count security-related issues by keyword ('权限', '注入', '认证' are Chinese for permission, injection, authentication)
        security_keywords = ['sql injection', 'xss', 'csrf', 'password', 'secret', 'token', '权限', '注入', '认证']
        security_issues = sum(1 for i in issues if any(k in (i.get('message', '') + i.get('symbol', '')).lower() for k in security_keywords))
        # Compute the score for each dimension
        # Maintainability: weighted count of errors (5), warnings (2) and info issues (0.5)
        issue_weight = error_count * 5 + warning_count * 2 + info_count * 0.5
        maintainability = max(0, 100 - issue_weight)
        # Security: 15 points per security-related issue
        security_score = max(0, 100 - security_issues * 15)
        # Readability: based on info-level (style) issues
        readability = max(0, 100 - info_count * 3)
        # Best practices: based on warning-level issues
        best_practices = max(0, 100 - warning_count * 5)
        # Total: weighted average (weights sum to 1.0), truncated to an int
        total = int((maintainability * 0.3 + security_score * 0.35 + readability * 0.15 + best_practices * 0.2))
        return {
            'total': total,
            'maintainability': maintainability,
            'security': security_score,
            'readability': readability,
            'best_practices': best_practices,
            'details': {
                'error_count': error_count,
                'warning_count': warning_count,
                'info_count': info_count,
                'security_issues': security_issues
            }
        }
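
    def generate_fix_suggestion(self, file_path: str, line: int, message: str, code: str) -> Optional[Dict[str, Any]]:
        """
        Generate suggested fix code for the given issue.
        Returns the parsed JSON dict (fixed_code, explanation, confidence), or None if no fix was produced.
        """
        prompt = f"""You are a code-fix expert. Generate the fixed code for the problem below.
Problem description: {message}
Line number of the problem: {line}
Original code:
```
{code}
```
Output the fix suggestion in JSON format:
```json
{{
    "fixed_code": "the complete fixed code or the key snippet",
    "explanation": "explanation of the fix, 50 characters or fewer",
    "confidence": "high/medium/low, how sure you are of the fix"
}}
```
If it cannot be fixed, return: {{"fixed_code": "", "explanation": "Cannot be fixed automatically", "confidence": "low"}}"""
        try:
            response = self._call_ai(prompt)
            if response and response.get('fixed_code'):
                return response
        except Exception as e:
            logger.warning(f'Failed to generate fix suggestion: {e}')
        return None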

    def _get_code_files(self, clone_dir: str, language: str, changed_files: Optional[List[str]] = None) -> List[str]:
        """Get the list of code files to review"""
        import glob
        extensions = {
            'python': ['.py'],
            'javascript': ['.js', '.jsx'],
            'typescript': ['.ts', '.tsx']
        }
        exts = extensions.get(language, ['.py'])
        # If a list of changed files is provided, return only those files
        if changed_files:
            files = []
            for changed_file in changed_files:
                if any(changed_file.endswith(ext) for ext in exts):
                    full_path = os.path.join(clone_dir, changed_file)
                    if os.path.exists(full_path):
                        files.append(full_path)
            return files[:10]
        # Otherwise scan the whole repository
        files = []
        for ext in exts:
            pattern = os.path.join(clone_dir, '**', f'*{ext}')
            files.extend(glob.glob(pattern, recursive=True))
        # Filter out test files and virtual environments; match against the repo-relative path
        # so that the name of the temporary clone directory itself cannot exclude every file
        excluded = ['test_', '_test.', 'venv', 'node_modules', '__pycache__']
        files = [f for f in files
                 if not any(x in os.path.relpath(f, clone_dir) for x in excluded)]
        return files[:10]  # at most 10 files

    def _review_file(self, file_path: str, language: str, clone_dir: str = None) -> Optional[Dict[str, Any]]:
        """Review a single file"""
        issues = []
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                code = f.read()
            # Limit the number of lines sent to the model
            lines = code.split('\n')
            if len(lines) > self.max_lines:
                code = '\n'.join(lines[:self.max_lines])
                truncated = True
            else:
                truncated = False
            # Number each line before sending the code, so the model can report accurate line numbers
            code_with_lines = self._code_with_line_numbers(code)
            prompt = self._build_prompt(code_with_lines, language)
            # Call the AI
            response = self._call_ai(prompt)
            # Path relative to the repository root
            rel_path = os.path.relpath(file_path, clone_dir) if (clone_dir and file_path) else file_path
            if not response:
                return {
                    'file': rel_path,
                    'path': file_path,
                    'truncated': truncated,
                    'issues': []
                }
            # Parse the AI response into the standard issues format and correct the line numbers
            ai_issues = response.get('issues') or []
            for issue in ai_issues:
                # Skip malformed entries instead of losing the whole file's results
                if not isinstance(issue, dict):
                    continue
                self._correct_issue_line(issue, code)
                issues.append({
                    'tool': 'ai_reviewer',
                    'type': issue.get('type', 'info'),
                    'severity': issue.get('severity', 'Info'),
                    'message': issue.get('message', ''),
                    'file': rel_path,
                    'line': issue.get('line', 0),
                    'column': issue.get('column', 0),
                    'symbol': issue.get('symbol', ''),
                    'code_context': issue.get('code_context', ''),
                    'defect_reason': issue.get('defect_reason', '')
                })
            return {
                'file': rel_path,
                'path': file_path,
                'truncated': truncated,
                'issues': issues
            }
        except Exception as e:
            logger.warning(f'Failed to review file {file_path}: {str(e)}')
            return None
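
    def _build_prompt(self, code: str, language: str) -> str:
        """Build the review prompt"""
        if language == 'python':
            lang_name = 'Python'
        elif language in ['javascript', 'typescript']:
            lang_name = 'JavaScript/TypeScript'
        else:
            lang_name = language
        prompt = f"""You are a senior {lang_name} code review expert. Review the following code and identify potential problems and defects.
Output the review result in JSON format; it must contain the following fields:
```json
{{
    "issues": [
        {{
            "line": line number,
            "column": column number,
            "message": "description of the problem",
            "type": "one of error/warning/info",
            "severity": "one of Error/Warning/Info",
            "symbol": "error identifier, e.g. unused-variable, syntax-error",
            "code_context": "context of the problematic code (the line or lines containing the problem)",
            "defect_reason": "concise analysis of the defect's cause, 30 characters or fewer"
        }}
    ]
}}
```
Notes:
1. line and column are the line and column numbers of the problem (starting from 1)
2. type: error = error, warning = warning, info = information
3. severity: Error = serious, Warning = moderate, Info = hint
4. code_context: the line containing the problematic code, or a few adjacent lines
5. defect_reason: a concise description (30 characters or fewer) of the cause and the risk
If the code has no problems, return an empty array: {{"issues": []}}
Important: every line of the code below is prefixed with its line number (format "line_number|"). Fill the line field of each issue strictly with the number in front of the line where the problem actually occurs; do not guess or use a wrong line number.
The code to review (with line numbers) follows:
```{language}
{code}
```"""
        return prompt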

    def _code_with_line_numbers(self, code: str) -> str:
        """Prefix every line with its line number so the model can report accurate line numbers"""
        lines = code.split('\n')
        width = len(str(len(lines)))
        return '\n'.join(f'{i:>{width}}| {line}' for i, line in enumerate(lines, 1))
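
    def _correct_issue_line(self, issue: Dict[str, Any], code: str) -> None:
        """
        Search the source for the identifiers mentioned in the issue's message/symbol and move
        issue['line'] to where the problem actually appears, when it can be found.
        Line numbers returned by the AI are often inaccurate; matching an identifier the issue
        refers to (e.g. 'unused_module') corrects them.
        """
        try:
            line = int(issue.get('line') or 0)
        except (TypeError, ValueError):
            # The model returned a non-numeric line; leave the issue unchanged
            return
        if not line or not code:
            return
        lines = code.split('\n')
        if line < 1 or line > len(lines):
            return
        issue['line'] = line
        # Collect quoted identifiers from the message (e.g. 'unused_module' -> unused_module)
        message = issue.get('message') or ''
        symbol = (issue.get('symbol') or '').strip()
        candidates = []
        if symbol:
            candidates.append(symbol)
        for m in re.finditer(r"['\"]([a-zA-Z_][a-zA-Z0-9_]*)['\"]", message):
            candidates.append(m.group(1))
        # If the message contains no quoted identifier, use its first ASCII word as the keyword
        if not candidates:
            first_word = re.search(r'\b([a-zA-Z_][a-zA-Z0-9_]*)\b', message)
            if first_word:
                candidates.append(first_word.group(1))
        for token in candidates:
            if not token:
                continue
            # The reported line already contains the token: keep it
            if token in lines[line - 1]:
                return
            # Otherwise move to the occurrence closest to the reported line
            matches = [i + 1 for i, code_line in enumerate(lines) if token in code_line]
            if matches:
                issue['line'] = min(matches, key=lambda n: abs(n - line))
                return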

    def _call_ai(self, prompt: str) -> Optional[Dict[str, Any]]:
        """Call the AI service"""
        try:
            return self._call_api(prompt)
        except Exception as e:
            # logger.exception records the error together with its full traceback
            logger.exception(f'AI call failed: {str(e)}')
            return None

    def _extract_json_obj(self, content: Any) -> Optional[Dict[str, Any]]:
        """
        Extract a JSON object (dict) from the model output as robustly as possible.
        Handles these cases:
        - content is already a dict
        - content is a JSON string
        - content is wrapped in ```json ... ``` or ``` ... ```
        - content has explanatory text around it; as long as it contains one outermost { ... }, that part is parsed
        """
        if content is None:
            logger.debug("_extract_json_obj: content is None")
            return None
        # If it is already a dict, return it directly
        if isinstance(content, dict):
            logger.debug("_extract_json_obj: content is already dict")
            return content
        if not isinstance(content, str):
            content = str(content)
        text = content.strip()
        logger.debug(f"_extract_json_obj: raw content length = {len(text)}")
        logger.debug(f"_extract_json_obj: first 100 chars of raw content: {text[:100]}")
        # Strip a code-fence wrapper (handles ```json / ``` json / ```JSON, etc.)
        lowered = text.lower()
        fence_start = lowered.find('```')
        if fence_start != -1:
            logger.debug(f"_extract_json_obj: found code fence, fence_start={fence_start}")
            # Take the first fence
            after = text[fence_start + 3:]
            after_l = after.lower()
            # If the fence is followed by a language tag (json or anything else), skip to the end of that line
            newline_idx = after.find('\n')
            if newline_idx != -1:
                lang_header = after_l[:newline_idx].strip()
                logger.debug(f"_extract_json_obj: language tag: {lang_header}")
                body = after[newline_idx + 1:]
                # Cut at the next (closing) fence
                end_idx = body.lower().find('```')
                if end_idx != -1:
                    candidate = body[:end_idx].strip()
                else:
                    # No closing fence: use the body as the candidate (the JSON may be truncated)
                    candidate = body.strip()
                # Replace only if it really looks like JSON, so plain text is not mangled
                if '{' in candidate and '}' in candidate:
                    text = candidate
                    logger.debug(f"_extract_json_obj: extracted fenced content, length={len(text)}")
            else:
                # No newline after the fence: fall through to the brace-based extraction below
                pass
        # Try parsing directly
        try:
            obj = json.loads(text)
            logger.debug("_extract_json_obj: direct parse succeeded")
            return obj if isinstance(obj, dict) else None
        except Exception as e:
            logger.debug(f"_extract_json_obj: direct parse failed: {e}")
        # Fallback: take the outermost { ... } and parse it
        start = text.find('{')
        end = text.rfind('}')
        logger.debug(f"_extract_json_obj: brace search start={start}, end={end}")
        if start != -1 and end != -1 and end > start:
            candidate = text[start:end + 1].strip()
            logger.debug(f"_extract_json_obj: candidate length={len(candidate)}, first 50 chars: {candidate[:50]}")
            try:
                obj = json.loads(candidate)
                logger.debug("_extract_json_obj: fallback parse succeeded")
                return obj if isinstance(obj, dict) else None
            except Exception as e:
                logger.debug(f"_extract_json_obj: fallback parse failed: {e}")
                return None
        logger.debug("_extract_json_obj: no valid JSON object could be extracted")
        return None

    def _call_api(self, prompt: str) -> Optional[Dict[str, Any]]:
        """Call the online API"""
        import requests
        headers = {
            'Content-Type': 'application/json'
        }
        if self.api_key:
            headers['Authorization'] = f'Bearer {self.api_key}'
        # SiliconFlow, DeepSeek, DashScope and any other provider all receive the same
        # OpenAI-style /chat/completions request; only DashScope gets an extra field
        url = f"{self.api_url.rstrip('/')}/chat/completions"
        payload = {
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 1024,
            "temperature": 0.7
        }
        if 'dashscope' in self.api_url:
            # Alibaba Cloud DashScope: explicitly disable streaming
            payload["stream"] = False
        logger.info(f"Calling API: {url}, model={self.model}")
        try:
            response = requests.post(url, json=payload, headers=headers, timeout=120)
            if response.status_code == 200:
                result = response.json()
                content = result['choices'][0]['message']['content']
                logger.info(f"API response content length: {len(content) if content else 0}")
                return self._extract_json_obj(content)
            logger.warning(f'API returned an error: {response.status_code}, {response.text[:200]}')
            return None
        except Exception as e:
            logger.warning(f'API call failed: {e}')
            return None
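The weighting in `_calculate_quality_score` is easiest to see with concrete numbers. The sketch below is a standalone helper, `quality_score`, which is hypothetical and not part of the reviewer. It takes the severity and security counts directly instead of deriving them from issue messages, and applies the same per-dimension penalties and the same 0.3 / 0.35 / 0.15 / 0.2 weights.

```python
from typing import Dict


def quality_score(errors: int, warnings: int, infos: int, security_issues: int) -> Dict[str, float]:
    """Recompute the reviewer's weighted quality score from issue counts."""
    # Maintainability: each error costs 5 points, each warning 2, each info 0.5
    maintainability = max(0, 100 - (errors * 5 + warnings * 2 + infos * 0.5))
    # Security: 15 points per security-related issue
    security = max(0, 100 - security_issues * 15)
    # Readability: 3 points per info-level (style) issue
    readability = max(0, 100 - infos * 3)
    # Best practices: 5 points per warning
    best_practices = max(0, 100 - warnings * 5)
    # Weighted average; int() truncates rather than rounds
    total = int(maintainability * 0.3 + security * 0.35
                + readability * 0.15 + best_practices * 0.2)
    return {'total': total, 'maintainability': maintainability, 'security': security,
            'readability': readability, 'best_practices': best_practices}


print(quality_score(errors=2, warnings=3, infos=4, security_issues=1))
```

Take 2 errors, 3 warnings, 4 info issues and 1 security hit. The dimension scores are 82.0, 85, 88 and 85. The weighted sum is 82.0 × 0.3 + 85 × 0.35 + 88 × 0.15 + 85 × 0.2 = 84.55, which truncates to a total of 84. Because security carries the largest weight, a single security hit moves the total more than several style issues do.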


@@ -52,9 +52,12 @@ class BaseScanner(ABC):
         repo_name = repo_url.split('/')[-1].replace('.git', '')
         commit_hash = commit_id or branch
         clone_dir = os.path.join(self.temp_dir, f"{repo_name}_{commit_hash}")
-        # If the directory already exists, delete it first
+        # If the directory already exists, delete it first (with retries)
         if os.path.exists(clone_dir):
-            shutil.rmtree(clone_dir)
+            self.cleanup(clone_dir)
+        repo = None
         try:
             logger.info(f'Cloning repository: {repo_url}')
             # Clone the repository (shallow clone, latest commit only)
@@ -64,26 +67,52 @@ class BaseScanner(ABC):
                 depth=1,
                 branch=branch
             )
             # If a commit_id is given, check out that commit
             if commit_id:
                 repo.git.checkout(commit_id)
             logger.info(f'Repository cloned successfully: {clone_dir}')
             return clone_dir
         except Exception as e:
             logger.error(f'Failed to clone repository: {str(e)}')
             raise
+        finally:
+            # Explicitly close the Repo object to release file handles (especially on Windows)
+            if repo is not None:
+                repo.close()
     def cleanup(self, clone_dir: str):
         """
-        Clean up the temporary directory
+        Clean up the temporary directory (with retries; handles Windows permission errors)
         Args:
             clone_dir: path of the cloned directory
         """
-        try:
-            if os.path.exists(clone_dir):
-                shutil.rmtree(clone_dir)
-                logger.info(f'Cleaned up temporary directory: {clone_dir}')
-        except Exception as e:
-            logger.warning(f'Failed to clean up temporary directory: {str(e)}')
+        import time
+        import stat
+        def handle_remove_readonly(func, path, exc_info):
+            """Handle deletion of read-only files (Windows)"""
+            # Add write permission and retry
+            os.chmod(path, stat.S_IWRITE)
+            func(path)
+        max_retries = 3
+        retry_delay = 1  # seconds
+        for attempt in range(max_retries):
+            try:
+                if os.path.exists(clone_dir):
+                    # On Windows, the onerror callback handles read-only files
+                    shutil.rmtree(clone_dir, onerror=handle_remove_readonly)
+                    logger.info(f'Cleaned up temporary directory: {clone_dir}')
+                return  # Cleaned up successfully; return immediately
+            except Exception as e:
+                if attempt < max_retries - 1:
+                    logger.warning(f'Failed to clean up temporary directory, retrying in {retry_delay}s: {str(e)}')
+                    time.sleep(retry_delay)
+                    retry_delay *= 2  # exponential backoff
+                else:
+                    logger.warning(f'Failed to clean up temporary directory (after {max_retries} attempts): {str(e)}')
     def run_command(self, cmd: List[str], cwd: str, timeout: int = 300) -> Dict[str, Any]:
         """
         Run a command and return the result
@@ -123,15 +152,28 @@ class BaseScanner(ABC):
                 'stdout': '',
                 'stderr': str(e)
             }
-    def get_changed_files(self, clone_dir: str, extensions: List[str]) -> List[str]:
+    def get_changed_files(self, clone_dir: str, extensions: List[str], changed_files: Optional[List[str]] = None) -> List[str]:
         """
         Get the list of files with the given extensions
         Args:
             clone_dir: repository directory
             extensions: list of file extensions
+            changed_files: optional list of changed files (from the PR); if provided, only these files are returned
         Returns:
             list of file paths
         """
+        # If a list of changed files is provided, scan only those files
+        if changed_files:
+            files = []
+            for changed_file in changed_files:
+                # Check whether the file extension matches
+                if any(changed_file.endswith(ext) for ext in extensions):
+                    full_path = os.path.join(clone_dir, changed_file)
+                    if os.path.exists(full_path):
+                        files.append(full_path)
+            return files
+        # Otherwise scan the whole repository
         files = []
         for root, dirs, filenames in os.walk(clone_dir):
             # Skip hidden and special directories
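The new `cleanup` deals with read-only files (common in `.git/objects` on Windows) through the error callback of `shutil.rmtree`. Below is a minimal standalone sketch of the same retry-and-chmod idea, assuming Python 3.8+. `remove_tree` and `_make_writable_and_retry` are hypothetical names. Since Python 3.12, `onerror` is deprecated in favor of `onexc`, so the sketch chooses the keyword by interpreter version. Both callbacks receive `(func, path, exc)`, which means one handler can serve either keyword.

```python
import os
import shutil
import stat
import sys
import time


def _make_writable_and_retry(func, path, _exc):
    """Clear the read-only bit on `path` and retry the os call that failed (e.g. os.unlink)."""
    os.chmod(path, stat.S_IWRITE)
    func(path)


def remove_tree(path: str, retries: int = 3, delay: float = 1.0) -> bool:
    """Delete `path` recursively with exponential backoff; return True once it is gone."""
    for attempt in range(retries):
        if not os.path.exists(path):
            return True
        try:
            if sys.version_info >= (3, 12):
                # Python 3.12+: `onexc` receives the exception instance
                shutil.rmtree(path, onexc=_make_writable_and_retry)
            else:
                # Older versions: `onerror` receives sys.exc_info()
                shutil.rmtree(path, onerror=_make_writable_and_retry)
            return True
        except OSError:
            if attempt == retries - 1:
                return False
            time.sleep(delay)
            delay *= 2  # exponential backoff
    return not os.path.exists(path)
```

Returning a boolean, rather than only logging, lets a caller such as `clone_repo` decide whether it is safe to clone into the same directory afterwards.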

scanner/diff_parser.py (new file, 220 lines)

@@ -0,0 +1,220 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Diff parser: associates scan issues with the code snippets they refer to
"""
import re
import logging
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
@dataclass
class CodeChunk:
"""代码块"""
file_path: str
old_content: str = ""
new_content: str = ""
old_start: int = 0
new_start: int = 0
hunks: List[Dict] = field(default_factory=list)
class DiffParser:
    """Diff parser"""
    def __init__(self, diff_text: str):
        self.diff_text = diff_text
        self.files: Dict[str, CodeChunk] = {}
        self._parse()

    def _parse(self):
        """Parse the diff text"""
        if not self.diff_text:
            return
        current_chunk = None
        current_hunk = None
        lines = self.diff_text.split('\n')
        for line in lines:
            diff_match = re.match(r'diff --git a/(.+) b/(.+)', line)
            if diff_match:
                file_path = diff_match.group(1)
                current_chunk = CodeChunk(file_path=file_path)
                self.files[file_path] = current_chunk
                current_hunk = None
                continue
            hunk_match = re.match(r'@@ -(\d+),?\d* \+(\d+),?\d* @@', line)
            if hunk_match and current_chunk:
                old_start = int(hunk_match.group(1))
                new_start = int(hunk_match.group(2))
                if not current_chunk.hunks:
                    # The chunk-level start lines refer to the file's first hunk
                    current_chunk.old_start = old_start
                    current_chunk.new_start = new_start
                # Each hunk keeps its own new-side start line and lines, so absolute
                # line numbers can be resolved even when a file has several hunks
                current_hunk = {'old_start': old_start, 'new_start': new_start, 'new_lines': []}
                current_chunk.hunks.append(current_hunk)
                continue
            # Ignore file headers (index, ---, +++) that precede the first hunk
            if current_hunk is None or not line:
                continue
            if line.startswith('+'):
                current_chunk.new_content += line[1:] + '\n'
                current_hunk['new_lines'].append(line[1:])
            elif line.startswith('-'):
                current_chunk.old_content += line[1:] + '\n'
            elif line.startswith(' '):
                current_chunk.old_content += line[1:] + '\n'
                current_chunk.new_content += line[1:] + '\n'
                current_hunk['new_lines'].append(line[1:])

    def get_file_content(self, file_path: str) -> Optional[CodeChunk]:
        """Return the parsed chunk for a file path, or None"""
        return self.files.get(file_path)

    def get_line_context(self, file_path: str, line_number: int, context_lines: int = 3) -> Optional[Dict[str, Any]]:
        """Return the diff lines around line_number (a new-file line number) from the hunk that contains it"""
        chunk = self.files.get(file_path)
        if not chunk or not line_number:
            return None
        for hunk in chunk.hunks:
            new_lines = hunk['new_lines']
            # Position of the issue line within this hunk's new-side lines
            offset = line_number - hunk['new_start']
            if 0 <= offset < len(new_lines):
                start = max(0, offset - context_lines)
                end = min(len(new_lines), offset + context_lines + 1)
                context = []
                for i in range(start, end):
                    context.append({
                        'line_number': hunk['new_start'] + i,
                        'code': new_lines[i],
                        'is_issue_line': i == offset
                    })
                return {
                    'file': file_path,
                    'line': line_number,
                    'context': context
                }
        # The line lies outside every hunk shown in the diff
        return None
def merge_issues_with_code(scan_results: Dict[str, Any], diff: str) -> Dict[str, Any]:
    """Associate scan issues with code snippets from the diff"""
    parser = DiffParser(diff) if diff else None
    enriched_results = {
        'scanners': [],
        'summary': scan_results.get('summary', {}),
        'total_issues': scan_results.get('total_issues', 0)
    }
    for scanner_name, scanner_data in scan_results.items():
        if scanner_name in ['summary', 'total_issues', 'ai']:
            continue
        if isinstance(scanner_data, dict):
            enriched_scanner = {
                'name': scanner_name,
                'issues': [],
                # The scanners store this count under 'files_scanned'
                'file_count': scanner_data.get('file_count', scanner_data.get('files_scanned', 0)),
                'total_issues': scanner_data.get('total_issues', 0)
            }
            issues = scanner_data.get('issues', [])
            for issue in issues:
                enriched_issue = enrich_issue_with_code(issue, parser) if parser else issue
                enriched_scanner['issues'].append(enriched_issue)
            enriched_results['scanners'].append(enriched_scanner)
    # Convert the AI review result into the common issue format
    if 'ai' in scan_results:
        ai_result = scan_results['ai']
        ai_issues = convert_ai_reviews_to_issues(ai_result, parser)
        enriched_results['ai'] = {
            'name': 'ai',
            'issues': ai_issues,
            'summary': ai_result.get('summary', ''),
            # The AI reviewer stores this count under 'files_scanned'
            'files_reviewed': ai_result.get('files_reviewed', ai_result.get('files_scanned', 0))
        }
    return enriched_results
def convert_ai_reviews_to_issues(ai_result: Dict[str, Any], parser: Optional[DiffParser] = None) -> List[Dict[str, Any]]:
    """Convert the AI review result (issues format) into the unified issue format"""
    issues = []
    ai_issues = ai_result.get('issues', [])
    for issue in ai_issues:
        file_path = issue.get('file', '')
        if not file_path:
            continue
        code_context = None
        if parser:
            matched_path = None
            for path in parser.files.keys():
                # Match whole path components so that 'a.py' does not match 'data.py'
                if path == file_path or path.endswith('/' + file_path) or file_path.endswith('/' + path):
                    matched_path = path
                    break
            if matched_path:
                chunk = parser.get_file_content(matched_path)
                if chunk and chunk.new_content:
                    lines = chunk.new_content.split('\n')[:10]
                    code_context = {
                        'file': matched_path,
                        'line': issue.get('line', 1),
                        'preview': '\n'.join(lines),
                        'has_more': len(chunk.new_content.split('\n')) > 10
                    }
        sev = issue.get('severity', 'warning')
        sev = sev.lower() if isinstance(sev, str) else 'warning'
        issues.append({
            'file': file_path,
            'line': issue.get('line', 1),
            'severity': sev,
            'message': issue.get('message', ''),
            'category': 'ai',
            'code_context': code_context,
            'defect_reason': issue.get('defect_reason', '')
        })
    return issues
def enrich_issue_with_code(issue: Dict[str, Any], parser: DiffParser) -> Dict[str, Any]:
    """Attach a code snippet to a single issue"""
    enriched = issue.copy()
    file_path = issue.get('file', '')
    line_number = issue.get('line', 0)
    if not file_path:
        return enriched
    # No line number: try to recover one from text such as "line 42" in the description
    if not line_number:
        desc = issue.get('description', '') or issue.get('message', '')
        line_match = re.search(r'line[:#]?\s*(\d+)', desc, re.IGNORECASE)
        if line_match:
            line_number = int(line_match.group(1))
    matched_path = None
    for path in parser.files.keys():
        # Match whole path components so that 'a.py' does not match 'data.py'
        if path == file_path or path.endswith('/' + file_path) or file_path.endswith('/' + path):
            matched_path = path
            break
    if matched_path:
        enriched['file'] = matched_path
    if matched_path and line_number:
        context = parser.get_line_context(matched_path, line_number)
        if context:
            enriched['code_context'] = context
    # Fall back to a preview of the first 10 changed lines
    if 'code_context' not in enriched and matched_path:
        chunk = parser.get_file_content(matched_path)
        if chunk and chunk.new_content:
            lines = chunk.new_content.split('\n')[:10]
            enriched['code_context'] = {
                'file': matched_path,
                'line': line_number or 1,
                'preview': '\n'.join(lines),
                'has_more': len(chunk.new_content.split('\n')) > 10
            }
    return enriched
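Scanner issue lines are absolute line numbers in the new version of a file. When a file has several hunks, each visible diff line therefore needs its own new-side number, counted from the `+N` in the hunk header that precedes it. The sketch below shows that bookkeeping, assuming standard `git diff` unified output. `new_side_lines` and `line_context` are hypothetical helpers and are not part of `diff_parser.py`.

```python
import re
from typing import Dict, List, Tuple

FILE_RE = re.compile(r'^diff --git a/(.+) b/(.+)$')
HUNK_RE = re.compile(r'^@@ -\d+(?:,\d+)? \+(\d+)(?:,\d+)? @@')


def new_side_lines(diff_text: str) -> Dict[str, Dict[int, str]]:
    """Map each file to {new-file line number: text} for its added and context lines."""
    files: Dict[str, Dict[int, str]] = {}
    current = None   # line map of the file being parsed
    new_no = 0       # next line number on the new side
    in_hunk = False
    for raw in diff_text.split('\n'):
        file_match = FILE_RE.match(raw)
        if file_match:
            current = files.setdefault(file_match.group(2), {})
            in_hunk = False
            continue
        hunk_match = HUNK_RE.match(raw)
        if hunk_match and current is not None:
            new_no = int(hunk_match.group(1))
            in_hunk = True
            continue
        if not in_hunk:
            continue  # 'index', '---' and '+++' headers before the first hunk
        if raw.startswith('+') or raw.startswith(' '):
            current[new_no] = raw[1:]
            new_no += 1
        # '-' lines exist only on the old side; '\ No newline' markers are skipped
    return files


def line_context(lines: Dict[int, str], line_no: int, radius: int = 1) -> List[Tuple[int, str, bool]]:
    """Return (line number, text, is_issue_line) for diff-visible lines around line_no."""
    return [(n, lines[n], n == line_no)
            for n in range(line_no - radius, line_no + radius + 1) if n in lines]
```

Because the header lines are skipped by state (`in_hunk`) rather than by prefix, an added JavaScript line such as `++i;` (shown as `+++i;` in the diff) is still counted as code.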


@@ -20,7 +20,7 @@ class JavaScriptScanner(BaseScanner):
         super().__init__(config)
         self.extensions = ['.js', '.jsx', '.ts', '.tsx', '.vue', '.svelte']
-    def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]:
+    def scan(self, repo_url: str, commit_id: Optional[str], branch: str, changed_files: Optional[List[str]] = None) -> Dict[str, Any]:
         """
         Run the JavaScript/TypeScript code scan
@@ -28,6 +28,7 @@ class JavaScriptScanner(BaseScanner):
             repo_url: repository URL
             commit_id: commit ID
             branch: branch name
+            changed_files: optional list of changed files (from the PR)
         Returns:
             scan result
@@ -51,8 +52,8 @@ class JavaScriptScanner(BaseScanner):
             # Clone the repository
             clone_dir = self.clone_repo(repo_url, commit_id, branch)
-            # Get the JavaScript/TypeScript files
-            js_files = self.get_changed_files(clone_dir, self.extensions)
+            # Get the JavaScript/TypeScript files (only the changed files are scanned)
+            js_files = self.get_changed_files(clone_dir, self.extensions, changed_files)
             result['files_scanned'] = len(js_files)
             if not js_files:
@@ -72,14 +73,10 @@ class JavaScriptScanner(BaseScanner):
             result['status'] = 'error'
             result['error'] = str(e)
-        finally:
-            # Clean up the temporary directory
-            if clone_dir:
-                self.cleanup(clone_dir)
         return result
-    def _run_eslint(self, cwd: str, files: List[str]) -> Dict[str, Any]:
+    def _run_eslint(self, clone_dir: str, files: List[str]) -> Dict[str, Any]:
         """Run the ESLint scan"""
         result = {
             'tool': 'eslint',
@@ -92,7 +89,7 @@ class JavaScriptScanner(BaseScanner):
             cmd = ['npx', 'eslint', '--format=json', '--no-eslintrc'] + files
             # If there is no ESLint config, create a default one first
-            eslintrc_path = os.path.join(cwd, '.eslintrc.json')
+            eslintrc_path = os.path.join(clone_dir, '.eslintrc.json')
             if not os.path.exists(eslintrc_path):
                 # Create a simple ESLint config
                 eslint_config = {
@@ -110,7 +107,7 @@ class JavaScriptScanner(BaseScanner):
                 with open(eslintrc_path, 'w') as f:
                     json.dump(eslint_config, f)
-            output = self.run_command(cmd, cwd, timeout=120)
+            output = self.run_command(cmd, clone_dir, timeout=120)
             result['raw_output'] = output.get('stdout', '') + output.get('stderr', '')
             # Parse the JSON output
@@ -119,6 +116,8 @@ class JavaScriptScanner(BaseScanner):
                     eslint_results = json.loads(output['stdout'])
                     for file_result in eslint_results:
                         file_path = file_result.get('filePath', '')
+                        # Use the path relative to clone_dir
+                        rel_path = os.path.relpath(file_path, clone_dir) if file_path else ''
                         messages = file_result.get('messages', [])
                         for msg in messages:
@@ -128,7 +127,7 @@ class JavaScriptScanner(BaseScanner):
                                 'type': severity,
                                 'severity': 'Error' if msg.get('severity', 0) == 2 else 'Warning',
                                 'message': msg.get('message', ''),
-                                'file': os.path.basename(file_path),
+                                'file': rel_path,
                                 'line': msg.get('line', 0),
                                 'column': msg.get('column', 0),
                                 'symbol': msg.get('ruleId', 'unknown')
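The scanners now report `os.path.relpath(file_path, clone_dir)` instead of `os.path.basename(...)`. This keeps two same-named files in different directories apart and produces paths that can be compared with the PR's changed-file list. The small sketch below assumes the tool reports absolute paths. `repo_relative` is a hypothetical helper that also converts Windows separators to `/`, the separator git uses in diff paths.

```python
import os
from pathlib import PurePath


def repo_relative(path: str, clone_dir: str) -> str:
    """Return `path` relative to the clone root, always with '/' separators."""
    if not path:
        return ''
    rel = os.path.relpath(path, clone_dir)
    # On Windows relpath uses backslashes; as_posix() converts them to '/'
    return PurePath(rel).as_posix()


root = os.path.join(os.sep, 'tmp', 'clone')
a = os.path.join(root, 'src', 'index.js')
b = os.path.join(root, 'lib', 'index.js')
print(os.path.basename(a) == os.path.basename(b))  # True: basenames collide
print(repo_relative(a, root), repo_relative(b, root))  # src/index.js lib/index.js
```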


@@ -20,7 +20,7 @@ class PythonScanner(BaseScanner):
         super().__init__(config)
         self.extensions = ['.py']
-    def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]:
+    def scan(self, repo_url: str, commit_id: Optional[str], branch: str, changed_files: Optional[List[str]] = None) -> Dict[str, Any]:
         """
         Run the Python code scan
@@ -28,6 +28,7 @@ class PythonScanner(BaseScanner):
             repo_url: repository URL
             commit_id: commit ID
             branch: branch name
+            changed_files: optional list of changed files (from the PR)
         Returns:
             scan result
@@ -51,8 +52,8 @@ class PythonScanner(BaseScanner):
             # Clone the repository
             clone_dir = self.clone_repo(repo_url, commit_id, branch)
-            # Get the Python files
-            py_files = self.get_changed_files(clone_dir, self.extensions)
+            # Get the Python files (only the changed files are scanned)
+            py_files = self.get_changed_files(clone_dir, self.extensions, changed_files)
             result['files_scanned'] = len(py_files)
             if not py_files:
@@ -80,14 +81,11 @@ class PythonScanner(BaseScanner):
             result['status'] = 'error'
             result['error'] = str(e)
-        finally:
-            # Clean up the temporary directory
-            if clone_dir:
-                self.cleanup(clone_dir)
         return result
-    def _run_pylint(self, cwd: str, files: List[str]) -> Dict[str, Any]:
+    def _run_pylint(self, clone_dir: str, files: List[str]) -> Dict[str, Any]:
         """Run the Pylint scan"""
         result = {
             'tool': 'pylint',
@@ -98,7 +96,7 @@ class PythonScanner(BaseScanner):
         # Scan only the changed files
         try:
             cmd = ['python', '-m', 'pylint', '--output-format=json'] + files
-            output = self.run_command(cmd, cwd, timeout=120)
+            output = self.run_command(cmd, clone_dir, timeout=120)
             result['raw_output'] = output.get('stdout', '')
@@ -107,12 +105,15 @@ class PythonScanner(BaseScanner):
                 try:
                     issues = json.loads(output['stdout'])
                     for issue in issues:
+                        # Use the path relative to clone_dir
+                        full_path = issue.get('path', '')
+                        rel_path = os.path.relpath(full_path, clone_dir) if full_path else ''
                         result['issues'].append({
                             'tool': 'pylint',
                             'type': issue.get('type', 'info'),
                             'severity': issue.get('severity', 'Info'),
                             'message': issue.get('message', ''),
-                            'file': os.path.basename(issue.get('path', '')),
+                            'file': rel_path,
                             'line': issue.get('line', 0),
                             'column': issue.get('column', 0),
                             'symbol': issue.get('symbol', '')
@@ -125,7 +126,7 @@ class PythonScanner(BaseScanner):
         return result
-    def _run_flake8(self, cwd: str, files: List[str]) -> Dict[str, Any]:
+    def _run_flake8(self, clone_dir: str, files: List[str]) -> Dict[str, Any]:
         """Run the Flake8 scan"""
         result = {
             'tool': 'flake8',
@@ -135,7 +136,7 @@ class PythonScanner(BaseScanner):
         try:
             cmd = ['python', '-m', 'flake8', '--format=json'] + files
-            output = self.run_command(cmd, cwd, timeout=120)
+            output = self.run_command(cmd, clone_dir, timeout=120)
             result['raw_output'] = output.get('stdout', '')
@@ -144,12 +145,15 @@ class PythonScanner(BaseScanner):
                 try:
                     issues = json.loads(output['stdout'])
                     for issue in issues:
+                        # Use the path relative to clone_dir
+                        full_path = issue.get('filename', '')
+                        rel_path = os.path.relpath(full_path, clone_dir) if full_path else ''
                         result['issues'].append({
                             'tool': 'flake8',
                             'type': self._map_flake8_code(issue.get('code', '')),
                             'severity': 'Warning',
                             'message': issue.get('text', ''),
-                            'file': os.path.basename(issue.get('filename', '')),
+                            'file': rel_path,
                             'line': issue.get('line_number', 0),
                             'column': issue.get('column_number', 0),
                             'symbol': issue.get('code', '')
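`_run_pylint` copies `issue.get('severity', 'Info')` from each record. As far as we know, pylint's built-in `json` reporter emits a `type` field (`fatal`, `error`, `warning`, `refactor`, `convention`, `info`) and no `severity` field, so that lookup may always fall back to `'Info'`. Verify this against the pylint version in use. The sketch below derives the label from `type` instead. `PYLINT_TYPE_TO_SEVERITY` and `pylint_severity` are assumptions and are not part of the scanner.

```python
from typing import Any, Dict

# Hypothetical mapping from pylint message types to the report's Error/Warning/Info labels
PYLINT_TYPE_TO_SEVERITY = {
    'fatal': 'Error',
    'error': 'Error',
    'warning': 'Warning',
    'refactor': 'Info',
    'convention': 'Info',
    'info': 'Info',
}


def pylint_severity(record: Dict[str, Any]) -> str:
    """Derive a severity label from the 'type' field of one pylint JSON record."""
    msg_type = str(record.get('type', '')).lower()
    # Unknown or missing types fall back to 'Info', like the scanner's default
    return PYLINT_TYPE_TO_SEVERITY.get(msg_type, 'Info')


record = {'type': 'error', 'symbol': 'undefined-variable', 'line': 3}
print(pylint_severity(record))  # Error
```

With this mapping, `_calculate_summary` in the AI reviewer and the pylint results would bucket `error` and `fatal` messages the same way.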


@@ -21,7 +21,7 @@ class SecurityScanner(BaseScanner):
# 扫描所有代码文件以发现安全问题 # 扫描所有代码文件以发现安全问题
self.extensions = ['.py', '.js', '.ts', '.jsx', '.tsx', '.java', '.go', '.rb', '.php'] self.extensions = ['.py', '.js', '.ts', '.jsx', '.tsx', '.java', '.go', '.rb', '.php']
def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]: def scan(self, repo_url: str, commit_id: Optional[str], branch: str, changed_files: Optional[List[str]] = None) -> Dict[str, Any]:
""" """
执行安全扫描 执行安全扫描
@@ -29,6 +29,7 @@ class SecurityScanner(BaseScanner):
repo_url: 仓库 URL repo_url: 仓库 URL
commit_id: 提交 ID commit_id: 提交 ID
branch: 分支名 branch: 分支名
changed_files: 可选的变更文件列表(来自 PR
Returns: Returns:
扫描结果 扫描结果
@@ -53,8 +54,8 @@ class SecurityScanner(BaseScanner):
# 克隆仓库 # 克隆仓库
clone_dir = self.clone_repo(repo_url, commit_id, branch) clone_dir = self.clone_repo(repo_url, commit_id, branch)
# 获取所有支持的文件 # 获取所有支持的文件(只扫描变更的文件)
all_files = self.get_changed_files(clone_dir, self.extensions) all_files = self.get_changed_files(clone_dir, self.extensions, changed_files)
result['files_scanned'] = len(all_files) result['files_scanned'] = len(all_files)
if not all_files: if not all_files:
@@ -86,14 +87,9 @@ class SecurityScanner(BaseScanner):
result['status'] = 'error' result['status'] = 'error'
result['error'] = str(e) result['error'] = str(e)
finally:
# 清理临时目录
if clone_dir:
self.cleanup(clone_dir)
return result return result
def _run_bandit(self, cwd: str, files: List[str]) -> Dict[str, Any]: def _run_bandit(self, clone_dir: str, files: List[str]) -> Dict[str, Any]:
"""运行 Bandit 安全扫描""" """运行 Bandit 安全扫描"""
result = { result = {
'tool': 'bandit', 'tool': 'bandit',
@@ -103,7 +99,7 @@ class SecurityScanner(BaseScanner):
try: try:
# 运行 bandit # 运行 bandit
cmd = ['python', '-m', 'bandit', '-f', 'json'] + files cmd = ['python', '-m', 'bandit', '-f', 'json'] + files
output = self.run_command(cmd, cwd, timeout=120) output = self.run_command(cmd, clone_dir, timeout=120)
# 解析 JSON 输出 # 解析 JSON 输出
if output.get('stdout'): if output.get('stdout'):
@@ -112,6 +108,9 @@ class SecurityScanner(BaseScanner):
results = data.get('results', []) results = data.get('results', [])
for issue in results: for issue in results:
# 使用相对于 clone_dir 的路径
full_path = issue.get('filename', '')
rel_path = os.path.relpath(full_path, clone_dir) if full_path else ''
# 映射严重级别 # 映射严重级别
severity = issue.get('issue_severity', 'LOW') severity = issue.get('issue_severity', 'LOW')
result['issues'].append({ result['issues'].append({
@@ -120,7 +119,7 @@ class SecurityScanner(BaseScanner):
'severity': severity, 'severity': severity,
'confidence': issue.get('issue_confidence', 'LOW'), 'confidence': issue.get('issue_confidence', 'LOW'),
'message': issue.get('issue_text', ''), 'message': issue.get('issue_text', ''),
'file': os.path.basename(issue.get('filename', '')), 'file': rel_path,
'line': issue.get('line_number', 0), 'line': issue.get('line_number', 0),
'code': issue.get('code', '') 'code': issue.get('code', '')
}) })
@@ -132,7 +131,7 @@ class SecurityScanner(BaseScanner):
return result return result
def _scan_js_security(self, cwd: str, files: List[str]) -> Dict[str, Any]: def _scan_js_security(self, clone_dir: str, files: List[str]) -> Dict[str, Any]:
"""简单的 JavaScript 安全扫描(基于模式匹配)""" """简单的 JavaScript 安全扫描(基于模式匹配)"""
result = { result = {
'tool': 'js-security', 'tool': 'js-security',
@@ -177,6 +176,8 @@ class SecurityScanner(BaseScanner):
for file_path in files: for file_path in files:
try: try:
# 使用相对于 clone_dir 的路径
rel_path = os.path.relpath(file_path, clone_dir) if file_path else ''
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read() content = f.read()
lines = content.split('\n') lines = content.split('\n')
@@ -190,7 +191,7 @@ class SecurityScanner(BaseScanner):
 'severity': pattern_info['severity'],
 'confidence': 'MEDIUM',
 'message': pattern_info['message'],
-'file': os.path.basename(file_path),
+'file': rel_path,
 'line': line_num,
 'code': line.strip()[:80]
 })
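The hunks above replace `os.path.basename(...)` with a path relative to `clone_dir`. As a result, two findings in different directories that share a file name no longer collapse into the same label. The following is a minimal sketch of the Bandit side of that conversion. It assumes Bandit's JSON report layout: a `results` list whose entries carry `filename`, `issue_severity`, `issue_confidence`, `issue_text`, `line_number` and `code`. The function name `bandit_json_to_issues` is hypothetical and is not part of `SecurityScanner`.

```python
import json
import os
from typing import Any, Dict, List


def bandit_json_to_issues(stdout: str, clone_dir: str) -> List[Dict[str, Any]]:
    """Hypothetical helper: turn Bandit JSON output into issue dicts with clone-relative paths."""
    data = json.loads(stdout)
    issues = []
    for issue in data.get('results', []):
        full_path = issue.get('filename', '')
        # Relative to the clone root, so same-named files in different folders stay distinct
        rel_path = os.path.relpath(full_path, clone_dir) if full_path else ''
        issues.append({
            'severity': issue.get('issue_severity', 'LOW'),
            'confidence': issue.get('issue_confidence', 'LOW'),
            'message': issue.get('issue_text', ''),
            'file': rel_path,
            'line': issue.get('line_number', 0),
            'code': issue.get('code', ''),
        })
    return issues
```

Consider two results for `/tmp/clone/app/utils.py` and `/tmp/clone/lib/utils.py` with `clone_dir="/tmp/clone"`. On POSIX they come out as `app/utils.py` and `lib/utils.py`, whereas `basename` would have reported `utils.py` twice.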

test.py

@@ -1,218 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import logging
os.environ.setdefault('FLASK_RUN_HOST', '0.0.0.0')
from flask import Flask, request, jsonify
import yaml
from webhook.handler import GiteaWebhookHandler
from scanner.python_scanner import PythonScanner
from scanner.js_scanner import JavaScriptScanner
from scanner.security_scanner import SecurityScanner
from report.generator import ReportGenerator
from notify.feishu import FeishuNotifier

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


# Load configuration
def load_config():
    """Load the configuration file."""
    config_path = os.path.join(os.path.dirname(__file__), 'config.yaml')
    with open(config_path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)


# Global configuration
config = load_config()
# Initialize the application
app = Flask(__name__)
app.config['SECRET_KEY'] = config.get('server', {}).get('secret_key', 'dev-secret-key')
# Initialize components
webhook_handler = GiteaWebhookHandler(config['gitea'])
python_scanner = PythonScanner(config.get('scanner', {}))
js_scanner = JavaScriptScanner(config.get('scanner', {}))
security_scanner = SecurityScanner(config.get('scanner', {}))
report_generator = ReportGenerator(config.get('report', {}))
feishu_notifier = FeishuNotifier(config['feishu'])

@app.route('/')
def index():
    """Health-check endpoint."""
    return jsonify({
        'status': 'ok',
        'service': 'AI Code Quality Scanner',
        'version': '1.0.0'
    })

@app.route('/webhook/gitea', methods=['POST'])
def handle_gitea_webhook():
    """Handle Gitea webhook requests."""
    try:
        # Verify the signature
        signature = request.headers.get('X-Gitea-Signature')
        if signature:
            if not webhook_handler.verify_signature(
                request.data,
                signature,
                config['gitea']['webhook_secret']
            ):
                logger.warning('Webhook signature verification failed')
                return jsonify({'error': 'Invalid signature'}), 401
        # Parse the webhook payload
        payload = request.json
        if not payload:
            return jsonify({'error': 'No payload'}), 400
        event_type = request.headers.get('X-Gitea-Event', 'push')
        logger.info(f'Received Gitea webhook event: {event_type}')
        # Only handle push events
        if event_type != 'push':
            return jsonify({'message': 'Event ignored'}), 200
        # Extract commit information
        commits = payload.get('commits', [])
        if not commits:
            return jsonify({'message': 'No commits'}), 200
        repo = payload.get('repository', {})
        repo_name = repo.get('full_name', 'unknown')
        branch = payload.get('ref', '').replace('refs/heads/', '')
        pusher = payload.get('pusher', {}).get('name', 'unknown')
        logger.info(f'Processing {len(commits)} commit(s) in repository {repo_name}')
        # Process each commit
        for commit in commits:
            commit_id = commit.get('id', '')[:8]
            commit_message = commit.get('message', '')
            author = commit.get('author', {}).get('name', 'unknown')
            logger.info(f'Scanning commit {commit_id}: {commit_message}')
            try:
                # Get the repository URL
                clone_url = repo.get('clone_url')
                if not clone_url:
                    # Fall back to building it from web_url
                    web_url = repo.get('web_url', '')
                    if web_url:
                        clone_url = web_url.replace('http://', 'http://').replace('https://', 'https://')
                        clone_url = clone_url.rstrip('/') + '.git'
                # Run the code scans
                scan_results = {}
                # Python scan
                if 'python' in config.get('scanner', {}).get('languages', []):
                    scan_results['python'] = python_scanner.scan(
                        clone_url, commit_id, branch
                    )
                # JavaScript/TypeScript scan
                if any(lang in config.get('scanner', {}).get('languages', [])
                       for lang in ['javascript', 'typescript']):
                    scan_results['javascript'] = js_scanner.scan(
                        clone_url, commit_id, branch
                    )
                # Security scan
                scan_results['security'] = security_scanner.scan(
                    clone_url, commit_id, branch
                )
                # Generate the report
                report = report_generator.generate(
                    repo_name=repo_name,
                    branch=branch,
                    commit_id=commit_id,
                    commit_message=commit_message,
                    author=author,
                    scan_results=scan_results
                )
                # Send the Feishu notification
                feishu_notifier.send_report(report)
                logger.info(f'Finished scanning commit {commit_id}')
            except Exception as e:
                logger.error(f'Failed to scan commit {commit_id}: {str(e)}')
                # Continue with the remaining commits
                continue
        return jsonify({'status': 'ok', 'message': 'Scan completed'}), 200
    except Exception as e:
        logger.error(f'Failed to handle webhook: {str(e)}', exc_info=True)
        return jsonify({'error': str(e)}), 500

@app.route('/scan/manual', methods=['POST'])
def manual_scan():
    """Manually trigger a scan."""
    try:
        data = request.json
        repo_url = data.get('repo_url')
        branch = data.get('branch', 'main')
        commit_id = data.get('commit_id')
        if not repo_url:
            return jsonify({'error': 'repo_url is required'}), 400
        # Run the scans
        scan_results = {}
        if 'python' in config.get('scanner', {}).get('languages', []):
            scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch)
        if any(lang in config.get('scanner', {}).get('languages', [])
               for lang in ['javascript', 'typescript']):
            scan_results['javascript'] = js_scanner.scan(repo_url, commit_id, branch)
        scan_results['security'] = security_scanner.scan(repo_url, commit_id, branch)
        # Generate the report
        report = report_generator.generate(
            repo_name=repo_url.split('/')[-1].replace('.git', ''),
            branch=branch,
            commit_id=commit_id or 'manual',
            commit_message='Manual scan',
            author='manual',
            scan_results=scan_results
        )
        # Send the Feishu notification
        feishu_notifier.send_report(report)
        return jsonify({
            'status': 'ok',
            'report': report
        }), 200
    except Exception as e:
        logger.error(f'Manual scan failed: {str(e)}', exc_info=True)
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    # Force listening on all network interfaces
    host = "0.0.0.0"
    port = config.get('server', {}).get('port', 5000)
    debug = config.get('server', {}).get('debug', True)
    logger.info(f'Starting service: {host}:{port}')
    app.run(host=host, port=port, debug=debug)
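The webhook handler above pulls the repository name, the branch and a shortened commit ID out of each Gitea push payload. The following is a minimal sketch of just that extraction step as a pure function, which is easier to unit-test than the Flask route. `parse_push_event` is a hypothetical name. The payload keys are the ones the handler reads: `ref`, `repository.full_name`, and `commits[].id`/`message`/`author.name`.

```python
from typing import Any, Dict


def parse_push_event(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: extract the fields the webhook handler uses from a push payload."""
    ref = payload.get('ref', '')
    prefix = 'refs/heads/'
    # Strip only a leading 'refs/heads/'; other refs (e.g. tags) are returned unchanged
    branch = ref[len(prefix):] if ref.startswith(prefix) else ref
    repo = payload.get('repository', {})
    commits = [
        {
            'id': commit.get('id', '')[:8],  # short SHA, as in the handler
            'message': commit.get('message', ''),
            'author': commit.get('author', {}).get('name', 'unknown'),
        }
        for commit in payload.get('commits', [])
    ]
    return {
        'repo_name': repo.get('full_name', 'unknown'),
        'branch': branch,
        'commits': commits,
    }
```

Using `startswith` instead of `str.replace` removes the prefix only at the start of the ref. For ordinary branch refs, both approaches give the same result.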

test_demo/demo_flaws.py

@@ -0,0 +1,69 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Test file: contains common code defects, used to validate the scanner
"""
import os
import sys
import json
import pickle
import subprocess
from ast import parse
from typing import List, Dict
# Defect 1: unused imports
import unused_module  # unused
import collections as col  # uses col, but flake8 may still flag it
# Defect 2: unused variable
def unused_variable_demo():
    """Demonstrates an unused variable"""
    result = calculate()  # result is never used
    print("Function executed")
# Defect 8: line too long (style issue)
def long_line():
    """This is a very very very very very very very very very very very very long line of code that exceeds the 120-character limit"""
# Defect 9: missing whitespace
def missing_spaces():
    """Missing required whitespace"""
    x=1+2
    y=3*99
    if x==1:
        print(x)
# Defect 1: unused imports
import unused_module  # unused
import collections as col  # uses col, but flake8 may still flag it
# Defect 2: unused variable
def unused_variable_demo():
    """Demonstrates an unused variable"""
    result = calculate()  # result is never used
    print("Function executed")
def calculate():
    """Compute and return a result"""
    return 42
# Defect 3: undefined variable
def undefined_variable_demo():
    """Demonstrates an undefined variable"""
    print(undefined_var)  # undefined_var is not defined
# Defect 4: variable used before it is defined
def use_before_define():
    """Uses a variable before it is defined"""
    print(before_var)  # before_var is only defined below
    before_var = 100
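Defect 1 is the kind of finding flake8 reports as F401 ("imported but unused"). The following stdlib-only sketch illustrates what such a check looks for; it is not how flake8 or Pylint implement it. It lists imported names that are never read anywhere in a module. `find_unused_imports` is a hypothetical helper. It ignores `__all__` and star imports, and it does not see names used only inside string annotations.

```python
import ast
from typing import Dict, List


def find_unused_imports(source: str) -> List[str]:
    """Hypothetical helper: return imported names never read, ordered by first import line."""
    tree = ast.parse(source)
    first_line: Dict[str, int] = {}
    for node in ast.walk(tree):
        if isinstance(node, (ast.Import, ast.ImportFrom)):
            for alias in node.names:
                if alias.name == '*':
                    continue  # star imports bind unknown names; skip them
                # 'import a.b' binds 'a'; 'import x as y' binds 'y'
                name = alias.asname or alias.name.split('.')[0]
                first_line[name] = min(first_line.get(name, node.lineno), node.lineno)
    # Every identifier that is read somewhere, including roots of attribute access like col.deque
    used = {
        node.id
        for node in ast.walk(tree)
        if isinstance(node, ast.Name) and isinstance(node.ctx, ast.Load)
    }
    unused = [name for name in first_line if name not in used]
    return sorted(unused, key=lambda name: first_line[name])
```

Because it only parses the source, the sketch also works on files such as this one that import modules (like `unused_module`) that do not exist.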

web/index.html

File diff suppressed because it is too large.

快速开始指南.md (Quick Start Guide)

@@ -0,0 +1,230 @@
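# Quick Start Guide
This guide walks you through deploying AI Code Quality Scanner and configuring the Gitea webhook and Feishu notifications.
## Requirements
- Python 3.8+
- Git
- Node.js and npm (for JavaScript/TypeScript scanning; optional)
- Docker and Docker Compose (optional, for containerized deployment)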
## Step 1: Edit the configuration
### Edit `config.yaml`
First, edit `config.yaml` and set the following:
```yaml
server:
  host: "0.0.0.0"
  port: 5000  # Webhook service port
gitea:
  base_url: "http://<server-ip>:3000"  # Your Gitea URL
  webhook_secret: "your_secret_key"  # Webhook signing secret
feishu:
  webhook_url: "https://open.feishu.cn/open-apis/bot/v2/hook/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"  # Feishu webhook URL
  secret: ""  # Feishu signing secret (optional)
```
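### Get the Feishu webhook URL
1. Open the Feishu group chat
2. Click "..." in the top-right corner → "Settings" → "Group bots"
3. Click "Add bot" → "Custom bot"
4. Enter a name for the bot and click "Add"
5. Copy the webhook URL
6. (Optional) Enable "Signature verification" and copy the secret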
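When signature verification is enabled, each message sent to the custom bot must carry a `timestamp` and a `sign`. The following is a minimal sketch that assumes Feishu's documented custom-bot scheme as we understand it: an HMAC-SHA256 keyed with the timestamp, a newline and the secret over an empty message, then Base64-encoded. Please check it against the current Feishu custom-bot documentation. `feishu_sign` and `build_signed_text_message` are hypothetical helpers, not the project's `FeishuNotifier`.

```python
import base64
import hashlib
import hmac
import time
from typing import Any, Dict, Optional


def feishu_sign(timestamp: int, secret: str) -> str:
    """Hypothetical helper: HMAC-SHA256 keyed with timestamp + newline + secret, empty message, Base64."""
    string_to_sign = f'{timestamp}\n{secret}'
    digest = hmac.new(string_to_sign.encode('utf-8'), digestmod=hashlib.sha256).digest()
    return base64.b64encode(digest).decode('utf-8')


def build_signed_text_message(text: str, secret: str,
                              timestamp: Optional[int] = None) -> Dict[str, Any]:
    """Hypothetical helper: text-message body, with timestamp/sign added only if a secret is set."""
    body: Dict[str, Any] = {'msg_type': 'text', 'content': {'text': text}}
    if secret:
        ts = int(time.time()) if timestamp is None else timestamp  # seconds since the epoch
        body['timestamp'] = str(ts)
        body['sign'] = feishu_sign(ts, secret)
    return body
```

The resulting dict would be sent as the JSON body of a POST to the webhook URL copied in step 5. Leaving `secret` empty matches the `secret: ""` default in `config.yaml`.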
### Set up the Gitea webhook secret
1. On the Gitea repository page, click "Settings" → "Webhooks"
2. Click "Add Webhook" → "Gitea"
3. Fill in the following:
   - Target URL: `http://<your-server-ip>:5000/webhook/gitea`
   - Secret: choose any secret (e.g. `my_secret_key`); it must match `webhook_secret` in config.yaml
4. Click "Add Webhook"
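Gitea signs each delivery with the secret entered above and sends the result in the `X-Gitea-Signature` header. The handler shown in the deleted `test.py` above passes that header to `webhook_handler.verify_signature(...)`, whose body is not part of this commit. The following is a minimal sketch of such a check. It assumes the header carries the hex-encoded HMAC-SHA256 of the raw request body. `verify_gitea_signature` is a hypothetical name.

```python
import hashlib
import hmac


def verify_gitea_signature(body: bytes, signature: str, secret: str) -> bool:
    """Hypothetical helper: True if `signature` is the hex HMAC-SHA256 of `body` under `secret`."""
    expected = hmac.new(secret.encode('utf-8'), body, hashlib.sha256).hexdigest()
    # compare_digest compares in constant time, so timing does not reveal the expected value
    return hmac.compare_digest(expected, signature.strip())
```

The handler in `test.py` runs the check only when the header is present, so an unsigned request skips verification entirely. Rejecting requests that lack the header is the stricter alternative.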
## Step 2: Install dependencies
### Option A: Local installation (Windows/Mac/Linux)
```bash
# Windows
install.bat
# Mac/Linux
chmod +x install.sh
./install.sh
```
### Option B: Docker deployment
```bash
# Build and run
docker-compose up -d
# View logs
docker-compose logs -f
```
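## Step 3: Start the service
```bash
# Activate the virtual environment (if you use one)
# Windows
call venv\Scripts\activate.bat
# Mac/Linux
source venv/bin/activate
# Start the service
python app.py
```
Once the service is running, open `http://localhost:5000` to see the health-check response.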
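## Step 4: Test
### Test the webhook
Push a commit to the Gitea repository. You should see the following:
1. The server log reports the incoming webhook request
2. The code is cloned into a temporary directory
3. The scanning tools run
4. The Feishu group chat receives a notification
### Test a manual scan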
```bash
curl -X POST http://localhost:5000/scan/manual \
-H "Content-Type: application/json" \
-d '{"repo_url": "https://github.com/username/repo.git", "branch": "main"}'
```
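You can send the same manual-scan request from Python using only the standard library. The following is a minimal sketch. `build_manual_scan_request` is a hypothetical helper. The JSON fields (`repo_url`, `branch`, and an optional `commit_id`) are the ones read by the `/scan/manual` handler in the deleted `test.py` above.

```python
import json
import urllib.request
from typing import Optional


def build_manual_scan_request(base_url: str, repo_url: str, branch: str = 'main',
                              commit_id: Optional[str] = None) -> urllib.request.Request:
    """Hypothetical helper: build (but do not send) a POST to the /scan/manual endpoint."""
    payload = {'repo_url': repo_url, 'branch': branch}
    if commit_id:
        payload['commit_id'] = commit_id  # optional; the handler falls back to 'manual'
    return urllib.request.Request(
        base_url.rstrip('/') + '/scan/manual',
        data=json.dumps(payload).encode('utf-8'),
        headers={'Content-Type': 'application/json'},
        method='POST',
    )
```

To send the request, call `urllib.request.urlopen(req, timeout=600)` and parse the response with `json.loads(resp.read())`. The handler runs all scans before it responds, so a generous timeout is prudent. The 600-second value is an arbitrary choice.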
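## Configuration reference
### Scanning tools
| Tool | Language | Purpose |
|------|----------|---------|
| Pylint | Python | Code style and error checking |
| Flake8 | Python | Python linting |
| Bandit | Python | Security vulnerability scanning |
| ESLint | JavaScript/TypeScript | JS/TS linting |
### Configuration file options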
```yaml
server:
  host: "0.0.0.0"  # Listen address
  port: 5000       # Listen port
  debug: true      # Debug mode
gitea:
  base_url: "http://localhost:3000"  # Gitea URL
  webhook_secret: "secret"           # Webhook signing secret
feishu:
  webhook_url: "https://..."  # Feishu webhook URL
  secret: ""                  # Feishu signing secret
scanner:
  languages:
    - python
    - javascript
    - typescript
  max_issues: 10  # Maximum number of issues
  detailed: true  # Detailed scan mode
  temp_clone_dir: "/tmp/code_scanner_clones"  # Temporary clone directory
report:
  output_dir: "./reports"  # Directory where reports are saved
  keep_files: true         # Whether to keep report files
```
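In the handlers shown in the deleted `test.py` above, `scanner.languages` decides which scanners run:

- The Python scanner runs when `python` is listed.
- The JavaScript scanner runs when either `javascript` or `typescript` is listed.
- The security scanner always runs.

The following is a minimal sketch of that selection rule on its own. `select_scanners` is a hypothetical name.

```python
from typing import Iterable, List


def select_scanners(languages: Iterable[str]) -> List[str]:
    """Hypothetical helper: scanner keys in the order the handlers run them."""
    langs = set(languages)
    selected = []
    if 'python' in langs:
        selected.append('python')
    if langs & {'javascript', 'typescript'}:
        selected.append('javascript')  # one scanner covers both JS and TS
    selected.append('security')  # runs regardless of the language list
    return selected
```

Removing every entry from `languages` therefore still leaves the security scan in place.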
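## FAQ
### Q: Scans take a long time. What can I do?
A: The system makes a shallow clone of the repository (fetching only the latest commit), and a cache is available after the first scan. To speed things up further, you can:
- Scan fewer file types
- Adjust the `max_issues` parameter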
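A shallow clone fetches only the tip of one branch instead of the full history. The following is a minimal sketch of such a clone command built for `subprocess`. `shallow_clone_cmd` and `shallow_clone` are hypothetical helpers, and the scanner's actual clone code is not shown in this commit. `--depth` and `--branch` are standard `git clone` options, and `--depth` implies `--single-branch`.

```python
import subprocess
from typing import List


def shallow_clone_cmd(repo_url: str, branch: str, dest: str) -> List[str]:
    """Hypothetical helper: git clone that fetches only the latest commit of one branch."""
    # --depth 1 truncates history to a single commit and implies --single-branch
    return ['git', 'clone', '--depth', '1', '--branch', branch, repo_url, dest]


def shallow_clone(repo_url: str, branch: str, dest: str, timeout: int = 300) -> None:
    """Hypothetical helper: run the clone; raises CalledProcessError if git fails."""
    subprocess.run(shallow_clone_cmd(repo_url, branch, dest), check=True, timeout=timeout)
```

One caveat applies with `--depth 1`. An older commit named in a push event may not be present in the clone, so checking out that exact commit can require a deeper fetch.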
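### Q: Feishu messages fail to send?
A: Check:
1. Whether the webhook URL is correct
2. Whether signature verification is enabled (if it is, you must configure `secret`)
3. Whether the network is reachable
### Q: No code is being scanned?
A: Check:
1. Whether the repository URL is publicly accessible
2. For private repositories, whether Git credentials are configured
3. Whether the branch name is correct
### Q: How do I access a private Gitea repository?
A: Configure Git credentials through environment variables: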
```bash
export GIT_USERNAME=your_username
export GIT_PASSWORD=your_password
```
Or include the credentials in the Git clone URL:
```
http://username:password@gitea-server.com/user/repo.git
```
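If you embed credentials in the clone URL, any `@`, `:` or `/` in the username or password must be percent-encoded; otherwise the URL is misparsed. The following is a minimal sketch that builds such a URL from the `GIT_USERNAME`/`GIT_PASSWORD` variables above. Whether the scanner itself reads those variables is not shown here. `with_credentials` and `clone_url_from_env` are hypothetical helpers. Keep in mind that URLs with embedded passwords can end up in logs and in the clone's `.git/config`.

```python
import os
from urllib.parse import quote, urlsplit, urlunsplit


def with_credentials(clone_url: str, username: str, password: str) -> str:
    """Hypothetical helper: insert percent-encoded user:password before the host."""
    parts = urlsplit(clone_url)
    # Note: hostname drops IPv6 brackets, so IPv6 literal hosts are not handled here
    host = parts.hostname or ''
    if parts.port is not None:
        host = f'{host}:{parts.port}'
    userinfo = f"{quote(username, safe='')}:{quote(password, safe='')}"
    return urlunsplit((parts.scheme, f'{userinfo}@{host}', parts.path, parts.query, parts.fragment))


def clone_url_from_env(clone_url: str) -> str:
    """Hypothetical helper: add GIT_USERNAME/GIT_PASSWORD when both are set, else return unchanged."""
    user = os.environ.get('GIT_USERNAME')
    password = os.environ.get('GIT_PASSWORD')
    if user and password:
        return with_credentials(clone_url, user, password)
    return clone_url
```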
## System architecture
```
User pushes code
    │
    ▼
Gitea Webhook ──────────────────────┐
    │                               │
    ▼                               │
Webhook service                     │
(Flask :5000)                       │
    │                               │
    ├──────────┬──────────┬─────────┘
    ▼          ▼          ▼
  Python     JS/TS    Security
  Scanner   Scanner    Scanner
    │          │          │
    └──────────┴──────────┘
               │
               ▼
       Report Generator
       (Markdown report)
               │
               ▼
          Feishu Bot
     (sends notification)
```
## Directory structure
```
code-scanner/
├── app.py              # Main application
├── config.yaml         # Configuration file
├── requirements.txt    # Dependencies
├── Dockerfile          # Docker image
├── docker-compose.yml  # Docker Compose
├── install.bat         # Windows install script
├── install.sh          # Linux install script
├── README.md           # Project overview
├── 快速开始指南.md     # This document
├── webhook/            # Webhook handling
├── scanner/            # Code scanners
├── report/             # Report generation
├── notify/             # Feishu notifications
└── reports/            # Report output
```