16 Commits

Author SHA1 Message Date
279a01b897 Update test_demo/demo_flaws.py 2026-03-15 13:29:58 +08:00
77fd09e6d2 Update test_demo/demo_flaws.py 2026-03-15 12:27:43 +08:00
dangzerong
91c16cbc88 Merge pull request '测试的扫描文件' (#20) from dev into main 2026-03-13 21:04:31 +08:00
Dang Zerong
c8c0ef1620 测试的扫描文件 2026-03-13 21:00:53 +08:00
dangzerong
95831d5190 Merge pull request 'dev' (#19) from dev into main 2026-03-13 18:09:32 +08:00
Dang Zerong
9a14c0b219 测试的扫描文件 2026-03-13 18:00:27 +08:00
Dang Zerong
87b2dacf65 测试的扫描文件 2026-03-13 18:00:22 +08:00
dangzerong
453414efb2 Merge pull request 'dev' (#17) from dev into main 2026-03-13 17:57:36 +08:00
Dang Zerong
04518812f4 Merge branch 'dev' of https://code.deep-pilot.chat/Bosch_Demo/code_scan into dev 2026-03-13 17:42:54 +08:00
Dang Zerong
6c4ee107f9 测试的扫描文件 2026-03-13 17:42:27 +08:00
dangzerong
d11b349d5e Merge pull request '测试的扫描文件' (#15) from dev into main 2026-03-13 17:41:51 +08:00
dangzerong
2a2ff1ad5f Merge branch 'main' into dev 2026-03-13 17:40:33 +08:00
Dang Zerong
bc5a19fffc 测试的扫描文件 2026-03-13 17:39:20 +08:00
Dang Zerong
78655ce5dc 测试的扫描文件 2026-03-13 17:37:46 +08:00
Dang Zerong
2201f6d696 Merge branch 'dev' 2026-03-13 17:37:10 +08:00
Dang Zerong
97881ee00e 测试的扫描文件 2026-03-13 17:32:23 +08:00
5 changed files with 164 additions and 267 deletions

281
README.md
View File

@@ -1,224 +1,109 @@
# AI Code Quality Scanner - 飞书通知版
# AI 代码质量扫描系统
一个自动化代码质量扫描系统,在代码提交时自动扫描并发送报告到飞书
自动化代码质量扫描工具,监听 PR 事件,自动扫描代码缺陷并提供合并决策支持
## 功能特性
- 🤖 自动监听 Gitea 代码提交事件
- 🔍 多维度代码质量扫描(语法、风格、安全)
- 📊 生成 Markdown 格式扫描报告
- 📱 实时推送飞书机器人通知
## 系统架构
## 工作流程
```
┌───────────── Webhook ┌──────────────────┐
Gitea │ ───────────────► │ Webhook Server
│ 代码仓库 │ (Flask)
└─────────────┘ └────────┬─────────┘
┌──────────┐ 1. 创建 PR ┌────────────┐
│ Gitea │ ───────────────► │ Webhook │
└──────────┘Server
└───────────┘
│ 2. 拉取代码、扫描、存库
┌──────────────────┐
│ Code Scanner
│ - ESLint
│ - Pylint │
- SonarQube │
└────────┬─────────┘
────────────┐
│ SQLite
│ Database
└────────────┘
│ 3. 前端查询
┌──────────────────┐
│ Report Generator
│ - Markdown │
└────────┬─────────┘
┌──────────────────┐
│ Feishu Bot │
│ - Webhook │
└──────────────────┘
────────────┐
前端页面
└────────────┘
```
## 三个核心功能
### 1. PR 创建
- Gitea 仓库创建 PR 时自动触发扫描
- 支持事件:`opened``reopened``synchronize`
### 2. 后端处理
- 拉取 PR 对应的代码
- 执行代码扫描Python/JavaScript/TypeScript
- AI 智能审查代码缺陷
- 扫描结果存入 SQLite 数据库
### 3. 前端功能
- 查询所有 PR 及扫描状态
- 查看每个 PR 的缺陷详情
- 一键「拒绝合并」或「同意合并」
## 快速开始
### 1. 安装依赖
```bash
# 安装依赖
pip install -r requirements.txt
```
### 2. 配置飞书机器人
1. 打开飞书群聊 → 设置 → 群机器人
2. 添加机器人 → 选择"自定义机器人"
3. 获取 Webhook 地址
4. 配置 `config.yaml`
### 3. 配置 Gitea Webhook
#### 方式一Push 时扫描(原有方式)
1. 进入 Gitea 仓库 → 设置 → Webhooks
2. 添加 Webhook
- 目标 URL: `http://你的服务器IP:5000/webhook/gitea`
- 触发事件: Push
- 密钥: 配置 `config.yaml` 中的 secret
#### 方式二PR 创建时扫描(推荐)
1. 进入 Gitea 仓库 → 设置 → Webhooks
2. 添加 Webhook
- 目标 URL: `http://你的服务器IP:5000/webhook/gitea`
- 触发事件: Pull Request
- 密钥: 配置 `config.yaml` 中的 secret
**支持的 PR 事件:**
- `opened` - 创建新 PR
- `reopened` - 重新打开 PR
- `synchronize` - PR 中的提交有更新
- `ready_for_review` - PR 标记为准备好审查
### 4. 运行服务
```bash
# 运行服务
python app.py
```
## 配置说明
访问 http://localhost:5000 查看前端页面。
所有配置在 `config.yaml` 中:
## Docker 部署
### 1. 构建镜像
```bash
docker buildx build --load --push -t dcr-by1jwyxk44.71826370.xyz/whlaoding/code-scan:latest .
```
### 2. 登录仓库
```bash
docker login dcr-by1jwyxk44.71826370.xyz
```
### 3. Push 到仓库
```bash
docker run -d --name code-scan -p 5000:5000 dcr-by1jwyxk44.71826370.xyz/whlaoding/code-scan:latest
```
### 4. 使用 docker compose 启动
```bash
# 启动服务
docker compose up -d
# 查看日志
docker compose logs -f
# 停止服务
docker compose down
```
## 配置
配置文件 `config.yaml`
```yaml
server:
host: "0.0.0.0"
port: 5000
debug: true
gitea:
base_url: "http://localhost:3000"
# Webhook 签名密钥
webhook_secret: "your_webhook_secret"
base_url: "https://code.deep-pilot.chat"
webhook_secret: "xxx"
api_token: "xxx"
feishu:
# 飞书机器人 Webhook 地址
webhook_url: "https://open.feishu.cn/open-apis/bot/v2/hook/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
# 消息推送 secret可选用于签名
secret: "your_feishu_secret"
scanner:
# 支持的语言
languages:
- python
- javascript
- typescript
# 扫描阈值
max_issues: 10
# 是否启用详细扫描
detailed: true
report:
# 报告保存路径
output_dir: "./reports"
# 是否保留报告文件
keep_files: true
ai:
provider: "api"
model: "qwen3.5-plus"
api_url: "https://dashscope.aliyuncs.com/compatible-mode/v1"
api_key: "sk-xxx"
```
## 项目结构
```
code-scanner/
├── app.py # 主应用入口
├── config.yaml # 配置文件
├── requirements.txt # Python 依赖
├── README.md # 项目说明
├── scanner/
│ ├── __init__.py
│ ├── base.py # 扫描器基类
│ ├── python_scanner.py # Python 代码扫描
│ ├── js_scanner.py # JavaScript/TypeScript 扫描
│ └── security_scanner.py # 安全扫描
├── report/
│ ├── __init__.py
│ └── generator.py # Markdown 报告生成
├── notify/
│ ├── __init__.py
│ └── feishu.py # 飞书通知
├── webhook/
│ ├── __init__.py
│ └── handler.py # Webhook 处理
└── reports/ # 报告输出目录
```
## 支持的扫描工具
### Python
- **Pylint** - 代码风格和错误检查
- **Flake8** - Python 代码检查
- **Bandit** - 安全漏洞扫描
### JavaScript/TypeScript
- **ESLint** - JavaScript/TypeScript 检查
- **Prettier** - 代码格式化
## 飞书消息效果
扫描完成后,将收到类似以下消息:
### Push 扫描消息
```
📊 代码质量扫描报告
仓库: my-project
分支: main
提交: abc1234
提交者: developer@example.com
✅ 扫描通过 (0 issues)
⚠️ 发现问题 (5 issues)
```
### PR 扫描消息
```
📊 PR 代码质量扫描报告
仓库: my-project
源分支: feature-xxx → 目标分支: main
PR链接: https://gitea.example.com/user/project/pulls/123
提交: abc1234
提交者: developer@example.com
✅ 扫描通过 (0 issues)
⚠️ 发现问题 (5 issues)
```
## Docker 部署
```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 5000
CMD ["python", "app.py"]
```
## 环境变量
也可以通过环境变量配置:
```bash
export FEISHU_WEBHOOK_URL="https://open.feishu.cn/..."
export GITEA_WEBHOOK_SECRET="secret"
export SCANNER_MAX_ISSUES=10
```
## 许可证
MIT License

21
app.py
View File

@@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
import os
import time
import logging
import traceback
from typing import Dict, Tuple, Any
@@ -131,21 +132,27 @@ def handle_gitea_webhook():
# Python 扫描
if 'python' in config.get('scanner', {}).get('languages', []):
start_time = time.time()
scan_results['python'] = python_scanner.scan(
clone_url, commit_id, branch
)
logger.info(f"[TIMER] Python 扫描耗时: {time.time() - start_time:.2f}")
# JavaScript/TypeScript 扫描
if any(lang in config.get('scanner', {}).get('languages', [])
for lang in ['javascript', 'typescript']):
start_time = time.time()
scan_results['javascript'] = js_scanner.scan(
clone_url, commit_id, branch
)
logger.info(f"[TIMER] JavaScript 扫描耗时: {time.time() - start_time:.2f}")
# 安全扫描
start_time = time.time()
scan_results['security'] = security_scanner.scan(
clone_url, commit_id, branch
)
logger.info(f"[TIMER] 安全扫描耗时: {time.time() - start_time:.2f}")
# 生成报告
report = report_generator.generate(
@@ -228,27 +235,35 @@ def handle_pull_request(payload: Dict[str, Any]) -> Tuple[Dict, int]:
# Python 扫描
if 'python' in config.get('scanner', {}).get('languages', []):
start_time = time.time()
scan_results['python'] = python_scanner.scan(
clone_url, source_sha, source_branch, changed_files
)
logger.info(f"[TIMER] Python 扫描耗时: {time.time() - start_time:.2f}")
# JavaScript/TypeScript 扫描
if any(lang in config.get('scanner', {}).get('languages', [])
for lang in ['javascript', 'typescript']):
start_time = time.time()
scan_results['javascript'] = js_scanner.scan(
clone_url, source_sha, source_branch, changed_files
)
logger.info(f"[TIMER] JavaScript 扫描耗时: {time.time() - start_time:.2f}")
# 安全扫描
start_time = time.time()
scan_results['security'] = security_scanner.scan(
clone_url, source_sha, source_branch, changed_files
)
logger.info(f"[TIMER] 安全扫描耗时: {time.time() - start_time:.2f}")
# AI 代码审查
if config.get('ai', {}).get('enabled', False):
start_time = time.time()
scan_results['ai'] = ai_reviewer.scan(
clone_url, source_sha, source_branch, changed_files
)
logger.info(f"[TIMER] AI 扫描耗时: {time.time() - start_time:.2f}")
# 获取 PR 的代码差异,用于将问题与代码片段关联
pr_diff = None
@@ -326,13 +341,19 @@ def manual_scan():
scan_results = {}
if 'python' in config.get('scanner', {}).get('languages', []):
start_time = time.time()
scan_results['python'] = python_scanner.scan(repo_url, commit_id, branch)
logger.info(f"[TIMER] Python 扫描耗时: {time.time() - start_time:.2f}")
if any(lang in config.get('scanner', {}).get('languages', [])
for lang in ['javascript', 'typescript']):
start_time = time.time()
scan_results['javascript'] = js_scanner.scan(repo_url, commit_id, branch)
logger.info(f"[TIMER] JavaScript 扫描耗时: {time.time() - start_time:.2f}")
start_time = time.time()
scan_results['security'] = security_scanner.scan(repo_url, commit_id, branch)
logger.info(f"[TIMER] 安全扫描耗时: {time.time() - start_time:.2f}")
# 生成报告
report = report_generator.generate(

View File

@@ -58,4 +58,4 @@ ai:
# 是否启用 AI 审查
enabled: true
# 每次审查的最大代码行数
max_lines: 200
max_lines: 100

View File

@@ -30,7 +30,7 @@ class AIReviewer(BaseScanner):
self.config = config
self.enabled = config.get('enabled', True)
self.provider = config.get('provider', 'ollama')
self.provider = config.get('provider', 'api')
self.model = config.get('model', 'llama3')
self.api_url = config.get('api_url', 'http://localhost:11434')
self.api_key = config.get('api_key', '')
@@ -424,13 +424,7 @@ class AIReviewer(BaseScanner):
def _call_ai(self, prompt: str) -> Optional[Dict[str, Any]]:
"""调用 AI 服务"""
try:
if self.provider == 'ollama':
return self._call_ollama(prompt)
elif self.provider == 'api':
return self._call_api(prompt)
else:
logger.warning(f'未知的 AI provider: {self.provider}')
return None
except Exception as e:
print("异常追踪信息:", e.__traceback__)
logger.error(f'AI 调用失败: {str(e)}')
@@ -517,32 +511,6 @@ class AIReviewer(BaseScanner):
logger.debug("_extract_json_obj: 未能提取到有效的 JSON 对象")
return None
def _call_ollama(self, prompt: str) -> Optional[Dict[str, Any]]:
"""调用 Ollama 本地模型"""
import requests
url = f"{self.api_url}/api/generate"
payload = {
"model": self.model,
"prompt": prompt,
"stream": False,
"format": "json"
}
logger.info(f"调用 Ollama: {url}, model={self.model}")
response = requests.post(url, json=payload, timeout=120)
if response.status_code == 200:
result = response.json()
content = result.get('response', '')
logger.info(f"Ollama 返回内容长度: {len(content) if content else 0}")
logger.debug(f"Ollama 返回内容预览: {content[:200] if content else 'empty'}")
parsed = self._extract_json_obj(content)
return parsed
logger.warning(f'Ollama 返回错误: {response.status_code}')
return None
def _call_api(self, prompt: str) -> Optional[Dict[str, Any]]:
"""调用在线 API"""
import requests
@@ -560,7 +528,7 @@ class AIReviewer(BaseScanner):
payload = {
"model": self.model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 1024*5,
"max_tokens": 1024,
"temperature": 0.7
}
elif 'deepseek' in self.api_url:
@@ -568,25 +536,43 @@ class AIReviewer(BaseScanner):
payload = {
"model": self.model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 1024*5,
"max_tokens": 1024,
"temperature": 0.7
}
elif 'dashscope' in self.api_url:
# 阿里云 dashscope 专用端点
url = f"{self.api_url}/chat/completions"
payload = {
"model": self.model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 1024,
"temperature": 0.7,
"stream": False # 显式关闭流式
}
else:
url = f"{self.api_url}/chat/completions"
payload = {
"model": self.model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 1024*5,
"max_tokens": 1024,
"temperature": 0.7
}
logger.info(f"调用 API: {url}, model={self.model}")
try:
response = requests.post(url, json=payload, headers=headers, timeout=120)
if response.status_code == 200:
result = response.json()
content = result['choices'][0]['message']['content']
logger.info(f"API 返回内容长度: {len(content) if content else 0}")
parsed = self._extract_json_obj(content)
return parsed
logger.warning(f'API 返回错误: {response.status_code}')
logger.warning(f'API 返回错误: {response.status_code}, {response.text[:200]}')
return None
except Exception as e:
logger.warning(f'API 调用失败: {e}')
return None

View File

@@ -25,6 +25,32 @@ def unused_variable_demo():
print("Function executed")
# 缺陷8: 行太长(风格问题)
def long_line():
"""这是一行非常非常非常非常非常非常非常非常非常非常非常非常长的代码超过了 120 个字符的限制"""
# 缺陷9: 缺少空格
def missing_spaces():
"""缺少必要空格"""
x=1+2
y=3*99
if x==1:
print(x)
# 缺陷1: 未使用的导入
import unused_module # 未使用
import collections as col # 使用了 col 但 flake8 可能检测
# 缺陷2: 未使用的变量
# def unused_variable_demo():
# """演示未使用的变量"""
# result = calculate() # result 未被使用
# print("Function executed")
def calculate():
"""计算并返回结果"""
return 42
@@ -41,24 +67,3 @@ def use_before_define():
"""在定义前使用变量"""
print(before_var) # before_var 在下面才定义
before_var = 100
# 缺陷5: 硬编码密码(安全问题)
def connect_database():
"""连接数据库"""
password = "admin123" # 硬编码密码
username = "root"
return f"Connecting with {username}:{password}"
# 缺陷6: 使用 eval安全问题
def unsafe_eval():
"""危险使用 eval"""
user_input = "os.system('ls')"
result = eval(user_input) # 危险!
return result
if __name__ == "__main__":
main()