#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Python 代码扫描器 使用 Pylint、Flake8、MyPy 进行代码质量检查 """ import os import json import logging from typing import Dict, Any, List, Optional from scanner.base import BaseScanner logger = logging.getLogger(__name__) class PythonScanner(BaseScanner): """Python 代码扫描器""" def __init__(self, config: Dict[str, Any]): super().__init__(config) self.extensions = ['.py'] def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]: """ 执行 Python 代码扫描 Args: repo_url: 仓库 URL commit_id: 提交 ID branch: 分支名 Returns: 扫描结果 """ result = { 'tool': 'Python Scanner', 'language': 'python', 'status': 'success', 'issues': [], 'summary': { 'total': 0, 'error': 0, 'warning': 0, 'info': 0 }, 'files_scanned': 0 } clone_dir = None try: # 克隆仓库 clone_dir = self.clone_repo(repo_url, commit_id, branch) # 获取 Python 文件 py_files = self.get_changed_files(clone_dir, self.extensions) result['files_scanned'] = len(py_files) if not py_files: logger.info('没有找到 Python 文件') return result # 运行各种扫描工具 pylint_result = self._run_pylint(clone_dir, py_files) flake8_result = self._run_flake8(clone_dir, py_files) # 合并结果 all_issues = [] all_issues.extend(pylint_result.get('issues', [])) all_issues.extend(flake8_result.get('issues', [])) result['issues'] = all_issues[:self.max_issues] if self.detailed else all_issues result['summary'] = self._calculate_summary(all_issues) result['raw_output'] = { 'pylint': pylint_result.get('raw_output', ''), 'flake8': flake8_result.get('raw_output', '') } except Exception as e: logger.error(f'Python 扫描失败: {str(e)}') result['status'] = 'error' result['error'] = str(e) return result def _run_pylint(self, cwd: str, files: List[str]) -> Dict[str, Any]: """运行 Pylint 扫描""" result = { 'tool': 'pylint', 'issues': [], 'raw_output': '' } # 只扫描变更的文件 try: cmd = ['python', '-m', 'pylint', '--output-format=json'] + files output = self.run_command(cmd, cwd, timeout=120) result['raw_output'] = output.get('stdout', '') # 解析 JSON 输出 if output.get('stdout'): try: issues = json.loads(output['stdout']) for issue in issues: result['issues'].append({ 'tool': 'pylint', 'type': issue.get('type', 'info'), 'severity': issue.get('severity', 'Info'), 'message': issue.get('message', ''), 'file': os.path.basename(issue.get('path', '')), 'line': issue.get('line', 0), 'column': issue.get('column', 0), 'symbol': issue.get('symbol', '') }) except json.JSONDecodeError: logger.warning('Pylint JSON 解析失败') except Exception as e: logger.warning(f'Pylint 运行失败: {str(e)}') return result def _run_flake8(self, cwd: str, files: List[str]) -> Dict[str, Any]: """运行 Flake8 扫描""" result = { 'tool': 'flake8', 'issues': [], 'raw_output': '' } try: cmd = ['python', '-m', 'flake8', '--format=json'] + files output = self.run_command(cmd, cwd, timeout=120) result['raw_output'] = output.get('stdout', '') # 解析 JSON 输出 if output.get('stdout'): try: issues = json.loads(output['stdout']) for issue in issues: result['issues'].append({ 'tool': 'flake8', 'type': self._map_flake8_code(issue.get('code', '')), 'severity': 'Warning', 'message': issue.get('text', ''), 'file': os.path.basename(issue.get('filename', '')), 'line': issue.get('line_number', 0), 'column': issue.get('column_number', 0), 'symbol': issue.get('code', '') }) except json.JSONDecodeError: logger.warning('Flake8 JSON 解析失败') except Exception as e: logger.warning(f'Flake8 运行失败: {str(e)}') return result def _map_flake8_code(self, code: str) -> str: """映射 Flake8 错误代码到类型""" # E/W - Flake8 错误/警告 # F - Pyflakes # C - mccabe 复杂度 if code.startswith('E') or code.startswith('W'): return 'error' if code.startswith('E') else 'warning' elif code.startswith('F'): return 'error' elif code.startswith('C'): return 'warning' return 'info' def _calculate_summary(self, issues: List[Dict]) -> Dict[str, int]: """计算问题摘要""" summary = { 'total': len(issues), 'error': 0, 'warning': 0, 'info': 0 } for issue in issues: severity = issue.get('severity', '').lower() if severity in ['error', 'critical', 'fatal', 'error']: summary['error'] += 1 elif severity in ['warning', 'moderate']: summary['warning'] += 1 else: summary['info'] += 1 return summary