From 8f9e5bf4f5fbd2060ae452df9921e33f6bbffef7 Mon Sep 17 00:00:00 2001 From: Dang Zerong Date: Thu, 12 Mar 2026 16:13:18 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test_demo/demo_flaws.py | 267 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 267 insertions(+) diff --git a/test_demo/demo_flaws.py b/test_demo/demo_flaws.py index e69de29..a06db6a 100644 --- a/test_demo/demo_flaws.py +++ b/test_demo/demo_flaws.py @@ -0,0 +1,267 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +测试文件:包含常见代码缺陷,用于验证扫描器 +""" + +import os +import sys +import json +import pickle +import subprocess +from ast import parse +from typing import List, Dict + + +# 缺陷1: 未使用的导入 +import unused_module # 未使用 +import collections as col # 使用了 col 但 flake8 可能检测 + + +# 缺陷2: 未使用的变量 +def unused_variable_demo(): + """演示未使用的变量""" + result = calculate() # result 未被使用 + print("Function executed") + + +def calculate(): + """计算并返回结果""" + return 42 + + +# 缺陷3: 未定义的变量 +def undefined_variable_demo(): + """演示未定义的变量""" + print(undefined_var) # undefined_var 未定义 + + +# 缺陷4: 变量在定义前使用 +def use_before_define(): + """在定义前使用变量""" + print(before_var) # before_var 在下面才定义 + before_var = 100 + + +# 缺陷5: 硬编码密码(安全问题) +def connect_database(): + """连接数据库""" + password = "admin123" # 硬编码密码 + username = "root" + return f"Connecting with {username}:{password}" + + +# 缺陷6: 使用 eval(安全问题) +def unsafe_eval(): + """危险使用 eval""" + user_input = "os.system('ls')" + result = eval(user_input) # 危险! + return result + + +# 缺陷7: 使用 pickle 反序列化(安全问题) +def unsafe_pickle(): + """不安全的 pickle 反序列化""" + data = b"..." # 模拟恶意数据 + obj = pickle.loads(data) # 危险! + + +# 缺陷8: 行太长(风格问题) +def long_line(): + """这是一行非常非常非常非常非常非常非常非常非常非常非常非常长的代码超过了 120 个字符的限制""" + + +# 缺陷9: 缺少空格 +def missing_spaces(): + """缺少必要空格""" + x=1+2 + y=3*4 + if x==1: + print(x) + + +# 缺陷10: 多余空格 +def extra_spaces(): + """多余空格""" + x = 1 + y = 2 + + +# 缺陷11: 未捕获的异常 +def unhandled_exception(): + """捕获异常后未处理""" + try: + result = 10 / 0 + except ZeroDivisionError: + pass # 捕获但未处理 + + +# 缺陷12: 过于宽泛的异常 +def broad_exception(): + """捕获所有异常""" + try: + data = json.loads('{"key": "value"}') + except Exception: + pass + + +# 缺陷13: 裸 except 子句 +def bare_except(): + """使用裸 except""" + try: + x = int("abc") + except: + pass + + +# 缺陷14: 重复代码 +def duplicate_code(): + """重复代码示例""" + a = 1 + b = 2 + c = a + b + print(c) + + a = 3 + b = 4 + c = a + b + print(c) + + +# 缺陷15: 变量名与内置函数冲突 +def shadow_builtin(): + """变量名覆盖内置函数""" + list = [1, 2, 3] # 覆盖内置 list + dict = {} # 覆盖内置 dict + str = "hello" # 覆盖内置 str + return list, dict, str + + +# 缺陷16: 不必要的 pass +def unnecessary_pass(): + """不必要的 pass""" + if True: + pass # 可以直接删除 + + +# 缺陷17: 使用 + 进行字符串拼接(推荐用 join) +def string_concat(): + """低效字符串拼接""" + result = "" + for i in range(100): + result = result + str(i) + return result + + +# 缺陷18: 在循环中修改集合 +def modify_during_iteration(): + """在迭代时修改列表""" + items = [1, 2, 3, 4, 5] + for item in items: + if item % 2 == 0: + items.remove(item) # 在迭代时修改 + + +# 缺陷19: 全局变量 +global_counter = 0 # 全局变量 + + +def increment(): + global global_counter # 依赖全局变量 + global_counter += 1 + + +# 缺陷20: 魔法数字 +def calculate_price(): + """使用魔法数字""" + price = 100 + tax = price * 1.1 # 1.1 是什么? + discount = price * 0.9 + return tax, discount + + +# 缺陷21: 函数参数过多 +def bad_function(a, b, c, d, e, f, g, h): + """参数过多的函数""" + return a + b + c + d + e + f + g + h + + +# 缺陷22: 空函数体 +def empty_function(): + """空函数应该使用 pass 或文档字符串""" + pass + + +# 缺陷23: 使用 time.sleep 测试 +def bad_sleep(): + """生产代码中使用 time.sleep""" + import time + time.sleep(5) # 阻塞 + + +# 缺陷24: 注释掉的代码 +def commented_code(): + # print("This is commented out") + pass + + +# 缺陷25: TODO/FIXME 注释 +def todo_comment(): + # TODO: Implement this + # FIXME: This is broken + pass + + +# 缺陷26: 导入顺序错误(应先标准库,再第三方,本地) +import sys # 标准库 +import flask # 第三方 +from . import local # 本地 + + +# 缺陷27: 不必要的列表推导式 +def unnecessary_list_comp(): + """不必要的列表推导式""" + result = [x for x in range(10)] # 可简化为 list(range(10)) + return result + + +# 缺陷28: 条件表达式中的赋值 +def assignment_in_condition(): + """在条件中赋值(不推荐)""" + if (x := get_value()) > 0: # 海象运算符但可能难以阅读 + print(x) + + +def get_value(): + return 5 + + +# 缺陷29: 比较布尔值 +def compare_bool(): + """与布尔值比较""" + flag = True + if flag == True: # 应直接用 if flag: + print("yes") + + +# 缺陷30: 使用 hasattr/getattr 而非异常处理 +def use_hasattr(): + """滥用 hasattr""" + class Foo: + pass + obj = Foo() + if hasattr(obj, 'bar'): # 可直接用 try/except + print(obj.bar) + + +# 主函数入口 +def main(): + """主函数""" + connect_database() + unsafe_eval() + unsafe_pickle() + print("Demo executed") + + +if __name__ == "__main__": + main() From cb90b66f09a655e5f584113b09c3649cc408d059 Mon Sep 17 00:00:00 2001 From: Dang Zerong Date: Fri, 13 Mar 2026 11:26:01 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.py | 174 ++++++++++ db.py | 32 +- report/generator.py | 83 ++--- scanner/ai_reviewer.py | 438 +++++++++++++++++++------ scanner/diff_parser.py | 60 +--- web/index.html | 705 ++++++++++++++++++++++++++++++++++++++++- 6 files changed, 1291 insertions(+), 201 deletions(-) diff --git a/app.py b/app.py index 9b166b6..c5f7eb6 100644 --- a/app.py +++ b/app.py @@ -3,6 +3,7 @@ import os import logging +import traceback from typing import Dict, Tuple, Any import json @@ -298,6 +299,7 @@ def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]: logger.info(f'PR #{pr_number} 扫描完成') except Exception as e: + traceback.print_exc() logger.error(f'扫描 PR #{pr_number} 失败: {str(e)}') return jsonify({'error': str(e)}), 500 @@ -667,6 +669,178 @@ def api_get_pr_file_content(pr_id): return jsonify({'error': str(e)}), 500 +@app.route('/api/prs//quality') +def api_get_quality_score(pr_id): + """获取 PR 的代码质量评分""" + try: + pr = PRScanDB.get_pr_by_id(pr_id) + if not pr: + return jsonify({'error': 'PR not found'}), 404 + + # 从 scan_result 中获取质量评分 + scan_result = pr.get('scan_result') + if isinstance(scan_result, str): + try: + scan_result = json.loads(scan_result) + except: + scan_result = None + + quality_score = None + if scan_result and scan_result.get('ai'): + quality_score = scan_result['ai'].get('quality_score') + + if not quality_score: + return jsonify({'error': '暂无质量评分'}), 404 + + return jsonify(quality_score) + except Exception as e: + logger.error(f'获取质量评分失败: {str(e)}') + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/prs//stats') +def api_get_issue_stats(pr_id): + """获取 PR 的问题统计""" + try: + pr = PRScanDB.get_pr_by_id(pr_id) + if not pr: + return jsonify({'error': 'PR not found'}), 404 + + # 获取 scan_details_with_code + scan_details = pr.get('scan_details_with_code') + if isinstance(scan_details, str): + try: + scan_details = json.loads(scan_details) + except: + scan_details = None + + if not scan_details: + return jsonify({'error': '暂无扫描详情'}), 404 + + # 统计各扫描器的问题 + stats = { + 'by_severity': {'error': 0, 'warning': 0, 'info': 0}, + 'by_scanner': {}, + 'total': 0 + } + + # 统计静态扫描器 + for scanner in scan_details.get('scanners', []): + scanner_name = scanner.get('name', 'unknown') + scanner_issues = scanner.get('issues', []) + stats['by_scanner'][scanner_name] = len(scanner_issues) + + for issue in scanner_issues: + sev = (issue.get('severity') or 'info').lower() + if sev in stats['by_severity']: + stats['by_severity'][sev] += 1 + stats['total'] += 1 + + # 统计 AI 扫描器 + ai_data = scan_details.get('ai', {}) + if ai_data: + ai_issues = ai_data.get('issues', []) + stats['by_scanner']['AI'] = len(ai_issues) + for issue in ai_issues: + sev = (issue.get('severity') or 'info').lower() + if sev in stats['by_severity']: + stats['by_severity'][sev] += 1 + stats['total'] += 1 + + return jsonify(stats) + except Exception as e: + logger.error(f'获取问题统计失败: {str(e)}') + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/prs//fix', methods=['POST']) +def api_generate_fix(pr_id): + """生成问题修复建议""" + try: + data = request.get_json() + if not data: + return jsonify({'error': '请求体为空'}), 400 + + file_path = data.get('file') + line = data.get('line', 1) + message = data.get('message', '') + code = data.get('code', '') + + if not file_path or not message: + return jsonify({'error': '缺少必要参数'}), 400 + + # 调用 AI 生成修复建议 + fix_result = ai_reviewer.generate_fix_suggestion(file_path, line, message, code) + + if fix_result: + return jsonify(fix_result) + else: + return jsonify({'error': '生成修复建议失败'}), 500 + + except Exception as e: + logger.error(f'生成修复建议失败: {str(e)}') + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/prs/history') +def api_get_pr_history(): + """获取 PR 扫描历史趋势""" + try: + limit = request.args.get('limit', 20, type=int) + repo_name = request.args.get('repo_name', '') + + # 获取 PR 列表 + prs = PRScanDB.get_all_prs(status='completed') + if repo_name: + prs = [p for p in prs if p.get('repo_name') == repo_name] + + # 只取最近的 N 个 + prs = prs[:limit] + + # 构建趋势数据 + history = [] + for pr in reversed(prs): # 从旧到新 + issues_count = pr.get('issues_count', 0) + + # 从 scan_result 中各扫描器汇总 error/warning 数量 + scan_result = pr.get('scan_result') + if isinstance(scan_result, str): + try: + scan_result = json.loads(scan_result) + except: + scan_result = None + + error_count = 0 + warning_count = 0 + if scan_result and isinstance(scan_result, dict): + # 遍历各扫描器,汇总 error 和 warning + for scanner_name, scanner_result in scan_result.items(): + if isinstance(scanner_result, dict): + summary = scanner_result.get('summary', {}) + if isinstance(summary, dict): + error_count += summary.get('error', 0) + warning_count += summary.get('warning', 0) + + history.append({ + 'pr_id': pr.get('id'), + 'pr_number': pr.get('pr_number'), + 'repo_name': pr.get('repo_name'), + 'title': pr.get('pr_title', ''), + 'author': pr.get('author', ''), + 'created_at': pr.get('created_at', ''), + 'issues_count': issues_count, + 'error_count': error_count, + 'warning_count': warning_count, + 'total_issues': error_count + warning_count, + 'state': pr.get('state', '') + }) + + return jsonify(history) + except Exception as e: + logger.error(f'获取历史趋势失败: {str(e)}') + return jsonify({'error': str(e)}), 500 + + @app.route('/api/prs//merge', methods=['POST']) def api_merge_pr(pr_id): """合并 PR""" diff --git a/db.py b/db.py index 249b724..ce9d5ee 100644 --- a/db.py +++ b/db.py @@ -7,12 +7,17 @@ import sqlite3 import json import os -from datetime import datetime +from datetime import datetime, timezone, timedelta from typing import List, Dict, Any, Optional DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data', 'pr_scans.db') +def get_cst_now(): + """获取当前中国时区时间 (UTC+8)""" + return datetime.now(timezone(timedelta(hours=8))).strftime('%Y-%m-%d %H:%M:%S') + + def get_db_connection(): """获取数据库连接""" os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) @@ -110,6 +115,7 @@ class PRScanDB: if existing: # 更新现有记录 + cst_time = get_cst_now() cursor.execute(''' UPDATE pr_scans SET pr_title = ?, @@ -123,7 +129,7 @@ class PRScanDB: security_issues = ?, ai_review = ?, report_path = ?, - updated_at = CURRENT_TIMESTAMP + updated_at = ? WHERE repo_name = ? AND pr_number = ? ''', ( pr_info.get('pr_title'), @@ -137,19 +143,22 @@ class PRScanDB: security_issues, json.dumps(scan_results.get('ai', {}), ensure_ascii=False), report_path, + cst_time, pr_info.get('repo_name'), pr_info.get('pr_number') )) scan_id = existing['id'] else: # 插入新记录 + cst_time = get_cst_now() cursor.execute(''' INSERT INTO pr_scans ( pr_number, repo_name, pr_title, pr_url, source_branch, target_branch, author, state, scan_status, scan_result, scan_details_with_code, - issues_count, security_issues, ai_review, report_path - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + issues_count, security_issues, ai_review, report_path, + created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( pr_info.get('pr_number'), pr_info.get('repo_name'), @@ -165,7 +174,9 @@ class PRScanDB: issues_count, security_issues, json.dumps(scan_results.get('ai', {}), ensure_ascii=False), - report_path + report_path, + cst_time, + cst_time )) scan_id = cursor.lastrowid @@ -239,23 +250,24 @@ class PRScanDB: """更新 PR 状态""" conn = get_db_connection() cursor = conn.cursor() + cst_time = get_cst_now() if state == 'merged': cursor.execute(''' UPDATE pr_scans SET state = ?, - merged_at = CURRENT_TIMESTAMP, + merged_at = ?, merged_by = ?, - updated_at = CURRENT_TIMESTAMP + updated_at = ? WHERE id = ? - ''', (state, merged_by, scan_id)) + ''', (state, cst_time, merged_by, cst_time, scan_id)) else: cursor.execute(''' UPDATE pr_scans SET state = ?, - updated_at = CURRENT_TIMESTAMP + updated_at = ? WHERE id = ? - ''', (state, scan_id)) + ''', (state, cst_time, scan_id)) conn.commit() conn.close() diff --git a/report/generator.py b/report/generator.py index 9a0597a..0bd0dc2 100644 --- a/report/generator.py +++ b/report/generator.py @@ -218,55 +218,66 @@ class ReportGenerator: lines.append(f' - {message}') lines.append('') - # AI 审查结果(单独展示) + # AI 审查结果(适配新格式:issues 列表) if 'ai' in scan_results: ai_result = scan_results['ai'] lines.append('') lines.append('## 🤖 AI 代码审查') lines.append('') - lines.append(ai_result.get('summary', '无 AI 审查结果')) + + # 新格式:直接使用 summary + if 'summary' in ai_result: + # summary 可能是字符串或 dict + summary = ai_result.get('summary', '') + if isinstance(summary, dict): + lines.append(f"发现 {summary.get('total', 0)} 个问题," + f"错误 {summary.get('error', 0)}," + f"警告 {summary.get('warning', 0)}," + f"提示 {summary.get('info', 0)}") + else: + lines.append(str(summary)) lines.append('') - reviews = ai_result.get('reviews', []) - if reviews: - for i, review in enumerate(reviews, 1): - file_name = review.get('file', 'unknown') - review_content = review.get('review', {}) + # 新格式:issues 列表 + ai_issues = ai_result.get('issues', []) + if ai_issues: + # 按文件分组 + issues_by_file = {} + for issue in ai_issues: + file_name = issue.get('file', 'unknown') + if file_name not in issues_by_file: + issues_by_file[file_name] = [] + issues_by_file[file_name].append(issue) + for file_name, issues in issues_by_file.items(): lines.append(f'### 📄 {file_name}') lines.append('') - # 优点 - advantages = review_content.get('优点', []) - if advantages: - lines.append('**✅ 代码优点:**') - for adv in advantages[:3]: - lines.append(f'- {adv}') - lines.append('') + for i, issue in enumerate(issues[:10], 1): + severity = issue.get('severity', 'Info') + severity_emoji = { + 'ERROR': '🔴', + 'WARNING': '🟡', + 'INFO': 'ℹ️' + }.get(severity.upper(), '⚪') - # 问题 - issues = review_content.get('问题', []) - if issues: - lines.append('**⚠️ 需要改进:**') - for issue in issues[:3]: - lines.append(f'- {issue}') - lines.append('') + line_num = issue.get('line', 0) + symbol = issue.get('symbol', '') + message = issue.get('message', 'No message') + code_context = issue.get('code_context', '') + defect_reason = issue.get('defect_reason', '') - # 优化建议 - optimizations = review_content.get('优化', []) - if optimizations: - lines.append('**💡 优化建议:**') - for opt in optimizations[:3]: - lines.append(f'- {opt}') - lines.append('') - - # 原始回复(如果不是 JSON 格式) - raw = review_content.get('raw_review') - if raw: - lines.append('**📝 AI 原始回复:**') - lines.append('```') - lines.append(raw[:500] + '...' if len(raw) > 500 else raw) - lines.append('```') + lines.append(f'{i}. {severity_emoji} **{severity}** - 行 {line_num}') + if symbol: + lines.append(f' - 标识: `{symbol}`') + lines.append(f' - 问题: {message}') + if code_context: + lines.append(' - 代码:') + lines.append('```') + lines.append(code_context) + lines.append('```') + if defect_reason: + lines.append(f' - 原因: {defect_reason}') lines.append('') # 添加报告链接或下一步操作 diff --git a/scanner/ai_reviewer.py b/scanner/ai_reviewer.py index 51c1bd8..171abfa 100644 --- a/scanner/ai_reviewer.py +++ b/scanner/ai_reviewer.py @@ -5,6 +5,7 @@ AI 代码审查器 使用大模型进行智能代码审查 """ import os +import re import json import logging from typing import Dict, Any, List, Optional @@ -73,15 +74,26 @@ class AIReviewer(BaseScanner): changed_files: 可选的变更文件列表(来自 PR) Returns: - 审查结果 + 审查结果(与 python_scanner.py 兼容的格式) """ + result = { + 'tool': 'AI Code Reviewer', + 'language': language, + 'status': 'success', + 'issues': [], + 'summary': { + 'total': 0, + 'error': 0, + 'warning': 0, + 'info': 0 + }, + 'files_scanned': 0 + } + if not self.enabled: - return { - 'enabled': False, - 'tool': 'AI Code Reviewer', - 'reviews': [], - 'summary': 'AI 审查已禁用' - } + result['status'] = 'disabled' + result['summary'] = 'AI 审查已禁用' + return result try: # 如果没有传入 clone_dir,需要克隆 @@ -89,52 +101,141 @@ class AIReviewer(BaseScanner): clone_dir = self.clone_repo(repo_url, commit_id, branch) if not clone_dir or not os.path.exists(clone_dir): - return { - 'enabled': True, - 'tool': 'AI Code Reviewer', - 'reviews': [], - 'summary': '无法获取代码目录' - } + result['status'] = 'error' + result['error'] = '无法获取代码目录' + return result # 获取要审查的代码文件 files = self._get_code_files(clone_dir, language, changed_files) if not files: - return { - 'enabled': True, - 'tool': 'AI Code Reviewer', - 'reviews': [], - 'summary': '未找到可审查的代码文件' - } + result['summary'] = '未找到可审查的代码文件' + return result # 对每个文件进行 AI 审查 - all_reviews = [] + all_issues = [] for file_path in files[:5]: # 限制最多审查 5 个文件 review = self._review_file(file_path, language, clone_dir) - if review: - all_reviews.append(review) + if review and review.get('issues'): + all_issues.extend(review['issues']) - # 生成总结 - summary = self._generate_summary(all_reviews) + result['issues'] = all_issues[:self.max_issues] if self.detailed else all_issues + result['summary'] = self._calculate_summary(all_issues) + result['files_scanned'] = len(files[:5]) + result['clone_dir'] = clone_dir - return { - 'enabled': True, - 'tool': 'AI Code Reviewer', - 'reviews': all_reviews, - 'summary': summary, - 'files_reviewed': len(all_reviews), - 'clone_dir': clone_dir # 返回 clone_dir 用于后续清理 - } + # 生成质量评分 + result['quality_score'] = self._calculate_quality_score(all_issues, files[:5]) + + return result except Exception as e: logger.error(f'AI 审查失败: {str(e)}') - return { - 'enabled': True, - 'tool': 'AI Code Reviewer', - 'error': str(e), - 'reviews': [], - 'summary': f'AI 审查出错: {str(e)}' + result['status'] = 'error' + result['error'] = str(e) + return result + + def _calculate_summary(self, issues: List[Dict]) -> Dict[str, int]: + """计算问题摘要""" + summary = { + 'total': len(issues), + 'error': 0, + 'warning': 0, + 'info': 0 + } + + for issue in issues: + severity = issue.get('severity', '').lower() + if severity in ['error', 'critical', 'fatal']: + summary['error'] += 1 + elif severity in ['warning', 'moderate']: + summary['warning'] += 1 + else: + summary['info'] += 1 + + return summary + + def _calculate_quality_score(self, issues: List[Dict], files: List[str]) -> Dict[str, Any]: + """ + 计算代码质量评分 + 返回:总分(0-100)及各维度评分 + """ + if not files: + return {'total': 100, 'maintainability': 100, 'security': 100, 'readability': 100, 'best_practices': 100} + + # 统计问题 + error_count = sum(1 for i in issues if i.get('severity', '').lower() in ['error', 'critical']) + warning_count = sum(1 for i in issues if i.get('severity', '').lower() == 'warning') + info_count = sum(1 for i in issues if i.get('severity', '').lower() == 'info') + + # 分类统计 + security_keywords = ['sql injection', 'xss', 'csrf', 'password', 'secret', 'token', '权限', '注入', '认证'] + security_issues = sum(1 for i in issues if any(k in (i.get('message', '') + i.get('symbol', '')).lower() for k in security_keywords)) + + # 计算各维度分数 + # 可维护性:基于错误和警告数量 + issue_weight = error_count * 5 + warning_count * 2 + info_count * 0.5 + maintainability = max(0, 100 - issue_weight) + + # 安全性:基于安全问题 + security_score = max(0, 100 - security_issues * 15) + + # 可读性:基于 info 级别问题(风格类) + readability = max(0, 100 - info_count * 3) + + # 最佳实践:基于 warning 级别 + best_practices = max(0, 100 - warning_count * 5) + + # 总分:加权平均 + total = int((maintainability * 0.3 + security_score * 0.35 + readability * 0.15 + best_practices * 0.2)) + + return { + 'total': total, + 'maintainability': maintainability, + 'security': security_score, + 'readability': readability, + 'best_practices': best_practices, + 'details': { + 'error_count': error_count, + 'warning_count': warning_count, + 'info_count': info_count, + 'security_issues': security_issues } + } + + def generate_fix_suggestion(self, file_path: str, line: int, message: str, code: str) -> Optional[str]: + """ + 对指定问题生成修复建议代码 + """ + prompt = f"""你是一位代码修复专家。请根据以下问题,生成修复后的代码。 + +问题描述:{message} +问题所在行号:{line} + +原始代码: +``` +{code} +``` + +请以 JSON 格式输出修复建议: +```json +{{ + "fixed_code": "修复后的完整代码或关键片段", + "explanation": "修复说明(50字以内)", + "confidence": "high/medium/low 修复把握度" +}} +``` + +如果无法修复,请返回:{{"fixed_code": "", "explanation": "无法自动修复", "confidence": "low"}}""" + + try: + response = self._call_ai(prompt) + if response and response.get('fixed_code'): + return response + except Exception as e: + logger.warning(f'生成修复建议失败: {e}') + + return None def _get_code_files(self, clone_dir: str, language: str, changed_files: Optional[List[str]] = None) -> List[str]: """获取代码文件列表""" @@ -174,6 +275,8 @@ class AIReviewer(BaseScanner): def _review_file(self, file_path: str, language: str, clone_dir: str = None) -> Optional[Dict[str, Any]]: """审查单个文件""" + issues = [] + try: with open(file_path, 'r', encoding='utf-8') as f: code = f.read() @@ -186,22 +289,46 @@ class AIReviewer(BaseScanner): else: truncated = False - # 构建 prompt - prompt = self._build_prompt(code, language) + # 给代码加行号再发给模型,便于模型返回准确行号 + code_with_lines = self._code_with_line_numbers(code) + prompt = self._build_prompt(code_with_lines, language) # 调用 AI response = self._call_ai(prompt) - if not response: - return None - - # 解析响应 + # 获取相对路径 rel_path = os.path.relpath(file_path, clone_dir) if (clone_dir and file_path) else file_path + + if not response: + return { + 'file': rel_path, + 'path': file_path, + 'truncated': truncated, + 'issues': [] + } + + # 解析 AI 响应,转换为标准 issues 格式,并校正行号 + ai_issues = response.get('issues', []) + for issue in ai_issues: + self._correct_issue_line(issue, code) + issues.append({ + 'tool': 'ai_reviewer', + 'type': issue.get('type', 'info'), + 'severity': issue.get('severity', 'Info'), + 'message': issue.get('message', ''), + 'file': rel_path, + 'line': issue.get('line', 0), + 'column': issue.get('column', 0), + 'symbol': issue.get('symbol', ''), + 'code_context': issue.get('code_context', ''), + 'defect_reason': issue.get('defect_reason', '') + }) + return { 'file': rel_path, 'path': file_path, 'truncated': truncated, - 'review': response + 'issues': issues } except Exception as e: @@ -217,29 +344,83 @@ class AIReviewer(BaseScanner): else: lang_name = language - prompt = f"""你是一位资深的 {lang_name} 代码审查专家。请审查以下代码,并给出: + prompt = f"""你是一位资深的 {lang_name} 代码审查专家。请审查以下代码,找出潜在的问题和缺陷。 -1. **代码优点** - 写得好地方 -2. **问题建议** - 需要改进的地方 -3. **优化建议** - 如何让代码更好 - -请用中文回复,保持简洁,每个文件审查不超过 3 点建议。 - -以下是代码: -```{language} -{code} -``` - -请以 JSON 格式输出: +请以 JSON 格式输出审查结果,必须包含以下字段: ```json {{ - "优点": ["..."], - "问题": ["..."], - "优化": ["..."] + "issues": [ + {{ + "line": 行号, + "column": 列号, + "message": "问题描述", + "type": "error/warning/info 之一", + "severity": "Error/Warning/Info 之一", + "symbol": "错误标识符如 unused-variable, syntax-error 等", + "code_context": "问题代码的上下文(包含问题的那行或几行代码)", + "defect_reason": "缺陷原因分析(30字以内简洁描述)" + }} + ] }} +``` + +注意: +1. line 和 column 是问题所在的行号和列号(从 1 开始) +2. type: error=错误, warning=警告, info=信息 +3. severity: Error=严重, Warning=一般, Info=提示 +4. code_context: 包含问题代码的那一行或相邻的几行 +5. defect_reason: 精简描述,30字以内,说明问题原因和风险 + +如果代码没有问题,返回空数组: {{"issues": []}} + +重要:以下代码每行前已标注行号(格式为 "行号|"),请根据问题实际出现的代码行,严格使用该行前的行号填写 issues 中的 line 字段,不要猜测或使用错误行号。 + +以下是待审查的代码(行号已标注): +```{language} +{code} ```""" return prompt + def _code_with_line_numbers(self, code: str) -> str: + """给代码每行前加上行号,便于模型返回准确行号""" + lines = code.split('\n') + width = len(str(len(lines))) + return '\n'.join(f'{i:>{width}}| {line}' for i, line in enumerate(lines, 1)) + + def _correct_issue_line(self, issue: Dict[str, Any], code: str) -> None: + """ + 根据 message/symbol 在源码中搜索,尽量把 issue 的 line 校正到真实出现位置。 + AI 返回的行号常不准确,通过匹配问题相关的标识符(如 'unused_module')修正行号。 + """ + line = issue.get('line') + if not line or not code: + return + lines = code.split('\n') + if line < 1 or line > len(lines): + return + + # 从 message 中提取被引用的标识符(如 'unused_module' -> unused_module) + message = (issue.get('message') or '') + symbol = (issue.get('symbol') or '').strip() + candidates = [] + if symbol: + candidates.append(symbol) + for m in re.finditer(r"['\"]([a-zA-Z_][a-zA-Z0-9_]*)['\"]", message or ''): + candidates.append(m.group(1)) + # 若 message 里没有引号标识符,取首段英文/数字/下划线作为关键词 + if not candidates: + first_word = re.search(r'\b([a-zA-Z_][a-zA-Z0-9_]*)\b', message) + if first_word: + candidates.append(first_word.group(1)) + + for token in candidates: + if not token: + continue + for i, code_line in enumerate(lines): + if token in code_line: + issue['line'] = i + 1 + return + def _call_ai(self, prompt: str) -> Optional[Dict[str, Any]]: """调用 AI 服务""" try: @@ -255,6 +436,87 @@ class AIReviewer(BaseScanner): logger.error(f'AI 调用失败: {str(e)}') return None + def _extract_json_obj(self, content: Any) -> Optional[Dict[str, Any]]: + """ + 从模型输出中尽可能提取 JSON 对象(dict)。 + + 兼容场景: + - content 已经是 dict + - content 是 JSON 字符串 + - content 被 ```json ... ``` 或 ``` ... ``` 包裹 + - content 前后夹杂说明文字,只要包含一个最外层 { ... } 就尝试解析 + """ + if content is None: + logger.debug("_extract_json_obj: content is None") + return None + + # 如果已经是 dict,直接返回 + if isinstance(content, dict): + logger.debug("_extract_json_obj: content is already dict") + return content + + if not isinstance(content, str): + content = str(content) + + text = content.strip() + logger.debug(f"_extract_json_obj: 原始内容长度 = {len(text)}") + logger.debug(f"_extract_json_obj: 原始内容前100字符: {text[:100]}") + + # 去掉代码块包裹(兼容 ```json / ``` json / ```JSON 等) + lowered = text.lower() + fence_start = lowered.find('```') + if fence_start != -1: + logger.debug(f"_extract_json_obj: 发现代码块 fence_start={fence_start}") + # 找到第一段 fence + after = text[fence_start + 3:] + after_l = after.lower() + # 如果 fence 后紧跟语言标识(json 或其他),跳过这一行直到换行 + newline_idx = after.find('\n') + if newline_idx != -1: + lang_header = after_l[:newline_idx].strip() + logger.debug(f"_extract_json_obj: 语言标识: {lang_header}") + body = after[newline_idx + 1:] + # 截取到下一个 fence 结束 + end_idx = body.lower().find('```') + if end_idx != -1: + candidate = body[:end_idx].strip() + else: + # 没有结束 fence,直接用 body 作为候选(可能是截断的 JSON) + candidate = body.strip() + # 只有在确实像 json 的情况下才替换,避免误伤普通文本 + if '{' in candidate and '}' in candidate: + text = candidate + logger.debug(f"_extract_json_obj: 提取代码块内容成功,长度={len(text)}") + else: + # 没有换行就按旧逻辑尽量截取 + pass + + # 直接解析 + try: + obj = json.loads(text) + logger.debug("_extract_json_obj: 直接解析成功") + return obj if isinstance(obj, dict) else None + except Exception as e: + logger.debug(f"_extract_json_obj: 直接解析失败: {e}") + + # 兜底:截取最外层 { ... } 再解析 + start = text.find('{') + end = text.rfind('}') + logger.debug(f"_extract_json_obj: 查找大括号 start={start}, end={end}") + if start != -1 and end != -1 and end > start: + candidate = text[start:end + 1].strip() + logger.debug(f"_extract_json_obj: 候选内容长度={len(candidate)}, 前50字符: {candidate[:50]}") + try: + obj = json.loads(candidate) + logger.debug("_extract_json_obj: 兜底解析成功") + return obj if isinstance(obj, dict) else None + except Exception as e: + logger.debug(f"_extract_json_obj: 兜底解析失败: {e}") + return None + + logger.debug("_extract_json_obj: 未能提取到有效的 JSON 对象") + return None + def _call_ollama(self, prompt: str) -> Optional[Dict[str, Any]]: """调用 Ollama 本地模型""" import requests @@ -267,24 +529,16 @@ class AIReviewer(BaseScanner): "format": "json" } + logger.info(f"调用 Ollama: {url}, model={self.model}") response = requests.post(url, json=payload, timeout=120) if response.status_code == 200: result = response.json() content = result.get('response', '') - - # 尝试解析 JSON - try: - # 提取 JSON 部分 - if '```json' in content: - content = content.split('```json')[1].split('```')[0] - elif '```' in content: - content = content.split('```')[1].split('```')[0] - - return json.loads(content.strip()) - except json.JSONDecodeError: - # 如果不是 JSON,直接返回文本 - return {'raw_review': content} + logger.info(f"Ollama 返回内容长度: {len(content) if content else 0}") + logger.debug(f"Ollama 返回内容预览: {content[:200] if content else 'empty'}") + parsed = self._extract_json_obj(content) + return parsed logger.warning(f'Ollama 返回错误: {response.status_code}') return None @@ -306,7 +560,7 @@ class AIReviewer(BaseScanner): payload = { "model": self.model, "messages": [{"role": "user", "content": prompt}], - "max_tokens": 1024, + "max_tokens": 1024*5, "temperature": 0.7 } elif 'deepseek' in self.api_url: @@ -314,7 +568,7 @@ class AIReviewer(BaseScanner): payload = { "model": self.model, "messages": [{"role": "user", "content": prompt}], - "max_tokens": 1024, + "max_tokens": 1024*5, "temperature": 0.7 } else: @@ -322,7 +576,7 @@ class AIReviewer(BaseScanner): payload = { "model": self.model, "messages": [{"role": "user", "content": prompt}], - "max_tokens": 1024, + "max_tokens": 1024*5, "temperature": 0.7 } @@ -331,34 +585,8 @@ class AIReviewer(BaseScanner): if response.status_code == 200: result = response.json() content = result['choices'][0]['message']['content'] - - try: - if '```json' in content: - content = content.split('```json')[1].split('```')[0] - elif '```' in content: - content = content.split('```')[1].split('```')[0] - - return json.loads(content.strip()) - except json.JSONDecodeError: - return {'raw_review': content} + parsed = self._extract_json_obj(content) + return parsed logger.warning(f'API 返回错误: {response.status_code}') return None - - def _generate_summary(self, reviews: List[Dict[str, Any]]) -> str: - """生成审查总结""" - if not reviews: - return '未找到需要审查的代码' - - total_issues = sum( - len(r.get('review', {}).get('问题', [])) + - len(r.get('review', {}).get('优化', [])) - for r in reviews - ) - - files_count = len(reviews) - - if total_issues == 0: - return f'✅ AI 审查通过!审查了 {files_count} 个文件,未发现问题' - - return f'🤖 AI 审查了 {files_count} 个文件,发现 {total_issues} 个改进建议' diff --git a/scanner/diff_parser.py b/scanner/diff_parser.py index 440e423..b0e000e 100644 --- a/scanner/diff_parser.py +++ b/scanner/diff_parser.py @@ -135,18 +135,15 @@ def merge_issues_with_code(scan_results: Dict[str, Any], diff: str) -> Dict[str, def convert_ai_reviews_to_issues(ai_result: Dict[str, Any], parser: Optional[DiffParser] = None) -> List[Dict[str, Any]]: - """将 AI 审查结果转换为问题格式""" + """将 AI 审查结果(issues 格式)转换为统一问题格式""" issues = [] + ai_issues = ai_result.get('issues', []) - reviews = ai_result.get('reviews', []) - for review in reviews: - file_path = review.get('file', '') - review_data = review.get('review', {}) - - if not review_data: + for issue in ai_issues: + file_path = issue.get('file', '') + if not file_path: continue - # 获取文件内容作为代码上下文 code_context = None if parser: matched_path = None @@ -154,51 +151,28 @@ def convert_ai_reviews_to_issues(ai_result: Dict[str, Any], parser: Optional[Dif if file_path.endswith(path) or path.endswith(file_path) or file_path in path: matched_path = path break - if matched_path: chunk = parser.get_file_content(matched_path) if chunk and chunk.new_content: lines = chunk.new_content.split('\n')[:10] code_context = { 'file': matched_path, - 'line': 1, + 'line': issue.get('line', 1), 'preview': '\n'.join(lines), 'has_more': len(chunk.new_content.split('\n')) > 10 } - # 处理优点(不作为问题显示) - advantages = review_data.get('优点', []) - # 处理问题 - problems = review_data.get('问题', []) - for idx, problem in enumerate(problems): - issues.append({ - 'file': file_path, - 'line': 1, # AI 审查不返回具体行号 - 'severity': 'warning', - 'message': f'[AI 建议] {problem}', - 'category': 'ai', - 'code_context': code_context, - 'review_data': { - 'type': '问题', - 'content': problem - } - }) - - # 处理优化建议 - optimizations = review_data.get('优化', []) - for optimization in optimizations: - issues.append({ - 'file': file_path, - 'line': 1, - 'severity': 'info', - 'message': f'[AI 优化] {optimization}', - 'category': 'ai', - 'code_context': code_context, - 'review_data': { - 'type': '优化', - 'content': optimization - } - }) + sev = issue.get('severity', 'warning') + sev = sev.lower() if isinstance(sev, str) else 'warning' + issues.append({ + 'file': file_path, + 'line': issue.get('line', 1), + 'severity': sev, + 'message': issue.get('message', ''), + 'category': 'ai', + 'code_context': code_context, + 'defect_reason': issue.get('defect_reason', '') + }) return issues diff --git a/web/index.html b/web/index.html index 78bb2ce..0143622 100644 --- a/web/index.html +++ b/web/index.html @@ -6,6 +6,7 @@ PR 扫描管理平台 +