Files
code_scan/scanner/python_scanner.py
Dang Zerong 027cf50759 add web
2026-03-12 14:42:23 +08:00

201 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Python 代码扫描器
使用 Pylint、Flake8、MyPy 进行代码质量检查
"""
import os
import json
import logging
from typing import Dict, Any, List, Optional
from scanner.base import BaseScanner
logger = logging.getLogger(__name__)
class PythonScanner(BaseScanner):
    """Scanner that runs Pylint and Flake8 over the Python files of a repository.

    Cloning, changed-file selection and command execution are delegated to
    helpers that this class does not define (``clone_repo``,
    ``get_changed_files``, ``run_command``); they are presumably provided by
    ``BaseScanner`` -- confirm against that class.
    """

    # Pylint's JSON reporter does not emit a ``severity`` field; it reports the
    # message category under ``type``. This table converts that category into
    # the severity labels that ``_calculate_summary`` buckets on.
    _PYLINT_SEVERITY = {
        'fatal': 'Error',
        'error': 'Error',
        'warning': 'Warning',
        'convention': 'Info',
        'refactor': 'Info',
        'info': 'Info',
    }

    # Lower-cased severity labels counted as errors / warnings in the summary.
    _ERROR_SEVERITIES = frozenset({'error', 'critical', 'fatal'})
    _WARNING_SEVERITIES = frozenset({'warning', 'moderate'})

    def __init__(self, config: Dict[str, Any]):
        """Initialise the base scanner and restrict scanning to ``.py`` files."""
        super().__init__(config)
        self.extensions = ['.py']

    def scan(self, repo_url: str, commit_id: Optional[str], branch: str,
             changed_files: Optional[List[str]] = None) -> Dict[str, Any]:
        """Clone the repository and run the Python linters on its files.

        Args:
            repo_url: Repository URL.
            commit_id: Commit ID to scan (may be ``None``).
            branch: Branch name.
            changed_files: Optional list of changed files (e.g. from a PR);
                forwarded to ``get_changed_files``.

        Returns:
            A result dict with the keys ``tool``, ``language``, ``status``,
            ``issues``, ``summary`` and ``files_scanned``. ``raw_output`` is
            added when the linters ran, and ``error`` (with ``status`` set to
            ``'error'``) when any step raised.
        """
        result = {
            'tool': 'Python Scanner',
            'language': 'python',
            'status': 'success',
            'issues': [],
            'summary': {
                'total': 0,
                'error': 0,
                'warning': 0,
                'info': 0,
            },
            'files_scanned': 0,
        }

        # NOTE(review): the clone directory is not removed by this method.
        # If neither clone_repo() nor the caller cleans it up, clones will
        # accumulate on disk -- confirm where cleanup is expected to happen.
        try:
            clone_dir = self.clone_repo(repo_url, commit_id, branch)

            # Select the Python files to scan (restricted by changed_files).
            py_files = self.get_changed_files(clone_dir, self.extensions, changed_files)
            result['files_scanned'] = len(py_files)
            if not py_files:
                logger.info('没有找到 Python 文件')
                return result

            pylint_result = self._run_pylint(clone_dir, py_files)
            flake8_result = self._run_flake8(clone_dir, py_files)

            # Merge findings from both tools, Pylint first.
            all_issues = []
            all_issues.extend(pylint_result.get('issues', []))
            all_issues.extend(flake8_result.get('issues', []))

            # The issue list is capped only in detailed mode; the summary is
            # always computed over the full, uncapped list.
            result['issues'] = all_issues[:self.max_issues] if self.detailed else all_issues
            result['summary'] = self._calculate_summary(all_issues)
            result['raw_output'] = {
                'pylint': pylint_result.get('raw_output', ''),
                'flake8': flake8_result.get('raw_output', ''),
            }
        except Exception as e:
            # Top-level boundary: report the failure in the result instead of
            # propagating it to the caller.
            logger.error('Python 扫描失败: %s', e)
            result['status'] = 'error'
            result['error'] = str(e)
        return result

    def _run_pylint(self, clone_dir: str, files: List[str]) -> Dict[str, Any]:
        """Run Pylint on ``files`` and normalise its JSON messages.

        Args:
            clone_dir: Directory passed to ``run_command`` and used as the
                base for the relative ``file`` path of each issue.
            files: Files handed to Pylint on the command line.

        Returns:
            Dict with ``tool``, ``issues`` (normalised issue dicts) and
            ``raw_output`` (Pylint's stdout). Failures are logged and yield
            whatever was collected so far; they are never raised.
        """
        result = {
            'tool': 'pylint',
            'issues': [],
            'raw_output': '',
        }
        try:
            cmd = ['python', '-m', 'pylint', '--output-format=json'] + files
            output = self.run_command(cmd, clone_dir, timeout=120)
            stdout = output.get('stdout', '')
            result['raw_output'] = stdout
            if not stdout:
                return result

            try:
                messages = json.loads(stdout)
            except json.JSONDecodeError:
                logger.warning('Pylint JSON 解析失败')
                return result
            if not isinstance(messages, list):
                # The ``json`` reporter emits a list of message objects;
                # anything else cannot be interpreted here.
                logger.warning('Pylint JSON 解析失败')
                return result

            for issue in messages:
                if not isinstance(issue, dict):
                    continue
                # NOTE(review): relpath() resolves a relative ``path`` against
                # the process working directory, not clone_dir -- verify what
                # Pylint reports for the way run_command() invokes it.
                full_path = issue.get('path', '')
                rel_path = os.path.relpath(full_path, clone_dir) if full_path else ''
                category = issue.get('type', 'info')
                # Prefer an explicit severity if one is ever present; otherwise
                # derive it from the message category so that errors and
                # warnings are not all reported as 'Info'.
                severity = issue.get('severity') or self._PYLINT_SEVERITY.get(
                    str(category).lower(), 'Info')
                result['issues'].append({
                    'tool': 'pylint',
                    'type': category,
                    'severity': severity,
                    'message': issue.get('message', ''),
                    'file': rel_path,
                    'line': issue.get('line', 0),
                    'column': issue.get('column', 0),
                    'symbol': issue.get('symbol', ''),
                })
        except Exception as e:
            # Best effort: a failing linter must not abort the whole scan.
            logger.warning('Pylint 运行失败: %s', e)
        return result

    def _run_flake8(self, clone_dir: str, files: List[str]) -> Dict[str, Any]:
        """Run Flake8 on ``files`` and normalise its JSON report.

        ``--format=json`` is not a built-in Flake8 format; it presumably relies
        on a JSON formatter plugin being installed -- confirm in the
        deployment environment.

        Args:
            clone_dir: Directory passed to ``run_command`` and used as the
                base for the relative ``file`` path of each issue.
            files: Files handed to Flake8 on the command line.

        Returns:
            Dict with ``tool``, ``issues`` (normalised issue dicts) and
            ``raw_output`` (Flake8's stdout). Failures are logged and yield
            whatever was collected so far; they are never raised.
        """
        result = {
            'tool': 'flake8',
            'issues': [],
            'raw_output': '',
        }
        try:
            cmd = ['python', '-m', 'flake8', '--format=json'] + files
            output = self.run_command(cmd, clone_dir, timeout=120)
            stdout = output.get('stdout', '')
            result['raw_output'] = stdout
            if not stdout:
                return result

            try:
                report = json.loads(stdout)
            except json.JSONDecodeError:
                logger.warning('Flake8 JSON 解析失败')
                return result

            for issue in self._flatten_flake8_report(report):
                # NOTE(review): relpath() resolves a relative ``filename``
                # against the process working directory, not clone_dir.
                full_path = issue.get('filename', '')
                rel_path = os.path.relpath(full_path, clone_dir) if full_path else ''
                code = issue.get('code', '')
                result['issues'].append({
                    'tool': 'flake8',
                    'type': self._map_flake8_code(code),
                    'severity': 'Warning',
                    'message': issue.get('text', ''),
                    'file': rel_path,
                    'line': issue.get('line_number', 0),
                    'column': issue.get('column_number', 0),
                    'symbol': code,
                })
        except Exception as e:
            # Best effort: a failing linter must not abort the whole scan.
            logger.warning('Flake8 运行失败: %s', e)
        return result

    @staticmethod
    def _flatten_flake8_report(report: Any) -> List[Dict[str, Any]]:
        """Return the violation dicts contained in a parsed Flake8 JSON report.

        Two shapes are accepted: a flat list of violation dicts, and a dict
        mapping each filename to its list of violations. In the mapping form
        the key is used as ``filename`` for entries that do not carry their
        own. Entries that are not dicts are skipped.
        """
        entries: List[Dict[str, Any]] = []
        if isinstance(report, dict):
            for filename, file_entries in report.items():
                for entry in file_entries or []:
                    if isinstance(entry, dict):
                        # The entry's own 'filename' (if any) wins over the key.
                        entries.append({'filename': filename, **entry})
        elif isinstance(report, list):
            entries.extend(entry for entry in report if isinstance(entry, dict))
        return entries

    def _map_flake8_code(self, code: str) -> str:
        """Map a Flake8 code to an issue type by its first letter.

        ``E`` and ``F`` codes map to ``'error'``, ``W`` and ``C`` codes to
        ``'warning'``, and everything else (including an empty or missing
        code) to ``'info'``.
        """
        code = code or ''
        if code.startswith(('E', 'F')):
            return 'error'
        if code.startswith(('W', 'C')):
            return 'warning'
        return 'info'

    def _calculate_summary(self, issues: List[Dict]) -> Dict[str, int]:
        """Count issues per severity bucket.

        The ``severity`` of each issue is compared case-insensitively:
        error/critical/fatal count as ``error``, warning/moderate as
        ``warning``, and anything else (including a missing severity) as
        ``info``.

        Returns:
            Dict with the keys ``total``, ``error``, ``warning`` and ``info``.
        """
        summary = {
            'total': len(issues),
            'error': 0,
            'warning': 0,
            'info': 0,
        }
        for issue in issues:
            # ``or ''`` guards against an explicit None severity.
            severity = str(issue.get('severity') or '').lower()
            if severity in self._ERROR_SEVERITIES:
                summary['error'] += 1
            elif severity in self._WARNING_SEVERITIES:
                summary['warning'] += 1
            else:
                summary['info'] += 1
        return summary