197 lines
6.4 KiB
Python
197 lines
6.4 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
# -*- coding: utf-8 -*-
|
||
|
|
"""
|
||
|
|
Python 代码扫描器
|
||
|
|
使用 Pylint、Flake8、MyPy 进行代码质量检查
|
||
|
|
"""
|
||
|
|
import os
|
||
|
|
import json
|
||
|
|
import logging
|
||
|
|
from typing import Dict, Any, List, Optional
|
||
|
|
from scanner.base import BaseScanner
|
||
|
|
|
||
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
class PythonScanner(BaseScanner):
    """Scanner that runs Pylint and Flake8 over a repository's Python files.

    ``scan`` clones the repository, asks ``get_changed_files`` for the files
    matching ``self.extensions``, runs both linters on them and merges the
    findings into one result dictionary.
    """

    # Pylint's JSON reporter carries the message category in ``type`` and has
    # no ``severity`` key, so the severity label is derived from the category.
    _PYLINT_SEVERITY = {
        'fatal': 'Fatal',
        'error': 'Error',
        'warning': 'Warning',
        'refactor': 'Info',
        'convention': 'Info',
        'info': 'Info',
    }

    # Lower-cased severity labels counted in each summary bucket; any other
    # label is counted as ``info``.
    _ERROR_SEVERITIES = frozenset({'error', 'critical', 'fatal'})
    _WARNING_SEVERITIES = frozenset({'warning', 'moderate'})

    def __init__(self, config: Dict[str, Any]):
        """Initialise the scanner.

        Args:
            config: Scanner configuration, forwarded unchanged to
                ``BaseScanner.__init__``.
        """
        super().__init__(config)
        # File extensions handed to ``get_changed_files`` when selecting files.
        self.extensions = ['.py']

    def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]:
        """Clone a repository and lint its Python files.

        Args:
            repo_url: Repository URL, passed to ``clone_repo``.
            commit_id: Commit ID, passed to ``clone_repo``; may be ``None``.
            branch: Branch name, passed to ``clone_repo``.

        Returns:
            A dictionary with the keys ``tool``, ``language``, ``status``,
            ``issues``, ``summary`` and ``files_scanned``. ``raw_output`` is
            added once both linters have run. If anything raises, ``status``
            becomes ``'error'`` and the message is stored under ``error``.
        """
        result: Dict[str, Any] = {
            'tool': 'Python Scanner',
            'language': 'python',
            'status': 'success',
            'issues': [],
            'summary': {
                'total': 0,
                'error': 0,
                'warning': 0,
                'info': 0,
            },
            'files_scanned': 0,
        }

        clone_dir = None
        try:
            clone_dir = self.clone_repo(repo_url, commit_id, branch)

            py_files = self.get_changed_files(clone_dir, self.extensions)
            result['files_scanned'] = len(py_files)

            if not py_files:
                logger.info('没有找到 Python 文件')
                # The ``finally`` clause below still cleans up the clone.
                return result

            pylint_result = self._run_pylint(clone_dir, py_files)
            flake8_result = self._run_flake8(clone_dir, py_files)

            all_issues = [
                *pylint_result.get('issues', []),
                *flake8_result.get('issues', []),
            ]

            # NOTE(review): the list is capped at ``max_issues`` only when
            # ``detailed`` is truthy. Both attributes are presumably set by
            # BaseScanner; confirm this is the intended direction.
            result['issues'] = all_issues[:self.max_issues] if self.detailed else all_issues
            # The summary always covers every finding, not just the capped list.
            result['summary'] = self._calculate_summary(all_issues)
            result['raw_output'] = {
                'pylint': pylint_result.get('raw_output', ''),
                'flake8': flake8_result.get('raw_output', ''),
            }

        except Exception as e:
            # Top-level boundary: report the failure in the result instead of
            # propagating it to the caller.
            logger.error('Python 扫描失败: %s', e)
            result['status'] = 'error'
            result['error'] = str(e)

        finally:
            # Remove the temporary clone if one was created.
            if clone_dir:
                self.cleanup(clone_dir)

        return result

    def _run_pylint(self, cwd: str, files: List[str]) -> Dict[str, Any]:
        """Run Pylint on ``files`` and collect its findings.

        Args:
            cwd: Working directory passed to ``run_command``.
            files: File paths appended to the Pylint command line.

        Returns:
            A dictionary with ``tool``, ``issues`` (normalised findings) and
            ``raw_output`` (Pylint's stdout). Failures are logged as warnings
            and leave ``issues`` empty rather than raising.
        """
        result: Dict[str, Any] = {
            'tool': 'pylint',
            'issues': [],
            'raw_output': '',
        }

        try:
            cmd = ['python', '-m', 'pylint', '--output-format=json'] + files
            output = self.run_command(cmd, cwd, timeout=120)

            stdout = output.get('stdout', '')
            result['raw_output'] = stdout
            result['issues'] = self._parse_pylint_output(stdout)

        except Exception as e:
            # Best effort: a failing linter must not abort the whole scan.
            logger.warning('Pylint 运行失败: %s', e)

        return result

    def _parse_pylint_output(self, stdout: Optional[str]) -> List[Dict[str, Any]]:
        """Convert Pylint's JSON stdout into normalised issue dictionaries."""
        if not stdout:
            return []

        try:
            parsed = json.loads(stdout)
        except json.JSONDecodeError:
            logger.warning('Pylint JSON 解析失败')
            return []

        if not isinstance(parsed, list):
            return []

        issues = []
        for entry in parsed:
            if not isinstance(entry, dict):
                continue
            category = entry.get('type') or 'info'
            # Honour an explicit ``severity`` if one is ever present; otherwise
            # derive it from the category so errors and warnings are counted.
            severity = entry.get('severity') or self._PYLINT_SEVERITY.get(
                str(category).lower(), 'Info'
            )
            issues.append({
                'tool': 'pylint',
                'type': category,
                'severity': severity,
                'message': entry.get('message', ''),
                # NOTE(review): only the base name is kept, so files with the
                # same name in different directories are indistinguishable.
                'file': os.path.basename(entry.get('path') or ''),
                'line': entry.get('line', 0),
                'column': entry.get('column', 0),
                'symbol': entry.get('symbol', ''),
            })
        return issues

    def _run_flake8(self, cwd: str, files: List[str]) -> Dict[str, Any]:
        """Run Flake8 on ``files`` and collect its findings.

        Args:
            cwd: Working directory passed to ``run_command``.
            files: File paths appended to the Flake8 command line.

        Returns:
            A dictionary with ``tool``, ``issues`` (normalised findings) and
            ``raw_output`` (Flake8's stdout). Failures are logged as warnings
            and leave ``issues`` empty rather than raising.
        """
        result: Dict[str, Any] = {
            'tool': 'flake8',
            'issues': [],
            'raw_output': '',
        }

        try:
            # NOTE: ``--format=json`` is not a flake8 core formatter; it is
            # expected to come from the flake8-json plugin.
            cmd = ['python', '-m', 'flake8', '--format=json'] + files
            output = self.run_command(cmd, cwd, timeout=120)

            stdout = output.get('stdout', '')
            result['raw_output'] = stdout
            result['issues'] = self._parse_flake8_output(stdout)

        except Exception as e:
            # Best effort: a failing linter must not abort the whole scan.
            logger.warning('Flake8 运行失败: %s', e)

        return result

    def _parse_flake8_output(self, stdout: Optional[str]) -> List[Dict[str, Any]]:
        """Convert Flake8's JSON stdout into normalised issue dictionaries."""
        if not stdout:
            return []

        try:
            parsed = json.loads(stdout)
        except json.JSONDecodeError:
            logger.warning('Flake8 JSON 解析失败')
            return []

        # The flake8-json formatter groups violations per file in a JSON
        # object ({"<file>": [violation, ...]}). Iterating that object directly
        # would yield file-name strings, so flatten it first. A flat list of
        # violations is accepted as well.
        entries: List[Any] = []
        if isinstance(parsed, dict):
            for group in parsed.values():
                if isinstance(group, list):
                    entries.extend(group)
        elif isinstance(parsed, list):
            entries = parsed

        issues = []
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            code = entry.get('code') or ''
            issues.append({
                'tool': 'flake8',
                'type': self._map_flake8_code(code),
                # Flake8 findings are always reported with warning severity,
                # independent of the mapped ``type``.
                'severity': 'Warning',
                'message': entry.get('text', ''),
                'file': os.path.basename(entry.get('filename') or ''),
                'line': entry.get('line_number', 0),
                'column': entry.get('column_number', 0),
                'symbol': code,
            })
        return issues

    def _map_flake8_code(self, code: str) -> str:
        """Map a Flake8 code to an issue type by its leading letter.

        Args:
            code: Flake8 code such as ``'E501'`` or ``'F401'``.

        Returns:
            ``'error'`` for codes starting with ``E`` or ``F``, ``'warning'``
            for codes starting with ``W`` or ``C``, otherwise ``'info'``.
        """
        # E/W - Flake8 errors/warnings, F - Pyflakes, C - mccabe complexity.
        if code.startswith(('E', 'F')):
            return 'error'
        if code.startswith(('W', 'C')):
            return 'warning'
        return 'info'

    def _calculate_summary(self, issues: List[Dict]) -> Dict[str, int]:
        """Count issues per severity bucket.

        Args:
            issues: Issue dictionaries; the ``severity`` value of each is
                compared case-insensitively.

        Returns:
            A dictionary with ``total`` (number of issues) and the ``error``,
            ``warning`` and ``info`` counts. Unknown or missing severities
            are counted as ``info``.
        """
        summary = {
            'total': len(issues),
            'error': 0,
            'warning': 0,
            'info': 0,
        }

        for issue in issues:
            # ``or ''`` tolerates an explicit ``None`` severity.
            severity = str(issue.get('severity') or '').lower()
            if severity in self._ERROR_SEVERITIES:
                summary['error'] += 1
            elif severity in self._WARNING_SEVERITIES:
                summary['warning'] += 1
            else:
                summary['info'] += 1

        return summary
|