Files
code_scan/report/generator.py

323 lines
12 KiB
Python
Raw Normal View History

2026-03-09 09:24:08 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Markdown 报告生成器
生成代码质量扫描报告
"""
import os
import json
import logging
from datetime import datetime
from typing import Dict, Any, Optional
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ReportGenerator:
    """Generate code-quality scan reports as Markdown (plus a JSON copy).

    The generator aggregates the per-scanner results passed to
    :meth:`generate`, renders them as a Markdown document and, when
    ``keep_files`` is enabled, writes the report into ``output_dir``.
    """

    # Upper bound on the number of issues listed per scanner / per file.
    _MAX_LISTED_ISSUES = 10

    # Severity -> marker used for regular scanner issues.
    _SCANNER_SEVERITY_EMOJI = {
        'HIGH': '🔴',
        'MEDIUM': '🟡',
        'LOW': '🔵',
        'ERROR': '🔴',
        'WARNING': '🟡',
        'INFO': ''
    }

    # Severity -> marker used for AI review issues.
    _AI_SEVERITY_EMOJI = {
        'ERROR': '🔴',
        'WARNING': '🟡',
        'INFO': ''
    }

    def __init__(self, config: Dict[str, Any]):
        """Initialise the generator and make sure the output directory exists.

        Args:
            config: Report configuration. Recognised keys are ``output_dir``
                (default ``'./reports'``), ``keep_files`` (default ``True``)
                and ``detailed`` (default ``True``, read while rendering).
        """
        self.config = config
        self.output_dir = config.get('output_dir', './reports')
        self.keep_files = config.get('keep_files', True)
        os.makedirs(self.output_dir, exist_ok=True)

    def generate(
        self,
        repo_name: str,
        branch: str,
        commit_id: str,
        commit_message: str,
        author: str,
        scan_results: Dict[str, Any],
        pr_url: Optional[str] = None,
        target_branch: Optional[str] = None,
        pr_number: Optional[int] = None
    ) -> Dict[str, Any]:
        """Build the scan report for one commit or pull request.

        Args:
            repo_name: Repository name.
            branch: Branch name (the source branch for a PR scan).
            commit_id: Commit ID.
            commit_message: Commit message.
            author: Commit author.
            scan_results: Mapping of scanner name to that scanner's result.
            pr_url: Optional PR link.
            target_branch: Optional target branch, used for PR scans.
            pr_number: Optional PR number; stored in the report as-is.

        Returns:
            The report data: the input metadata, aggregated counters,
            ``status`` / ``status_text``, the rendered ``markdown`` and
            ``report_file`` (path of the saved Markdown file, or ``None``
            when files are not kept or saving failed).
        """
        total_issues = 0
        total_errors = 0
        total_warnings = 0
        for scanner_name, result in scan_results.items():
            # The AI review is rendered separately and its summary may be a
            # plain string, so it does not take part in the counters.
            if scanner_name == 'ai' or not isinstance(result, dict):
                continue
            summary = result.get('summary', {})
            if not isinstance(summary, dict):
                continue
            total_issues += summary.get('total', 0)
            total_errors += summary.get('error', 0) + summary.get('high', 0)
            total_warnings += summary.get('warning', 0) + summary.get('medium', 0)

        if total_issues == 0:
            status = 'pass'
            status_text = '✅ 扫描通过'
        elif total_errors > 0:
            status = 'fail'
            status_text = f'❌ 发现 {total_errors} 个错误'
        else:
            status = 'warning'
            status_text = f'⚠️ 发现 {total_warnings} 个警告'

        report = {
            'repo_name': repo_name,
            'branch': branch,
            'commit_id': commit_id,
            'commit_message': commit_message,
            'author': author,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'status': status,
            'status_text': status_text,
            'total_issues': total_issues,
            'total_errors': total_errors,
            'total_warnings': total_warnings,
            'scan_results': scan_results,
            'pr_url': pr_url,
            'target_branch': target_branch,
            'pr_number': pr_number,
            'markdown': self._generate_markdown(
                repo_name, branch, commit_id, commit_message, author,
                scan_results, status, status_text, pr_url, target_branch
            )
        }

        # Persist the report only when configured to keep files.
        report_file = None
        if self.keep_files:
            report_file = self._save_report(report)
        report['report_file'] = report_file
        return report

    def _generate_markdown(
        self,
        repo_name: str,
        branch: str,
        commit_id: str,
        commit_message: str,
        author: str,
        scan_results: Dict[str, Any],
        status: str,
        status_text: str,
        pr_url: Optional[str] = None,
        target_branch: Optional[str] = None
    ) -> str:
        """Render the report as a Markdown document.

        Args:
            repo_name: Repository name.
            branch: Branch name (shown as the source branch for a PR).
            commit_id: Commit ID.
            commit_message: Commit message; flattened to a single line and
                truncated to 50 characters for the info table.
            author: Commit author.
            scan_results: Mapping of scanner name to that scanner's result.
            status: Overall status key (accepted for interface
                compatibility; not rendered).
            status_text: Human-readable status line.
            pr_url: Optional PR link; switches the title to the PR variant.
            target_branch: Optional PR target branch.

        Returns:
            The Markdown text, lines joined with ``'\\n'``.
        """
        lines = []

        # Title differs for PR scans.
        if pr_url:
            lines.append('# 📊 PR 代码质量扫描报告')
        else:
            lines.append('# 📊 代码质量扫描报告')
        lines.append('')

        # Basic information table.
        lines.append('## 📋 基本信息')
        lines.append('')
        lines.append('| 项目 | 内容 |')
        lines.append('|------|------|')
        lines.append(f'| 仓库 | `{repo_name}` |')
        if pr_url and target_branch:
            lines.append(f'| 源分支 | `{branch}` |')
            lines.append(f'| 目标分支 | `{target_branch}` |')
            lines.append(f'| PR 链接 | [查看 PR]({pr_url}) |')
        else:
            lines.append(f'| 分支 | `{branch}` |')
        lines.append(f'| 提交 | `{commit_id}` |')
        lines.append(f'| 提交者 | {author} |')

        # A table row must stay on one line and must not contain a raw '|',
        # so flatten multi-line messages and escape pipes after truncating.
        flat_message = ' '.join((commit_message or '').splitlines())
        if len(flat_message) > 50:
            shown_message = f'{flat_message[:50]}...'
        else:
            shown_message = flat_message
        shown_message = shown_message.replace('|', '\\|')
        lines.append(f'| 提交信息 | {shown_message} |')
        lines.append(f'| 扫描时间 | {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} |')
        lines.append('')

        # Overall status.
        lines.append('## 📈 扫描状态')
        lines.append('')
        lines.append(f'**{status_text}**')
        lines.append('')

        # Per-scanner details.
        lines.append('## 🔍 扫描详情')
        lines.append('')
        for scanner_name, result in scan_results.items():
            # The AI review gets its own section below.
            if scanner_name == 'ai' or not isinstance(result, dict):
                continue
            tool_name = result.get('tool', scanner_name)
            summary = result.get('summary', {})
            # Same guard as in generate(): a non-dict summary has no counters.
            if not isinstance(summary, dict):
                summary = {}
            lines.append(f'### {tool_name}')
            lines.append('')
            lines.append(f'- 扫描文件数: {result.get("files_scanned", 0)}')
            lines.append(f'- 总问题数: {summary.get("total", 0)}')
            # Scanners report either error/warning/info or high/medium/low.
            if 'error' in summary:
                lines.append(f' - 错误: {summary.get("error", 0)}')
                lines.append(f' - 警告: {summary.get("warning", 0)}')
                lines.append(f' - 提示: {summary.get("info", 0)}')
            elif 'high' in summary:
                lines.append(f' - 高危: {summary.get("high", 0)}')
                lines.append(f' - 中危: {summary.get("medium", 0)}')
                lines.append(f' - 低危: {summary.get("low", 0)}')

            issues = result.get('issues', [])
            if issues and self.config.get('detailed', True):
                lines.append('')
                lines.append('**问题列表:**')
                lines.append('')
                for i, issue in enumerate(issues[:self._MAX_LISTED_ISSUES], 1):
                    severity = issue.get('severity', 'Unknown')
                    # str() keeps the lookup safe for non-string severities.
                    severity_emoji = self._SCANNER_SEVERITY_EMOJI.get(
                        str(severity).upper(), ''
                    )
                    file_path = issue.get('file', 'unknown')
                    line_num = issue.get('line', 0)
                    message = issue.get('message', 'No message')
                    lines.append(f'{i}. {severity_emoji} **{severity}** - `{file_path}:{line_num}`')
                    lines.append(f' - {message}')
            lines.append('')

        # AI review section (summary plus a flat list of issues).
        if 'ai' in scan_results:
            ai_result = scan_results['ai']
            if not isinstance(ai_result, dict):
                ai_result = {}
            lines.append('')
            lines.append('## 🤖 AI 代码审查')
            lines.append('')
            if 'summary' in ai_result:
                # The summary may be either a counters dict or free text.
                summary = ai_result.get('summary', '')
                if isinstance(summary, dict):
                    lines.append(
                        f"发现 {summary.get('total', 0)} 个问题,"
                        f"错误 {summary.get('error', 0)},"
                        f"警告 {summary.get('warning', 0)},"
                        f"提示 {summary.get('info', 0)}"
                    )
                else:
                    lines.append(str(summary))
                lines.append('')

            ai_issues = ai_result.get('issues', [])
            if ai_issues:
                # Group issues by file, keeping first-seen file order.
                issues_by_file = {}
                for issue in ai_issues:
                    file_name = issue.get('file', 'unknown')
                    issues_by_file.setdefault(file_name, []).append(issue)

                for file_name, file_issues in issues_by_file.items():
                    lines.append(f'### 📄 {file_name}')
                    lines.append('')
                    for i, issue in enumerate(file_issues[:self._MAX_LISTED_ISSUES], 1):
                        severity = issue.get('severity', 'Info')
                        severity_emoji = self._AI_SEVERITY_EMOJI.get(
                            str(severity).upper(), ''
                        )
                        line_num = issue.get('line', 0)
                        symbol = issue.get('symbol', '')
                        message = issue.get('message', 'No message')
                        code_context = issue.get('code_context', '')
                        defect_reason = issue.get('defect_reason', '')
                        lines.append(f'{i}. {severity_emoji} **{severity}** - 行 {line_num}')
                        if symbol:
                            lines.append(f' - 标识: `{symbol}`')
                        lines.append(f' - 问题: {message}')
                        if code_context:
                            lines.append(' - 代码:')
                            lines.append('```')
                            lines.append(code_context)
                            lines.append('```')
                        if defect_reason:
                            lines.append(f' - 原因: {defect_reason}')
                    lines.append('')

        # Footer.
        lines.append('---')
        lines.append('')
        lines.append('*此报告由 AI Code Quality Scanner 自动生成*')
        return '\n'.join(lines)

    def _save_report(self, report: Dict[str, Any]) -> Optional[str]:
        """Write the report to ``output_dir`` as Markdown and as JSON.

        Saving is best-effort: failures are logged, never raised.

        Args:
            report: Report data; ``repo_name``, ``commit_id`` and
                ``markdown`` are read from it.

        Returns:
            Path of the saved Markdown file, or ``None`` if the Markdown
            file could not be written. A failure of the JSON export alone
            does not affect the return value.
        """
        try:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            repo_name = report['repo_name'].replace('/', '_')
            filename = f'{repo_name}_{report["commit_id"]}_{timestamp}.md'
            filepath = os.path.join(self.output_dir, filename)
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(report['markdown'])
            logger.info(f'报告已保存: {filepath}')
        except Exception as e:
            logger.error(f'保存报告失败: {str(e)}')
            return None

        # Also save a JSON copy for programmatic consumers. Only the final
        # extension is swapped, so a '.md' inside the repo name is untouched.
        try:
            json_filepath = os.path.splitext(filepath)[0] + '.json'
            # Serialise first so a non-serialisable report cannot leave a
            # truncated JSON file behind.
            payload = json.dumps(report, ensure_ascii=False, indent=2)
            with open(json_filepath, 'w', encoding='utf-8') as f:
                f.write(payload)
            logger.info(f'JSON 报告已保存: {json_filepath}')
        except Exception as e:
            logger.error(f'保存报告失败: {str(e)}')
        return filepath