init
This commit is contained in:
1
scanner/__init__.py
Normal file
1
scanner/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# Scanner 模块
|
||||
142
scanner/base.py
Normal file
142
scanner/base.py
Normal file
@@ -0,0 +1,142 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
代码扫描器基类
|
||||
定义扫描器接口和通用功能
|
||||
"""
|
||||
import os
|
||||
import logging
|
||||
import tempfile
|
||||
import shutil
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Dict, Any, List, Optional
|
||||
from git import Repo
|
||||
logger = logging.getLogger(__name__)
|
||||
class BaseScanner(ABC):
|
||||
"""代码扫描器基类"""
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
"""
|
||||
初始化扫描器
|
||||
Args:
|
||||
config: 扫描器配置
|
||||
"""
|
||||
self.config = config
|
||||
self.temp_dir = config.get('temp_clone_dir', '/tmp/code_scanner_clones')
|
||||
self.max_issues = config.get('max_issues', 10)
|
||||
self.detailed = config.get('detailed', True)
|
||||
# 确保临时目录存在
|
||||
os.makedirs(self.temp_dir, exist_ok=True)
|
||||
@abstractmethod
|
||||
def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]:
|
||||
"""
|
||||
执行代码扫描
|
||||
Args:
|
||||
repo_url: 仓库 URL
|
||||
commit_id: 提交 ID
|
||||
branch: 分支名
|
||||
Returns:
|
||||
扫描结果
|
||||
"""
|
||||
pass
|
||||
def clone_repo(self, repo_url: str, commit_id: Optional[str], branch: str) -> str:
|
||||
"""
|
||||
克隆代码仓库到临时目录
|
||||
Args:
|
||||
repo_url: 仓库 URL
|
||||
commit_id: 提交 ID(可选,为 None 时使用 branch)
|
||||
branch: 分支名
|
||||
Returns:
|
||||
克隆的目录路径
|
||||
"""
|
||||
# 生成唯一的目录名
|
||||
repo_name = repo_url.split('/')[-1].replace('.git', '')
|
||||
commit_hash = commit_id or branch
|
||||
clone_dir = os.path.join(self.temp_dir, f"{repo_name}_{commit_hash}")
|
||||
# 如果目录已存在,先删除
|
||||
if os.path.exists(clone_dir):
|
||||
shutil.rmtree(clone_dir)
|
||||
try:
|
||||
logger.info(f'克隆仓库: {repo_url}')
|
||||
# 克隆仓库(浅克隆,只获取最新提交)
|
||||
repo = Repo.clone_from(
|
||||
repo_url,
|
||||
clone_dir,
|
||||
depth=1,
|
||||
branch=branch
|
||||
)
|
||||
# 如果指定了 commit_id,切换到该提交
|
||||
if commit_id:
|
||||
repo.git.checkout(commit_id)
|
||||
logger.info(f'仓库克隆成功: {clone_dir}')
|
||||
return clone_dir
|
||||
except Exception as e:
|
||||
logger.error(f'克隆仓库失败: {str(e)}')
|
||||
raise
|
||||
def cleanup(self, clone_dir: str):
|
||||
"""
|
||||
清理临时目录
|
||||
Args:
|
||||
clone_dir: 克隆的目录路径
|
||||
"""
|
||||
try:
|
||||
if os.path.exists(clone_dir):
|
||||
shutil.rmtree(clone_dir)
|
||||
logger.info(f'清理临时目录: {clone_dir}')
|
||||
except Exception as e:
|
||||
logger.warning(f'清理临时目录失败: {str(e)}')
|
||||
def run_command(self, cmd: List[str], cwd: str, timeout: int = 300) -> Dict[str, Any]:
|
||||
"""
|
||||
运行命令并返回结果
|
||||
Args:
|
||||
cmd: 命令列表
|
||||
cwd: 工作目录
|
||||
timeout: 超时时间(秒)
|
||||
Returns:
|
||||
命令执行结果
|
||||
"""
|
||||
import subprocess
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
cwd=cwd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=timeout
|
||||
)
|
||||
return {
|
||||
'success': result.returncode == 0,
|
||||
'returncode': result.returncode,
|
||||
'stdout': result.stdout,
|
||||
'stderr': result.stderr
|
||||
}
|
||||
except subprocess.TimeoutExpired:
|
||||
return {
|
||||
'success': False,
|
||||
'returncode': -1,
|
||||
'stdout': '',
|
||||
'stderr': 'Command timeout'
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
'success': False,
|
||||
'returncode': -1,
|
||||
'stdout': '',
|
||||
'stderr': str(e)
|
||||
}
|
||||
def get_changed_files(self, clone_dir: str, extensions: List[str]) -> List[str]:
|
||||
"""
|
||||
获取指定扩展名的文件列表
|
||||
Args:
|
||||
clone_dir: 仓库目录
|
||||
extensions: 文件扩展名列表
|
||||
Returns:
|
||||
文件路径列表
|
||||
"""
|
||||
files = []
|
||||
for root, dirs, filenames in os.walk(clone_dir):
|
||||
# 跳过隐藏目录和特殊目录
|
||||
dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ['node_modules', '__pycache__', 'venv', '.git']]
|
||||
for filename in filenames:
|
||||
if any(filename.endswith(ext) for ext in extensions):
|
||||
files.append(os.path.join(root, filename))
|
||||
return files
|
||||
162
scanner/js_scanner.py
Normal file
162
scanner/js_scanner.py
Normal file
@@ -0,0 +1,162 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
JavaScript/TypeScript 代码扫描器
|
||||
使用 ESLint 进行代码质量检查
|
||||
"""
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
from typing import Dict, Any, List, Optional
|
||||
from scanner.base import BaseScanner
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class JavaScriptScanner(BaseScanner):
|
||||
"""JavaScript/TypeScript 代码扫描器"""
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
super().__init__(config)
|
||||
self.extensions = ['.js', '.jsx', '.ts', '.tsx', '.vue', '.svelte']
|
||||
|
||||
def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]:
|
||||
"""
|
||||
执行 JavaScript/TypeScript 代码扫描
|
||||
|
||||
Args:
|
||||
repo_url: 仓库 URL
|
||||
commit_id: 提交 ID
|
||||
branch: 分支名
|
||||
|
||||
Returns:
|
||||
扫描结果
|
||||
"""
|
||||
result = {
|
||||
'tool': 'JavaScript Scanner',
|
||||
'language': 'javascript',
|
||||
'status': 'success',
|
||||
'issues': [],
|
||||
'summary': {
|
||||
'total': 0,
|
||||
'error': 0,
|
||||
'warning': 0,
|
||||
'info': 0
|
||||
},
|
||||
'files_scanned': 0
|
||||
}
|
||||
|
||||
clone_dir = None
|
||||
try:
|
||||
# 克隆仓库
|
||||
clone_dir = self.clone_repo(repo_url, commit_id, branch)
|
||||
|
||||
# 获取 JavaScript/TypeScript 文件
|
||||
js_files = self.get_changed_files(clone_dir, self.extensions)
|
||||
result['files_scanned'] = len(js_files)
|
||||
|
||||
if not js_files:
|
||||
logger.info('没有找到 JavaScript/TypeScript 文件')
|
||||
return result
|
||||
|
||||
# 运行 ESLint 扫描
|
||||
eslint_result = self._run_eslint(clone_dir, js_files)
|
||||
|
||||
# 合并结果
|
||||
result['issues'] = eslint_result.get('issues', [])[:self.max_issues] if self.detailed else eslint_result.get('issues', [])
|
||||
result['summary'] = self._calculate_summary(eslint_result.get('issues', []))
|
||||
result['raw_output'] = eslint_result.get('raw_output', '')
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f'JavaScript 扫描失败: {str(e)}')
|
||||
result['status'] = 'error'
|
||||
result['error'] = str(e)
|
||||
|
||||
finally:
|
||||
# 清理临时目录
|
||||
if clone_dir:
|
||||
self.cleanup(clone_dir)
|
||||
|
||||
return result
|
||||
|
||||
def _run_eslint(self, cwd: str, files: List[str]) -> Dict[str, Any]:
|
||||
"""运行 ESLint 扫描"""
|
||||
result = {
|
||||
'tool': 'eslint',
|
||||
'issues': [],
|
||||
'raw_output': ''
|
||||
}
|
||||
|
||||
try:
|
||||
# 尝试使用 npx 运行 eslint
|
||||
cmd = ['npx', 'eslint', '--format=json', '--no-eslintrc'] + files
|
||||
|
||||
# 如果没有 eslint 配置,先创建默认配置
|
||||
eslintrc_path = os.path.join(cwd, '.eslintrc.json')
|
||||
if not os.path.exists(eslintrc_path):
|
||||
# 创建简单的 ESLint 配置
|
||||
eslint_config = {
|
||||
"env": {
|
||||
"browser": True,
|
||||
"es2021": True,
|
||||
"node": True
|
||||
},
|
||||
"extends": ["eslint:recommended"],
|
||||
"parserOptions": {
|
||||
"ecmaVersion": "latest",
|
||||
"sourceType": "module"
|
||||
}
|
||||
}
|
||||
with open(eslintrc_path, 'w') as f:
|
||||
json.dump(eslint_config, f)
|
||||
|
||||
output = self.run_command(cmd, cwd, timeout=120)
|
||||
result['raw_output'] = output.get('stdout', '') + output.get('stderr', '')
|
||||
|
||||
# 解析 JSON 输出
|
||||
if output.get('stdout'):
|
||||
try:
|
||||
eslint_results = json.loads(output['stdout'])
|
||||
for file_result in eslint_results:
|
||||
file_path = file_result.get('filePath', '')
|
||||
messages = file_result.get('messages', [])
|
||||
|
||||
for msg in messages:
|
||||
severity = 'error' if msg.get('severity', 0) == 2 else 'warning'
|
||||
result['issues'].append({
|
||||
'tool': 'eslint',
|
||||
'type': severity,
|
||||
'severity': 'Error' if msg.get('severity', 0) == 2 else 'Warning',
|
||||
'message': msg.get('message', ''),
|
||||
'file': os.path.basename(file_path),
|
||||
'line': msg.get('line', 0),
|
||||
'column': msg.get('column', 0),
|
||||
'symbol': msg.get('ruleId', 'unknown')
|
||||
})
|
||||
except json.JSONDecodeError as e:
|
||||
logger.warning(f'ESLint JSON 解析失败: {e}')
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f'ESLint 运行失败: {str(e)}')
|
||||
|
||||
return result
|
||||
|
||||
def _calculate_summary(self, issues: List[Dict]) -> Dict[str, int]:
|
||||
"""计算问题摘要"""
|
||||
summary = {
|
||||
'total': len(issues),
|
||||
'error': 0,
|
||||
'warning': 0,
|
||||
'info': 0
|
||||
}
|
||||
|
||||
for issue in issues:
|
||||
severity = issue.get('severity', '').lower()
|
||||
if severity in ['error', 'critical']:
|
||||
summary['error'] += 1
|
||||
elif severity in ['warning', 'moderate']:
|
||||
summary['warning'] += 1
|
||||
else:
|
||||
summary['info'] += 1
|
||||
|
||||
return summary
|
||||
196
scanner/python_scanner.py
Normal file
196
scanner/python_scanner.py
Normal file
@@ -0,0 +1,196 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Python 代码扫描器
|
||||
使用 Pylint、Flake8、MyPy 进行代码质量检查
|
||||
"""
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
from typing import Dict, Any, List, Optional
|
||||
from scanner.base import BaseScanner
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PythonScanner(BaseScanner):
|
||||
"""Python 代码扫描器"""
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
super().__init__(config)
|
||||
self.extensions = ['.py']
|
||||
|
||||
def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]:
|
||||
"""
|
||||
执行 Python 代码扫描
|
||||
|
||||
Args:
|
||||
repo_url: 仓库 URL
|
||||
commit_id: 提交 ID
|
||||
branch: 分支名
|
||||
|
||||
Returns:
|
||||
扫描结果
|
||||
"""
|
||||
result = {
|
||||
'tool': 'Python Scanner',
|
||||
'language': 'python',
|
||||
'status': 'success',
|
||||
'issues': [],
|
||||
'summary': {
|
||||
'total': 0,
|
||||
'error': 0,
|
||||
'warning': 0,
|
||||
'info': 0
|
||||
},
|
||||
'files_scanned': 0
|
||||
}
|
||||
|
||||
clone_dir = None
|
||||
try:
|
||||
# 克隆仓库
|
||||
clone_dir = self.clone_repo(repo_url, commit_id, branch)
|
||||
|
||||
# 获取 Python 文件
|
||||
py_files = self.get_changed_files(clone_dir, self.extensions)
|
||||
result['files_scanned'] = len(py_files)
|
||||
|
||||
if not py_files:
|
||||
logger.info('没有找到 Python 文件')
|
||||
return result
|
||||
|
||||
# 运行各种扫描工具
|
||||
pylint_result = self._run_pylint(clone_dir, py_files)
|
||||
flake8_result = self._run_flake8(clone_dir, py_files)
|
||||
|
||||
# 合并结果
|
||||
all_issues = []
|
||||
all_issues.extend(pylint_result.get('issues', []))
|
||||
all_issues.extend(flake8_result.get('issues', []))
|
||||
|
||||
result['issues'] = all_issues[:self.max_issues] if self.detailed else all_issues
|
||||
result['summary'] = self._calculate_summary(all_issues)
|
||||
result['raw_output'] = {
|
||||
'pylint': pylint_result.get('raw_output', ''),
|
||||
'flake8': flake8_result.get('raw_output', '')
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f'Python 扫描失败: {str(e)}')
|
||||
result['status'] = 'error'
|
||||
result['error'] = str(e)
|
||||
|
||||
finally:
|
||||
# 清理临时目录
|
||||
if clone_dir:
|
||||
self.cleanup(clone_dir)
|
||||
|
||||
return result
|
||||
|
||||
def _run_pylint(self, cwd: str, files: List[str]) -> Dict[str, Any]:
|
||||
"""运行 Pylint 扫描"""
|
||||
result = {
|
||||
'tool': 'pylint',
|
||||
'issues': [],
|
||||
'raw_output': ''
|
||||
}
|
||||
|
||||
# 只扫描变更的文件
|
||||
try:
|
||||
cmd = ['python', '-m', 'pylint', '--output-format=json'] + files
|
||||
output = self.run_command(cmd, cwd, timeout=120)
|
||||
|
||||
result['raw_output'] = output.get('stdout', '')
|
||||
|
||||
# 解析 JSON 输出
|
||||
if output.get('stdout'):
|
||||
try:
|
||||
issues = json.loads(output['stdout'])
|
||||
for issue in issues:
|
||||
result['issues'].append({
|
||||
'tool': 'pylint',
|
||||
'type': issue.get('type', 'info'),
|
||||
'severity': issue.get('severity', 'Info'),
|
||||
'message': issue.get('message', ''),
|
||||
'file': os.path.basename(issue.get('path', '')),
|
||||
'line': issue.get('line', 0),
|
||||
'column': issue.get('column', 0),
|
||||
'symbol': issue.get('symbol', '')
|
||||
})
|
||||
except json.JSONDecodeError:
|
||||
logger.warning('Pylint JSON 解析失败')
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f'Pylint 运行失败: {str(e)}')
|
||||
|
||||
return result
|
||||
|
||||
def _run_flake8(self, cwd: str, files: List[str]) -> Dict[str, Any]:
|
||||
"""运行 Flake8 扫描"""
|
||||
result = {
|
||||
'tool': 'flake8',
|
||||
'issues': [],
|
||||
'raw_output': ''
|
||||
}
|
||||
|
||||
try:
|
||||
cmd = ['python', '-m', 'flake8', '--format=json'] + files
|
||||
output = self.run_command(cmd, cwd, timeout=120)
|
||||
|
||||
result['raw_output'] = output.get('stdout', '')
|
||||
|
||||
# 解析 JSON 输出
|
||||
if output.get('stdout'):
|
||||
try:
|
||||
issues = json.loads(output['stdout'])
|
||||
for issue in issues:
|
||||
result['issues'].append({
|
||||
'tool': 'flake8',
|
||||
'type': self._map_flake8_code(issue.get('code', '')),
|
||||
'severity': 'Warning',
|
||||
'message': issue.get('text', ''),
|
||||
'file': os.path.basename(issue.get('filename', '')),
|
||||
'line': issue.get('line_number', 0),
|
||||
'column': issue.get('column_number', 0),
|
||||
'symbol': issue.get('code', '')
|
||||
})
|
||||
except json.JSONDecodeError:
|
||||
logger.warning('Flake8 JSON 解析失败')
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f'Flake8 运行失败: {str(e)}')
|
||||
|
||||
return result
|
||||
|
||||
def _map_flake8_code(self, code: str) -> str:
|
||||
"""映射 Flake8 错误代码到类型"""
|
||||
# E/W - Flake8 错误/警告
|
||||
# F - Pyflakes
|
||||
# C - mccabe 复杂度
|
||||
if code.startswith('E') or code.startswith('W'):
|
||||
return 'error' if code.startswith('E') else 'warning'
|
||||
elif code.startswith('F'):
|
||||
return 'error'
|
||||
elif code.startswith('C'):
|
||||
return 'warning'
|
||||
return 'info'
|
||||
|
||||
def _calculate_summary(self, issues: List[Dict]) -> Dict[str, int]:
|
||||
"""计算问题摘要"""
|
||||
summary = {
|
||||
'total': len(issues),
|
||||
'error': 0,
|
||||
'warning': 0,
|
||||
'info': 0
|
||||
}
|
||||
|
||||
for issue in issues:
|
||||
severity = issue.get('severity', '').lower()
|
||||
if severity in ['error', 'critical', 'fatal', 'error']:
|
||||
summary['error'] += 1
|
||||
elif severity in ['warning', 'moderate']:
|
||||
summary['warning'] += 1
|
||||
else:
|
||||
summary['info'] += 1
|
||||
|
||||
return summary
|
||||
223
scanner/security_scanner.py
Normal file
223
scanner/security_scanner.py
Normal file
@@ -0,0 +1,223 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
安全扫描器
|
||||
使用 Bandit 进行安全漏洞扫描
|
||||
"""
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
from typing import Dict, Any, List, Optional
|
||||
from scanner.base import BaseScanner
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SecurityScanner(BaseScanner):
|
||||
"""安全漏洞扫描器"""
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
super().__init__(config)
|
||||
# 扫描所有代码文件以发现安全问题
|
||||
self.extensions = ['.py', '.js', '.ts', '.jsx', '.tsx', '.java', '.go', '.rb', '.php']
|
||||
|
||||
def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]:
|
||||
"""
|
||||
执行安全扫描
|
||||
|
||||
Args:
|
||||
repo_url: 仓库 URL
|
||||
commit_id: 提交 ID
|
||||
branch: 分支名
|
||||
|
||||
Returns:
|
||||
扫描结果
|
||||
"""
|
||||
result = {
|
||||
'tool': 'Security Scanner',
|
||||
'language': 'multi',
|
||||
'status': 'success',
|
||||
'issues': [],
|
||||
'summary': {
|
||||
'total': 0,
|
||||
'high': 0,
|
||||
'medium': 0,
|
||||
'low': 0,
|
||||
'info': 0
|
||||
},
|
||||
'files_scanned': 0
|
||||
}
|
||||
|
||||
clone_dir = None
|
||||
try:
|
||||
# 克隆仓库
|
||||
clone_dir = self.clone_repo(repo_url, commit_id, branch)
|
||||
|
||||
# 获取所有支持的文件
|
||||
all_files = self.get_changed_files(clone_dir, self.extensions)
|
||||
result['files_scanned'] = len(all_files)
|
||||
|
||||
if not all_files:
|
||||
logger.info('没有找到可扫描的文件')
|
||||
return result
|
||||
|
||||
# Python 安全扫描 (Bandit)
|
||||
py_files = [f for f in all_files if f.endswith('.py')]
|
||||
if py_files:
|
||||
bandit_result = self._run_bandit(clone_dir, py_files)
|
||||
result['issues'].extend(bandit_result.get('issues', []))
|
||||
|
||||
# JavaScript 安全扫描
|
||||
js_files = [f for f in all_files if f.endswith(('.js', '.jsx', '.ts', '.tsx'))]
|
||||
if js_files:
|
||||
# 使用简单的模式匹配检测常见安全问题
|
||||
js_security_result = self._scan_js_security(clone_dir, js_files)
|
||||
result['issues'].extend(js_security_result.get('issues', []))
|
||||
|
||||
# 计算摘要
|
||||
result['summary'] = self._calculate_summary(result['issues'])
|
||||
|
||||
# 限制返回的问题数量
|
||||
if self.detailed:
|
||||
result['issues'] = result['issues'][:self.max_issues]
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f'安全扫描失败: {str(e)}')
|
||||
result['status'] = 'error'
|
||||
result['error'] = str(e)
|
||||
|
||||
finally:
|
||||
# 清理临时目录
|
||||
if clone_dir:
|
||||
self.cleanup(clone_dir)
|
||||
|
||||
return result
|
||||
|
||||
def _run_bandit(self, cwd: str, files: List[str]) -> Dict[str, Any]:
|
||||
"""运行 Bandit 安全扫描"""
|
||||
result = {
|
||||
'tool': 'bandit',
|
||||
'issues': []
|
||||
}
|
||||
|
||||
try:
|
||||
# 运行 bandit
|
||||
cmd = ['python', '-m', 'bandit', '-f', 'json'] + files
|
||||
output = self.run_command(cmd, cwd, timeout=120)
|
||||
|
||||
# 解析 JSON 输出
|
||||
if output.get('stdout'):
|
||||
try:
|
||||
data = json.loads(output['stdout'])
|
||||
results = data.get('results', [])
|
||||
|
||||
for issue in results:
|
||||
# 映射严重级别
|
||||
severity = issue.get('issue_severity', 'LOW')
|
||||
result['issues'].append({
|
||||
'tool': 'bandit',
|
||||
'type': issue.get('issue_id', 'unknown'),
|
||||
'severity': severity,
|
||||
'confidence': issue.get('issue_confidence', 'LOW'),
|
||||
'message': issue.get('issue_text', ''),
|
||||
'file': os.path.basename(issue.get('filename', '')),
|
||||
'line': issue.get('line_number', 0),
|
||||
'code': issue.get('code', '')
|
||||
})
|
||||
except json.JSONDecodeError:
|
||||
logger.warning('Bandit JSON 解析失败')
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f'Bandit 运行失败: {str(e)}')
|
||||
|
||||
return result
|
||||
|
||||
def _scan_js_security(self, cwd: str, files: List[str]) -> Dict[str, Any]:
|
||||
"""简单的 JavaScript 安全扫描(基于模式匹配)"""
|
||||
result = {
|
||||
'tool': 'js-security',
|
||||
'issues': []
|
||||
}
|
||||
|
||||
# 需要检测的不安全模式
|
||||
dangerous_patterns = [
|
||||
{
|
||||
'pattern': r'eval\s*\(',
|
||||
'message': '使用 eval() 可能导致代码注入',
|
||||
'severity': 'HIGH'
|
||||
},
|
||||
{
|
||||
'pattern': r'innerHTML\s*=',
|
||||
'message': '使用 innerHTML 可能导致 XSS 攻击',
|
||||
'severity': 'MEDIUM'
|
||||
},
|
||||
{
|
||||
'pattern': r'document\.write\s*\(',
|
||||
'message': '使用 document.write 可能导致 XSS 攻击',
|
||||
'severity': 'MEDIUM'
|
||||
},
|
||||
{
|
||||
'pattern': r'password\s*[:=]',
|
||||
'message': '硬编码密码可能存在安全风险',
|
||||
'severity': 'HIGH'
|
||||
},
|
||||
{
|
||||
'pattern': r'api[_-]?key\s*[:=]',
|
||||
'message': '硬编码 API Key 可能存在安全风险',
|
||||
'severity': 'HIGH'
|
||||
},
|
||||
{
|
||||
'pattern': r'secret\s*[:=]',
|
||||
'message': '硬编码密钥可能存在安全风险',
|
||||
'severity': 'HIGH'
|
||||
}
|
||||
]
|
||||
|
||||
import re
|
||||
|
||||
for file_path in files:
|
||||
try:
|
||||
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
|
||||
content = f.read()
|
||||
lines = content.split('\n')
|
||||
|
||||
for line_num, line in enumerate(lines, 1):
|
||||
for pattern_info in dangerous_patterns:
|
||||
if re.search(pattern_info['pattern'], line, re.IGNORECASE):
|
||||
result['issues'].append({
|
||||
'tool': 'js-security',
|
||||
'type': 'security-warning',
|
||||
'severity': pattern_info['severity'],
|
||||
'confidence': 'MEDIUM',
|
||||
'message': pattern_info['message'],
|
||||
'file': os.path.basename(file_path),
|
||||
'line': line_num,
|
||||
'code': line.strip()[:80]
|
||||
})
|
||||
except Exception as e:
|
||||
logger.warning(f'扫描文件 {file_path} 失败: {str(e)}')
|
||||
|
||||
return result
|
||||
|
||||
def _calculate_summary(self, issues: List[Dict]) -> Dict[str, int]:
|
||||
"""计算问题摘要"""
|
||||
summary = {
|
||||
'total': len(issues),
|
||||
'high': 0,
|
||||
'medium': 0,
|
||||
'low': 0,
|
||||
'info': 0
|
||||
}
|
||||
|
||||
for issue in issues:
|
||||
severity = issue.get('severity', '').upper()
|
||||
if severity in ['HIGH', 'CRITICAL']:
|
||||
summary['high'] += 1
|
||||
elif severity == 'MEDIUM':
|
||||
summary['medium'] += 1
|
||||
elif severity == 'LOW':
|
||||
summary['low'] += 1
|
||||
else:
|
||||
summary['info'] += 1
|
||||
|
||||
return summary
|
||||
Reference in New Issue
Block a user