996 lines
37 KiB
Python
996 lines
37 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import os
|
||
import time
|
||
import logging
|
||
import traceback
|
||
from typing import Dict, Tuple, Any
|
||
import json
|
||
|
||
|
||
os.environ.setdefault('FLASK_RUN_HOST', '0.0.0.0')
|
||
|
||
from flask import Flask, request, jsonify, send_from_directory
|
||
import yaml
|
||
from webhook.handler import GiteaWebhookHandler
|
||
from scanner.python_scanner import PythonScanner
|
||
from scanner.js_scanner import JavaScriptScanner
|
||
from scanner.security_scanner import SecurityScanner
|
||
from scanner.ai_reviewer import AIReviewer
|
||
from scanner.diff_parser import merge_issues_with_code
|
||
from report.generator import ReportGenerator
|
||
from notify.feishu import FeishuNotifier
|
||
from gitea_client import GiteaClient
|
||
from db import PRScanDB
|
||
|
||
# 配置日志
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 加载配置
|
||
def load_config():
|
||
"""加载配置文件"""
|
||
config_path = os.path.join(os.path.dirname(__file__), 'config.yaml')
|
||
with open(config_path, 'r', encoding='utf-8') as f:
|
||
return yaml.safe_load(f)
|
||
|
||
# 全局配置
|
||
config = load_config()
|
||
|
||
# 初始化应用
|
||
app = Flask(__name__)
|
||
app.config['SECRET_KEY'] = config.get('server', {}).get('secret_key', 'dev-secret-key')
|
||
|
||
# 初始化组件
|
||
webhook_handler = GiteaWebhookHandler(config['gitea'])
|
||
python_scanner = PythonScanner(config.get('scanner', {}))
|
||
js_scanner = JavaScriptScanner(config.get('scanner', {}))
|
||
security_scanner = SecurityScanner(config.get('scanner', {}))
|
||
ai_reviewer = AIReviewer(config.get('ai', {}))
|
||
report_generator = ReportGenerator(config.get('report', {}))
|
||
feishu_notifier = FeishuNotifier(config['feishu'])
|
||
gitea_client = GiteaClient(config['gitea'])
|
||
|
||
|
||
@app.route('/')
|
||
def index():
|
||
"""健康检查接口"""
|
||
return jsonify({
|
||
'status': 'ok',
|
||
'service': 'AI Code Quality Scanner',
|
||
'version': '1.0.0'
|
||
})
|
||
|
||
|
||
@app.route('/webhook/gitea', methods=['POST'])
|
||
def handle_gitea_webhook():
|
||
"""处理 Gitea Webhook 请求"""
|
||
try:
|
||
# 验证签名
|
||
signature = request.headers.get('X-Gitea-Signature')
|
||
if signature:
|
||
if not webhook_handler.verify_signature(
|
||
request.data,
|
||
signature,
|
||
config['gitea']['webhook_secret']
|
||
):
|
||
logger.warning('Webhook 签名验证失败')
|
||
return jsonify({'error': 'Invalid signature'}), 401
|
||
|
||
# 解析 Webhook payload
|
||
payload = request.json
|
||
if not payload:
|
||
return jsonify({'error': 'No payload'}), 400
|
||
|
||
event_type = request.headers.get('X-Gitea-Event', 'push')
|
||
logger.info(f'收到 Gitea Webhook 事件: {event_type}')
|
||
|
||
# 处理 Pull Request 事件
|
||
if event_type == 'pull_request':
|
||
return handle_pull_request(payload)
|
||
|
||
# 处理 Push 事件
|
||
if event_type != 'push':
|
||
return jsonify({'message': 'Event ignored'}), 200
|
||
|
||
# 提取提交信息
|
||
commits = payload.get('commits', [])
|
||
if not commits:
|
||
return jsonify({'message': 'No commits'}), 200
|
||
|
||
repo = payload.get('repository', {})
|
||
repo_name = repo.get('full_name', 'unknown')
|
||
branch = payload.get('ref', '').replace('refs/heads/', '')
|
||
pusher = payload.get('pusher', {}).get('name', 'unknown')
|
||
|
||
logger.info(f'处理仓库 {repo_name} 的 {len(commits)} 个提交')
|
||
|
||
# 处理每个提交
|
||
for commit in commits:
|
||
commit_id = commit.get('id', '')[:8]
|
||
commit_message = commit.get('message', '')
|
||
author = commit.get('author', {}).get('name', 'unknown')
|
||
|
||
logger.info(f'扫描提交 {commit_id}: {commit_message}')
|
||
|
||
try:
|
||
# 获取仓库 URL
|
||
clone_url = repo.get('clone_url')
|
||
if not clone_url:
|
||
# 尝试从 web_url 构建
|
||
web_url = repo.get('web_url', '')
|
||
if web_url:
|
||
clone_url = web_url.replace('http://', 'http://').replace('https://', 'https://')
|
||
clone_url = clone_url.rstrip('/') + '.git'
|
||
|
||
# 执行代码扫描
|
||
scan_results = {}
|
||
|
||
# Python 扫描
|
||
if 'python' in config.get('scanner', {}).get('languages', []):
|
||
start_time = time.time()
|
||
scan_results['python'] = python_scanner.scan(
|
||
clone_url, commit_id, branch
|
||
)
|
||
logger.info(f"[TIMER] Python 扫描耗时: {time.time() - start_time:.2f}秒")
|
||
|
||
# JavaScript/TypeScript 扫描
|
||
if any(lang in config.get('scanner', {}).get('languages', [])
|
||
for lang in ['javascript', 'typescript']):
|
||
start_time = time.time()
|
||
scan_results['javascript'] = js_scanner.scan(
|
||
clone_url, commit_id, branch
|
||
)
|
||
logger.info(f"[TIMER] JavaScript 扫描耗时: {time.time() - start_time:.2f}秒")
|
||
|
||
# 安全扫描
|
||
start_time = time.time()
|
||
scan_results['security'] = security_scanner.scan(
|
||
clone_url, commit_id, branch
|
||
)
|
||
logger.info(f"[TIMER] 安全扫描耗时: {time.time() - start_time:.2f}秒")
|
||
|
||
# 生成报告
|
||
report = report_generator.generate(
|
||
repo_name=repo_name,
|
||
branch=branch,
|
||
commit_id=commit_id,
|
||
commit_message=commit_message,
|
||
author=author,
|
||
scan_results=scan_results
|
||
)
|
||
|
||
# 发送飞书通知
|
||
feishu_notifier.send_report(report)
|
||
|
||
logger.info(f'提交 {commit_id} 扫描完成')
|
||
|
||
except Exception as e:
|
||
logger.error(f'扫描提交 {commit_id} 失败: {str(e)}')
|
||
# 继续处理其他提交
|
||
continue
|
||
|
||
return jsonify({'status': 'ok', 'message': 'Scan completed'}), 200
|
||
|
||
except Exception as e:
|
||
logger.error(f'处理 Webhook 失败: {str(e)}', exc_info=True)
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
|
||
def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]:
|
||
"""
|
||
处理 Pull Request 事件
|
||
"""
|
||
try:
|
||
# 解析 PR 事件
|
||
pr_info = webhook_handler.parse_pull_request_event(payload)
|
||
|
||
if not pr_info:
|
||
logger.info('PR 事件不需要处理(如关闭、合并等)')
|
||
return jsonify({'message': 'PR event ignored'}), 200
|
||
|
||
repo_name = pr_info['repo_name']
|
||
source_branch = pr_info['source_branch']
|
||
source_sha = pr_info['source_sha']
|
||
pr_number = pr_info['pr_number']
|
||
pr_title = pr_info['pr_title']
|
||
pr_url = pr_info['pr_url']
|
||
target_branch = pr_info['target_branch']
|
||
author = pr_info['author']
|
||
|
||
logger.info(f'处理 PR #{pr_number}: {pr_title} ({source_branch} -> {target_branch})')
|
||
logger.info(f'扫描 PR 分支: {source_branch}, commit: {source_sha}')
|
||
|
||
try:
|
||
# 获取仓库 URL
|
||
repo = payload.get('repository', {})
|
||
clone_url = repo.get('clone_url')
|
||
if not clone_url:
|
||
web_url = repo.get('web_url', '')
|
||
if web_url:
|
||
clone_url = web_url.rstrip('/') + '.git'
|
||
|
||
# 获取 PR 中变更的文件列表
|
||
changed_files = []
|
||
try:
|
||
if '/' in repo_name:
|
||
repo_owner, repo_name_only = repo_name.split('/', 1)
|
||
else:
|
||
repo_owner = 'Bosch_Demo'
|
||
repo_name_only = repo_name
|
||
|
||
pr_files = gitea_client.get_pull_request_files(repo_owner, repo_name_only, pr_number)
|
||
if pr_files:
|
||
changed_files = [f.get('filename', '') for f in pr_files if f.get('filename')]
|
||
logger.info(f"获取到 PR #{pr_number} 的变更文件: {changed_files}")
|
||
except Exception as e:
|
||
logger.warning(f"获取 PR 文件列表失败: {e}")
|
||
|
||
# 执行代码扫描
|
||
scan_results = {}
|
||
|
||
# Python 扫描
|
||
if 'python' in config.get('scanner', {}).get('languages', []):
|
||
start_time = time.time()
|
||
scan_results['python'] = python_scanner.scan(
|
||
clone_url, source_sha, source_branch, changed_files
|
||
)
|
||
logger.info(f"[TIMER] Python 扫描耗时: {time.time() - start_time:.2f}秒")
|
||
|
||
# JavaScript/TypeScript 扫描
|
||
if any(lang in config.get('scanner', {}).get('languages', [])
|
||
for lang in ['javascript', 'typescript']):
|
||
start_time = time.time()
|
||
scan_results['javascript'] = js_scanner.scan(
|
||
clone_url, source_sha, source_branch, changed_files
|
||
)
|
||
logger.info(f"[TIMER] JavaScript 扫描耗时: {time.time() - start_time:.2f}秒")
|
||
|
||
# 安全扫描
|
||
start_time = time.time()
|
||
scan_results['security'] = security_scanner.scan(
|
||
clone_url, source_sha, source_branch, changed_files
|
||
)
|
||
logger.info(f"[TIMER] 安全扫描耗时: {time.time() - start_time:.2f}秒")
|
||
|
||
# AI 代码审查
|
||
if config.get('ai', {}).get('enabled', False):
|
||
start_time = time.time()
|
||
scan_results['ai'] = ai_reviewer.scan(
|
||
clone_url, source_sha, source_branch, changed_files
|
||
)
|
||
logger.info(f"[TIMER] AI 扫描耗时: {time.time() - start_time:.2f}秒")
|
||
|
||
# 获取 PR 的代码差异,用于将问题与代码片段关联
|
||
pr_diff = None
|
||
try:
|
||
pr_diff = gitea_client.get_pull_request_diff(repo_owner, repo_name_only, pr_number)
|
||
logger.info(f"已获取 PR #{pr_number} 的 diff,长度: {len(pr_diff) if pr_diff else 0}")
|
||
except Exception as e:
|
||
logger.warning(f"获取 PR diff 失败: {e}")
|
||
|
||
# 将问题与代码片段关联
|
||
scan_details_with_code = merge_issues_with_code(scan_results, pr_diff or '')
|
||
logger.info(f"[DEBUG] scan_results keys: {list(scan_results.keys())}")
|
||
for k, v in scan_results.items():
|
||
if isinstance(v, dict):
|
||
issues_cnt = len(v.get('issues', []))
|
||
logger.info(f"[DEBUG] scan_results['{k}'] issues count: {issues_cnt}")
|
||
logger.info(f"[DEBUG] scan_details_with_code scanners: {[s.get('name') for s in scan_details_with_code.get('scanners', [])] if scan_details_with_code else 'None'}")
|
||
|
||
# 生成报告
|
||
commit_message = f'PR #{pr_number}: {pr_title}'
|
||
report = report_generator.generate(
|
||
repo_name=repo_name,
|
||
branch=source_branch,
|
||
commit_id=source_sha,
|
||
commit_message=commit_message,
|
||
author=author,
|
||
scan_results=scan_results,
|
||
pr_url=pr_url,
|
||
target_branch=target_branch,
|
||
pr_number=pr_number
|
||
)
|
||
|
||
# 发送飞书通知
|
||
feishu_notifier.send_report(report)
|
||
|
||
# 保存扫描结果到数据库
|
||
pr_info_for_db = {
|
||
'repo_name': repo_name,
|
||
'pr_number': pr_number,
|
||
'pr_title': pr_title,
|
||
'pr_url': pr_url,
|
||
'source_branch': source_branch,
|
||
'target_branch': target_branch,
|
||
'author': author
|
||
}
|
||
PRScanDB.save_pr_scan(pr_info_for_db, scan_results, report.get('file_path'), scan_details_with_code)
|
||
|
||
logger.info(f'PR #{pr_number} 扫描完成')
|
||
|
||
except Exception as e:
|
||
traceback.print_exc()
|
||
logger.error(f'扫描 PR #{pr_number} 失败: {str(e)}')
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
return jsonify({'status': 'ok', 'message': 'PR scan completed'}), 200
|
||
|
||
except Exception as e:
|
||
logger.error(f'处理 PR Webhook 失败: {str(e)}', exc_info=True)
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
|
||
@app.route('/scan/manual', methods=['POST'])
|
||
def manual_scan():
|
||
"""手动触发扫描接口"""
|
||
try:
|
||
data = request.json
|
||
repo_url = data.get('repo_url')
|
||
branch = data.get('branch', 'main')
|
||
commit_id = data.get('commit_id')
|
||
|
||
if not repo_url:
|
||
return jsonify({'error': 'repo_url is required'}), 400
|
||
|
||
# 执行扫描
|
||
scan_results = {}
|
||
|
||
if 'python' in config.get('scanner', {}).get('languages', []):
|
||
start_time = time.time()
|
||
scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch)
|
||
logger.info(f"[TIMER] Python 扫描耗时: {time.time() - start_time:.2f}秒")
|
||
|
||
if any(lang in config.get('scanner', {}).get('languages', [])
|
||
for lang in ['javascript', 'typescript']):
|
||
start_time = time.time()
|
||
scan_results['javascript'] = js_scanner.scan(repo_url, commit_id, branch)
|
||
logger.info(f"[TIMER] JavaScript 扫描耗时: {time.time() - start_time:.2f}秒")
|
||
|
||
start_time = time.time()
|
||
scan_results['security'] = security_scanner.scan(repo_url, commit_id, branch)
|
||
logger.info(f"[TIMER] 安全扫描耗时: {time.time() - start_time:.2f}秒")
|
||
|
||
# 生成报告
|
||
report = report_generator.generate(
|
||
repo_name=repo_url.split('/')[-1].replace('.git', ''),
|
||
branch=branch,
|
||
commit_id=commit_id or 'manual',
|
||
commit_message='Manual scan',
|
||
author='manual',
|
||
scan_results=scan_results
|
||
)
|
||
|
||
# 发送飞书通知
|
||
feishu_notifier.send_report(report)
|
||
|
||
return jsonify({
|
||
'status': 'ok',
|
||
'report': report
|
||
}), 200
|
||
|
||
except Exception as e:
|
||
logger.error(f'手动扫描失败: {str(e)}')
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
|
||
@app.route('/feishu/card_action', methods=['POST'])
|
||
def handle_feishu_card_action():
|
||
"""处理飞书卡片按钮点击事件"""
|
||
try:
|
||
payload = request.json
|
||
logger.info(f'收到飞书卡片回调: {payload}')
|
||
|
||
# 处理 URL 验证请求
|
||
challenge = payload.get('challenge')
|
||
if challenge:
|
||
logger.info('处理 URL 验证请求')
|
||
return jsonify({'challenge': challenge}), 200
|
||
|
||
# 解析回调数据
|
||
action_data = payload.get('action', {})
|
||
if not action_data:
|
||
action_data = payload.get('value', {})
|
||
|
||
action_type = action_data.get('action')
|
||
owner = action_data.get('owner')
|
||
repo = action_data.get('repo')
|
||
pr_number = action_data.get('pr_number')
|
||
pr_url = action_data.get('pr_url')
|
||
|
||
if not all([action_type, owner, repo, pr_number]):
|
||
logger.error('卡片回调数据不完整')
|
||
return jsonify({'error': 'Missing required parameters'}), 400
|
||
|
||
logger.info(f'执行操作: {action_type}, PR: {owner}/{repo}#{pr_number}')
|
||
|
||
# 执行对应操作
|
||
if action_type == 'merge':
|
||
success = gitea_client.merge_pull_request(
|
||
owner=owner,
|
||
repo=repo,
|
||
pr_number=int(pr_number),
|
||
merge_message=f'通过飞书机器人合并 PR #{pr_number}'
|
||
)
|
||
result_message = '✅ **已合并 PR**' if success else '❌ **合并失败**'
|
||
elif action_type == 'close':
|
||
success = gitea_client.close_pull_request(
|
||
owner=owner,
|
||
repo=repo,
|
||
pr_number=int(pr_number)
|
||
)
|
||
result_message = '✅ **已关闭 PR(取消合并)**' if success else '❌ **关闭失败**'
|
||
else:
|
||
result_message = f'⚠️ **未知操作: {action_type}**'
|
||
|
||
# 发送操作结果到飞书
|
||
result_text = f"{result_message}\n\n**PR:** {owner}/{repo}#{pr_number}\n**链接:** [查看PR]({pr_url})"
|
||
feishu_notifier.send_simple_message('PR 操作结果', result_text)
|
||
|
||
return jsonify({'status': 'ok', 'message': result_message}), 200
|
||
|
||
except Exception as e:
|
||
logger.error(f'处理飞书卡片回调失败: {str(e)}', exc_info=True)
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
|
||
@app.route('/feishu/webhook', methods=['POST'])
|
||
def handle_feishu_webhook():
|
||
"""处理飞书开放平台的验证回调"""
|
||
try:
|
||
payload = request.json
|
||
|
||
# 处理验证请求
|
||
challenge = payload.get('challenge')
|
||
if challenge:
|
||
return jsonify({'challenge': challenge}), 200
|
||
|
||
# 处理消息事件
|
||
event_type = payload.get('type')
|
||
if event_type == 'url_verification':
|
||
return jsonify({'challenge': payload.get('challenge')}), 200
|
||
|
||
logger.info(f'收到飞书事件: {event_type}')
|
||
return jsonify({'status': 'ok'}), 200
|
||
|
||
except Exception as e:
|
||
logger.error(f'处理飞书 Webhook 失败: {str(e)}')
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
|
||
# ============================================
|
||
# 扫描管理平台 API
|
||
# ============================================
|
||
|
||
@app.route('/api/prs')
|
||
def api_get_prs():
|
||
"""获取所有 PR 列表"""
|
||
try:
|
||
state = request.args.get('state')
|
||
prs = PRScanDB.get_all_prs(state=state)
|
||
|
||
# 转换 scan_result 字符串为对象
|
||
for pr in prs:
|
||
if pr.get('scan_result') and isinstance(pr['scan_result'], str):
|
||
try:
|
||
pr['scan_result'] = json.loads(pr['scan_result'])
|
||
except:
|
||
pass
|
||
if pr.get('ai_review') and isinstance(pr['ai_review'], str):
|
||
try:
|
||
pr['ai_review'] = json.loads(pr['ai_review'])
|
||
except:
|
||
pass
|
||
|
||
return jsonify(prs)
|
||
except Exception as e:
|
||
logger.error(f'获取 PR 列表失败: {str(e)}')
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
|
||
@app.route('/api/prs/<int:pr_id>')
|
||
def api_get_pr(pr_id):
|
||
"""获取单个 PR 详情"""
|
||
try:
|
||
pr = PRScanDB.get_pr_by_id(pr_id)
|
||
if not pr:
|
||
return jsonify({'error': 'PR not found'}), 404
|
||
|
||
# 转换 JSON 字段
|
||
if pr.get('scan_result') and isinstance(pr['scan_result'], str):
|
||
try:
|
||
pr['scan_result'] = json.loads(pr['scan_result'])
|
||
except:
|
||
pass
|
||
if pr.get('ai_review') and isinstance(pr['ai_review'], str):
|
||
try:
|
||
pr['ai_review'] = json.loads(pr['ai_review'])
|
||
except:
|
||
pass
|
||
|
||
# 返回带代码片段的扫描详情
|
||
if pr.get('scan_details_with_code') and isinstance(pr['scan_details_with_code'], str):
|
||
try:
|
||
pr['scan_details_with_code'] = json.loads(pr['scan_details_with_code'])
|
||
except:
|
||
pass
|
||
|
||
return jsonify(pr)
|
||
except Exception as e:
|
||
logger.error(f'获取 PR 详情失败: {str(e)}')
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
|
||
@app.route('/api/prs/<int:pr_id>/diff')
|
||
def api_get_pr_diff(pr_id):
|
||
"""获取 PR 的代码差异"""
|
||
try:
|
||
pr = PRScanDB.get_pr_by_id(pr_id)
|
||
if not pr:
|
||
return jsonify({'error': 'PR not found'}), 404
|
||
|
||
repo_name = pr.get('repo_name', '')
|
||
pr_number = pr.get('pr_number', 0)
|
||
|
||
if not repo_name or not pr_number:
|
||
return jsonify({'error': 'PR 信息不完整'}), 400
|
||
|
||
# 解析 owner 和 repo
|
||
if '/' in repo_name:
|
||
owner, repo = repo_name.split('/', 1)
|
||
else:
|
||
owner = 'Bosch_Demo' # 默认
|
||
repo = repo_name
|
||
|
||
logger.info(f"获取 PR #{pr_number} ({owner}/{repo}) 的 diff")
|
||
|
||
# 获取 diff
|
||
diff = gitea_client.get_pull_request_diff(owner, repo, pr_number)
|
||
if diff is None:
|
||
return jsonify({'error': '获取 diff 失败'}), 500
|
||
|
||
return jsonify({
|
||
'diff': diff,
|
||
'pr_number': pr_number,
|
||
'repo_name': repo_name
|
||
})
|
||
|
||
except Exception as e:
|
||
logger.error(f'获取 PR diff 失败: {str(e)}')
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
|
||
@app.route('/api/prs/<int:pr_id>/files')
|
||
def api_get_pr_files(pr_id):
|
||
"""获取 PR 变更文件列表(用于左侧树状展示)"""
|
||
try:
|
||
pr = PRScanDB.get_pr_by_id(pr_id)
|
||
if not pr:
|
||
return jsonify({'error': 'PR not found'}), 404
|
||
repo_name = pr.get('repo_name', '')
|
||
pr_number = pr.get('pr_number', 0)
|
||
if not repo_name or not pr_number:
|
||
return jsonify({'error': 'PR 信息不完整'}), 400
|
||
if '/' in repo_name:
|
||
owner, repo = repo_name.split('/', 1)
|
||
else:
|
||
owner, repo = 'Bosch_Demo', repo_name
|
||
files = gitea_client.get_pull_request_files(owner, repo, pr_number)
|
||
if files is None:
|
||
return jsonify({'error': '获取文件列表失败'}), 500
|
||
return jsonify({'files': files, 'repo_name': repo_name})
|
||
except Exception as e:
|
||
logger.error(f'获取 PR 文件列表失败: {str(e)}')
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
|
||
@app.route('/api/prs/<int:pr_id>/file')
|
||
def api_get_pr_file_content(pr_id):
|
||
"""获取 PR 中某文件在源分支上的完整内容"""
|
||
try:
|
||
path = request.args.get('path')
|
||
if not path:
|
||
return jsonify({'error': '缺少 path 参数'}), 400
|
||
pr = PRScanDB.get_pr_by_id(pr_id)
|
||
if not pr:
|
||
return jsonify({'error': 'PR not found'}), 404
|
||
repo_name = pr.get('repo_name', '')
|
||
pr_number = pr.get('pr_number', 0)
|
||
if not repo_name or not pr_number:
|
||
return jsonify({'error': 'PR 信息不完整'}), 400
|
||
if '/' in repo_name:
|
||
owner, repo = repo_name.split('/', 1)
|
||
else:
|
||
owner, repo = 'Bosch_Demo', repo_name
|
||
pr_info = gitea_client.get_pull_request(owner, repo, pr_number)
|
||
if not pr_info:
|
||
return jsonify({'error': '获取 PR 信息失败'}), 500
|
||
head_ref = pr_info.get('head', {}).get('ref') or pr_info.get('head_branch') or pr.get('source_branch')
|
||
if not head_ref:
|
||
return jsonify({'error': '无法确定源分支'}), 400
|
||
content = gitea_client.get_file_contents(owner, repo, path, head_ref)
|
||
if content is None:
|
||
return jsonify({'error': '文件不存在或无法读取'}), 404
|
||
|
||
# 获取该文件的扫描问题(PR 创建时已扫描并存入 scan_details_with_code)
|
||
scan_issues = []
|
||
path_norm = path.replace('\\', '/').strip()
|
||
logger.info(f"[DEBUG] 请求文件: path_norm={path_norm}")
|
||
scan_details = pr.get('scan_details_with_code')
|
||
if isinstance(scan_details, str):
|
||
try:
|
||
scan_details = json.loads(scan_details)
|
||
except Exception:
|
||
scan_details = None
|
||
if scan_details:
|
||
logger.info(f"[DEBUG] scan_details keys: {list(scan_details.keys()) if isinstance(scan_details, dict) else 'not dict'}")
|
||
if scan_details.get('scanners'):
|
||
logger.info(f"[DEBUG] scanners count: {len(scan_details['scanners'])}")
|
||
for scanner in scan_details['scanners']:
|
||
scanner_name = scanner.get('name', '')
|
||
issues_count = len(scanner.get('issues', []))
|
||
logger.info(f"[DEBUG] scanner={scanner_name}, issues_count={issues_count}")
|
||
# 打印前几个 issue 的 file 看看
|
||
for idx, issue in enumerate(scanner.get('issues', [])[:3]):
|
||
logger.info(f"[DEBUG] issue[{idx}] file={issue.get('file')}, line={issue.get('line')}")
|
||
if scan_details and scan_details.get('scanners'):
|
||
for scanner in scan_details['scanners']:
|
||
for issue in scanner.get('issues', []):
|
||
issue_file = (issue.get('file') or '').replace('\\', '/').strip()
|
||
if not issue_file:
|
||
continue
|
||
# 匹配:精确相等或一端包含另一端(兼容 basename 或完整路径)
|
||
if path_norm == issue_file or path_norm.endswith(issue_file) or issue_file.endswith(path_norm):
|
||
logger.info(f"[DEBUG] 匹配成功: issue_file={issue_file}, path_norm={path_norm}")
|
||
sev = (issue.get('severity') or 'info')
|
||
if isinstance(sev, str):
|
||
sev = sev.lower()
|
||
scanner_name = scanner.get('name', '')
|
||
scanner_display = {'python': 'Python', 'javascript': 'JavaScript', 'security': 'Security'}.get(scanner_name, scanner_name)
|
||
scan_issues.append({
|
||
'scanner': scanner_display,
|
||
'severity': sev,
|
||
'line': int(issue.get('line') or 0),
|
||
'message': (issue.get('message') or issue.get('description') or '').strip(),
|
||
'code_context': issue.get('code_context')
|
||
})
|
||
logger.info(f"[DEBUG] 最终 scan_issues count: {len(scan_issues)}")
|
||
|
||
# 获取 AI 审查结果
|
||
ai_issues = []
|
||
if scan_details and scan_details.get('ai'):
|
||
ai_data = scan_details['ai']
|
||
for issue in ai_data.get('issues', []):
|
||
issue_file = (issue.get('file') or '').replace('\\', '/').strip()
|
||
if not issue_file:
|
||
continue
|
||
# 匹配:精确相等或一端包含另一端
|
||
if path_norm == issue_file or path_norm.endswith(issue_file) or issue_file.endswith(path_norm):
|
||
ai_issues.append({
|
||
'scanner': 'AI',
|
||
'severity': issue.get('severity', 'info'),
|
||
'line': int(issue.get('line') or 1),
|
||
'message': issue.get('message', ''),
|
||
'category': 'ai',
|
||
'code_context': issue.get('code_context')
|
||
})
|
||
|
||
logger.info(f"[DEBUG] AI issues count: {len(ai_issues)}")
|
||
|
||
# 合并静态扫描问题和 AI 问题
|
||
all_issues = scan_issues + ai_issues
|
||
|
||
return jsonify({'path': path, 'content': content, 'scan_issues': all_issues})
|
||
except Exception as e:
|
||
logger.error(f'获取文件内容失败: {str(e)}')
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
|
||
@app.route('/api/prs/<int:pr_id>/quality')
|
||
def api_get_quality_score(pr_id):
|
||
"""获取 PR 的代码质量评分"""
|
||
try:
|
||
pr = PRScanDB.get_pr_by_id(pr_id)
|
||
if not pr:
|
||
return jsonify({'error': 'PR not found'}), 404
|
||
|
||
# 从 scan_result 中获取质量评分
|
||
scan_result = pr.get('scan_result')
|
||
if isinstance(scan_result, str):
|
||
try:
|
||
scan_result = json.loads(scan_result)
|
||
except:
|
||
scan_result = None
|
||
|
||
quality_score = None
|
||
if scan_result and scan_result.get('ai'):
|
||
quality_score = scan_result['ai'].get('quality_score')
|
||
|
||
if not quality_score:
|
||
return jsonify({'error': '暂无质量评分'}), 404
|
||
|
||
return jsonify(quality_score)
|
||
except Exception as e:
|
||
logger.error(f'获取质量评分失败: {str(e)}')
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
|
||
@app.route('/api/prs/<int:pr_id>/stats')
|
||
def api_get_issue_stats(pr_id):
|
||
"""获取 PR 的问题统计"""
|
||
try:
|
||
pr = PRScanDB.get_pr_by_id(pr_id)
|
||
if not pr:
|
||
return jsonify({'error': 'PR not found'}), 404
|
||
|
||
# 获取 scan_details_with_code
|
||
scan_details = pr.get('scan_details_with_code')
|
||
if isinstance(scan_details, str):
|
||
try:
|
||
scan_details = json.loads(scan_details)
|
||
except:
|
||
scan_details = None
|
||
|
||
if not scan_details:
|
||
return jsonify({'error': '暂无扫描详情'}), 404
|
||
|
||
# 统计各扫描器的问题
|
||
stats = {
|
||
'by_severity': {'error': 0, 'warning': 0, 'info': 0},
|
||
'by_scanner': {},
|
||
'total': 0
|
||
}
|
||
|
||
# 统计静态扫描器
|
||
for scanner in scan_details.get('scanners', []):
|
||
scanner_name = scanner.get('name', 'unknown')
|
||
scanner_issues = scanner.get('issues', [])
|
||
stats['by_scanner'][scanner_name] = len(scanner_issues)
|
||
|
||
for issue in scanner_issues:
|
||
sev = (issue.get('severity') or 'info').lower()
|
||
if sev in stats['by_severity']:
|
||
stats['by_severity'][sev] += 1
|
||
stats['total'] += 1
|
||
|
||
# 统计 AI 扫描器
|
||
ai_data = scan_details.get('ai', {})
|
||
if ai_data:
|
||
ai_issues = ai_data.get('issues', [])
|
||
stats['by_scanner']['AI'] = len(ai_issues)
|
||
for issue in ai_issues:
|
||
sev = (issue.get('severity') or 'info').lower()
|
||
if sev in stats['by_severity']:
|
||
stats['by_severity'][sev] += 1
|
||
stats['total'] += 1
|
||
|
||
return jsonify(stats)
|
||
except Exception as e:
|
||
logger.error(f'获取问题统计失败: {str(e)}')
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
|
||
@app.route('/api/prs/<int:pr_id>/fix', methods=['POST'])
|
||
def api_generate_fix(pr_id):
|
||
"""生成问题修复建议"""
|
||
try:
|
||
data = request.get_json()
|
||
if not data:
|
||
return jsonify({'error': '请求体为空'}), 400
|
||
|
||
file_path = data.get('file')
|
||
line = data.get('line', 1)
|
||
message = data.get('message', '')
|
||
code = data.get('code', '')
|
||
|
||
if not file_path or not message:
|
||
return jsonify({'error': '缺少必要参数'}), 400
|
||
|
||
# 调用 AI 生成修复建议
|
||
fix_result = ai_reviewer.generate_fix_suggestion(file_path, line, message, code)
|
||
|
||
if fix_result:
|
||
return jsonify(fix_result)
|
||
else:
|
||
return jsonify({'error': '生成修复建议失败'}), 500
|
||
|
||
except Exception as e:
|
||
logger.error(f'生成修复建议失败: {str(e)}')
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
|
||
@app.route('/api/prs/history')
|
||
def api_get_pr_history():
|
||
"""获取 PR 扫描历史趋势"""
|
||
try:
|
||
limit = request.args.get('limit', 20, type=int)
|
||
repo_name = request.args.get('repo_name', '')
|
||
|
||
# 获取 PR 列表
|
||
prs = PRScanDB.get_all_prs(status='completed')
|
||
if repo_name:
|
||
prs = [p for p in prs if p.get('repo_name') == repo_name]
|
||
|
||
# 只取最近的 N 个
|
||
prs = prs[:limit]
|
||
|
||
# 构建趋势数据
|
||
history = []
|
||
for pr in reversed(prs): # 从旧到新
|
||
issues_count = pr.get('issues_count', 0)
|
||
|
||
# 从 scan_result 中各扫描器汇总 error/warning 数量
|
||
scan_result = pr.get('scan_result')
|
||
if isinstance(scan_result, str):
|
||
try:
|
||
scan_result = json.loads(scan_result)
|
||
except:
|
||
scan_result = None
|
||
|
||
error_count = 0
|
||
warning_count = 0
|
||
if scan_result and isinstance(scan_result, dict):
|
||
# 遍历各扫描器,汇总 error 和 warning
|
||
for scanner_name, scanner_result in scan_result.items():
|
||
if isinstance(scanner_result, dict):
|
||
summary = scanner_result.get('summary', {})
|
||
if isinstance(summary, dict):
|
||
error_count += summary.get('error', 0)
|
||
warning_count += summary.get('warning', 0)
|
||
|
||
history.append({
|
||
'pr_id': pr.get('id'),
|
||
'pr_number': pr.get('pr_number'),
|
||
'repo_name': pr.get('repo_name'),
|
||
'title': pr.get('pr_title', ''),
|
||
'author': pr.get('author', ''),
|
||
'created_at': pr.get('created_at', ''),
|
||
'issues_count': issues_count,
|
||
'error_count': error_count,
|
||
'warning_count': warning_count,
|
||
'total_issues': error_count + warning_count,
|
||
'state': pr.get('state', '')
|
||
})
|
||
|
||
return jsonify(history)
|
||
except Exception as e:
|
||
logger.error(f'获取历史趋势失败: {str(e)}')
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
|
||
@app.route('/api/prs/<int:pr_id>/merge', methods=['POST'])
|
||
def api_merge_pr(pr_id):
|
||
"""合并 PR"""
|
||
try:
|
||
pr = PRScanDB.get_pr_by_id(pr_id)
|
||
if not pr:
|
||
return jsonify({'success': False, 'message': 'PR not found'}), 404
|
||
|
||
logger.info(f"合并 PR - 数据库记录: {pr}")
|
||
|
||
if pr['state'] != 'open':
|
||
return jsonify({'success': False, 'message': 'PR 状态不是 open'}), 400
|
||
|
||
# 解析仓库名
|
||
repo_name = pr['repo_name']
|
||
logger.info(f"仓库名称: {repo_name}")
|
||
|
||
if '/' in repo_name:
|
||
owner, repo = repo_name.split('/')
|
||
else:
|
||
owner = ''
|
||
repo = repo_name
|
||
|
||
logger.info(f"owner: {owner}, repo: {repo}, pr_number: {pr['pr_number']}")
|
||
|
||
# 先检查 PR 状态
|
||
pr_info = gitea_client.get_pull_request(owner, repo, pr['pr_number'])
|
||
if not pr_info:
|
||
return jsonify({'success': False, 'message': '无法获取 PR 信息,请检查仓库名称是否正确'}), 400
|
||
|
||
logger.info(f"PR 信息: state={pr_info.get('state')}, mergeable={pr_info.get('mergeable')}")
|
||
|
||
if pr_info.get('state') != 'open':
|
||
return jsonify({'success': False, 'message': f'PR 状态是 {pr_info.get("state")}, 不是 open'}), 400
|
||
|
||
# 调用 Gitea API 合并
|
||
success = gitea_client.merge_pull_request(
|
||
owner=owner,
|
||
repo=repo,
|
||
pr_number=pr['pr_number'],
|
||
merge_message=f'通过管理平台合并 PR #{pr["pr_number"]}'
|
||
)
|
||
|
||
if success:
|
||
# 更新数据库状态
|
||
PRScanDB.update_pr_state(pr_id, 'merged', merged_by='admin')
|
||
|
||
# 发送飞书通知
|
||
result_text = f"✅ **PR 已通过管理平台合并**\n\n**PR:** {repo_name}#{pr['pr_number']}\n**标题:** {pr['pr_title']}\n**合并人:** 管理员"
|
||
feishu_notifier.send_simple_message('PR 合并', result_text)
|
||
|
||
return jsonify({'success': True, 'message': 'PR 已合并'})
|
||
else:
|
||
return jsonify({'success': False, 'message': '合并失败'}), 500
|
||
|
||
except Exception as e:
|
||
logger.error(f'合并 PR 失败: {str(e)}')
|
||
return jsonify({'success': False, 'message': str(e)}), 500
|
||
|
||
|
||
@app.route('/api/prs/<int:pr_id>/close', methods=['POST'])
|
||
def api_close_pr(pr_id):
|
||
"""关闭 PR"""
|
||
try:
|
||
pr = PRScanDB.get_pr_by_id(pr_id)
|
||
if not pr:
|
||
return jsonify({'success': False, 'message': 'PR not found'}), 404
|
||
|
||
if pr['state'] != 'open':
|
||
return jsonify({'success': False, 'message': 'PR 状态不是 open'}), 400
|
||
|
||
# 解析仓库名
|
||
repo_name = pr['repo_name']
|
||
if '/' in repo_name:
|
||
owner, repo = repo_name.split('/')
|
||
else:
|
||
owner = ''
|
||
repo = repo_name
|
||
|
||
# 调用 Gitea API 关闭
|
||
success = gitea_client.close_pull_request(
|
||
owner=owner,
|
||
repo=repo,
|
||
pr_number=pr['pr_number']
|
||
)
|
||
|
||
if success:
|
||
# 更新数据库状态
|
||
PRScanDB.update_pr_state(pr_id, 'closed')
|
||
|
||
# 发送飞书通知
|
||
result_text = f"❌ **PR 已被管理平台拒绝**\n\n**PR:** {repo_name}#{pr['pr_number']}\n**标题:** {pr['pr_title']}"
|
||
feishu_notifier.send_simple_message('PR 拒绝', result_text)
|
||
|
||
return jsonify({'success': True, 'message': 'PR 已关闭'})
|
||
else:
|
||
return jsonify({'success': False, 'message': '关闭失败'}), 500
|
||
|
||
except Exception as e:
|
||
logger.error(f'关闭 PR 失败: {str(e)}')
|
||
return jsonify({'success': False, 'message': str(e)}), 500
|
||
|
||
|
||
# ============================================
|
||
# 扫描管理平台页面
|
||
# ============================================
|
||
|
||
# 获取 web 目录的绝对路径
|
||
WEB_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'web')
|
||
|
||
|
||
@app.route('/dashboard')
|
||
def dashboard():
|
||
"""扫描管理平台首页"""
|
||
return send_from_directory(WEB_DIR, 'index.html')
|
||
|
||
|
||
@app.route('/web/<path:filename>')
|
||
def serve_static(filename):
|
||
"""提供静态文件服务"""
|
||
return send_from_directory(WEB_DIR, filename)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
# 强制监听所有网络接口
|
||
host = "0.0.0.0"
|
||
port = config.get('server', {}).get('port', 5000)
|
||
debug = config.get('server', {}).get('debug', True)
|
||
|
||
logger.info(f'启动服务: {host}:{port}')
|
||
app.run(host=host, port=port, debug=debug)
|