Files
code_scan/scanner/base.py
Dang Zerong 17306c6814 init
2026-03-10 17:22:07 +08:00

172 lines
5.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
代码扫描器基类
定义扫描器接口和通用功能
"""
import os
import logging
import tempfile
import shutil
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
from git import Repo
logger = logging.getLogger(__name__)
class BaseScanner(ABC):
    """Abstract base class for code scanners.

    Subclasses implement :meth:`scan`. This base class supplies the shared
    plumbing: cloning a repository into a temporary directory, removing that
    directory again, running external commands and collecting files by
    extension.
    """

    # Directory names that are never descended into when collecting files.
    # Hidden directories (leading ".", e.g. ".git") are skipped separately.
    _SKIPPED_DIRS = frozenset({'node_modules', '__pycache__', 'venv'})

    def __init__(self, config: Dict[str, Any]):
        """Initialise the scanner from its configuration.

        Args:
            config: Scanner configuration. The keys read here are
                ``temp_clone_dir`` (default ``/tmp/code_scanner_clones``),
                ``max_issues`` (default ``10``) and ``detailed``
                (default ``True``).
        """
        self.config = config
        self.temp_dir = config.get('temp_clone_dir', '/tmp/code_scanner_clones')
        self.max_issues = config.get('max_issues', 10)
        self.detailed = config.get('detailed', True)
        # Make sure the clone root exists before any clone is attempted.
        os.makedirs(self.temp_dir, exist_ok=True)

    @abstractmethod
    def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]:
        """Run the code scan.

        Args:
            repo_url: Repository URL.
            commit_id: Commit ID, or ``None``.
            branch: Branch name.

        Returns:
            The scan result.
        """

    @staticmethod
    def _sanitize_path_component(text: str) -> str:
        """Replace every character that is unsafe in a directory name with ``_``."""
        return ''.join(ch if ch.isalnum() or ch in '._-' else '_' for ch in text)

    def _clone_dir_for(self, repo_url: str, commit_id: Optional[str], branch: str) -> str:
        """Build the clone directory path for a repository and revision.

        The directory name is ``<repo>_<commit or branch>`` below
        ``self.temp_dir``. Both parts are sanitized so that a branch such as
        ``feature/x`` cannot introduce extra path levels.
        """
        # Take the last segment of the URL, tolerating a trailing slash,
        # Windows separators and scp-style "host:repo.git" remotes.
        normalized = repo_url.rstrip('/').replace('\\', '/').replace(':', '/')
        tail = normalized.split('/')[-1]
        # Strip only a trailing ".git"; the same text inside the name
        # (e.g. "my.github.io") belongs to the repository name.
        if tail.endswith('.git'):
            tail = tail[:-len('.git')]
        repo_name = self._sanitize_path_component(tail) or 'repo'
        revision = self._sanitize_path_component(commit_id or branch)
        return os.path.join(self.temp_dir, f"{repo_name}_{revision}")

    @staticmethod
    def _checkout_commit(repo, commit_id: str) -> None:
        """Check out ``commit_id``, fetching it first if the shallow clone lacks it."""
        from git.exc import GitCommandError

        try:
            repo.git.checkout(commit_id)
        except GitCommandError:
            # A depth=1 clone only contains the branch tip, so any other
            # commit has to be fetched on its own before it can be checked out.
            logger.info(f'浅克隆中不包含提交 {commit_id},尝试单独获取')
            repo.git.fetch('origin', commit_id, depth=1)
            repo.git.checkout(commit_id)

    def clone_repo(self, repo_url: str, commit_id: Optional[str], branch: str) -> str:
        """Clone a repository into the temporary directory.

        Args:
            repo_url: Repository URL.
            commit_id: Commit ID (optional); when ``None`` the tip of
                ``branch`` is used.
            branch: Branch name.

        Returns:
            Path of the directory the repository was cloned into.

        Raises:
            Exception: Whatever the clone, fetch or checkout raised. The
                error is logged and the partial clone directory is removed
                before the exception is re-raised.
        """
        clone_dir = self._clone_dir_for(repo_url, commit_id, branch)

        # A leftover directory from an earlier run would make the clone fail.
        if os.path.exists(clone_dir):
            self.cleanup(clone_dir)

        try:
            logger.info(f'克隆仓库: {repo_url}')
            # Shallow clone: only the latest commit of the branch is fetched.
            repo = Repo.clone_from(
                repo_url,
                clone_dir,
                depth=1,
                branch=branch,
            )
            try:
                if commit_id:
                    self._checkout_commit(repo, commit_id)
            finally:
                # Close the Repo explicitly to release file handles
                # (matters on Windows, where open handles block deletion).
                repo.close()
            logger.info(f'仓库克隆成功: {clone_dir}')
            return clone_dir
        except Exception as e:
            logger.error(f'克隆仓库失败: {str(e)}')
            # Do not leave a half-written clone behind on failure.
            self.cleanup(clone_dir)
            raise

    def cleanup(self, clone_dir: str):
        """Remove a clone directory, retrying on failure.

        Removal is best effort: after three failed attempts a warning is
        logged and the method returns without raising.

        Args:
            clone_dir: Path of the clone directory to remove.
        """
        import stat
        import sys
        import time

        def handle_remove_readonly(func, path, _exc):
            """Make a read-only entry writable and retry the failed removal."""
            os.chmod(path, stat.S_IWRITE)
            func(path)

        # ``onerror`` is deprecated since Python 3.12 in favour of ``onexc``.
        # The handler ignores its third argument, so it works with both.
        if sys.version_info >= (3, 12):
            rmtree_kwargs = {'onexc': handle_remove_readonly}
        else:
            rmtree_kwargs = {'onerror': handle_remove_readonly}

        max_retries = 3
        retry_delay = 1  # seconds
        for attempt in range(max_retries):
            try:
                if os.path.exists(clone_dir):
                    shutil.rmtree(clone_dir, **rmtree_kwargs)
                    logger.info(f'清理临时目录: {clone_dir}')
                return  # removed (or nothing to remove)
            except Exception as e:
                if attempt < max_retries - 1:
                    logger.warning(f'清理临时目录失败,{retry_delay}秒后重试: {str(e)}')
                    time.sleep(retry_delay)
                    retry_delay *= 2  # exponential backoff
                else:
                    logger.warning(f'清理临时目录失败(已重试{max_retries}次): {str(e)}')

    def run_command(self, cmd: List[str], cwd: str, timeout: int = 300) -> Dict[str, Any]:
        """Run a command and return its result.

        Args:
            cmd: Command and arguments as a list.
            cwd: Working directory.
            timeout: Timeout in seconds.

        Returns:
            A dict with the keys ``success``, ``returncode``, ``stdout`` and
            ``stderr``. A timeout or any other failure to run the command is
            reported as ``success=False`` with ``returncode=-1`` instead of
            raising.
        """
        import subprocess

        try:
            result = subprocess.run(
                cmd,
                cwd=cwd,
                capture_output=True,
                text=True,
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            return {
                'success': False,
                'returncode': -1,
                'stdout': '',
                'stderr': 'Command timeout',
            }
        except Exception as e:
            return {
                'success': False,
                'returncode': -1,
                'stdout': '',
                'stderr': str(e),
            }
        return {
            'success': result.returncode == 0,
            'returncode': result.returncode,
            'stdout': result.stdout,
            'stderr': result.stderr,
        }

    def get_changed_files(self, clone_dir: str, extensions: List[str]) -> List[str]:
        """Collect the files below ``clone_dir`` that have one of the extensions.

        Despite the name, every matching file in the tree is returned, not
        only files changed by a commit.

        Args:
            clone_dir: Repository directory.
            extensions: File name suffixes to match.

        Returns:
            List of matching file paths.
        """
        suffixes = tuple(extensions)
        files = []
        for root, dirs, filenames in os.walk(clone_dir):
            # Prune in place so os.walk does not descend into hidden or
            # dependency/cache directories.
            dirs[:] = [
                d for d in dirs
                if not d.startswith('.') and d not in self._SKIPPED_DIRS
            ]
            files.extend(
                os.path.join(root, filename)
                for filename in filenames
                if filename.endswith(suffixes)
            )
        return files