#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os import logging os.environ.setdefault('FLASK_RUN_HOST', '0.0.0.0') from flask import Flask, request, jsonify import yaml from webhook.handler import GiteaWebhookHandler from scanner.python_scanner import PythonScanner from scanner.js_scanner import JavaScriptScanner from scanner.security_scanner import SecurityScanner from report.generator import ReportGenerator from notify.feishu import FeishuNotifier # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # 加载配置 def load_config(): """加载配置文件""" config_path = os.path.join(os.path.dirname(__file__), 'config.yaml') with open(config_path, 'r', encoding='utf-8') as f: return yaml.safe_load(f) # 全局配置 config = load_config() # 初始化应用 app = Flask(__name__) app.config['SECRET_KEY'] = config.get('server', {}).get('secret_key', 'dev-secret-key') # 初始化组件 webhook_handler = GiteaWebhookHandler(config['gitea']) python_scanner = PythonScanner(config.get('scanner', {})) js_scanner = JavaScriptScanner(config.get('scanner', {})) security_scanner = SecurityScanner(config.get('scanner', {})) report_generator = ReportGenerator(config.get('report', {})) feishu_notifier = FeishuNotifier(config['feishu']) @app.route('/') def index(): """健康检查接口""" return jsonify({ 'status': 'ok', 'service': 'AI Code Quality Scanner', 'version': '1.0.0' }) @app.route('/webhook/gitea', methods=['POST']) def handle_gitea_webhook(): """处理 Gitea Webhook 请求""" try: # 验证签名 signature = request.headers.get('X-Gitea-Signature') if signature: if not webhook_handler.verify_signature( request.data, signature, config['gitea']['webhook_secret'] ): logger.warning('Webhook 签名验证失败') return jsonify({'error': 'Invalid signature'}), 401 # 解析 Webhook payload payload = request.json if not payload: return jsonify({'error': 'No payload'}), 400 event_type = request.headers.get('X-Gitea-Event', 'push') logger.info(f'收到 Gitea Webhook 事件: {event_type}') # 只处理 push 事件 if event_type != 'push': return jsonify({'message': 'Event ignored'}), 200 # 提取提交信息 commits = payload.get('commits', []) if not commits: return jsonify({'message': 'No commits'}), 200 repo = payload.get('repository', {}) repo_name = repo.get('full_name', 'unknown') branch = payload.get('ref', '').replace('refs/heads/', '') pusher = payload.get('pusher', {}).get('name', 'unknown') logger.info(f'处理仓库 {repo_name} 的 {len(commits)} 个提交') # 处理每个提交 for commit in commits: commit_id = commit.get('id', '')[:8] commit_message = commit.get('message', '') author = commit.get('author', {}).get('name', 'unknown') logger.info(f'扫描提交 {commit_id}: {commit_message}') try: # 获取仓库 URL clone_url = repo.get('clone_url') if not clone_url: # 尝试从 web_url 构建 web_url = repo.get('web_url', '') if web_url: clone_url = web_url.replace('http://', 'http://').replace('https://', 'https://') clone_url = clone_url.rstrip('/') + '.git' # 执行代码扫描 scan_results = {} # Python 扫描 if 'python' in config.get('scanner', {}).get('languages', []): scan_results['python'] = python_scanner.scan( clone_url, commit_id, branch ) # JavaScript/TypeScript 扫描 if any(lang in config.get('scanner', {}).get('languages', []) for lang in ['javascript', 'typescript']): scan_results['javascript'] = js_scanner.scan( clone_url, commit_id, branch ) # 安全扫描 scan_results['security'] = security_scanner.scan( clone_url, commit_id, branch ) # 生成报告 report = report_generator.generate( repo_name=repo_name, branch=branch, commit_id=commit_id, commit_message=commit_message, author=author, scan_results=scan_results ) # 发送飞书通知 feishu_notifier.send_report(report) logger.info(f'提交 {commit_id} 扫描完成') except Exception as e: logger.error(f'扫描提交 {commit_id} 失败: {str(e)}') # 继续处理其他提交 continue return jsonify({'status': 'ok', 'message': 'Scan completed'}), 200 except Exception as e: logger.error(f'处理 Webhook 失败: {str(e)}', exc_info=True) return jsonify({'error': str(e)}), 500 @app.route('/scan/manual', methods=['POST']) def manual_scan(): """手动触发扫描接口""" try: data = request.json repo_url = data.get('repo_url') branch = data.get('branch', 'main') commit_id = data.get('commit_id') if not repo_url: return jsonify({'error': 'repo_url is required'}), 400 # 执行扫描 scan_results = {} if 'python' in config.get('scanner', {}).get('languages', []): scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch) if any(lang in config.get('scanner', {}).get('languages', []) for lang in ['javascript', 'typescript']): scan_results['javascript'] = js_scanner.scan(repo_url, commit_id, branch) scan_results['security'] = security_scanner.scan(repo_url, commit_id, branch) # 生成报告 report = report_generator.generate( repo_name=repo_url.split('/')[-1].replace('.git', ''), branch=branch, commit_id=commit_id or 'manual', commit_message='Manual scan', author='manual', scan_results=scan_results ) # 发送飞书通知 feishu_notifier.send_report(report) return jsonify({ 'status': 'ok', 'report': report }), 200 except Exception as e: logger.error(f'手动扫描失败: {str(e)}', exc_info=True) return jsonify({'error': str(e)}), 500 if __name__ == '__main__': # 强制监听所有网络接口 host = "0.0.0.0" port = config.get('server', {}).get('port', 5000) debug = config.get('server', {}).get('debug', True) logger.info(f'启动服务: {host}:{port}') app.run(host=host, port=port, debug=debug)