#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Base class for code scanners.

Defines the scanner interface and the helpers shared by concrete scanners:
cloning a repository, removing the clone, running external commands and
collecting the files to scan.
"""

import logging
import os
import re
import shutil
import stat
import subprocess
import sys
import tempfile
import time
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional

from git import Repo
from git.exc import GitCommandError

logger = logging.getLogger(__name__)

# Characters that are not safe inside a single directory name on every
# platform (path separators, ':' from scp-style URLs, spaces, ...).
_UNSAFE_PATH_CHARS = re.compile(r'[^\w.-]')


class BaseScanner(ABC):
    """Abstract base class for code scanners.

    Subclasses implement :meth:`scan`; the remaining methods are shared
    helpers for working with a temporary clone of the repository.
    """

    # Directory names that are never descended into when walking a whole
    # repository (hidden directories are skipped separately).
    _SKIPPED_DIRS = frozenset({'node_modules', '__pycache__', 'venv', '.git'})

    def __init__(self, config: Dict[str, Any]):
        """
        Initialise the scanner from its configuration.

        Args:
            config: Scanner configuration. The keys read here are
                ``temp_clone_dir`` (default ``/tmp/code_scanner_clones``),
                ``max_issues`` (default ``10``) and ``detailed``
                (default ``True``).
        """
        self.config = config
        self.temp_dir = config.get('temp_clone_dir', '/tmp/code_scanner_clones')
        self.max_issues = config.get('max_issues', 10)
        self.detailed = config.get('detailed', True)

        # Make sure the directory that will hold the clones exists.
        os.makedirs(self.temp_dir, exist_ok=True)

    @abstractmethod
    def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]:
        """
        Run the code scan.

        Args:
            repo_url: Repository URL.
            commit_id: Commit ID, or None.
            branch: Branch name.

        Returns:
            The scan result.
        """

    def _clone_dir_for(self, repo_url: str, ref: Optional[str]) -> str:
        """Return the clone directory used for ``repo_url`` at ``ref``."""
        # Ignore a trailing slash so the last path segment is never empty.
        repo_name = repo_url.rstrip('/').split('/')[-1]
        # Strip only a trailing ".git"; names such as "my.github.io" keep
        # their inner ".git" substring.
        if repo_name.endswith('.git'):
            repo_name = repo_name[:-len('.git')]
        # Branch names may contain '/', and scp-style URLs contain ':';
        # collapse anything unsafe so the result is one directory name.
        dir_name = _UNSAFE_PATH_CHARS.sub('_', f"{repo_name}_{ref}")
        return os.path.join(self.temp_dir, dir_name)

    @staticmethod
    def _checkout_commit(repo: Repo, commit_id: str) -> None:
        """Check out ``commit_id``, fetching it first if the clone lacks it."""
        try:
            repo.git.checkout(commit_id)
        except GitCommandError:
            # The clone is shallow (depth=1), so it only contains the branch
            # tip. Fetch the requested commit on its own and try again.
            repo.git.fetch('origin', commit_id, depth=1)
            repo.git.checkout(commit_id)

    def clone_repo(self, repo_url: str, commit_id: Optional[str], branch: str) -> str:
        """
        Clone the repository into the temporary directory.

        Args:
            repo_url: Repository URL.
            commit_id: Commit ID (optional; ``branch`` is used when None).
            branch: Branch name.

        Returns:
            Path of the cloned directory.

        Raises:
            Exception: Any error raised while cloning or checking out is
                logged and re-raised unchanged.
        """
        clone_dir = self._clone_dir_for(repo_url, commit_id or branch)

        # Start from a clean directory if a previous clone is still there.
        if os.path.exists(clone_dir):
            self.cleanup(clone_dir)

        repo = None
        try:
            logger.info(f'克隆仓库: {repo_url}')

            # Shallow clone: only the latest commit of the branch.
            repo = Repo.clone_from(
                repo_url,
                clone_dir,
                depth=1,
                branch=branch
            )

            # Switch to the requested commit when one was given.
            if commit_id:
                self._checkout_commit(repo, commit_id)

            logger.info(f'仓库克隆成功: {clone_dir}')
            return clone_dir

        except Exception as e:
            logger.error(f'克隆仓库失败: {str(e)}')
            raise
        finally:
            # Close the Repo explicitly to release file handles
            # (matters on Windows in particular).
            if repo is not None:
                repo.close()

    def cleanup(self, clone_dir: str) -> None:
        """
        Remove a clone directory, retrying on failure.

        Read-only files are made writable and removed again. After the last
        failed attempt a warning is logged and the method returns without
        raising.

        Args:
            clone_dir: Path of the cloned directory.
        """
        def handle_remove_readonly(func, path, _exc):
            """Add write permission to ``path`` and retry the failed call."""
            os.chmod(path, stat.S_IWRITE)
            func(path)

        # ``onerror`` is deprecated since Python 3.12 in favour of ``onexc``;
        # the handler ignores its third argument, so it fits both hooks.
        if sys.version_info >= (3, 12):
            rmtree_options = {'onexc': handle_remove_readonly}
        else:
            rmtree_options = {'onerror': handle_remove_readonly}

        max_retries = 3
        retry_delay = 1  # seconds

        for attempt in range(max_retries):
            try:
                if os.path.exists(clone_dir):
                    shutil.rmtree(clone_dir, **rmtree_options)
                    logger.info(f'清理临时目录: {clone_dir}')
                return  # removed, or nothing to remove
            except Exception as e:
                if attempt < max_retries - 1:
                    logger.warning(f'清理临时目录失败,{retry_delay}秒后重试: {str(e)}')
                    time.sleep(retry_delay)
                    retry_delay *= 2  # exponential backoff
                else:
                    logger.warning(f'清理临时目录失败(已重试{max_retries}次): {str(e)}')

    def run_command(self, cmd: List[str], cwd: str, timeout: int = 300) -> Dict[str, Any]:
        """
        Run a command and return its outcome.

        Args:
            cmd: Command and arguments.
            cwd: Working directory.
            timeout: Timeout in seconds.

        Returns:
            A dict with ``success``, ``returncode``, ``stdout`` and
            ``stderr``. On timeout or any other error ``success`` is False,
            ``returncode`` is -1 and ``stderr`` carries the reason.
        """
        try:
            result = subprocess.run(
                cmd,
                cwd=cwd,
                capture_output=True,
                text=True,
                timeout=timeout
            )
        except subprocess.TimeoutExpired:
            return {
                'success': False,
                'returncode': -1,
                'stdout': '',
                'stderr': 'Command timeout'
            }
        except Exception as e:
            return {
                'success': False,
                'returncode': -1,
                'stdout': '',
                'stderr': str(e)
            }

        return {
            'success': result.returncode == 0,
            'returncode': result.returncode,
            'stdout': result.stdout,
            'stderr': result.stderr
        }

    def get_changed_files(self, clone_dir: str, extensions: List[str],
                          changed_files: Optional[List[str]] = None) -> List[str]:
        """
        Return the files with one of the given extensions.

        Args:
            clone_dir: Repository directory.
            extensions: File extensions to keep.
            changed_files: Optional list of changed files (from a PR). When
                given and non-empty, only these files are considered.

        Returns:
            List of file paths under ``clone_dir``.
        """
        # str.endswith accepts a tuple, so one call covers every extension.
        suffixes = tuple(extensions)

        # NOTE(review): an empty list is treated like None and falls through
        # to the whole-repository walk -- confirm that callers rely on this.
        if changed_files:
            candidates = (
                os.path.join(clone_dir, changed_file)
                for changed_file in changed_files
                if changed_file.endswith(suffixes)
            )
            # Files listed as changed may no longer exist in the checkout.
            return [path for path in candidates if os.path.exists(path)]

        # Otherwise walk the whole repository.
        files = []
        for root, dirs, filenames in os.walk(clone_dir):
            # Prune hidden and special directories in place so os.walk
            # does not descend into them.
            dirs[:] = [
                d for d in dirs
                if not d.startswith('.') and d not in self._SKIPPED_DIRS
            ]
            files.extend(
                os.path.join(root, filename)
                for filename in filenames
                if filename.endswith(suffixes)
            )

        return files