225 lines
7.9 KiB
Python
225 lines
7.9 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
Security scanner.

Runs Bandit on Python files and a regex-based pattern check on
JavaScript/TypeScript files to find security issues.
"""
|
||
import os
|
||
import json
|
||
import logging
|
||
from typing import Dict, Any, List, Optional
|
||
from scanner.base import BaseScanner
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class SecurityScanner(BaseScanner):
    """Security vulnerability scanner.

    Runs Bandit over Python files and a line-based regex check over
    JavaScript/TypeScript files, then merges the findings into one result.
    """

    # (regex, message, severity) triples used by _scan_js_security.
    # Each regex is applied to every line, case-insensitively.
    _JS_DANGEROUS_PATTERNS = (
        (r'eval\s*\(', '使用 eval() 可能导致代码注入', 'HIGH'),
        (r'innerHTML\s*=', '使用 innerHTML 可能导致 XSS 攻击', 'MEDIUM'),
        (r'document\.write\s*\(', '使用 document.write 可能导致 XSS 攻击', 'MEDIUM'),
        (r'password\s*[:=]', '硬编码密码可能存在安全风险', 'HIGH'),
        (r'api[_-]?key\s*[:=]', '硬编码 API Key 可能存在安全风险', 'HIGH'),
        (r'secret\s*[:=]', '硬编码密钥可能存在安全风险', 'HIGH'),
    )

    # Maps an upper-cased severity label to its summary bucket; anything
    # not listed here is counted as 'info'.
    _SEVERITY_BUCKETS = {
        'HIGH': 'high',
        'CRITICAL': 'high',
        'MEDIUM': 'medium',
        'LOW': 'low',
    }

    def __init__(self, config: Dict[str, Any]):
        """Initialise the scanner and set the file extensions it collects.

        Args:
            config: Scanner configuration, forwarded to BaseScanner.
        """
        super().__init__(config)
        # Extensions collected for scanning. NOTE: scan() only has checks for
        # '.py' and the JS/TS extensions; the remaining ones are collected and
        # counted in 'files_scanned' but no check runs on them.
        self.extensions = ['.py', '.js', '.ts', '.jsx', '.tsx', '.java', '.go', '.rb', '.php']

    def scan(self, repo_url: str, commit_id: Optional[str], branch: str, changed_files: Optional[List[str]] = None) -> Dict[str, Any]:
        """Run the security scan.

        Args:
            repo_url: Repository URL.
            commit_id: Commit ID (may be None).
            branch: Branch name.
            changed_files: Optional list of changed files (from a PR).

        Returns:
            A dict with the keys 'tool', 'language', 'status', 'issues',
            'summary' and 'files_scanned'. When an exception is caught,
            'status' is 'error' and an 'error' key holds the message.
        """
        result = {
            'tool': 'Security Scanner',
            'language': 'multi',
            'status': 'success',
            'issues': [],
            'summary': {
                'total': 0,
                'high': 0,
                'medium': 0,
                'low': 0,
                'info': 0,
            },
            'files_scanned': 0,
        }

        try:
            # Clone the repository.
            # NOTE(review): the checkout is not removed here; presumably
            # BaseScanner owns its lifetime -- confirm.
            clone_dir = self.clone_repo(repo_url, commit_id, branch)

            # Collect the supported files, restricted to the changed ones.
            all_files = self.get_changed_files(clone_dir, self.extensions, changed_files)
            result['files_scanned'] = len(all_files)

            if not all_files:
                logger.info('没有找到可扫描的文件')
                return result

            # Python security scan (Bandit).
            py_files = [f for f in all_files if f.endswith('.py')]
            if py_files:
                bandit_result = self._run_bandit(clone_dir, py_files)
                result['issues'].extend(bandit_result.get('issues', []))

            # JavaScript/TypeScript security scan (simple pattern matching).
            js_files = [f for f in all_files if f.endswith(('.js', '.jsx', '.ts', '.tsx'))]
            if js_files:
                js_security_result = self._scan_js_security(clone_dir, js_files)
                result['issues'].extend(js_security_result.get('issues', []))

            # The summary is computed before truncation so that it counts
            # every finding, not only the ones returned.
            result['summary'] = self._calculate_summary(result['issues'])

            # Limit the number of returned issues.
            if self.detailed:
                result['issues'] = result['issues'][:self.max_issues]

        except Exception as e:
            # Top-level boundary: report the failure in the result instead of
            # raising to the caller.
            logger.error('安全扫描失败: %s', e)
            result['status'] = 'error'
            result['error'] = str(e)

        return result

    def _run_bandit(self, clone_dir: str, files: List[str]) -> Dict[str, Any]:
        """Run Bandit on the given Python files and collect its findings.

        Args:
            clone_dir: Directory of the cloned repository; reported file
                paths are made relative to it.
            files: Python files passed to Bandit on the command line.

        Returns:
            A dict with 'tool' ('bandit') and 'issues' (list of finding
            dicts). Failures are logged and yield an empty 'issues' list.
        """
        result = {
            'tool': 'bandit',
            'issues': [],
        }

        try:
            cmd = ['python', '-m', 'bandit', '-f', 'json'] + files
            output = self.run_command(cmd, clone_dir, timeout=120)

            stdout = output.get('stdout')
            if not stdout:
                # With '-f json' Bandit prints a report even when it finds
                # nothing, so empty output means the run itself failed.
                logger.warning('Bandit 未产生任何输出,可能未安装或运行失败')
                return result

            try:
                data = json.loads(stdout)
            except json.JSONDecodeError:
                logger.warning('Bandit JSON 解析失败')
                return result

            for issue in data.get('results', []):
                # Report the path relative to clone_dir.
                full_path = issue.get('filename', '')
                rel_path = os.path.relpath(full_path, clone_dir) if full_path else ''
                result['issues'].append({
                    'tool': 'bandit',
                    # Bandit's JSON report names the check in 'test_id'
                    # (e.g. 'B101'); 'issue_id' is kept only as a fallback.
                    'type': issue.get('test_id') or issue.get('issue_id', 'unknown'),
                    'severity': issue.get('issue_severity', 'LOW'),
                    'confidence': issue.get('issue_confidence', 'LOW'),
                    'message': issue.get('issue_text', ''),
                    'file': rel_path,
                    'line': issue.get('line_number', 0),
                    'code': issue.get('code', ''),
                })

        except Exception as e:
            # Best effort: a Bandit failure must not abort the whole scan.
            logger.warning(f'Bandit 运行失败: {str(e)}')

        return result

    def _scan_js_security(self, clone_dir: str, files: List[str]) -> Dict[str, Any]:
        """Scan JavaScript/TypeScript files with simple pattern matching.

        Every line of every file is tested against each entry of
        _JS_DANGEROUS_PATTERNS; each match produces one issue.

        Args:
            clone_dir: Directory of the cloned repository; reported file
                paths are made relative to it.
            files: Paths of the files to read.

        Returns:
            A dict with 'tool' ('js-security') and 'issues' (list of finding
            dicts). Files that cannot be processed are logged and skipped.
        """
        # Function-scope import, as in the original implementation.
        import re

        result = {
            'tool': 'js-security',
            'issues': [],
        }

        # Compile once per call rather than once per scanned line.
        compiled_patterns = [
            (re.compile(pattern, re.IGNORECASE), message, severity)
            for pattern, message, severity in self._JS_DANGEROUS_PATTERNS
        ]

        for file_path in files:
            try:
                # Report the path relative to clone_dir.
                rel_path = os.path.relpath(file_path, clone_dir) if file_path else ''
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    # Stream the file instead of loading it whole.
                    for line_num, line in enumerate(f, 1):
                        for regex, message, severity in compiled_patterns:
                            if regex.search(line):
                                result['issues'].append({
                                    'tool': 'js-security',
                                    'type': 'security-warning',
                                    'severity': severity,
                                    'confidence': 'MEDIUM',
                                    'message': message,
                                    'file': rel_path,
                                    'line': line_num,
                                    # Keep the snippet short.
                                    'code': line.strip()[:80],
                                })
            except Exception as e:
                # Best effort: skip this file and continue with the rest.
                logger.warning(f'扫描文件 {file_path} 失败: {str(e)}')

        return result

    def _calculate_summary(self, issues: List[Dict]) -> Dict[str, int]:
        """Count issues per severity bucket.

        Args:
            issues: Issue dicts; the 'severity' value is read
                case-insensitively.

        Returns:
            A dict with 'total', 'high', 'medium', 'low' and 'info' counts.
            'HIGH' and 'CRITICAL' count as 'high'; a missing, None or
            unrecognised severity counts as 'info'.
        """
        summary = {
            'total': len(issues),
            'high': 0,
            'medium': 0,
            'low': 0,
            'info': 0,
        }

        for issue in issues:
            # `or ''` guards against an explicit None severity, which would
            # otherwise raise AttributeError on .upper().
            severity = str(issue.get('severity') or '').upper()
            summary[self._SEVERITY_BUCKETS.get(severity, 'info')] += 1

        return summary
|