init
This commit is contained in:
218
app.py
Normal file
218
app.py
Normal file
@@ -0,0 +1,218 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import os
|
||||||
|
import logging
|
||||||
|
|
||||||
|
|
||||||
|
os.environ.setdefault('FLASK_RUN_HOST', '0.0.0.0')
|
||||||
|
|
||||||
|
from flask import Flask, request, jsonify
|
||||||
|
import yaml
|
||||||
|
from webhook.handler import GiteaWebhookHandler
|
||||||
|
from scanner.python_scanner import PythonScanner
|
||||||
|
from scanner.js_scanner import JavaScriptScanner
|
||||||
|
from scanner.security_scanner import SecurityScanner
|
||||||
|
from report.generator import ReportGenerator
|
||||||
|
from notify.feishu import FeishuNotifier
|
||||||
|
|
||||||
|
# 配置日志
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||||
|
)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# 加载配置
|
||||||
|
def load_config():
|
||||||
|
"""加载配置文件"""
|
||||||
|
config_path = os.path.join(os.path.dirname(__file__), 'config.yaml')
|
||||||
|
with open(config_path, 'r', encoding='utf-8') as f:
|
||||||
|
return yaml.safe_load(f)
|
||||||
|
|
||||||
|
# 全局配置
|
||||||
|
config = load_config()
|
||||||
|
|
||||||
|
# 初始化应用
|
||||||
|
app = Flask(__name__)
|
||||||
|
app.config['SECRET_KEY'] = config.get('server', {}).get('secret_key', 'dev-secret-key')
|
||||||
|
|
||||||
|
# 初始化组件
|
||||||
|
webhook_handler = GiteaWebhookHandler(config['gitea'])
|
||||||
|
python_scanner = PythonScanner(config.get('scanner', {}))
|
||||||
|
js_scanner = JavaScriptScanner(config.get('scanner', {}))
|
||||||
|
security_scanner = SecurityScanner(config.get('scanner', {}))
|
||||||
|
report_generator = ReportGenerator(config.get('report', {}))
|
||||||
|
feishu_notifier = FeishuNotifier(config['feishu'])
|
||||||
|
|
||||||
|
|
||||||
|
@app.route('/')
|
||||||
|
def index():
|
||||||
|
"""健康检查接口"""
|
||||||
|
return jsonify({
|
||||||
|
'status': 'ok',
|
||||||
|
'service': 'AI Code Quality Scanner',
|
||||||
|
'version': '1.0.0'
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
@app.route('/webhook/gitea', methods=['POST'])
|
||||||
|
def handle_gitea_webhook():
|
||||||
|
"""处理 Gitea Webhook 请求"""
|
||||||
|
try:
|
||||||
|
# 验证签名
|
||||||
|
signature = request.headers.get('X-Gitea-Signature')
|
||||||
|
if signature:
|
||||||
|
if not webhook_handler.verify_signature(
|
||||||
|
request.data,
|
||||||
|
signature,
|
||||||
|
config['gitea']['webhook_secret']
|
||||||
|
):
|
||||||
|
logger.warning('Webhook 签名验证失败')
|
||||||
|
return jsonify({'error': 'Invalid signature'}), 401
|
||||||
|
|
||||||
|
# 解析 Webhook payload
|
||||||
|
payload = request.json
|
||||||
|
if not payload:
|
||||||
|
return jsonify({'error': 'No payload'}), 400
|
||||||
|
|
||||||
|
event_type = request.headers.get('X-Gitea-Event', 'push')
|
||||||
|
logger.info(f'收到 Gitea Webhook 事件: {event_type}')
|
||||||
|
|
||||||
|
# 只处理 push 事件
|
||||||
|
if event_type != 'push':
|
||||||
|
return jsonify({'message': 'Event ignored'}), 200
|
||||||
|
|
||||||
|
# 提取提交信息
|
||||||
|
commits = payload.get('commits', [])
|
||||||
|
if not commits:
|
||||||
|
return jsonify({'message': 'No commits'}), 200
|
||||||
|
|
||||||
|
repo = payload.get('repository', {})
|
||||||
|
repo_name = repo.get('full_name', 'unknown')
|
||||||
|
branch = payload.get('ref', '').replace('refs/heads/', '')
|
||||||
|
pusher = payload.get('pusher', {}).get('name', 'unknown')
|
||||||
|
|
||||||
|
logger.info(f'处理仓库 {repo_name} 的 {len(commits)} 个提交')
|
||||||
|
|
||||||
|
# 处理每个提交
|
||||||
|
for commit in commits:
|
||||||
|
commit_id = commit.get('id', '')[:8]
|
||||||
|
commit_message = commit.get('message', '')
|
||||||
|
author = commit.get('author', {}).get('name', 'unknown')
|
||||||
|
|
||||||
|
logger.info(f'扫描提交 {commit_id}: {commit_message}')
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 获取仓库 URL
|
||||||
|
clone_url = repo.get('clone_url')
|
||||||
|
if not clone_url:
|
||||||
|
# 尝试从 web_url 构建
|
||||||
|
web_url = repo.get('web_url', '')
|
||||||
|
if web_url:
|
||||||
|
clone_url = web_url.replace('http://', 'http://').replace('https://', 'https://')
|
||||||
|
clone_url = clone_url.rstrip('/') + '.git'
|
||||||
|
|
||||||
|
# 执行代码扫描
|
||||||
|
scan_results = {}
|
||||||
|
|
||||||
|
# Python 扫描
|
||||||
|
if 'python' in config.get('scanner', {}).get('languages', []):
|
||||||
|
scan_results['python'] = python_scanner.scan(
|
||||||
|
clone_url, commit_id, branch
|
||||||
|
)
|
||||||
|
|
||||||
|
# JavaScript/TypeScript 扫描
|
||||||
|
if any(lang in config.get('scanner', {}).get('languages', [])
|
||||||
|
for lang in ['javascript', 'typescript']):
|
||||||
|
scan_results['javascript'] = js_scanner.scan(
|
||||||
|
clone_url, commit_id, branch
|
||||||
|
)
|
||||||
|
|
||||||
|
# 安全扫描
|
||||||
|
scan_results['security'] = security_scanner.scan(
|
||||||
|
clone_url, commit_id, branch
|
||||||
|
)
|
||||||
|
|
||||||
|
# 生成报告
|
||||||
|
report = report_generator.generate(
|
||||||
|
repo_name=repo_name,
|
||||||
|
branch=branch,
|
||||||
|
commit_id=commit_id,
|
||||||
|
commit_message=commit_message,
|
||||||
|
author=author,
|
||||||
|
scan_results=scan_results
|
||||||
|
)
|
||||||
|
|
||||||
|
# 发送飞书通知
|
||||||
|
feishu_notifier.send_report(report)
|
||||||
|
|
||||||
|
logger.info(f'提交 {commit_id} 扫描完成')
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f'扫描提交 {commit_id} 失败: {str(e)}')
|
||||||
|
# 继续处理其他提交
|
||||||
|
continue
|
||||||
|
|
||||||
|
return jsonify({'status': 'ok', 'message': 'Scan completed'}), 200
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f'处理 Webhook 失败: {str(e)}', exc_info=True)
|
||||||
|
return jsonify({'error': str(e)}), 500
|
||||||
|
|
||||||
|
|
||||||
|
@app.route('/scan/manual', methods=['POST'])
|
||||||
|
def manual_scan():
|
||||||
|
"""手动触发扫描接口"""
|
||||||
|
try:
|
||||||
|
data = request.json
|
||||||
|
repo_url = data.get('repo_url')
|
||||||
|
branch = data.get('branch', 'main')
|
||||||
|
commit_id = data.get('commit_id')
|
||||||
|
|
||||||
|
if not repo_url:
|
||||||
|
return jsonify({'error': 'repo_url is required'}), 400
|
||||||
|
|
||||||
|
# 执行扫描
|
||||||
|
scan_results = {}
|
||||||
|
|
||||||
|
if 'python' in config.get('scanner', {}).get('languages', []):
|
||||||
|
scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch)
|
||||||
|
|
||||||
|
if any(lang in config.get('scanner', {}).get('languages', [])
|
||||||
|
for lang in ['javascript', 'typescript']):
|
||||||
|
scan_results['javascript'] = js_scanner.scan(repo_url, commit_id, branch)
|
||||||
|
|
||||||
|
scan_results['security'] = security_scanner.scan(repo_url, commit_id, branch)
|
||||||
|
|
||||||
|
# 生成报告
|
||||||
|
report = report_generator.generate(
|
||||||
|
repo_name=repo_url.split('/')[-1].replace('.git', ''),
|
||||||
|
branch=branch,
|
||||||
|
commit_id=commit_id or 'manual',
|
||||||
|
commit_message='Manual scan',
|
||||||
|
author='manual',
|
||||||
|
scan_results=scan_results
|
||||||
|
)
|
||||||
|
|
||||||
|
# 发送飞书通知
|
||||||
|
feishu_notifier.send_report(report)
|
||||||
|
|
||||||
|
return jsonify({
|
||||||
|
'status': 'ok',
|
||||||
|
'report': report
|
||||||
|
}), 200
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f'手动扫描失败: {str(e)}', exc_info=True)
|
||||||
|
return jsonify({'error': str(e)}), 500
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
# 强制监听所有网络接口
|
||||||
|
host = "0.0.0.0"
|
||||||
|
port = config.get('server', {}).get('port', 5000)
|
||||||
|
debug = config.get('server', {}).get('debug', True)
|
||||||
|
|
||||||
|
logger.info(f'启动服务: {host}:{port}')
|
||||||
|
app.run(host=host, port=port, debug=debug)
|
||||||
Reference in New Issue
Block a user