From 459a8cb295a36f04c6191c670596aaca815ffd43 Mon Sep 17 00:00:00 2001 From: Dang Zerong Date: Wed, 11 Mar 2026 12:30:45 +0800 Subject: [PATCH 1/2] add web --- app.py | 292 +++++++++++++++++++++++++++- config.yaml | 4 +- db.py | 272 +++++++++++++++++++++++++++ gitea_client.py | 169 +++++++++++++++++ notify/feishu.py | 74 ++++++-- report/generator.py | 4 +- web/index.html | 449 ++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 1241 insertions(+), 23 deletions(-) create mode 100644 db.py create mode 100644 gitea_client.py create mode 100644 web/index.html diff --git a/app.py b/app.py index 10653bc..354af0d 100644 --- a/app.py +++ b/app.py @@ -4,11 +4,12 @@ import os import logging from typing import Dict, Tuple, Any +import json os.environ.setdefault('FLASK_RUN_HOST', '0.0.0.0') -from flask import Flask, request, jsonify +from flask import Flask, request, jsonify, send_from_directory import yaml from webhook.handler import GiteaWebhookHandler from scanner.python_scanner import PythonScanner @@ -17,6 +18,8 @@ from scanner.security_scanner import SecurityScanner from scanner.ai_reviewer import AIReviewer from report.generator import ReportGenerator from notify.feishu import FeishuNotifier +from gitea_client import GiteaClient +from db import PRScanDB # 配置日志 logging.basicConfig( @@ -47,6 +50,7 @@ security_scanner = SecurityScanner(config.get('scanner', {})) ai_reviewer = AIReviewer(config.get('ai', {})) report_generator = ReportGenerator(config.get('report', {})) feishu_notifier = FeishuNotifier(config['feishu']) +gitea_client = GiteaClient(config['gitea']) @app.route('/') @@ -171,12 +175,6 @@ def handle_gitea_webhook(): def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]: """ 处理 Pull Request 事件 - - Args: - payload: Webhook payload - - Returns: - JSON 响应和状态码 """ try: # 解析 PR 事件 @@ -244,12 +242,25 @@ def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]: author=author, scan_results=scan_results, pr_url=pr_url, - target_branch=target_branch + target_branch=target_branch, + pr_number=pr_number ) # 发送飞书通知 feishu_notifier.send_report(report) + # 保存扫描结果到数据库 + pr_info_for_db = { + 'repo_name': repo_name, + 'pr_number': pr_number, + 'pr_title': pr_title, + 'pr_url': pr_url, + 'source_branch': source_branch, + 'target_branch': target_branch, + 'author': author + } + PRScanDB.save_pr_scan(pr_info_for_db, scan_results, report.get('file_path')) + logger.info(f'PR #{pr_number} 扫描完成') except Exception as e: @@ -306,10 +317,273 @@ def manual_scan(): }), 200 except Exception as e: - logger.error(f'手动扫描失败: {str(e)}', exc_info=True) + logger.error(f'手动扫描失败: {str(e)}') return jsonify({'error': str(e)}), 500 +@app.route('/feishu/card_action', methods=['POST']) +def handle_feishu_card_action(): + """处理飞书卡片按钮点击事件""" + try: + payload = request.json + logger.info(f'收到飞书卡片回调: {payload}') + + # 处理 URL 验证请求 + challenge = payload.get('challenge') + if challenge: + logger.info('处理 URL 验证请求') + return jsonify({'challenge': challenge}), 200 + + # 解析回调数据 + action_data = payload.get('action', {}) + if not action_data: + action_data = payload.get('value', {}) + + action_type = action_data.get('action') + owner = action_data.get('owner') + repo = action_data.get('repo') + pr_number = action_data.get('pr_number') + pr_url = action_data.get('pr_url') + + if not all([action_type, owner, repo, pr_number]): + logger.error('卡片回调数据不完整') + return jsonify({'error': 'Missing required parameters'}), 400 + + logger.info(f'执行操作: {action_type}, PR: {owner}/{repo}#{pr_number}') + + # 执行对应操作 + if action_type == 'merge': + success = gitea_client.merge_pull_request( + owner=owner, + repo=repo, + pr_number=int(pr_number), + merge_message=f'通过飞书机器人合并 PR #{pr_number}' + ) + result_message = '✅ **已合并 PR**' if success else '❌ **合并失败**' + elif action_type == 'close': + success = gitea_client.close_pull_request( + owner=owner, + repo=repo, + pr_number=int(pr_number) + ) + result_message = '✅ **已关闭 PR(取消合并)**' if success else '❌ **关闭失败**' + else: + result_message = f'⚠️ **未知操作: {action_type}**' + + # 发送操作结果到飞书 + result_text = f"{result_message}\n\n**PR:** {owner}/{repo}#{pr_number}\n**链接:** [查看PR]({pr_url})" + feishu_notifier.send_simple_message('PR 操作结果', result_text) + + return jsonify({'status': 'ok', 'message': result_message}), 200 + + except Exception as e: + logger.error(f'处理飞书卡片回调失败: {str(e)}', exc_info=True) + return jsonify({'error': str(e)}), 500 + + +@app.route('/feishu/webhook', methods=['POST']) +def handle_feishu_webhook(): + """处理飞书开放平台的验证回调""" + try: + payload = request.json + + # 处理验证请求 + challenge = payload.get('challenge') + if challenge: + return jsonify({'challenge': challenge}), 200 + + # 处理消息事件 + event_type = payload.get('type') + if event_type == 'url_verification': + return jsonify({'challenge': payload.get('challenge')}), 200 + + logger.info(f'收到飞书事件: {event_type}') + return jsonify({'status': 'ok'}), 200 + + except Exception as e: + logger.error(f'处理飞书 Webhook 失败: {str(e)}') + return jsonify({'error': str(e)}), 500 + + +# ============================================ +# 扫描管理平台 API +# ============================================ + +@app.route('/api/prs') +def api_get_prs(): + """获取所有 PR 列表""" + try: + state = request.args.get('state') + prs = PRScanDB.get_all_prs(state=state) + + # 转换 scan_result 字符串为对象 + for pr in prs: + if pr.get('scan_result') and isinstance(pr['scan_result'], str): + try: + pr['scan_result'] = json.loads(pr['scan_result']) + except: + pass + if pr.get('ai_review') and isinstance(pr['ai_review'], str): + try: + pr['ai_review'] = json.loads(pr['ai_review']) + except: + pass + + return jsonify(prs) + except Exception as e: + logger.error(f'获取 PR 列表失败: {str(e)}') + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/prs/') +def api_get_pr(pr_id): + """获取单个 PR 详情""" + try: + pr = PRScanDB.get_pr_by_id(pr_id) + if not pr: + return jsonify({'error': 'PR not found'}), 404 + + # 转换 JSON 字段 + if pr.get('scan_result') and isinstance(pr['scan_result'], str): + try: + pr['scan_result'] = json.loads(pr['scan_result']) + except: + pass + if pr.get('ai_review') and isinstance(pr['ai_review'], str): + try: + pr['ai_review'] = json.loads(pr['ai_review']) + except: + pass + + return jsonify(pr) + except Exception as e: + logger.error(f'获取 PR 详情失败: {str(e)}') + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/prs//merge', methods=['POST']) +def api_merge_pr(pr_id): + """合并 PR""" + try: + pr = PRScanDB.get_pr_by_id(pr_id) + if not pr: + return jsonify({'success': False, 'message': 'PR not found'}), 404 + + logger.info(f"合并 PR - 数据库记录: {pr}") + + if pr['state'] != 'open': + return jsonify({'success': False, 'message': 'PR 状态不是 open'}), 400 + + # 解析仓库名 + repo_name = pr['repo_name'] + logger.info(f"仓库名称: {repo_name}") + + if '/' in repo_name: + owner, repo = repo_name.split('/') + else: + owner = '' + repo = repo_name + + logger.info(f"owner: {owner}, repo: {repo}, pr_number: {pr['pr_number']}") + + # 先检查 PR 状态 + pr_info = gitea_client.get_pull_request(owner, repo, pr['pr_number']) + if not pr_info: + return jsonify({'success': False, 'message': '无法获取 PR 信息,请检查仓库名称是否正确'}), 400 + + logger.info(f"PR 信息: state={pr_info.get('state')}, mergeable={pr_info.get('mergeable')}") + + if pr_info.get('state') != 'open': + return jsonify({'success': False, 'message': f'PR 状态是 {pr_info.get("state")}, 不是 open'}), 400 + + # 调用 Gitea API 合并 + success = gitea_client.merge_pull_request( + owner=owner, + repo=repo, + pr_number=pr['pr_number'], + merge_message=f'通过管理平台合并 PR #{pr["pr_number"]}' + ) + + if success: + # 更新数据库状态 + PRScanDB.update_pr_state(pr_id, 'merged', merged_by='admin') + + # 发送飞书通知 + result_text = f"✅ **PR 已通过管理平台合并**\n\n**PR:** {repo_name}#{pr['pr_number']}\n**标题:** {pr['pr_title']}\n**合并人:** 管理员" + feishu_notifier.send_simple_message('PR 合并', result_text) + + return jsonify({'success': True, 'message': 'PR 已合并'}) + else: + return jsonify({'success': False, 'message': '合并失败'}), 500 + + except Exception as e: + logger.error(f'合并 PR 失败: {str(e)}') + return jsonify({'success': False, 'message': str(e)}), 500 + + +@app.route('/api/prs//close', methods=['POST']) +def api_close_pr(pr_id): + """关闭 PR""" + try: + pr = PRScanDB.get_pr_by_id(pr_id) + if not pr: + return jsonify({'success': False, 'message': 'PR not found'}), 404 + + if pr['state'] != 'open': + return jsonify({'success': False, 'message': 'PR 状态不是 open'}), 400 + + # 解析仓库名 + repo_name = pr['repo_name'] + if '/' in repo_name: + owner, repo = repo_name.split('/') + else: + owner = '' + repo = repo_name + + # 调用 Gitea API 关闭 + success = gitea_client.close_pull_request( + owner=owner, + repo=repo, + pr_number=pr['pr_number'] + ) + + if success: + # 更新数据库状态 + PRScanDB.update_pr_state(pr_id, 'closed') + + # 发送飞书通知 + result_text = f"❌ **PR 已被管理平台拒绝**\n\n**PR:** {repo_name}#{pr['pr_number']}\n**标题:** {pr['pr_title']}" + feishu_notifier.send_simple_message('PR 拒绝', result_text) + + return jsonify({'success': True, 'message': 'PR 已关闭'}) + else: + return jsonify({'success': False, 'message': '关闭失败'}), 500 + + except Exception as e: + logger.error(f'关闭 PR 失败: {str(e)}') + return jsonify({'success': False, 'message': str(e)}), 500 + + +# ============================================ +# 扫描管理平台页面 +# ============================================ + +# 获取 web 目录的绝对路径 +WEB_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'web') + + +@app.route('/dashboard') +def dashboard(): + """扫描管理平台首页""" + return send_from_directory(WEB_DIR, 'index.html') + + +@app.route('/web/') +def serve_static(filename): + """提供静态文件服务""" + return send_from_directory(WEB_DIR, filename) + + if __name__ == '__main__': # 强制监听所有网络接口 host = "0.0.0.0" diff --git a/config.yaml b/config.yaml index 80f1181..7521d1f 100644 --- a/config.yaml +++ b/config.yaml @@ -5,9 +5,11 @@ server: gitea: # Gitea 服务器地址(根据实际情况修改) - base_url: "http://154.9.253.114:3000" + base_url: "https://code.deep-pilot.chat" # Gitea Webhook 签名密钥,需要与 Gitea 配置一致 webhook_secret: "BoschScan_2026_xxx" + # Gitea API Token(用于合并/关闭PR) + api_token: "8e223093b069a2e25f485360bd820e4dc255defc" feishu: # 飞书机器人 Webhook 地址(替换为你的实际地址) diff --git a/db.py b/db.py new file mode 100644 index 0000000..772b82c --- /dev/null +++ b/db.py @@ -0,0 +1,272 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +数据库模型 +存储 PR 扫描结果和管理状态 +""" +import sqlite3 +import json +import os +from datetime import datetime +from typing import List, Dict, Any, Optional + +DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data', 'pr_scans.db') + + +def get_db_connection(): + """获取数据库连接""" + os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + return conn + + +def init_db(): + """初始化数据库表""" + conn = get_db_connection() + cursor = conn.cursor() + + # PR 扫描结果表 + cursor.execute(''' + CREATE TABLE IF NOT EXISTS pr_scans ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pr_number INTEGER NOT NULL, + repo_name TEXT NOT NULL, + pr_title TEXT, + pr_url TEXT, + source_branch TEXT, + target_branch TEXT, + author TEXT, + state TEXT DEFAULT 'pending', + scan_status TEXT DEFAULT 'pending', + scan_result TEXT, + issues_count INTEGER DEFAULT 0, + security_issues INTEGER DEFAULT 0, + ai_review TEXT, + report_path TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + merged_at TIMESTAMP, + merged_by TEXT, + UNIQUE(repo_name, pr_number) + ) + ''') + + # 扫描记录详情表 + cursor.execute(''' + CREATE TABLE IF NOT EXISTS scan_details ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pr_scan_id INTEGER NOT NULL, + scan_type TEXT NOT NULL, + scan_data TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (pr_scan_id) REFERENCES pr_scans(id) + ) + ''') + + conn.commit() + conn.close() + + +class PRScanDB: + """PR 扫描结果数据库操作类""" + + @staticmethod + def save_pr_scan(pr_info: Dict[str, Any], scan_results: Dict[str, Any], + report_path: str = None) -> int: + """ + 保存 PR 扫描结果 + + Args: + pr_info: PR 信息 + scan_results: 扫描结果 + report_path: 报告文件路径 + + Returns: + 扫描记录 ID + """ + conn = get_db_connection() + cursor = conn.cursor() + + # 统计问题数量 + issues_count = 0 + security_issues = 0 + + for scan_type, result in scan_results.items(): + if isinstance(result, dict): + if 'issues' in result: + issues_count += len(result.get('issues', [])) + if 'vulnerabilities' in result: + security_issues += len(result.get('vulnerabilities', [])) + + # 检查是否已存在 + cursor.execute( + 'SELECT id FROM pr_scans WHERE repo_name = ? AND pr_number = ?', + (pr_info.get('repo_name'), pr_info.get('pr_number')) + ) + existing = cursor.fetchone() + + if existing: + # 更新现有记录 + cursor.execute(''' + UPDATE pr_scans SET + pr_title = ?, + source_branch = ?, + target_branch = ?, + author = ?, + scan_status = ?, + scan_result = ?, + issues_count = ?, + security_issues = ?, + ai_review = ?, + report_path = ?, + updated_at = CURRENT_TIMESTAMP + WHERE repo_name = ? AND pr_number = ? + ''', ( + pr_info.get('pr_title'), + pr_info.get('source_branch'), + pr_info.get('target_branch'), + pr_info.get('author'), + 'completed', + json.dumps(scan_results, ensure_ascii=False), + issues_count, + security_issues, + json.dumps(scan_results.get('ai', {}), ensure_ascii=False), + report_path, + pr_info.get('repo_name'), + pr_info.get('pr_number') + )) + scan_id = existing['id'] + else: + # 插入新记录 + cursor.execute(''' + INSERT INTO pr_scans ( + pr_number, repo_name, pr_title, pr_url, + source_branch, target_branch, author, + state, scan_status, scan_result, + issues_count, security_issues, ai_review, report_path + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ''', ( + pr_info.get('pr_number'), + pr_info.get('repo_name'), + pr_info.get('pr_title'), + pr_info.get('pr_url'), + pr_info.get('source_branch'), + pr_info.get('target_branch'), + pr_info.get('author'), + 'open', + 'completed', + json.dumps(scan_results, ensure_ascii=False), + issues_count, + security_issues, + json.dumps(scan_results.get('ai', {}), ensure_ascii=False), + report_path + )) + scan_id = cursor.lastrowid + + conn.commit() + conn.close() + + return scan_id + + @staticmethod + def get_all_prs(status: str = None, state: str = None) -> List[Dict[str, Any]]: + """ + 获取所有 PR 扫描记录 + + Args: + status: 扫描状态 (pending/completed) + state: PR 状态 (open/merged/closed) + + Returns: + PR 列表 + """ + conn = get_db_connection() + cursor = conn.cursor() + + query = 'SELECT * FROM pr_scans WHERE 1=1' + params = [] + + if status: + query += ' AND scan_status = ?' + params.append(status) + if state: + query += ' AND state = ?' + params.append(state) + + query += ' ORDER BY updated_at DESC' + + cursor.execute(query, params) + rows = cursor.fetchall() + conn.close() + + return [dict(row) for row in rows] + + @staticmethod + def get_pr_by_id(scan_id: int) -> Optional[Dict[str, Any]]: + """根据 ID 获取 PR 扫描记录""" + conn = get_db_connection() + cursor = conn.cursor() + + cursor.execute('SELECT * FROM pr_scans WHERE id = ?', (scan_id,)) + row = cursor.fetchone() + conn.close() + + return dict(row) if row else None + + @staticmethod + def get_pr_by_number(repo_name: str, pr_number: int) -> Optional[Dict[str, Any]]: + """根据仓库名和 PR 号获取扫描记录""" + conn = get_db_connection() + cursor = conn.cursor() + + cursor.execute( + 'SELECT * FROM pr_scans WHERE repo_name = ? AND pr_number = ?', + (repo_name, pr_number) + ) + row = cursor.fetchone() + conn.close() + + return dict(row) if row else None + + @staticmethod + def update_pr_state(scan_id: int, state: str, merged_by: str = None): + """更新 PR 状态""" + conn = get_db_connection() + cursor = conn.cursor() + + if state == 'merged': + cursor.execute(''' + UPDATE pr_scans SET + state = ?, + merged_at = CURRENT_TIMESTAMP, + merged_by = ?, + updated_at = CURRENT_TIMESTAMP + WHERE id = ? + ''', (state, merged_by, scan_id)) + else: + cursor.execute(''' + UPDATE pr_scans SET + state = ?, + updated_at = CURRENT_TIMESTAMP + WHERE id = ? + ''', (state, scan_id)) + + conn.commit() + conn.close() + + @staticmethod + def delete_pr(scan_id: int): + """删除 PR 扫描记录""" + conn = get_db_connection() + cursor = conn.cursor() + + cursor.execute('DELETE FROM scan_details WHERE pr_scan_id = ?', (scan_id,)) + cursor.execute('DELETE FROM pr_scans WHERE id = ?', (scan_id,)) + + conn.commit() + conn.close() + + +# 初始化数据库 +init_db() diff --git a/gitea_client.py b/gitea_client.py new file mode 100644 index 0000000..ae1f3a7 --- /dev/null +++ b/gitea_client.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Gitea API 客户端 +用于操作 PR:合并、关闭等 +""" +import logging +import requests +from typing import Dict, Any, Optional + +logger = logging.getLogger(__name__) + + +class GiteaClient: + """Gitea API 客户端""" + + def __init__(self, config: Dict[str, Any]): + """ + 初始化 Gitea 客户端 + + Args: + config: Gitea 配置,包含 base_url 和 api_token + """ + self.base_url = config.get('base_url', '').rstrip('/') + self.api_token = config.get('api_token', '') + + if not self.base_url: + raise ValueError("Gitea base_url 未配置") + if not self.api_token: + raise ValueError("Gitea api_token 未配置") + + def _get_headers(self) -> Dict[str, str]: + """获取 API 请求头""" + return { + 'Authorization': f'token {self.api_token}', + 'Content-Type': 'application/json', + 'Accept': 'application/json' + } + + def merge_pull_request(self, owner: str, repo: str, pr_number: int, + merge_message: str = "", + merge_commit_id: str = None) -> bool: + """ + 合并 Pull Request + """ + url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls/{pr_number}/merge" + logger.info(f"合并 PR URL: {url}") + + # Gitea API 需要 do 参数:merge, rebase, squash + payload = { + "do": "merge", + "merge_commit_message": merge_message or f"Merge PR #{pr_number}" + } + + if merge_commit_id: + payload["merge_commit_id"] = merge_commit_id + + try: + response = requests.post( + url, + headers=self._get_headers(), + json=payload, + timeout=30 + ) + + logger.info(f"合并响应状态码: {response.status_code}") + logger.info(f"合并响应内容: {response.text[:500]}") + + if response.status_code == 200: + logger.info(f"成功合并 PR #{pr_number}") + return True + elif response.status_code == 405: + logger.error(f"PR #{pr_number} 无法合并: {response.json().get('message', '未知原因')}") + return False + elif response.status_code == 422: + logger.error(f"PR #{pr_number} 合并失败: {response.json().get('message', '参数错误')}") + return False + else: + logger.error(f"合并 PR #{pr_number} 失败: {response.status_code} - {response.text}") + return False + + except Exception as e: + logger.error(f"合并 PR #{pr_number} 异常: {str(e)}") + return False + + def close_pull_request(self, owner: str, repo: str, pr_number: int) -> bool: + """ + 关闭 Pull Request + + Args: + owner: 仓库所有者 + repo: 仓库名称 + pr_number: PR 编号 + + Returns: + 是否关闭成功 + """ + url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls/{pr_number}" + + payload = { + "state": "closed" + } + + try: + response = requests.patch( + url, + headers=self._get_headers(), + json=payload, + timeout=30 + ) + + if response.status_code == 200: + logger.info(f"成功关闭 PR #{pr_number}") + return True + else: + logger.error(f"关闭 PR #{pr_number} 失败: {response.status_code} - {response.text}") + return False + + except Exception as e: + logger.error(f"关闭 PR #{pr_number} 异常: {str(e)}") + return False + + def get_pull_request(self, owner: str, repo: str, pr_number: int) -> Optional[Dict[str, Any]]: + """ + 获取 Pull Request 信息 + + Args: + owner: 仓库所有者 + repo: 仓库名称 + pr_number: PR 编号 + + Returns: + PR 信息字典,失败返回 None + """ + url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls/{pr_number}" + + try: + response = requests.get( + url, + headers=self._get_headers(), + timeout=30 + ) + + if response.status_code == 200: + return response.json() + else: + logger.error(f"获取 PR #{pr_number} 失败: {response.status_code}") + return None + + except Exception as e: + logger.error(f"获取 PR #{pr_number} 异常: {str(e)}") + return None + + def can_merge(self, owner: str, repo: str, pr_number: int) -> bool: + """ + 检查 PR 是否可以合并 + + Args: + owner: 仓库所有者 + repo: 仓库名称 + pr_number: PR 编号 + + Returns: + 是否可以合并 + """ + pr_info = self.get_pull_request(owner, repo, pr_number) + if pr_info: + return pr_info.get('mergeable', False) and pr_info.get('state') == 'open' + return False diff --git a/notify/feishu.py b/notify/feishu.py index f974cb5..9287c8b 100644 --- a/notify/feishu.py +++ b/notify/feishu.py @@ -84,7 +84,7 @@ class FeishuNotifier: def _upload_file(self, file_path: str, file_name: str) -> Optional[str]: """ - 上传文件到飞书(用于消息中发送) + 上传文件到飞书 Args: file_path: 文件本地路径 @@ -99,8 +99,7 @@ class FeishuNotifier: return None try: - # 使用 im API 上传文件 - url = "https://open.feishu.cn/open-apis/im/v1/files" + url = "https://open.feishu.cn/open-apis/drive/v1/files/upload_all" headers = { "Authorization": f"Bearer {token}" } @@ -108,29 +107,25 @@ class FeishuNotifier: # 读取文件 with open(file_path, 'rb') as f: file_content = f.read() - - # 获取文件类型 - file_ext = os.path.splitext(file_name)[1].lower() - file_type = 'stream' # 默认 - + # 构建 multipart 请求 files = { 'file': (file_name, file_content, 'application/octet-stream') } data = { 'file_name': file_name, - 'file_type': file_type + 'parent_node': 'root' # 根目录 } response = requests.post(url, headers=headers, files=files, data=data, timeout=60) result = response.json() if result.get("code") == 0: - file_key = result.get("data", {}).get("file_key") - logger.info(f"文件上传成功: {file_name}, file_key: {file_key}") + file_key = result.get("data", {}).get("file", {}).get("token") + logger.info(f"文件上传成功: {file_name}") return file_key else: - logger.error(f"文件上传失败: {result.get('msg')}, code: {result.get('code')}") + logger.error(f"文件上传失败: {result.get('msg')}") return None except Exception as e: @@ -489,6 +484,61 @@ class FeishuNotifier: } }) + # 添加 PR 操作按钮(仅 PR 扫描且扫描通过时显示) + if pr_url and target_branch and status == 'pass': + card["elements"].append({ + "tag": "div", + "text": { + "tag": "lark_md", + "content": "**请选择操作:**" + } + }) + + # 解析仓库信息用于按钮回调 + repo_full_name = report.get('repo_name', '') + if '/' in repo_full_name: + owner, repo = repo_full_name.split('/', 1) + else: + owner, repo = '', repo_full_name + + pr_number = report.get('pr_number', 0) + + card["elements"].append({ + "tag": "action", + "actions": [ + { + "tag": "button", + "text": { + "tag": "plain_text", + "content": "✅ 同意合并" + }, + "type": "primary", + "value": { + "action": "merge", + "owner": owner, + "repo": repo, + "pr_number": pr_number, + "pr_url": pr_url + } + }, + { + "tag": "button", + "text": { + "tag": "plain_text", + "content": "❌ 取消合并" + }, + "type": "danger", + "value": { + "action": "close", + "owner": owner, + "repo": repo, + "pr_number": pr_number, + "pr_url": pr_url + } + } + ] + }) + # 添加报告文件附件 if file_key: card["elements"].append({ diff --git a/report/generator.py b/report/generator.py index 6b41972..9a0597a 100644 --- a/report/generator.py +++ b/report/generator.py @@ -39,7 +39,8 @@ class ReportGenerator: author: str, scan_results: Dict[str, Any], pr_url: str = None, - target_branch: str = None + target_branch: str = None, + pr_number: int = None ) -> Dict[str, Any]: """ 生成扫描报告 @@ -101,6 +102,7 @@ class ReportGenerator: 'scan_results': scan_results, 'pr_url': pr_url, 'target_branch': target_branch, + 'pr_number': pr_number, 'markdown': self._generate_markdown( repo_name, branch, commit_id, commit_message, author, scan_results, status, status_text, pr_url, target_branch ) diff --git a/web/index.html b/web/index.html new file mode 100644 index 0000000..9a8d39a --- /dev/null +++ b/web/index.html @@ -0,0 +1,449 @@ + + + + + + PR 扫描管理平台 + + + + + +
+
+ + + + +
+ +
+

概览

+
+
+
+
+
待处理
+

-

+
+
+
+
+
+
+
已通过
+

-

+
+
+
+
+
+
+
已拒绝
+

-

+
+
+
+
+
+
+
问题数
+

-

+
+
+
+
+ +
最近 PR
+
+
+
+ + + + + + + + + + + + + + +
PR标题作者分支状态问题时间操作
+
+
+
+
+ + + + + + +
+
+
+ + + + + + + + -- 2.49.1 From 14680f053e08f74d05aada87bf2aa1ca14e255fe Mon Sep 17 00:00:00 2001 From: Dang Zerong Date: Wed, 11 Mar 2026 21:16:47 +0800 Subject: [PATCH 2/2] add web --- README.md | 224 +++++++++++++++ app.py | 154 +++++++++- db.py | 17 +- gitea_client.py | 100 ++++++- install.sh | 77 +++++ scanner/diff_parser.py | 172 ++++++++++++ web/index.html | 622 +++++++++++++++++++++++++++++++++++++++-- 快速开始指南.md | 230 +++++++++++++++ 8 files changed, 1557 insertions(+), 39 deletions(-) create mode 100644 README.md create mode 100644 install.sh create mode 100644 scanner/diff_parser.py create mode 100644 快速开始指南.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..4731168 --- /dev/null +++ b/README.md @@ -0,0 +1,224 @@ +# AI Code Quality Scanner - 飞书通知版 + +一个自动化代码质量扫描系统,在代码提交时自动扫描并发送报告到飞书。 + +## 功能特性 + +- 🤖 自动监听 Gitea 代码提交事件 +- 🔍 多维度代码质量扫描(语法、风格、安全) +- 📊 生成 Markdown 格式扫描报告 +- 📱 实时推送飞书机器人通知 + +## 系统架构 + +``` +┌─────────────┐ Webhook ┌──────────────────┐ +│ Gitea │ ───────────────► │ Webhook Server │ +│ 代码仓库 │ │ (Flask) │ +└─────────────┘ └────────┬─────────┘ + │ + ▼ + ┌──────────────────┐ + │ Code Scanner │ + │ - ESLint │ + │ - Pylint │ + │ - SonarQube │ + └────────┬─────────┘ + │ + ▼ + ┌──────────────────┐ + │ Report Generator│ + │ - Markdown │ + └────────┬─────────┘ + │ + ▼ + ┌──────────────────┐ + │ Feishu Bot │ + │ - Webhook │ + └──────────────────┘ +``` + +## 快速开始 + +### 1. 安装依赖 + +```bash +pip install -r requirements.txt +``` + +### 2. 配置飞书机器人 + +1. 打开飞书群聊 → 设置 → 群机器人 +2. 添加机器人 → 选择"自定义机器人" +3. 获取 Webhook 地址 +4. 配置 `config.yaml` + +### 3. 配置 Gitea Webhook + +#### 方式一:Push 时扫描(原有方式) + +1. 进入 Gitea 仓库 → 设置 → Webhooks +2. 添加 Webhook: + - 目标 URL: `http://你的服务器IP:5000/webhook/gitea` + - 触发事件: Push + - 密钥: 配置 `config.yaml` 中的 secret + +#### 方式二:PR 创建时扫描(推荐) + +1. 进入 Gitea 仓库 → 设置 → Webhooks +2. 添加 Webhook: + - 目标 URL: `http://你的服务器IP:5000/webhook/gitea` + - 触发事件: Pull Request + - 密钥: 配置 `config.yaml` 中的 secret + +**支持的 PR 事件:** +- `opened` - 创建新 PR +- `reopened` - 重新打开 PR +- `synchronize` - PR 中的提交有更新 +- `ready_for_review` - PR 标记为准备好审查 + +### 4. 运行服务 + +```bash +python app.py +``` + +## 配置说明 + +所有配置在 `config.yaml` 中: + +```yaml +server: + host: "0.0.0.0" + port: 5000 + debug: true + +gitea: + base_url: "http://localhost:3000" + # Webhook 签名密钥 + webhook_secret: "your_webhook_secret" + +feishu: + # 飞书机器人 Webhook 地址 + webhook_url: "https://open.feishu.cn/open-apis/bot/v2/hook/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" + # 消息推送 secret(可选,用于签名) + secret: "your_feishu_secret" + +scanner: + # 支持的语言 + languages: + - python + - javascript + - typescript + # 扫描阈值 + max_issues: 10 + # 是否启用详细扫描 + detailed: true + +report: + # 报告保存路径 + output_dir: "./reports" + # 是否保留报告文件 + keep_files: true +``` + +## 项目结构 + +``` +code-scanner/ +├── app.py # 主应用入口 +├── config.yaml # 配置文件 +├── requirements.txt # Python 依赖 +├── README.md # 项目说明 +├── scanner/ +│ ├── __init__.py +│ ├── base.py # 扫描器基类 +│ ├── python_scanner.py # Python 代码扫描 +│ ├── js_scanner.py # JavaScript/TypeScript 扫描 +│ └── security_scanner.py # 安全扫描 +├── report/ +│ ├── __init__.py +│ └── generator.py # Markdown 报告生成 +├── notify/ +│ ├── __init__.py +│ └── feishu.py # 飞书通知 +├── webhook/ +│ ├── __init__.py +│ └── handler.py # Webhook 处理 +└── reports/ # 报告输出目录 +``` + +## 支持的扫描工具 + +### Python +- **Pylint** - 代码风格和错误检查 +- **Flake8** - Python 代码检查 +- **Bandit** - 安全漏洞扫描 + +### JavaScript/TypeScript +- **ESLint** - JavaScript/TypeScript 检查 +- **Prettier** - 代码格式化 + +## 飞书消息效果 + +扫描完成后,将收到类似以下消息: + +### Push 扫描消息 + +``` +📊 代码质量扫描报告 + +仓库: my-project +分支: main +提交: abc1234 +提交者: developer@example.com + +✅ 扫描通过 (0 issues) +或 +⚠️ 发现问题 (5 issues) +``` + +### PR 扫描消息 + +``` +📊 PR 代码质量扫描报告 + +仓库: my-project +源分支: feature-xxx → 目标分支: main +PR链接: https://gitea.example.com/user/project/pulls/123 +提交: abc1234 +提交者: developer@example.com + +✅ 扫描通过 (0 issues) +或 +⚠️ 发现问题 (5 issues) +``` + +## Docker 部署 + +```dockerfile +FROM python:3.11-slim + +WORKDIR /app +COPY requirements.txt . +RUN pip install -r requirements.txt + +COPY . . +EXPOSE 5000 + +CMD ["python", "app.py"] +``` + +## 环境变量 + +也可以通过环境变量配置: + +```bash +export FEISHU_WEBHOOK_URL="https://open.feishu.cn/..." +export GITEA_WEBHOOK_SECRET="secret" +export SCANNER_MAX_ISSUES=10 +``` + +## 许可证 + +MIT License diff --git a/app.py b/app.py index 354af0d..699c21d 100644 --- a/app.py +++ b/app.py @@ -16,6 +16,7 @@ from scanner.python_scanner import PythonScanner from scanner.js_scanner import JavaScriptScanner from scanner.security_scanner import SecurityScanner from scanner.ai_reviewer import AIReviewer +from scanner.diff_parser import merge_issues_with_code from report.generator import ReportGenerator from notify.feishu import FeishuNotifier from gitea_client import GiteaClient @@ -232,6 +233,23 @@ def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]: clone_url, source_sha, source_branch ) + # 获取 PR 的代码差异,用于将问题与代码片段关联 + pr_diff = None + if '/' in repo_name: + repo_owner, repo_name_only = repo_name.split('/', 1) + else: + repo_owner = 'Bosch_Demo' + repo_name_only = repo_name + + try: + pr_diff = gitea_client.get_pull_request_diff(repo_owner, repo_name_only, pr_number) + logger.info(f"已获取 PR #{pr_number} 的 diff,长度: {len(pr_diff) if pr_diff else 0}") + except Exception as e: + logger.warning(f"获取 PR diff 失败: {e}") + + # 将问题与代码片段关联 + scan_details_with_code = merge_issues_with_code(scan_results, pr_diff or '') + # 生成报告 commit_message = f'PR #{pr_number}: {pr_title}' report = report_generator.generate( @@ -259,7 +277,7 @@ def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]: 'target_branch': target_branch, 'author': author } - PRScanDB.save_pr_scan(pr_info_for_db, scan_results, report.get('file_path')) + PRScanDB.save_pr_scan(pr_info_for_db, scan_results, report.get('file_path'), scan_details_with_code) logger.info(f'PR #{pr_number} 扫描完成') @@ -455,12 +473,146 @@ def api_get_pr(pr_id): except: pass + # 返回带代码片段的扫描详情 + if pr.get('scan_details_with_code') and isinstance(pr['scan_details_with_code'], str): + try: + pr['scan_details_with_code'] = json.loads(pr['scan_details_with_code']) + except: + pass + return jsonify(pr) except Exception as e: logger.error(f'获取 PR 详情失败: {str(e)}') return jsonify({'error': str(e)}), 500 +@app.route('/api/prs//diff') +def api_get_pr_diff(pr_id): + """获取 PR 的代码差异""" + try: + pr = PRScanDB.get_pr_by_id(pr_id) + if not pr: + return jsonify({'error': 'PR not found'}), 404 + + repo_name = pr.get('repo_name', '') + pr_number = pr.get('pr_number', 0) + + if not repo_name or not pr_number: + return jsonify({'error': 'PR 信息不完整'}), 400 + + # 解析 owner 和 repo + if '/' in repo_name: + owner, repo = repo_name.split('/', 1) + else: + owner = 'Bosch_Demo' # 默认 + repo = repo_name + + logger.info(f"获取 PR #{pr_number} ({owner}/{repo}) 的 diff") + + # 获取 diff + diff = gitea_client.get_pull_request_diff(owner, repo, pr_number) + if diff is None: + return jsonify({'error': '获取 diff 失败'}), 500 + + return jsonify({ + 'diff': diff, + 'pr_number': pr_number, + 'repo_name': repo_name + }) + + except Exception as e: + logger.error(f'获取 PR diff 失败: {str(e)}') + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/prs//files') +def api_get_pr_files(pr_id): + """获取 PR 变更文件列表(用于左侧树状展示)""" + try: + pr = PRScanDB.get_pr_by_id(pr_id) + if not pr: + return jsonify({'error': 'PR not found'}), 404 + repo_name = pr.get('repo_name', '') + pr_number = pr.get('pr_number', 0) + if not repo_name or not pr_number: + return jsonify({'error': 'PR 信息不完整'}), 400 + if '/' in repo_name: + owner, repo = repo_name.split('/', 1) + else: + owner, repo = 'Bosch_Demo', repo_name + files = gitea_client.get_pull_request_files(owner, repo, pr_number) + if files is None: + return jsonify({'error': '获取文件列表失败'}), 500 + return jsonify({'files': files, 'repo_name': repo_name}) + except Exception as e: + logger.error(f'获取 PR 文件列表失败: {str(e)}') + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/prs//file') +def api_get_pr_file_content(pr_id): + """获取 PR 中某文件在源分支上的完整内容""" + try: + path = request.args.get('path') + if not path: + return jsonify({'error': '缺少 path 参数'}), 400 + pr = PRScanDB.get_pr_by_id(pr_id) + if not pr: + return jsonify({'error': 'PR not found'}), 404 + repo_name = pr.get('repo_name', '') + pr_number = pr.get('pr_number', 0) + if not repo_name or not pr_number: + return jsonify({'error': 'PR 信息不完整'}), 400 + if '/' in repo_name: + owner, repo = repo_name.split('/', 1) + else: + owner, repo = 'Bosch_Demo', repo_name + pr_info = gitea_client.get_pull_request(owner, repo, pr_number) + if not pr_info: + return jsonify({'error': '获取 PR 信息失败'}), 500 + head_ref = pr_info.get('head', {}).get('ref') or pr_info.get('head_branch') or pr.get('source_branch') + if not head_ref: + return jsonify({'error': '无法确定源分支'}), 400 + content = gitea_client.get_file_contents(owner, repo, path, head_ref) + if content is None: + return jsonify({'error': '文件不存在或无法读取'}), 404 + + # 获取该文件的扫描问题(PR 创建时已扫描并存入 scan_details_with_code) + scan_issues = [] + path_norm = path.replace('\\', '/').strip() + scan_details = pr.get('scan_details_with_code') + if isinstance(scan_details, str): + try: + scan_details = json.loads(scan_details) + except Exception: + scan_details = None + if scan_details and scan_details.get('scanners'): + for scanner in scan_details['scanners']: + for issue in scanner.get('issues', []): + issue_file = (issue.get('file') or '').replace('\\', '/').strip() + if not issue_file: + continue + # 匹配:精确相等或一端包含另一端(兼容 basename 或完整路径) + if path_norm == issue_file or path_norm.endswith(issue_file) or issue_file.endswith(path_norm): + sev = (issue.get('severity') or 'info') + if isinstance(sev, str): + sev = sev.lower() + scanner_name = scanner.get('name', '') + scanner_display = {'python': 'Python', 'javascript': 'JavaScript', 'security': 'Security'}.get(scanner_name, scanner_name) + scan_issues.append({ + 'scanner': scanner_display, + 'severity': sev, + 'line': int(issue.get('line') or 0), + 'message': (issue.get('message') or issue.get('description') or '').strip(), + 'code_context': issue.get('code_context') + }) + + return jsonify({'path': path, 'content': content, 'scan_issues': scan_issues}) + except Exception as e: + logger.error(f'获取文件内容失败: {str(e)}') + return jsonify({'error': str(e)}), 500 + + @app.route('/api/prs//merge', methods=['POST']) def api_merge_pr(pr_id): """合并 PR""" diff --git a/db.py b/db.py index 772b82c..249b724 100644 --- a/db.py +++ b/db.py @@ -40,6 +40,7 @@ def init_db(): state TEXT DEFAULT 'pending', scan_status TEXT DEFAULT 'pending', scan_result TEXT, + scan_details_with_code TEXT, issues_count INTEGER DEFAULT 0, security_issues INTEGER DEFAULT 0, ai_review TEXT, @@ -72,16 +73,17 @@ class PRScanDB: """PR 扫描结果数据库操作类""" @staticmethod - def save_pr_scan(pr_info: Dict[str, Any], scan_results: Dict[str, Any], - report_path: str = None) -> int: + def save_pr_scan(pr_info: Dict[str, Any], scan_results: Dict[str, Any], + report_path: str = None, scan_details_with_code: Dict = None) -> int: """ 保存 PR 扫描结果 - + Args: pr_info: PR 信息 scan_results: 扫描结果 report_path: 报告文件路径 - + scan_details_with_code: 带代码片段的扫描详情 + Returns: 扫描记录 ID """ @@ -116,6 +118,7 @@ class PRScanDB: author = ?, scan_status = ?, scan_result = ?, + scan_details_with_code = ?, issues_count = ?, security_issues = ?, ai_review = ?, @@ -129,6 +132,7 @@ class PRScanDB: pr_info.get('author'), 'completed', json.dumps(scan_results, ensure_ascii=False), + json.dumps(scan_details_with_code, ensure_ascii=False) if scan_details_with_code else None, issues_count, security_issues, json.dumps(scan_results.get('ai', {}), ensure_ascii=False), @@ -143,9 +147,9 @@ class PRScanDB: INSERT INTO pr_scans ( pr_number, repo_name, pr_title, pr_url, source_branch, target_branch, author, - state, scan_status, scan_result, + state, scan_status, scan_result, scan_details_with_code, issues_count, security_issues, ai_review, report_path - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( pr_info.get('pr_number'), pr_info.get('repo_name'), @@ -157,6 +161,7 @@ class PRScanDB: 'open', 'completed', json.dumps(scan_results, ensure_ascii=False), + json.dumps(scan_details_with_code, ensure_ascii=False) if scan_details_with_code else None, issues_count, security_issues, json.dumps(scan_results.get('ai', {}), ensure_ascii=False), diff --git a/gitea_client.py b/gitea_client.py index ae1f3a7..d17a933 100644 --- a/gitea_client.py +++ b/gitea_client.py @@ -6,7 +6,7 @@ Gitea API 客户端 """ import logging import requests -from typing import Dict, Any, Optional +from typing import Dict, Any, Optional, List logger = logging.getLogger(__name__) @@ -167,3 +167,101 @@ class GiteaClient: if pr_info: return pr_info.get('mergeable', False) and pr_info.get('state') == 'open' return False + + def get_pull_request_diff(self, owner: str, repo: str, pr_number: int) -> Optional[str]: + """ + 获取 Pull Request 的代码差异 + + Args: + owner: 仓库所有者 + repo: 仓库名称 + pr_number: PR 编号 + + Returns: + diff 文本,失败返回 None + """ + url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls/{pr_number}.diff" + + try: + response = requests.get( + url, + headers=self._get_headers(), + timeout=30 + ) + + if response.status_code == 200: + logger.info(f"成功获取 PR #{pr_number} 的 diff") + return response.text + else: + logger.error(f"获取 PR #{pr_number} diff 失败: {response.status_code}") + return None + + except Exception as e: + logger.error(f"获取 PR #{pr_number} diff 异常: {str(e)}") + return None + + def get_pull_request_files(self, owner: str, repo: str, pr_number: int) -> Optional[List[Dict[str, Any]]]: + """ + 获取 PR 中修改的文件列表 + + Args: + owner: 仓库所有者 + repo: 仓库名称 + pr_number: PR 编号 + + Returns: + 文件列表,失败返回 None + """ + url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/pulls/{pr_number}/files" + + try: + response = requests.get( + url, + headers=self._get_headers(), + timeout=30 + ) + + if response.status_code == 200: + logger.info(f"成功获取 PR #{pr_number} 的文件列表") + return response.json() + else: + logger.error(f"获取 PR #{pr_number} 文件列表失败: {response.status_code}") + return None + + except Exception as e: + logger.error(f"获取 PR #{pr_number} 文件列表异常: {str(e)}") + return None + + def get_file_contents(self, owner: str, repo: str, filepath: str, ref: str) -> Optional[str]: + """ + 获取仓库中指定文件在给定 ref(分支/commit)下的内容 + + Args: + owner: 仓库所有者 + repo: 仓库名称 + filepath: 文件路径 + ref: 分支名或 commit SHA + + Returns: + 文件内容文本,失败返回 None + """ + import base64 + import urllib.parse + encoded_path = urllib.parse.quote(filepath, safe='') + url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{encoded_path}?ref={urllib.parse.quote(ref)}" + try: + response = requests.get( + url, + headers=self._get_headers(), + timeout=30 + ) + if response.status_code == 200: + data = response.json() + if data.get('encoding') == 'base64' and data.get('content'): + return base64.b64decode(data['content']).decode('utf-8', errors='replace') + return None + logger.error(f"获取文件 {filepath} 失败: {response.status_code}") + return None + except Exception as e: + logger.error(f"获取文件内容异常: {str(e)}") + return None diff --git a/install.sh b/install.sh new file mode 100644 index 0000000..8dbdf1a --- /dev/null +++ b/install.sh @@ -0,0 +1,77 @@ +#!/bin/bash +# AI Code Quality Scanner 安装脚本 + +echo "=========================================" +echo " AI Code Quality Scanner 安装脚本" +echo "=========================================" + +# 检查 Python 版本 +if ! command -v python3 &> /dev/null; then + echo "❌ 错误: 未找到 Python3,请先安装 Python 3.8+" + exit 1 +fi + +PYTHON_VERSION=$(python3 -c 'import sys; print(".".join(map(str, sys.version_info[:2])))') +echo "✅ Python 版本: $PYTHON_VERSION" + +# 创建虚拟环境(可选) +if [ ! -d "venv" ]; then + echo "📦 创建虚拟环境..." + python3 -m venv venv +fi + +# 激活虚拟环境 +source venv/bin/activate + +# 安装依赖 +echo "📦 安装 Python 依赖..." +pip install --upgrade pip +pip install -r requirements.txt + +# 创建必要的目录 +echo "📁 创建必要的目录..." +mkdir -p reports +mkdir -p /tmp/code_scanner_clones + +# 检查并安装代码扫描工具(可选) +echo "🛠️ 检查代码扫描工具..." + +# Pylint (Python) +if command -v pylint &> /dev/null || python -m pylint --version &> /dev/null; then + echo " ✅ Pylint 已安装" +else + echo " ⚠️ Pylint 未安装 (pip install pylint)" +fi + +# Flake8 (Python) +if command -v flake8 &> /dev/null || python -m flake8 --version &> /dev/null; then + echo " ✅ Flake8 已安装" +else + echo " ⚠️ Flake8 未安装 (pip install flake8)" +fi + +# Bandit (Python 安全扫描) +if command -v bandit &> /dev/null || python -m bandit --version &> /dev/null; then + echo " ✅ Bandit 已安装" +else + echo " ⚠️ Bandit 未安装 (pip install bandit)" +fi + +# Node.js 和 npm (JavaScript 扫描) +if command -v node &> /dev/null; then + NODE_VERSION=$(node --version) + echo " ✅ Node.js 版本: $NODE_VERSION" +else + echo " ⚠️ Node.js 未安装 (JavaScript 扫描需要)" +fi + +echo "" +echo "=========================================" +echo " 安装完成!" +echo "=========================================" +echo "" +echo "下一步操作:" +echo "1. 编辑 config.yaml 配置飞书机器人和 Gitea" +echo "2. 运行: python app.py" +echo "3. 在 Gitea 中配置 Webhook" +echo "" diff --git a/scanner/diff_parser.py b/scanner/diff_parser.py new file mode 100644 index 0000000..e989e85 --- /dev/null +++ b/scanner/diff_parser.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Diff 解析器 - 将扫描问题与代码片段关联 +""" +import re +import logging +from typing import Dict, List, Any, Optional +from dataclasses import dataclass, field + +logger = logging.getLogger(__name__) + + +@dataclass +class CodeChunk: + """代码块""" + file_path: str + old_content: str = "" + new_content: str = "" + old_start: int = 0 + new_start: int = 0 + hunks: List[Dict] = field(default_factory=list) + + +class DiffParser: + """Diff 解析器""" + + def __init__(self, diff_text: str): + self.diff_text = diff_text + self.files: Dict[str, CodeChunk] = {} + self._parse() + + def _parse(self): + """解析 diff 文本""" + if not self.diff_text: + return + + current_chunk = None + lines = self.diff_text.split('\n') + for line in lines: + diff_match = re.match(r'diff --git a/(.+) b/(.+)', line) + if diff_match: + file_path = diff_match.group(1) + current_chunk = CodeChunk(file_path=file_path) + self.files[file_path] = current_chunk + continue + + hunk_match = re.match(r'@@ -(\d+),?\d* \+(\d+),?\d* @@', line) + if hunk_match and current_chunk: + current_chunk.old_start = int(hunk_match.group(1)) + current_chunk.new_start = int(hunk_match.group(2)) + continue + + if current_chunk and line: + if line.startswith('+') and not line.startswith('+++'): + current_chunk.new_content += line[1:] + '\n' + elif line.startswith('-') and not line.startswith('---'): + current_chunk.old_content += line[1:] + '\n' + elif line.startswith(' '): + current_chunk.old_content += line[1:] + '\n' + current_chunk.new_content += line[1:] + '\n' + + def get_file_content(self, file_path: str) -> Optional[CodeChunk]: + return self.files.get(file_path) + + def get_line_context(self, file_path: str, line_number: int, context_lines: int = 3) -> Optional[Dict[str, Any]]: + chunk = self.files.get(file_path) + if not chunk: + return None + + new_lines = chunk.new_content.split('\n') + if line_number > len(new_lines): + return None + + start = max(0, line_number - context_lines - 1) + end = min(len(new_lines), line_number + context_lines) + + context = [] + for i in range(start, end): + code = new_lines[i].rstrip('\n') + is_current_line = (i == line_number - 1) + context.append({ + 'line_number': chunk.new_start + i, + 'code': code, + 'is_issue_line': is_current_line + }) + + return { + 'file': file_path, + 'line': line_number, + 'context': context + } + + +def merge_issues_with_code(scan_results: Dict[str, Any], diff: str) -> Dict[str, Any]: + """将扫描问题与代码片段关联""" + if not diff: + return scan_results + + parser = DiffParser(diff) + enriched_results = { + 'scanners': [], + 'summary': scan_results.get('summary', {}), + 'total_issues': scan_results.get('total_issues', 0) + } + + for scanner_name, scanner_data in scan_results.items(): + if scanner_name in ['summary', 'total_issues', 'ai']: + continue + + if isinstance(scanner_data, dict): + enriched_scanner = { + 'name': scanner_name, + 'issues': [], + 'file_count': scanner_data.get('file_count', 0), + 'total_issues': scanner_data.get('total_issues', 0) + } + + issues = scanner_data.get('issues', []) + for issue in issues: + enriched_issue = enrich_issue_with_code(issue, parser) + enriched_scanner['issues'].append(enriched_issue) + + enriched_results['scanners'].append(enriched_scanner) + + if 'ai' in scan_results: + enriched_results['ai'] = scan_results['ai'] + + return enriched_results + + +def enrich_issue_with_code(issue: Dict[str, Any], parser: DiffParser) -> Dict[str, Any]: + """为单个问题添加代码片段""" + enriched = issue.copy() + + file_path = issue.get('file', '') + line_number = issue.get('line', 0) + + if not file_path: + return enriched + + if not line_number: + desc = issue.get('description', '') or issue.get('message', '') + line_match = re.search(r'line[:#]?\s*(\d+)', desc, re.IGNORECASE) + if line_match: + line_number = int(line_match.group(1)) + + matched_path = None + for path in parser.files.keys(): + if file_path.endswith(path) or path.endswith(file_path) or file_path in path: + matched_path = path + break + + if matched_path: + enriched['file'] = matched_path + if matched_path and line_number: + context = parser.get_line_context(matched_path, line_number) + if context: + enriched['code_context'] = context + + if 'code_context' not in enriched and matched_path: + chunk = parser.get_file_content(matched_path) + if chunk and chunk.new_content: + lines = chunk.new_content.split('\n')[:10] + enriched['code_context'] = { + 'file': matched_path, + 'line': line_number or 1, + 'preview': '\n'.join(lines), + 'has_more': len(chunk.new_content.split('\n')) > 10 + } + + return enriched diff --git a/web/index.html b/web/index.html index 9a8d39a..78bb2ce 100644 --- a/web/index.html +++ b/web/index.html @@ -8,6 +8,11 @@ @@ -208,9 +245,9 @@ - +