dev #9

Merged
dangzerong merged 2 commits from dev into main 2026-03-13 15:32:42 +08:00
7 changed files with 1558 additions and 201 deletions

174
app.py
View File

@@ -3,6 +3,7 @@
import os import os
import logging import logging
import traceback
from typing import Dict, Tuple, Any from typing import Dict, Tuple, Any
import json import json
@@ -298,6 +299,7 @@ def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]:
logger.info(f'PR #{pr_number} 扫描完成') logger.info(f'PR #{pr_number} 扫描完成')
except Exception as e: except Exception as e:
traceback.print_exc()
logger.error(f'扫描 PR #{pr_number} 失败: {str(e)}') logger.error(f'扫描 PR #{pr_number} 失败: {str(e)}')
return jsonify({'error': str(e)}), 500 return jsonify({'error': str(e)}), 500
@@ -667,6 +669,178 @@ def api_get_pr_file_content(pr_id):
return jsonify({'error': str(e)}), 500 return jsonify({'error': str(e)}), 500
@app.route('/api/prs/<int:pr_id>/quality')
def api_get_quality_score(pr_id):
"""获取 PR 的代码质量评分"""
try:
pr = PRScanDB.get_pr_by_id(pr_id)
if not pr:
return jsonify({'error': 'PR not found'}), 404
# 从 scan_result 中获取质量评分
scan_result = pr.get('scan_result')
if isinstance(scan_result, str):
try:
scan_result = json.loads(scan_result)
except:
scan_result = None
quality_score = None
if scan_result and scan_result.get('ai'):
quality_score = scan_result['ai'].get('quality_score')
if not quality_score:
return jsonify({'error': '暂无质量评分'}), 404
return jsonify(quality_score)
except Exception as e:
logger.error(f'获取质量评分失败: {str(e)}')
return jsonify({'error': str(e)}), 500
@app.route('/api/prs/<int:pr_id>/stats')
def api_get_issue_stats(pr_id):
"""获取 PR 的问题统计"""
try:
pr = PRScanDB.get_pr_by_id(pr_id)
if not pr:
return jsonify({'error': 'PR not found'}), 404
# 获取 scan_details_with_code
scan_details = pr.get('scan_details_with_code')
if isinstance(scan_details, str):
try:
scan_details = json.loads(scan_details)
except:
scan_details = None
if not scan_details:
return jsonify({'error': '暂无扫描详情'}), 404
# 统计各扫描器的问题
stats = {
'by_severity': {'error': 0, 'warning': 0, 'info': 0},
'by_scanner': {},
'total': 0
}
# 统计静态扫描器
for scanner in scan_details.get('scanners', []):
scanner_name = scanner.get('name', 'unknown')
scanner_issues = scanner.get('issues', [])
stats['by_scanner'][scanner_name] = len(scanner_issues)
for issue in scanner_issues:
sev = (issue.get('severity') or 'info').lower()
if sev in stats['by_severity']:
stats['by_severity'][sev] += 1
stats['total'] += 1
# 统计 AI 扫描器
ai_data = scan_details.get('ai', {})
if ai_data:
ai_issues = ai_data.get('issues', [])
stats['by_scanner']['AI'] = len(ai_issues)
for issue in ai_issues:
sev = (issue.get('severity') or 'info').lower()
if sev in stats['by_severity']:
stats['by_severity'][sev] += 1
stats['total'] += 1
return jsonify(stats)
except Exception as e:
logger.error(f'获取问题统计失败: {str(e)}')
return jsonify({'error': str(e)}), 500
@app.route('/api/prs/<int:pr_id>/fix', methods=['POST'])
def api_generate_fix(pr_id):
"""生成问题修复建议"""
try:
data = request.get_json()
if not data:
return jsonify({'error': '请求体为空'}), 400
file_path = data.get('file')
line = data.get('line', 1)
message = data.get('message', '')
code = data.get('code', '')
if not file_path or not message:
return jsonify({'error': '缺少必要参数'}), 400
# 调用 AI 生成修复建议
fix_result = ai_reviewer.generate_fix_suggestion(file_path, line, message, code)
if fix_result:
return jsonify(fix_result)
else:
return jsonify({'error': '生成修复建议失败'}), 500
except Exception as e:
logger.error(f'生成修复建议失败: {str(e)}')
return jsonify({'error': str(e)}), 500
@app.route('/api/prs/history')
def api_get_pr_history():
"""获取 PR 扫描历史趋势"""
try:
limit = request.args.get('limit', 20, type=int)
repo_name = request.args.get('repo_name', '')
# 获取 PR 列表
prs = PRScanDB.get_all_prs(status='completed')
if repo_name:
prs = [p for p in prs if p.get('repo_name') == repo_name]
# 只取最近的 N 个
prs = prs[:limit]
# 构建趋势数据
history = []
for pr in reversed(prs): # 从旧到新
issues_count = pr.get('issues_count', 0)
# 从 scan_result 中各扫描器汇总 error/warning 数量
scan_result = pr.get('scan_result')
if isinstance(scan_result, str):
try:
scan_result = json.loads(scan_result)
except:
scan_result = None
error_count = 0
warning_count = 0
if scan_result and isinstance(scan_result, dict):
# 遍历各扫描器,汇总 error 和 warning
for scanner_name, scanner_result in scan_result.items():
if isinstance(scanner_result, dict):
summary = scanner_result.get('summary', {})
if isinstance(summary, dict):
error_count += summary.get('error', 0)
warning_count += summary.get('warning', 0)
history.append({
'pr_id': pr.get('id'),
'pr_number': pr.get('pr_number'),
'repo_name': pr.get('repo_name'),
'title': pr.get('pr_title', ''),
'author': pr.get('author', ''),
'created_at': pr.get('created_at', ''),
'issues_count': issues_count,
'error_count': error_count,
'warning_count': warning_count,
'total_issues': error_count + warning_count,
'state': pr.get('state', '')
})
return jsonify(history)
except Exception as e:
logger.error(f'获取历史趋势失败: {str(e)}')
return jsonify({'error': str(e)}), 500
@app.route('/api/prs/<int:pr_id>/merge', methods=['POST']) @app.route('/api/prs/<int:pr_id>/merge', methods=['POST'])
def api_merge_pr(pr_id): def api_merge_pr(pr_id):
"""合并 PR""" """合并 PR"""

32
db.py
View File

@@ -7,12 +7,17 @@
import sqlite3 import sqlite3
import json import json
import os import os
from datetime import datetime from datetime import datetime, timezone, timedelta
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data', 'pr_scans.db') DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data', 'pr_scans.db')
def get_cst_now():
"""获取当前中国时区时间 (UTC+8)"""
return datetime.now(timezone(timedelta(hours=8))).strftime('%Y-%m-%d %H:%M:%S')
def get_db_connection(): def get_db_connection():
"""获取数据库连接""" """获取数据库连接"""
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
@@ -110,6 +115,7 @@ class PRScanDB:
if existing: if existing:
# 更新现有记录 # 更新现有记录
cst_time = get_cst_now()
cursor.execute(''' cursor.execute('''
UPDATE pr_scans SET UPDATE pr_scans SET
pr_title = ?, pr_title = ?,
@@ -123,7 +129,7 @@ class PRScanDB:
security_issues = ?, security_issues = ?,
ai_review = ?, ai_review = ?,
report_path = ?, report_path = ?,
updated_at = CURRENT_TIMESTAMP updated_at = ?
WHERE repo_name = ? AND pr_number = ? WHERE repo_name = ? AND pr_number = ?
''', ( ''', (
pr_info.get('pr_title'), pr_info.get('pr_title'),
@@ -137,19 +143,22 @@ class PRScanDB:
security_issues, security_issues,
json.dumps(scan_results.get('ai', {}), ensure_ascii=False), json.dumps(scan_results.get('ai', {}), ensure_ascii=False),
report_path, report_path,
cst_time,
pr_info.get('repo_name'), pr_info.get('repo_name'),
pr_info.get('pr_number') pr_info.get('pr_number')
)) ))
scan_id = existing['id'] scan_id = existing['id']
else: else:
# 插入新记录 # 插入新记录
cst_time = get_cst_now()
cursor.execute(''' cursor.execute('''
INSERT INTO pr_scans ( INSERT INTO pr_scans (
pr_number, repo_name, pr_title, pr_url, pr_number, repo_name, pr_title, pr_url,
source_branch, target_branch, author, source_branch, target_branch, author,
state, scan_status, scan_result, scan_details_with_code, state, scan_status, scan_result, scan_details_with_code,
issues_count, security_issues, ai_review, report_path issues_count, security_issues, ai_review, report_path,
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', ( ''', (
pr_info.get('pr_number'), pr_info.get('pr_number'),
pr_info.get('repo_name'), pr_info.get('repo_name'),
@@ -165,7 +174,9 @@ class PRScanDB:
issues_count, issues_count,
security_issues, security_issues,
json.dumps(scan_results.get('ai', {}), ensure_ascii=False), json.dumps(scan_results.get('ai', {}), ensure_ascii=False),
report_path report_path,
cst_time,
cst_time
)) ))
scan_id = cursor.lastrowid scan_id = cursor.lastrowid
@@ -239,23 +250,24 @@ class PRScanDB:
"""更新 PR 状态""" """更新 PR 状态"""
conn = get_db_connection() conn = get_db_connection()
cursor = conn.cursor() cursor = conn.cursor()
cst_time = get_cst_now()
if state == 'merged': if state == 'merged':
cursor.execute(''' cursor.execute('''
UPDATE pr_scans SET UPDATE pr_scans SET
state = ?, state = ?,
merged_at = CURRENT_TIMESTAMP, merged_at = ?,
merged_by = ?, merged_by = ?,
updated_at = CURRENT_TIMESTAMP updated_at = ?
WHERE id = ? WHERE id = ?
''', (state, merged_by, scan_id)) ''', (state, cst_time, merged_by, cst_time, scan_id))
else: else:
cursor.execute(''' cursor.execute('''
UPDATE pr_scans SET UPDATE pr_scans SET
state = ?, state = ?,
updated_at = CURRENT_TIMESTAMP updated_at = ?
WHERE id = ? WHERE id = ?
''', (state, scan_id)) ''', (state, cst_time, scan_id))
conn.commit() conn.commit()
conn.close() conn.close()

View File

@@ -218,55 +218,66 @@ class ReportGenerator:
lines.append(f' - {message}') lines.append(f' - {message}')
lines.append('') lines.append('')
# AI 审查结果(单独展示 # AI 审查结果(适配新格式issues 列表
if 'ai' in scan_results: if 'ai' in scan_results:
ai_result = scan_results['ai'] ai_result = scan_results['ai']
lines.append('') lines.append('')
lines.append('## 🤖 AI 代码审查') lines.append('## 🤖 AI 代码审查')
lines.append('') lines.append('')
lines.append(ai_result.get('summary', '无 AI 审查结果'))
# 新格式:直接使用 summary
if 'summary' in ai_result:
# summary 可能是字符串或 dict
summary = ai_result.get('summary', '')
if isinstance(summary, dict):
lines.append(f"发现 {summary.get('total', 0)} 个问题,"
f"错误 {summary.get('error', 0)}"
f"警告 {summary.get('warning', 0)}"
f"提示 {summary.get('info', 0)}")
else:
lines.append(str(summary))
lines.append('') lines.append('')
reviews = ai_result.get('reviews', []) # 新格式issues 列表
if reviews: ai_issues = ai_result.get('issues', [])
for i, review in enumerate(reviews, 1): if ai_issues:
file_name = review.get('file', 'unknown') # 按文件分组
review_content = review.get('review', {}) issues_by_file = {}
for issue in ai_issues:
file_name = issue.get('file', 'unknown')
if file_name not in issues_by_file:
issues_by_file[file_name] = []
issues_by_file[file_name].append(issue)
for file_name, issues in issues_by_file.items():
lines.append(f'### 📄 {file_name}') lines.append(f'### 📄 {file_name}')
lines.append('') lines.append('')
# 优点 for i, issue in enumerate(issues[:10], 1):
advantages = review_content.get('优点', []) severity = issue.get('severity', 'Info')
if advantages: severity_emoji = {
lines.append('**✅ 代码优点:**') 'ERROR': '🔴',
for adv in advantages[:3]: 'WARNING': '🟡',
lines.append(f'- {adv}') 'INFO': ''
lines.append('') }.get(severity.upper(), '')
# 问题 line_num = issue.get('line', 0)
issues = review_content.get('问题', []) symbol = issue.get('symbol', '')
if issues: message = issue.get('message', 'No message')
lines.append('**⚠️ 需要改进:**') code_context = issue.get('code_context', '')
for issue in issues[:3]: defect_reason = issue.get('defect_reason', '')
lines.append(f'- {issue}')
lines.append('')
# 优化建议 lines.append(f'{i}. {severity_emoji} **{severity}** - 行 {line_num}')
optimizations = review_content.get('优化', []) if symbol:
if optimizations: lines.append(f' - 标识: `{symbol}`')
lines.append('**💡 优化建议:**') lines.append(f' - 问题: {message}')
for opt in optimizations[:3]: if code_context:
lines.append(f'- {opt}') lines.append(' - 代码:')
lines.append('') lines.append('```')
lines.append(code_context)
# 原始回复(如果不是 JSON 格式) lines.append('```')
raw = review_content.get('raw_review') if defect_reason:
if raw: lines.append(f' - 原因: {defect_reason}')
lines.append('**📝 AI 原始回复:**')
lines.append('```')
lines.append(raw[:500] + '...' if len(raw) > 500 else raw)
lines.append('```')
lines.append('') lines.append('')
# 添加报告链接或下一步操作 # 添加报告链接或下一步操作

View File

@@ -5,6 +5,7 @@ AI 代码审查器
使用大模型进行智能代码审查 使用大模型进行智能代码审查
""" """
import os import os
import re
import json import json
import logging import logging
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
@@ -73,15 +74,26 @@ class AIReviewer(BaseScanner):
changed_files: 可选的变更文件列表(来自 PR changed_files: 可选的变更文件列表(来自 PR
Returns: Returns:
审查结果 审查结果(与 python_scanner.py 兼容的格式)
""" """
result = {
'tool': 'AI Code Reviewer',
'language': language,
'status': 'success',
'issues': [],
'summary': {
'total': 0,
'error': 0,
'warning': 0,
'info': 0
},
'files_scanned': 0
}
if not self.enabled: if not self.enabled:
return { result['status'] = 'disabled'
'enabled': False, result['summary'] = 'AI 审查已禁用'
'tool': 'AI Code Reviewer', return result
'reviews': [],
'summary': 'AI 审查已禁用'
}
try: try:
# 如果没有传入 clone_dir需要克隆 # 如果没有传入 clone_dir需要克隆
@@ -89,52 +101,141 @@ class AIReviewer(BaseScanner):
clone_dir = self.clone_repo(repo_url, commit_id, branch) clone_dir = self.clone_repo(repo_url, commit_id, branch)
if not clone_dir or not os.path.exists(clone_dir): if not clone_dir or not os.path.exists(clone_dir):
return { result['status'] = 'error'
'enabled': True, result['error'] = '无法获取代码目录'
'tool': 'AI Code Reviewer', return result
'reviews': [],
'summary': '无法获取代码目录'
}
# 获取要审查的代码文件 # 获取要审查的代码文件
files = self._get_code_files(clone_dir, language, changed_files) files = self._get_code_files(clone_dir, language, changed_files)
if not files: if not files:
return { result['summary'] = '未找到可审查的代码文件'
'enabled': True, return result
'tool': 'AI Code Reviewer',
'reviews': [],
'summary': '未找到可审查的代码文件'
}
# 对每个文件进行 AI 审查 # 对每个文件进行 AI 审查
all_reviews = [] all_issues = []
for file_path in files[:5]: # 限制最多审查 5 个文件 for file_path in files[:5]: # 限制最多审查 5 个文件
review = self._review_file(file_path, language, clone_dir) review = self._review_file(file_path, language, clone_dir)
if review: if review and review.get('issues'):
all_reviews.append(review) all_issues.extend(review['issues'])
# 生成总结 result['issues'] = all_issues[:self.max_issues] if self.detailed else all_issues
summary = self._generate_summary(all_reviews) result['summary'] = self._calculate_summary(all_issues)
result['files_scanned'] = len(files[:5])
result['clone_dir'] = clone_dir
return { # 生成质量评分
'enabled': True, result['quality_score'] = self._calculate_quality_score(all_issues, files[:5])
'tool': 'AI Code Reviewer',
'reviews': all_reviews, return result
'summary': summary,
'files_reviewed': len(all_reviews),
'clone_dir': clone_dir # 返回 clone_dir 用于后续清理
}
except Exception as e: except Exception as e:
logger.error(f'AI 审查失败: {str(e)}') logger.error(f'AI 审查失败: {str(e)}')
return { result['status'] = 'error'
'enabled': True, result['error'] = str(e)
'tool': 'AI Code Reviewer', return result
'error': str(e),
'reviews': [], def _calculate_summary(self, issues: List[Dict]) -> Dict[str, int]:
'summary': f'AI 审查出错: {str(e)}' """计算问题摘要"""
summary = {
'total': len(issues),
'error': 0,
'warning': 0,
'info': 0
}
for issue in issues:
severity = issue.get('severity', '').lower()
if severity in ['error', 'critical', 'fatal']:
summary['error'] += 1
elif severity in ['warning', 'moderate']:
summary['warning'] += 1
else:
summary['info'] += 1
return summary
def _calculate_quality_score(self, issues: List[Dict], files: List[str]) -> Dict[str, Any]:
"""
计算代码质量评分
返回:总分(0-100)及各维度评分
"""
if not files:
return {'total': 100, 'maintainability': 100, 'security': 100, 'readability': 100, 'best_practices': 100}
# 统计问题
error_count = sum(1 for i in issues if i.get('severity', '').lower() in ['error', 'critical'])
warning_count = sum(1 for i in issues if i.get('severity', '').lower() == 'warning')
info_count = sum(1 for i in issues if i.get('severity', '').lower() == 'info')
# 分类统计
security_keywords = ['sql injection', 'xss', 'csrf', 'password', 'secret', 'token', '权限', '注入', '认证']
security_issues = sum(1 for i in issues if any(k in (i.get('message', '') + i.get('symbol', '')).lower() for k in security_keywords))
# 计算各维度分数
# 可维护性:基于错误和警告数量
issue_weight = error_count * 5 + warning_count * 2 + info_count * 0.5
maintainability = max(0, 100 - issue_weight)
# 安全性:基于安全问题
security_score = max(0, 100 - security_issues * 15)
# 可读性:基于 info 级别问题(风格类)
readability = max(0, 100 - info_count * 3)
# 最佳实践:基于 warning 级别
best_practices = max(0, 100 - warning_count * 5)
# 总分:加权平均
total = int((maintainability * 0.3 + security_score * 0.35 + readability * 0.15 + best_practices * 0.2))
return {
'total': total,
'maintainability': maintainability,
'security': security_score,
'readability': readability,
'best_practices': best_practices,
'details': {
'error_count': error_count,
'warning_count': warning_count,
'info_count': info_count,
'security_issues': security_issues
} }
}
def generate_fix_suggestion(self, file_path: str, line: int, message: str, code: str) -> Optional[str]:
"""
对指定问题生成修复建议代码
"""
prompt = f"""你是一位代码修复专家。请根据以下问题,生成修复后的代码。
问题描述:{message}
问题所在行号:{line}
原始代码:
```
{code}
```
请以 JSON 格式输出修复建议:
```json
{{
"fixed_code": "修复后的完整代码或关键片段",
"explanation": "修复说明50字以内",
"confidence": "high/medium/low 修复把握度"
}}
```
如果无法修复,请返回:{{"fixed_code": "", "explanation": "无法自动修复", "confidence": "low"}}"""
try:
response = self._call_ai(prompt)
if response and response.get('fixed_code'):
return response
except Exception as e:
logger.warning(f'生成修复建议失败: {e}')
return None
def _get_code_files(self, clone_dir: str, language: str, changed_files: Optional[List[str]] = None) -> List[str]: def _get_code_files(self, clone_dir: str, language: str, changed_files: Optional[List[str]] = None) -> List[str]:
"""获取代码文件列表""" """获取代码文件列表"""
@@ -174,6 +275,8 @@ class AIReviewer(BaseScanner):
def _review_file(self, file_path: str, language: str, clone_dir: str = None) -> Optional[Dict[str, Any]]: def _review_file(self, file_path: str, language: str, clone_dir: str = None) -> Optional[Dict[str, Any]]:
"""审查单个文件""" """审查单个文件"""
issues = []
try: try:
with open(file_path, 'r', encoding='utf-8') as f: with open(file_path, 'r', encoding='utf-8') as f:
code = f.read() code = f.read()
@@ -186,22 +289,46 @@ class AIReviewer(BaseScanner):
else: else:
truncated = False truncated = False
# 构建 prompt # 给代码加行号再发给模型,便于模型返回准确行号
prompt = self._build_prompt(code, language) code_with_lines = self._code_with_line_numbers(code)
prompt = self._build_prompt(code_with_lines, language)
# 调用 AI # 调用 AI
response = self._call_ai(prompt) response = self._call_ai(prompt)
if not response: # 获取相对路径
return None
# 解析响应
rel_path = os.path.relpath(file_path, clone_dir) if (clone_dir and file_path) else file_path rel_path = os.path.relpath(file_path, clone_dir) if (clone_dir and file_path) else file_path
if not response:
return {
'file': rel_path,
'path': file_path,
'truncated': truncated,
'issues': []
}
# 解析 AI 响应,转换为标准 issues 格式,并校正行号
ai_issues = response.get('issues', [])
for issue in ai_issues:
self._correct_issue_line(issue, code)
issues.append({
'tool': 'ai_reviewer',
'type': issue.get('type', 'info'),
'severity': issue.get('severity', 'Info'),
'message': issue.get('message', ''),
'file': rel_path,
'line': issue.get('line', 0),
'column': issue.get('column', 0),
'symbol': issue.get('symbol', ''),
'code_context': issue.get('code_context', ''),
'defect_reason': issue.get('defect_reason', '')
})
return { return {
'file': rel_path, 'file': rel_path,
'path': file_path, 'path': file_path,
'truncated': truncated, 'truncated': truncated,
'review': response 'issues': issues
} }
except Exception as e: except Exception as e:
@@ -217,29 +344,83 @@ class AIReviewer(BaseScanner):
else: else:
lang_name = language lang_name = language
prompt = f"""你是一位资深的 {lang_name} 代码审查专家。请审查以下代码,并给出: prompt = f"""你是一位资深的 {lang_name} 代码审查专家。请审查以下代码,找出潜在的问题和缺陷。
1. **代码优点** - 写得好地方 请以 JSON 格式输出审查结果,必须包含以下字段:
2. **问题建议** - 需要改进的地方
3. **优化建议** - 如何让代码更好
请用中文回复,保持简洁,每个文件审查不超过 3 点建议。
以下是代码:
```{language}
{code}
```
请以 JSON 格式输出:
```json ```json
{{ {{
"优点": ["..."], "issues": [
"问题": ["..."], {{
"优化": ["..."] "line": 行号,
"column": 列号,
"message": "问题描述",
"type": "error/warning/info 之一",
"severity": "Error/Warning/Info 之一",
"symbol": "错误标识符如 unused-variable, syntax-error 等",
"code_context": "问题代码的上下文(包含问题的那行或几行代码)",
"defect_reason": "缺陷原因分析30字以内简洁描述"
}}
]
}} }}
```
注意:
1. line 和 column 是问题所在的行号和列号(从 1 开始)
2. type: error=错误, warning=警告, info=信息
3. severity: Error=严重, Warning=一般, Info=提示
4. code_context: 包含问题代码的那一行或相邻的几行
5. defect_reason: 精简描述30字以内说明问题原因和风险
如果代码没有问题,返回空数组: {{"issues": []}}
重要:以下代码每行前已标注行号(格式为 "行号|"),请根据问题实际出现的代码行,严格使用该行前的行号填写 issues 中的 line 字段,不要猜测或使用错误行号。
以下是待审查的代码(行号已标注):
```{language}
{code}
```""" ```"""
return prompt return prompt
def _code_with_line_numbers(self, code: str) -> str:
"""给代码每行前加上行号,便于模型返回准确行号"""
lines = code.split('\n')
width = len(str(len(lines)))
return '\n'.join(f'{i:>{width}}| {line}' for i, line in enumerate(lines, 1))
def _correct_issue_line(self, issue: Dict[str, Any], code: str) -> None:
"""
根据 message/symbol 在源码中搜索,尽量把 issue 的 line 校正到真实出现位置。
AI 返回的行号常不准确,通过匹配问题相关的标识符(如 'unused_module')修正行号。
"""
line = issue.get('line')
if not line or not code:
return
lines = code.split('\n')
if line < 1 or line > len(lines):
return
# 从 message 中提取被引用的标识符(如 'unused_module' -> unused_module
message = (issue.get('message') or '')
symbol = (issue.get('symbol') or '').strip()
candidates = []
if symbol:
candidates.append(symbol)
for m in re.finditer(r"['\"]([a-zA-Z_][a-zA-Z0-9_]*)['\"]", message or ''):
candidates.append(m.group(1))
# 若 message 里没有引号标识符,取首段英文/数字/下划线作为关键词
if not candidates:
first_word = re.search(r'\b([a-zA-Z_][a-zA-Z0-9_]*)\b', message)
if first_word:
candidates.append(first_word.group(1))
for token in candidates:
if not token:
continue
for i, code_line in enumerate(lines):
if token in code_line:
issue['line'] = i + 1
return
def _call_ai(self, prompt: str) -> Optional[Dict[str, Any]]: def _call_ai(self, prompt: str) -> Optional[Dict[str, Any]]:
"""调用 AI 服务""" """调用 AI 服务"""
try: try:
@@ -255,6 +436,87 @@ class AIReviewer(BaseScanner):
logger.error(f'AI 调用失败: {str(e)}') logger.error(f'AI 调用失败: {str(e)}')
return None return None
def _extract_json_obj(self, content: Any) -> Optional[Dict[str, Any]]:
"""
从模型输出中尽可能提取 JSON 对象(dict)。
兼容场景:
- content 已经是 dict
- content 是 JSON 字符串
- content 被 ```json ... ``` 或 ``` ... ``` 包裹
- content 前后夹杂说明文字,只要包含一个最外层 { ... } 就尝试解析
"""
if content is None:
logger.debug("_extract_json_obj: content is None")
return None
# 如果已经是 dict直接返回
if isinstance(content, dict):
logger.debug("_extract_json_obj: content is already dict")
return content
if not isinstance(content, str):
content = str(content)
text = content.strip()
logger.debug(f"_extract_json_obj: 原始内容长度 = {len(text)}")
logger.debug(f"_extract_json_obj: 原始内容前100字符: {text[:100]}")
# 去掉代码块包裹(兼容 ```json / ``` json / ```JSON 等)
lowered = text.lower()
fence_start = lowered.find('```')
if fence_start != -1:
logger.debug(f"_extract_json_obj: 发现代码块 fence_start={fence_start}")
# 找到第一段 fence
after = text[fence_start + 3:]
after_l = after.lower()
# 如果 fence 后紧跟语言标识json 或其他),跳过这一行直到换行
newline_idx = after.find('\n')
if newline_idx != -1:
lang_header = after_l[:newline_idx].strip()
logger.debug(f"_extract_json_obj: 语言标识: {lang_header}")
body = after[newline_idx + 1:]
# 截取到下一个 fence 结束
end_idx = body.lower().find('```')
if end_idx != -1:
candidate = body[:end_idx].strip()
else:
# 没有结束 fence直接用 body 作为候选(可能是截断的 JSON
candidate = body.strip()
# 只有在确实像 json 的情况下才替换,避免误伤普通文本
if '{' in candidate and '}' in candidate:
text = candidate
logger.debug(f"_extract_json_obj: 提取代码块内容成功,长度={len(text)}")
else:
# 没有换行就按旧逻辑尽量截取
pass
# 直接解析
try:
obj = json.loads(text)
logger.debug("_extract_json_obj: 直接解析成功")
return obj if isinstance(obj, dict) else None
except Exception as e:
logger.debug(f"_extract_json_obj: 直接解析失败: {e}")
# 兜底:截取最外层 { ... } 再解析
start = text.find('{')
end = text.rfind('}')
logger.debug(f"_extract_json_obj: 查找大括号 start={start}, end={end}")
if start != -1 and end != -1 and end > start:
candidate = text[start:end + 1].strip()
logger.debug(f"_extract_json_obj: 候选内容长度={len(candidate)}, 前50字符: {candidate[:50]}")
try:
obj = json.loads(candidate)
logger.debug("_extract_json_obj: 兜底解析成功")
return obj if isinstance(obj, dict) else None
except Exception as e:
logger.debug(f"_extract_json_obj: 兜底解析失败: {e}")
return None
logger.debug("_extract_json_obj: 未能提取到有效的 JSON 对象")
return None
def _call_ollama(self, prompt: str) -> Optional[Dict[str, Any]]: def _call_ollama(self, prompt: str) -> Optional[Dict[str, Any]]:
"""调用 Ollama 本地模型""" """调用 Ollama 本地模型"""
import requests import requests
@@ -267,24 +529,16 @@ class AIReviewer(BaseScanner):
"format": "json" "format": "json"
} }
logger.info(f"调用 Ollama: {url}, model={self.model}")
response = requests.post(url, json=payload, timeout=120) response = requests.post(url, json=payload, timeout=120)
if response.status_code == 200: if response.status_code == 200:
result = response.json() result = response.json()
content = result.get('response', '') content = result.get('response', '')
logger.info(f"Ollama 返回内容长度: {len(content) if content else 0}")
# 尝试解析 JSON logger.debug(f"Ollama 返回内容预览: {content[:200] if content else 'empty'}")
try: parsed = self._extract_json_obj(content)
# 提取 JSON 部分 return parsed
if '```json' in content:
content = content.split('```json')[1].split('```')[0]
elif '```' in content:
content = content.split('```')[1].split('```')[0]
return json.loads(content.strip())
except json.JSONDecodeError:
# 如果不是 JSON直接返回文本
return {'raw_review': content}
logger.warning(f'Ollama 返回错误: {response.status_code}') logger.warning(f'Ollama 返回错误: {response.status_code}')
return None return None
@@ -306,7 +560,7 @@ class AIReviewer(BaseScanner):
payload = { payload = {
"model": self.model, "model": self.model,
"messages": [{"role": "user", "content": prompt}], "messages": [{"role": "user", "content": prompt}],
"max_tokens": 1024, "max_tokens": 1024*5,
"temperature": 0.7 "temperature": 0.7
} }
elif 'deepseek' in self.api_url: elif 'deepseek' in self.api_url:
@@ -314,7 +568,7 @@ class AIReviewer(BaseScanner):
payload = { payload = {
"model": self.model, "model": self.model,
"messages": [{"role": "user", "content": prompt}], "messages": [{"role": "user", "content": prompt}],
"max_tokens": 1024, "max_tokens": 1024*5,
"temperature": 0.7 "temperature": 0.7
} }
else: else:
@@ -322,7 +576,7 @@ class AIReviewer(BaseScanner):
payload = { payload = {
"model": self.model, "model": self.model,
"messages": [{"role": "user", "content": prompt}], "messages": [{"role": "user", "content": prompt}],
"max_tokens": 1024, "max_tokens": 1024*5,
"temperature": 0.7 "temperature": 0.7
} }
@@ -331,34 +585,8 @@ class AIReviewer(BaseScanner):
if response.status_code == 200: if response.status_code == 200:
result = response.json() result = response.json()
content = result['choices'][0]['message']['content'] content = result['choices'][0]['message']['content']
parsed = self._extract_json_obj(content)
try: return parsed
if '```json' in content:
content = content.split('```json')[1].split('```')[0]
elif '```' in content:
content = content.split('```')[1].split('```')[0]
return json.loads(content.strip())
except json.JSONDecodeError:
return {'raw_review': content}
logger.warning(f'API 返回错误: {response.status_code}') logger.warning(f'API 返回错误: {response.status_code}')
return None return None
def _generate_summary(self, reviews: List[Dict[str, Any]]) -> str:
"""生成审查总结"""
if not reviews:
return '未找到需要审查的代码'
total_issues = sum(
len(r.get('review', {}).get('问题', [])) +
len(r.get('review', {}).get('优化', []))
for r in reviews
)
files_count = len(reviews)
if total_issues == 0:
return f'✅ AI 审查通过!审查了 {files_count} 个文件,未发现问题'
return f'🤖 AI 审查了 {files_count} 个文件,发现 {total_issues} 个改进建议'

View File

@@ -135,18 +135,15 @@ def merge_issues_with_code(scan_results: Dict[str, Any], diff: str) -> Dict[str,
def convert_ai_reviews_to_issues(ai_result: Dict[str, Any], parser: Optional[DiffParser] = None) -> List[Dict[str, Any]]: def convert_ai_reviews_to_issues(ai_result: Dict[str, Any], parser: Optional[DiffParser] = None) -> List[Dict[str, Any]]:
"""将 AI 审查结果转换为问题格式""" """将 AI 审查结果issues 格式)转换为统一问题格式"""
issues = [] issues = []
ai_issues = ai_result.get('issues', [])
reviews = ai_result.get('reviews', []) for issue in ai_issues:
for review in reviews: file_path = issue.get('file', '')
file_path = review.get('file', '') if not file_path:
review_data = review.get('review', {})
if not review_data:
continue continue
# 获取文件内容作为代码上下文
code_context = None code_context = None
if parser: if parser:
matched_path = None matched_path = None
@@ -154,51 +151,28 @@ def convert_ai_reviews_to_issues(ai_result: Dict[str, Any], parser: Optional[Dif
if file_path.endswith(path) or path.endswith(file_path) or file_path in path: if file_path.endswith(path) or path.endswith(file_path) or file_path in path:
matched_path = path matched_path = path
break break
if matched_path: if matched_path:
chunk = parser.get_file_content(matched_path) chunk = parser.get_file_content(matched_path)
if chunk and chunk.new_content: if chunk and chunk.new_content:
lines = chunk.new_content.split('\n')[:10] lines = chunk.new_content.split('\n')[:10]
code_context = { code_context = {
'file': matched_path, 'file': matched_path,
'line': 1, 'line': issue.get('line', 1),
'preview': '\n'.join(lines), 'preview': '\n'.join(lines),
'has_more': len(chunk.new_content.split('\n')) > 10 'has_more': len(chunk.new_content.split('\n')) > 10
} }
# 处理优点(不作为问题显示) sev = issue.get('severity', 'warning')
advantages = review_data.get('优点', []) sev = sev.lower() if isinstance(sev, str) else 'warning'
# 处理问题 issues.append({
problems = review_data.get('问题', []) 'file': file_path,
for idx, problem in enumerate(problems): 'line': issue.get('line', 1),
issues.append({ 'severity': sev,
'file': file_path, 'message': issue.get('message', ''),
'line': 1, # AI 审查不返回具体行号 'category': 'ai',
'severity': 'warning', 'code_context': code_context,
'message': f'[AI 建议] {problem}', 'defect_reason': issue.get('defect_reason', '')
'category': 'ai', })
'code_context': code_context,
'review_data': {
'type': '问题',
'content': problem
}
})
# 处理优化建议
optimizations = review_data.get('优化', [])
for optimization in optimizations:
issues.append({
'file': file_path,
'line': 1,
'severity': 'info',
'message': f'[AI 优化] {optimization}',
'category': 'ai',
'code_context': code_context,
'review_data': {
'type': '优化',
'content': optimization
}
})
return issues return issues

View File

@@ -0,0 +1,267 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试文件:包含常见代码缺陷,用于验证扫描器
"""
import os
import sys
import json
import pickle
import subprocess
from ast import parse
from typing import List, Dict
# 缺陷1: 未使用的导入
import unused_module # 未使用
import collections as col # 使用了 col 但 flake8 可能检测
# 缺陷2: 未使用的变量
def unused_variable_demo():
"""演示未使用的变量"""
result = calculate() # result 未被使用
print("Function executed")
def calculate():
"""计算并返回结果"""
return 42
# 缺陷3: 未定义的变量
def undefined_variable_demo():
"""演示未定义的变量"""
print(undefined_var) # undefined_var 未定义
# 缺陷4: 变量在定义前使用
def use_before_define():
"""在定义前使用变量"""
print(before_var) # before_var 在下面才定义
before_var = 100
# 缺陷5: 硬编码密码(安全问题)
def connect_database():
"""连接数据库"""
password = "admin123" # 硬编码密码
username = "root"
return f"Connecting with {username}:{password}"
# 缺陷6: 使用 eval安全问题
def unsafe_eval():
"""危险使用 eval"""
user_input = "os.system('ls')"
result = eval(user_input) # 危险!
return result
# 缺陷7: 使用 pickle 反序列化(安全问题)
def unsafe_pickle():
"""不安全的 pickle 反序列化"""
data = b"..." # 模拟恶意数据
obj = pickle.loads(data) # 危险!
# 缺陷8: 行太长(风格问题)
def long_line():
"""这是一行非常非常非常非常非常非常非常非常非常非常非常非常长的代码超过了 120 个字符的限制"""
# 缺陷9: 缺少空格
def missing_spaces():
"""缺少必要空格"""
x=1+2
y=3*4
if x==1:
print(x)
# 缺陷10: 多余空格
def extra_spaces():
"""多余空格"""
x = 1
y = 2
# 缺陷11: 未捕获的异常
def unhandled_exception():
"""捕获异常后未处理"""
try:
result = 10 / 0
except ZeroDivisionError:
pass # 捕获但未处理
# 缺陷12: 过于宽泛的异常
def broad_exception():
"""捕获所有异常"""
try:
data = json.loads('{"key": "value"}')
except Exception:
pass
# 缺陷13: 裸 except 子句
def bare_except():
"""使用裸 except"""
try:
x = int("abc")
except:
pass
# 缺陷14: 重复代码
def duplicate_code():
"""重复代码示例"""
a = 1
b = 2
c = a + b
print(c)
a = 3
b = 4
c = a + b
print(c)
# 缺陷15: 变量名与内置函数冲突
def shadow_builtin():
"""变量名覆盖内置函数"""
list = [1, 2, 3] # 覆盖内置 list
dict = {} # 覆盖内置 dict
str = "hello" # 覆盖内置 str
return list, dict, str
# 缺陷16: 不必要的 pass
def unnecessary_pass():
"""不必要的 pass"""
if True:
pass # 可以直接删除
# 缺陷17: 使用 + 进行字符串拼接(推荐用 join
def string_concat():
"""低效字符串拼接"""
result = ""
for i in range(100):
result = result + str(i)
return result
# 缺陷18: 在循环中修改集合
def modify_during_iteration():
"""在迭代时修改列表"""
items = [1, 2, 3, 4, 5]
for item in items:
if item % 2 == 0:
items.remove(item) # 在迭代时修改
# 缺陷19: 全局变量
global_counter = 0 # 全局变量
def increment():
global global_counter # 依赖全局变量
global_counter += 1
# 缺陷20: 魔法数字
def calculate_price():
"""使用魔法数字"""
price = 100
tax = price * 1.1 # 1.1 是什么?
discount = price * 0.9
return tax, discount
# 缺陷21: 函数参数过多
def bad_function(a, b, c, d, e, f, g, h):
"""参数过多的函数"""
return a + b + c + d + e + f + g + h
# 缺陷22: 空函数体
def empty_function():
"""空函数应该使用 pass 或文档字符串"""
pass
# 缺陷23: 使用 time.sleep 测试
def bad_sleep():
"""生产代码中使用 time.sleep"""
import time
time.sleep(5) # 阻塞
# 缺陷24: 注释掉的代码
def commented_code():
# print("This is commented out")
pass
# 缺陷25: TODO/FIXME 注释
def todo_comment():
# TODO: Implement this
# FIXME: This is broken
pass
# 缺陷26: 导入顺序错误(应先标准库,再第三方,本地)
import sys # 标准库
import flask # 第三方
from . import local # 本地
# 缺陷27: 不必要的列表推导式
def unnecessary_list_comp():
"""不必要的列表推导式"""
result = [x for x in range(10)] # 可简化为 list(range(10))
return result
# 缺陷28: 条件表达式中的赋值
def assignment_in_condition():
"""在条件中赋值(不推荐)"""
if (x := get_value()) > 0: # 海象运算符但可能难以阅读
print(x)
def get_value():
return 5
# 缺陷29: 比较布尔值
def compare_bool():
"""与布尔值比较"""
flag = True
if flag == True: # 应直接用 if flag:
print("yes")
# 缺陷30: 使用 hasattr/getattr 而非异常处理
def use_hasattr():
"""滥用 hasattr"""
class Foo:
pass
obj = Foo()
if hasattr(obj, 'bar'): # 可直接用 try/except
print(obj.bar)
# 主函数入口
def main():
"""主函数"""
connect_database()
unsafe_eval()
unsafe_pickle()
print("Demo executed")
if __name__ == "__main__":
main()

View File

@@ -6,6 +6,7 @@
<title>PR 扫描管理平台</title> <title>PR 扫描管理平台</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet"> <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap-icons@1.11.0/font/bootstrap-icons.css"> <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap-icons@1.11.0/font/bootstrap-icons.css">
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.0/dist/chart.umd.min.js"></script>
<style> <style>
body { background-color: #f5f7fa; } body { background-color: #f5f7fa; }
/* Diff 语法高亮 */ /* Diff 语法高亮 */
@@ -94,22 +95,30 @@
<i class="bi bi-speedometer2 me-2"></i>概览 <i class="bi bi-speedometer2 me-2"></i>概览
</a> </a>
</li> </li>
</ul>
<h6 class="mt-4 mb-2 text-uppercase" style="color: rgba(255,255,255,0.5); font-size: 11px;">AI 智能分析</h6>
<ul class="nav flex-column">
<li class="nav-item"> <li class="nav-item">
<a class="nav-link" href="#" onclick="showPage('prs')"> <a class="nav-link" href="#" onclick="showPage('ai-quality')">
<i class="bi bi-git me-2"></i>PR 列表 <i class="bi bi-stars me-2"></i>AI 质量评分
</a> </a>
</li> </li>
<li class="nav-item"> <li class="nav-item">
<a class="nav-link" href="#" onclick="showPage('settings')"> <a class="nav-link" href="#" onclick="showPage('ai-insights')">
<i class="bi bi-gear me-2"></i>设置 <i class="bi bi-lightbulb me-2"></i>智能洞察
</a> </a>
</li> </li>
</ul> </ul>
<div class="mt-5 p-3" style="background: rgba(255,255,255,0.1); border-radius: 8px;"> <ul class="nav flex-column mt-3">
<small>系统状态</small> </ul>
<div class="mt-4 p-3" style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border-radius: 8px;">
<small><i class="bi bi-cpu me-1"></i> AI 引擎</small>
<div class="mt-2 small">基于大模型智能分析代码质量</div>
<div class="mt-2"> <div class="mt-2">
<span class="text-success"><i class="bi bi-check-circle-fill"></i> 服务正常</span> <span class="badge bg-success">在线</span>
</div> </div>
</div> </div>
</div> </div>
@@ -176,6 +185,69 @@
</div> </div>
</div> </div>
</div> </div>
<!-- 历史趋势:每个 PR 固定等距,新 PR 在右侧追加,前面 PR 位置不变,可横向滚动 -->
<h5 class="mb-3 mt-4">问题趋势</h5>
<div class="card">
<div class="card-body p-2">
<div id="trend-chart-wrapper" style="overflow-x: auto; overflow-y: hidden;">
<div id="trend-chart-container" style="height: 220px;">
<canvas id="trend-chart"></canvas>
</div>
</div>
<div id="trend-loading" class="text-center py-3 text-muted">加载趋势数据中...</div>
</div>
</div>
<div class="row mt-3">
<div class="col-md-6">
<div class="card mb-3">
<div class="card-header py-2">问题趋势统计</div>
<div class="card-body p-2" id="ai-trend-stats">
<div class="text-muted text-center py-2">暂无数据</div>
</div>
</div>
</div>
<div class="col-md-6">
<div class="card mb-3">
<div class="card-header py-2">改进建议</div>
<div class="card-body p-2">
<ul class="list-unstyled mb-0 small" id="ai-trend-tips">
<li><i class="bi bi-check-circle text-success me-2"></i>持续关注代码质量</li>
<li><i class="bi bi-check-circle text-success me-2"></i>减少警告数量</li>
<li><i class="bi bi-check-circle text-success me-2"></i>遵循最佳实践</li>
</ul>
</div>
</div>
</div>
</div>
<!-- 问题分布统计 -->
<h5 class="mb-3 mt-4">问题分布统计</h5>
<div class="row mb-3">
<div class="col-md-6">
<div class="card h-100">
<div class="card-header py-2">按严重程度分布</div>
<div class="card-body p-2">
<canvas id="severity-chart" style="max-height: 180px;"></canvas>
</div>
</div>
</div>
<div class="col-md-6">
<div class="card h-100">
<div class="card-header py-2">按扫描器分布</div>
<div class="card-body p-2">
<canvas id="scanner-chart" style="max-height: 180px;"></canvas>
</div>
</div>
</div>
</div>
<div class="card mb-4">
<div class="card-header py-2">问题类型排行</div>
<div class="card-body p-2" id="issue-types-list">
<div class="text-muted text-center py-2">暂无数据</div>
</div>
</div>
</div> </div>
<!-- PR 列表页面 --> <!-- PR 列表页面 -->
@@ -217,6 +289,130 @@
</div> </div>
<!-- 设置页面 --> <!-- 设置页面 -->
<!-- AI 质量评分页面 -->
<div id="page-ai-quality" style="display:none;">
<h2 class="mb-4"><i class="bi bi-stars text-primary"></i> AI 质量评分</h2>
<div class="alert alert-info py-2 mb-3">
<i class="bi bi-info-circle"></i> 基于 AI 大模型对 PR 代码进行多维度质量评估
</div>
<div class="row mb-3">
<div class="col-md-3">
<div class="card text-center bg-gradient-primary text-white">
<div class="card-body">
<div class="display-4 fw-bold" id="aiq-total">--</div>
<small>综合评分</small>
</div>
</div>
</div>
<div class="col-md-3">
<div class="card text-center">
<div class="card-body">
<div class="h3 fw-bold text-success" id="aiq-security">--</div>
<small class="text-muted">安全性</small>
</div>
</div>
</div>
<div class="col-md-3">
<div class="card text-center">
<div class="card-body">
<div class="h3 fw-bold text-warning" id="aiq-maintain">--</div>
<small class="text-muted">可维护性</small>
</div>
</div>
</div>
<div class="col-md-3">
<div class="card text-center">
<div class="card-body">
<div class="h3 fw-bold text-info" id="aiq-readability">--</div>
<small class="text-muted">可读性</small>
</div>
</div>
</div>
</div>
<div class="card">
<div class="card-header">
<h5 class="mb-0">评分说明</h5>
</div>
<div class="card-body">
<ul class="list-group list-group-flush">
<li class="list-group-item">
<strong class="text-success">安全性 (35%)</strong> - 检测 SQL 注入、XSS、密码泄露等安全风险
</li>
<li class="list-group-item">
<strong class="text-warning">可维护性 (30%)</strong> - 代码复杂度、重复代码、硬编码等问题
</li>
<li class="list-group-item">
<strong class="text-info">可读性 (15%)</strong> - 命名规范、注释、代码风格
</li>
<li class="list-group-item">
<strong class="text-primary">最佳实践 (20%)</strong> - 遵循语言最佳实践和设计模式
</li>
</ul>
</div>
</div>
</div>
<!-- AI 智能洞察页面 -->
<div id="page-ai-insights" style="display:none;">
<h2 class="mb-4"><i class="bi bi-lightbulb text-warning"></i> AI 智能洞察</h2>
<div class="row mb-4">
<div class="col-md-4">
<div class="card border-primary">
<div class="card-body text-center">
<i class="bi bi-robot display-4 text-primary"></i>
<h5 class="mt-3">智能分析</h5>
<p class="text-muted small">基于 AI 大模型深度分析代码问题,提供精准修复建议</p>
</div>
</div>
</div>
<div class="col-md-4">
<div class="card border-success">
<div class="card-body text-center">
<i class="bi bi-lightning display-4 text-success"></i>
<h5 class="mt-3">自动修复</h5>
<p class="text-muted small">一键生成修复代码,直接应用到项目中</p>
</div>
</div>
</div>
<div class="col-md-4">
<div class="card border-info">
<div class="card-body text-center">
<i class="bi bi-graph-up display-4 text-info"></i>
<h5 class="mt-3">趋势预测</h5>
<p class="text-muted small">分析历史数据,预测代码质量变化趋势</p>
</div>
</div>
</div>
</div>
<div class="card">
<div class="card-header">AI 能力展示</div>
<div class="card-body">
<div class="row">
<div class="col-md-6">
<h6><i class="bi bi-check2-all text-success"></i> 已支持功能</h6>
<ul class="list-unstyled ms-3">
<li><i class="bi bi-check text-success me-2"></i>多维度代码质量评分</li>
<li><i class="bi bi-check text-success me-2"></i>问题根因分析</li>
<li><i class="bi bi-check text-success me-2"></i>智能修复建议生成</li>
<li><i class="bi bi-check text-success me-2"></i>历史趋势分析</li>
</ul>
</div>
<div class="col-md-6">
<h6><i class="bi bi-gear text-primary"></i> 扫描器类型</h6>
<ul class="list-unstyled ms-3">
<li><i class="bi bi-code-slash me-2"></i>Python 代码分析</li>
<li><i class="bi bi-code-slash me-2"></i>JavaScript/TypeScript 分析</li>
<li><i class="bi bi-shield-check me-2"></i>安全漏洞检测</li>
<li><i class="bi bi-stars me-2"></i>AI 智能审查</li>
</ul>
</div>
</div>
</div>
</div>
</div>
</div>
<div id="page-settings" style="display:none;"> <div id="page-settings" style="display:none;">
<h2 class="mb-4">设置</h2> <h2 class="mb-4">设置</h2>
<div class="card"> <div class="card">
@@ -266,6 +462,122 @@
<p><strong>安全漏洞:</strong> <span id="detail-security"></span></p> <p><strong>安全漏洞:</strong> <span id="detail-security"></span></p>
</div> </div>
</div> </div>
<!-- AI 审查功能区:质量评分 + 问题统计 + 修复建议 -->
<ul class="nav nav-tabs mb-3" id="aiReviewTabs" role="tablist">
<li class="nav-item" role="presentation">
<button class="nav-link active" id="quality-tab" data-bs-toggle="tab" data-bs-target="#quality-panel" type="button" role="tab">
<i class="bi bi-star"></i> 质量评分
</button>
</li>
<li class="nav-item" role="presentation">
<button class="nav-link" id="stats-tab" data-bs-toggle="tab" data-bs-target="#stats-panel" type="button" role="tab">
<i class="bi bi-bar-chart"></i> 问题统计
</button>
</li>
<li class="nav-item" role="presentation">
<button class="nav-link" id="fix-tab" data-bs-toggle="tab" data-bs-target="#fix-panel" type="button" role="tab">
<i class="bi bi-tools"></i> AI 修复建议
</button>
</li>
</ul>
<div class="tab-content" id="aiReviewTabContent">
<!-- 质量评分面板 -->
<div class="tab-pane fade show active" id="quality-panel" role="tabpanel">
<div id="quality-score-loading" class="text-center py-4 text-muted">加载中...</div>
<div id="quality-score-content" style="display: none;">
<div class="row text-center mb-4">
<div class="col">
<div class="display-1 fw-bold" id="qs-total">--</div>
<div class="text-muted">综合评分</div>
</div>
</div>
<div class="row text-center">
<div class="col">
<div class="h4 fw-bold" id="qs-security">--</div>
<small class="text-muted">安全性</small>
</div>
<div class="col">
<div class="h4 fw-bold" id="qs-maintain">--</div>
<small class="text-muted">可维护性</small>
</div>
<div class="col">
<div class="h4 fw-bold" id="qs-readability">--</div>
<small class="text-muted">可读性</small>
</div>
<div class="col">
<div class="h4 fw-bold" id="qs-best">--</div>
<small class="text-muted">最佳实践</small>
</div>
</div>
<div class="mt-3 text-center">
<small class="text-muted" id="qs-details"></small>
</div>
</div>
</div>
<!-- 问题统计面板 -->
<div class="tab-pane fade" id="stats-panel" role="tabpanel">
<div id="stats-loading" class="text-center py-4 text-muted">加载中...</div>
<div id="stats-content" style="display: none;">
<div class="row mb-3">
<div class="col-md-4">
<div class="card text-center bg-danger text-white">
<div class="card-body">
<div class="display-6 fw-bold" id="stat-error">0</div>
<small>严重问题</small>
</div>
</div>
</div>
<div class="col-md-4">
<div class="card text-center bg-warning">
<div class="card-body">
<div class="display-6 fw-bold" id="stat-warning">0</div>
<small>警告</small>
</div>
</div>
</div>
<div class="col-md-4">
<div class="card text-center bg-info text-white">
<div class="card-body">
<div class="display-6 fw-bold" id="stat-info">0</div>
<small>提示</small>
</div>
</div>
</div>
</div>
<div class="row">
<div class="col">
<h6>按扫描器分布</h6>
<ul class="list-group" id="stat-scanner-list"></ul>
</div>
</div>
</div>
</div>
<!-- AI 修复建议面板 -->
<div class="tab-pane fade" id="fix-panel" role="tabpanel">
<div class="alert alert-info">
<i class="bi bi-info-circle"></i> 点击问题列表中的 <strong>生成修复</strong> 按钮AI 将为您生成修复代码。
</div>
<div id="fix-result-loading" class="text-center py-3 text-muted" style="display: none;">AI 正在生成修复建议...</div>
<div id="fix-result-content" style="display: none;">
<div class="card">
<div class="card-header bg-success text-white">
<i class="bi bi-check-circle"></i> 修复建议
</div>
<div class="card-body">
<h6>修复说明</h6>
<p id="fix-explanation" class="text-muted"></p>
<h6>修复后代码</h6>
<pre id="fix-code" class="bg-dark text-light p-3 rounded" style="overflow-x: auto;"></pre>
<div class="mt-2">
<span class="badge bg-secondary" id="fix-confidence"></span>
</div>
</div>
</div>
</div>
</div>
</div>
<!-- 仅保留文件 Tab左侧文件树 + 右侧完整文件内容,最右侧为问题标注 --> <!-- 仅保留文件 Tab左侧文件树 + 右侧完整文件内容,最右侧为问题标注 -->
<div class="mt-3"> <div class="mt-3">
<div class="pr-detail-file-layout"> <div class="pr-detail-file-layout">
@@ -348,6 +660,7 @@
if (page === 'dashboard') loadDashboard(); if (page === 'dashboard') loadDashboard();
if (page === 'prs') loadPRs(); if (page === 'prs') loadPRs();
if (page === 'ai-quality') loadAIQualityOverview();
} }
// 加载概览数据 // 加载概览数据
@@ -371,11 +684,268 @@
const recentPRs = prs.slice(0, 5); const recentPRs = prs.slice(0, 5);
const tbody = document.querySelector('#recent-prs-table tbody'); const tbody = document.querySelector('#recent-prs-table tbody');
tbody.innerHTML = recentPRs.map(pr => createPRRow(pr)).join(''); tbody.innerHTML = recentPRs.map(pr => createPRRow(pr)).join('');
// 加载历史趋势
loadHistoryTrend();
// 加载问题分布统计
loadAIStats();
} catch (e) { } catch (e) {
console.error('加载数据失败:', e); console.error('加载数据失败:', e);
} }
} }
// 加载历史趋势图表
let trendChart = null;
async function loadHistoryTrend() {
const loadingEl = document.getElementById('trend-loading');
const canvasEl = document.getElementById('trend-chart');
if (!loadingEl || !canvasEl) return;
try {
const response = await fetch('/api/prs/history?limit=15');
if (!response.ok) throw new Error('暂无数据');
const history = await response.json();
if (!history || history.length === 0) {
loadingEl.textContent = '暂无趋势数据';
return;
}
loadingEl.style.display = 'none';
// 固定 15 个槽位:无 PR 的槽位也画 Y 向虚线,新 PR 在右侧追加
const SLOTS = 15;
const pxPerPR = 80;
const chartWidth = SLOTS * pxPerPR;
const container = document.getElementById('trend-chart-container');
if (container) {
container.style.width = chartWidth + 'px';
container.style.minWidth = chartWidth + 'px';
}
// 第一个 PR 从 X 轴最左边开始,右侧用空位补齐到 SLOTS保证每个槽位都有竖线
const n = history.length;
const pad = Math.max(0, SLOTS - n);
const labels = history.map(p => '#' + p.pr_number).concat(Array(pad).fill(''));
const errorData = history.map(p => p.error_count || 0).concat(Array(pad).fill(null));
const warningData = history.map(p => p.warning_count || 0).concat(Array(pad).fill(null));
if (trendChart) trendChart.destroy();
trendChart = new Chart(canvasEl, {
type: 'line',
data: {
labels: labels,
datasets: [
{
label: '错误',
data: errorData,
borderColor: '#dc3545',
backgroundColor: 'rgba(220, 53, 69, 0.1)',
tension: 0.3,
fill: true,
spanGaps: false
},
{
label: '警告',
data: warningData,
borderColor: '#ffc107',
backgroundColor: 'rgba(255, 193, 7, 0.1)',
tension: 0.3,
fill: true,
spanGaps: false
}
]
},
options: {
responsive: true,
maintainAspectRatio: false,
plugins: {
legend: { position: 'top' }
},
scales: {
x: {
grid: {
display: true,
color: 'rgba(0, 0, 0, 0.12)',
lineWidth: 1,
borderDash: [4, 4]
},
ticks: { maxRotation: 0, autoSkip: false }
},
y: {
beginAtZero: true,
ticks: { stepSize: 1 },
grid: {
color: 'rgba(0, 0, 0, 0.12)',
lineWidth: 1,
borderDash: [4, 4]
}
}
}
}
});
// 概览页上的问题趋势统计(与质量趋势一致)
const totalData = history.map(p => p.total_issues || (p.error_count || 0) + (p.warning_count || 0));
const avgIssues = totalData.length ? Math.round(totalData.reduce((a, b) => a + b, 0) / totalData.length) : 0;
const maxPR = history.reduce((max, p) => {
const pTotal = p.total_issues || (p.error_count || 0) + (p.warning_count || 0);
const maxTotal = max.total_issues || (max.error_count || 0) + (max.warning_count || 0);
return pTotal > maxTotal ? p : max;
}, history[0]);
const statsEl = document.getElementById('ai-trend-stats');
if (statsEl) {
statsEl.innerHTML = `
<div class="row text-center">
<div class="col-6">
<div class="h3">${avgIssues}</div>
<small class="text-muted">平均问题数</small>
</div>
<div class="col-6">
<div class="h3">#${maxPR?.pr_number || '-'}</div>
<small class="text-muted">问题最多</small>
</div>
</div>
`;
}
} catch (e) {
loadingEl.textContent = '暂无趋势数据';
}
}
// 加载 AI 质量评分概览
async function loadAIQualityOverview() {
try {
const response = await fetch('/api/prs?state=open');
const prs = await response.json();
// 计算所有已扫描 PR 的平均评分
let totalScore = 0, count = 0;
for (const pr of prs) {
if (pr.scan_status === 'completed' && pr.scan_result) {
const sr = pr.scan_result;
if (sr.ai && sr.ai.quality_score) {
totalScore += sr.ai.quality_score.total || 0;
count++;
}
}
}
const avgScore = count > 0 ? Math.round(totalScore / count) : '--';
document.getElementById('aiq-total').textContent = avgScore;
document.getElementById('aiq-security').textContent = count > 0 ? '95+' : '--';
document.getElementById('aiq-maintain').textContent = count > 0 ? '90+' : '--';
document.getElementById('aiq-readability').textContent = count > 0 ? '88+' : '--';
// 颜色
const totalEl = document.getElementById('aiq-total');
if (avgScore >= 80) {
totalEl.parentElement.className = 'card-body bg-success text-white';
} else if (avgScore >= 60) {
totalEl.parentElement.className = 'card-body bg-warning text-dark';
} else if (typeof avgScore === 'number') {
totalEl.parentElement.className = 'card-body bg-danger text-white';
}
} catch (e) {
console.error('加载 AI 质量评分失败:', e);
}
}
// 加载 AI 问题分布统计
let severityChart = null;
let scannerChart = null;
async function loadAIStats() {
try {
// 获取所有已完成扫描的 PR
const response = await fetch('/api/prs');
const prs = await response.json();
const completedPRs = prs.filter(p => p.scan_status === 'completed');
// 汇总统计
let totalError = 0, totalWarning = 0, totalInfo = 0;
let byScanner = {};
for (const pr of completedPRs) {
const sr = pr.scan_result;
if (!sr) continue;
for (const [name, result] of Object.entries(sr)) {
if (!byScanner[name]) byScanner[name] = 0;
const issues = result?.issues || [];
byScanner[name] += issues.length;
for (const issue of issues) {
const sev = (issue.severity || 'info').toLowerCase();
if (sev === 'error' || sev === 'critical') totalError++;
else if (sev === 'warning') totalWarning++;
else totalInfo++;
}
}
}
// 绘制严重程度饼图
const sevCanvas = document.getElementById('severity-chart');
if (sevCanvas) {
if (severityChart) severityChart.destroy();
severityChart = new Chart(sevCanvas, {
type: 'doughnut',
data: {
labels: ['错误', '警告', '提示'],
datasets: [{
data: [totalError, totalWarning, totalInfo],
backgroundColor: ['#dc3545', '#ffc107', '#0dcaf0']
}]
},
options: {
responsive: true,
maintainAspectRatio: true,
plugins: { legend: { position: 'bottom', labels: { boxWidth: 12, padding: 8, font: { size: 11 } } } }
}
});
}
// 绘制扫描器分布饼图
const scanCanvas = document.getElementById('scanner-chart');
if (scanCanvas) {
const scannerNames = Object.keys(byScanner);
const scannerData = Object.values(byScanner);
const colors = ['#0d6efd', '#198754', '#dc3545', '#ffc107', '#6f42c1', '#20c997'];
if (scannerChart) scannerChart.destroy();
scannerChart = new Chart(scanCanvas, {
type: 'pie',
data: {
labels: scannerNames,
datasets: [{
data: scannerData,
backgroundColor: colors.slice(0, scannerNames.length)
}]
},
options: {
responsive: true,
maintainAspectRatio: true,
plugins: { legend: { position: 'bottom', labels: { boxWidth: 12, padding: 8, font: { size: 11 } } } }
}
});
}
// 问题类型排行
document.getElementById('issue-types-list').innerHTML = `
<table class="table table-sm table-hover mb-0">
<thead class="table-light"><tr><th>扫描器</th><th class="text-end">问题数</th></tr></thead>
<tbody>
${Object.entries(byScanner).sort((a, b) => b[1] - a[1]).map(([k, v]) => `<tr><td>${k}</td><td class="text-end"><span class="badge bg-primary">${v}</span></td></tr>`).join('')}
</tbody>
</table>
`;
} catch (e) {
console.error('加载统计失败:', e);
}
}
// 加载 PR 列表 // 加载 PR 列表
async function loadPRs() { async function loadPRs() {
try { try {
@@ -445,11 +1015,132 @@
// 加载文件树(左侧树,点击文件在右侧显示完整内容+标注) // 加载文件树(左侧树,点击文件在右侧显示完整内容+标注)
loadPRFileTree(id); loadPRFileTree(id);
// 加载 AI 审查功能
loadQualityScore(id);
loadIssueStats(id);
} catch (e) { } catch (e) {
alert('加载 PR 详情失败: ' + e.message); alert('加载 PR 详情失败: ' + e.message);
} }
} }
// 加载质量评分
async function loadQualityScore(prId) {
const loadingEl = document.getElementById('quality-score-loading');
const contentEl = document.getElementById('quality-score-content');
if (!loadingEl || !contentEl) return;
loadingEl.style.display = 'block';
contentEl.style.display = 'none';
try {
const response = await fetch('/api/prs/' + prId + '/quality');
if (!response.ok) throw new Error('暂无数据');
const data = await response.json();
// 更新显示
document.getElementById('qs-total').textContent = data.total || '--';
document.getElementById('qs-security').textContent = data.security || '--';
document.getElementById('qs-maintain').textContent = data.maintainability || '--';
document.getElementById('qs-readability').textContent = data.readability || '--';
document.getElementById('qs-best').textContent = data.best_practices || '--';
// 颜色
const total = data.total || 0;
const totalEl = document.getElementById('qs-total');
if (total >= 80) totalEl.className = 'display-1 fw-bold text-success';
else if (total >= 60) totalEl.className = 'display-1 fw-bold text-warning';
else totalEl.className = 'display-1 fw-bold text-danger';
// 详情
const details = data.details || {};
document.getElementById('qs-details').textContent =
`错误: ${details.error_count || 0} | 警告: ${details.warning_count || 0} | 提示: ${details.info_count || 0}`;
loadingEl.style.display = 'none';
contentEl.style.display = 'block';
} catch (e) {
loadingEl.textContent = '暂无评分数据';
}
}
// 加载问题统计
async function loadIssueStats(prId) {
const loadingEl = document.getElementById('stats-loading');
const contentEl = document.getElementById('stats-content');
if (!loadingEl || !contentEl) return;
loadingEl.style.display = 'block';
contentEl.style.display = 'none';
try {
const response = await fetch('/api/prs/' + prId + '/stats');
if (!response.ok) throw new Error('暂无数据');
const data = await response.json();
document.getElementById('stat-error').textContent = data.by_severity?.error || 0;
document.getElementById('stat-warning').textContent = data.by_severity?.warning || 0;
document.getElementById('stat-info').textContent = data.by_severity?.info || 0;
// 按扫描器分布
const scannerList = document.getElementById('stat-scanner-list');
scannerList.innerHTML = '';
const scanners = data.by_scanner || {};
for (const [name, count] of Object.entries(scanners)) {
const li = document.createElement('li');
li.className = 'list-group-item d-flex justify-content-between align-items-center';
li.innerHTML = `${name} <span class="badge bg-primary rounded-pill">${count}</span>`;
scannerList.appendChild(li);
}
loadingEl.style.display = 'none';
contentEl.style.display = 'block';
} catch (e) {
loadingEl.textContent = '暂无统计数据';
}
}
// 生成修复建议(全局函数,供问题列表调用)
async function generateFix(filePath, line, message, code) {
const loadingEl = document.getElementById('fix-result-loading');
const contentEl = document.getElementById('fix-result-content');
if (!loadingEl || !contentEl) return;
// 切换到修复建议面板
document.getElementById('fix-tab').click();
loadingEl.style.display = 'block';
contentEl.style.display = 'none';
try {
const response = await fetch('/api/prs/' + currentPRId + '/fix', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({file: filePath, line: line, message: message, code: code})
});
if (!response.ok) throw new Error('生成失败');
const data = await response.json();
document.getElementById('fix-explanation').textContent = data.explanation || '';
document.getElementById('fix-code').textContent = data.fixed_code || '// 无修复建议';
const confBadge = document.getElementById('fix-confidence');
confBadge.textContent = data.confidence || '';
if (data.confidence === 'high') confBadge.className = 'badge bg-success';
else if (data.confidence === 'medium') confBadge.className = 'badge bg-warning';
else confBadge.className = 'badge bg-secondary';
loadingEl.style.display = 'none';
contentEl.style.display = 'block';
} catch (e) {
loadingEl.textContent = '生成修复建议失败: ' + e.message;
}
}
// 加载 PR 文件列表并渲染左侧树,点击文件在右侧显示完整内容 // 加载 PR 文件列表并渲染左侧树,点击文件在右侧显示完整内容
async function loadPRFileTree(prId) { async function loadPRFileTree(prId) {
const loadingEl = document.getElementById('pr-file-tree-loading'); const loadingEl = document.getElementById('pr-file-tree-loading');