Files
code_scan/scanner/security_scanner.py
Dang Zerong 027cf50759 add web
2026-03-12 14:42:23 +08:00

225 lines
7.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Security scanner.
Runs Bandit on Python files and regex-based pattern checks on JavaScript/TypeScript files.
"""
import json
import logging
import os
import re
import sys
from typing import Dict, Any, List, Optional

from scanner.base import BaseScanner
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class SecurityScanner(BaseScanner):
    """Security vulnerability scanner.

    Python files are analysed by running Bandit as a subprocess; JavaScript /
    TypeScript files are analysed with a small set of line-based regular
    expressions.  Repository cloning, file selection and command execution
    are delegated to methods inherited from ``BaseScanner``.
    """

    # File suffixes routed to the regex-based JavaScript/TypeScript check.
    _JS_EXTENSIONS = ('.js', '.jsx', '.ts', '.tsx')

    # (compiled pattern, message, severity) triples applied to every JS/TS
    # line.  Compiled once here instead of being looked up for every line.
    _JS_DANGEROUS_PATTERNS = (
        # \b stops identifiers that merely end in "eval" (e.g. retrieval())
        # from being reported as eval() calls.
        (re.compile(r'\beval\s*\(', re.IGNORECASE),
         '使用 eval() 可能导致代码注入', 'HIGH'),
        # Matches "=" and "+=" assignments; (?!=) skips "==" / "===" comparisons.
        (re.compile(r'innerHTML\s*\+?=(?!=)', re.IGNORECASE),
         '使用 innerHTML 可能导致 XSS 攻击', 'MEDIUM'),
        (re.compile(r'document\.write\s*\(', re.IGNORECASE),
         '使用 document.write 可能导致 XSS 攻击', 'MEDIUM'),
        # For the three credential heuristics, (?!=) excludes comparisons such
        # as ``password === input`` which are not assignments of a value.
        (re.compile(r'password\s*[:=](?!=)', re.IGNORECASE),
         '硬编码密码可能存在安全风险', 'HIGH'),
        (re.compile(r'api[_-]?key\s*[:=](?!=)', re.IGNORECASE),
         '硬编码 API Key 可能存在安全风险', 'HIGH'),
        (re.compile(r'secret\s*[:=](?!=)', re.IGNORECASE),
         '硬编码密钥可能存在安全风险', 'HIGH'),
    )

    # Upper-cased severity label -> summary counter; anything else is 'info'.
    _SEVERITY_BUCKETS = {
        'HIGH': 'high',
        'CRITICAL': 'high',
        'MEDIUM': 'medium',
        'LOW': 'low',
    }

    def __init__(self, config: Dict[str, Any]):
        """Initialise the scanner.

        Args:
            config: Scanner configuration, passed unchanged to ``BaseScanner``.
        """
        super().__init__(config)
        # Extensions used to select files for a scan.
        # NOTE(review): only .py (Bandit) and JS/TS (regex) files are actually
        # analysed below; the remaining extensions are selected and counted in
        # 'files_scanned' but no checker runs on them.
        self.extensions = ['.py', '.js', '.ts', '.jsx', '.tsx', '.java', '.go', '.rb', '.php']

    def scan(self, repo_url: str, commit_id: Optional[str], branch: str,
             changed_files: Optional[List[str]] = None) -> Dict[str, Any]:
        """Run the security scan.

        Args:
            repo_url: Repository URL.
            commit_id: Commit ID (may be None).
            branch: Branch name.
            changed_files: Optional list of changed files (e.g. from a PR);
                forwarded to ``get_changed_files``.

        Returns:
            A dict with the keys 'tool', 'language', 'status', 'issues',
            'summary' and 'files_scanned'.  If any step raises, 'status' is
            set to 'error' and an 'error' key holds the exception text; the
            exception itself is not propagated.
        """
        result = {
            'tool': 'Security Scanner',
            'language': 'multi',
            'status': 'success',
            'issues': [],
            'summary': {
                'total': 0,
                'high': 0,
                'medium': 0,
                'low': 0,
                'info': 0
            },
            'files_scanned': 0
        }
        # NOTE(review): the clone directory is not removed by this method --
        # confirm that BaseScanner or the caller cleans it up.
        try:
            clone_dir = self.clone_repo(repo_url, commit_id, branch)

            # Restrict the scan to supported (and, if given, changed) files.
            all_files = self.get_changed_files(clone_dir, self.extensions, changed_files)
            result['files_scanned'] = len(all_files)
            if not all_files:
                logger.info('没有找到可扫描的文件')
                return result

            # Python: Bandit.
            py_files = [f for f in all_files if f.endswith('.py')]
            if py_files:
                bandit_result = self._run_bandit(clone_dir, py_files)
                result['issues'].extend(bandit_result.get('issues', []))

            # JavaScript / TypeScript: simple pattern matching.
            js_files = [f for f in all_files if f.endswith(self._JS_EXTENSIONS)]
            if js_files:
                js_security_result = self._scan_js_security(clone_dir, js_files)
                result['issues'].extend(js_security_result.get('issues', []))

            # The summary is computed before truncation so that it counts
            # every issue found, not only the ones returned.
            result['summary'] = self._calculate_summary(result['issues'])

            # self.detailed / self.max_issues are not set in this class;
            # presumably they are provided by BaseScanner.
            if self.detailed:
                result['issues'] = result['issues'][:self.max_issues]
        except Exception as e:
            # Top-level boundary: report the failure in the result instead of
            # raising, and keep the traceback in the log.
            logger.exception('安全扫描失败: %s', e)
            result['status'] = 'error'
            result['error'] = str(e)
        return result

    def _run_bandit(self, clone_dir: str, files: List[str]) -> Dict[str, Any]:
        """Run Bandit on ``files`` and convert its JSON report.

        Args:
            clone_dir: Directory of the cloned repository; passed to
                ``run_command`` and used as the base for relative paths.
            files: Python files to analyse.

        Returns:
            ``{'tool': 'bandit', 'issues': [...]}``.  Failures are logged as
            warnings and yield whatever issues were collected so far
            (best effort); nothing is raised.
        """
        result = {
            'tool': 'bandit',
            'issues': []
        }
        try:
            # Use the interpreter running this process rather than whatever
            # 'python' resolves to on PATH (which may be missing or may not
            # have Bandit installed).  sys.executable can be empty in unusual
            # embeddings, hence the fallback.
            cmd = [sys.executable or 'python', '-m', 'bandit', '-f', 'json'] + files
            output = self.run_command(cmd, clone_dir, timeout=120)

            stdout = output.get('stdout')
            if not stdout:
                return result

            try:
                data = json.loads(stdout)
            except json.JSONDecodeError:
                logger.warning('Bandit JSON 解析失败')
                return result

            for issue in data.get('results', []):
                # Report paths relative to the clone directory.
                full_path = issue.get('filename', '')
                rel_path = os.path.relpath(full_path, clone_dir) if full_path else ''
                result['issues'].append({
                    'tool': 'bandit',
                    'type': issue.get('issue_id', 'unknown'),
                    'severity': issue.get('issue_severity', 'LOW'),
                    'confidence': issue.get('issue_confidence', 'LOW'),
                    'message': issue.get('issue_text', ''),
                    'file': rel_path,
                    'line': issue.get('line_number', 0),
                    'code': issue.get('code', '')
                })
        except Exception as e:
            # Best effort: a Bandit failure must not abort the whole scan.
            logger.warning(f'Bandit 运行失败: {str(e)}')
        return result

    def _scan_js_security(self, clone_dir: str, files: List[str]) -> Dict[str, Any]:
        """Scan JavaScript/TypeScript files with line-based pattern matching.

        Every line of every file is tested against ``_JS_DANGEROUS_PATTERNS``;
        one issue is emitted per (line, pattern) match.

        Args:
            clone_dir: Directory of the cloned repository, used as the base
                for the relative paths stored in the issues.
            files: Paths of the files to read.

        Returns:
            ``{'tool': 'js-security', 'issues': [...]}``.  Files that cannot
            be processed are logged as warnings and skipped.
        """
        result = {
            'tool': 'js-security',
            'issues': []
        }
        for file_path in files:
            try:
                # Report paths relative to the clone directory.
                rel_path = os.path.relpath(file_path, clone_dir) if file_path else ''
                # Undecodable bytes are dropped rather than failing the file.
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()
                for line_num, line in enumerate(content.split('\n'), 1):
                    for regex, message, severity in self._JS_DANGEROUS_PATTERNS:
                        if regex.search(line):
                            result['issues'].append({
                                'tool': 'js-security',
                                'type': 'security-warning',
                                'severity': severity,
                                'confidence': 'MEDIUM',
                                'message': message,
                                'file': rel_path,
                                'line': line_num,
                                # Keep the snippet short (at most 80 chars).
                                'code': line.strip()[:80]
                            })
            except Exception as e:
                # Best effort: one bad file must not stop the others.
                logger.warning(f'扫描文件 {file_path} 失败: {str(e)}')
        return result

    def _calculate_summary(self, issues: List[Dict]) -> Dict[str, int]:
        """Count issues per severity bucket.

        Args:
            issues: Issue dicts; the 'severity' value is compared
                case-insensitively.

        Returns:
            A dict with 'total', 'high' (HIGH or CRITICAL), 'medium', 'low'
            and 'info' (any other, missing or empty severity).
        """
        summary = {
            'total': len(issues),
            'high': 0,
            'medium': 0,
            'low': 0,
            'info': 0
        }
        for issue in issues:
            # str(... or '') tolerates a missing, None or non-string severity.
            severity = str(issue.get('severity') or '').upper()
            summary[self._SEVERITY_BUCKETS.get(severity, 'info')] += 1
        return summary