#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Flask entry point of the AI code quality scanner.

The service receives Gitea webhooks (push and pull request events), runs the
configured scanners, generates a report, sends it to Feishu, and exposes a
small JSON API plus static pages for the scan management dashboard.
"""

import os
import time
import logging
import traceback
from typing import Dict, Tuple, Any
import json

# Must be set before Flask is imported/started so the CLI binds every interface.
os.environ.setdefault('FLASK_RUN_HOST', '0.0.0.0')

from flask import Flask, request, jsonify, send_from_directory
import yaml

from webhook.handler import GiteaWebhookHandler
from scanner.python_scanner import PythonScanner
from scanner.js_scanner import JavaScriptScanner
from scanner.security_scanner import SecurityScanner
from scanner.ai_reviewer import AIReviewer
from scanner.diff_parser import merge_issues_with_code
from report.generator import ReportGenerator
from notify.feishu import FeishuNotifier
from gitea_client import GiteaClient
from db import PRScanDB

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def load_config():
    """Load ``config.yaml`` located next to this file and return the parsed data."""
    config_path = os.path.join(os.path.dirname(__file__), 'config.yaml')
    with open(config_path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)


# Global configuration
config = load_config()

# Application setup
app = Flask(__name__)
# NOTE(review): the fallback secret is only suitable for development; set
# server.secret_key in config.yaml for any real deployment.
app.config['SECRET_KEY'] = config.get('server', {}).get('secret_key', 'dev-secret-key')

# Component setup
webhook_handler = GiteaWebhookHandler(config['gitea'])
python_scanner = PythonScanner(config.get('scanner', {}))
js_scanner = JavaScriptScanner(config.get('scanner', {}))
security_scanner = SecurityScanner(config.get('scanner', {}))
ai_reviewer = AIReviewer(config.get('ai', {}))
report_generator = ReportGenerator(config.get('report', {}))
feishu_notifier = FeishuNotifier(config['feishu'])
gitea_client = GiteaClient(config['gitea'])

# Owner assumed when a stored repository name carries no "owner/" prefix.
DEFAULT_REPO_OWNER = 'Bosch_Demo'

# Display names for the static scanners shown next to file issues.
SCANNER_DISPLAY_NAMES = {
    'python': 'Python',
    'javascript': 'JavaScript',
    'security': 'Security',
}


# ============================================
# Internal helpers
# ============================================

def _split_repo_name(repo_name, default_owner=DEFAULT_REPO_OWNER):
    """Split ``owner/repo`` on the first slash; use *default_owner* if there is none."""
    owner, sep, repo = repo_name.partition('/')
    if sep:
        return owner, repo
    return default_owner, repo_name


def _resolve_clone_url(repo):
    """Return the repository clone URL, deriving it from ``web_url`` when absent."""
    clone_url = repo.get('clone_url')
    if not clone_url:
        web_url = repo.get('web_url', '')
        if web_url:
            clone_url = web_url.rstrip('/') + '.git'
    return clone_url


def _timed_scan(timer_label, scanner, scan_args):
    """Run ``scanner.scan(*scan_args)`` and log how long it took."""
    start_time = time.time()
    result = scanner.scan(*scan_args)
    logger.info(f"[TIMER] {timer_label}: {time.time() - start_time:.2f}秒")
    return result


def _run_static_scans(clone_url, commit_id, branch, changed_files=None):
    """Run the Python, JavaScript/TypeScript and security scanners.

    The language scanners only run when enabled in ``scanner.languages``; the
    security scanner always runs. *changed_files* is forwarded as a fourth
    positional argument only when it is given, so callers that scan a whole
    commit keep calling the scanners with three arguments.
    """
    scan_args = (clone_url, commit_id, branch)
    if changed_files is not None:
        scan_args += (changed_files,)

    languages = config.get('scanner', {}).get('languages', []) or []
    scan_results = {}
    if 'python' in languages:
        scan_results['python'] = _timed_scan('Python 扫描耗时', python_scanner, scan_args)
    if any(lang in languages for lang in ('javascript', 'typescript')):
        scan_results['javascript'] = _timed_scan('JavaScript 扫描耗时', js_scanner, scan_args)
    scan_results['security'] = _timed_scan('安全扫描耗时', security_scanner, scan_args)
    return scan_results


def _decode_json_field(record, field):
    """Replace ``record[field]`` with its parsed JSON when it is a JSON string.

    Best effort: a value that cannot be parsed is left untouched.
    """
    value = record.get(field)
    if value and isinstance(value, str):
        try:
            record[field] = json.loads(value)
        except (TypeError, ValueError):
            pass


def _load_json_value(value):
    """Parse *value* when it is a string (``None`` on failure); pass other types through."""
    if isinstance(value, str):
        try:
            return json.loads(value)
        except (TypeError, ValueError):
            return None
    return value


def _normalize_path(path):
    """Normalise a file path to forward slashes without surrounding whitespace."""
    return (path or '').replace('\\', '/').strip()


def _paths_match(requested, issue_file):
    """Return True when the paths are equal or one is a suffix of the other.

    The suffix comparison tolerates scanners that report a basename or an
    absolute path instead of the repository-relative path.
    """
    return (
        requested == issue_file
        or requested.endswith(issue_file)
        or issue_file.endswith(requested)
    )


def _tally_issues(stats, issues):
    """Add *issues* to the ``by_severity`` and ``total`` counters of *stats*."""
    for issue in issues:
        sev = (issue.get('severity') or 'info').lower()
        if sev in stats['by_severity']:
            stats['by_severity'][sev] += 1
        stats['total'] += 1


# ============================================
# Webhook endpoints
# ============================================

@app.route('/')
def index():
    """Health check endpoint."""
    return jsonify({
        'status': 'ok',
        'service': 'AI Code Quality Scanner',
        'version': '1.0.0'
    })


@app.route('/webhook/gitea', methods=['POST'])
def handle_gitea_webhook():
    """Handle a Gitea webhook request.

    Pull request events are delegated to ``handle_pull_request``; push events
    are scanned commit by commit; every other event is ignored with a 200.
    """
    try:
        # Verify the signature. When a secret is configured the signature is
        # mandatory: an unsigned request must not bypass verification.
        secret = config['gitea'].get('webhook_secret')
        signature = request.headers.get('X-Gitea-Signature')
        if secret:
            if not signature or not webhook_handler.verify_signature(
                request.data,
                signature,
                secret
            ):
                logger.warning('Webhook 签名验证失败')
                return jsonify({'error': 'Invalid signature'}), 401

        # Parse the webhook payload
        payload = request.get_json(silent=True)
        if not payload:
            return jsonify({'error': 'No payload'}), 400

        event_type = request.headers.get('X-Gitea-Event', 'push')
        logger.info(f'收到 Gitea Webhook 事件: {event_type}')

        # Pull request events
        if event_type == 'pull_request':
            return handle_pull_request(payload)

        # Only push events are handled below
        if event_type != 'push':
            return jsonify({'message': 'Event ignored'}), 200

        # Extract commit information
        commits = payload.get('commits', [])
        if not commits:
            return jsonify({'message': 'No commits'}), 200

        repo = payload.get('repository', {})
        repo_name = repo.get('full_name', 'unknown')
        branch = payload.get('ref', '').replace('refs/heads/', '')
        clone_url = _resolve_clone_url(repo)

        logger.info(f'处理仓库 {repo_name} 的 {len(commits)} 个提交')

        # Scan every commit; a failing commit does not stop the others
        for commit in commits:
            commit_id = commit.get('id', '')[:8]
            commit_message = commit.get('message', '')
            author = commit.get('author', {}).get('name', 'unknown')

            logger.info(f'扫描提交 {commit_id}: {commit_message}')

            try:
                scan_results = _run_static_scans(clone_url, commit_id, branch)

                # Generate the report
                report = report_generator.generate(
                    repo_name=repo_name,
                    branch=branch,
                    commit_id=commit_id,
                    commit_message=commit_message,
                    author=author,
                    scan_results=scan_results
                )

                # Send the Feishu notification
                feishu_notifier.send_report(report)

                logger.info(f'提交 {commit_id} 扫描完成')
            except Exception as e:
                logger.error(f'扫描提交 {commit_id} 失败: {str(e)}')
                # Continue with the remaining commits
                continue

        return jsonify({'status': 'ok', 'message': 'Scan completed'}), 200

    except Exception as e:
        logger.error(f'处理 Webhook 失败: {str(e)}', exc_info=True)
        return jsonify({'error': str(e)}), 500


def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]:
    """Handle a pull request event.

    Scans the files changed by the PR, optionally runs the AI review, links
    issues to the PR diff, generates a report, notifies Feishu and stores the
    result through ``PRScanDB.save_pr_scan``.

    Args:
        payload: Parsed JSON body of the Gitea webhook.

    Returns:
        A ``(response, status_code)`` pair.
    """
    try:
        # Parse the PR event
        pr_info = webhook_handler.parse_pull_request_event(payload)
        if not pr_info:
            logger.info('PR 事件不需要处理(如关闭、合并等)')
            return jsonify({'message': 'PR event ignored'}), 200

        repo_name = pr_info['repo_name']
        source_branch = pr_info['source_branch']
        source_sha = pr_info['source_sha']
        pr_number = pr_info['pr_number']
        pr_title = pr_info['pr_title']
        pr_url = pr_info['pr_url']
        target_branch = pr_info['target_branch']
        author = pr_info['author']

        logger.info(f'处理 PR #{pr_number}: {pr_title} ({source_branch} -> {target_branch})')
        logger.info(f'扫描 PR 分支: {source_branch}, commit: {source_sha}')

        try:
            clone_url = _resolve_clone_url(payload.get('repository', {}))
            repo_owner, repo_name_only = _split_repo_name(repo_name)

            # Fetch the list of files changed by the PR (best effort)
            changed_files = []
            try:
                pr_files = gitea_client.get_pull_request_files(repo_owner, repo_name_only, pr_number)
                if pr_files:
                    changed_files = [f.get('filename', '') for f in pr_files if f.get('filename')]
                    logger.info(f"获取到 PR #{pr_number} 的变更文件: {changed_files}")
            except Exception as e:
                logger.warning(f"获取 PR 文件列表失败: {e}")

            # Static scans
            scan_results = _run_static_scans(clone_url, source_sha, source_branch, changed_files)

            # AI code review
            if config.get('ai', {}).get('enabled', False):
                scan_results['ai'] = _timed_scan(
                    'AI 扫描耗时',
                    ai_reviewer,
                    (clone_url, source_sha, source_branch, changed_files)
                )

            # Fetch the PR diff so issues can be linked to code snippets
            pr_diff = None
            try:
                pr_diff = gitea_client.get_pull_request_diff(repo_owner, repo_name_only, pr_number)
                logger.info(f"已获取 PR #{pr_number} 的 diff,长度: {len(pr_diff) if pr_diff else 0}")
            except Exception as e:
                logger.warning(f"获取 PR diff 失败: {e}")

            # Link issues to code snippets
            scan_details_with_code = merge_issues_with_code(scan_results, pr_diff or '')
            logger.info(f"[DEBUG] scan_results keys: {list(scan_results.keys())}")
            for k, v in scan_results.items():
                if isinstance(v, dict):
                    issues_cnt = len(v.get('issues', []))
                    logger.info(f"[DEBUG] scan_results['{k}'] issues count: {issues_cnt}")
            logger.info(f"[DEBUG] scan_details_with_code scanners: {[s.get('name') for s in scan_details_with_code.get('scanners', [])] if scan_details_with_code else 'None'}")

            # Generate the report
            commit_message = f'PR #{pr_number}: {pr_title}'
            report = report_generator.generate(
                repo_name=repo_name,
                branch=source_branch,
                commit_id=source_sha,
                commit_message=commit_message,
                author=author,
                scan_results=scan_results,
                pr_url=pr_url,
                target_branch=target_branch,
                pr_number=pr_number
            )

            # Send the Feishu notification
            feishu_notifier.send_report(report)

            # Persist the scan result
            pr_info_for_db = {
                'repo_name': repo_name,
                'pr_number': pr_number,
                'pr_title': pr_title,
                'pr_url': pr_url,
                'source_branch': source_branch,
                'target_branch': target_branch,
                'author': author
            }
            PRScanDB.save_pr_scan(pr_info_for_db, scan_results, report.get('file_path'), scan_details_with_code)

            logger.info(f'PR #{pr_number} 扫描完成')

        except Exception as e:
            logger.error(f'扫描 PR #{pr_number} 失败: {str(e)}', exc_info=True)
            return jsonify({'error': str(e)}), 500

        return jsonify({'status': 'ok', 'message': 'PR scan completed'}), 200

    except Exception as e:
        logger.error(f'处理 PR Webhook 失败: {str(e)}', exc_info=True)
        return jsonify({'error': str(e)}), 500


@app.route('/scan/manual', methods=['POST'])
def manual_scan():
    """Trigger a scan manually.

    Expects a JSON body with ``repo_url`` (required), ``branch`` (default
    ``main``) and ``commit_id`` (optional).
    """
    try:
        # A missing or non-JSON body is treated as an empty request so the
        # caller gets the 400 below instead of a 500.
        data = request.get_json(silent=True) or {}
        repo_url = data.get('repo_url')
        branch = data.get('branch', 'main')
        commit_id = data.get('commit_id')

        if not repo_url:
            return jsonify({'error': 'repo_url is required'}), 400

        # Run the scans
        scan_results = _run_static_scans(repo_url, commit_id, branch)

        # Generate the report
        report = report_generator.generate(
            repo_name=repo_url.split('/')[-1].replace('.git', ''),
            branch=branch,
            commit_id=commit_id or 'manual',
            commit_message='Manual scan',
            author='manual',
            scan_results=scan_results
        )

        # Send the Feishu notification
        feishu_notifier.send_report(report)

        return jsonify({
            'status': 'ok',
            'report': report
        }), 200

    except Exception as e:
        logger.error(f'手动扫描失败: {str(e)}')
        return jsonify({'error': str(e)}), 500


@app.route('/feishu/card_action', methods=['POST'])
def handle_feishu_card_action():
    """Handle a button click on a Feishu card (merge or close a PR).

    NOTE(review): this endpoint performs merge/close operations without
    authenticating the caller — confirm whether the Feishu verification token
    should be checked here.
    """
    try:
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            return jsonify({'error': 'Missing required parameters'}), 400
        logger.info(f'收到飞书卡片回调: {payload}')

        # URL verification request
        challenge = payload.get('challenge')
        if challenge:
            logger.info('处理 URL 验证请求')
            return jsonify({'challenge': challenge}), 200

        # Parse the callback data
        action_data = payload.get('action', {})
        if not action_data:
            action_data = payload.get('value', {})

        action_type = action_data.get('action')
        owner = action_data.get('owner')
        repo = action_data.get('repo')
        pr_number = action_data.get('pr_number')
        pr_url = action_data.get('pr_url')

        if not all([action_type, owner, repo, pr_number]):
            logger.error('卡片回调数据不完整')
            return jsonify({'error': 'Missing required parameters'}), 400

        # Reject a non-numeric PR number as a client error rather than a 500.
        try:
            pr_number_int = int(pr_number)
        except (TypeError, ValueError):
            logger.error('卡片回调数据不完整')
            return jsonify({'error': 'Missing required parameters'}), 400

        logger.info(f'执行操作: {action_type}, PR: {owner}/{repo}#{pr_number}')

        # Perform the requested operation
        if action_type == 'merge':
            success = gitea_client.merge_pull_request(
                owner=owner,
                repo=repo,
                pr_number=pr_number_int,
                merge_message=f'通过飞书机器人合并 PR #{pr_number}'
            )
            result_message = '✅ **已合并 PR**' if success else '❌ **合并失败**'
        elif action_type == 'close':
            success = gitea_client.close_pull_request(
                owner=owner,
                repo=repo,
                pr_number=pr_number_int
            )
            result_message = '✅ **已关闭 PR(取消合并)**' if success else '❌ **关闭失败**'
        else:
            result_message = f'⚠️ **未知操作: {action_type}**'

        # Report the outcome to Feishu
        result_text = f"{result_message}\n\n**PR:** {owner}/{repo}#{pr_number}\n**链接:** [查看PR]({pr_url})"
        feishu_notifier.send_simple_message('PR 操作结果', result_text)

        return jsonify({'status': 'ok', 'message': result_message}), 200

    except Exception as e:
        logger.error(f'处理飞书卡片回调失败: {str(e)}', exc_info=True)
        return jsonify({'error': str(e)}), 500


@app.route('/feishu/webhook', methods=['POST'])
def handle_feishu_webhook():
    """Handle verification callbacks from the Feishu open platform."""
    try:
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            return jsonify({'error': 'No payload'}), 400

        # Verification request
        challenge = payload.get('challenge')
        if challenge:
            return jsonify({'challenge': challenge}), 200

        # Message events
        event_type = payload.get('type')
        if event_type == 'url_verification':
            return jsonify({'challenge': payload.get('challenge')}), 200

        logger.info(f'收到飞书事件: {event_type}')

        return jsonify({'status': 'ok'}), 200

    except Exception as e:
        logger.error(f'处理飞书 Webhook 失败: {str(e)}')
        return jsonify({'error': str(e)}), 500


# ============================================
# Scan management platform API
# ============================================

@app.route('/api/prs')
def api_get_prs():
    """Return the list of all PRs, optionally filtered by ``state``."""
    try:
        state = request.args.get('state')
        prs = PRScanDB.get_all_prs(state=state)

        # Convert JSON strings stored in the database into objects
        for pr in prs:
            _decode_json_field(pr, 'scan_result')
            _decode_json_field(pr, 'ai_review')

        return jsonify(prs)
    except Exception as e:
        logger.error(f'获取 PR 列表失败: {str(e)}')
        return jsonify({'error': str(e)}), 500


# NOTE(review): the <int:...> converter assumes PRScanDB ids are integers — confirm.
@app.route('/api/prs/<int:pr_id>')
def api_get_pr(pr_id):
    """Return the details of a single PR."""
    try:
        pr = PRScanDB.get_pr_by_id(pr_id)
        if not pr:
            return jsonify({'error': 'PR not found'}), 404

        # Convert JSON fields, including the scan details with code snippets
        for field in ('scan_result', 'ai_review', 'scan_details_with_code'):
            _decode_json_field(pr, field)

        return jsonify(pr)
    except Exception as e:
        logger.error(f'获取 PR 详情失败: {str(e)}')
        return jsonify({'error': str(e)}), 500


@app.route('/api/prs/<int:pr_id>/diff')
def api_get_pr_diff(pr_id):
    """Return the code diff of a PR."""
    try:
        pr = PRScanDB.get_pr_by_id(pr_id)
        if not pr:
            return jsonify({'error': 'PR not found'}), 404

        repo_name = pr.get('repo_name', '')
        pr_number = pr.get('pr_number', 0)

        if not repo_name or not pr_number:
            return jsonify({'error': 'PR 信息不完整'}), 400

        owner, repo = _split_repo_name(repo_name)

        logger.info(f"获取 PR #{pr_number} ({owner}/{repo}) 的 diff")

        diff = gitea_client.get_pull_request_diff(owner, repo, pr_number)

        if diff is None:
            return jsonify({'error': '获取 diff 失败'}), 500

        return jsonify({
            'diff': diff,
            'pr_number': pr_number,
            'repo_name': repo_name
        })
    except Exception as e:
        logger.error(f'获取 PR diff 失败: {str(e)}')
        return jsonify({'error': str(e)}), 500


@app.route('/api/prs/<int:pr_id>/files')
def api_get_pr_files(pr_id):
    """Return the files changed by a PR (used for the tree view on the left)."""
    try:
        pr = PRScanDB.get_pr_by_id(pr_id)
        if not pr:
            return jsonify({'error': 'PR not found'}), 404
        repo_name = pr.get('repo_name', '')
        pr_number = pr.get('pr_number', 0)
        if not repo_name or not pr_number:
            return jsonify({'error': 'PR 信息不完整'}), 400
        owner, repo = _split_repo_name(repo_name)
        files = gitea_client.get_pull_request_files(owner, repo, pr_number)
        if files is None:
            return jsonify({'error': '获取文件列表失败'}), 500
        return jsonify({'files': files, 'repo_name': repo_name})
    except Exception as e:
        logger.error(f'获取 PR 文件列表失败: {str(e)}')
        return jsonify({'error': str(e)}), 500


@app.route('/api/prs/<int:pr_id>/file')
def api_get_pr_file_content(pr_id):
    """Return the full content of one PR file on the source branch.

    The ``path`` query parameter selects the file. The response also carries
    the stored static-scan and AI issues that match that path.
    """
    try:
        path = request.args.get('path')
        if not path:
            return jsonify({'error': '缺少 path 参数'}), 400
        pr = PRScanDB.get_pr_by_id(pr_id)
        if not pr:
            return jsonify({'error': 'PR not found'}), 404
        repo_name = pr.get('repo_name', '')
        pr_number = pr.get('pr_number', 0)
        if not repo_name or not pr_number:
            return jsonify({'error': 'PR 信息不完整'}), 400
        owner, repo = _split_repo_name(repo_name)

        pr_info = gitea_client.get_pull_request(owner, repo, pr_number)
        if not pr_info:
            return jsonify({'error': '获取 PR 信息失败'}), 500
        head_ref = pr_info.get('head', {}).get('ref') or pr_info.get('head_branch') or pr.get('source_branch')
        if not head_ref:
            return jsonify({'error': '无法确定源分支'}), 400
        content = gitea_client.get_file_contents(owner, repo, path, head_ref)
        if content is None:
            return jsonify({'error': '文件不存在或无法读取'}), 404

        # Issues for this file were stored in scan_details_with_code when the
        # PR was scanned.
        scan_issues = []
        path_norm = _normalize_path(path)
        logger.info(f"[DEBUG] 请求文件: path_norm={path_norm}")

        scan_details = _load_json_value(pr.get('scan_details_with_code'))
        if scan_details:
            logger.info(f"[DEBUG] scan_details keys: {list(scan_details.keys()) if isinstance(scan_details, dict) else 'not dict'}")
        # Anything but a dict cannot be queried below; treat it as "no details".
        if not isinstance(scan_details, dict):
            scan_details = None

        scanners = (scan_details.get('scanners') if scan_details else None) or []
        if scanners:
            logger.info(f"[DEBUG] scanners count: {len(scanners)}")
            for scanner in scanners:
                scanner_name = scanner.get('name', '')
                issues_count = len(scanner.get('issues', []))
                logger.info(f"[DEBUG] scanner={scanner_name}, issues_count={issues_count}")
                # Log the file of the first few issues to ease path debugging
                for idx, issue in enumerate(scanner.get('issues', [])[:3]):
                    logger.info(f"[DEBUG]   issue[{idx}] file={issue.get('file')}, line={issue.get('line')}")

        for scanner in scanners:
            scanner_name = scanner.get('name', '')
            scanner_display = SCANNER_DISPLAY_NAMES.get(scanner_name, scanner_name)
            for issue in scanner.get('issues', []):
                issue_file = _normalize_path(issue.get('file'))
                if not issue_file or not _paths_match(path_norm, issue_file):
                    continue
                logger.info(f"[DEBUG] 匹配成功: issue_file={issue_file}, path_norm={path_norm}")
                sev = (issue.get('severity') or 'info')
                if isinstance(sev, str):
                    sev = sev.lower()
                scan_issues.append({
                    'scanner': scanner_display,
                    'severity': sev,
                    'line': int(issue.get('line') or 0),
                    'message': (issue.get('message') or issue.get('description') or '').strip(),
                    'code_context': issue.get('code_context')
                })

        logger.info(f"[DEBUG] 最终 scan_issues count: {len(scan_issues)}")

        # AI review results
        ai_issues = []
        if scan_details and scan_details.get('ai'):
            ai_data = scan_details['ai']
            for issue in ai_data.get('issues', []):
                issue_file = _normalize_path(issue.get('file'))
                if not issue_file or not _paths_match(path_norm, issue_file):
                    continue
                ai_issues.append({
                    'scanner': 'AI',
                    'severity': issue.get('severity', 'info'),
                    'line': int(issue.get('line') or 1),
                    'message': issue.get('message', ''),
                    'category': 'ai',
                    'code_context': issue.get('code_context')
                })

        logger.info(f"[DEBUG] AI issues count: {len(ai_issues)}")

        # Static-scan issues first, then AI issues
        all_issues = scan_issues + ai_issues

        return jsonify({'path': path, 'content': content, 'scan_issues': all_issues})
    except Exception as e:
        logger.error(f'获取文件内容失败: {str(e)}')
        return jsonify({'error': str(e)}), 500


@app.route('/api/prs/<int:pr_id>/quality')
def api_get_quality_score(pr_id):
    """Return the code quality score stored under ``scan_result['ai']``."""
    try:
        pr = PRScanDB.get_pr_by_id(pr_id)
        if not pr:
            return jsonify({'error': 'PR not found'}), 404

        scan_result = _load_json_value(pr.get('scan_result'))

        quality_score = None
        if scan_result and scan_result.get('ai'):
            quality_score = scan_result['ai'].get('quality_score')

        if not quality_score:
            return jsonify({'error': '暂无质量评分'}), 404

        return jsonify(quality_score)
    except Exception as e:
        logger.error(f'获取质量评分失败: {str(e)}')
        return jsonify({'error': str(e)}), 500


@app.route('/api/prs/<int:pr_id>/stats')
def api_get_issue_stats(pr_id):
    """Return issue counts of a PR grouped by severity and by scanner."""
    try:
        pr = PRScanDB.get_pr_by_id(pr_id)
        if not pr:
            return jsonify({'error': 'PR not found'}), 404

        scan_details = _load_json_value(pr.get('scan_details_with_code'))

        if not scan_details:
            return jsonify({'error': '暂无扫描详情'}), 404

        stats = {
            'by_severity': {'error': 0, 'warning': 0, 'info': 0},
            'by_scanner': {},
            'total': 0
        }

        # Static scanners
        for scanner in scan_details.get('scanners', []):
            scanner_name = scanner.get('name', 'unknown')
            scanner_issues = scanner.get('issues', [])
            stats['by_scanner'][scanner_name] = len(scanner_issues)
            _tally_issues(stats, scanner_issues)

        # AI scanner
        ai_data = scan_details.get('ai', {})
        if ai_data:
            ai_issues = ai_data.get('issues', [])
            stats['by_scanner']['AI'] = len(ai_issues)
            _tally_issues(stats, ai_issues)

        return jsonify(stats)
    except Exception as e:
        logger.error(f'获取问题统计失败: {str(e)}')
        return jsonify({'error': str(e)}), 500


@app.route('/api/prs/<int:pr_id>/fix', methods=['POST'])
def api_generate_fix(pr_id):
    """Generate an AI fix suggestion for one issue.

    Expects a JSON body with ``file`` and ``message`` (required) plus optional
    ``line`` and ``code``.
    """
    try:
        data = request.get_json(silent=True)
        if not data:
            return jsonify({'error': '请求体为空'}), 400

        file_path = data.get('file')
        line = data.get('line', 1)
        message = data.get('message', '')
        code = data.get('code', '')

        if not file_path or not message:
            return jsonify({'error': '缺少必要参数'}), 400

        # Ask the AI reviewer for a fix suggestion
        fix_result = ai_reviewer.generate_fix_suggestion(file_path, line, message, code)

        if fix_result:
            return jsonify(fix_result)
        else:
            return jsonify({'error': '生成修复建议失败'}), 500
    except Exception as e:
        logger.error(f'生成修复建议失败: {str(e)}')
        return jsonify({'error': str(e)}), 500


@app.route('/api/prs/history')
def api_get_pr_history():
    """Return the PR scan history used for trend charts (oldest first)."""
    try:
        limit = request.args.get('limit', 20, type=int)
        repo_name = request.args.get('repo_name', '')

        # NOTE(review): api_get_prs calls get_all_prs(state=...) while this
        # call passes status=... — confirm PRScanDB accepts both keywords.
        prs = PRScanDB.get_all_prs(status='completed')

        if repo_name:
            prs = [p for p in prs if p.get('repo_name') == repo_name]

        # Keep only the most recent N entries
        prs = prs[:limit]

        # Build the trend data
        history = []
        for pr in reversed(prs):  # oldest to newest
            issues_count = pr.get('issues_count', 0)

            # Sum error/warning counts over the per-scanner summaries
            scan_result = _load_json_value(pr.get('scan_result'))

            error_count = 0
            warning_count = 0

            if scan_result and isinstance(scan_result, dict):
                for scanner_result in scan_result.values():
                    if isinstance(scanner_result, dict):
                        summary = scanner_result.get('summary', {})
                        if isinstance(summary, dict):
                            error_count += summary.get('error', 0) or 0
                            warning_count += summary.get('warning', 0) or 0

            history.append({
                'pr_id': pr.get('id'),
                'pr_number': pr.get('pr_number'),
                'repo_name': pr.get('repo_name'),
                'title': pr.get('pr_title', ''),
                'author': pr.get('author', ''),
                'created_at': pr.get('created_at', ''),
                'issues_count': issues_count,
                'error_count': error_count,
                'warning_count': warning_count,
                'total_issues': error_count + warning_count,
                'state': pr.get('state', '')
            })

        return jsonify(history)
    except Exception as e:
        logger.error(f'获取历史趋势失败: {str(e)}')
        return jsonify({'error': str(e)}), 500


@app.route('/api/prs/<int:pr_id>/merge', methods=['POST'])
def api_merge_pr(pr_id):
    """Merge a PR through the Gitea API and record the new state."""
    try:
        pr = PRScanDB.get_pr_by_id(pr_id)
        if not pr:
            return jsonify({'success': False, 'message': 'PR not found'}), 404

        logger.info(f"合并 PR - 数据库记录: {pr}")

        if pr['state'] != 'open':
            return jsonify({'success': False, 'message': 'PR 状态不是 open'}), 400

        # Parse the repository name; no owner is guessed for write operations
        repo_name = pr['repo_name']
        logger.info(f"仓库名称: {repo_name}")

        owner, repo = _split_repo_name(repo_name, default_owner='')

        logger.info(f"owner: {owner}, repo: {repo}, pr_number: {pr['pr_number']}")

        # Check the live PR state first
        pr_info = gitea_client.get_pull_request(owner, repo, pr['pr_number'])
        if not pr_info:
            return jsonify({'success': False, 'message': '无法获取 PR 信息,请检查仓库名称是否正确'}), 400

        logger.info(f"PR 信息: state={pr_info.get('state')}, mergeable={pr_info.get('mergeable')}")

        if pr_info.get('state') != 'open':
            return jsonify({'success': False, 'message': f'PR 状态是 {pr_info.get("state")}, 不是 open'}), 400

        # Merge through the Gitea API
        success = gitea_client.merge_pull_request(
            owner=owner,
            repo=repo,
            pr_number=pr['pr_number'],
            merge_message=f'通过管理平台合并 PR #{pr["pr_number"]}'
        )

        if success:
            # Update the stored state
            PRScanDB.update_pr_state(pr_id, 'merged', merged_by='admin')

            # Send the Feishu notification
            result_text = f"✅ **PR 已通过管理平台合并**\n\n**PR:** {repo_name}#{pr['pr_number']}\n**标题:** {pr['pr_title']}\n**合并人:** 管理员"
            feishu_notifier.send_simple_message('PR 合并', result_text)

            return jsonify({'success': True, 'message': 'PR 已合并'})
        else:
            return jsonify({'success': False, 'message': '合并失败'}), 500
    except Exception as e:
        logger.error(f'合并 PR 失败: {str(e)}')
        return jsonify({'success': False, 'message': str(e)}), 500


@app.route('/api/prs/<int:pr_id>/close', methods=['POST'])
def api_close_pr(pr_id):
    """Close a PR through the Gitea API and record the new state."""
    try:
        pr = PRScanDB.get_pr_by_id(pr_id)
        if not pr:
            return jsonify({'success': False, 'message': 'PR not found'}), 404

        if pr['state'] != 'open':
            return jsonify({'success': False, 'message': 'PR 状态不是 open'}), 400

        # Parse the repository name; no owner is guessed for write operations
        repo_name = pr['repo_name']
        owner, repo = _split_repo_name(repo_name, default_owner='')

        # Close through the Gitea API
        success = gitea_client.close_pull_request(
            owner=owner,
            repo=repo,
            pr_number=pr['pr_number']
        )

        if success:
            # Update the stored state
            PRScanDB.update_pr_state(pr_id, 'closed')

            # Send the Feishu notification
            result_text = f"❌ **PR 已被管理平台拒绝**\n\n**PR:** {repo_name}#{pr['pr_number']}\n**标题:** {pr['pr_title']}"
            feishu_notifier.send_simple_message('PR 拒绝', result_text)

            return jsonify({'success': True, 'message': 'PR 已关闭'})
        else:
            return jsonify({'success': False, 'message': '关闭失败'}), 500
    except Exception as e:
        logger.error(f'关闭 PR 失败: {str(e)}')
        return jsonify({'success': False, 'message': str(e)}), 500


# ============================================
# Scan management platform pages
# ============================================

# Absolute path of the web directory
WEB_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'web')


@app.route('/dashboard')
def dashboard():
    """Serve the home page of the scan management platform."""
    return send_from_directory(WEB_DIR, 'index.html')


@app.route('/web/<path:filename>')
def serve_static(filename):
    """Serve a static file from the web directory."""
    return send_from_directory(WEB_DIR, filename)


if __name__ == '__main__':
    # Always listen on every network interface
    host = "0.0.0.0"
    port = config.get('server', {}).get('port', 5000)
    # Debug mode must be opted into: the Werkzeug debugger allows arbitrary
    # code execution and this server is reachable from other hosts.
    debug = config.get('server', {}).get('debug', False)

    logger.info(f'启动服务: {host}:{port}')
    app.run(host=host, port=port, debug=debug)