Files
code_scan/report/generator.py
Dang Zerong cb90b66f09 代码测试
2026-03-13 11:26:01 +08:00

323 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Markdown report generator.

Generates code-quality scan reports.
"""
import os
import json
import logging
from datetime import datetime
from typing import Dict, Any, Optional
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ReportGenerator:
    """Build Markdown code-quality scan reports and optionally save them.

    Configuration keys read from ``config``:
        output_dir: Directory that receives saved reports
            (default ``'./reports'``).
        keep_files: When true, ``generate`` writes the report to disk
            (default ``True``).
        detailed: When true, per-issue lists are rendered for each scanner
            (default ``True``).
    """

    # Maximum number of issues listed per scanner / per file.
    _MAX_LISTED_ISSUES = 10
    # Commit messages longer than this are truncated in the info table.
    _MAX_COMMIT_MESSAGE_LENGTH = 50
    # Icon shown in front of an issue, keyed by upper-cased severity.
    _SEVERITY_EMOJI = {
        'HIGH': '🔴',
        'MEDIUM': '🟡',
        'LOW': '🔵',
        'ERROR': '🔴',
        'WARNING': '🟡',
        # NOTE(review): INFO maps to an empty marker in the original code;
        # confirm whether an icon was intended here.
        'INFO': '',
    }

    def __init__(self, config: Dict[str, Any]):
        """Store the configuration and make sure the output directory exists.

        Args:
            config: Report configuration (see the class docstring for keys).
        """
        self.config = config
        self.output_dir = config.get('output_dir', './reports')
        self.keep_files = config.get('keep_files', True)
        # Create the output directory up front so saving cannot fail on it.
        os.makedirs(self.output_dir, exist_ok=True)

    def generate(
        self,
        repo_name: str,
        branch: str,
        commit_id: str,
        commit_message: str,
        author: str,
        scan_results: Dict[str, Any],
        pr_url: Optional[str] = None,
        target_branch: Optional[str] = None,
        pr_number: Optional[int] = None
    ) -> Dict[str, Any]:
        """Generate the scan report and, if configured, save it to disk.

        Args:
            repo_name: Repository name.
            branch: Branch name (the source branch for a PR scan).
            commit_id: Commit ID.
            commit_message: Commit message.
            author: Commit author.
            scan_results: Mapping of scanner name to that scanner's result.
                The ``'ai'`` entry is excluded from the totals.
            pr_url: PR link (optional).
            target_branch: Target branch (optional, used for PR scans).
            pr_number: PR number (optional); stored in the report as-is.

        Returns:
            The report data: the inputs, the aggregated counters, ``status``
            / ``status_text``, the rendered ``markdown`` and ``report_file``
            (path of the saved Markdown file, or ``None``).
        """
        total_issues = 0
        total_errors = 0
        total_warnings = 0
        for scanner_name, result in scan_results.items():
            # The AI review is rendered separately and not counted here;
            # malformed results are skipped instead of crashing the report.
            if scanner_name == 'ai' or not isinstance(result, dict):
                continue
            summary = result.get('summary', {})
            if not isinstance(summary, dict):
                continue
            total_issues += summary.get('total') or 0
            total_errors += (summary.get('error') or 0) + (summary.get('high') or 0)
            total_warnings += (summary.get('warning') or 0) + (summary.get('medium') or 0)

        # Errors are checked first so an inconsistent summary (errors but no
        # 'total') can never be reported as a pass.
        if total_errors > 0:
            status = 'fail'
            status_text = f'❌ 发现 {total_errors} 个错误'
        elif total_warnings > 0:
            status = 'warning'
            status_text = f'⚠️ 发现 {total_warnings} 个警告'
        elif total_issues > 0:
            # Only info/low findings: report the issue count rather than
            # a misleading "0 warnings".
            status = 'warning'
            status_text = f'⚠️ 发现 {total_issues} 个问题'
        else:
            status = 'pass'
            status_text = '✅ 扫描通过'

        # One timestamp shared by the report field and the Markdown body.
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        report = {
            'repo_name': repo_name,
            'branch': branch,
            'commit_id': commit_id,
            'commit_message': commit_message,
            'author': author,
            'timestamp': timestamp,
            'status': status,
            'status_text': status_text,
            'total_issues': total_issues,
            'total_errors': total_errors,
            'total_warnings': total_warnings,
            'scan_results': scan_results,
            'pr_url': pr_url,
            'target_branch': target_branch,
            'pr_number': pr_number,
            'markdown': self._generate_markdown(
                repo_name, branch, commit_id, commit_message, author,
                scan_results, status, status_text, pr_url, target_branch,
                timestamp=timestamp
            )
        }

        report['report_file'] = self._save_report(report) if self.keep_files else None
        return report

    def _generate_markdown(
        self,
        repo_name: str,
        branch: str,
        commit_id: str,
        commit_message: str,
        author: str,
        scan_results: Dict[str, Any],
        status: str,
        status_text: str,
        pr_url: Optional[str] = None,
        target_branch: Optional[str] = None,
        timestamp: Optional[str] = None
    ) -> str:
        """Render the report as Markdown.

        Args:
            repo_name: Repository name.
            branch: Branch name (the source branch for a PR scan).
            commit_id: Commit ID.
            commit_message: Commit message; shown on one line, truncated.
            author: Commit author.
            scan_results: Mapping of scanner name to that scanner's result.
            status: Machine-readable status (not rendered).
            status_text: Human-readable status line.
            pr_url: PR link (optional); switches the title to the PR variant.
            target_branch: Target branch (optional, used for PR scans).
            timestamp: Scan time to display; defaults to the current time.

        Returns:
            The Markdown document as a single string.
        """
        if timestamp is None:
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        lines = []
        # Title depends on whether this is a PR scan.
        lines.append('# 📊 PR 代码质量扫描报告' if pr_url else '# 📊 代码质量扫描报告')
        lines.append('')
        lines.extend(self._render_basic_info(
            repo_name, branch, commit_id, commit_message, author,
            pr_url, target_branch, timestamp
        ))

        lines.append('## 📈 扫描状态')
        lines.append('')
        lines.append(f'**{status_text}**')
        lines.append('')

        lines.extend(self._render_scanner_sections(scan_results))
        if 'ai' in scan_results:
            lines.extend(self._render_ai_section(scan_results['ai']))

        lines.append('---')
        lines.append('')
        lines.append('*此报告由 AI Code Quality Scanner 自动生成*')
        return '\n'.join(lines)

    @staticmethod
    def _table_cell(value: Any, max_len: Optional[int] = None) -> str:
        """Return ``value`` as one-line text that is safe in a table cell."""
        # Newlines would end the table row, so collapse all whitespace runs.
        text = ' '.join(str('' if value is None else value).split())
        if max_len is not None and len(text) > max_len:
            text = f'{text[:max_len]}...'
        # A bare pipe would start a new column.
        return text.replace('|', '\\|')

    @classmethod
    def _severity_marker(cls, issue: Dict[str, Any], default: str) -> tuple:
        """Return ``(emoji, severity_text)`` for an issue entry."""
        severity = str(issue.get('severity') or default)
        return cls._SEVERITY_EMOJI.get(severity.upper(), ''), severity

    def _render_basic_info(
        self, repo_name, branch, commit_id, commit_message, author,
        pr_url, target_branch, timestamp
    ) -> list:
        """Render the "basic information" table."""
        lines = [
            '## 📋 基本信息',
            '',
            '| 项目 | 内容 |',
            '|------|------|',
            f'| 仓库 | `{repo_name}` |',
        ]
        # PR scans show source/target branches and the PR link.
        if pr_url and target_branch:
            lines.append(f'| 源分支 | `{branch}` |')
            lines.append(f'| 目标分支 | `{target_branch}` |')
            lines.append(f'| PR 链接 | [查看 PR]({pr_url}) |')
        else:
            lines.append(f'| 分支 | `{branch}` |')
        message = self._table_cell(commit_message, self._MAX_COMMIT_MESSAGE_LENGTH)
        lines.append(f'| 提交 | `{commit_id}` |')
        lines.append(f'| 提交者 | {self._table_cell(author)} |')
        lines.append(f'| 提交信息 | {message} |')
        lines.append(f'| 扫描时间 | {timestamp} |')
        lines.append('')
        return lines

    def _render_scanner_sections(self, scan_results: Dict[str, Any]) -> list:
        """Render one section per non-AI scanner."""
        lines = ['## 🔍 扫描详情', '']
        show_issues = self.config.get('detailed', True)
        for scanner_name, result in scan_results.items():
            # The AI review has its own section; skip malformed results.
            if scanner_name == 'ai' or not isinstance(result, dict):
                continue
            summary = result.get('summary', {})
            if not isinstance(summary, dict):
                summary = {}

            lines.append(f'### {result.get("tool", scanner_name)}')
            lines.append('')
            lines.append(f'- 扫描文件数: {result.get("files_scanned", 0)}')
            lines.append(f'- 总问题数: {summary.get("total", 0)}')
            # Scanners report either error/warning/info or high/medium/low.
            # Two-space indent nests the breakdown under the total.
            if 'error' in summary:
                lines.append(f'  - 错误: {summary.get("error", 0)}')
                lines.append(f'  - 警告: {summary.get("warning", 0)}')
                lines.append(f'  - 提示: {summary.get("info", 0)}')
            elif 'high' in summary:
                lines.append(f'  - 高危: {summary.get("high", 0)}')
                lines.append(f'  - 中危: {summary.get("medium", 0)}')
                lines.append(f'  - 低危: {summary.get("low", 0)}')

            issues = result.get('issues') or []
            if issues and show_issues:
                lines.append('')
                lines.append('**问题列表:**')
                lines.append('')
                for index, issue in enumerate(issues[:self._MAX_LISTED_ISSUES], 1):
                    emoji, severity = self._severity_marker(issue, 'Unknown')
                    file_path = issue.get('file', 'unknown')
                    line_num = issue.get('line', 0)
                    message = issue.get('message', 'No message')
                    lines.append(f'{index}. {emoji} **{severity}** - `{file_path}:{line_num}`')
                    # Three-space indent nests the message under "N. ".
                    lines.append(f'   - {message}')
            lines.append('')
        return lines

    def _render_ai_section(self, ai_result: Any) -> list:
        """Render the AI review section (summary plus issues grouped by file)."""
        lines = ['', '## 🤖 AI 代码审查', '']
        if not isinstance(ai_result, dict):
            return lines

        if 'summary' in ai_result:
            # The summary may be free text or a dict of counters.
            summary = ai_result.get('summary', '')
            if isinstance(summary, dict):
                lines.append(
                    f"发现 {summary.get('total', 0)} 个问题,"
                    f"错误 {summary.get('error', 0)},"
                    f"警告 {summary.get('warning', 0)},"
                    f"提示 {summary.get('info', 0)}"
                )
            else:
                lines.append(str(summary))
            lines.append('')

        # Group issues by file, keeping first-seen file order.
        issues_by_file = {}
        for issue in ai_result.get('issues') or []:
            issues_by_file.setdefault(issue.get('file', 'unknown'), []).append(issue)

        for file_name, file_issues in issues_by_file.items():
            lines.append(f'### 📄 {file_name}')
            lines.append('')
            for index, issue in enumerate(file_issues[:self._MAX_LISTED_ISSUES], 1):
                emoji, severity = self._severity_marker(issue, 'Info')
                line_num = issue.get('line', 0)
                symbol = issue.get('symbol', '')
                message = issue.get('message', 'No message')
                code_context = issue.get('code_context', '')
                defect_reason = issue.get('defect_reason', '')
                lines.append(f'{index}. {emoji} **{severity}** - 行 {line_num}')
                if symbol:
                    lines.append(f'   - 标识: `{symbol}`')
                lines.append(f'   - 问题: {message}')
                if code_context:
                    lines.append('   - 代码:')
                    # Indent the fence and its content so the code block
                    # stays inside the nested list item.
                    lines.append('     ```')
                    lines.extend(
                        f'     {code_line}'
                        for code_line in str(code_context).splitlines()
                    )
                    lines.append('     ```')
                if defect_reason:
                    lines.append(f'   - 原因: {defect_reason}')
                lines.append('')
        return lines

    def _save_report(self, report: Dict[str, Any]) -> Optional[str]:
        """Save the report as Markdown plus a JSON copy for programs.

        Saving is best-effort: failures are logged, never raised.

        Args:
            report: Report data; ``repo_name``, ``commit_id`` and
                ``markdown`` are read here.

        Returns:
            Path of the saved Markdown file, or ``None`` if it could not be
            written. A failed JSON export does not affect the return value.
        """
        try:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            repo_name = report['repo_name'].replace('/', '_')
            base_name = f'{repo_name}_{report["commit_id"]}_{timestamp}'
            filepath = os.path.join(self.output_dir, f'{base_name}.md')
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(report['markdown'])
            logger.info(f'报告已保存: {filepath}')
        except Exception as e:
            # Broad on purpose: a failed save must not abort the scan.
            logger.error(f'保存报告失败: {str(e)}')
            return None

        # Also save a JSON copy for programmatic consumers.
        json_filepath = os.path.join(self.output_dir, f'{base_name}.json')
        try:
            # Serialize first so a non-serializable report leaves no
            # half-written file behind.
            payload = json.dumps(report, ensure_ascii=False, indent=2)
            with open(json_filepath, 'w', encoding='utf-8') as f:
                f.write(payload)
            logger.info(f'JSON 报告已保存: {json_filepath}')
        except Exception as e:
            logger.error(f'保存 JSON 报告失败: {str(e)}')
        return filepath