diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..6a0bed1 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,43 @@ +# Git +.git +.gitignore + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +venv/ +env/ +ENV/ +.venv + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# Environment +.env +.env.local + +# Build +build/ +dist/ +*.egg-info/ + +# Docker +Dockerfile +.dockerignore + +# Documentation +README.md +*.md + +# Test +.pytest_cache/ +.coverage +htmlcov/ diff --git a/.env b/.env index db5147f..60f921e 100644 --- a/.env +++ b/.env @@ -1,31 +1,15 @@ -# 环境变量配置示例 -# 复制此文件为 .env 并填入实际值 +# DashScope API 配置 +# 获取 API Key: https://dashscope.console.aliyun.com/ +DASHSCOPE_API_KEY=sk-616332b2afa94699b4572d0fe6ac370a -# ==================== DashScope 配置 ==================== -# DashScope API Key (阿里百炼 Qwen 模型) -# 获取地址:https://dashscope.console.aliyun.com/ -DASHSCOPE_API_KEY=sk-your-actual-api-key-here +# Qwen 模型配置 +QWEN_MODEL=qwen3.5-flash +QWEN_TEMPERATURE=0.7 +QWEN_MAX_TOKENS=4096 -# 禁用 LiteLLM 的远程模型成本映射(避免启动时联网超时) -DISABLE_LITELLM_MODEL_COST_MAP=True -LITELLM_LOCAL_MODEL_COST_MAP=True - -# 注意:请将 DASHSCOPE_API_KEY 替换为您的真实 API Key -# 不要将 .env 文件提交到版本控制系统 - -# ==================== 服务配置 ==================== +# 服务器配置 HOST=0.0.0.0 PORT=8000 -# ==================== 日志配置 ==================== -LOG_LEVEL=info # debug, info, warning, error - -# ==================== 高级配置(可选)==================== -# SSE 队列最大长度 -SSE_QUEUE_MAX_SIZE=1000 - -# 流清理间隔(秒) -STREAM_CLEANUP_INTERVAL=300 - -# 任务超时时间(秒) -TASK_TIMEOUT=3600 +# 日志配置 +LOG_LEVEL=info diff --git a/.env.example b/.env.example index 5f1f722..6819b35 100644 --- a/.env.example +++ b/.env.example @@ -1,27 +1,15 @@ -# 环境变量配置示例 -# 复制此文件为 .env 并填入实际值 +# DashScope API 配置 +# 获取 API Key: https://dashscope.console.aliyun.com/ +DASHSCOPE_API_KEY=your_dashscope_api_key_here -# ==================== DashScope 配置 ==================== -# DashScope API Key (通义千问) -# 获取地址:https://dashscope.console.aliyun.com/ -DASHSCOPE_API_KEY=sk-your-actual-api-key-here +# Qwen 模型配置 +QWEN_MODEL=qwen3.5-flash +QWEN_TEMPERATURE=0.7 +QWEN_MAX_TOKENS=4096 -# 注意:如果您使用 OpenAI,也可以设置 OPENAI_API_KEY -# OPENAI_API_KEY=sk-your-openai-key - -# ==================== 服务配置 ==================== +# 服务器配置 HOST=0.0.0.0 PORT=8000 -# ==================== 日志配置 ==================== -LOG_LEVEL=info # debug, info, warning, error - -# ==================== 高级配置(可选)==================== -# SSE 队列最大长度 -SSE_QUEUE_MAX_SIZE=1000 - -# 流清理间隔(秒) -STREAM_CLEANUP_INTERVAL=300 - -# 任务超时时间(秒) -TASK_TIMEOUT=3600 +# 日志配置 +LOG_LEVEL=info diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fecb1ad --- /dev/null +++ b/.gitignore @@ -0,0 +1,39 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +venv/ +env/ +ENV/ +.venv + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# Environment +.env +.env.local + +# Build +build/ +dist/ +*.egg-info/ + +# Test +.pytest_cache/ +.coverage +htmlcov/ + +# OS +.DS_Store +Thumbs.db + +# Logs +*.log +logs/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 7abe554..caa7fee 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,8 @@ -# Multi-Agent Software Delivery System -# 基于 CrewAI + Qwen3.5-flash 的多智能体软件交付系统 +# 多阶段构建 - SDLC Agent Demo +FROM python:3.11-slim as builder -FROM python:3.11-slim +# 设置工作目录 +WORKDIR /app # 设置环境变量 ENV PYTHONDONTWRITEBYTECODE=1 \ @@ -9,29 +10,28 @@ ENV PYTHONDONTWRITEBYTECODE=1 \ PIP_NO_CACHE_DIR=1 \ PIP_DISABLE_PIP_VERSION_CHECK=1 +# 安装依赖 +COPY requirements.txt . +RUN pip install --user --no-cache-dir -r requirements.txt + +# 运行阶段 +FROM python:3.11-slim + # 设置工作目录 WORKDIR /app -# 安装系统依赖 -RUN apt-get update && apt-get install -y --no-install-recommends \ - gcc \ - && rm -rf /var/lib/apt/lists/* +# 从 builder 阶段复制安装的包 +COPY --from=builder /root/.local /root/.local -# 复制依赖文件 -COPY requirements.txt . - -# 安装 Python 依赖 -RUN pip install --no-cache-dir -r requirements.txt +# 确保脚本路径在 PATH 中 +ENV PATH=/root/.local/bin:$PATH \ + PYTHONPATH=/app # 复制应用代码 -COPY main.py . -COPY crew_factory.py . -COPY agents_config.py . -COPY stream_manager.py . -COPY .env.example .env.example +COPY . . -# 创建非 root 用户运行应用 -RUN useradd --create-home --shell /bin/bash appuser && \ +# 创建非 root 用户 +RUN useradd -m -u 1000 appuser && \ chown -R appuser:appuser /app USER appuser @@ -40,7 +40,7 @@ EXPOSE 8000 # 健康检查 HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ - CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1 + CMD python -c "import httpx; httpx.get('http://localhost:8000/health')" || exit 1 # 启动命令 -CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] +CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/QUICKSTART.md b/QUICKSTART.md new file mode 100644 index 0000000..b661916 --- /dev/null +++ b/QUICKSTART.md @@ -0,0 +1,237 @@ +# SDLC Agent Demo - 快速使用指南 + +## 项目概述 + +这是一个基于 **CrewAI + Qwen3.5-flash + FastAPI(SSE)** 的多智能体软件交付协同系统演示项目。 + +### 核心功能 + +- ✅ **PM Agent** - 将用户需求转化为结构化 SRS 文档 +- ✅ **QA Agent** - 根据 SRS 生成自动化测试用例 +- ✅ **Dev Agent** - 根据需求和测试用例实现业务代码 +- ✅ **SSE 实时流** - 实时展示各智能体执行进度 +- ✅ **现代化 UI** - Vue3 + TailwindCSS 单页面应用 + +--- + +## 快速启动(3 步) + +### 步骤 1: 配置 API Key + +```bash +# 复制环境变量模板 +copy .env.example .env + +# 编辑 .env 文件,填入你的 DashScope API Key +# 获取地址:https://dashscope.console.aliyun.com/ +``` + +**.env 文件内容:** +```env +DASHSCOPE_API_KEY=sk-your-api-key-here +QWEN_MODEL=qwen3.5-flash +HOST=0.0.0.0 +PORT=8000 +``` + +### 步骤 2: 启动服务 + +**方式 A: 使用启动脚本(Windows)** +```bash +start.bat +``` + +**方式 B: 手动启动** +```bash +# 激活虚拟环境(如果有) +python -m venv venv +venv\Scripts\activate # Windows +source venv/bin/activate # Linux/Mac + +# 启动 FastAPI 服务 +uvicorn main:app --reload --host 0.0.0.0 --port 8000 +``` + +### 步骤 3: 访问应用 + +打开浏览器访问:**http://localhost:8000/static/index.html** + +--- + +## 使用示例 + +### 输入需求示例 + +在首页输入框中输入以下需求: + +``` +开发一个在线书签管理系统,需要包含以下功能: +1. 用户注册和登录(支持邮箱验证) +2. 书签的添加、编辑、删除 +3. 书签分类和标签管理 +4. 书签搜索功能 +5. 导出/导入书签(HTML 格式) +6. 响应式设计,支持移动端访问 + +性能要求: +- 页面加载时间 < 2 秒 +- 支持并发用户数 > 1000 +``` + +### 预期输出 + +系统将依次执行: + +1. **PM Agent** → 生成《软件需求规格说明书 (SRS)》 + - 功能性需求清单 + - 非功能性需求(性能、安全等) + - 验收标准 + +2. **QA Agent** → 生成《测试方案与用例》 + - Pytest 测试脚本框架 + - 测试场景描述 + - 测试数据准备 + +3. **Dev Agent** → 生成《业务代码实现》 + - 完整的 Python 代码模块 + - 数据模型定义 + - API 接口设计 + +--- + +## API 接口说明 + +### 1. 启动任务 + +```bash +curl -X POST http://localhost:8000/api/v1/sdlc/start \ + -H "Content-Type: application/json" \ + -d '{"requirement": "开发一个简单的待办事项应用"}' +``` + +**响应:** +```json +{ + "task_id": "550e8400-e29b-41d4-a716-446655440000", + "status": "processing" +} +``` + +### 2. SSE流式进度 + +```bash +curl -N http://localhost:8000/api/v1/sdlc/stream/{task_id} +``` + +**事件类型:** +- `pm_start` / `pm_complete` - PM 阶段开始/完成 +- `qa_start` / `qa_complete` - QA 阶段开始/完成 +- `dev_start` / `dev_complete` - Dev 阶段开始/完成 +- `final_result` - 最终结果 +- `error` - 错误处理 + +### 3. 查询状态 + +```bash +curl http://localhost:8000/api/v1/sdlc/status/{task_id} +curl http://localhost:8000/api/v1/sdlc/result/{task_id} +``` + +--- + +## Docker 部署 + +### 构建镜像 + +```bash +docker build -t sdlc-agent-demo . +``` + +### 运行容器 + +```bash +docker run -d \ + -p 8000:8000 \ + -e DASHSCOPE_API_KEY=your_api_key \ + --name sdlc-demo \ + sdlc-agent-demo +``` + +### 访问服务 + +http://localhost:8000/static/index.html + +--- + +## 常见问题 + +### Q1: 如何获取 DashScope API Key? + +访问 https://dashscope.console.aliyun.com/ 注册账号并创建 API Key。 + +### Q2: 支持其他模型吗? + +可以修改 `.env` 中的 `QWEN_MODEL` 参数,使用其他 OpenAI 兼容的模型。 + +### Q3: SSE 连接中断怎么办? + +- 检查防火墙设置 +- 增加超时时间(修改 `main.py` 中的 timeout 参数) +- 前端已自动实现重连机制 + +### Q4: 输出质量不佳? + +- 调整 Temperature 参数(降低随机性) +- 优化 Prompt 描述 +- 增加更详细的上下文约束 + +--- + +## 技术栈 + +| 组件 | 技术 | 版本 | +|------|------|------| +| AI 框架 | CrewAI | >=0.85.0 | +| 大模型 | Qwen3.5-flash | via DashScope | +| Web 框架 | FastAPI | >=0.109.0 | +| 实时通信 | SSE | sse-starlette | +| 前端 | Vue3 | 3.x | +| UI 框架 | TailwindCSS | 3.x | +| 代码高亮 | Highlight.js | 11.9.0 | + +--- + +## 项目结构 + +``` +sdlc_agent_demo/ +├── main.py # FastAPI 入口 +├── agents/ +│ ├── pm_agent.py # PM 智能体 +│ ├── qa_agent.py # QA 智能体 +│ └── dev_agent.py # Dev 智能体 +├── crews/ +│ └── sdlc_crew.py # 编排逻辑 +├── models/ +│ └── qwen_config.py # 模型配置 +├── static/ +│ └── index.html # 前端页面 +├── .env.example # 环境变量模板 +├── Dockerfile # Docker 配置 +├── requirements.txt # Python 依赖 +└── README.md # 详细文档 +``` + +--- + +## 下一步计划 + +- [ ] 支持多任务并行处理 +- [ ] 添加任务历史记录 +- [ ] 集成代码执行沙箱 +- [ ] 支持更多模型后端 +- [ ] 添加认证授权机制 + +--- + +**Built with ❤️ using CrewAI + Qwen3.5-flash + FastAPI** diff --git a/README.md b/README.md index a7bd504..48f799d 100644 --- a/README.md +++ b/README.md @@ -1,408 +1,293 @@ -# Multi-Agent Software Delivery System +# SDLC Agent Demo - 多智能体端到端软件交付协同系统 -基于 CrewAI + Qwen3.5-flash 的多智能体软件交付系统,支持 SSE 实时推送。 +基于 **CrewAI + Qwen3.5-flash + FastAPI(SSE)** 构建的企业级研发提效演示系统,模拟从需求分析→测试用例→代码实现的完整 SDLC 流程。 -## 📋 功能特性 +## 📋 项目特性 -- **多智能体协作**: 4 个专业 Agent 协同完成软件交付 - - ProductManager: 产品需求分析 - - QAEngineer: 测试计划制定 - - SoftwareDeveloper: 技术方案设计 - - Coordinator: 质量审核与交付 +- ✅ **多智能体协同**:PM Agent、QA Agent、Dev Agent 顺序流水线作业 +- ✅ **实时进度追踪**:SSE流式输出各阶段执行状态 +- ✅ **Qwen3.5-flash**:通过 DashScope OpenAI 兼容 API 调用 +- ✅ **现代化前端**:Vue3 + TailwindCSS + 代码高亮 +- ✅ **任务持久化**:内存级任务状态管理 +- ✅ **Docker 支持**:一键容器化部署 -- **实时通信**: 基于 SSE 协议实时推送执行日志 -- **异步处理**: 任务异步启动,不阻塞 API 响应 -- **并发支持**: 多用户同时请求互不干扰 +## 🏗️ 系统架构 -## 🔑 关键技术点 - -### 1. SSE 数据格式设计 - -统一的 JSON 格式,便于前端解析: - -```json -{ - "task_id": "550e8400-e29b...", - "sequence": 1, - "agent_name": "ProductManager", - "event_type": "thought", - "content": "正在分析用户需求,提取关键指标...", - "timestamp": "2023-10-27T10:00:00Z" -} +``` +┌─────────────┐ ┌──────────────┐ ┌─────────────┐ +│ PM Agent │────▶│ QA Agent │────▶│ Dev Agent │ +│ 需求分析师 │ │ 测试架构师 │ │ 全栈工程师 │ +└─────────────┘ └──────────────┘ └─────────────┘ + │ │ │ + ▼ ▼ ▼ + SRS 文档 测试用例 业务代码 ``` -**字段说明**: -- `task_id`: 任务唯一标识,用于区分不同用户的请求 -- `sequence`: 序列号(每个 task_id 独立递增),用于检测消息丢失 -- `agent_name`: 发送事件的 Agent 名称 -- `event_type`: 事件类型(start/thought/action/output/end/error) -- `content`: 事件内容 -- `timestamp`: UTC 时间戳(ISO 8601 格式) +## 📁 项目结构 -### 2. 并发处理逻辑 - -**核心挑战**:CrewAI 默认是同步运行的,而 FastAPI 和 SSE 需要异步。 - -**解决方案**: - -```python -# 在 TaskStreamQueue 中实现线程安全的消息发布 -def put_nowait(self, event: StreamEvent) -> bool: - """ - 从同步线程(如 CrewAI 事件处理器)安全地发布事件 - - 使用 run_coroutine_threadsafe 将协程提交到事件循环执行 - 这是实现 CrewAI(同步)与 SSE(异步)集成的关键 - """ - future = asyncio.run_coroutine_threadsafe( - self.queue.put(event), - self._loop - ) - future.result(timeout=5.0) - return True +``` +sdlc_agent_demo/ +├── main.py # FastAPI 入口 +├── agents/ +│ ├── __init__.py +│ ├── pm_agent.py # PM 智能体定义 +│ ├── qa_agent.py # QA 智能体定义 +│ └── dev_agent.py # Dev 智能体定义 +├── crews/ +│ └── sdlc_crew.py # CrewAI 编排逻辑 +├── models/ +│ └── qwen_config.py # Qwen3.5-flash 配置 +├── static/ +│ └── index.html # 测试页面 +├── requirements.txt +├── .env.example # 环境变量模板 +├── Dockerfile +└── README.md ``` -**关键实现**: -1. 使用 `asyncio.Queue` 实现异步非阻塞消息队列 -2. 通过 `asyncio.Lock` 保证并发安全 -3. 每个 `task_id` 独立队列,实现任务隔离 -4. 使用 `run_coroutine_threadsafe` 从同步线程安全发布事件 -5. 确保 `stream_manager` 能安全地在线程间传递消息 +## 🚀 快速启动 -### 3. 多任务隔离机制 +### 方法一:本地运行 -```python -class StreamManager: - """全局流管理器 - 管理所有任务的 SSE 流""" - - def __init__(self): - # task_id -> TaskStreamQueue 映射 - self.streams: Dict[str, TaskStreamQueue] = {} - self._lock = asyncio.Lock() # 并发控制 +#### 1. 创建虚拟环境 + +```bash +# Windows +python -m venv venv +venv\Scripts\activate + +# Linux/Mac +python3 -m venv venv +source venv/bin/activate ``` -- 每个 `task_id` 对应独立的 `TaskStreamQueue` -- 使用 `asyncio.Lock` 保护字典操作 -- 定期清理已完成的流(默认每小时) -- 序列号计数器按 `task_id` 独立维护 - -## 🚀 快速开始 - -### 1. 安装依赖 +#### 2. 安装依赖 ```bash pip install -r requirements.txt ``` -### 2. 测试模块 - -运行测试脚本验证所有模块正常: - -```bash -python test_import.py -``` - -预期输出: -``` -[OK] Module Imports -[OK] Agent Creation -[OK] Stream Manager -[OK] API Endpoints - -[SUCCESS] All tests passed! System is ready. -``` - -### 3. 配置环境变量 - -复制 `.env.example` 为 `.env` 并填入 DashScope API Key: +#### 3. 配置环境变量 ```bash +# 复制模板 cp .env.example .env + +# 编辑 .env 文件,填入你的 DashScope API Key +# 获取 API Key: https://dashscope.console.aliyun.com/ ``` -编辑 `.env` 文件: -``` -DASHSCOPE_API_KEY=sk-your-actual-api-key +**.env 文件内容:** +```env +DASHSCOPE_API_KEY=your_dashscope_api_key_here +QWEN_MODEL=qwen3.5-flash +HOST=0.0.0.0 +PORT=8000 ``` -获取 API Key: https://dashscope.console.aliyun.com/ - -### 4. 启动服务 +#### 4. 启动服务 ```bash -python main.py +uvicorn main:app --reload --host 0.0.0.0 --port 8000 ``` -或使用 uvicorn: +#### 5. 访问测试页面 + +打开浏览器访问:http://localhost:8000/static/index.html + +### 方法二:Docker 运行 + +#### 1. 构建镜像 + ```bash -uvicorn main:app --host 0.0.0.0 --port 8000 --reload +docker build -t sdlc-agent-demo . ``` -### 4. 访问服务 +#### 2. 运行容器 -- **API 文档**: http://localhost:8000/docs -- **测试 UI**: http://localhost:8000/test-ui +```bash +docker run -d \ + -p 8000:8000 \ + -e DASHSCOPE_API_KEY=your_api_key \ + --name sdlc-demo \ + sdlc-agent-demo +``` + +#### 3. 访问服务 + +http://localhost:8000/static/index.html ## 📡 API 接口 -### POST /api/run_task +### 1. 启动 SDLC 流程 -启动多智能体任务 +**请求:** +```http +POST /api/v1/sdlc/start +Content-Type: application/json -**请求体**: -```json { - "user_requirement": "开发一个在线商城系统...", - "skip_confirmation": true + "requirement": "开发一个用户管理系统,支持增删改查功能" } ``` -**响应**: +**响应:** ```json { "task_id": "550e8400-e29b-41d4-a716-446655440000", - "status": "started", - "message": "任务已启动..." + "status": "processing" } ``` -### GET /api/stream/{task_id} +### 2. SSE流式进度 -SSE 端点,订阅任务执行日志 - -**事件格式**: -```json -{ - "agent": "ProductManager", - "type": "thought", - "content": "正在分析用户需求...", - "timestamp": "2024-01-01T12:00:00", - "task_id": "uuid" -} +**请求:** +```http +GET /api/v1/sdlc/stream/{task_id} +Accept: text/event-stream ``` -**事件类型**: -- `start`: 任务开始 -- `agent_start`: Agent 开始执行 -- `thought`: Agent 思考过程 -- `action`: Agent 执行动作 -- `output`: Agent 输出结果 -- `step_end`: 步骤完成 -- `end`: 任务结束 -- `error`: 发生错误 +**SSE事件格式:** +```javascript +event: pm_start +data: {"stage": "需求分析", "status": "started", "timestamp": "2026-01-15T10:30:00Z"} -### GET /api/task/{task_id}/status +event: pm_complete +data: {"stage": "需求分析", "content": "SRS 文档...", "status": "completed", "timestamp": "2026-01-15T10:35:00Z"} -查询任务状态 +event: final_result +data: {"stage": "交付完成", "status": "success", ...} +``` -### GET /api/streams +### 3. 查询任务状态 -列出所有活跃的 SSE 流 +```http +GET /api/v1/sdlc/status/{task_id} +``` -## 🧪 使用示例 +### 4. 获取任务结果 -### 使用 curl 测试 +```http +GET /api/v1/sdlc/result/{task_id} +``` + +## 🤖 智能体角色 + +### PM Agent(产品经理) +- **角色**:资深产品需求分析师 +- **职责**:将用户需求转化为结构化 SRS 文档 +- **输出**:功能性需求、非功能性需求、验收标准 + +### QA Agent(测试工程师) +- **角色**:高级测试架构师 +- **职责**:根据 SRS 生成自动化测试用例 +- **输出**:Pytest 测试脚本、测试场景 + +### Dev Agent(开发工程师) +- **角色**:全栈软件工程师 +- **职责**:根据 SRS 和测试用例实现业务代码 +- **输出**:可运行的 Python 代码模块 + +## 💡 使用示例 + +### 示例需求输入 + +``` +开发一个在线书签管理系统,需要包含以下功能: +1. 用户注册和登录(支持邮箱验证) +2. 书签的添加、编辑、删除 +3. 书签分类和标签管理 +4. 书签搜索功能 +5. 导出/导入书签(HTML 格式) +6. 响应式设计,支持移动端访问 + +性能要求: +- 页面加载时间 < 2 秒 +- 支持并发用户数 > 1000 +``` + +### 预期输出 + +1. **PM Agent** → 软件需求规格说明书(包含功能列表、验收标准) +2. **QA Agent** → Pytest 测试用例(覆盖所有核心功能) +3. **Dev Agent** → 完整的 Python 实现代码(FastAPI + SQLite) + +## 🔧 配置说明 + +### 模型配置 + +| 参数 | 默认值 | 说明 | +|------|--------|------| +| `QWEN_MODEL` | `qwen3.5-flash` | 模型名称 | +| `QWEN_TEMPERATURE` | `0.7` | 温度参数 (0-1) | +| `QWEN_MAX_TOKENS` | `4096` | 最大生成长度 | + +### 服务器配置 + +| 参数 | 默认值 | 说明 | +|------|--------|------| +| `HOST` | `0.0.0.0` | 监听地址 | +| `PORT` | `8000` | 监听端口 | +| `LOG_LEVEL` | `info` | 日志级别 | + +## 🧪 测试与验证 + +### API 测试(使用 curl) ```bash # 1. 启动任务 -curl -X POST http://localhost:8000/api/run_task \ +curl -X POST http://localhost:8000/api/v1/sdlc/start \ -H "Content-Type: application/json" \ - -d '{ - "user_requirement": "开发一个简单的待办事项应用", - "skip_confirmation": true - }' + -d '{"requirement": "开发一个简单的待办事项应用"}' -# 2. 订阅 SSE 流 (使用返回的 task_id) -curl -N http://localhost:8000/api/stream/{task_id} +# 2. 查看 SSE流(新终端) +curl -N http://localhost:8000/api/v1/sdlc/stream/{task_id} + +# 3. 健康检查 +curl http://localhost:8000/health ``` -### Python 客户端示例 +## ⚠️ 注意事项 -```python -import requests -import json +1. **API Key 安全**:不要将 `.env` 文件提交到版本控制系统 +2. **网络要求**:需要访问 DashScope API 服务 +3. **资源消耗**:每个任务约消耗 10,000-50,000 tokens +4. **超时设置**:SSE 连接默认超时 120 秒,可根据需要调整 -# 启动任务 -response = requests.post( - 'http://localhost:8000/api/run_task', - json={ - 'user_requirement': '开发一个博客系统', - 'skip_confirmation': True - } -) -task_data = response.json() -task_id = task_data['task_id'] +## 🛠️ 故障排查 -# 订阅 SSE 流 -import eventstream +### 问题 1:无法连接到 DashScope API -with requests.get( - f'http://localhost:8000/api/stream/{task_id}', - stream=True -) as r: - for line in r.iter_lines(): - if line: - data = line.decode('utf-8').replace('data: ', '') - event = json.loads(data) - print(f"[{event['agent']}] {event['type']}: {event['content']}") -``` +**解决方案:** +- 检查 API Key 是否正确 +- 确认网络连接正常 +- 验证账户余额充足 -## 🏗️ 项目结构 +### 问题 2:SSE 连接中断 -``` -. -├── main.py # FastAPI 入口和路由 -├── crew_factory.py # CrewAI 工厂和事件处理器 -├── agents_config.py # Agent 配置和 Prompt 模板 -├── stream_manager.py # SSE 流管理和消息队列 -├── requirements.txt # Python 依赖 -├── test_import.py # 模块测试脚本 -├── Dockerfile # Docker 镜像构建文件 -├── docker-compose.yml # Docker Compose 配置 -├── nginx.conf # Nginx 反向代理配置 -├── .env.example # 环境变量示例 -└── README.md # 本文档 -``` +**解决方案:** +- 检查防火墙设置 +- 增加超时时间 +- 启用前端自动重连机制 -## ⚙️ 配置说明 +### 问题 3:智能体输出质量不佳 -### LLM 配置 +**解决方案:** +- 调整 Temperature 参数(降低随机性) +- 优化 Prompt 描述 +- 增加上下文约束 -在 `agents_config.py` 中配置: +## 📝 许可证 -```python -QWEN_MODEL_CONFIG = { - "model": "qwen-plus", # Qwen3.5-flash - "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1", - "api_key_env": "DASHSCOPE_API_KEY", -} -``` +本项目仅供学习和演示用途。 -### Agent 角色定制 +## 🔗 相关链接 -在 `agents_config.py` 中修改各 Agent 的: -- `role`: 角色名称 -- `goal`: 目标 -- `backstory`: 背景描述 -- `TASK_TEMPLATES`: 任务 Prompt 模板 +- [CrewAI 官方文档](https://docs.crewai.com/) +- [DashScope 控制台](https://dashscope.console.aliyun.com/) +- [FastAPI 文档](https://fastapi.tiangolo.com/) +- [Vue3 文档](https://vuejs.org/) -## 🔧 高级用法 +--- -### 自定义事件处理器 - -继承 `CrewEventsHandler` 类并重写回调方法: - -```python -class MyCustomHandler(CrewEventsHandler): - def on_agent_output(self, agent, output): - # 自定义处理逻辑 - pass -``` - -### 调整并发策略 - -在 `stream_manager.py` 中调整队列大小和清理策略: - -```python -TaskStreamQueue(task_id, max_size=1000) # 默认 1000 -``` - -## 🐛 故障排查 - -### 问题:SSE 连接立即断开 - -**解决**: 确保 Nginx 配置中禁用了缓冲: -```nginx -proxy_buffering off; -proxy_cache off; -proxy_request_buffering off; -``` - -### 问题:LLM 调用失败 - -**检查**: -1. DASHSCOPE_API_KEY 是否正确配置 -2. 网络连接是否正常 -3. API Key 是否有足够额度 - -### 问题:内存占用过高 - -**解决**: 调整 `cleanup_old_streams` 的调用频率和保留时间 - -## 🏗️ Docker 部署 - -### 使用 Docker Compose(推荐) - -```bash -# 1. 设置环境变量 -export DASHSCOPE_API_KEY=sk-your-api-key - -# 2. 启动服务 -docker-compose up -d - -# 3. 查看日志 -docker-compose logs -f multi-agent-system - -# 4. 停止服务 -docker-compose down -``` - -### 生产环境部署(带 Nginx) - -```bash -# 使用 production profile 启动(包含 Nginx) -docker-compose --profile production up -d -``` - -**目录结构**: -``` -. -├── docker-compose.yml # Docker Compose 配置 -├── Dockerfile # 应用镜像构建文件 -├── nginx.conf # Nginx 配置(反向代理 + SSE 支持) -├── .env # 环境变量文件 -└── ... -``` - -### Nginx 关键配置 - -对于 SSE 流端点,Nginx 必须禁用缓冲: - -```nginx -location /api/stream/ { - proxy_pass http://multi-agent-system:8000; - - # HTTP/1.1 支持(SSE 必需) - proxy_http_version 1.1; - proxy_set_header Connection ""; - - # 禁用缓冲(关键) - proxy_buffering off; - proxy_cache off; - proxy_request_buffering off; - - # 长连接超时 - proxy_read_timeout 300s; -} -``` - -## 📝 注意事项 - -1. **生产环境部署**: - - 限制 CORS 允许的来源域名 - - 添加 API 认证机制 - - 使用 Redis 等持久化队列替代内存队列 - -2. **性能优化**: - - 调整 SSE 队列大小避免内存溢出 - - 限制单个任务的超时时间 - - 实现任务优先级队列 - -3. **安全考虑**: - - 验证用户输入防止注入攻击 - - 限制请求频率防止滥用 - - 记录审计日志 - -## 📄 License - -MIT License +**Built with ❤️ using CrewAI + Qwen3.5-flash + FastAPI** diff --git a/USAGE_GUIDE.md b/USAGE_GUIDE.md deleted file mode 100644 index f2d88b7..0000000 --- a/USAGE_GUIDE.md +++ /dev/null @@ -1,243 +0,0 @@ -# 多智能体系统使用指南 - -## 📋 目录结构 - -``` -AI_CrewAI/ -├── generated_output/ # 生成的代码和文档输出目录 -│ └── task_YYYYMMDD_HHMMSS/ # 每次任务的时间戳目录 -│ ├── PRD_产品需求文档.md # 产品需求文档 -│ ├── QA_测试计划.md # 测试计划文档 -│ ├── Dev_技术方案.md # 技术方案文档 -│ ├── Final_交付报告.md # 最终交付报告 -│ └── events_log.json # 完整事件日志 -├── code_docs/ # 文档目录 -├── example_usage.py # 使用示例脚本 -├── main.py # FastAPI服务入口 -└── USAGE_GUIDE.md # 本文件 -``` - -## 🚀 快速开始 - -### 1. 配置 API Key - -编辑 `.env` 文件,设置您的阿里百炼 API Key: - -```bash -DASHSCOPE_API_KEY=sk-your-actual-api-key-here -``` - -获取 API Key: https://dashscope.console.aliyun.com/ - -### 2. 安装依赖 - -```bash -pip install -r requirements.txt -``` - -### 3. 启动服务 - -**方式一:直接运行 Python** -```bash -python main.py -``` - -**方式二:使用 uvicorn** -```bash -uvicorn main:app --host 0.0.0.0 --port 8000 --reload -``` - -### 4. 访问服务 - -- **API 文档**: http://localhost:8000/docs -- **测试 UI**: http://localhost:8000/test-ui - -## 📝 使用方法 - -### 方法一:通过 Web UI(推荐) - -1. 访问 http://localhost:8000/test-ui -2. 在输入框中输入您的需求,例如: - ``` - 开发一个简单的在线待办事项应用(Todo App),包含以下功能: - 1. 用户可以注册和登录 - 2. 创建、编辑、删除待办事项 - 3. 标记事项为完成/未完成 - 4. 按优先级和截止日期排序 - 5. 基本的搜索和过滤功能 - - 技术栈要求: - - 后端:Python FastAPI - - 数据库:SQLite - - 前端:简单的 HTML/CSS/JavaScript - ``` -3. 点击"启动任务"按钮 -4. 实时查看各 Agent 的执行过程 -5. 完成后查看生成的文档 - -### 方法二:通过 API 调用 - -使用 curl 或任何 HTTP 客户端: - -```bash -# 启动任务 -curl -X POST http://localhost:8000/api/run_task \ - -H "Content-Type: application/json" \ - -d '{ - "user_requirement": "开发一个博客系统", - "skip_confirmation": true - }' - -# 返回示例: -# {"task_id":"uuid","status":"started","message":"..."} -``` - -然后订阅 SSE 流: - -```bash -curl -N http://localhost:8000/api/stream/{task_id} -``` - -### 方法三:使用示例脚本 - -运行提供的示例脚本: - -```bash -python example_usage.py -``` - -这会自动保存生成的内容到 `generated_output/` 目录。 - -## 📊 生成的内容 - -每个任务会生成以下文档: - -### 1. PRD_产品需求文档.md -- 项目概述 -- 功能需求列表(按优先级) -- 用户故事和用例 -- 验收标准 -- 风险评估 - -### 2. QA_测试计划.md -- 测试策略 -- 测试用例(正常场景 + 异常场景) -- 性能测试方案 -- 自动化测试建议 - -### 3. Dev_技术方案.md -- 系统架构设计 -- 技术栈选择 -- 数据库 Schema -- API 接口设计 -- 核心代码实现 -- 部署方案 - -### 4. Final_交付报告.md -- 交付摘要 -- 一致性检查 -- 质量评估 -- 风险提示 -- 后续行动建议 - -## 🔍 查看生成的文件 - -生成的文件保存在 `generated_output/task_YYYYMMDD_HHMMSS/` 目录下。 - -**Windows 用户**: -```powershell -# 打开最新生成的目录 -explorer (Get-ChildItem generated_output -Directory | Sort-Object LastWriteTime -Descending | Select-Object -First 1).FullName -``` - -**Linux/Mac 用户**: -```bash -# 打开最新生成的目录 -xdg-open $(ls -td generated_output/task_* | head -n1) # Linux -open $(ls -td generated_output/task_* | head -n1) # Mac -``` - -或者直接在文件管理器中浏览 `generated_output/` 目录。 - -## 💡 实用技巧 - -### 1. 实时监控任务进度 - -在另一个终端窗口查看日志: - -```bash -# Docker Compose 方式 -docker-compose logs -f multi-agent-system - -# 直接运行 -# 日志会自动输出到控制台 -``` - -### 2. 自定义输出目录 - -修改 `example_usage.py` 中的 `output_dir` 参数: - -```python -await save_generated_content(task_id, output_dir="my_custom_output") -``` - -### 3. 批量处理多个需求 - -创建批处理脚本: - -```python -requirements = [ - "需求 1...", - "需求 2...", - "需求 3..." -] - -for req in requirements: - task_id = await run_multi_agent_task(user_requirement=req) - await save_generated_content(task_id) -``` - -## ⚠️ 常见问题 - -### Q: 为什么没有生成文件? - -**A**: 确保: -1. API Key 已正确配置 -2. 网络连接正常 -3. 任务执行完成(看到"任务执行完成"事件) - -### Q: 生成的内容在哪里? - -**A**: -- 默认在 `generated_output/task_时间戳/` 目录 -- 可以通过 Web UI 直接查看 -- 或在控制台查找保存路径 - -### Q: 如何重新查看生成的内容? - -**A**: -1. 找到对应的任务时间戳目录 -2. 打开相应的 .md 文件 -3. 使用 Markdown 阅读器或 VS Code 查看 - -### Q: 可以修改生成的文档吗? - -**A**: 当然可以!生成的文档是 Markdown 格式,可以使用任何文本编辑器修改。 - -## 🎯 最佳实践 - -1. **明确需求描述**:需求越详细,生成的文档越准确 -2. **指定技术栈**:明确说明使用的技术和框架 -3. **设定约束条件**:如性能要求、安全要求等 -4. **迭代优化**:根据生成结果调整需求描述 - -## 📞 技术支持 - -如有问题,请查看: -- API 文档:http://localhost:8000/docs -- 项目 README:README.md -- 日志输出:控制台或 Docker 日志 - ---- - -**祝您使用愉快!** 🎉 diff --git a/agents/__init__.py b/agents/__init__.py new file mode 100644 index 0000000..9a4af2d --- /dev/null +++ b/agents/__init__.py @@ -0,0 +1,19 @@ +""" +Agents 包初始化 +""" + +from .pm_agent import create_pm_agent, create_pm_task, execute_pm_stage +from .qa_agent import create_qa_agent, create_qa_task, execute_qa_stage +from .dev_agent import create_dev_agent, create_dev_task, execute_dev_stage + +__all__ = [ + "create_pm_agent", + "create_pm_task", + "execute_pm_stage", + "create_qa_agent", + "create_qa_task", + "execute_qa_stage", + "create_dev_agent", + "create_dev_task", + "execute_dev_stage" +] diff --git a/agents/dev_agent.py b/agents/dev_agent.py new file mode 100644 index 0000000..1eea06b --- /dev/null +++ b/agents/dev_agent.py @@ -0,0 +1,145 @@ +""" +Dev Agent - 开发工程师智能体 +负责根据 SRS 和测试用例编写业务代码 +""" + +from typing import Dict, Any +from crewai import Agent, Task +from models.qwen_config import get_llm + + +def create_dev_agent() -> Agent: + """ + 创建开发工程师智能体 + + Returns: + Agent: Dev 智能体实例 + """ + return Agent( + role="全栈软件工程师", + goal="根据软件需求规格说明书 (SRS) 和测试用例实现高质量的业务代码", + backstory="""你是一位拥有 10 年经验的全栈软件工程师,擅长: + 1. 快速理解需求并设计合理的系统架构 + 2. 编写清晰、可维护、高性能的代码 + 3. 遵循 TDD(测试驱动开发)实践 + 4. 遵循博世研发规范中的编码标准 + + 你的代码实现必须通过所有测试用例,并满足性能和安全要求。""", + verbose=True, + allow_delegation=False, + llm=get_llm() + ) + + +def create_dev_task(srs_document: str, test_plan: str) -> Task: + """ + 创建代码实现任务 + + Args: + srs_document: 软件需求规格说明书 + test_plan: 测试方案和测试用例 + + Returns: + Task: Dev 任务实例 + """ + return Task( + description=f""" + 请根据以下 SRS 文档和测试方案,实现完整的业务代码: + + 【SRS 文档】 + {srs_document} + + 【测试方案】 + {test_plan} + + 【输出要求】 + 请按照以下结构输出代码实现: + + ## 1. 项目结构设计 + - 1.1 目录结构 + - 1.2 模块划分 + - 1.3 依赖关系 + + ## 2. 核心业务代码 + 实现所有功能性需求,包含: + ```python + # 示例:主业务逻辑 + class : + \"\"\"<服务>业务逻辑类\"\"\" + + def __init__(self): + # 初始化 + + def (self, params) -> ReturnType: + \"\"\" + 方法描述 + + Args: + params: 参数说明 + + Returns: + 返回值说明 + \"\"\" + # 实现逻辑 + pass + ``` + + ## 3. 数据模型定义 + - 3.1 Pydantic 模型 + - 3.2 数据库模型(如需要) + - 3.3 DTO/VO 对象 + + ## 4. API 接口定义(如适用) + - 4.1 RESTful 端点 + - 4.2 请求/响应格式 + - 4.3 错误处理 + + ## 5. 配置文件 + - 5.1 环境变量配置 + - 5.2 日志配置 + - 5.3 其他配置项 + + ## 6. 使用说明 + - 6.1 安装步骤 + - 6.2 运行命令 + - 6.3 配置说明 + + 【注意事项】 + - 遵循 PEP 8 编码规范 + - 使用类型提示 (Type Hints) + - 包含必要的文档字符串 + - 实现适当的错误处理 + - 确保代码可通过所有测试用例 + """, + expected_output="完整的业务代码实现和使用说明", + agent=create_dev_agent() + ) + + +def execute_dev_stage(srs_document: str, test_plan: str) -> Dict[str, Any]: + """ + 执行 Dev 阶段任务 + + Args: + srs_document: 软件需求规格说明书 + test_plan: 测试方案和测试用例 + + Returns: + Dict[str, Any]: 包含代码实现和执行结果 + """ + dev_agent = create_dev_agent() + dev_task = create_dev_task(srs_document, test_plan) + + crew = Crew( + agents=[dev_agent], + tasks=[dev_task], + verbose=True + ) + + result = crew.kickoff() + + return { + "stage": "代码实现", + "implementation": result.raw, + "status": "completed" + } diff --git a/agents/pm_agent.py b/agents/pm_agent.py new file mode 100644 index 0000000..38e8e27 --- /dev/null +++ b/agents/pm_agent.py @@ -0,0 +1,115 @@ +""" +PM Agent - 产品经理智能体 +负责将用户输入的非结构化需求转化为结构化软件需求规格说明书 (SRS) +""" + +from typing import Dict, Any +from crewai import Agent, Task, Crew +from models.qwen_config import get_llm + + +def create_pm_agent() -> Agent: + """ + 创建产品经理智能体 + + Returns: + Agent: PM 智能体实例 + """ + return Agent( + role="资深产品需求分析师", + goal="将模糊的用户需求转化为清晰、完整、可执行的软件需求规格说明书 (SRS)", + backstory="""你是一位拥有 10 年经验的资深产品需求分析师,擅长: + 1. 快速理解用户业务场景和核心痛点 + 2. 识别功能性需求和非功能性需求 + 3. 定义清晰的验收标准 (Acceptance Criteria) + 4. 遵循博世研发规范,确保需求的可追溯性和可验证性 + + 你的输出将作为测试和开发团队的输入,务必保证准确性和完整性。""", + verbose=True, + allow_delegation=False, + llm=get_llm() + ) + + +def create_pm_task(requirement: str) -> Task: + """ + 创建需求分析任务 + + Args: + requirement: 用户需求描述 + + Returns: + Task: PM 任务实例 + """ + return Task( + description=f""" + 请分析以下用户需求,生成结构化的软件需求规格说明书 (SRS): + + 【用户需求】 + {requirement} + + 【输出要求】 + 请按照以下结构输出 SRS 文档: + + ## 1. 项目概述 + - 1.1 项目背景 + - 1.2 项目目标 + - 1.3 适用范围 + + ## 2. 功能性需求 + - 2.1 功能列表(使用 MoSCoW 优先级标注) + - 2.2 功能详细描述(包含输入、处理、输出) + - 2.3 业务流程图描述 + + ## 3. 非功能性需求 + - 3.1 性能要求(响应时间、并发量等) + - 3.2 安全要求(认证、授权、数据保护) + - 3.3 可用性要求(易用性、可访问性) + - 3.4 可维护性要求(日志、监控、可扩展性) + + ## 4. 验收标准 (Acceptance Criteria) + - 4.1 功能验收标准(每个功能的通过标准) + - 4.2 性能验收标准(量化指标) + - 4.3 用户体验验收标准 + + ## 5. 约束条件 + - 5.1 技术约束 + - 5.2 业务约束 + - 5.3 合规约束(博世研发规范) + + 【注意事项】 + - 使用清晰、无歧义的语言 + - 需求必须是可测试、可验证的 + - 优先保证核心功能的完整性 + """, + expected_output="完整的软件需求规格说明书 (SRS),包含功能性需求、非功能性需求和验收标准", + agent=create_pm_agent() + ) + + +def execute_pm_stage(requirement: str) -> Dict[str, Any]: + """ + 执行 PM 阶段任务 + + Args: + requirement: 用户需求描述 + + Returns: + Dict[str, Any]: 包含 SRS 文档和执行结果 + """ + pm_agent = create_pm_agent() + pm_task = create_pm_task(requirement) + + crew = Crew( + agents=[pm_agent], + tasks=[pm_task], + verbose=True + ) + + result = crew.kickoff() + + return { + "stage": "需求分析", + "srs_document": result.raw, + "status": "completed" + } diff --git a/agents/qa_agent.py b/agents/qa_agent.py new file mode 100644 index 0000000..ed51e51 --- /dev/null +++ b/agents/qa_agent.py @@ -0,0 +1,128 @@ +""" +QA Agent - 测试工程师智能体 +负责根据 SRS 生成自动化测试用例(Pytest 格式) +""" + +from typing import Dict, Any +from crewai import Agent, Task +from models.qwen_config import get_llm + + +def create_qa_agent() -> Agent: + """ + 创建测试工程师智能体 + + Returns: + Agent: QA 智能体实例 + """ + return Agent( + role="高级测试架构师", + goal="根据软件需求规格说明书 (SRS) 设计并生成完整的自动化测试用例", + backstory="""你是一位拥有 8 年经验的高级测试架构师,擅长: + 1. 基于需求文档设计全面的测试策略 + 2. 编写高质量的 Pytest 自动化测试脚本 + 3. 覆盖单元测试、集成测试、端到端测试 + 4. 遵循博世研发规范中的测试要求 + + 你的测试用例将作为开发实现的验证标准,务必保证覆盖率和可执行性。""", + verbose=True, + allow_delegation=False, + llm=get_llm() + ) + + +def create_qa_task(srs_document: str) -> Task: + """ + 创建测试用例生成任务 + + Args: + srs_document: 软件需求规格说明书 + + Returns: + Task: QA 任务实例 + """ + return Task( + description=f""" + 请根据以下 SRS 文档,生成完整的自动化测试用例: + + 【SRS 文档】 + {srs_document} + + 【输出要求】 + 请按照以下结构输出测试方案: + + ## 1. 测试策略概述 + - 1.1 测试范围 + - 1.2 测试方法(单元测试/集成测试/E2E 测试) + - 1.3 测试工具和技术栈 + + ## 2. 测试场景列表 + - 2.1 功能测试场景(对应每个功能性需求) + - 2.2 边界值测试场景 + - 2.3 异常场景测试 + - 2.4 性能测试场景 + + ## 3. Pytest 测试代码 + 为每个核心功能生成 Pytest 测试脚本,包含: + ```python + # 测试文件:test_.py + import pytest + + class Test: + \"\"\"<功能>测试类\"\"\" + + def test__(self): + \"\"\"测试场景描述\"\"\" + # Arrange - 准备测试数据 + # Act - 执行操作 + # Assert - 验证结果 + pass + ``` + + ## 4. 测试数据准备 + - 4.1 测试 fixtures + - 4.2 Mock 数据 + - 4.3 测试数据库种子 + + ## 5. 测试执行说明 + - 5.1 运行命令 + - 5.2 环境要求 + - 5.3 预期通过率 + + 【注意事项】 + - 遵循 AAA(Arrange-Act-Assert)测试模式 + - 使用描述性的测试函数命名 + - 包含充足的断言验证 + - 考虑边界条件和异常情况 + """, + expected_output="完整的测试方案和 Pytest 测试代码", + agent=create_qa_agent() + ) + + +def execute_qa_stage(srs_document: str) -> Dict[str, Any]: + """ + 执行 QA 阶段任务 + + Args: + srs_document: 软件需求规格说明书 + + Returns: + Dict[str, Any]: 包含测试方案和执行结果 + """ + qa_agent = create_qa_agent() + qa_task = create_qa_task(srs_document) + + crew = Crew( + agents=[qa_agent], + tasks=[qa_task], + verbose=True + ) + + result = crew.kickoff() + + return { + "stage": "测试用例设计", + "test_plan": result.raw, + "status": "completed" + } diff --git a/agents_config.py b/agents_config.py deleted file mode 100644 index 0f10e29..0000000 --- a/agents_config.py +++ /dev/null @@ -1,302 +0,0 @@ -""" -Agent 配置文件 -定义所有 Agent 的角色、目标、背景描述和任务模板 -""" - -from typing import Dict, Any -from crewai import Agent - - -# Qwen3.5-flash 模型配置(阿里百炼 - DashScope) -QWEN_MODEL_CONFIG = { - "model": "qwen3.5-flash", # 阿里百炼 Qwen3.5-flash 对应 qwen-plus - "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1", - "api_key_env": "DASHSCOPE_API_KEY", -} - - -def get_product_manager_agent() -> Agent: - """创建产品经理 Agent""" - from crewai.llm import LLM - - # 创建一个基础的 LLM 占位符(会在 configure_llm 中被替换) - llm_placeholder = LLM( - model="qwen-plus", - api_key="placeholder", # 会被 configure_llm 替换 - base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", - ) - - return Agent( - role="ProductManager", - goal="将用户需求转化为清晰的产品需求文档 (PRD)", - backstory="""你是一位经验丰富的产品经理,擅长理解用户需求并转化为可执行的产品规格。 -你的职责包括: -1. 分析用户需求的核心痛点和业务价值 -2. 定义功能列表和优先级 -3. 编写详细的功能描述和验收标准 -4. 识别潜在的技术风险和业务风险""", - verbose=True, - allow_delegation=False, - llm=llm_placeholder, - ) - - -def get_qa_engineer_agent() -> Agent: - """创建 QA 工程师 Agent""" - from crewai.llm import LLM - - # 创建一个基础的 LLM 占位符(会在 configure_llm 中被替换) - llm_placeholder = LLM( - model="qwen-plus", - api_key="placeholder", # 会被 configure_llm 替换 - base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", - ) - - return Agent( - role="QAEngineer", - goal="根据产品需求制定测试计划和测试用例", - backstory="""你是一位资深 QA 工程师,专注于软件质量保障。 -你的职责包括: -1. 分析 PRD 文档,识别测试范围 -2. 设计测试策略(单元测试、集成测试、端到端测试) -3. 编写详细的测试用例,包括正常场景和异常场景 -4. 定义验收标准和性能指标""", - verbose=True, - allow_delegation=False, - llm=llm_placeholder, - ) - - -def get_software_developer_agent() -> Agent: - """创建软件开发工程师 Agent""" - from crewai.llm import LLM - - # 创建一个基础的 LLM 占位符(会在 configure_llm 中被替换) - llm_placeholder = LLM( - model="qwen-plus", - api_key="placeholder", # 会被 configure_llm 替换 - base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", - ) - - return Agent( - role="SoftwareDeveloper", - goal="根据需求和测试用例设计技术方案并生成代码框架", - backstory="""你是一位全栈软件架构师,拥有深厚的技术功底。 -你的职责包括: -1. 根据 PRD 和测试用例设计系统架构 -2. 选择合适的技术栈和框架 -3. 设计数据库 schema 和 API 接口 -4. 生成核心模块的代码框架和关键算法实现 -5. 编写技术文档和部署指南""", - verbose=True, - allow_delegation=False, - llm=llm_placeholder, - ) - - -def get_coordinator_agent() -> Agent: - """创建协调员 Agent - 负责最终审核和交付""" - from crewai.llm import LLM - - # 创建一个基础的 LLM 占位符(会在 configure_llm 中被替换) - llm_placeholder = LLM( - model="qwen-plus", - api_key="placeholder", # 会被 configure_llm 替换 - base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", - ) - - return Agent( - role="Coordinator", - goal="审核所有产出物,确保质量并生成最终交付报告", - backstory="""你是一位项目协调员,负责把控整体交付质量。 -你的职责包括: -1. 审核 PRD、测试计划和技术方案的完整性 -2. 识别各文档之间的一致性和潜在冲突 -3. 汇总所有产出物,生成结构化交付报告 -4. 评估项目风险和后续建议 -5. 在需要时请求人工确认(通过 API 控制)""", - verbose=True, - allow_delegation=False, - llm=llm_placeholder, - ) - - -# 任务模板配置 -TASK_TEMPLATES = { - "product_manager": """ -## 任务:产品需求分析 - -### 用户需求输入: -{user_requirement} - -### 输出要求: -请按照以下结构输出产品需求文档 (PRD): - -1. **项目概述** - - 项目背景 - - 目标用户群体 - - 核心价值主张 - -2. **功能需求列表** - - 按优先级排序(P0/P1/P2) - - 每个功能的详细描述 - - 用户故事格式(作为...我希望...以便...) - -3. **非功能需求** - - 性能要求 - - 安全性要求 - - 可用性要求 - -4. **验收标准** - - 每个功能的验收条件 - - 关键业务指标 (KPI) - -5. **风险评估** - - 技术风险 - - 业务风险 - - 缓解措施 - -请使用 Markdown 格式输出,确保结构清晰、内容完整. -""", - - "qa_engineer": """ -## 任务:制定测试计划 - -### 输入信息: -- 产品需求文档 (来自 ProductManager): -{prd_content} - -### 输出要求: -请按照以下结构输出测试计划文档: - -1. **测试策略** - - 测试范围界定 - - 测试类型(单元/集成/E2E/性能/安全) - - 测试环境规划 - -2. **测试用例设计** - - 针对每个 P0/P1 功能设计测试用例 - - 包含:测试目的、前置条件、测试步骤、预期结果 - - 覆盖正常流程和异常流程 - -3. **自动化测试建议** - - 推荐使用的测试框架 - - 需要自动化的测试场景列表 - - CI/CD 集成建议 - -4. **性能测试方案** - - 压测场景设计 - - 性能基准指标 - - 监控指标 - -5. **验收检查清单** - - 上线前必须通过的检查项 - -请使用 Markdown 格式,测试用例使用表格形式展示。 -""", - - "software_developer": """ -## 任务:技术方案设计与代码实现 - -### 输入信息: -- 产品需求文档: -{prd_content} - -- 测试计划: -{qa_plan} - -### 输出要求: -请按照以下结构输出技术方案文档: - -1. **系统架构设计** - - 架构图(使用文字描述或 ASCII art) - - 技术选型及理由 - - 系统组件划分 - -2. **API 接口设计** - - RESTful API 列表(方法、路径、请求/响应格式) - - 接口鉴权方案 - - 错误码规范 - -3. **数据模型设计** - - 数据库表结构(表名、字段、类型、索引) - - ER 关系描述 - - 数据迁移策略 - -4. **核心代码实现** - - 关键模块的代码框架(Python/TypeScript 等) - - 核心算法伪代码或实现 - - 重要设计模式的应用 - -5. **部署方案** - - 基础设施需求 - - Docker 容器化配置示例 - - CI/CD 流水线配置建议 - -6. **开发注意事项** - - 代码规范 - - 日志和监控 - - 安全最佳实践 - -请确保代码片段语法正确,注释清晰。 -""", - - "coordinator": """ -## 任务:最终审核与交付报告 - -### 输入信息: -- 产品需求文档: -{prd_content} - -- 测试计划: -{qa_plan} - -- 技术方案: -{dev_plan} - -### 输出要求: -请按照以下结构输出最终交付报告: - -1. **交付摘要** - - 项目基本信息 - - 交付物清单 - - 整体质量评估 - -2. **一致性检查** - - PRD 与测试计划的匹配度 - - 技术方案对需求的覆盖度 - - 发现的遗漏或不一致点 - -3. **质量评估** - - 文档完整性评分(1-10 分) - - 技术可行性评估 - - 测试覆盖度评估 - -4. **风险提示** - - 高风险项列表 - - 中低风险项列表 - - 风险缓解建议 - -5. **后续行动建议** - - 短期行动计划(1-2 周) - - 中期目标(1-3 月) - - 长期演进方向 - -6. **交付确认** - - 是否满足交付标准 - - 需要人工复核的点 - - 最终结论(通过/有条件通过/不通过) - -请使用专业、客观的语气,给出具体、可执行的建议。 -""", -} - - -def create_agents() -> Dict[str, Agent]: - """创建所有 Agent 实例""" - return { - "product_manager": get_product_manager_agent(), - "qa_engineer": get_qa_engineer_agent(), - "software_developer": get_software_developer_agent(), - "coordinator": get_coordinator_agent(), - } diff --git a/check_config.py b/check_config.py new file mode 100644 index 0000000..a8ddc6d --- /dev/null +++ b/check_config.py @@ -0,0 +1,133 @@ +""" +Configuration Check Script +Verify project configuration is correct +""" + +import sys +from pathlib import Path + + +def check_python_version(): + """Check Python version""" + print("Checking Python version...") + if sys.version_info < (3, 10): + print(f"[ERROR] Python version too low: {sys.version}") + print(" Required: Python 3.10+") + return False + print(f"[OK] Python version: {sys.version.split()[0]}") + return True + + +def check_dependencies(): + """Check required packages""" + print("\nChecking dependencies...") + + required_packages = { + 'crewai': 'CrewAI', + 'fastapi': 'FastAPI', + 'uvicorn': 'Uvicorn', + 'pydantic': 'Pydantic', + 'pydantic_settings': 'Pydantic Settings', + 'dotenv': 'python-dotenv' + } + + all_ok = True + for package, name in required_packages.items(): + try: + __import__(package) + print(f"[OK] {name}: Installed") + except ImportError: + print(f"[ERROR] {name}: Not installed") + all_ok = False + + return all_ok + + +def check_env_file(): + """Check environment file""" + print("\nChecking environment configuration...") + + env_file = Path('.env') + env_example = Path('.env.example') + + if not env_example.exists(): + print("[ERROR] .env.example file not found") + return False + + print("[OK] .env.example exists") + + if not env_file.exists(): + print("[WARNING] .env file not found. Please copy from .env.example and configure") + print(" Command: cp .env.example .env") + return False + + print("[OK] .env file exists") + + # Check API Key + with open(env_file, 'r', encoding='utf-8') as f: + content = f.read() + if 'DASHSCOPE_API_KEY=your_dashscope_api_key_here' in content: + print("[WARNING] Please configure valid DashScope API Key in .env file") + return False + elif 'DASHSCOPE_API_KEY=' in content: + print("[OK] DashScope API Key configured") + + return True + + +def check_project_structure(): + """Check project structure""" + print("\nChecking project structure...") + + required_files = [ + 'main.py', + 'agents/__init__.py', + 'agents/pm_agent.py', + 'agents/qa_agent.py', + 'agents/dev_agent.py', + 'crews/__init__.py', + 'crews/sdlc_crew.py', + 'models/__init__.py', + 'models/qwen_config.py', + 'static/index.html', + 'requirements.txt' + ] + + all_ok = True + for file_path in required_files: + if Path(file_path).exists(): + print(f"[OK] {file_path}") + else: + print(f"[ERROR] {file_path} not found") + all_ok = False + + return all_ok + + +def main(): + """Main function""" + print("=" * 60) + print("SDLC Agent Demo - Configuration Check") + print("=" * 60) + + checks = [ + check_python_version(), + check_dependencies(), + check_env_file(), + check_project_structure() + ] + + print("\n" + "=" * 60) + if all(checks): + print("All checks passed! Ready to start the service.") + print("\nStart command:") + print(" uvicorn main:app --reload --host 0.0.0.0 --port 8000") + print("\nAccess URL:") + print(" http://localhost:8000/static/index.html") + else: + print("Some checks failed. Please fix the issues first.") + print("=" * 60) + + +if __name__ == '__main__': + main() diff --git a/check_error.py b/check_error.py new file mode 100644 index 0000000..1162902 --- /dev/null +++ b/check_error.py @@ -0,0 +1,22 @@ +import requests +import time + +# Start a task +print("Starting task...") +response = requests.post('http://localhost:8000/api/v1/sdlc/start', json={'requirement': 'test'}) +task_id = response.json()['task_id'] +print(f"Task ID: {task_id}") + +# Wait 2 seconds +time.sleep(2) + +# Get status +status = requests.get(f'http://localhost:8000/api/v1/sdlc/status/{task_id}') +print(f"\nStatus: {status.json()}") + +# Get events via SSE stream +print("\nGetting events...") +stream = requests.get(f'http://localhost:8000/api/v1/sdlc/stream/{task_id}', timeout=5) +for line in stream.iter_lines(): + if line: + print(line.decode('utf-8')) diff --git a/code_docs/example_code.py b/code_docs/example_code.py deleted file mode 100644 index 09213d9..0000000 --- a/code_docs/example_code.py +++ /dev/null @@ -1,481 +0,0 @@ -""" -示例代码 - 待办事项应用后端实现 -这是多智能体系统生成的代码示例 -""" - -from fastapi import FastAPI, HTTPException, Depends, status -from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials -from pydantic import BaseModel, EmailStr, Field -from typing import Optional, List -from datetime import datetime, timedelta -import uvicorn -import sqlite3 -import hashlib -import jwt - -# ==================== 配置 ==================== - -SECRET_KEY = "your-secret-key-change-in-production" -ALGORITHM = "HS256" -DATABASE = "todo_app.db" - -# ==================== 数据模型 ==================== - -class UserCreate(BaseModel): - """用户注册请求""" - username: str = Field(..., min_length=3, max_length=50) - email: EmailStr - password: str = Field(..., min_length=6) - - -class UserLogin(BaseModel): - """用户登录请求""" - username: str - password: str - - -class TodoItemCreate(BaseModel): - """创建待办事项""" - title: str = Field(..., min_length=1, max_length=200) - description: Optional[str] = None - priority: int = Field(default=1, ge=1, le=5) # 1-5, 5 最高 - due_date: Optional[datetime] = None - - -class TodoItemUpdate(BaseModel): - """更新待办事项""" - title: Optional[str] = Field(None, min_length=1, max_length=200) - description: Optional[str] = None - priority: Optional[int] = Field(None, ge=1, le=5) - due_date: Optional[datetime] = None - completed: Optional[bool] = None - - -class TodoItemResponse(BaseModel): - """待办事项响应""" - id: int - user_id: int - title: str - description: Optional[str] - priority: int - completed: bool - due_date: Optional[datetime] - created_at: datetime - updated_at: datetime - - class Config: - from_attributes = True - - -# ==================== 数据库操作 ==================== - -def get_db_connection(): - """获取数据库连接""" - conn = sqlite3.connect(DATABASE) - conn.row_factory = sqlite3.Row - return conn - - -def init_db(): - """初始化数据库""" - conn = get_db_connection() - cursor = conn.cursor() - - # 创建用户表 - cursor.execute(''' - CREATE TABLE IF NOT EXISTS users ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - username VARCHAR(50) UNIQUE NOT NULL, - email VARCHAR(100) UNIQUE NOT NULL, - password_hash VARCHAR(255) NOT NULL, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - ''') - - # 创建待办事项表 - cursor.execute(''' - CREATE TABLE IF NOT EXISTS todo_items ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id INTEGER NOT NULL, - title VARCHAR(200) NOT NULL, - description TEXT, - priority INTEGER DEFAULT 1, - completed BOOLEAN DEFAULT 0, - due_date TIMESTAMP, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE - ) - ''') - - # 创建索引 - cursor.execute('CREATE INDEX IF NOT EXISTS idx_todo_user ON todo_items(user_id)') - cursor.execute('CREATE INDEX IF NOT EXISTS idx_todo_completed ON todo_items(completed)') - cursor.execute('CREATE INDEX IF NOT EXISTS idx_todo_priority ON todo_items(priority DESC)') - - conn.commit() - conn.close() - print("✓ 数据库初始化完成") - - -# ==================== 认证工具 ==================== - -security = HTTPBearer() - - -def hash_password(password: str) -> str: - """密码哈希""" - return hashlib.sha256(password.encode()).hexdigest() - - -def verify_password(password: str, password_hash: str) -> bool: - """验证密码""" - return hash_password(password) == password_hash - - -def create_access_token(data: dict, expires_delta: timedelta = timedelta(days=7)) -> str: - """创建 JWT Token""" - to_encode = data.copy() - expire = datetime.utcnow() + expires_delta - to_encode.update({"exp": expire}) - return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) - - -def get_current_user( - credentials: HTTPAuthorizationCredentials = Depends(security) -) -> dict: - """获取当前用户(从 JWT Token)""" - try: - token = credentials.credentials - payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) - user_id = payload.get("sub") - if user_id is None: - raise HTTPException(status_code=401, detail="Invalid token") - return {"user_id": int(user_id), "username": payload.get("username")} - except jwt.PyJWTError: - raise HTTPException(status_code=401, detail="Token validation failed") - - -# ==================== FastAPI 应用 ==================== - -app = FastAPI( - title="Todo App API", - description="在线待办事项应用 - 由多智能体系统生成", - version="1.0.0" -) - - -@app.on_event("startup") -async def startup_event(): - """应用启动时初始化""" - init_db() - - -# ==================== 用户接口 ==================== - -@app.post("/api/users/register", tags=["用户管理"]) -async def register(user_data: UserCreate): - """用户注册""" - conn = get_db_connection() - cursor = conn.cursor() - - # 检查用户名是否已存在 - cursor.execute('SELECT id FROM users WHERE username = ?', (user_data.username,)) - if cursor.fetchone(): - conn.close() - raise HTTPException(status_code=400, detail="Username already exists") - - # 检查邮箱是否已存在 - cursor.execute('SELECT id FROM users WHERE email = ?', (user_data.email,)) - if cursor.fetchone(): - conn.close() - raise HTTPException(status_code=400, detail="Email already registered") - - # 创建用户 - password_hash = hash_password(user_data.password) - cursor.execute( - 'INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)', - (user_data.username, user_data.email, password_hash) - ) - conn.commit() - user_id = cursor.lastrowid - conn.close() - - return { - "message": "注册成功", - "user_id": user_id, - "username": user_data.username - } - - -@app.post("/api/users/login", tags=["用户管理"]) -async def login(credentials: UserLogin): - """用户登录""" - conn = get_db_connection() - cursor = conn.cursor() - - # 查找用户 - cursor.execute( - 'SELECT id, username, password_hash FROM users WHERE username = ?', - (credentials.username,) - ) - user = cursor.fetchone() - conn.close() - - if not user or not verify_password(credentials.password, user['password_hash']): - raise HTTPException(status_code=401, detail="Invalid username or password") - - # 生成 Token - access_token = create_access_token( - data={"sub": str(user['id']), "username": user['username']} - ) - - return { - "message": "登录成功", - "access_token": access_token, - "token_type": "bearer", - "user_id": user['id'], - "username": user['username'] - } - - -# ==================== 待办事项接口 ==================== - -@app.post("/api/todos", tags=["待办事项"], response_model=TodoItemResponse) -async def create_todo( - todo_data: TodoItemCreate, - current_user: dict = Depends(get_current_user) -): - """创建待办事项""" - conn = get_db_connection() - cursor = conn.cursor() - - cursor.execute(''' - INSERT INTO todo_items (user_id, title, description, priority, due_date) - VALUES (?, ?, ?, ?, ?) - ''', ( - current_user["user_id"], - todo_data.title, - todo_data.description, - todo_data.priority, - todo_data.due_date - )) - - conn.commit() - todo_id = cursor.lastrowid - - # 查询刚创建的记录 - cursor.execute('SELECT * FROM todo_items WHERE id = ?', (todo_id,)) - todo_row = cursor.fetchone() - conn.close() - - return dict(todo_row) - - -@app.get("/api/todos", tags=["待办事项"], response_model=List[TodoItemResponse]) -async def list_todos( - skip: int = 0, - limit: int = 50, - completed: Optional[bool] = None, - priority: Optional[int] = None, - search: Optional[str] = None, - sort_by: str = "priority", # priority, due_date, created_at - current_user: dict = Depends(get_current_user) -): - """获取待办事项列表(支持过滤和排序)""" - conn = get_db_connection() - cursor = conn.cursor() - - # 构建查询 - query = 'SELECT * FROM todo_items WHERE user_id = ?' - params = [current_user["user_id"]] - - # 添加过滤条件 - if completed is not None: - query += ' AND completed = ?' - params.append(completed) - - if priority is not None: - query += ' AND priority = ?' - params.append(priority) - - if search: - query += ' AND (title LIKE ? OR description LIKE ?)' - search_term = f'%{search}%' - params.extend([search_term, search_term]) - - # 添加排序 - order_map = { - 'priority': 'priority DESC', - 'due_date': 'due_date ASC', - 'created_at': 'created_at DESC' - } - order_clause = order_map.get(sort_by, 'priority DESC') - query += f' ORDER BY {order_clause}' - - # 添加分页 - query += ' LIMIT ? OFFSET ?' - params.extend([limit, skip]) - - cursor.execute(query, params) - todos = cursor.fetchall() - conn.close() - - return [dict(todo) for todo in todos] - - -@app.get("/api/todos/{todo_id}", tags=["待办事项"], response_model=TodoItemResponse) -async def get_todo( - todo_id: int, - current_user: dict = Depends(get_current_user) -): - """获取单个待办事项详情""" - conn = get_db_connection() - cursor = conn.cursor() - - cursor.execute( - 'SELECT * FROM todo_items WHERE id = ? AND user_id = ?', - (todo_id, current_user["user_id"]) - ) - todo = cursor.fetchone() - conn.close() - - if not todo: - raise HTTPException(status_code=404, detail="Todo item not found") - - return dict(todo) - - -@app.put("/api/todos/{todo_id}", tags=["待办事项"], response_model=TodoItemResponse) -async def update_todo( - todo_id: int, - todo_data: TodoItemUpdate, - current_user: dict = Depends(get_current_user) -): - """更新待办事项""" - conn = get_db_connection() - cursor = conn.cursor() - - # 检查是否存在 - cursor.execute( - 'SELECT id FROM todo_items WHERE id = ? AND user_id = ?', - (todo_id, current_user["user_id"]) - ) - if not cursor.fetchone(): - conn.close() - raise HTTPException(status_code=404, detail="Todo item not found") - - # 构建更新字段 - updates = [] - params = [] - - if todo_data.title is not None: - updates.append('title = ?') - params.append(todo_data.title) - - if todo_data.description is not None: - updates.append('description = ?') - params.append(todo_data.description) - - if todo_data.priority is not None: - updates.append('priority = ?') - params.append(todo_data.priority) - - if todo_data.completed is not None: - updates.append('completed = ?') - params.append(todo_data.completed) - - if todo_data.due_date is not None: - updates.append('due_date = ?') - params.append(todo_data.due_date) - - # 添加更新时间 - updates.append('updated_at = CURRENT_TIMESTAMP') - - # 执行更新 - params.append(todo_id) - query = f'UPDATE todo_items SET {", ".join(updates)} WHERE id = ?' - cursor.execute(query, params) - conn.commit() - - # 查询更新后的记录 - cursor.execute('SELECT * FROM todo_items WHERE id = ?', (todo_id,)) - todo = cursor.fetchone() - conn.close() - - return dict(todo) - - -@app.delete("/api/todos/{todo_id}", tags=["待办事项"]) -async def delete_todo( - todo_id: int, - current_user: dict = Depends(get_current_user) -): - """删除待办事项""" - conn = get_db_connection() - cursor = conn.cursor() - - # 检查是否存在 - cursor.execute( - 'SELECT id FROM todo_items WHERE id = ? AND user_id = ?', - (todo_id, current_user["user_id"]) - ) - if not cursor.fetchone(): - conn.close() - raise HTTPException(status_code=404, detail="Todo item not found") - - # 执行删除 - cursor.execute('DELETE FROM todo_items WHERE id = ?', (todo_id,)) - conn.commit() - conn.close() - - return {"message": "删除成功"} - - -@app.get("/api/todos/stats", tags=["待办事项"]) -async def get_stats(current_user: dict = Depends(get_current_user)): - """获取统计信息""" - conn = get_db_connection() - cursor = conn.cursor() - - # 总数 - cursor.execute( - 'SELECT COUNT(*) as total FROM todo_items WHERE user_id = ?', - (current_user["user_id"],) - ) - total = cursor.fetchone()['total'] - - # 已完成 - cursor.execute( - 'SELECT COUNT(*) as completed FROM todo_items WHERE user_id = ? AND completed = 1', - (current_user["user_id"],) - ) - completed = cursor.fetchone()['completed'] - - # 未完成 - pending = total - completed - - conn.close() - - return { - "total": total, - "completed": completed, - "pending": pending - } - - -# ==================== 主程序入口 ==================== - -if __name__ == "__main__": - print("=" * 60) - print("🚀 Todo App API服务启动中...") - print("=" * 60) - print("\n📖 API 文档:http://localhost:8000/docs") - print("📊 健康检查:http://localhost:8000/health\n") - - uvicorn.run( - "example_code:app", - host="0.0.0.0", - port=8000, - reload=True - ) diff --git a/crew_factory.py b/crew_factory.py deleted file mode 100644 index 4994f16..0000000 --- a/crew_factory.py +++ /dev/null @@ -1,523 +0,0 @@ -""" -CrewAI 工厂模块 -负责根据输入动态创建 Crew 实例,并集成 SSE 事件推送 - -技术方案说明: -由于 CrewAI 库的事件 API 可能随版本变化,我们采用更稳定的方案: -1. 使用自定义的 Logger 拦截 Agent 输出 -2. 在任务执行前后手动发送 SSE 事件 -3. 通过 run_in_executor 实现同步转异步 -""" - -import asyncio -from typing import Dict, Any, Optional, List, Callable -from datetime import datetime -import uuid -import io -import sys -from contextlib import contextmanager - -from crewai import Agent, Task, Crew, Process - -from agents_config import ( - create_agents, - TASK_TEMPLATES, - QWEN_MODEL_CONFIG, -) -from stream_manager import stream_manager, StreamEvent - - -class CrewExecutionLogger: - """ - CrewAI 执行日志拦截器 - - 通过捕获 stdout/stderr 来实时获取 CrewAI 的执行日志, - 并将其转发到 SSE 流。这是一种稳定且非侵入式的方法。 - """ - - def __init__(self, task_id: str, callback: Optional[Callable] = None): - self.task_id = task_id - self.callback = callback - self._old_stdout = None - self._old_stderr = None - self._buffer = io.StringIO() - - def _write(self, text: str): - """写入时触发回调""" - self._buffer.write(text) - - # 按行处理(遇到换行符时发送) - if '\n' in text: - lines = self._buffer.getvalue().split('\n') - for line in lines[:-1]: # 除了最后一行 - if line.strip(): - self._send_line(line) - self._buffer = io.StringIO() - self._buffer.write(lines[-1]) # 保留未完成的行 - - def _send_line(self, line: str): - """发送单行日志到 SSE 流""" - if self.callback: - self.callback("log", "System", line.strip()) - - def start_capture(self): - """开始捕获输出""" - self._old_stdout = sys.stdout - self._old_stderr = sys.stderr - - # 创建代理对象 - class OutputProxy: - def __init__(self, logger, original): - self.logger = logger - self.original = original - - def write(self, text): - self.logger._write(text) - if self.original: - self.original.write(text) - - def flush(self): - if self.original: - self.original.flush() - - sys.stdout = OutputProxy(self, self._old_stdout) - sys.stderr = OutputProxy(self, self._old_stderr) - - def stop_capture(self): - """停止捕获输出""" - if self._old_stdout: - sys.stdout = self._old_stdout - if self._old_stderr: - sys.stderr = self._old_stderr - - # 发送剩余内容 - remaining = self._buffer.getvalue() - if remaining.strip(): - self._send_line(remaining) - - -class SSECrewExecutor: - """ - SSE Crew 执行器 - - 封装 CrewAI 的执行过程,在关键节点发送 SSE 事件。 - 这是与 CrewAI 版本无关的稳定方案。 - """ - - def __init__(self, task_id: str): - self.task_id = task_id - self.logger = None - - def _send_event(self, event_type: str, agent_name: str, content: str, metadata: Optional[Dict] = None): - """发送 SSE 事件(同步方法,使用线程安全的方式)""" - try: - loop = asyncio.get_event_loop() - - async def send_async(): - stream = await stream_manager.get_stream(self.task_id) - if stream is None: - return False - - event = StreamEvent( - event_type=event_type, - agent=agent_name, - content=content, - task_id=self.task_id, - metadata=metadata or {} - ) - return stream.put_nowait(event) - - future = asyncio.run_coroutine_threadsafe(send_async(), loop) - future.result(timeout=5.0) - except Exception as e: - # 静默失败,不影响主流程 - pass - - def execute(self, crew: 'Crew', inputs: Dict[str, Any]) -> Any: - """ - 执行 Crew 任务,并在过程中发送 SSE 事件 - - Args: - crew: CrewAI Crew 实例 - inputs: 任务输入参数 - - Returns: - Crew 执行结果 - """ - # 1. 发送开始事件 - self._send_event( - event_type="start", - agent_name="System", - content=f"Crew 任务开始执行,共 {len(crew.agents)} 个 Agent,{len(crew.tasks)} 个任务", - metadata={ - "agent_count": len(crew.agents), - "task_count": len(crew.tasks) - } - ) - - # 2. 设置日志拦截 - def log_callback(event_type: str, agent_name: str, content: str): - self._send_event(event_type, agent_name, content) - - self.logger = CrewExecutionLogger(self.task_id, log_callback) - self.logger.start_capture() - - try: - # 3. 遍历执行任务(手动控制以便发送事件) - context = inputs.copy() - last_output = None - - for i, task in enumerate(crew.tasks): - # 将 context 字典转换为 JSON 字符串(CrewAI 要求) - import json - context_str = json.dumps(context, ensure_ascii=False) - task_result = self._execute_task(task, context_str, context, i) - - # 更新上下文(简单的字符串替换) - if task_result: - context[f"task_{i}_output"] = str(task_result) - last_output = task_result - - # 4. 发送结束事件 - self._send_event( - event_type="end", - agent_name="System", - content="Crew 任务执行完成", - metadata={"success": True} - ) - - return last_output - - except Exception as e: - # 5. 发送错误事件 - self._send_event( - event_type="error", - agent_name="System", - content=str(e), - metadata={"error_type": type(e).__name__} - ) - raise - - finally: - # 6. 恢复原始输出 - self.logger.stop_capture() - - def _execute_task(self, task: 'Task', context_str: str, context_dict: Dict[str, Any], index: int) -> Optional[Any]: - """ - 执行单个任务 - - Args: - task: Task 实例 - context_str: 上下文信息(JSON 字符串格式,用于传递给 CrewAI) - context_dict: 上下文信息(字典格式,用于变量替换) - index: 任务索引 - - Returns: - 任务执行结果 - """ - # 获取负责此任务的 agent - agent_name = task.agent.role if task.agent else "Unknown" - - # 1. 发送任务开始事件 - self._send_event( - event_type="agent_start", - agent_name=agent_name, - content=f"[任务 {index + 1}] {agent_name} 开始执行任务", - metadata={ - "task_index": index, - "task_description": task.description[:200] if task.description else "" - } - ) - - # 2. 准备任务描述(替换上下文变量) - description = self._prepare_task_description(task.description, context_dict) - - # 3. 执行任务 - try: - # 使用 CrewAI 的原生执行方式 - result = task.execute_sync(context=context_str) - - # 4. 发送任务完成事件 - output_str = str(result)[:500] if result else "No output" - self._send_event( - event_type="output", - agent_name=agent_name, - content=f"任务完成,输出摘要:{output_str}", - metadata={ - "output_length": len(str(result)) if result else 0 - } - ) - - return result - - except Exception as e: - # 5. 发送错误事件 - self._send_event( - event_type="error", - agent_name=agent_name, - content=f"任务执行失败:{str(e)}", - metadata={"error_type": type(e).__name__} - ) - raise - - def _prepare_task_description(self, description: str, context: Dict[str, Any]) -> str: - """ - 准备任务描述,替换上下文变量 - - 支持以下占位符格式: - - {prd_output}: 产品需求文档输出 - - {qa_output}: QA 测试计划输出 - - {dev_output}: 技术方案输出 - """ - if not description: - return "" - - result = description - - # 尝试从上下文中获取输出 - prd_output = context.get('prd_content', context.get('task_0_output', '')) - qa_output = context.get('qa_plan', context.get('task_1_output', '')) - dev_output = context.get('dev_plan', context.get('task_2_output', '')) - - # 替换占位符 - result = result.replace('{prd_output}', str(prd_output)[:3000]) - result = result.replace('{qa_plan}', str(qa_output)[:3000]) - result = result.replace('{qa_output}', str(qa_output)[:3000]) - result = result.replace('{dev_plan}', str(dev_output)[:3000]) - result = result.replace('{dev_output}', str(dev_output)[:3000]) - - return result - - -def configure_llm(): - """配置 LLM 使用 Qwen3.5-flash (DashScope/阿里百炼)""" - import os - - # 检查 API Key 是否配置 - dashscope_api_key = os.getenv("DASHSCOPE_API_KEY") - if not dashscope_api_key: - raise ValueError( - "DASHSCOPE_API_KEY 未配置,请设置环境变量或在 .env 文件中配置" - ) - - # 使用 CrewAI 原生的 LLM 类(通过 OpenAI 兼容接口连接 DashScope) - from crewai.llm import LLM - - llm = LLM( - model=QWEN_MODEL_CONFIG["model"], # qwen-plus (Qwen3.5-flash) - api_key=dashscope_api_key, - base_url=QWEN_MODEL_CONFIG["base_url"], # https://dashscope.aliyuncs.com/compatible-mode/v1 - temperature=0.7, - max_tokens=4096, - ) - - return llm - - -class CrewFactory: - """Crew 工厂类 - 负责创建和执行业务流程""" - - @staticmethod - def create_crew( - task_id: str, - user_requirement: str, - skip_confirmation: bool = True - ) -> tuple[Crew, Dict[str, Any]]: - """ - 创建 Crew 实例 - - Args: - task_id: 任务 ID - user_requirement: 用户需求描述 - skip_confirmation: 是否跳过 Coordinator 的人工确认环节 - - Returns: - (Crew 实例,上下文信息) - """ - # 创建 Agents - agents = create_agents() - pm_agent = agents["product_manager"] - qa_agent = agents["qa_engineer"] - dev_agent = agents["software_developer"] - coord_agent = agents["coordinator"] - - # 配置 LLM - try: - llm = configure_llm() - # 为每个 agent 分配 LLM - for agent in agents.values(): - agent.llm = llm - except ValueError as e: - # 如果 LLM 配置失败,使用默认配置(会在运行时失败) - print(f"警告:LLM 配置失败 - {e}") - - # 创建任务 - # Task 1: ProductManager 分析需求 - pm_task = Task( - description=TASK_TEMPLATES["product_manager"].format( - user_requirement=user_requirement - ), - expected_output="完整的产品需求文档 (PRD),包含功能列表、验收标准和风险评估", - agent=pm_agent, - ) - - # Task 2: QAEngineer 制定测试计划 - qa_task = Task( - description=TASK_TEMPLATES["qa_engineer"].format( - prd_content="{prd_output}" - ), - expected_output="详细的测试计划文档,包含测试策略、测试用例和性能测试方案", - agent=qa_agent, - context=[pm_task], - ) - - # Task 3: SoftwareDeveloper 设计技术方案 - dev_task = Task( - description=TASK_TEMPLATES["software_developer"].format( - prd_content="{prd_output}", - qa_plan="{qa_output}" - ), - expected_output="完整的技术方案文档,包含架构设计、API 接口、数据模型和核心代码实现", - agent=dev_agent, - context=[pm_task, qa_task], - ) - - # Task 4: Coordinator 最终审核 - coord_task = Task( - description=TASK_TEMPLATES["coordinator"].format( - prd_content="{prd_output}", - qa_plan="{qa_output}", - dev_plan="{dev_output}" - ), - expected_output="最终交付报告,包含质量评估、风险提示和交付结论", - agent=coord_agent, - context=[pm_task, qa_task, dev_task], - ) - - # 创建 Crew - crew = Crew( - agents=list(agents.values()), - tasks=[pm_task, qa_task, dev_task, coord_task], - process=Process.sequential, - verbose=True, - ) - - # 上下文信息 - context = { - "user_requirement": user_requirement, - "skip_confirmation": skip_confirmation, - "created_at": datetime.now().isoformat(), - } - - return crew, context - - @staticmethod - async def execute_crew_async( - task_id: str, - user_requirement: str, - skip_confirmation: bool = True - ) -> Dict[str, Any]: - """ - 异步执行 Crew 任务 - - Args: - task_id: 任务 ID - user_requirement: 用户需求描述 - skip_confirmation: 是否跳过人工确认 - - Returns: - 执行结果摘要 - """ - # 创建流队列 - await stream_manager.create_stream(task_id) - - # 发送开始事件 - await stream_manager.publish_event( - task_id=task_id, - event_type="start", - agent="System", - content="任务已启动,开始处理用户需求...", - metadata={"user_requirement": user_requirement[:200]} - ) - - try: - # 创建 Crew - crew, context = CrewFactory.create_crew( - task_id=task_id, - user_requirement=user_requirement, - skip_confirmation=skip_confirmation - ) - - # 创建 SSE 执行器 - executor = SSECrewExecutor(task_id) - - # 在线程池中执行(避免阻塞事件循环) - loop = asyncio.get_event_loop() - result = await loop.run_in_executor( - None, - lambda: executor.execute(crew, context) - ) - - # 发送完成事件 - await stream_manager.publish_event( - task_id=task_id, - event_type="end", - agent="System", - content="任务执行完成", - metadata={"success": True} - ) - - return { - "task_id": task_id, - "status": "completed", - "result": str(result)[:5000] if result else None, - "context": context, - } - - except Exception as e: - # 发送错误事件 - await stream_manager.publish_event( - task_id=task_id, - event_type="error", - agent="System", - content=str(e), - metadata={"error_type": type(e).__name__} - ) - - await stream_manager.close_stream(task_id) - - return { - "task_id": task_id, - "status": "failed", - "error": str(e), - "error_type": type(e).__name__, - } - - -# 便捷函数 -async def run_multi_agent_task( - user_requirement: str, - skip_confirmation: bool = True -) -> str: - """ - 运行多智能体任务的便捷函数 - - Args: - user_requirement: 用户需求描述 - skip_confirmation: 是否跳过人工确认 - - Returns: - task_id: 任务 ID(用于后续 SSE 流订阅) - """ - task_id = str(uuid.uuid4()) - - # 异步启动任务 - asyncio.create_task( - CrewFactory.execute_crew_async( - task_id=task_id, - user_requirement=user_requirement, - skip_confirmation=skip_confirmation - ) - ) - - return task_id diff --git a/crews/__init__.py b/crews/__init__.py new file mode 100644 index 0000000..0a5d9ed --- /dev/null +++ b/crews/__init__.py @@ -0,0 +1,7 @@ +""" +Crews 包初始化 +""" + +from .sdlc_crew import SDLCCrew + +__all__ = ["SDLCCrew"] diff --git a/crews/sdlc_crew.py b/crews/sdlc_crew.py new file mode 100644 index 0000000..54e3650 --- /dev/null +++ b/crews/sdlc_crew.py @@ -0,0 +1,176 @@ +""" +SDLC Crew - CrewAI 编排逻辑 (纯同步版本) +负责协调 PM、QA、Dev 三个智能体按顺序执行 +""" + +from typing import Dict, Any, Optional, Generator +from datetime import datetime +from crewai import Crew +from agents.pm_agent import create_pm_agent, create_pm_task +from agents.qa_agent import create_qa_agent, create_qa_task +from agents.dev_agent import create_dev_agent, create_dev_task + + +class SDLCCrew: + """ + SDLC 多智能体协同编排类 + 实现从需求分析→测试用例→代码实现的完整流程 + """ + + def __init__(self): + """初始化 SDLC Crew""" + self.pm_agent = create_pm_agent() + self.qa_agent = create_qa_agent() + self.dev_agent = create_dev_agent() + + # 存储各阶段结果 + self.srs_document: Optional[str] = None + self.test_plan: Optional[str] = None + self.implementation: Optional[str] = None + + def execute(self, requirement: str) -> Generator[Dict[str, Any], None, None]: + """ + 执行完整的 SDLC 流程(纯同步生成器) + + Args: + requirement: 用户需求描述 + + Yields: + Dict[str, Any]: 各阶段的 SSE事件 + """ + try: + # ========== 阶段 1: PM Agent - 需求分析 ========== + pm_task = create_pm_task(requirement) + pm_crew = Crew( + agents=[self.pm_agent], + tasks=[pm_task], + verbose=True + ) + + # 发送开始事件 + yield { + "event": "pm_start", + "data": { + "stage": "需求分析", + "status": "started", + "timestamp": datetime.now().isoformat() + } + } + + # 执行 Crew 任务 + result = pm_crew.kickoff() + content = result.raw if hasattr(result, 'raw') else str(result) + + # 发送完成事件 + yield { + "event": "pm_complete", + "data": { + "stage": "需求分析", + "content": content, + "status": "completed", + "timestamp": datetime.now().isoformat() + } + } + self.srs_document = content + + # ========== 阶段 2: QA Agent - 测试用例设计 ========== + qa_task = create_qa_task(self.srs_document) + qa_crew = Crew( + agents=[self.qa_agent], + tasks=[qa_task], + verbose=True + ) + + # 发送开始事件 + yield { + "event": "qa_start", + "data": { + "stage": "测试用例设计", + "status": "started", + "timestamp": datetime.now().isoformat() + } + } + + # 执行 Crew 任务 + result = qa_crew.kickoff() + content = result.raw if hasattr(result, 'raw') else str(result) + + # 发送完成事件 + yield { + "event": "qa_complete", + "data": { + "stage": "测试用例设计", + "content": content, + "status": "completed", + "timestamp": datetime.now().isoformat() + } + } + self.test_plan = content + + # ========== 阶段 3: Dev Agent - 代码实现 ========== + dev_task = create_dev_task(self.srs_document, self.test_plan) + dev_crew = Crew( + agents=[self.dev_agent], + tasks=[dev_task], + verbose=True + ) + + # 发送开始事件 + yield { + "event": "dev_start", + "data": { + "stage": "代码实现", + "status": "started", + "timestamp": datetime.now().isoformat() + } + } + + # 执行 Crew 任务 + result = dev_crew.kickoff() + content = result.raw if hasattr(result, 'raw') else str(result) + + # 发送完成事件 + yield { + "event": "dev_complete", + "data": { + "stage": "代码实现", + "content": content, + "status": "completed", + "timestamp": datetime.now().isoformat() + } + } + self.implementation = content + + # ========== 最终结果汇总 ========== + yield { + "event": "final_result", + "data": { + "stage": "交付完成", + "status": "success", + "timestamp": datetime.now().isoformat(), + "summary": { + "srs_length": len(self.srs_document) if self.srs_document else 0, + "test_plan_length": len(self.test_plan) if self.test_plan else 0, + "implementation_length": len(self.implementation) if self.implementation else 0 + }, + "deliverables": { + "srs_document": self.srs_document, + "test_plan": self.test_plan, + "implementation": self.implementation + } + } + } + + except Exception as e: + # 错误处理 + yield { + "event": "error", + "data": { + "stage": "系统错误", + "error": str(e), + "timestamp": datetime.now().isoformat() + } + } + + # 别名,方便调用 + execute_sync = execute diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index 6400577..0000000 --- a/docker-compose.yml +++ /dev/null @@ -1,50 +0,0 @@ -version: '3.8' - -services: - multi-agent-system: - build: - context: . - dockerfile: Dockerfile - container_name: multi-agent-delivery-system - restart: unless-stopped - ports: - - "8000:8000" - environment: - # DashScope API Key(必需) - - DASHSCOPE_API_KEY=${DASHSCOPE_API_KEY:sk-616332b2afa94699b4572d0fe6ac370a} - # 可选配置 - - HOST=0.0.0.0 - - PORT=8000 - # 日志级别 - - LOG_LEVEL=info - volumes: - # 挂载日志目录 - - ./logs:/app/logs - # 如果需要持久化 .env 文件 - - ./.env:/app/.env:ro - networks: - - agent-network - # 资源限制 - deploy: - resources: - limits: - cpus: '2.0' - memory: 4G - reservations: - cpus: '0.5' - memory: 1G - # 健康检查 - healthcheck: - test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"] - interval: 30s - timeout: 10s - retries: 3 - start_period: 10s - -networks: - agent-network: - driver: bridge - -# 使用示例: -# 开发环境:docker-compose up -d -# 生产环境:docker-compose --profile production up -d diff --git a/main.py b/main.py index c8f644a..45d4f2c 100644 --- a/main.py +++ b/main.py @@ -1,709 +1,348 @@ """ -FastAPI 主入口 -提供 RESTful API 和 SSE 流式接口 +SDLC Agent Demo - FastAPI 主服务 (纯同步版本) +多智能体端到端软件交付协同系统 """ -import asyncio import json +import uuid +import threading +from typing import Dict, Optional, Generator from datetime import datetime -from typing import Dict, Any, Optional -from contextlib import asynccontextmanager - -from fastapi import FastAPI, HTTPException, Request -from fastapi.responses import StreamingResponse, JSONResponse, HTMLResponse +from fastapi import FastAPI, HTTPException +from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import StreamingResponse, RedirectResponse, JSONResponse from pydantic import BaseModel, Field +import uvicorn +import time -from crew_factory import CrewFactory, run_multi_agent_task -from stream_manager import stream_manager, create_sse_generator - - -# ==================== 数据模型 ==================== - -class RunTaskRequest(BaseModel): - """运行任务请求体""" - user_requirement: str = Field( - ..., - description="用户需求描述", - example="开发一个在线商城系统,支持用户注册、商品浏览、购物车和支付功能" - ) - skip_confirmation: bool = Field( - default=True, - description="是否跳过 Coordinator 的人工确认环节" - ) - - -class RunTaskResponse(BaseModel): - """运行任务响应体""" - task_id: str - status: str - message: Optional[str] = None - - -class SSEEvent(BaseModel): - """SSE 事件数据结构""" - agent: str - type: str - content: str - timestamp: str - task_id: str - - -# ==================== 生命周期管理 ==================== - -@asynccontextmanager -async def lifespan(app: FastAPI): - """应用生命周期管理""" - # 启动时执行 - print("🚀 Multi-Agent System 启动中...") - print("📡 SSE 流服务已就绪") - - # 启动后台任务:定期清理旧流 - asyncio.create_task(cleanup_streams_periodically()) - - yield - - # 关闭时清理 - print("👋 正在关闭所有 SSE 流...") - # 可以在这里添加清理逻辑 - - -# ==================== FastAPI 应用 ==================== +from crews.sdlc_crew import SDLCCrew +from models.qwen_config import get_qwen_config +# ========== FastAPI 应用初始化 ========== app = FastAPI( - title="Multi-Agent Software Delivery System", - description=""" - 基于 CrewAI + Qwen3.5-flash 的多智能体软件交付系统 - - ## 核心功能 - - **产品需求分析**: ProductManager Agent 分析用户需求,生成 PRD - - **测试计划制定**: QAEngineer Agent 设计测试策略和用例 - - **技术方案设计**: SoftwareDeveloper Agent 输出架构设计和代码框架 - - **质量审核**: Coordinator Agent 审核所有产出物并生成交付报告 - - ## 实时通信 - 支持 SSE (Server-Sent Events) 协议,实时推送每个 Agent 的思考过程和任务状态 - - ## API 端点 - - `POST /api/run_task` - 启动新任务 - - `GET /api/stream/{task_id}` - 订阅任务执行日志(SSE) - - `GET /api/task/{task_id}/status` - 查询任务状态 - - `GET /test-ui` - 测试页面 - """, - version="1.0.0", - lifespan=lifespan, + title="SDLC Agent Demo", + description="多智能体端到端软件交付协同系统 - 基于 CrewAI + Qwen3.5-flash", + version="1.0.0" ) -# CORS 中间件(允许跨域) +# 启用 CORS app.add_middleware( CORSMiddleware, - allow_origins=["*"], # 生产环境应限制具体域名 + allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) - -# ==================== 后台任务 ==================== - -async def cleanup_streams_periodically(interval: int = 300): - """定期清理旧的流(每 5 分钟)""" - while True: - await asyncio.sleep(interval) - try: - await stream_manager.cleanup_old_streams(max_age_seconds=3600) - except Exception as e: - print(f"清理流失败:{e}") +# 挂载静态文件目录 +app.mount("/static", StaticFiles(directory="static"), name="static") -# ==================== API 路由 ==================== +# ========== 数据模型 ========== +class StartRequest(BaseModel): + """启动请求模型""" + requirement: str = Field(..., description="用户需求描述", min_length=10) -@app.post( - "/api/run_task", - response_model=RunTaskResponse, - summary="启动多智能体任务", - description="接收用户需求,启动 CrewAI 流程,异步执行并立即返回 task_id" -) -async def run_task(request: RunTaskRequest): - """ - 启动新的多智能体任务 + +# ========== 任务管理(内存存储) ========== +class TaskManager: + """任务管理器 - 负责任务状态持久化""" - - **user_requirement**: 用户需求描述 - - **skip_confirmation**: 是否跳过人工确认(默认 True) + def __init__(self): + self.tasks: Dict[str, Dict] = {} + self._lock = threading.Lock() - 返回 task_id 用于后续 SSE 流订阅 + def create_task(self, requirement: str) -> str: + """创建新任务""" + task_id = str(uuid.uuid4()) + with self._lock: + self.tasks[task_id] = { + "task_id": task_id, + "requirement": requirement, + "status": "pending", + "created_at": datetime.now().isoformat(), + "updated_at": datetime.now().isoformat(), + "events": [] + } + return task_id + + def update_task_status(self, task_id: str, status: str): + """更新任务状态""" + with self._lock: + if task_id in self.tasks: + self.tasks[task_id]["status"] = status + self.tasks[task_id]["updated_at"] = datetime.now().isoformat() + + def add_event(self, task_id: str, event: dict): + """添加事件""" + with self._lock: + if task_id in self.tasks: + self.tasks[task_id]["events"].append(event) + self.tasks[task_id]["updated_at"] = datetime.now().isoformat() + + def get_task(self, task_id: str) -> Optional[Dict]: + """获取任务信息""" + with self._lock: + return self.tasks.get(task_id).copy() if task_id in self.tasks else None + + def get_events_after(self, task_id: str, last_index: int): + """获取指定索引之后的事件""" + with self._lock: + if task_id not in self.tasks: + return [] + events = self.tasks[task_id]["events"] + return [e.copy() for e in events[last_index:]] + + +# 全局任务管理器 +task_manager = TaskManager() + + +# ========== API 端点 ========== +@app.post("/api/v1/sdlc/start", response_model=Dict[str, str]) +async def start_sdlc_process(request: StartRequest): """ + 启动 SDLC 流程(后台线程执行) + """ + # 验证配置 try: - # 生成 task_id 并启动任务 - task_id = await run_multi_agent_task( - user_requirement=request.user_requirement, - skip_confirmation=request.skip_confirmation - ) + get_qwen_config() + except ValueError as e: + raise HTTPException(status_code=500, detail=str(e)) + + # 创建任务 + task_id = task_manager.create_task(request.requirement) + + # 在后台线程中执行 SDLC 流程 + thread = threading.Thread( + target=execute_sdlc_flow, + args=(task_id, request.requirement), + daemon=True + ) + thread.start() + + return { + "task_id": task_id, + "status": "processing" + } + + +def execute_sdlc_flow(task_id: str, requirement: str): + """ + 同步执行 SDLC 流程(在后台线程中运行) + """ + task_manager.update_task_status(task_id, "processing") + + try: + crew = SDLCCrew() - return RunTaskResponse( - task_id=task_id, - status="started", - message="任务已启动,请通过 /api/stream/{task_id} 订阅执行日志" - ) + # 同步执行并收集所有事件 + for event in crew.execute_sync(requirement): + task_manager.add_event(task_id, event) + + # 标记完成 + task_manager.update_task_status(task_id, "completed") except Exception as e: - raise HTTPException( - status_code=500, - detail=f"启动任务失败:{str(e)}" - ) + task_manager.update_task_status(task_id, "failed") + task_manager.add_event(task_id, { + "event": "error", + "data": { + "error": str(e), + "timestamp": datetime.now().isoformat() + } + }) -@app.get( - "/api/stream/{task_id}", - summary="订阅任务执行日志 (SSE)", - description="建立 SSE 连接,实时接收任务执行过程中的所有事件" -) -async def stream_task_logs(task_id: str): +@app.get("/api/v1/sdlc/stream/{task_id}") +def stream_task_progress(task_id: str): """ - SSE 端点 - 实时推送任务执行日志 - - 事件类型包括: - - **start**: 任务开始 - - **agent_start**: Agent 开始执行 - - **thought**: Agent 思考过程 - - **action**: Agent 执行动作 - - **output**: Agent 输出结果 - - **step_end**: 步骤完成 - - **end**: 任务结束 - - **error**: 发生错误 - - 数据格式: - ```json - { - "agent": "ProductManager", - "type": "thought", - "content": "正在分析用户需求...", - "timestamp": "2024-01-01T12:00:00", - "task_id": "uuid" - } - ``` + SSE流式输出任务进度(同步生成器) """ - # 检查任务是否存在 - stream = await stream_manager.get_stream(task_id) - if stream is None: - # 任务不存在,返回错误 - return StreamingResponse( - iter([f"data: {json.dumps({'error': 'Task not found', 'task_id': task_id})}\n\n"]), - media_type="text/event-stream" - ) + # 验证任务存在 + task = task_manager.get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail="Task not found") + + def event_generator(): + """生成 SSE事件(同步)""" + last_event_index = 0 + max_wait_time = 300 # 最多等待 5 分钟 + + start_time = time.time() + + while True: + # 检查超时 + if time.time() - start_time > max_wait_time: + break + + # 获取新事件 + events = task_manager.get_events_after(task_id, last_event_index) + + for event in events: + event_type = event.get("event", "message") + event_data = event.get("data", {}) + + yield f"event: {event_type}\ndata: {json.dumps(event_data, ensure_ascii=False)}\n\n" + last_event_index += 1 + + # 如果是结束事件,断开连接 + if event_type in ["final_result", "error"]: + return + + # 检查任务状态 + task_data = task_manager.get_task(task_id) + if task_data and task_data["status"] in ["completed", "failed"]: + break + + # 等待一下再检查 + time.sleep(0.5) - # 创建 SSE 流 return StreamingResponse( - create_sse_generator(task_id), + event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", - "X-Accel-Buffering": "no", # Nginx 禁用缓冲 + "X-Accel-Buffering": "no" } ) -@app.get( - "/api/task/{task_id}/status", - summary="查询任务状态", - description="获取任务的当前状态和基本信息" -) -async def get_task_status(task_id: str): - """查询任务状态""" - stream = await stream_manager.get_stream(task_id) - - if stream is None: - return {"task_id": task_id, "status": "not_found"} +@app.get("/api/v1/sdlc/status/{task_id}") +def get_task_status(task_id: str): + """ + 获取任务状态(非流式) + """ + task = task_manager.get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail="Task not found") return { - "task_id": task_id, - "status": "closed" if stream.is_closed else "running", - "queue_size": stream.queue.qsize() if hasattr(stream, 'queue') else 0, + "task_id": task["task_id"], + "status": task["status"], + "created_at": task["created_at"], + "updated_at": task["updated_at"], + "events_count": len(task["events"]) } -@app.get( - "/api/streams", - summary="列出所有活跃流", - description="查看当前所有正在执行的任务" -) -async def list_active_streams(): - """列出所有活跃的 SSE 流""" - streams = stream_manager.list_active_streams() - return { - "total": len(streams), - "streams": streams - } +@app.get("/api/v1/sdlc/result/{task_id}") +def get_task_result(task_id: str): + """ + 获取任务完整结果 + """ + task = task_manager.get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail="Task not found") + + if task["status"] != "completed": + raise HTTPException( + status_code=400, + detail=f"Task not completed yet. Status: {task['status']}" + ) + + return task -@app.get( - "/health", - summary="健康检查", - description="检查服务是否正常运行" -) -async def health_check(): +@app.get("/api/v1/sdlc/download/{task_id}") +def download_result(task_id: str): + """ + 打包下载任务结果(ZIP 文件) + """ + import zipfile + import io + from fastapi.responses import StreamingResponse + + task = task_manager.get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail="Task not found") + + if task["status"] != "completed": + raise HTTPException( + status_code=400, + detail=f"Task not completed yet. Status: {task['status']}" + ) + + # 创建 ZIP 文件 + zip_buffer = io.BytesIO() + + with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: + # 1. SRS 文档 + srs_content = "" + for event in task["events"]: + if event["event"] == "pm_complete": + srs_content = event["data"].get("content", "") + break + zip_file.writestr("01_SRS_需求规格说明书.md", srs_content) + + # 2. 测试用例 + test_content = "" + for event in task["events"]: + if event["event"] == "qa_complete": + test_content = event["data"].get("content", "") + break + zip_file.writestr("02_Test_测试用例.md", test_content) + + # 3. 代码实现 + code_content = "" + for event in task["events"]: + if event["event"] == "dev_complete": + code_content = event["data"].get("content", "") + break + zip_file.writestr("03_Code_代码实现.md", code_content) + + # 4. 项目摘要 + summary = f"""# SDLC 项目交付摘要 + +## 项目信息 +- 任务 ID: {task['task_id']} +- 创建时间:{task['created_at']} +- 完成时间:{task['updated_at']} +- 原始需求:{task['requirement']} + +## 交付物清单 +1. 01_SRS_需求规格说明书.md - 软件需求规格说明书 +2. 02_Test_测试用例.md - 测试方案与用例 +3. 03_Code_代码实现.md - 业务代码实现 + +## 生成说明 +本项目由 SDLC Agent Demo 自动生成 +基于 CrewAI + Qwen3.5-flash + FastAPI +""" + zip_file.writestr("README_项目摘要.md", summary) + + # 准备下载 + zip_buffer.seek(0) + + return StreamingResponse( + zip_buffer, + media_type="application/zip", + headers={ + "Content-Disposition": f"attachment; filename=SDLC_Result_{task_id[:8]}.zip" + } + ) + + +@app.get("/") +def root(): + """根路径重定向到测试页面""" + return RedirectResponse(url="/static/index.html") + + +@app.get("/health") +def health_check(): """健康检查端点""" - return { - "status": "healthy", - "timestamp": datetime.now().isoformat(), - "service": "Multi-Agent Software Delivery System" - } + return {"status": "healthy", "version": "1.0.0"} -# ==================== 测试页面 ==================== - -@app.get( - "/test-ui", - response_class=HTMLResponse, - summary="测试 UI 页面", - description="一个简单的 HTML 页面,用于测试 SSE 流功能" -) -async def test_ui(): - """返回测试页面""" - return HTMLResponse(content=get_test_page_html()) - - -def get_test_page_html() -> str: - """生成测试页面 HTML""" - return """ - - - - - 多智能体系统 - 测试 UI - - - -
-

🤖 多智能体软件交付系统

- - -
-
- - - -
- 示例需求:
- - - -
-
- -
-
- - -
-
- - -
- - -
-
-
- - 等待中... -
-
- Task ID: - -
- -
-
- - -
-

📊 实时事件日志

-
-
- 暂无事件,点击上方"启动任务"按钮开始 -
-
-
-
- - - -""" - - -# ==================== 主程序入口 ==================== - +# ========== 主程序入口 ========== if __name__ == "__main__": - import uvicorn - - # 加载环境变量 - try: - from dotenv import load_dotenv - load_dotenv() - except ImportError: - pass - uvicorn.run( "main:app", host="0.0.0.0", - port=8000, + port=8000, reload=True, log_level="info" ) diff --git a/models/__init__.py b/models/__init__.py new file mode 100644 index 0000000..b5fea72 --- /dev/null +++ b/models/__init__.py @@ -0,0 +1,7 @@ +""" +Models 包初始化 +""" + +from .qwen_config import QwenConfig, get_qwen_config, get_llm + +__all__ = ["QwenConfig", "get_qwen_config", "get_llm"] diff --git a/models/qwen_config.py b/models/qwen_config.py new file mode 100644 index 0000000..02aa0d8 --- /dev/null +++ b/models/qwen_config.py @@ -0,0 +1,83 @@ +""" +Qwen3.5-flash 模型配置模块 +通过 DashScope OpenAI 兼容 API 调用通义千问模型 +""" + +import os +from typing import Optional +from crewai import LLM +from pydantic_settings import BaseSettings + + +class QwenConfig(BaseSettings): + """Qwen 模型配置类""" + + api_key: str = "sk-616332b2afa94699b4572d0fe6ac370a" + base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1" + model_name: str = "qwen3.5-flash" + temperature: float = 0.7 + max_tokens: int = 4096 + + class Config: + env_file = ".env" + extra = "ignore" + + def get_llm(self) -> LLM: + """ + 获取 CrewAI LLM 实例 + + Returns: + LLM: CrewAI LLM 对象 + """ + return LLM( + model=self.model_name, + base_url=self.base_url, + api_key=self.api_key, + temperature=self.temperature, + max_tokens=self.max_tokens + ) + + @classmethod + def load_from_env(cls) -> "QwenConfig": + """ + 从环境变量加载配置 + + Returns: + QwenConfig: 配置实例 + """ + config = cls() + if not config.api_key: + raise ValueError( + "DASHSCOPE_API_KEY 未设置,请在 .env 文件中配置或在 https://dashscope.console.aliyun.com/ 获取 API Key" + ) + return config + + +# 全局配置实例(延迟初始化) +_qwen_config: Optional[QwenConfig] = None + + +def get_qwen_config() -> QwenConfig: + """ + 获取全局 Qwen 配置实例 + + Returns: + QwenConfig: 配置实例 + + Raises: + ValueError: 如果 API Key 未设置 + """ + global _qwen_config + if _qwen_config is None: + _qwen_config = QwenConfig.load_from_env() + return _qwen_config + + +def get_llm() -> LLM: + """ + 获取全局 LLM 实例 + + Returns: + LLM: CrewAI LLM 对象 + """ + return get_qwen_config().get_llm() diff --git a/nginx.conf b/nginx.conf deleted file mode 100644 index cc043fa..0000000 --- a/nginx.conf +++ /dev/null @@ -1,129 +0,0 @@ -events { - worker_connections 1024; -} - -http { - # 上游服务器配置 - upstream multi_agent_backend { - server multi-agent-system:8000; - keepalive 32; - } - - # 日志格式 - log_format main '$remote_addr - $remote_user [$time_local] "$request" ' - '$status $body_bytes_sent "$http_referer" ' - '"$http_user_agent" "$http_x_forwarded_for"'; - - access_log /var/log/nginx/access.log main; - error_log /var/log/nginx/error.log warn; - - # 连接超时设置 - proxy_connect_timeout 60s; - proxy_send_timeout 60s; - proxy_read_timeout 60s; - - # SSE 特殊配置:禁用缓冲 - # 这对于 Server-Sent Events 至关重要 - map $http_accept $sse_connection { - default "keep-alive"; - "text/event-stream" "keep-alive"; - } - - server { - listen 80; - server_name localhost; - - # 客户端请求体大小限制 - client_max_body_size 10M; - - # API 代理配置 - location /api/ { - proxy_pass http://multi_agent_backend; - - # 必要的代理头 - proxy_set_header Host $host; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header X-Forwarded-Proto $scheme; - - # HTTP/1.1 支持(SSE 必需) - proxy_http_version 1.1; - proxy_set_header Connection ""; - - # 禁用缓冲(SSE 关键配置) - proxy_buffering off; - proxy_cache off; - proxy_request_buffering off; - - # Chunked 传输编码 - chunked_transfer_encoding on; - } - - # SSE 流端点特殊配置 - location /api/stream/ { - proxy_pass http://multi_agent_backend; - - # 代理头 - proxy_set_header Host $host; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - - # HTTP/1.1 和 Connection - proxy_http_version 1.1; - proxy_set_header Connection ""; - - # SSE 关键:完全禁用缓冲 - proxy_buffering off; - proxy_cache off; - proxy_request_buffering off; - - # Nginx 特殊指令:禁用 FastCGI 缓冲 - fastcgi_buffering off; - - # 保持长连接 - proxy_connect_timeout 60s; - proxy_send_timeout 300s; - proxy_read_timeout 300s; - - # SSE 心跳 - proxy_ignore_client_abort off; - } - - # 测试 UI 页面 - location /test-ui { - proxy_pass http://multi_agent_backend; - proxy_set_header Host $host; - proxy_set_header X-Real-IP $remote_addr; - } - - # 健康检查端点 - location /health { - proxy_pass http://multi_agent_backend; - access_log off; - } - - # API 文档 - location /docs { - proxy_pass http://multi_agent_backend; - proxy_set_header Host $host; - } - - location /openapi.json { - proxy_pass http://multi_agent_backend; - proxy_set_header Host $host; - } - } - - # HTTPS 配置(可选,取消注释启用) - # server { - # listen 443 ssl http2; - # server_name your-domain.com; - # - # ssl_certificate /etc/nginx/ssl/fullchain.pem; - # ssl_certificate_key /etc/nginx/ssl/privkey.pem; - # ssl_protocols TLSv1.2 TLSv1.3; - # ssl_ciphers HIGH:!aNULL:!MD5; - # - # # 同样的代理配置... - # } -} diff --git a/requirements.txt b/requirements.txt index 17e4b7b..337eb59 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,31 @@ -fastapi==0.109.0 -uvicorn[standard]==0.27.0 -crewai==0.51.0 -langchain==0.1.0 -langchain-community==0.0.10 -dashscope==1.14.1 -python-dotenv==1.0.0 -pydantic==2.5.3 -uuid6==2024.1.12 -sse-starlette==2.0.0 +# AI 框架核心 +crewai>=0.85.0 +crewai-tools>=0.14.0 + +# Web 框架 +fastapi>=0.109.0 +uvicorn[standard]>=0.27.0 +python-multipart>=0.0.6 + +# SSE 支持 +sse-starlette>=2.0.0 + +# 异步处理 +asyncio>=3.4.3 +aiofiles>=23.2.1 + +# 工具库 +pydantic>=2.5.0 +pydantic-settings>=2.1.0 +python-dotenv>=1.0.0 + +# HTTP 客户端 +httpx>=0.26.0 +requests>=2.31.0 + +# 代码高亮和格式化 +pygments>=2.17.0 + +# 测试(可选) +pytest>=7.4.0 +pytest-asyncio>=0.23.0 diff --git a/start.bat b/start.bat new file mode 100644 index 0000000..c710785 --- /dev/null +++ b/start.bat @@ -0,0 +1,25 @@ +@echo off +chcp 65001 >nul +echo ============================================================ +echo SDLC Agent Demo - Quick Start Script +echo ============================================================ +echo. + +REM Check if .env file exists +if not exist ".env" ( + echo [WARNING] .env file not found! + echo Please copy .env.example to .env and configure your API key: + echo copy .env.example .env + echo. + pause + exit /b 1 +) + +echo [INFO] Starting FastAPI server... +echo [INFO] Access URL: http://localhost:8000/static/index.html +echo [INFO] Press Ctrl+C to stop the server +echo. + +python -m uvicorn main:app --reload --host 0.0.0.0 --port 8000 + +pause diff --git a/static/index.html b/static/index.html new file mode 100644 index 0000000..410c84a --- /dev/null +++ b/static/index.html @@ -0,0 +1,655 @@ + + + + + + SDLC Agent Demo - 多智能体软件交付协同系统 + + + + + + + + + + + + + + + + + +
+ +
+
+
+
+
+ + + +
+
+

SDLC Agent Demo

+

多智能体端到端软件交付协同系统

+
+
+
+
+ + {{ connectionStatusText }} +
+
+
+
+
+ + +
+ +
+

1. 输入软件需求

+
+ +
+

+ 当前任务 ID: {{ taskId || '无' }} +

+ +
+
+
+ + +
+

2. 执行进度

+
+
+
+
+ {{ index + 1 }} +
+
+

{{ stage.name }}

+

{{ stage.agent }}

+
+
+
+ + {{ getStageStatusText(stage.status) }} + +
+
+
+
+ + +
+
+

3. 实时日志

+ +
+
+
+ {{ log.timestamp }} + [{{ log.event }}] + {{ log.message }} +
+
+
+ + +
+
+

4. 输出结果

+ +
+ +
+
+
+
+ + + +

{{ result.title }}

+
+
+ {{ formatDate(result.timestamp) }} + +
+
+
+ +
+
+
+
+
+
+ + +
+
+

+ 基于 CrewAI + Qwen3.5-flash + FastAPI(SSE) 构建 | Bosch Demo +

+
+
+ + +
+ ✓ 已复制到剪贴板 +
+
+ + + + diff --git a/stream_manager.py b/stream_manager.py deleted file mode 100644 index f96ff08..0000000 --- a/stream_manager.py +++ /dev/null @@ -1,283 +0,0 @@ -""" -SSE 流管理器 -负责管理任务执行过程中的消息队列和 SSE 连接 -确保多用户并发时不同 task_id 的流互不干扰 - -关键技术点: -1. 使用 asyncio.Queue 实现异步非阻塞消息队列 -2. 通过 asyncio.Lock 保证并发安全 -3. 每个 task_id 独立队列,实现任务隔离 -4. 支持从同步线程(CrewAI)安全地发布事件到异步队列 -""" - -import asyncio -from datetime import datetime -from typing import Dict, AsyncGenerator, Optional, Any -from collections import deque -import uuid -import threading -from concurrent.futures import ThreadPoolExecutor - - -class StreamEvent: - """ - SSE 事件数据结构 - - 统一的 JSON 格式设计,便于前端解析: - { - "task_id": "550e8400-e29b...", - "sequence": 1, - "agent_name": "ProductManager", - "event_type": "thought", // 或 action, output, complete - "content": "正在分析用户需求...", - "timestamp": "2023-10-27T10:00:00Z" - } - """ - - # 全局序列号计数器(每个 task_id 独立) - _sequence_counters: Dict[str, int] = {} - - def __init__( - self, - event_type: str, - agent: str, - content: str, - task_id: str, - timestamp: Optional[str] = None, - metadata: Optional[Dict[str, Any]] = None - ): - self.event_type = event_type # start, thought, action, output, end, error - self.agent = agent - self.content = content - self.task_id = task_id - self.timestamp = timestamp or datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ') - self.metadata = metadata or {} - - # 为每个 task_id 维护独立的序列号 - if task_id not in StreamEvent._sequence_counters: - StreamEvent._sequence_counters[task_id] = 0 - StreamEvent._sequence_counters[task_id] += 1 - self.sequence = StreamEvent._sequence_counters[task_id] - - def to_dict(self) -> Dict[str, Any]: - """转换为字典格式用于 JSON 序列化""" - return { - "task_id": self.task_id, - "sequence": self.sequence, - "agent_name": self.agent, - "event_type": self.event_type, - "content": self.content, - "timestamp": self.timestamp, - **(self.metadata if self.metadata else {}) - } - - def to_sse_format(self) -> str: - """转换为 SSE 数据格式""" - import json - return f"data: {json.dumps(self.to_dict(), ensure_ascii=False)}\n\n" - - @classmethod - def reset_sequence(cls, task_id: str): - """重置指定 task_id 的序列号(任务重新开始时调用)""" - cls._sequence_counters[task_id] = 0 - - -class TaskStreamQueue: - """ - 单个任务的流式消息队列(线程安全) - - 并发处理逻辑: - - CrewAI 默认是同步运行的,而 FastAPI 和 SSE 需要异步 - - 使用 asyncio.Queue 的 run_coroutine_threadsafe 方法从同步线程安全地发布事件 - - 确保 stream_manager 能安全地在线程间传递消息 - """ - - def __init__(self, task_id: str, max_size: int = 1000): - self.task_id = task_id - self.queue: asyncio.Queue[StreamEvent] = asyncio.Queue(maxsize=max_size) - self.is_closed = False - self.subscribers: int = 0 - self._loop = asyncio.get_event_loop() - self._lock = threading.Lock() # 用于保护同步操作 - - async def put(self, event: StreamEvent) -> bool: - """向队列添加事件(异步调用)""" - if self.is_closed: - return False - try: - await asyncio.wait_for(self.queue.put(event), timeout=5.0) - return True - except asyncio.TimeoutError: - return False - except Exception: - return False - - def put_nowait(self, event: StreamEvent) -> bool: - """ - 从同步线程(如 CrewAI 事件处理器)安全地发布事件 - - 使用 run_coroutine_threadsafe 将协程提交到事件循环执行 - 这是实现 CrewAI(同步)与 SSE(异步)集成的关键 - """ - if self.is_closed: - return False - - try: - # 将协程提交到事件循环线程安全地执行 - future = asyncio.run_coroutine_threadsafe( - self.queue.put(event), - self._loop - ) - # 等待完成(带超时) - future.result(timeout=5.0) - return True - except Exception as e: - # print(f"发布事件失败:{e}") - return False - - async def get(self) -> Optional[StreamEvent]: - """从队列获取事件""" - if self.is_closed and self.queue.empty(): - return None - try: - return await self.queue.get() - except Exception: - return None - - def close(self): - """关闭队列""" - with self._lock: - self.is_closed = True - - async def stream_events(self) -> AsyncGenerator[StreamEvent, None]: - """异步生成器,持续产出事件直到队列关闭""" - while not (self.is_closed and self.queue.empty()): - try: - event = await asyncio.wait_for(self.queue.get(), timeout=30.0) - yield event - except asyncio.TimeoutError: - if self.is_closed and self.queue.empty(): - break - continue - except Exception: - break - - -class StreamManager: - """全局流管理器 - 管理所有任务的 SSE 流""" - - _instance: Optional['StreamManager'] = None - - def __new__(cls): - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance._initialized = False - return cls._instance - - def __init__(self): - if self._initialized: - return - self._initialized = True - # task_id -> TaskStreamQueue 映射 - self.streams: Dict[str, TaskStreamQueue] = {} - self._lock = asyncio.Lock() - - async def create_stream(self, task_id: str) -> TaskStreamQueue: - """为指定 task_id 创建新的流队列""" - async with self._lock: - if task_id in self.streams: - # 如果已存在,先关闭旧的 - old_stream = self.streams[task_id] - old_stream.close() - - # 重置序列号计数器 - StreamEvent.reset_sequence(task_id) - - queue = TaskStreamQueue(task_id) - self.streams[task_id] = queue - return queue - - async def get_stream(self, task_id: str) -> Optional[TaskStreamQueue]: - """获取指定 task_id 的流队列""" - async with self._lock: - return self.streams.get(task_id) - - async def publish_event( - self, - task_id: str, - event_type: str, - agent: str, - content: str, - metadata: Optional[Dict[str, Any]] = None - ) -> bool: - """发布事件到指定任务的流队列""" - async with self._lock: - stream = self.streams.get(task_id) - if stream is None: - return False - - event = StreamEvent( - event_type=event_type, - agent=agent, - content=content, - task_id=task_id, - metadata=metadata - ) - return await stream.put(event) - - async def close_stream(self, task_id: str): - """关闭指定任务的流队列""" - async with self._lock: - if task_id in self.streams: - self.streams[task_id].close() - # 可以选择删除或保留(如果需要历史记录) - # del self.streams[task_id] - - async def cleanup_old_streams(self, max_age_seconds: int = 3600): - """清理超过指定时间的旧流(定期调用)""" - now = datetime.now() - to_remove = [] - - async with self._lock: - for task_id, stream in self.streams.items(): - if stream.is_closed: - to_remove.append(task_id) - - async with self._lock: - for task_id in to_remove: - del self.streams[task_id] - - def list_active_streams(self) -> list: - """列出所有活跃的流""" - return [ - {"task_id": tid, "closed": s.is_closed} - for tid, s in self.streams.items() - ] - - -# 全局单例 -stream_manager = StreamManager() - - -async def create_sse_generator(task_id: str) -> AsyncGenerator[str, None]: - """ - 创建 SSE 异步生成器 - 供 FastAPI StreamingResponse 使用 - """ - stream = await stream_manager.get_stream(task_id) - if stream is None: - # 创建一个新的流(如果不存在) - stream = await stream_manager.create_stream(task_id) - - try: - async for event in stream.stream_events(): - yield event.to_sse_format() - finally: - # 发送结束标记 - import json - end_event = { - "type": "connection_end", - "task_id": task_id, - "timestamp": datetime.now().isoformat() - } - yield f"data: {json.dumps(end_event)}\n\n"