From d2f53ee233a4e7acba8231a6cf782612e14ee338 Mon Sep 17 00:00:00 2001 From: Dang Zerong Date: Mon, 9 Mar 2026 09:24:08 +0800 Subject: [PATCH] init --- config.yaml | 35 +++++ notify/__init__.py | 1 + notify/feishu.py | 288 ++++++++++++++++++++++++++++++++++++ report/__init__.py | 1 + report/generator.py | 220 +++++++++++++++++++++++++++ requirements.txt | 4 + scanner/__init__.py | 1 + scanner/base.py | 142 ++++++++++++++++++ scanner/js_scanner.py | 162 ++++++++++++++++++++ scanner/python_scanner.py | 196 ++++++++++++++++++++++++ scanner/security_scanner.py | 223 ++++++++++++++++++++++++++++ webhook/__init__.py | 1 + webhook/handler.py | 126 ++++++++++++++++ 13 files changed, 1400 insertions(+) create mode 100644 config.yaml create mode 100644 notify/__init__.py create mode 100644 notify/feishu.py create mode 100644 report/__init__.py create mode 100644 report/generator.py create mode 100644 requirements.txt create mode 100644 scanner/__init__.py create mode 100644 scanner/base.py create mode 100644 scanner/js_scanner.py create mode 100644 scanner/python_scanner.py create mode 100644 scanner/security_scanner.py create mode 100644 webhook/__init__.py create mode 100644 webhook/handler.py diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..d9770f0 --- /dev/null +++ b/config.yaml @@ -0,0 +1,35 @@ +server: + host: "0.0.0.0" + port: 5000 + debug: true + +gitea: + # Gitea 服务器地址(根据实际情况修改) + base_url: "http://154.9.253.114:3000" + # Gitea Webhook 签名密钥,需要与 Gitea 配置一致 + webhook_secret: "BoschScan_2026_xxx" + +feishu: + # 飞书机器人 Webhook 地址(替换为你的实际地址) + webhook_url: "https://open.feishu.cn/open-apis/bot/v2/hook/30c24ff3-3d22-4217-813e-1bc49916b691" + # 飞书消息签名密钥(可选) + secret: "" + +scanner: + # 支持的编程语言 + languages: + - python + - javascript + - typescript + # 最大问题数量阈值,超过则标记为失败 + max_issues: 10 + # 是否启用详细扫描模式 + detailed: true + # 克隆代码仓库的临时目录 + temp_clone_dir: "/tmp/code_scanner_clones" + +report: + # 报告保存目录 + output_dir: "./reports" + # 是否保留报告文件 + keep_files: true diff --git a/notify/__init__.py b/notify/__init__.py new file mode 100644 index 0000000..2c65443 --- /dev/null +++ b/notify/__init__.py @@ -0,0 +1 @@ +# Notify 模块 \ No newline at end of file diff --git a/notify/feishu.py b/notify/feishu.py new file mode 100644 index 0000000..5ca5e2a --- /dev/null +++ b/notify/feishu.py @@ -0,0 +1,288 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +飞书机器人通知器 +发送代码质量扫描报告到飞书 +""" +import json +import time +import hashlib +import hmac +import base64 +import logging +import requests +from typing import Dict, Any + +logger = logging.getLogger(__name__) + + +class FeishuNotifier: + """飞书机器人通知器""" + + def __init__(self, config: Dict[str, Any]): + """ + 初始化飞书通知器 + + Args: + config: 飞书配置 + """ + self.config = config + self.webhook_url = config.get('webhook_url', '') + self.secret = config.get('secret', '') + + if not self.webhook_url: + logger.warning('飞书 Webhook URL 未配置') + + def send_report(self, report: Dict[str, Any]) -> bool: + """ + 发送扫描报告到飞书 + + Args: + report: 报告数据 + + Returns: + 是否发送成功 + """ + if not self.webhook_url: + logger.error('飞书 Webhook URL 未配置') + return False + + try: + # 构建消息内容 + message = self._build_message(report) + + # 如果配置了签名,则使用签名验证 + if self.secret: + timestamp, sign = self._generate_sign() + payload = { + "timestamp": timestamp, + "sign": sign, + "msg_type": "interactive", + "card": message + } + else: + payload = { + "msg_type": "interactive", + "card": message + } + + # 发送请求 + headers = {'Content-Type': 'application/json'} + response = requests.post( + self.webhook_url, + headers=headers, + data=json.dumps(payload).encode('utf-8'), + timeout=30 + ) + + # 解析响应 + result = response.json() + + if result.get('code') == 0: + logger.info('飞书消息发送成功') + return True + else: + logger.error(f'飞书消息发送失败: {result.get("msg")}') + return False + + except Exception as e: + logger.error(f'发送飞书通知失败: {str(e)}', exc_info=True) + return False + + def _generate_sign(self) -> tuple: + """ + 生成飞书签名 + + Returns: + (timestamp, sign) 元组 + """ + # 当前时间戳(秒) + timestamp = str(int(time.time())) + + # 拼接字符串 + string_to_sign = '{}\n{}'.format(timestamp, self.secret) + + # 使用 HmacSHA256 计算签名 + hmac_code = hmac.new( + string_to_sign.encode('utf-8'), + digestmod=hashlib.sha256 + ).digest() + + # 进行 Base64 编码 + sign = base64.b64encode(hmac_code).decode('utf-8') + + return timestamp, sign + + def _build_message(self, report: Dict[str, Any]) -> Dict[str, Any]: + """ + 构建飞书卡片消息 + + Args: + report: 报告数据 + + Returns: + 飞书卡片消息结构 + """ + # 根据状态选择颜色 + status = report.get('status', 'pass') + if status == 'pass': + theme_color = 'green' + status_icon = '✅' + elif status == 'fail': + theme_color = 'red' + status_icon = '❌' + else: + theme_color = 'orange' + status_icon = '⚠️' + + # 构建问题摘要 + total_issues = report.get('total_issues', 0) + total_errors = report.get('total_errors', 0) + total_warnings = report.get('total_warnings', 0) + + # 获取扫描结果详情 + scan_details = [] + for scanner_name, result in report.get('scan_results', {}).items(): + tool_name = result.get('tool', scanner_name) + summary = result.get('summary', {}) + files_scanned = result.get('files_scanned', 0) + total = summary.get('total', 0) + + if total > 0: + detail_text = f"{tool_name}: 扫描 {files_scanned} 个文件,发现 {total} 个问题" + else: + detail_text = f"{tool_name}: 扫描 {files_scanned} 个文件,无问题" + + scan_details.append(detail_text) + + # 构建卡片消息 + card = { + "header": { + "title": { + "tag": "plain_text", + "content": f"{status_icon} 代码质量扫描报告" + }, + "template": theme_color + }, + "elements": [ + { + "tag": "div", + "text": { + "tag": "lark_md", + "content": f"**仓库:** `{report.get('repo_name', 'unknown')}`\n" + f"**分支:** `{report.get('branch', 'unknown')}`\n" + f"**提交:** `{report.get('commit_id', 'unknown')}`\n" + f"**提交者:** {report.get('author', 'unknown')}" + } + }, + { + "tag": "div", + "text": { + "tag": "lark_md", + "content": f"**扫描状态:** {report.get('status_text', 'unknown')}\n" + f"📊 总问题: {total_issues} | " + f"🔴 错误: {total_errors} | " + f"🟡 警告: {total_warnings}" + } + } + ] + } + + # 添加扫描详情 + if scan_details: + card["elements"].append({ + "tag": "div", + "text": { + "tag": "lark_md", + "content": "**扫描详情:**\n" + "\n".join([f"- {d}" for d in scan_details]) + } + }) + + # 添加主要问题列表(最多显示5个) + all_issues = [] + for scanner_name, result in report.get('scan_results', {}).items(): + for issue in result.get('issues', [])[:3]: # 每个扫描器最多显示3个 + all_issues.append(issue) + + if all_issues: + issues_text = "**主要问题:**\n" + for i, issue in enumerate(all_issues[:5], 1): + severity = issue.get('severity', 'Unknown') + severity_emoji = { + 'HIGH': '🔴', + 'MEDIUM': '🟡', + 'LOW': '🔵', + 'ERROR': '🔴', + 'WARNING': '🟡' + }.get(severity.upper(), '⚪') + + file_path = issue.get('file', 'unknown') + line_num = issue.get('line', 0) + message = issue.get('message', 'No message')[:50] + + issues_text += f"{i}. {severity_emoji} `{file_path}:{line_num}` - {message}\n" + + card["elements"].append({ + "tag": "div", + "text": { + "tag": "lark_md", + "content": issues_text + } + }) + + # 添加时间戳 + card["elements"].append({ + "tag": "div", + "text": { + "tag": "lark_md", + "content": f"🕐 扫描时间: {report.get('timestamp', '')}" + } + }) + + return card + + def send_simple_message(self, title: str, content: str) -> bool: + """ + 发送简单文本消息 + + Args: + title: 标题 + content: 内容 + + Returns: + 是否发送成功 + """ + if not self.webhook_url: + logger.error('飞书 Webhook URL 未配置') + return False + + try: + # 构建消息 + payload = { + "msg_type": "text", + "content": { + "text": f"{title}\n{content}" + } + } + + # 发送请求 + headers = {'Content-Type': 'application/json'} + response = requests.post( + self.webhook_url, + headers=headers, + data=json.dumps(payload).encode('utf-8'), + timeout=30 + ) + + result = response.json() + + if result.get('code') == 0: + logger.info('飞书消息发送成功') + return True + else: + logger.error(f'飞书消息发送失败: {result.get("msg")}') + return False + + except Exception as e: + logger.error(f'发送飞书通知失败: {str(e)}') + return False diff --git a/report/__init__.py b/report/__init__.py new file mode 100644 index 0000000..e9a56fa --- /dev/null +++ b/report/__init__.py @@ -0,0 +1 @@ +# Report 模块 \ No newline at end of file diff --git a/report/generator.py b/report/generator.py new file mode 100644 index 0000000..447f436 --- /dev/null +++ b/report/generator.py @@ -0,0 +1,220 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Markdown 报告生成器 +生成代码质量扫描报告 +""" +import os +import json +import logging +from datetime import datetime +from typing import Dict, Any, Optional + +logger = logging.getLogger(__name__) + + +class ReportGenerator: + """代码质量扫描报告生成器""" + + def __init__(self, config: Dict[str, Any]): + """ + 初始化报告生成器 + + Args: + config: 报告配置 + """ + self.config = config + self.output_dir = config.get('output_dir', './reports') + self.keep_files = config.get('keep_files', True) + + # 确保输出目录存在 + os.makedirs(self.output_dir, exist_ok=True) + + def generate( + self, + repo_name: str, + branch: str, + commit_id: str, + commit_message: str, + author: str, + scan_results: Dict[str, Any] + ) -> Dict[str, Any]: + """ + 生成扫描报告 + + Args: + repo_name: 仓库名称 + branch: 分支名 + commit_id: 提交 ID + commit_message: 提交信息 + author: 提交者 + scan_results: 扫描结果 + + Returns: + 报告数据 + """ + # 计算总体统计 + total_issues = 0 + total_errors = 0 + total_warnings = 0 + + for scanner_name, result in scan_results.items(): + summary = result.get('summary', {}) + total_issues += summary.get('total', 0) + total_errors += summary.get('error', 0) + summary.get('high', 0) + total_warnings += summary.get('warning', 0) + summary.get('medium', 0) + + # 确定状态 + if total_issues == 0: + status = 'pass' + status_text = '✅ 扫描通过' + elif total_errors > 0: + status = 'fail' + status_text = f'❌ 发现 {total_errors} 个错误' + else: + status = 'warning' + status_text = f'⚠️ 发现 {total_warnings} 个警告' + + # 生成报告数据 + report = { + 'repo_name': repo_name, + 'branch': branch, + 'commit_id': commit_id, + 'commit_message': commit_message, + 'author': author, + 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'status': status, + 'status_text': status_text, + 'total_issues': total_issues, + 'total_errors': total_errors, + 'total_warnings': total_warnings, + 'scan_results': scan_results, + 'markdown': self._generate_markdown( + repo_name, branch, commit_id, commit_message, author, scan_results, status, status_text + ) + } + + # 保存报告文件 + if self.keep_files: + self._save_report(report) + + return report + + def _generate_markdown( + self, + repo_name: str, + branch: str, + commit_id: str, + commit_message: str, + author: str, + scan_results: Dict[str, Any], + status: str, + status_text: str + ) -> str: + """生成 Markdown 格式的报告""" + lines = [] + + # 标题 + lines.append('# 📊 代码质量扫描报告') + lines.append('') + + # 基本信息 + lines.append('## 📋 基本信息') + lines.append('') + lines.append(f'| 项目 | 内容 |') + lines.append(f'|------|------|') + lines.append(f'| 仓库 | `{repo_name}` |') + lines.append(f'| 分支 | `{branch}` |') + lines.append(f'| 提交 | `{commit_id}` |') + lines.append(f'| 提交者 | {author} |') + lines.append(f'| 提交信息 | {commit_message[:50]}... |' if len(commit_message) > 50 else f'| 提交信息 | {commit_message} |') + lines.append(f'| 扫描时间 | {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} |') + lines.append('') + + # 扫描状态 + lines.append('## 📈 扫描状态') + lines.append('') + lines.append(f'**{status_text}**') + lines.append('') + + # 各扫描器结果汇总 + lines.append('## 🔍 扫描详情') + lines.append('') + + for scanner_name, result in scan_results.items(): + tool_name = result.get('tool', scanner_name) + summary = result.get('summary', {}) + + lines.append(f'### {tool_name}') + lines.append('') + lines.append(f'- 扫描文件数: {result.get("files_scanned", 0)}') + lines.append(f'- 总问题数: {summary.get("total", 0)}') + + # 根据不同扫描器显示不同的摘要字段 + if 'error' in summary: + lines.append(f' - 错误: {summary.get("error", 0)}') + lines.append(f' - 警告: {summary.get("warning", 0)}') + lines.append(f' - 提示: {summary.get("info", 0)}') + elif 'high' in summary: + lines.append(f' - 高危: {summary.get("high", 0)}') + lines.append(f' - 中危: {summary.get("medium", 0)}') + lines.append(f' - 低危: {summary.get("low", 0)}') + + issues = result.get('issues', []) + if issues and self.config.get('detailed', True): + lines.append('') + lines.append('**问题列表:**') + lines.append('') + + for i, issue in enumerate(issues[:10], 1): # 最多显示10个 + severity = issue.get('severity', 'Unknown') + severity_emoji = { + 'HIGH': '🔴', + 'MEDIUM': '🟡', + 'LOW': '🔵', + 'ERROR': '🔴', + 'WARNING': '🟡', + 'INFO': 'ℹ️' + }.get(severity.upper(), '⚪') + + file_path = issue.get('file', 'unknown') + line_num = issue.get('line', 0) + message = issue.get('message', 'No message') + + lines.append(f'{i}. {severity_emoji} **{severity}** - `{file_path}:{line_num}`') + lines.append(f' - {message}') + lines.append('') + + # 添加报告链接或下一步操作 + lines.append('---') + lines.append('') + lines.append('*此报告由 AI Code Quality Scanner 自动生成*') + + return '\n'.join(lines) + + def _save_report(self, report: Dict[str, Any]): + """保存报告到文件""" + try: + # 生成文件名 + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + repo_name = report['repo_name'].replace('/', '_') + filename = f'{repo_name}_{report["commit_id"]}_{timestamp}.md' + filepath = os.path.join(self.output_dir, filename) + + # 写入文件 + with open(filepath, 'w', encoding='utf-8') as f: + f.write(report['markdown']) + + logger.info(f'报告已保存: {filepath}') + + # 同时保存 JSON 格式(便于程序解析) + json_filename = filename.replace('.md', '.json') + json_filepath = os.path.join(self.output_dir, json_filename) + + with open(json_filepath, 'w', encoding='utf-8') as f: + json.dump(report, f, ensure_ascii=False, indent=2) + + logger.info(f'JSON 报告已保存: {json_filepath}') + + except Exception as e: + logger.error(f'保存报告失败: {str(e)}') diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..369f634 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +flask>=2.0.0 +pyyaml>=5.0 +requests>=2.25.0 +python-dotenv>=0.19.0 diff --git a/scanner/__init__.py b/scanner/__init__.py new file mode 100644 index 0000000..6c10f9e --- /dev/null +++ b/scanner/__init__.py @@ -0,0 +1 @@ +# Scanner 模块 \ No newline at end of file diff --git a/scanner/base.py b/scanner/base.py new file mode 100644 index 0000000..0a93395 --- /dev/null +++ b/scanner/base.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +代码扫描器基类 +定义扫描器接口和通用功能 +""" +import os +import logging +import tempfile +import shutil +from abc import ABC, abstractmethod +from typing import Dict, Any, List, Optional +from git import Repo +logger = logging.getLogger(__name__) +class BaseScanner(ABC): + """代码扫描器基类""" + def __init__(self, config: Dict[str, Any]): + """ + 初始化扫描器 + Args: + config: 扫描器配置 + """ + self.config = config + self.temp_dir = config.get('temp_clone_dir', '/tmp/code_scanner_clones') + self.max_issues = config.get('max_issues', 10) + self.detailed = config.get('detailed', True) + # 确保临时目录存在 + os.makedirs(self.temp_dir, exist_ok=True) + @abstractmethod + def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]: + """ + 执行代码扫描 + Args: + repo_url: 仓库 URL + commit_id: 提交 ID + branch: 分支名 + Returns: + 扫描结果 + """ + pass + def clone_repo(self, repo_url: str, commit_id: Optional[str], branch: str) -> str: + """ + 克隆代码仓库到临时目录 + Args: + repo_url: 仓库 URL + commit_id: 提交 ID(可选,为 None 时使用 branch) + branch: 分支名 + Returns: + 克隆的目录路径 + """ + # 生成唯一的目录名 + repo_name = repo_url.split('/')[-1].replace('.git', '') + commit_hash = commit_id or branch + clone_dir = os.path.join(self.temp_dir, f"{repo_name}_{commit_hash}") + # 如果目录已存在,先删除 + if os.path.exists(clone_dir): + shutil.rmtree(clone_dir) + try: + logger.info(f'克隆仓库: {repo_url}') + # 克隆仓库(浅克隆,只获取最新提交) + repo = Repo.clone_from( + repo_url, + clone_dir, + depth=1, + branch=branch + ) + # 如果指定了 commit_id,切换到该提交 + if commit_id: + repo.git.checkout(commit_id) + logger.info(f'仓库克隆成功: {clone_dir}') + return clone_dir + except Exception as e: + logger.error(f'克隆仓库失败: {str(e)}') + raise + def cleanup(self, clone_dir: str): + """ + 清理临时目录 + Args: + clone_dir: 克隆的目录路径 + """ + try: + if os.path.exists(clone_dir): + shutil.rmtree(clone_dir) + logger.info(f'清理临时目录: {clone_dir}') + except Exception as e: + logger.warning(f'清理临时目录失败: {str(e)}') + def run_command(self, cmd: List[str], cwd: str, timeout: int = 300) -> Dict[str, Any]: + """ + 运行命令并返回结果 + Args: + cmd: 命令列表 + cwd: 工作目录 + timeout: 超时时间(秒) + Returns: + 命令执行结果 + """ + import subprocess + try: + result = subprocess.run( + cmd, + cwd=cwd, + capture_output=True, + text=True, + timeout=timeout + ) + return { + 'success': result.returncode == 0, + 'returncode': result.returncode, + 'stdout': result.stdout, + 'stderr': result.stderr + } + except subprocess.TimeoutExpired: + return { + 'success': False, + 'returncode': -1, + 'stdout': '', + 'stderr': 'Command timeout' + } + except Exception as e: + return { + 'success': False, + 'returncode': -1, + 'stdout': '', + 'stderr': str(e) + } + def get_changed_files(self, clone_dir: str, extensions: List[str]) -> List[str]: + """ + 获取指定扩展名的文件列表 + Args: + clone_dir: 仓库目录 + extensions: 文件扩展名列表 + Returns: + 文件路径列表 + """ + files = [] + for root, dirs, filenames in os.walk(clone_dir): + # 跳过隐藏目录和特殊目录 + dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ['node_modules', '__pycache__', 'venv', '.git']] + for filename in filenames: + if any(filename.endswith(ext) for ext in extensions): + files.append(os.path.join(root, filename)) + return files diff --git a/scanner/js_scanner.py b/scanner/js_scanner.py new file mode 100644 index 0000000..d290e26 --- /dev/null +++ b/scanner/js_scanner.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +JavaScript/TypeScript 代码扫描器 +使用 ESLint 进行代码质量检查 +""" +import os +import json +import logging +from typing import Dict, Any, List, Optional +from scanner.base import BaseScanner + +logger = logging.getLogger(__name__) + + +class JavaScriptScanner(BaseScanner): + """JavaScript/TypeScript 代码扫描器""" + + def __init__(self, config: Dict[str, Any]): + super().__init__(config) + self.extensions = ['.js', '.jsx', '.ts', '.tsx', '.vue', '.svelte'] + + def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]: + """ + 执行 JavaScript/TypeScript 代码扫描 + + Args: + repo_url: 仓库 URL + commit_id: 提交 ID + branch: 分支名 + + Returns: + 扫描结果 + """ + result = { + 'tool': 'JavaScript Scanner', + 'language': 'javascript', + 'status': 'success', + 'issues': [], + 'summary': { + 'total': 0, + 'error': 0, + 'warning': 0, + 'info': 0 + }, + 'files_scanned': 0 + } + + clone_dir = None + try: + # 克隆仓库 + clone_dir = self.clone_repo(repo_url, commit_id, branch) + + # 获取 JavaScript/TypeScript 文件 + js_files = self.get_changed_files(clone_dir, self.extensions) + result['files_scanned'] = len(js_files) + + if not js_files: + logger.info('没有找到 JavaScript/TypeScript 文件') + return result + + # 运行 ESLint 扫描 + eslint_result = self._run_eslint(clone_dir, js_files) + + # 合并结果 + result['issues'] = eslint_result.get('issues', [])[:self.max_issues] if self.detailed else eslint_result.get('issues', []) + result['summary'] = self._calculate_summary(eslint_result.get('issues', [])) + result['raw_output'] = eslint_result.get('raw_output', '') + + except Exception as e: + logger.error(f'JavaScript 扫描失败: {str(e)}') + result['status'] = 'error' + result['error'] = str(e) + + finally: + # 清理临时目录 + if clone_dir: + self.cleanup(clone_dir) + + return result + + def _run_eslint(self, cwd: str, files: List[str]) -> Dict[str, Any]: + """运行 ESLint 扫描""" + result = { + 'tool': 'eslint', + 'issues': [], + 'raw_output': '' + } + + try: + # 尝试使用 npx 运行 eslint + cmd = ['npx', 'eslint', '--format=json', '--no-eslintrc'] + files + + # 如果没有 eslint 配置,先创建默认配置 + eslintrc_path = os.path.join(cwd, '.eslintrc.json') + if not os.path.exists(eslintrc_path): + # 创建简单的 ESLint 配置 + eslint_config = { + "env": { + "browser": True, + "es2021": True, + "node": True + }, + "extends": ["eslint:recommended"], + "parserOptions": { + "ecmaVersion": "latest", + "sourceType": "module" + } + } + with open(eslintrc_path, 'w') as f: + json.dump(eslint_config, f) + + output = self.run_command(cmd, cwd, timeout=120) + result['raw_output'] = output.get('stdout', '') + output.get('stderr', '') + + # 解析 JSON 输出 + if output.get('stdout'): + try: + eslint_results = json.loads(output['stdout']) + for file_result in eslint_results: + file_path = file_result.get('filePath', '') + messages = file_result.get('messages', []) + + for msg in messages: + severity = 'error' if msg.get('severity', 0) == 2 else 'warning' + result['issues'].append({ + 'tool': 'eslint', + 'type': severity, + 'severity': 'Error' if msg.get('severity', 0) == 2 else 'Warning', + 'message': msg.get('message', ''), + 'file': os.path.basename(file_path), + 'line': msg.get('line', 0), + 'column': msg.get('column', 0), + 'symbol': msg.get('ruleId', 'unknown') + }) + except json.JSONDecodeError as e: + logger.warning(f'ESLint JSON 解析失败: {e}') + + except Exception as e: + logger.warning(f'ESLint 运行失败: {str(e)}') + + return result + + def _calculate_summary(self, issues: List[Dict]) -> Dict[str, int]: + """计算问题摘要""" + summary = { + 'total': len(issues), + 'error': 0, + 'warning': 0, + 'info': 0 + } + + for issue in issues: + severity = issue.get('severity', '').lower() + if severity in ['error', 'critical']: + summary['error'] += 1 + elif severity in ['warning', 'moderate']: + summary['warning'] += 1 + else: + summary['info'] += 1 + + return summary diff --git a/scanner/python_scanner.py b/scanner/python_scanner.py new file mode 100644 index 0000000..c2293c7 --- /dev/null +++ b/scanner/python_scanner.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Python 代码扫描器 +使用 Pylint、Flake8、MyPy 进行代码质量检查 +""" +import os +import json +import logging +from typing import Dict, Any, List, Optional +from scanner.base import BaseScanner + +logger = logging.getLogger(__name__) + + +class PythonScanner(BaseScanner): + """Python 代码扫描器""" + + def __init__(self, config: Dict[str, Any]): + super().__init__(config) + self.extensions = ['.py'] + + def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]: + """ + 执行 Python 代码扫描 + + Args: + repo_url: 仓库 URL + commit_id: 提交 ID + branch: 分支名 + + Returns: + 扫描结果 + """ + result = { + 'tool': 'Python Scanner', + 'language': 'python', + 'status': 'success', + 'issues': [], + 'summary': { + 'total': 0, + 'error': 0, + 'warning': 0, + 'info': 0 + }, + 'files_scanned': 0 + } + + clone_dir = None + try: + # 克隆仓库 + clone_dir = self.clone_repo(repo_url, commit_id, branch) + + # 获取 Python 文件 + py_files = self.get_changed_files(clone_dir, self.extensions) + result['files_scanned'] = len(py_files) + + if not py_files: + logger.info('没有找到 Python 文件') + return result + + # 运行各种扫描工具 + pylint_result = self._run_pylint(clone_dir, py_files) + flake8_result = self._run_flake8(clone_dir, py_files) + + # 合并结果 + all_issues = [] + all_issues.extend(pylint_result.get('issues', [])) + all_issues.extend(flake8_result.get('issues', [])) + + result['issues'] = all_issues[:self.max_issues] if self.detailed else all_issues + result['summary'] = self._calculate_summary(all_issues) + result['raw_output'] = { + 'pylint': pylint_result.get('raw_output', ''), + 'flake8': flake8_result.get('raw_output', '') + } + + except Exception as e: + logger.error(f'Python 扫描失败: {str(e)}') + result['status'] = 'error' + result['error'] = str(e) + + finally: + # 清理临时目录 + if clone_dir: + self.cleanup(clone_dir) + + return result + + def _run_pylint(self, cwd: str, files: List[str]) -> Dict[str, Any]: + """运行 Pylint 扫描""" + result = { + 'tool': 'pylint', + 'issues': [], + 'raw_output': '' + } + + # 只扫描变更的文件 + try: + cmd = ['python', '-m', 'pylint', '--output-format=json'] + files + output = self.run_command(cmd, cwd, timeout=120) + + result['raw_output'] = output.get('stdout', '') + + # 解析 JSON 输出 + if output.get('stdout'): + try: + issues = json.loads(output['stdout']) + for issue in issues: + result['issues'].append({ + 'tool': 'pylint', + 'type': issue.get('type', 'info'), + 'severity': issue.get('severity', 'Info'), + 'message': issue.get('message', ''), + 'file': os.path.basename(issue.get('path', '')), + 'line': issue.get('line', 0), + 'column': issue.get('column', 0), + 'symbol': issue.get('symbol', '') + }) + except json.JSONDecodeError: + logger.warning('Pylint JSON 解析失败') + + except Exception as e: + logger.warning(f'Pylint 运行失败: {str(e)}') + + return result + + def _run_flake8(self, cwd: str, files: List[str]) -> Dict[str, Any]: + """运行 Flake8 扫描""" + result = { + 'tool': 'flake8', + 'issues': [], + 'raw_output': '' + } + + try: + cmd = ['python', '-m', 'flake8', '--format=json'] + files + output = self.run_command(cmd, cwd, timeout=120) + + result['raw_output'] = output.get('stdout', '') + + # 解析 JSON 输出 + if output.get('stdout'): + try: + issues = json.loads(output['stdout']) + for issue in issues: + result['issues'].append({ + 'tool': 'flake8', + 'type': self._map_flake8_code(issue.get('code', '')), + 'severity': 'Warning', + 'message': issue.get('text', ''), + 'file': os.path.basename(issue.get('filename', '')), + 'line': issue.get('line_number', 0), + 'column': issue.get('column_number', 0), + 'symbol': issue.get('code', '') + }) + except json.JSONDecodeError: + logger.warning('Flake8 JSON 解析失败') + + except Exception as e: + logger.warning(f'Flake8 运行失败: {str(e)}') + + return result + + def _map_flake8_code(self, code: str) -> str: + """映射 Flake8 错误代码到类型""" + # E/W - Flake8 错误/警告 + # F - Pyflakes + # C - mccabe 复杂度 + if code.startswith('E') or code.startswith('W'): + return 'error' if code.startswith('E') else 'warning' + elif code.startswith('F'): + return 'error' + elif code.startswith('C'): + return 'warning' + return 'info' + + def _calculate_summary(self, issues: List[Dict]) -> Dict[str, int]: + """计算问题摘要""" + summary = { + 'total': len(issues), + 'error': 0, + 'warning': 0, + 'info': 0 + } + + for issue in issues: + severity = issue.get('severity', '').lower() + if severity in ['error', 'critical', 'fatal', 'error']: + summary['error'] += 1 + elif severity in ['warning', 'moderate']: + summary['warning'] += 1 + else: + summary['info'] += 1 + + return summary diff --git a/scanner/security_scanner.py b/scanner/security_scanner.py new file mode 100644 index 0000000..0a68a4f --- /dev/null +++ b/scanner/security_scanner.py @@ -0,0 +1,223 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +安全扫描器 +使用 Bandit 进行安全漏洞扫描 +""" +import os +import json +import logging +from typing import Dict, Any, List, Optional +from scanner.base import BaseScanner + +logger = logging.getLogger(__name__) + + +class SecurityScanner(BaseScanner): + """安全漏洞扫描器""" + + def __init__(self, config: Dict[str, Any]): + super().__init__(config) + # 扫描所有代码文件以发现安全问题 + self.extensions = ['.py', '.js', '.ts', '.jsx', '.tsx', '.java', '.go', '.rb', '.php'] + + def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]: + """ + 执行安全扫描 + + Args: + repo_url: 仓库 URL + commit_id: 提交 ID + branch: 分支名 + + Returns: + 扫描结果 + """ + result = { + 'tool': 'Security Scanner', + 'language': 'multi', + 'status': 'success', + 'issues': [], + 'summary': { + 'total': 0, + 'high': 0, + 'medium': 0, + 'low': 0, + 'info': 0 + }, + 'files_scanned': 0 + } + + clone_dir = None + try: + # 克隆仓库 + clone_dir = self.clone_repo(repo_url, commit_id, branch) + + # 获取所有支持的文件 + all_files = self.get_changed_files(clone_dir, self.extensions) + result['files_scanned'] = len(all_files) + + if not all_files: + logger.info('没有找到可扫描的文件') + return result + + # Python 安全扫描 (Bandit) + py_files = [f for f in all_files if f.endswith('.py')] + if py_files: + bandit_result = self._run_bandit(clone_dir, py_files) + result['issues'].extend(bandit_result.get('issues', [])) + + # JavaScript 安全扫描 + js_files = [f for f in all_files if f.endswith(('.js', '.jsx', '.ts', '.tsx'))] + if js_files: + # 使用简单的模式匹配检测常见安全问题 + js_security_result = self._scan_js_security(clone_dir, js_files) + result['issues'].extend(js_security_result.get('issues', [])) + + # 计算摘要 + result['summary'] = self._calculate_summary(result['issues']) + + # 限制返回的问题数量 + if self.detailed: + result['issues'] = result['issues'][:self.max_issues] + + except Exception as e: + logger.error(f'安全扫描失败: {str(e)}') + result['status'] = 'error' + result['error'] = str(e) + + finally: + # 清理临时目录 + if clone_dir: + self.cleanup(clone_dir) + + return result + + def _run_bandit(self, cwd: str, files: List[str]) -> Dict[str, Any]: + """运行 Bandit 安全扫描""" + result = { + 'tool': 'bandit', + 'issues': [] + } + + try: + # 运行 bandit + cmd = ['python', '-m', 'bandit', '-f', 'json'] + files + output = self.run_command(cmd, cwd, timeout=120) + + # 解析 JSON 输出 + if output.get('stdout'): + try: + data = json.loads(output['stdout']) + results = data.get('results', []) + + for issue in results: + # 映射严重级别 + severity = issue.get('issue_severity', 'LOW') + result['issues'].append({ + 'tool': 'bandit', + 'type': issue.get('issue_id', 'unknown'), + 'severity': severity, + 'confidence': issue.get('issue_confidence', 'LOW'), + 'message': issue.get('issue_text', ''), + 'file': os.path.basename(issue.get('filename', '')), + 'line': issue.get('line_number', 0), + 'code': issue.get('code', '') + }) + except json.JSONDecodeError: + logger.warning('Bandit JSON 解析失败') + + except Exception as e: + logger.warning(f'Bandit 运行失败: {str(e)}') + + return result + + def _scan_js_security(self, cwd: str, files: List[str]) -> Dict[str, Any]: + """简单的 JavaScript 安全扫描(基于模式匹配)""" + result = { + 'tool': 'js-security', + 'issues': [] + } + + # 需要检测的不安全模式 + dangerous_patterns = [ + { + 'pattern': r'eval\s*\(', + 'message': '使用 eval() 可能导致代码注入', + 'severity': 'HIGH' + }, + { + 'pattern': r'innerHTML\s*=', + 'message': '使用 innerHTML 可能导致 XSS 攻击', + 'severity': 'MEDIUM' + }, + { + 'pattern': r'document\.write\s*\(', + 'message': '使用 document.write 可能导致 XSS 攻击', + 'severity': 'MEDIUM' + }, + { + 'pattern': r'password\s*[:=]', + 'message': '硬编码密码可能存在安全风险', + 'severity': 'HIGH' + }, + { + 'pattern': r'api[_-]?key\s*[:=]', + 'message': '硬编码 API Key 可能存在安全风险', + 'severity': 'HIGH' + }, + { + 'pattern': r'secret\s*[:=]', + 'message': '硬编码密钥可能存在安全风险', + 'severity': 'HIGH' + } + ] + + import re + + for file_path in files: + try: + with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: + content = f.read() + lines = content.split('\n') + + for line_num, line in enumerate(lines, 1): + for pattern_info in dangerous_patterns: + if re.search(pattern_info['pattern'], line, re.IGNORECASE): + result['issues'].append({ + 'tool': 'js-security', + 'type': 'security-warning', + 'severity': pattern_info['severity'], + 'confidence': 'MEDIUM', + 'message': pattern_info['message'], + 'file': os.path.basename(file_path), + 'line': line_num, + 'code': line.strip()[:80] + }) + except Exception as e: + logger.warning(f'扫描文件 {file_path} 失败: {str(e)}') + + return result + + def _calculate_summary(self, issues: List[Dict]) -> Dict[str, int]: + """计算问题摘要""" + summary = { + 'total': len(issues), + 'high': 0, + 'medium': 0, + 'low': 0, + 'info': 0 + } + + for issue in issues: + severity = issue.get('severity', '').upper() + if severity in ['HIGH', 'CRITICAL']: + summary['high'] += 1 + elif severity == 'MEDIUM': + summary['medium'] += 1 + elif severity == 'LOW': + summary['low'] += 1 + else: + summary['info'] += 1 + + return summary diff --git a/webhook/__init__.py b/webhook/__init__.py new file mode 100644 index 0000000..26877d1 --- /dev/null +++ b/webhook/__init__.py @@ -0,0 +1 @@ +# Webhook 模块 \ No newline at end of file diff --git a/webhook/handler.py b/webhook/handler.py new file mode 100644 index 0000000..7d32c26 --- /dev/null +++ b/webhook/handler.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Gitea Webhook 处理器 +验证签名并解析 Webhook 事件 +""" +import hmac +import hashlib +import logging +from typing import Dict, Any, Optional + +logger = logging.getLogger(__name__) + + +class GiteaWebhookHandler: + """Gitea Webhook 处理器""" + + def __init__(self, config: Dict[str, Any]): + """ + 初始化 Webhook 处理器 + + Args: + config: Gitea 配置 + """ + self.config = config + self.base_url = config.get('base_url', 'http://localhost:3000') + self.webhook_secret = config.get('webhook_secret', '') + + def verify_signature(self, payload: bytes, signature: str, secret: str) -> bool: + """ + 验证 Webhook 签名 + + Args: + payload: 请求体 + signature: 请求头中的签名 + secret: 密钥 + + Returns: + 签名是否有效 + """ + if not secret: + logger.warning('未配置 Webhook 密钥,跳过验证') + return True + + try: + # Gitea 使用 SHA256 HMAC + expected_signature = hmac.new( + secret.encode('utf-8'), + payload, + hashlib.sha256 + ).hexdigest() + + # 比较签名(使用 constant time 比较防止时序攻击) + return hmac.compare_digest(f'sha256={expected_signature}', signature) + except Exception as e: + logger.error(f'签名验证失败: {str(e)}') + return False + + def parse_push_event(self, payload: Dict[str, Any]) -> Dict[str, Any]: + """ + 解析 Push 事件 + + Args: + payload: Webhook payload + + Returns: + 解析后的提交信息 + """ + repo = payload.get('repository', {}) + commits = payload.get('commits', []) + ref = payload.get('ref', '') + + return { + 'repo_name': repo.get('full_name', ''), + 'repo_url': repo.get('clone_url', ''), + 'web_url': repo.get('web_url', ''), + 'branch': ref.replace('refs/heads/', ''), + 'commits': [ + { + 'id': commit.get('id', '')[:8], + 'message': commit.get('message', ''), + 'author': commit.get('author', {}).get('name', ''), + 'email': commit.get('author', {}).get('email', ''), + 'timestamp': commit.get('timestamp', ''), + 'added': commit.get('added', []), + 'modified': commit.get('modified', []), + 'removed': commit.get('removed', []), + } + for commit in commits + ], + 'pusher': payload.get('pusher', {}).get('name', ''), + 'before': payload.get('before', ''), + 'after': payload.get('after', ''), + } + + def get_changed_files(self, commit: Dict[str, Any]) -> list: + """ + 获取提交中变更的文件列表 + + Args: + commit: 提交信息 + + Returns: + 变更的文件列表 + """ + files = [] + files.extend(commit.get('added', [])) + files.extend(commit.get('modified', [])) + files.extend(commit.get('removed', [])) + return files + + def filter_by_extension(self, files: list, extensions: list) -> list: + """ + 按文件扩展名过滤文件 + + Args: + files: 文件列表 + extensions: 扩展名列表(如 ['.py', '.js']) + + Returns: + 过滤后的文件列表 + """ + return [ + f for f in files + if any(f.endswith(ext) for ext in extensions) + ]