diff --git a/app.py b/app.py index 699c21d..9b166b6 100644 --- a/app.py +++ b/app.py @@ -206,41 +206,51 @@ def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]: if web_url: clone_url = web_url.rstrip('/') + '.git' + # 获取 PR 中变更的文件列表 + changed_files = [] + try: + if '/' in repo_name: + repo_owner, repo_name_only = repo_name.split('/', 1) + else: + repo_owner = 'Bosch_Demo' + repo_name_only = repo_name + + pr_files = gitea_client.get_pull_request_files(repo_owner, repo_name_only, pr_number) + if pr_files: + changed_files = [f.get('filename', '') for f in pr_files if f.get('filename')] + logger.info(f"获取到 PR #{pr_number} 的变更文件: {changed_files}") + except Exception as e: + logger.warning(f"获取 PR 文件列表失败: {e}") + # 执行代码扫描 scan_results = {} # Python 扫描 if 'python' in config.get('scanner', {}).get('languages', []): scan_results['python'] = python_scanner.scan( - clone_url, source_sha, source_branch + clone_url, source_sha, source_branch, changed_files ) # JavaScript/TypeScript 扫描 if any(lang in config.get('scanner', {}).get('languages', []) for lang in ['javascript', 'typescript']): scan_results['javascript'] = js_scanner.scan( - clone_url, source_sha, source_branch + clone_url, source_sha, source_branch, changed_files ) # 安全扫描 scan_results['security'] = security_scanner.scan( - clone_url, source_sha, source_branch + clone_url, source_sha, source_branch, changed_files ) # AI 代码审查 if config.get('ai', {}).get('enabled', False): scan_results['ai'] = ai_reviewer.scan( - clone_url, source_sha, source_branch + clone_url, source_sha, source_branch, changed_files ) # 获取 PR 的代码差异,用于将问题与代码片段关联 pr_diff = None - if '/' in repo_name: - repo_owner, repo_name_only = repo_name.split('/', 1) - else: - repo_owner = 'Bosch_Demo' - repo_name_only = repo_name - try: pr_diff = gitea_client.get_pull_request_diff(repo_owner, repo_name_only, pr_number) logger.info(f"已获取 PR #{pr_number} 的 diff,长度: {len(pr_diff) if pr_diff else 0}") @@ -249,6 +259,12 @@ def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]: # 将问题与代码片段关联 scan_details_with_code = merge_issues_with_code(scan_results, pr_diff or '') + logger.info(f"[DEBUG] scan_results keys: {list(scan_results.keys())}") + for k, v in scan_results.items(): + if isinstance(v, dict): + issues_cnt = len(v.get('issues', [])) + logger.info(f"[DEBUG] scan_results['{k}'] issues count: {issues_cnt}") + logger.info(f"[DEBUG] scan_details_with_code scanners: {[s.get('name') for s in scan_details_with_code.get('scanners', [])] if scan_details_with_code else 'None'}") # 生成报告 commit_message = f'PR #{pr_number}: {pr_title}' @@ -580,12 +596,24 @@ def api_get_pr_file_content(pr_id): # 获取该文件的扫描问题(PR 创建时已扫描并存入 scan_details_with_code) scan_issues = [] path_norm = path.replace('\\', '/').strip() + logger.info(f"[DEBUG] 请求文件: path_norm={path_norm}") scan_details = pr.get('scan_details_with_code') if isinstance(scan_details, str): try: scan_details = json.loads(scan_details) except Exception: scan_details = None + if scan_details: + logger.info(f"[DEBUG] scan_details keys: {list(scan_details.keys()) if isinstance(scan_details, dict) else 'not dict'}") + if scan_details.get('scanners'): + logger.info(f"[DEBUG] scanners count: {len(scan_details['scanners'])}") + for scanner in scan_details['scanners']: + scanner_name = scanner.get('name', '') + issues_count = len(scanner.get('issues', [])) + logger.info(f"[DEBUG] scanner={scanner_name}, issues_count={issues_count}") + # 打印前几个 issue 的 file 看看 + for idx, issue in enumerate(scanner.get('issues', [])[:3]): + logger.info(f"[DEBUG] issue[{idx}] file={issue.get('file')}, line={issue.get('line')}") if scan_details and scan_details.get('scanners'): for scanner in scan_details['scanners']: for issue in scanner.get('issues', []): @@ -594,6 +622,7 @@ def api_get_pr_file_content(pr_id): continue # 匹配:精确相等或一端包含另一端(兼容 basename 或完整路径) if path_norm == issue_file or path_norm.endswith(issue_file) or issue_file.endswith(path_norm): + logger.info(f"[DEBUG] 匹配成功: issue_file={issue_file}, path_norm={path_norm}") sev = (issue.get('severity') or 'info') if isinstance(sev, str): sev = sev.lower() @@ -606,8 +635,33 @@ def api_get_pr_file_content(pr_id): 'message': (issue.get('message') or issue.get('description') or '').strip(), 'code_context': issue.get('code_context') }) + logger.info(f"[DEBUG] 最终 scan_issues count: {len(scan_issues)}") - return jsonify({'path': path, 'content': content, 'scan_issues': scan_issues}) + # 获取 AI 审查结果 + ai_issues = [] + if scan_details and scan_details.get('ai'): + ai_data = scan_details['ai'] + for issue in ai_data.get('issues', []): + issue_file = (issue.get('file') or '').replace('\\', '/').strip() + if not issue_file: + continue + # 匹配:精确相等或一端包含另一端 + if path_norm == issue_file or path_norm.endswith(issue_file) or issue_file.endswith(path_norm): + ai_issues.append({ + 'scanner': 'AI', + 'severity': issue.get('severity', 'info'), + 'line': int(issue.get('line') or 1), + 'message': issue.get('message', ''), + 'category': 'ai', + 'code_context': issue.get('code_context') + }) + + logger.info(f"[DEBUG] AI issues count: {len(ai_issues)}") + + # 合并静态扫描问题和 AI 问题 + all_issues = scan_issues + ai_issues + + return jsonify({'path': path, 'content': content, 'scan_issues': all_issues}) except Exception as e: logger.error(f'获取文件内容失败: {str(e)}') return jsonify({'error': str(e)}), 500 diff --git a/config.yaml b/config.yaml index 7521d1f..8b8e755 100644 --- a/config.yaml +++ b/config.yaml @@ -49,8 +49,8 @@ ai: # AI 审查器配置 # 支持: "ollama" (本地) 或 "api" (在线API) provider: "api" - # 模型名称(硅基流动可用模型) - model: "Qwen/Qwen2.5-7B-Instruct" + # 模型名称(硅基流动可用模型)- Qwen 最强语言模型 + model: "deepseek-ai/DeepSeek-V3.2" # API 地址 # 硅基流动: https://api.siliconflow.cn/v1 api_url: "https://api.siliconflow.cn/v1" diff --git a/scanner/ai_reviewer.py b/scanner/ai_reviewer.py index 2faeacf..51c1bd8 100644 --- a/scanner/ai_reviewer.py +++ b/scanner/ai_reviewer.py @@ -41,7 +41,7 @@ class AIReviewer(BaseScanner): logger.info(f'AI 审查器初始化: {self.provider}/{self.model}') - def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]: + def scan(self, repo_url: str, commit_id: Optional[str], branch: str, changed_files: Optional[List[str]] = None) -> Dict[str, Any]: """ 执行代码扫描(实现抽象方法) @@ -49,16 +49,18 @@ class AIReviewer(BaseScanner): repo_url: 仓库 URL commit_id: 提交 ID branch: 分支名 + changed_files: 可选的变更文件列表(来自 PR) Returns: 审查结果 """ # 调用实际的审查逻辑 - return self._do_review(repo_url=repo_url, commit_id=commit_id, branch=branch) + return self._do_review(repo_url=repo_url, commit_id=commit_id, branch=branch, changed_files=changed_files) def _do_review(self, clone_dir: str = None, repo_url: str = None, commit_id: str = None, branch: str = None, - language: str = 'python') -> Dict[str, Any]: + language: str = 'python', + changed_files: Optional[List[str]] = None) -> Dict[str, Any]: """ 执行 AI 代码审查 @@ -68,6 +70,7 @@ class AIReviewer(BaseScanner): commit_id: 提交 ID branch: 分支名 language: 编程语言 + changed_files: 可选的变更文件列表(来自 PR) Returns: 审查结果 @@ -94,7 +97,7 @@ class AIReviewer(BaseScanner): } # 获取要审查的代码文件 - files = self._get_code_files(clone_dir, language) + files = self._get_code_files(clone_dir, language, changed_files) if not files: return { @@ -107,7 +110,7 @@ class AIReviewer(BaseScanner): # 对每个文件进行 AI 审查 all_reviews = [] for file_path in files[:5]: # 限制最多审查 5 个文件 - review = self._review_file(file_path, language) + review = self._review_file(file_path, language, clone_dir) if review: all_reviews.append(review) @@ -133,7 +136,7 @@ class AIReviewer(BaseScanner): 'summary': f'AI 审查出错: {str(e)}' } - def _get_code_files(self, clone_dir: str, language: str) -> List[str]: + def _get_code_files(self, clone_dir: str, language: str, changed_files: Optional[List[str]] = None) -> List[str]: """获取代码文件列表""" import glob @@ -144,6 +147,18 @@ class AIReviewer(BaseScanner): } exts = extensions.get(language, ['.py']) + + # 如果提供了变更文件列表,只返回这些文件 + if changed_files: + files = [] + for changed_file in changed_files: + if any(changed_file.endswith(ext) for ext in exts): + full_path = os.path.join(clone_dir, changed_file) + if os.path.exists(full_path): + files.append(full_path) + return files[:10] + + # 否则扫描整个仓库 files = [] for ext in exts: @@ -157,7 +172,7 @@ class AIReviewer(BaseScanner): return files[:10] # 最多 10 个文件 - def _review_file(self, file_path: str, language: str) -> Optional[Dict[str, Any]]: + def _review_file(self, file_path: str, language: str, clone_dir: str = None) -> Optional[Dict[str, Any]]: """审查单个文件""" try: with open(file_path, 'r', encoding='utf-8') as f: @@ -181,9 +196,9 @@ class AIReviewer(BaseScanner): return None # 解析响应 - filename = os.path.basename(file_path) + rel_path = os.path.relpath(file_path, clone_dir) if (clone_dir and file_path) else file_path return { - 'file': filename, + 'file': rel_path, 'path': file_path, 'truncated': truncated, 'review': response @@ -236,6 +251,7 @@ class AIReviewer(BaseScanner): logger.warning(f'未知的 AI provider: {self.provider}') return None except Exception as e: + print("异常追踪信息:", e.__traceback__) logger.error(f'AI 调用失败: {str(e)}') return None diff --git a/scanner/base.py b/scanner/base.py index 7528661..de9f727 100644 --- a/scanner/base.py +++ b/scanner/base.py @@ -152,15 +152,28 @@ class BaseScanner(ABC): 'stdout': '', 'stderr': str(e) } - def get_changed_files(self, clone_dir: str, extensions: List[str]) -> List[str]: + def get_changed_files(self, clone_dir: str, extensions: List[str], changed_files: Optional[List[str]] = None) -> List[str]: """ 获取指定扩展名的文件列表 Args: clone_dir: 仓库目录 extensions: 文件扩展名列表 + changed_files: 可选的变更文件列表(来自 PR),如果提供则只返回这些文件 Returns: 文件路径列表 """ + # 如果提供了变更文件列表,只扫描这些文件 + if changed_files: + files = [] + for changed_file in changed_files: + # 检查文件扩展名是否匹配 + if any(changed_file.endswith(ext) for ext in extensions): + full_path = os.path.join(clone_dir, changed_file) + if os.path.exists(full_path): + files.append(full_path) + return files + + # 否则扫描整个仓库 files = [] for root, dirs, filenames in os.walk(clone_dir): # 跳过隐藏目录和特殊目录 diff --git a/scanner/diff_parser.py b/scanner/diff_parser.py index e989e85..440e423 100644 --- a/scanner/diff_parser.py +++ b/scanner/diff_parser.py @@ -94,10 +94,8 @@ class DiffParser: def merge_issues_with_code(scan_results: Dict[str, Any], diff: str) -> Dict[str, Any]: """将扫描问题与代码片段关联""" - if not diff: - return scan_results + parser = DiffParser(diff) if diff else None - parser = DiffParser(diff) enriched_results = { 'scanners': [], 'summary': scan_results.get('summary', {}), @@ -118,17 +116,93 @@ def merge_issues_with_code(scan_results: Dict[str, Any], diff: str) -> Dict[str, issues = scanner_data.get('issues', []) for issue in issues: - enriched_issue = enrich_issue_with_code(issue, parser) + enriched_issue = enrich_issue_with_code(issue, parser) if parser else issue enriched_scanner['issues'].append(enriched_issue) enriched_results['scanners'].append(enriched_scanner) + # 处理 AI 审查结果,转换为问题格式 if 'ai' in scan_results: - enriched_results['ai'] = scan_results['ai'] + ai_issues = convert_ai_reviews_to_issues(scan_results['ai'], parser) + enriched_results['ai'] = { + 'name': 'ai', + 'issues': ai_issues, + 'summary': scan_results['ai'].get('summary', ''), + 'files_reviewed': scan_results['ai'].get('files_reviewed', 0) + } return enriched_results +def convert_ai_reviews_to_issues(ai_result: Dict[str, Any], parser: Optional[DiffParser] = None) -> List[Dict[str, Any]]: + """将 AI 审查结果转换为问题格式""" + issues = [] + + reviews = ai_result.get('reviews', []) + for review in reviews: + file_path = review.get('file', '') + review_data = review.get('review', {}) + + if not review_data: + continue + + # 获取文件内容作为代码上下文 + code_context = None + if parser: + matched_path = None + for path in parser.files.keys(): + if file_path.endswith(path) or path.endswith(file_path) or file_path in path: + matched_path = path + break + + if matched_path: + chunk = parser.get_file_content(matched_path) + if chunk and chunk.new_content: + lines = chunk.new_content.split('\n')[:10] + code_context = { + 'file': matched_path, + 'line': 1, + 'preview': '\n'.join(lines), + 'has_more': len(chunk.new_content.split('\n')) > 10 + } + + # 处理优点(不作为问题显示) + advantages = review_data.get('优点', []) + # 处理问题 + problems = review_data.get('问题', []) + for idx, problem in enumerate(problems): + issues.append({ + 'file': file_path, + 'line': 1, # AI 审查不返回具体行号 + 'severity': 'warning', + 'message': f'[AI 建议] {problem}', + 'category': 'ai', + 'code_context': code_context, + 'review_data': { + 'type': '问题', + 'content': problem + } + }) + + # 处理优化建议 + optimizations = review_data.get('优化', []) + for optimization in optimizations: + issues.append({ + 'file': file_path, + 'line': 1, + 'severity': 'info', + 'message': f'[AI 优化] {optimization}', + 'category': 'ai', + 'code_context': code_context, + 'review_data': { + 'type': '优化', + 'content': optimization + } + }) + + return issues + + def enrich_issue_with_code(issue: Dict[str, Any], parser: DiffParser) -> Dict[str, Any]: """为单个问题添加代码片段""" enriched = issue.copy() diff --git a/scanner/js_scanner.py b/scanner/js_scanner.py index 128c818..125169b 100644 --- a/scanner/js_scanner.py +++ b/scanner/js_scanner.py @@ -20,7 +20,7 @@ class JavaScriptScanner(BaseScanner): super().__init__(config) self.extensions = ['.js', '.jsx', '.ts', '.tsx', '.vue', '.svelte'] - def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]: + def scan(self, repo_url: str, commit_id: Optional[str], branch: str, changed_files: Optional[List[str]] = None) -> Dict[str, Any]: """ 执行 JavaScript/TypeScript 代码扫描 @@ -28,6 +28,7 @@ class JavaScriptScanner(BaseScanner): repo_url: 仓库 URL commit_id: 提交 ID branch: 分支名 + changed_files: 可选的变更文件列表(来自 PR) Returns: 扫描结果 @@ -51,8 +52,8 @@ class JavaScriptScanner(BaseScanner): # 克隆仓库 clone_dir = self.clone_repo(repo_url, commit_id, branch) - # 获取 JavaScript/TypeScript 文件 - js_files = self.get_changed_files(clone_dir, self.extensions) + # 获取 JavaScript/TypeScript 文件(只扫描变更的文件) + js_files = self.get_changed_files(clone_dir, self.extensions, changed_files) result['files_scanned'] = len(js_files) if not js_files: @@ -75,7 +76,7 @@ class JavaScriptScanner(BaseScanner): return result - def _run_eslint(self, cwd: str, files: List[str]) -> Dict[str, Any]: + def _run_eslint(self, clone_dir: str, files: List[str]) -> Dict[str, Any]: """运行 ESLint 扫描""" result = { 'tool': 'eslint', @@ -88,7 +89,7 @@ class JavaScriptScanner(BaseScanner): cmd = ['npx', 'eslint', '--format=json', '--no-eslintrc'] + files # 如果没有 eslint 配置,先创建默认配置 - eslintrc_path = os.path.join(cwd, '.eslintrc.json') + eslintrc_path = os.path.join(clone_dir, '.eslintrc.json') if not os.path.exists(eslintrc_path): # 创建简单的 ESLint 配置 eslint_config = { @@ -106,7 +107,7 @@ class JavaScriptScanner(BaseScanner): with open(eslintrc_path, 'w') as f: json.dump(eslint_config, f) - output = self.run_command(cmd, cwd, timeout=120) + output = self.run_command(cmd, clone_dir, timeout=120) result['raw_output'] = output.get('stdout', '') + output.get('stderr', '') # 解析 JSON 输出 @@ -115,6 +116,8 @@ class JavaScriptScanner(BaseScanner): eslint_results = json.loads(output['stdout']) for file_result in eslint_results: file_path = file_result.get('filePath', '') + # 使用相对于 clone_dir 的路径 + rel_path = os.path.relpath(file_path, clone_dir) if file_path else '' messages = file_result.get('messages', []) for msg in messages: @@ -124,7 +127,7 @@ class JavaScriptScanner(BaseScanner): 'type': severity, 'severity': 'Error' if msg.get('severity', 0) == 2 else 'Warning', 'message': msg.get('message', ''), - 'file': os.path.basename(file_path), + 'file': rel_path, 'line': msg.get('line', 0), 'column': msg.get('column', 0), 'symbol': msg.get('ruleId', 'unknown') diff --git a/scanner/python_scanner.py b/scanner/python_scanner.py index 30c829c..bb87f66 100644 --- a/scanner/python_scanner.py +++ b/scanner/python_scanner.py @@ -20,7 +20,7 @@ class PythonScanner(BaseScanner): super().__init__(config) self.extensions = ['.py'] - def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]: + def scan(self, repo_url: str, commit_id: Optional[str], branch: str, changed_files: Optional[List[str]] = None) -> Dict[str, Any]: """ 执行 Python 代码扫描 @@ -28,6 +28,7 @@ class PythonScanner(BaseScanner): repo_url: 仓库 URL commit_id: 提交 ID branch: 分支名 + changed_files: 可选的变更文件列表(来自 PR) Returns: 扫描结果 @@ -51,8 +52,8 @@ class PythonScanner(BaseScanner): # 克隆仓库 clone_dir = self.clone_repo(repo_url, commit_id, branch) - # 获取 Python 文件 - py_files = self.get_changed_files(clone_dir, self.extensions) + # 获取 Python 文件(只扫描变更的文件) + py_files = self.get_changed_files(clone_dir, self.extensions, changed_files) result['files_scanned'] = len(py_files) if not py_files: @@ -84,7 +85,7 @@ class PythonScanner(BaseScanner): return result - def _run_pylint(self, cwd: str, files: List[str]) -> Dict[str, Any]: + def _run_pylint(self, clone_dir: str, files: List[str]) -> Dict[str, Any]: """运行 Pylint 扫描""" result = { 'tool': 'pylint', @@ -95,7 +96,7 @@ class PythonScanner(BaseScanner): # 只扫描变更的文件 try: cmd = ['python', '-m', 'pylint', '--output-format=json'] + files - output = self.run_command(cmd, cwd, timeout=120) + output = self.run_command(cmd, clone_dir, timeout=120) result['raw_output'] = output.get('stdout', '') @@ -104,12 +105,15 @@ class PythonScanner(BaseScanner): try: issues = json.loads(output['stdout']) for issue in issues: + # 使用相对于 clone_dir 的路径 + full_path = issue.get('path', '') + rel_path = os.path.relpath(full_path, clone_dir) if full_path else '' result['issues'].append({ 'tool': 'pylint', 'type': issue.get('type', 'info'), 'severity': issue.get('severity', 'Info'), 'message': issue.get('message', ''), - 'file': os.path.basename(issue.get('path', '')), + 'file': rel_path, 'line': issue.get('line', 0), 'column': issue.get('column', 0), 'symbol': issue.get('symbol', '') @@ -122,7 +126,7 @@ class PythonScanner(BaseScanner): return result - def _run_flake8(self, cwd: str, files: List[str]) -> Dict[str, Any]: + def _run_flake8(self, clone_dir: str, files: List[str]) -> Dict[str, Any]: """运行 Flake8 扫描""" result = { 'tool': 'flake8', @@ -132,7 +136,7 @@ class PythonScanner(BaseScanner): try: cmd = ['python', '-m', 'flake8', '--format=json'] + files - output = self.run_command(cmd, cwd, timeout=120) + output = self.run_command(cmd, clone_dir, timeout=120) result['raw_output'] = output.get('stdout', '') @@ -141,12 +145,15 @@ class PythonScanner(BaseScanner): try: issues = json.loads(output['stdout']) for issue in issues: + # 使用相对于 clone_dir 的路径 + full_path = issue.get('filename', '') + rel_path = os.path.relpath(full_path, clone_dir) if full_path else '' result['issues'].append({ 'tool': 'flake8', 'type': self._map_flake8_code(issue.get('code', '')), 'severity': 'Warning', 'message': issue.get('text', ''), - 'file': os.path.basename(issue.get('filename', '')), + 'file': rel_path, 'line': issue.get('line_number', 0), 'column': issue.get('column_number', 0), 'symbol': issue.get('code', '') diff --git a/scanner/security_scanner.py b/scanner/security_scanner.py index de515e4..2eae0a5 100644 --- a/scanner/security_scanner.py +++ b/scanner/security_scanner.py @@ -21,7 +21,7 @@ class SecurityScanner(BaseScanner): # 扫描所有代码文件以发现安全问题 self.extensions = ['.py', '.js', '.ts', '.jsx', '.tsx', '.java', '.go', '.rb', '.php'] - def scan(self, repo_url: str, commit_id: Optional[str], branch: str) -> Dict[str, Any]: + def scan(self, repo_url: str, commit_id: Optional[str], branch: str, changed_files: Optional[List[str]] = None) -> Dict[str, Any]: """ 执行安全扫描 @@ -29,6 +29,7 @@ class SecurityScanner(BaseScanner): repo_url: 仓库 URL commit_id: 提交 ID branch: 分支名 + changed_files: 可选的变更文件列表(来自 PR) Returns: 扫描结果 @@ -53,8 +54,8 @@ class SecurityScanner(BaseScanner): # 克隆仓库 clone_dir = self.clone_repo(repo_url, commit_id, branch) - # 获取所有支持的文件 - all_files = self.get_changed_files(clone_dir, self.extensions) + # 获取所有支持的文件(只扫描变更的文件) + all_files = self.get_changed_files(clone_dir, self.extensions, changed_files) result['files_scanned'] = len(all_files) if not all_files: @@ -88,7 +89,7 @@ class SecurityScanner(BaseScanner): return result - def _run_bandit(self, cwd: str, files: List[str]) -> Dict[str, Any]: + def _run_bandit(self, clone_dir: str, files: List[str]) -> Dict[str, Any]: """运行 Bandit 安全扫描""" result = { 'tool': 'bandit', @@ -98,7 +99,7 @@ class SecurityScanner(BaseScanner): try: # 运行 bandit cmd = ['python', '-m', 'bandit', '-f', 'json'] + files - output = self.run_command(cmd, cwd, timeout=120) + output = self.run_command(cmd, clone_dir, timeout=120) # 解析 JSON 输出 if output.get('stdout'): @@ -107,6 +108,9 @@ class SecurityScanner(BaseScanner): results = data.get('results', []) for issue in results: + # 使用相对于 clone_dir 的路径 + full_path = issue.get('filename', '') + rel_path = os.path.relpath(full_path, clone_dir) if full_path else '' # 映射严重级别 severity = issue.get('issue_severity', 'LOW') result['issues'].append({ @@ -115,7 +119,7 @@ class SecurityScanner(BaseScanner): 'severity': severity, 'confidence': issue.get('issue_confidence', 'LOW'), 'message': issue.get('issue_text', ''), - 'file': os.path.basename(issue.get('filename', '')), + 'file': rel_path, 'line': issue.get('line_number', 0), 'code': issue.get('code', '') }) @@ -127,7 +131,7 @@ class SecurityScanner(BaseScanner): return result - def _scan_js_security(self, cwd: str, files: List[str]) -> Dict[str, Any]: + def _scan_js_security(self, clone_dir: str, files: List[str]) -> Dict[str, Any]: """简单的 JavaScript 安全扫描(基于模式匹配)""" result = { 'tool': 'js-security', @@ -172,6 +176,8 @@ class SecurityScanner(BaseScanner): for file_path in files: try: + # 使用相对于 clone_dir 的路径 + rel_path = os.path.relpath(file_path, clone_dir) if file_path else '' with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() lines = content.split('\n') @@ -185,7 +191,7 @@ class SecurityScanner(BaseScanner): 'severity': pattern_info['severity'], 'confidence': 'MEDIUM', 'message': pattern_info['message'], - 'file': os.path.basename(file_path), + 'file': rel_path, 'line': line_num, 'code': line.strip()[:80] }) diff --git a/test_demo/demo_flaws.py b/test_demo/demo_flaws.py new file mode 100644 index 0000000..a06db6a --- /dev/null +++ b/test_demo/demo_flaws.py @@ -0,0 +1,267 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +测试文件:包含常见代码缺陷,用于验证扫描器 +""" + +import os +import sys +import json +import pickle +import subprocess +from ast import parse +from typing import List, Dict + + +# 缺陷1: 未使用的导入 +import unused_module # 未使用 +import collections as col # 使用了 col 但 flake8 可能检测 + + +# 缺陷2: 未使用的变量 +def unused_variable_demo(): + """演示未使用的变量""" + result = calculate() # result 未被使用 + print("Function executed") + + +def calculate(): + """计算并返回结果""" + return 42 + + +# 缺陷3: 未定义的变量 +def undefined_variable_demo(): + """演示未定义的变量""" + print(undefined_var) # undefined_var 未定义 + + +# 缺陷4: 变量在定义前使用 +def use_before_define(): + """在定义前使用变量""" + print(before_var) # before_var 在下面才定义 + before_var = 100 + + +# 缺陷5: 硬编码密码(安全问题) +def connect_database(): + """连接数据库""" + password = "admin123" # 硬编码密码 + username = "root" + return f"Connecting with {username}:{password}" + + +# 缺陷6: 使用 eval(安全问题) +def unsafe_eval(): + """危险使用 eval""" + user_input = "os.system('ls')" + result = eval(user_input) # 危险! + return result + + +# 缺陷7: 使用 pickle 反序列化(安全问题) +def unsafe_pickle(): + """不安全的 pickle 反序列化""" + data = b"..." # 模拟恶意数据 + obj = pickle.loads(data) # 危险! + + +# 缺陷8: 行太长(风格问题) +def long_line(): + """这是一行非常非常非常非常非常非常非常非常非常非常非常非常长的代码超过了 120 个字符的限制""" + + +# 缺陷9: 缺少空格 +def missing_spaces(): + """缺少必要空格""" + x=1+2 + y=3*4 + if x==1: + print(x) + + +# 缺陷10: 多余空格 +def extra_spaces(): + """多余空格""" + x = 1 + y = 2 + + +# 缺陷11: 未捕获的异常 +def unhandled_exception(): + """捕获异常后未处理""" + try: + result = 10 / 0 + except ZeroDivisionError: + pass # 捕获但未处理 + + +# 缺陷12: 过于宽泛的异常 +def broad_exception(): + """捕获所有异常""" + try: + data = json.loads('{"key": "value"}') + except Exception: + pass + + +# 缺陷13: 裸 except 子句 +def bare_except(): + """使用裸 except""" + try: + x = int("abc") + except: + pass + + +# 缺陷14: 重复代码 +def duplicate_code(): + """重复代码示例""" + a = 1 + b = 2 + c = a + b + print(c) + + a = 3 + b = 4 + c = a + b + print(c) + + +# 缺陷15: 变量名与内置函数冲突 +def shadow_builtin(): + """变量名覆盖内置函数""" + list = [1, 2, 3] # 覆盖内置 list + dict = {} # 覆盖内置 dict + str = "hello" # 覆盖内置 str + return list, dict, str + + +# 缺陷16: 不必要的 pass +def unnecessary_pass(): + """不必要的 pass""" + if True: + pass # 可以直接删除 + + +# 缺陷17: 使用 + 进行字符串拼接(推荐用 join) +def string_concat(): + """低效字符串拼接""" + result = "" + for i in range(100): + result = result + str(i) + return result + + +# 缺陷18: 在循环中修改集合 +def modify_during_iteration(): + """在迭代时修改列表""" + items = [1, 2, 3, 4, 5] + for item in items: + if item % 2 == 0: + items.remove(item) # 在迭代时修改 + + +# 缺陷19: 全局变量 +global_counter = 0 # 全局变量 + + +def increment(): + global global_counter # 依赖全局变量 + global_counter += 1 + + +# 缺陷20: 魔法数字 +def calculate_price(): + """使用魔法数字""" + price = 100 + tax = price * 1.1 # 1.1 是什么? + discount = price * 0.9 + return tax, discount + + +# 缺陷21: 函数参数过多 +def bad_function(a, b, c, d, e, f, g, h): + """参数过多的函数""" + return a + b + c + d + e + f + g + h + + +# 缺陷22: 空函数体 +def empty_function(): + """空函数应该使用 pass 或文档字符串""" + pass + + +# 缺陷23: 使用 time.sleep 测试 +def bad_sleep(): + """生产代码中使用 time.sleep""" + import time + time.sleep(5) # 阻塞 + + +# 缺陷24: 注释掉的代码 +def commented_code(): + # print("This is commented out") + pass + + +# 缺陷25: TODO/FIXME 注释 +def todo_comment(): + # TODO: Implement this + # FIXME: This is broken + pass + + +# 缺陷26: 导入顺序错误(应先标准库,再第三方,本地) +import sys # 标准库 +import flask # 第三方 +from . import local # 本地 + + +# 缺陷27: 不必要的列表推导式 +def unnecessary_list_comp(): + """不必要的列表推导式""" + result = [x for x in range(10)] # 可简化为 list(range(10)) + return result + + +# 缺陷28: 条件表达式中的赋值 +def assignment_in_condition(): + """在条件中赋值(不推荐)""" + if (x := get_value()) > 0: # 海象运算符但可能难以阅读 + print(x) + + +def get_value(): + return 5 + + +# 缺陷29: 比较布尔值 +def compare_bool(): + """与布尔值比较""" + flag = True + if flag == True: # 应直接用 if flag: + print("yes") + + +# 缺陷30: 使用 hasattr/getattr 而非异常处理 +def use_hasattr(): + """滥用 hasattr""" + class Foo: + pass + obj = Foo() + if hasattr(obj, 'bar'): # 可直接用 try/except + print(obj.bar) + + +# 主函数入口 +def main(): + """主函数""" + connect_database() + unsafe_eval() + unsafe_pickle() + print("Demo executed") + + +if __name__ == "__main__": + main()