Compare commits
25 Commits
0991b3de26
...
dingshuo-p
| Author | SHA1 | Date | |
|---|---|---|---|
| 1a96077431 | |||
|
|
a49f42efe5 | ||
| 8062ed4bfd | |||
|
|
ffd77057e3 | ||
| 279a01b897 | |||
| 77fd09e6d2 | |||
|
|
91c16cbc88 | ||
|
|
c8c0ef1620 | ||
|
|
95831d5190 | ||
|
|
9a14c0b219 | ||
|
|
87b2dacf65 | ||
|
|
453414efb2 | ||
|
|
04518812f4 | ||
|
|
6c4ee107f9 | ||
|
|
d11b349d5e | ||
|
|
2a2ff1ad5f | ||
|
|
bc5a19fffc | ||
|
|
78655ce5dc | ||
|
|
2201f6d696 | ||
|
|
97881ee00e | ||
|
|
e46aff2797 | ||
|
|
887c8ae154 | ||
|
|
ecc39402d5 | ||
|
|
dc9b921091 | ||
|
|
a928b79d6d |
285
README.md
285
README.md
@@ -1,224 +1,109 @@
|
|||||||
# AI Code Quality Scanner - 飞书通知版
|
# AI 代码质量扫描系统
|
||||||
|
|
||||||
一个自动化代码质量扫描系统,在代码提交时自动扫描并发送报告到飞书。
|
自动化代码质量扫描工具,监听 PR 事件,自动扫描代码缺陷并提供合并决策支持。
|
||||||
|
|
||||||
## 功能特性
|
## 工作流程
|
||||||
|
|
||||||
- 🤖 自动监听 Gitea 代码提交事件
|
|
||||||
- 🔍 多维度代码质量扫描(语法、风格、安全)
|
|
||||||
- 📊 生成 Markdown 格式扫描报告
|
|
||||||
- 📱 实时推送飞书机器人通知
|
|
||||||
|
|
||||||
## 系统架构
|
|
||||||
|
|
||||||
```
|
```
|
||||||
┌─────────────┐ Webhook ┌──────────────────┐
|
┌──────────┐ 1. 创建 PR ┌────────────┐
|
||||||
│ Gitea │ ───────────────► │ Webhook Server │
|
│ Gitea │ ───────────────► │ Webhook │
|
||||||
│ 代码仓库 │ │ (Flask) │
|
└──────────┘ │ Server │
|
||||||
└─────────────┘ └────────┬─────────┘
|
└─────┬──────┘
|
||||||
│
|
│ 2. 拉取代码、扫描、存库
|
||||||
▼
|
▼
|
||||||
┌──────────────────┐
|
┌────────────┐
|
||||||
│ Code Scanner │
|
│ SQLite │
|
||||||
│ - ESLint │
|
│ Database │
|
||||||
│ - Pylint │
|
└────────────┘
|
||||||
│ - SonarQube │
|
│ 3. 前端查询
|
||||||
└────────┬─────────┘
|
▼
|
||||||
│
|
┌────────────┐
|
||||||
▼
|
│ 前端页面 │
|
||||||
┌──────────────────┐
|
└────────────┘
|
||||||
│ Report Generator│
|
|
||||||
│ - Markdown │
|
|
||||||
└────────┬─────────┘
|
|
||||||
│
|
|
||||||
▼
|
|
||||||
┌──────────────────┐
|
|
||||||
│ Feishu Bot │
|
|
||||||
│ - Webhook │
|
|
||||||
└──────────────────┘
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## 三个核心功能
|
||||||
|
|
||||||
|
### 1. PR 创建
|
||||||
|
|
||||||
|
- Gitea 仓库创建 PR 时自动触发扫描
|
||||||
|
- 支持事件:`opened`、`reopened`、`synchronize`
|
||||||
|
|
||||||
|
### 2. 后端处理
|
||||||
|
|
||||||
|
- 拉取 PR 对应的代码
|
||||||
|
- 执行代码扫描(Python/JavaScript/TypeScript)
|
||||||
|
- AI 智能审查代码缺陷
|
||||||
|
- 扫描结果存入 SQLite 数据库
|
||||||
|
|
||||||
|
### 3. 前端功能
|
||||||
|
|
||||||
|
- 查询所有 PR 及扫描状态
|
||||||
|
- 查看每个 PR 的缺陷详情
|
||||||
|
- 一键「拒绝合并」或「同意合并」
|
||||||
|
|
||||||
## 快速开始
|
## 快速开始
|
||||||
|
|
||||||
### 1. 安装依赖
|
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
|
# 安装依赖
|
||||||
pip install -r requirements.txt
|
pip install -r requirements.txt
|
||||||
```
|
|
||||||
|
|
||||||
### 2. 配置飞书机器人
|
# 运行服务
|
||||||
|
|
||||||
1. 打开飞书群聊 → 设置 → 群机器人
|
|
||||||
2. 添加机器人 → 选择"自定义机器人"
|
|
||||||
3. 获取 Webhook 地址
|
|
||||||
4. 配置 `config.yaml`
|
|
||||||
|
|
||||||
### 3. 配置 Gitea Webhook
|
|
||||||
|
|
||||||
#### 方式一:Push 时扫描(原有方式)
|
|
||||||
|
|
||||||
1. 进入 Gitea 仓库 → 设置 → Webhooks
|
|
||||||
2. 添加 Webhook:
|
|
||||||
- 目标 URL: `http://你的服务器IP:5000/webhook/gitea`
|
|
||||||
- 触发事件: Push
|
|
||||||
- 密钥: 配置 `config.yaml` 中的 secret
|
|
||||||
|
|
||||||
#### 方式二:PR 创建时扫描(推荐)
|
|
||||||
|
|
||||||
1. 进入 Gitea 仓库 → 设置 → Webhooks
|
|
||||||
2. 添加 Webhook:
|
|
||||||
- 目标 URL: `http://你的服务器IP:5000/webhook/gitea`
|
|
||||||
- 触发事件: Pull Request
|
|
||||||
- 密钥: 配置 `config.yaml` 中的 secret
|
|
||||||
|
|
||||||
**支持的 PR 事件:**
|
|
||||||
- `opened` - 创建新 PR
|
|
||||||
- `reopened` - 重新打开 PR
|
|
||||||
- `synchronize` - PR 中的提交有更新
|
|
||||||
- `ready_for_review` - PR 标记为准备好审查
|
|
||||||
|
|
||||||
### 4. 运行服务
|
|
||||||
|
|
||||||
```bash
|
|
||||||
python app.py
|
python app.py
|
||||||
```
|
```
|
||||||
|
|
||||||
## 配置说明
|
访问 http://localhost:5000 查看前端页面。
|
||||||
|
|
||||||
所有配置在 `config.yaml` 中:
|
## Docker 部署
|
||||||
|
|
||||||
|
### 1. 构建镜像
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker buildx build --load --push -t dcr-by1jwyxk44.71826370.xyz/whlaoding/code-scan:latest .
|
||||||
|
```
|
||||||
|
|
||||||
|
### 2. 登录仓库
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker login dcr-by1jwyxk44.71826370.xyz
|
||||||
|
```
|
||||||
|
|
||||||
|
### 3. Push 到仓库
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker run -d --name code-scan -p 5000:5000 dcr-by1jwyxk44.71826370.xyz/whlaoding/code-scan:latest
|
||||||
|
```
|
||||||
|
|
||||||
|
### 4. 使用 docker compose 启动
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 启动服务
|
||||||
|
docker compose up -d
|
||||||
|
|
||||||
|
# 查看日志
|
||||||
|
docker compose logs -f
|
||||||
|
|
||||||
|
# 停止服务
|
||||||
|
docker compose down
|
||||||
|
```
|
||||||
|
|
||||||
|
## 配置
|
||||||
|
|
||||||
|
配置文件 `config.yaml`:
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
server:
|
server:
|
||||||
host: "0.0.0.0"
|
host: "0.0.0.0"
|
||||||
port: 5000
|
port: 5000
|
||||||
debug: true
|
|
||||||
|
|
||||||
gitea:
|
gitea:
|
||||||
base_url: "http://localhost:3000"
|
base_url: "https://code.deep-pilot.chat"
|
||||||
# Webhook 签名密钥
|
webhook_secret: "xxx"
|
||||||
webhook_secret: "your_webhook_secret"
|
api_token: "xxx"
|
||||||
|
|
||||||
feishu:
|
ai:
|
||||||
# 飞书机器人 Webhook 地址
|
provider: "api"
|
||||||
webhook_url: "https://open.feishu.cn/open-apis/bot/v2/hook/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
|
model: "qwen3.5-plus"
|
||||||
# 消息推送 secret(可选,用于签名)
|
api_url: "https://dashscope.aliyuncs.com/compatible-mode/v1"
|
||||||
secret: "your_feishu_secret"
|
api_key: "sk-xxx"
|
||||||
|
|
||||||
scanner:
|
|
||||||
# 支持的语言
|
|
||||||
languages:
|
|
||||||
- python
|
|
||||||
- javascript
|
|
||||||
- typescript
|
|
||||||
# 扫描阈值
|
|
||||||
max_issues: 10
|
|
||||||
# 是否启用详细扫描
|
|
||||||
detailed: true
|
|
||||||
|
|
||||||
report:
|
|
||||||
# 报告保存路径
|
|
||||||
output_dir: "./reports"
|
|
||||||
# 是否保留报告文件
|
|
||||||
keep_files: true
|
|
||||||
```
|
```
|
||||||
|
|
||||||
## 项目结构
|
|
||||||
|
|
||||||
```
|
|
||||||
code-scanner/
|
|
||||||
├── app.py # 主应用入口
|
|
||||||
├── config.yaml # 配置文件
|
|
||||||
├── requirements.txt # Python 依赖
|
|
||||||
├── README.md # 项目说明
|
|
||||||
├── scanner/
|
|
||||||
│ ├── __init__.py
|
|
||||||
│ ├── base.py # 扫描器基类
|
|
||||||
│ ├── python_scanner.py # Python 代码扫描
|
|
||||||
│ ├── js_scanner.py # JavaScript/TypeScript 扫描
|
|
||||||
│ └── security_scanner.py # 安全扫描
|
|
||||||
├── report/
|
|
||||||
│ ├── __init__.py
|
|
||||||
│ └── generator.py # Markdown 报告生成
|
|
||||||
├── notify/
|
|
||||||
│ ├── __init__.py
|
|
||||||
│ └── feishu.py # 飞书通知
|
|
||||||
├── webhook/
|
|
||||||
│ ├── __init__.py
|
|
||||||
│ └── handler.py # Webhook 处理
|
|
||||||
└── reports/ # 报告输出目录
|
|
||||||
```
|
|
||||||
|
|
||||||
## 支持的扫描工具
|
|
||||||
|
|
||||||
### Python
|
|
||||||
- **Pylint** - 代码风格和错误检查
|
|
||||||
- **Flake8** - Python 代码检查
|
|
||||||
- **Bandit** - 安全漏洞扫描
|
|
||||||
|
|
||||||
### JavaScript/TypeScript
|
|
||||||
- **ESLint** - JavaScript/TypeScript 检查
|
|
||||||
- **Prettier** - 代码格式化
|
|
||||||
|
|
||||||
## 飞书消息效果
|
|
||||||
|
|
||||||
扫描完成后,将收到类似以下消息:
|
|
||||||
|
|
||||||
### Push 扫描消息
|
|
||||||
|
|
||||||
```
|
|
||||||
📊 代码质量扫描报告
|
|
||||||
|
|
||||||
仓库: my-project
|
|
||||||
分支: main
|
|
||||||
提交: abc1234
|
|
||||||
提交者: developer@example.com
|
|
||||||
|
|
||||||
✅ 扫描通过 (0 issues)
|
|
||||||
或
|
|
||||||
⚠️ 发现问题 (5 issues)
|
|
||||||
```
|
|
||||||
|
|
||||||
### PR 扫描消息
|
|
||||||
|
|
||||||
```
|
|
||||||
📊 PR 代码质量扫描报告
|
|
||||||
|
|
||||||
仓库: my-project
|
|
||||||
源分支: feature-xxx → 目标分支: main
|
|
||||||
PR链接: https://gitea.example.com/user/project/pulls/123
|
|
||||||
提交: abc1234
|
|
||||||
提交者: developer@example.com
|
|
||||||
|
|
||||||
✅ 扫描通过 (0 issues)
|
|
||||||
或
|
|
||||||
⚠️ 发现问题 (5 issues)
|
|
||||||
```
|
|
||||||
|
|
||||||
## Docker 部署
|
|
||||||
|
|
||||||
```dockerfile
|
|
||||||
FROM python:3.11-slim
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
COPY requirements.txt .
|
|
||||||
RUN pip install -r requirements.txt
|
|
||||||
|
|
||||||
COPY . .
|
|
||||||
EXPOSE 5000
|
|
||||||
|
|
||||||
CMD ["python", "app.py"]
|
|
||||||
```
|
|
||||||
|
|
||||||
## 环境变量
|
|
||||||
|
|
||||||
也可以通过环境变量配置:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
export FEISHU_WEBHOOK_URL="https://open.feishu.cn/..."
|
|
||||||
export GITEA_WEBHOOK_SECRET="secret"
|
|
||||||
export SCANNER_MAX_ISSUES=10
|
|
||||||
```
|
|
||||||
|
|
||||||
## 许可证
|
|
||||||
|
|
||||||
MIT License
|
|
||||||
|
|||||||
21
app.py
21
app.py
@@ -2,6 +2,7 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
import time
|
||||||
import logging
|
import logging
|
||||||
import traceback
|
import traceback
|
||||||
from typing import Dict, Tuple, Any
|
from typing import Dict, Tuple, Any
|
||||||
@@ -131,21 +132,27 @@ def handle_gitea_webhook():
|
|||||||
|
|
||||||
# Python 扫描
|
# Python 扫描
|
||||||
if 'python' in config.get('scanner', {}).get('languages', []):
|
if 'python' in config.get('scanner', {}).get('languages', []):
|
||||||
|
start_time = time.time()
|
||||||
scan_results['python'] = python_scanner.scan(
|
scan_results['python'] = python_scanner.scan(
|
||||||
clone_url, commit_id, branch
|
clone_url, commit_id, branch
|
||||||
)
|
)
|
||||||
|
logger.info(f"[TIMER] Python 扫描耗时: {time.time() - start_time:.2f}秒")
|
||||||
|
|
||||||
# JavaScript/TypeScript 扫描
|
# JavaScript/TypeScript 扫描
|
||||||
if any(lang in config.get('scanner', {}).get('languages', [])
|
if any(lang in config.get('scanner', {}).get('languages', [])
|
||||||
for lang in ['javascript', 'typescript']):
|
for lang in ['javascript', 'typescript']):
|
||||||
|
start_time = time.time()
|
||||||
scan_results['javascript'] = js_scanner.scan(
|
scan_results['javascript'] = js_scanner.scan(
|
||||||
clone_url, commit_id, branch
|
clone_url, commit_id, branch
|
||||||
)
|
)
|
||||||
|
logger.info(f"[TIMER] JavaScript 扫描耗时: {time.time() - start_time:.2f}秒")
|
||||||
|
|
||||||
# 安全扫描
|
# 安全扫描
|
||||||
|
start_time = time.time()
|
||||||
scan_results['security'] = security_scanner.scan(
|
scan_results['security'] = security_scanner.scan(
|
||||||
clone_url, commit_id, branch
|
clone_url, commit_id, branch
|
||||||
)
|
)
|
||||||
|
logger.info(f"[TIMER] 安全扫描耗时: {time.time() - start_time:.2f}秒")
|
||||||
|
|
||||||
# 生成报告
|
# 生成报告
|
||||||
report = report_generator.generate(
|
report = report_generator.generate(
|
||||||
@@ -228,27 +235,35 @@ def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]:
|
|||||||
|
|
||||||
# Python 扫描
|
# Python 扫描
|
||||||
if 'python' in config.get('scanner', {}).get('languages', []):
|
if 'python' in config.get('scanner', {}).get('languages', []):
|
||||||
|
start_time = time.time()
|
||||||
scan_results['python'] = python_scanner.scan(
|
scan_results['python'] = python_scanner.scan(
|
||||||
clone_url, source_sha, source_branch, changed_files
|
clone_url, source_sha, source_branch, changed_files
|
||||||
)
|
)
|
||||||
|
logger.info(f"[TIMER] Python 扫描耗时: {time.time() - start_time:.2f}秒")
|
||||||
|
|
||||||
# JavaScript/TypeScript 扫描
|
# JavaScript/TypeScript 扫描
|
||||||
if any(lang in config.get('scanner', {}).get('languages', [])
|
if any(lang in config.get('scanner', {}).get('languages', [])
|
||||||
for lang in ['javascript', 'typescript']):
|
for lang in ['javascript', 'typescript']):
|
||||||
|
start_time = time.time()
|
||||||
scan_results['javascript'] = js_scanner.scan(
|
scan_results['javascript'] = js_scanner.scan(
|
||||||
clone_url, source_sha, source_branch, changed_files
|
clone_url, source_sha, source_branch, changed_files
|
||||||
)
|
)
|
||||||
|
logger.info(f"[TIMER] JavaScript 扫描耗时: {time.time() - start_time:.2f}秒")
|
||||||
|
|
||||||
# 安全扫描
|
# 安全扫描
|
||||||
|
start_time = time.time()
|
||||||
scan_results['security'] = security_scanner.scan(
|
scan_results['security'] = security_scanner.scan(
|
||||||
clone_url, source_sha, source_branch, changed_files
|
clone_url, source_sha, source_branch, changed_files
|
||||||
)
|
)
|
||||||
|
logger.info(f"[TIMER] 安全扫描耗时: {time.time() - start_time:.2f}秒")
|
||||||
|
|
||||||
# AI 代码审查
|
# AI 代码审查
|
||||||
if config.get('ai', {}).get('enabled', False):
|
if config.get('ai', {}).get('enabled', False):
|
||||||
|
start_time = time.time()
|
||||||
scan_results['ai'] = ai_reviewer.scan(
|
scan_results['ai'] = ai_reviewer.scan(
|
||||||
clone_url, source_sha, source_branch, changed_files
|
clone_url, source_sha, source_branch, changed_files
|
||||||
)
|
)
|
||||||
|
logger.info(f"[TIMER] AI 扫描耗时: {time.time() - start_time:.2f}秒")
|
||||||
|
|
||||||
# 获取 PR 的代码差异,用于将问题与代码片段关联
|
# 获取 PR 的代码差异,用于将问题与代码片段关联
|
||||||
pr_diff = None
|
pr_diff = None
|
||||||
@@ -326,13 +341,19 @@ def manual_scan():
|
|||||||
scan_results = {}
|
scan_results = {}
|
||||||
|
|
||||||
if 'python' in config.get('scanner', {}).get('languages', []):
|
if 'python' in config.get('scanner', {}).get('languages', []):
|
||||||
|
start_time = time.time()
|
||||||
scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch)
|
scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch)
|
||||||
|
logger.info(f"[TIMER] Python 扫描耗时: {time.time() - start_time:.2f}秒")
|
||||||
|
|
||||||
if any(lang in config.get('scanner', {}).get('languages', [])
|
if any(lang in config.get('scanner', {}).get('languages', [])
|
||||||
for lang in ['javascript', 'typescript']):
|
for lang in ['javascript', 'typescript']):
|
||||||
|
start_time = time.time()
|
||||||
scan_results['javascript'] = js_scanner.scan(repo_url, commit_id, branch)
|
scan_results['javascript'] = js_scanner.scan(repo_url, commit_id, branch)
|
||||||
|
logger.info(f"[TIMER] JavaScript 扫描耗时: {time.time() - start_time:.2f}秒")
|
||||||
|
|
||||||
|
start_time = time.time()
|
||||||
scan_results['security'] = security_scanner.scan(repo_url, commit_id, branch)
|
scan_results['security'] = security_scanner.scan(repo_url, commit_id, branch)
|
||||||
|
logger.info(f"[TIMER] 安全扫描耗时: {time.time() - start_time:.2f}秒")
|
||||||
|
|
||||||
# 生成报告
|
# 生成报告
|
||||||
report = report_generator.generate(
|
report = report_generator.generate(
|
||||||
|
|||||||
@@ -58,4 +58,4 @@ ai:
|
|||||||
# 是否启用 AI 审查
|
# 是否启用 AI 审查
|
||||||
enabled: true
|
enabled: true
|
||||||
# 每次审查的最大代码行数
|
# 每次审查的最大代码行数
|
||||||
max_lines: 200
|
max_lines: 100
|
||||||
@@ -109,7 +109,7 @@ class GiteaClient:
|
|||||||
timeout=30
|
timeout=30
|
||||||
)
|
)
|
||||||
|
|
||||||
if response.status_code == 200:
|
if response.status_code in (200, 201):
|
||||||
logger.info(f"成功关闭 PR #{pr_number}")
|
logger.info(f"成功关闭 PR #{pr_number}")
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ class AIReviewer(BaseScanner):
|
|||||||
|
|
||||||
self.config = config
|
self.config = config
|
||||||
self.enabled = config.get('enabled', True)
|
self.enabled = config.get('enabled', True)
|
||||||
self.provider = config.get('provider', 'ollama')
|
self.provider = config.get('provider', 'api')
|
||||||
self.model = config.get('model', 'llama3')
|
self.model = config.get('model', 'llama3')
|
||||||
self.api_url = config.get('api_url', 'http://localhost:11434')
|
self.api_url = config.get('api_url', 'http://localhost:11434')
|
||||||
self.api_key = config.get('api_key', '')
|
self.api_key = config.get('api_key', '')
|
||||||
@@ -424,13 +424,7 @@ class AIReviewer(BaseScanner):
|
|||||||
def _call_ai(self, prompt: str) -> Optional[Dict[str, Any]]:
|
def _call_ai(self, prompt: str) -> Optional[Dict[str, Any]]:
|
||||||
"""调用 AI 服务"""
|
"""调用 AI 服务"""
|
||||||
try:
|
try:
|
||||||
if self.provider == 'ollama':
|
return self._call_api(prompt)
|
||||||
return self._call_ollama(prompt)
|
|
||||||
elif self.provider == 'api':
|
|
||||||
return self._call_api(prompt)
|
|
||||||
else:
|
|
||||||
logger.warning(f'未知的 AI provider: {self.provider}')
|
|
||||||
return None
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print("异常追踪信息:", e.__traceback__)
|
print("异常追踪信息:", e.__traceback__)
|
||||||
logger.error(f'AI 调用失败: {str(e)}')
|
logger.error(f'AI 调用失败: {str(e)}')
|
||||||
@@ -517,32 +511,6 @@ class AIReviewer(BaseScanner):
|
|||||||
logger.debug("_extract_json_obj: 未能提取到有效的 JSON 对象")
|
logger.debug("_extract_json_obj: 未能提取到有效的 JSON 对象")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _call_ollama(self, prompt: str) -> Optional[Dict[str, Any]]:
|
|
||||||
"""调用 Ollama 本地模型"""
|
|
||||||
import requests
|
|
||||||
|
|
||||||
url = f"{self.api_url}/api/generate"
|
|
||||||
payload = {
|
|
||||||
"model": self.model,
|
|
||||||
"prompt": prompt,
|
|
||||||
"stream": False,
|
|
||||||
"format": "json"
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info(f"调用 Ollama: {url}, model={self.model}")
|
|
||||||
response = requests.post(url, json=payload, timeout=120)
|
|
||||||
|
|
||||||
if response.status_code == 200:
|
|
||||||
result = response.json()
|
|
||||||
content = result.get('response', '')
|
|
||||||
logger.info(f"Ollama 返回内容长度: {len(content) if content else 0}")
|
|
||||||
logger.debug(f"Ollama 返回内容预览: {content[:200] if content else 'empty'}")
|
|
||||||
parsed = self._extract_json_obj(content)
|
|
||||||
return parsed
|
|
||||||
|
|
||||||
logger.warning(f'Ollama 返回错误: {response.status_code}')
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _call_api(self, prompt: str) -> Optional[Dict[str, Any]]:
|
def _call_api(self, prompt: str) -> Optional[Dict[str, Any]]:
|
||||||
"""调用在线 API"""
|
"""调用在线 API"""
|
||||||
import requests
|
import requests
|
||||||
@@ -560,7 +528,7 @@ class AIReviewer(BaseScanner):
|
|||||||
payload = {
|
payload = {
|
||||||
"model": self.model,
|
"model": self.model,
|
||||||
"messages": [{"role": "user", "content": prompt}],
|
"messages": [{"role": "user", "content": prompt}],
|
||||||
"max_tokens": 1024*5,
|
"max_tokens": 1024,
|
||||||
"temperature": 0.7
|
"temperature": 0.7
|
||||||
}
|
}
|
||||||
elif 'deepseek' in self.api_url:
|
elif 'deepseek' in self.api_url:
|
||||||
@@ -568,25 +536,43 @@ class AIReviewer(BaseScanner):
|
|||||||
payload = {
|
payload = {
|
||||||
"model": self.model,
|
"model": self.model,
|
||||||
"messages": [{"role": "user", "content": prompt}],
|
"messages": [{"role": "user", "content": prompt}],
|
||||||
"max_tokens": 1024*5,
|
"max_tokens": 1024,
|
||||||
"temperature": 0.7
|
"temperature": 0.7
|
||||||
}
|
}
|
||||||
|
elif 'dashscope' in self.api_url:
|
||||||
|
# 阿里云 dashscope 专用端点
|
||||||
|
url = f"{self.api_url}/chat/completions"
|
||||||
|
payload = {
|
||||||
|
"model": self.model,
|
||||||
|
"messages": [{"role": "user", "content": prompt}],
|
||||||
|
"max_tokens": 1024,
|
||||||
|
"temperature": 0.7,
|
||||||
|
"stream": False # 显式关闭流式
|
||||||
|
}
|
||||||
else:
|
else:
|
||||||
url = f"{self.api_url}/chat/completions"
|
url = f"{self.api_url}/chat/completions"
|
||||||
payload = {
|
payload = {
|
||||||
"model": self.model,
|
"model": self.model,
|
||||||
"messages": [{"role": "user", "content": prompt}],
|
"messages": [{"role": "user", "content": prompt}],
|
||||||
"max_tokens": 1024*5,
|
"max_tokens": 1024,
|
||||||
"temperature": 0.7
|
"temperature": 0.7
|
||||||
}
|
}
|
||||||
|
|
||||||
response = requests.post(url, json=payload, headers=headers, timeout=120)
|
logger.info(f"调用 API: {url}, model={self.model}")
|
||||||
|
|
||||||
if response.status_code == 200:
|
try:
|
||||||
result = response.json()
|
response = requests.post(url, json=payload, headers=headers, timeout=120)
|
||||||
content = result['choices'][0]['message']['content']
|
|
||||||
parsed = self._extract_json_obj(content)
|
|
||||||
return parsed
|
|
||||||
|
|
||||||
logger.warning(f'API 返回错误: {response.status_code}')
|
if response.status_code == 200:
|
||||||
return None
|
result = response.json()
|
||||||
|
content = result['choices'][0]['message']['content']
|
||||||
|
logger.info(f"API 返回内容长度: {len(content) if content else 0}")
|
||||||
|
parsed = self._extract_json_obj(content)
|
||||||
|
return parsed
|
||||||
|
|
||||||
|
logger.warning(f'API 返回错误: {response.status_code}, {response.text[:200]}')
|
||||||
|
return None
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f'API 调用失败: {e}')
|
||||||
|
return None
|
||||||
|
|||||||
@@ -25,47 +25,6 @@ def unused_variable_demo():
|
|||||||
print("Function executed")
|
print("Function executed")
|
||||||
|
|
||||||
|
|
||||||
def calculate():
|
|
||||||
"""计算并返回结果"""
|
|
||||||
return 42
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷3: 未定义的变量
|
|
||||||
def undefined_variable_demo():
|
|
||||||
"""演示未定义的变量"""
|
|
||||||
print(undefined_var) # undefined_var 未定义
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷4: 变量在定义前使用
|
|
||||||
def use_before_define():
|
|
||||||
"""在定义前使用变量"""
|
|
||||||
print(before_var) # before_var 在下面才定义
|
|
||||||
before_var = 100
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷5: 硬编码密码(安全问题)
|
|
||||||
def connect_database():
|
|
||||||
"""连接数据库"""
|
|
||||||
password = "admin123" # 硬编码密码
|
|
||||||
username = "root"
|
|
||||||
return f"Connecting with {username}:{password}"
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷6: 使用 eval(安全问题)
|
|
||||||
def unsafe_eval():
|
|
||||||
"""危险使用 eval"""
|
|
||||||
user_input = "os.system('ls')"
|
|
||||||
result = eval(user_input) # 危险!
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷7: 使用 pickle 反序列化(安全问题)
|
|
||||||
def unsafe_pickle():
|
|
||||||
"""不安全的 pickle 反序列化"""
|
|
||||||
data = b"..." # 模拟恶意数据
|
|
||||||
obj = pickle.loads(data) # 危险!
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷8: 行太长(风格问题)
|
# 缺陷8: 行太长(风格问题)
|
||||||
def long_line():
|
def long_line():
|
||||||
"""这是一行非常非常非常非常非常非常非常非常非常非常非常非常长的代码超过了 120 个字符的限制"""
|
"""这是一行非常非常非常非常非常非常非常非常非常非常非常非常长的代码超过了 120 个字符的限制"""
|
||||||
@@ -75,193 +34,36 @@ def long_line():
|
|||||||
def missing_spaces():
|
def missing_spaces():
|
||||||
"""缺少必要空格"""
|
"""缺少必要空格"""
|
||||||
x=1+2
|
x=1+2
|
||||||
y=3*4
|
y=3*99
|
||||||
if x==1:
|
if x==1:
|
||||||
print(x)
|
print(x)
|
||||||
|
|
||||||
|
|
||||||
# 缺陷10: 多余空格
|
# 缺陷1: 未使用的导入
|
||||||
def extra_spaces():
|
import unused_module # 未使用
|
||||||
"""多余空格"""
|
import collections as col # 使用了 col 但 flake8 可能检测
|
||||||
x = 1
|
|
||||||
y = 2
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷11: 未捕获的异常
|
# 缺陷2: 未使用的变量
|
||||||
def unhandled_exception():
|
def unused_variable_demo():
|
||||||
"""捕获异常后未处理"""
|
"""演示未使用的变量"""
|
||||||
try:
|
result = calculate() # result 未被使用
|
||||||
result = 10 / 0
|
print("Function executed")
|
||||||
except ZeroDivisionError:
|
|
||||||
pass # 捕获但未处理
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷12: 过于宽泛的异常
|
def calculate():
|
||||||
def broad_exception():
|
"""计算并返回结果"""
|
||||||
"""捕获所有异常"""
|
return 42
|
||||||
try:
|
|
||||||
data = json.loads('{"key": "value"}')
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷13: 裸 except 子句
|
# 缺陷3: 未定义的变量
|
||||||
def bare_except():
|
# def undefined_variable_demo():
|
||||||
"""使用裸 except"""
|
# """演示未定义的变量"""
|
||||||
try:
|
# print(undefined_var) # undefined_var 未定义
|
||||||
x = int("abc")
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷14: 重复代码
|
# 缺陷4: 变量在定义前使用
|
||||||
def duplicate_code():
|
def use_before_define():
|
||||||
"""重复代码示例"""
|
"""在定义前使用变量"""
|
||||||
a = 1
|
print(before_var) # before_var 在下面才定义
|
||||||
b = 2
|
before_var = 100
|
||||||
c = a + b
|
|
||||||
print(c)
|
|
||||||
|
|
||||||
a = 3
|
|
||||||
b = 4
|
|
||||||
c = a + b
|
|
||||||
print(c)
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷15: 变量名与内置函数冲突
|
|
||||||
def shadow_builtin():
|
|
||||||
"""变量名覆盖内置函数"""
|
|
||||||
list = [1, 2, 3] # 覆盖内置 list
|
|
||||||
dict = {} # 覆盖内置 dict
|
|
||||||
str = "hello" # 覆盖内置 str
|
|
||||||
return list, dict, str
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷16: 不必要的 pass
|
|
||||||
def unnecessary_pass():
|
|
||||||
"""不必要的 pass"""
|
|
||||||
if True:
|
|
||||||
pass # 可以直接删除
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷17: 使用 + 进行字符串拼接(推荐用 join)
|
|
||||||
def string_concat():
|
|
||||||
"""低效字符串拼接"""
|
|
||||||
result = ""
|
|
||||||
for i in range(100):
|
|
||||||
result = result + str(i)
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷18: 在循环中修改集合
|
|
||||||
def modify_during_iteration():
|
|
||||||
"""在迭代时修改列表"""
|
|
||||||
items = [1, 2, 3, 4, 5]
|
|
||||||
for item in items:
|
|
||||||
if item % 2 == 0:
|
|
||||||
items.remove(item) # 在迭代时修改
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷19: 全局变量
|
|
||||||
global_counter = 0 # 全局变量
|
|
||||||
|
|
||||||
|
|
||||||
def increment():
|
|
||||||
global global_counter # 依赖全局变量
|
|
||||||
global_counter += 1
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷20: 魔法数字
|
|
||||||
def calculate_price():
|
|
||||||
"""使用魔法数字"""
|
|
||||||
price = 100
|
|
||||||
tax = price * 1.1 # 1.1 是什么?
|
|
||||||
discount = price * 0.9
|
|
||||||
return tax, discount
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷21: 函数参数过多
|
|
||||||
def bad_function(a, b, c, d, e, f, g, h):
|
|
||||||
"""参数过多的函数"""
|
|
||||||
return a + b + c + d + e + f + g + h
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷22: 空函数体
|
|
||||||
def empty_function():
|
|
||||||
"""空函数应该使用 pass 或文档字符串"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷23: 使用 time.sleep 测试
|
|
||||||
def bad_sleep():
|
|
||||||
"""生产代码中使用 time.sleep"""
|
|
||||||
import time
|
|
||||||
time.sleep(5) # 阻塞
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷24: 注释掉的代码
|
|
||||||
def commented_code():
|
|
||||||
# print("This is commented out")
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷25: TODO/FIXME 注释
|
|
||||||
def todo_comment():
|
|
||||||
# TODO: Implement this
|
|
||||||
# FIXME: This is broken
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷26: 导入顺序错误(应先标准库,再第三方,本地)
|
|
||||||
import sys # 标准库
|
|
||||||
import flask # 第三方
|
|
||||||
from . import local # 本地
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷27: 不必要的列表推导式
|
|
||||||
def unnecessary_list_comp():
|
|
||||||
"""不必要的列表推导式"""
|
|
||||||
result = [x for x in range(10)] # 可简化为 list(range(10))
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷28: 条件表达式中的赋值
|
|
||||||
def assignment_in_condition():
|
|
||||||
"""在条件中赋值(不推荐)"""
|
|
||||||
if (x := get_value()) > 0: # 海象运算符但可能难以阅读
|
|
||||||
print(x)
|
|
||||||
|
|
||||||
|
|
||||||
def get_value():
|
|
||||||
return 5
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷29: 比较布尔值
|
|
||||||
def compare_bool():
|
|
||||||
"""与布尔值比较"""
|
|
||||||
flag = True
|
|
||||||
if flag == True: # 应直接用 if flag:
|
|
||||||
print("yes")
|
|
||||||
|
|
||||||
|
|
||||||
# 缺陷30: 使用 hasattr/getattr 而非异常处理
|
|
||||||
def use_hasattr():
|
|
||||||
"""滥用 hasattr"""
|
|
||||||
class Foo:
|
|
||||||
pass
|
|
||||||
obj = Foo()
|
|
||||||
if hasattr(obj, 'bar'): # 可直接用 try/except
|
|
||||||
print(obj.bar)
|
|
||||||
|
|
||||||
|
|
||||||
# 主函数入口
|
|
||||||
def main():
|
|
||||||
"""主函数"""
|
|
||||||
connect_database()
|
|
||||||
unsafe_eval()
|
|
||||||
unsafe_pickle()
|
|
||||||
print("Demo executed")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
|
|||||||
Reference in New Issue
Block a user