185 lines
6.4 KiB
Python
185 lines
6.4 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
代码扫描器基类
|
||
定义扫描器接口和通用功能
|
||
"""
|
||
import logging
import os
import shutil
import stat
import subprocess
import tempfile
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

from git import Repo
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
||
class BaseScanner(ABC):
    """Abstract base class for code scanners.

    Provides the plumbing shared by concrete scanners: cloning a repository
    into a scratch directory, removing that directory again, running external
    commands, and collecting the files to scan. Subclasses implement
    :meth:`scan`.
    """

    # Directory names pruned from full-repository walks. Hidden directories
    # (names starting with ".") are pruned separately.
    _SKIPPED_DIRS = frozenset({'node_modules', '__pycache__', 'venv', '.git'})

    def __init__(self, config: Dict[str, Any]):
        """Initialise the scanner from its configuration.

        Args:
            config: Scanner settings. Keys read here are ``temp_clone_dir``
                (parent directory for clones; defaults to
                ``code_scanner_clones`` under the system temp directory),
                ``max_issues`` (default 10) and ``detailed`` (default True).
        """
        self.config = config
        # Use the platform temp directory instead of a hard-coded "/tmp" so
        # the default also works on Windows; a missing or empty setting falls
        # back to this default.
        default_dir = os.path.join(tempfile.gettempdir(), 'code_scanner_clones')
        self.temp_dir = config.get('temp_clone_dir') or default_dir
        self.max_issues = config.get('max_issues', 10)
        self.detailed = config.get('detailed', True)

        # Make sure the parent directory for clones exists.
        os.makedirs(self.temp_dir, exist_ok=True)

    @abstractmethod
    def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]:
        """Run the code scan.

        Args:
            repo_url: Repository URL.
            commit_id: Commit ID, or None.
            branch: Branch name.

        Returns:
            The scan result.
        """

    @staticmethod
    def _repo_name_from_url(repo_url: str) -> str:
        """Return the last path segment of *repo_url* without a trailing ``.git``."""
        name = repo_url.rstrip('/').split('/')[-1]
        # Only strip the suffix: str.replace would also mangle names such as
        # "my.github.io".
        if name.endswith('.git'):
            name = name[:-len('.git')]
        return name or 'repo'

    @staticmethod
    def _safe_path_component(value: str) -> str:
        """Replace every character that is not alphanumeric, ``-``, ``_`` or ``.`` with ``_``."""
        return ''.join(ch if ch.isalnum() or ch in '-_.' else '_' for ch in value)

    def clone_repo(self, repo_url: str, commit_id: Optional[str], branch: str) -> str:
        """Clone a repository into a directory under ``self.temp_dir``.

        The directory name is derived from the repository name and the commit
        ID (or the branch when no commit ID is given). Characters that are
        unsafe in a single path component, such as ``/`` in ``feature/x``,
        are replaced with ``_``. An existing directory of that name is
        removed first.

        Args:
            repo_url: Repository URL.
            commit_id: Commit to check out; when None the branch tip is used.
            branch: Branch to clone.

        Returns:
            Path of the cloned working tree.

        Raises:
            Exception: Any error raised while cloning or checking out is
                logged and re-raised; the partial clone is removed first.
        """
        dir_name = self._safe_path_component(
            f"{self._repo_name_from_url(repo_url)}_{commit_id or branch}"
        )
        clone_dir = os.path.join(self.temp_dir, dir_name)

        # Remove a stale clone left by an earlier run (cleanup retries).
        if os.path.exists(clone_dir):
            self.cleanup(clone_dir)

        repo = None
        cloned = False
        try:
            logger.info('克隆仓库: %s', repo_url)
            # Shallow clone: only the tip commit of the branch is fetched.
            repo = Repo.clone_from(repo_url, clone_dir, depth=1, branch=branch)

            if commit_id:
                # The shallow clone only contains the branch tip. Any other
                # commit has to be fetched explicitly, otherwise the checkout
                # fails because the object is missing.
                if not repo.head.commit.hexsha.startswith(commit_id.lower()):
                    repo.git.fetch('origin', commit_id, depth=1)
                repo.git.checkout(commit_id)

            logger.info('仓库克隆成功: %s', clone_dir)
            cloned = True
            return clone_dir
        except Exception as e:
            logger.error('克隆仓库失败: %s', e)
            raise
        finally:
            # Close the Repo explicitly to release file handles (matters on
            # Windows, where open handles block directory removal).
            if repo is not None:
                repo.close()
            # The caller never learns the path on failure, so remove the
            # partial clone here instead of leaking it.
            if not cloned:
                self.cleanup(clone_dir)

    @staticmethod
    def _force_remove(func, path, exc_info):
        """``shutil.rmtree`` error hook: make *path* writable, then retry *func*."""
        os.chmod(path, stat.S_IWRITE)
        func(path)

    def cleanup(self, clone_dir: str):
        """Remove a clone directory, retrying with exponential back-off.

        Read-only entries (common in ``.git`` on Windows) are made writable
        and removed again. Failures are logged as warnings rather than
        raised; after the last attempt the method gives up.

        Args:
            clone_dir: Path of the directory to remove.
        """
        max_retries = 3
        delay = 1  # seconds; doubled after every failed attempt

        for attempt in range(1, max_retries + 1):
            try:
                if os.path.exists(clone_dir):
                    # NOTE: ``onerror`` is deprecated since Python 3.12 in
                    # favour of ``onexc`` but is still supported.
                    shutil.rmtree(clone_dir, onerror=BaseScanner._force_remove)
                    logger.info('清理临时目录: %s', clone_dir)
                return
            except Exception as e:
                if attempt == max_retries:
                    logger.warning('清理临时目录失败(已重试%d次): %s', max_retries, e)
                    return
                logger.warning('清理临时目录失败,%s秒后重试: %s', delay, e)
                time.sleep(delay)
                delay *= 2

    def run_command(self, cmd: List[str], cwd: str, timeout: int = 300) -> Dict[str, Any]:
        """Run an external command and capture its result.

        Args:
            cmd: Command and arguments as a list (no shell is involved).
            cwd: Working directory for the command.
            timeout: Timeout in seconds.

        Returns:
            A dict with ``success`` (True when the exit code is 0),
            ``returncode``, ``stdout`` and ``stderr``. On timeout or when the
            command cannot be started, ``success`` is False, ``returncode``
            is -1, ``stdout`` is empty and ``stderr`` holds the reason.
        """
        def failure(message: str) -> Dict[str, Any]:
            return {
                'success': False,
                'returncode': -1,
                'stdout': '',
                'stderr': message,
            }

        try:
            result = subprocess.run(
                cmd,
                cwd=cwd,
                capture_output=True,
                text=True,
                # Undecodable bytes in tool output must not turn a finished
                # run into a failure; substitute a replacement character.
                errors='replace',
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            return failure('Command timeout')
        except Exception as e:
            return failure(str(e))

        return {
            'success': result.returncode == 0,
            'returncode': result.returncode,
            'stdout': result.stdout,
            'stderr': result.stderr,
        }

    @staticmethod
    def _is_within(root: str, path: str) -> bool:
        """Return True when *path* resolves to a location inside *root*.

        *root* is expected to be an already resolved (real) path.
        """
        try:
            return os.path.commonpath([root, os.path.realpath(path)]) == root
        except ValueError:
            # Raised e.g. for paths on different drives (Windows).
            return False

    def get_changed_files(self, clone_dir: str, extensions: List[str], changed_files: Optional[List[str]] = None) -> List[str]:
        """Collect files with the given extensions.

        Args:
            clone_dir: Repository directory.
            extensions: File name suffixes to match (e.g. ``['.py']``).
            changed_files: Optional list of changed paths (from a PR),
                relative to ``clone_dir``. When non-empty, only these files
                are considered; entries that do not match an extension, are
                not existing regular files, or resolve outside ``clone_dir``
                are skipped. When None or empty, the whole repository is
                walked.

        Returns:
            List of matching file paths, each prefixed with ``clone_dir``.
        """
        suffixes = tuple(extensions)

        # A change list was supplied: restrict the result to those files.
        if changed_files:
            root = os.path.realpath(clone_dir)
            selected = []
            for rel_path in changed_files:
                if not rel_path.endswith(suffixes):
                    continue
                full_path = os.path.join(clone_dir, rel_path)
                # The list originates outside this process; reject absolute
                # paths and ".." segments that would leave the clone.
                if self._is_within(root, full_path) and os.path.isfile(full_path):
                    selected.append(full_path)
            return selected

        # Otherwise walk the whole repository.
        matches = []
        for current, dirs, filenames in os.walk(clone_dir):
            # Prune hidden and dependency/cache directories in place so that
            # os.walk does not descend into them.
            dirs[:] = [
                d for d in dirs
                if not d.startswith('.') and d not in self._SKIPPED_DIRS
            ]
            matches.extend(
                os.path.join(current, name)
                for name in filenames
                if name.endswith(suffixes)
            )
        return matches
|