commit 378feffe74cf01a40fe6464c743b5551f6ab0c3e Author: Dang Zerong Date: Sat Mar 7 19:56:28 2026 +0800 init diff --git a/app.py b/app.py new file mode 100644 index 0000000..14b50b7 --- /dev/null +++ b/app.py @@ -0,0 +1,218 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +import logging + + +os.environ.setdefault('FLASK_RUN_HOST', '0.0.0.0') + +from flask import Flask, request, jsonify +import yaml +from webhook.handler import GiteaWebhookHandler +from scanner.python_scanner import PythonScanner +from scanner.js_scanner import JavaScriptScanner +from scanner.security_scanner import SecurityScanner +from report.generator import ReportGenerator +from notify.feishu import FeishuNotifier + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +# 加载配置 +def load_config(): + """加载配置文件""" + config_path = os.path.join(os.path.dirname(__file__), 'config.yaml') + with open(config_path, 'r', encoding='utf-8') as f: + return yaml.safe_load(f) + +# 全局配置 +config = load_config() + +# 初始化应用 +app = Flask(__name__) +app.config['SECRET_KEY'] = config.get('server', {}).get('secret_key', 'dev-secret-key') + +# 初始化组件 +webhook_handler = GiteaWebhookHandler(config['gitea']) +python_scanner = PythonScanner(config.get('scanner', {})) +js_scanner = JavaScriptScanner(config.get('scanner', {})) +security_scanner = SecurityScanner(config.get('scanner', {})) +report_generator = ReportGenerator(config.get('report', {})) +feishu_notifier = FeishuNotifier(config['feishu']) + + +@app.route('/') +def index(): + """健康检查接口""" + return jsonify({ + 'status': 'ok', + 'service': 'AI Code Quality Scanner', + 'version': '1.0.0' + }) + + +@app.route('/webhook/gitea', methods=['POST']) +def handle_gitea_webhook(): + """处理 Gitea Webhook 请求""" + try: + # 验证签名 + signature = request.headers.get('X-Gitea-Signature') + if signature: + if not webhook_handler.verify_signature( + request.data, + signature, + config['gitea']['webhook_secret'] + ): + logger.warning('Webhook 签名验证失败') + return jsonify({'error': 'Invalid signature'}), 401 + + # 解析 Webhook payload + payload = request.json + if not payload: + return jsonify({'error': 'No payload'}), 400 + + event_type = request.headers.get('X-Gitea-Event', 'push') + logger.info(f'收到 Gitea Webhook 事件: {event_type}') + + # 只处理 push 事件 + if event_type != 'push': + return jsonify({'message': 'Event ignored'}), 200 + + # 提取提交信息 + commits = payload.get('commits', []) + if not commits: + return jsonify({'message': 'No commits'}), 200 + + repo = payload.get('repository', {}) + repo_name = repo.get('full_name', 'unknown') + branch = payload.get('ref', '').replace('refs/heads/', '') + pusher = payload.get('pusher', {}).get('name', 'unknown') + + logger.info(f'处理仓库 {repo_name} 的 {len(commits)} 个提交') + + # 处理每个提交 + for commit in commits: + commit_id = commit.get('id', '')[:8] + commit_message = commit.get('message', '') + author = commit.get('author', {}).get('name', 'unknown') + + logger.info(f'扫描提交 {commit_id}: {commit_message}') + + try: + # 获取仓库 URL + clone_url = repo.get('clone_url') + if not clone_url: + # 尝试从 web_url 构建 + web_url = repo.get('web_url', '') + if web_url: + clone_url = web_url.replace('http://', 'http://').replace('https://', 'https://') + clone_url = clone_url.rstrip('/') + '.git' + + # 执行代码扫描 + scan_results = {} + + # Python 扫描 + if 'python' in config.get('scanner', {}).get('languages', []): + scan_results['python'] = python_scanner.scan( + clone_url, commit_id, branch + ) + + # JavaScript/TypeScript 扫描 + if any(lang in config.get('scanner', {}).get('languages', []) + for lang in ['javascript', 'typescript']): + scan_results['javascript'] = js_scanner.scan( + clone_url, commit_id, branch + ) + + # 安全扫描 + scan_results['security'] = security_scanner.scan( + clone_url, commit_id, branch + ) + + # 生成报告 + report = report_generator.generate( + repo_name=repo_name, + branch=branch, + commit_id=commit_id, + commit_message=commit_message, + author=author, + scan_results=scan_results + ) + + # 发送飞书通知 + feishu_notifier.send_report(report) + + logger.info(f'提交 {commit_id} 扫描完成') + + except Exception as e: + logger.error(f'扫描提交 {commit_id} 失败: {str(e)}') + # 继续处理其他提交 + continue + + return jsonify({'status': 'ok', 'message': 'Scan completed'}), 200 + + except Exception as e: + logger.error(f'处理 Webhook 失败: {str(e)}', exc_info=True) + return jsonify({'error': str(e)}), 500 + + +@app.route('/scan/manual', methods=['POST']) +def manual_scan(): + """手动触发扫描接口""" + try: + data = request.json + repo_url = data.get('repo_url') + branch = data.get('branch', 'main') + commit_id = data.get('commit_id') + + if not repo_url: + return jsonify({'error': 'repo_url is required'}), 400 + + # 执行扫描 + scan_results = {} + + if 'python' in config.get('scanner', {}).get('languages', []): + scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch) + + if any(lang in config.get('scanner', {}).get('languages', []) + for lang in ['javascript', 'typescript']): + scan_results['javascript'] = js_scanner.scan(repo_url, commit_id, branch) + + scan_results['security'] = security_scanner.scan(repo_url, commit_id, branch) + + # 生成报告 + report = report_generator.generate( + repo_name=repo_url.split('/')[-1].replace('.git', ''), + branch=branch, + commit_id=commit_id or 'manual', + commit_message='Manual scan', + author='manual', + scan_results=scan_results + ) + + # 发送飞书通知 + feishu_notifier.send_report(report) + + return jsonify({ + 'status': 'ok', + 'report': report + }), 200 + + except Exception as e: + logger.error(f'手动扫描失败: {str(e)}', exc_info=True) + return jsonify({'error': str(e)}), 500 + + +if __name__ == '__main__': + # 强制监听所有网络接口 + host = "0.0.0.0" + port = config.get('server', {}).get('port', 5000) + debug = config.get('server', {}).get('debug', True) + + logger.info(f'启动服务: {host}:{port}') + app.run(host=host, port=port, debug=debug)