Files
code_scan/scanner/base.py

185 lines
6.4 KiB
Python
Raw Permalink Normal View History

2026-03-09 09:24:08 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
代码扫描器基类
定义扫描器接口和通用功能
"""
import os
import logging
import tempfile
import shutil
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
from git import Repo
logger = logging.getLogger(__name__)
class BaseScanner(ABC):
    """Abstract base class for code scanners.

    Stores the shared scanner configuration and provides helpers that
    concrete scanners can reuse: cloning a repository into a temporary
    directory, removing that directory again, running an external command
    and collecting the files that should be scanned.
    """

    def __init__(self, config: Dict[str, Any]):
        """Initialise the scanner from its configuration.

        Args:
            config: Scanner configuration. The keys read here are
                ``temp_clone_dir`` (default ``/tmp/code_scanner_clones``),
                ``max_issues`` (default ``10``) and ``detailed``
                (default ``True``).
        """
        self.config = config
        self.temp_dir = config.get('temp_clone_dir', '/tmp/code_scanner_clones')
        self.max_issues = config.get('max_issues', 10)
        self.detailed = config.get('detailed', True)
        # Make sure the directory that will hold the clones exists.
        os.makedirs(self.temp_dir, exist_ok=True)

    @abstractmethod
    def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]:
        """Run the code scan.

        Args:
            repo_url: Repository URL.
            commit_id: Commit ID, or None.
            branch: Branch name.

        Returns:
            The scan result.
        """

    def clone_repo(self, repo_url: str, commit_id: Optional[str], branch: str) -> str:
        """Clone a repository into a directory below ``self.temp_dir``.

        The clone is shallow (``depth=1``) and tracks ``branch``. When
        ``commit_id`` is given, that commit is checked out afterwards; if it
        is not contained in the shallow clone it is fetched first.

        Args:
            repo_url: Repository URL.
            commit_id: Commit ID to check out; when None the tip of
                ``branch`` is used.
            branch: Branch name.

        Returns:
            Path of the directory the repository was cloned into.

        Raises:
            Exception: Whatever the clone/checkout raised. The error is
                logged, the partial clone is removed and the exception is
                re-raised unchanged.
        """
        import re

        from git import GitCommandError

        # Use the last URL segment as the repository name. A trailing slash is
        # tolerated and ".git" is stripped only as a suffix, so a name such as
        # "my.github.io" is left intact.
        repo_name = repo_url.rstrip('/').split('/')[-1]
        if repo_name.endswith('.git'):
            repo_name = repo_name[:-len('.git')]
        ref_name = commit_id or branch
        # Branch names may contain "/" (or even ".."): collapse everything
        # that is not safe in a single path component so the clone always
        # lives directly inside temp_dir.
        dir_name = re.sub(r'[^\w.\-]+', '_', f"{repo_name}_{ref_name}")
        clone_dir = os.path.join(self.temp_dir, dir_name)

        # Remove a stale clone first (cleanup retries internally).
        if os.path.exists(clone_dir):
            self.cleanup(clone_dir)

        repo = None
        try:
            # NOTE(review): repo_url is logged verbatim; if callers embed
            # credentials in the URL they end up in the log -- confirm.
            logger.info(f'克隆仓库: {repo_url}')
            # Shallow clone: only the tip of the branch is downloaded.
            repo = Repo.clone_from(
                repo_url,
                clone_dir,
                depth=1,
                branch=branch,
            )

            if commit_id:
                try:
                    repo.git.checkout(commit_id)
                except GitCommandError:
                    # The shallow clone only holds the branch tip. Fetch the
                    # requested commit on its own and try again.
                    repo.git.fetch('origin', commit_id, depth=1)
                    repo.git.checkout(commit_id)

            logger.info(f'仓库克隆成功: {clone_dir}')
            return clone_dir
        except Exception as e:
            logger.error(f'克隆仓库失败: {e}')
            # Release file handles before deleting, then drop the partial
            # clone so failed attempts do not pile up on disk.
            if repo is not None:
                repo.close()
                repo = None
            self.cleanup(clone_dir)
            raise
        finally:
            # Close the Repo explicitly to release file handles (matters
            # especially on Windows).
            if repo is not None:
                repo.close()

    def cleanup(self, clone_dir: str):
        """Remove a clone directory, retrying on failure.

        Up to three attempts are made, sleeping 1s and then 2s between them.
        Failures are logged as warnings and never raised. A missing directory
        is treated as already cleaned.

        Args:
            clone_dir: Path of the cloned directory.
        """
        import stat
        import sys
        import time

        def handle_remove_readonly(func, path, _exc):
            """Make the entry writable and retry the failed operation."""
            os.chmod(path, stat.S_IWRITE)
            func(path)

        # shutil.rmtree deprecated "onerror" in Python 3.12 in favour of
        # "onexc"; the handler ignores its third argument, so it fits both.
        if sys.version_info >= (3, 12):
            rmtree_kwargs = {'onexc': handle_remove_readonly}
        else:
            rmtree_kwargs = {'onerror': handle_remove_readonly}

        max_retries = 3
        retry_delay = 1  # seconds
        for attempt in range(max_retries):
            try:
                if os.path.exists(clone_dir):
                    shutil.rmtree(clone_dir, **rmtree_kwargs)
                    logger.info(f'清理临时目录: {clone_dir}')
                return  # cleaned (or nothing to clean)
            except Exception as e:
                if attempt < max_retries - 1:
                    logger.warning(f'清理临时目录失败,{retry_delay}秒后重试: {e}')
                    time.sleep(retry_delay)
                    retry_delay *= 2  # exponential backoff
                else:
                    logger.warning(f'清理临时目录失败(已重试{max_retries}次): {e}')

    def run_command(self, cmd: List[str], cwd: str, timeout: int = 300) -> Dict[str, Any]:
        """Run a command and return its outcome as a dict.

        Args:
            cmd: Command and arguments as a list (no shell is involved).
            cwd: Working directory for the command.
            timeout: Timeout in seconds.

        Returns:
            A dict with the keys ``success`` (True when the return code is
            0), ``returncode``, ``stdout`` and ``stderr``. On timeout or when
            the command cannot be started, ``success`` is False,
            ``returncode`` is -1, ``stdout`` is empty and ``stderr`` holds
            ``'Command timeout'`` or the exception text.
        """
        import subprocess

        try:
            result = subprocess.run(
                cmd,
                cwd=cwd,
                capture_output=True,
                text=True,
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            return {
                'success': False,
                'returncode': -1,
                'stdout': '',
                'stderr': 'Command timeout',
            }
        except Exception as e:
            # Deliberately broad: callers get a failure result instead of an
            # exception, e.g. when the executable does not exist.
            return {
                'success': False,
                'returncode': -1,
                'stdout': '',
                'stderr': str(e),
            }
        return {
            'success': result.returncode == 0,
            'returncode': result.returncode,
            'stdout': result.stdout,
            'stderr': result.stderr,
        }

    def get_changed_files(self, clone_dir: str, extensions: List[str], changed_files: Optional[List[str]] = None) -> List[str]:
        """Return the files with one of the given extensions.

        Args:
            clone_dir: Repository directory.
            extensions: File name suffixes to keep (e.g. ``['.py']``).
            changed_files: Optional list of changed files (from a PR),
                relative to ``clone_dir``. When non-empty, only these files
                are considered; entries that do not exist, are not regular
                files, or resolve to a location outside ``clone_dir`` are
                skipped. When None or empty the whole repository is walked.

        Returns:
            List of file paths (each prefixed with ``clone_dir``).
        """
        suffixes = tuple(extensions)

        # A list of changed files was supplied: restrict the scan to it.
        if changed_files:
            root = os.path.normcase(os.path.realpath(clone_dir))
            inside_prefix = root.rstrip(os.sep) + os.sep
            files = []
            for changed_file in changed_files:
                if not changed_file.endswith(suffixes):
                    continue
                full_path = os.path.join(clone_dir, changed_file)
                # The list comes from outside: reject absolute paths, "../"
                # tricks and symlinks that lead out of the clone.
                resolved = os.path.normcase(os.path.realpath(full_path))
                if not resolved.startswith(inside_prefix):
                    continue
                if os.path.isfile(full_path):
                    files.append(full_path)
            return files

        # Otherwise walk the whole repository.
        skipped_dirs = {'node_modules', '__pycache__', 'venv'}
        files = []
        for root_dir, dirs, filenames in os.walk(clone_dir):
            # Prune hidden directories (this includes .git) and well-known
            # dependency/cache directories in place so os.walk skips them.
            dirs[:] = [
                d for d in dirs
                if not d.startswith('.') and d not in skipped_dirs
            ]
            files.extend(
                os.path.join(root_dir, filename)
                for filename in filenames
                if filename.endswith(suffixes)
            )
        return files