init
This commit is contained in:
218
test.py
Normal file
218
test.py
Normal file
@@ -0,0 +1,218 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os
|
||||
import logging
|
||||
|
||||
|
||||
os.environ.setdefault('FLASK_RUN_HOST', '0.0.0.0')
|
||||
|
||||
from flask import Flask, request, jsonify
|
||||
import yaml
|
||||
from webhook.handler import GiteaWebhookHandler
|
||||
from scanner.python_scanner import PythonScanner
|
||||
from scanner.js_scanner import JavaScriptScanner
|
||||
from scanner.security_scanner import SecurityScanner
|
||||
from report.generator import ReportGenerator
|
||||
from notify.feishu import FeishuNotifier
|
||||
|
||||
# 配置日志
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# 加载配置
|
||||
def load_config():
|
||||
"""加载配置文件"""
|
||||
config_path = os.path.join(os.path.dirname(__file__), 'config.yaml')
|
||||
with open(config_path, 'r', encoding='utf-8') as f:
|
||||
return yaml.safe_load(f)
|
||||
|
||||
# 全局配置
|
||||
config = load_config()
|
||||
|
||||
# 初始化应用
|
||||
app = Flask(__name__)
|
||||
app.config['SECRET_KEY'] = config.get('server', {}).get('secret_key', 'dev-secret-key')
|
||||
|
||||
# 初始化组件
|
||||
webhook_handler = GiteaWebhookHandler(config['gitea'])
|
||||
python_scanner = PythonScanner(config.get('scanner', {}))
|
||||
js_scanner = JavaScriptScanner(config.get('scanner', {}))
|
||||
security_scanner = SecurityScanner(config.get('scanner', {}))
|
||||
report_generator = ReportGenerator(config.get('report', {}))
|
||||
feishu_notifier = FeishuNotifier(config['feishu'])
|
||||
|
||||
|
||||
@app.route('/')
|
||||
def index():
|
||||
"""健康检查接口"""
|
||||
return jsonify({
|
||||
'status': 'ok',
|
||||
'service': 'AI Code Quality Scanner',
|
||||
'version': '1.0.0'
|
||||
})
|
||||
|
||||
|
||||
@app.route('/webhook/gitea', methods=['POST'])
|
||||
def handle_gitea_webhook():
|
||||
"""处理 Gitea Webhook 请求"""
|
||||
try:
|
||||
# 验证签名
|
||||
signature = request.headers.get('X-Gitea-Signature')
|
||||
if signature:
|
||||
if not webhook_handler.verify_signature(
|
||||
request.data,
|
||||
signature,
|
||||
config['gitea']['webhook_secret']
|
||||
):
|
||||
logger.warning('Webhook 签名验证失败')
|
||||
return jsonify({'error': 'Invalid signature'}), 401
|
||||
|
||||
# 解析 Webhook payload
|
||||
payload = request.json
|
||||
if not payload:
|
||||
return jsonify({'error': 'No payload'}), 400
|
||||
|
||||
event_type = request.headers.get('X-Gitea-Event', 'push')
|
||||
logger.info(f'收到 Gitea Webhook 事件: {event_type}')
|
||||
|
||||
# 只处理 push 事件
|
||||
if event_type != 'push':
|
||||
return jsonify({'message': 'Event ignored'}), 200
|
||||
|
||||
# 提取提交信息
|
||||
commits = payload.get('commits', [])
|
||||
if not commits:
|
||||
return jsonify({'message': 'No commits'}), 200
|
||||
|
||||
repo = payload.get('repository', {})
|
||||
repo_name = repo.get('full_name', 'unknown')
|
||||
branch = payload.get('ref', '').replace('refs/heads/', '')
|
||||
pusher = payload.get('pusher', {}).get('name', 'unknown')
|
||||
|
||||
logger.info(f'处理仓库 {repo_name} 的 {len(commits)} 个提交')
|
||||
|
||||
# 处理每个提交
|
||||
for commit in commits:
|
||||
commit_id = commit.get('id', '')[:8]
|
||||
commit_message = commit.get('message', '')
|
||||
author = commit.get('author', {}).get('name', 'unknown')
|
||||
|
||||
logger.info(f'扫描提交 {commit_id}: {commit_message}')
|
||||
|
||||
try:
|
||||
# 获取仓库 URL
|
||||
clone_url = repo.get('clone_url')
|
||||
if not clone_url:
|
||||
# 尝试从 web_url 构建
|
||||
web_url = repo.get('web_url', '')
|
||||
if web_url:
|
||||
clone_url = web_url.replace('http://', 'http://').replace('https://', 'https://')
|
||||
clone_url = clone_url.rstrip('/') + '.git'
|
||||
|
||||
# 执行代码扫描
|
||||
scan_results = {}
|
||||
|
||||
# Python 扫描
|
||||
if 'python' in config.get('scanner', {}).get('languages', []):
|
||||
scan_results['python'] = python_scanner.scan(
|
||||
clone_url, commit_id, branch
|
||||
)
|
||||
|
||||
# JavaScript/TypeScript 扫描
|
||||
if any(lang in config.get('scanner', {}).get('languages', [])
|
||||
for lang in ['javascript', 'typescript']):
|
||||
scan_results['javascript'] = js_scanner.scan(
|
||||
clone_url, commit_id, branch
|
||||
)
|
||||
|
||||
# 安全扫描
|
||||
scan_results['security'] = security_scanner.scan(
|
||||
clone_url, commit_id, branch
|
||||
)
|
||||
|
||||
# 生成报告
|
||||
report = report_generator.generate(
|
||||
repo_name=repo_name,
|
||||
branch=branch,
|
||||
commit_id=commit_id,
|
||||
commit_message=commit_message,
|
||||
author=author,
|
||||
scan_results=scan_results
|
||||
)
|
||||
|
||||
# 发送飞书通知
|
||||
feishu_notifier.send_report(report)
|
||||
|
||||
logger.info(f'提交 {commit_id} 扫描完成')
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f'扫描提交 {commit_id} 失败: {str(e)}')
|
||||
# 继续处理其他提交
|
||||
continue
|
||||
|
||||
return jsonify({'status': 'ok', 'message': 'Scan completed'}), 200
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f'处理 Webhook 失败: {str(e)}', exc_info=True)
|
||||
return jsonify({'error': str(e)}), 500
|
||||
|
||||
|
||||
@app.route('/scan/manual', methods=['POST'])
|
||||
def manual_scan():
|
||||
"""手动触发扫描接口"""
|
||||
try:
|
||||
data = request.json
|
||||
repo_url = data.get('repo_url')
|
||||
branch = data.get('branch', 'main')
|
||||
commit_id = data.get('commit_id')
|
||||
|
||||
if not repo_url:
|
||||
return jsonify({'error': 'repo_url is required'}), 400
|
||||
|
||||
# 执行扫描
|
||||
scan_results = {}
|
||||
|
||||
if 'python' in config.get('scanner', {}).get('languages', []):
|
||||
scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch)
|
||||
|
||||
if any(lang in config.get('scanner', {}).get('languages', [])
|
||||
for lang in ['javascript', 'typescript']):
|
||||
scan_results['javascript'] = js_scanner.scan(repo_url, commit_id, branch)
|
||||
|
||||
scan_results['security'] = security_scanner.scan(repo_url, commit_id, branch)
|
||||
|
||||
# 生成报告
|
||||
report = report_generator.generate(
|
||||
repo_name=repo_url.split('/')[-1].replace('.git', ''),
|
||||
branch=branch,
|
||||
commit_id=commit_id or 'manual',
|
||||
commit_message='Manual scan',
|
||||
author='manual',
|
||||
scan_results=scan_results
|
||||
)
|
||||
|
||||
# 发送飞书通知
|
||||
feishu_notifier.send_report(report)
|
||||
|
||||
return jsonify({
|
||||
'status': 'ok',
|
||||
'report': report
|
||||
}), 200
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f'手动扫描失败: {str(e)}', exc_info=True)
|
||||
return jsonify({'error': str(e)}), 500
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 强制监听所有网络接口
|
||||
host = "0.0.0.0"
|
||||
port = config.get('server', {}).get('port', 5000)
|
||||
debug = config.get('server', {}).get('debug', True)
|
||||
|
||||
logger.info(f'启动服务: {host}:{port}')
|
||||
app.run(host=host, port=port, debug=debug)
|
||||
Reference in New Issue
Block a user