""" 示例代码 - 待办事项应用后端实现 这是多智能体系统生成的代码示例 """ from fastapi import FastAPI, HTTPException, Depends, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel, EmailStr, Field from typing import Optional, List from datetime import datetime, timedelta import uvicorn import sqlite3 import hashlib import jwt # ==================== 配置 ==================== SECRET_KEY = "your-secret-key-change-in-production" ALGORITHM = "HS256" DATABASE = "todo_app.db" # ==================== 数据模型 ==================== class UserCreate(BaseModel): """用户注册请求""" username: str = Field(..., min_length=3, max_length=50) email: EmailStr password: str = Field(..., min_length=6) class UserLogin(BaseModel): """用户登录请求""" username: str password: str class TodoItemCreate(BaseModel): """创建待办事项""" title: str = Field(..., min_length=1, max_length=200) description: Optional[str] = None priority: int = Field(default=1, ge=1, le=5) # 1-5, 5 最高 due_date: Optional[datetime] = None class TodoItemUpdate(BaseModel): """更新待办事项""" title: Optional[str] = Field(None, min_length=1, max_length=200) description: Optional[str] = None priority: Optional[int] = Field(None, ge=1, le=5) due_date: Optional[datetime] = None completed: Optional[bool] = None class TodoItemResponse(BaseModel): """待办事项响应""" id: int user_id: int title: str description: Optional[str] priority: int completed: bool due_date: Optional[datetime] created_at: datetime updated_at: datetime class Config: from_attributes = True # ==================== 数据库操作 ==================== def get_db_connection(): """获取数据库连接""" conn = sqlite3.connect(DATABASE) conn.row_factory = sqlite3.Row return conn def init_db(): """初始化数据库""" conn = get_db_connection() cursor = conn.cursor() # 创建用户表 cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username VARCHAR(50) UNIQUE NOT NULL, email VARCHAR(100) UNIQUE NOT NULL, password_hash VARCHAR(255) NOT NULL, created_at TIMESTAMP DEFAULT 
CURRENT_TIMESTAMP ) ''') # 创建待办事项表 cursor.execute(''' CREATE TABLE IF NOT EXISTS todo_items ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, title VARCHAR(200) NOT NULL, description TEXT, priority INTEGER DEFAULT 1, completed BOOLEAN DEFAULT 0, due_date TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE ) ''') # 创建索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_todo_user ON todo_items(user_id)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_todo_completed ON todo_items(completed)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_todo_priority ON todo_items(priority DESC)') conn.commit() conn.close() print("✓ 数据库初始化完成") # ==================== 认证工具 ==================== security = HTTPBearer() def hash_password(password: str) -> str: """密码哈希""" return hashlib.sha256(password.encode()).hexdigest() def verify_password(password: str, password_hash: str) -> bool: """验证密码""" return hash_password(password) == password_hash def create_access_token(data: dict, expires_delta: timedelta = timedelta(days=7)) -> str: """创建 JWT Token""" to_encode = data.copy() expire = datetime.utcnow() + expires_delta to_encode.update({"exp": expire}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security) ) -> dict: """获取当前用户(从 JWT Token)""" try: token = credentials.credentials payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) user_id = payload.get("sub") if user_id is None: raise HTTPException(status_code=401, detail="Invalid token") return {"user_id": int(user_id), "username": payload.get("username")} except jwt.PyJWTError: raise HTTPException(status_code=401, detail="Token validation failed") # ==================== FastAPI 应用 ==================== app = FastAPI( title="Todo App API", description="在线待办事项应用 - 由多智能体系统生成", version="1.0.0" ) 
@app.on_event("startup")
async def startup_event():
    """Initialise the database when the application starts."""
    init_db()


def _to_db_timestamp(value: Optional[datetime]) -> Optional[str]:
    """Render a datetime as the text stored in SQLite; ``None`` passes through.

    ``isoformat(" ")`` is the same text the sqlite3 default datetime adapter
    writes, so stored values keep their format while no longer depending on
    that adapter, which is deprecated since Python 3.12.
    """
    return None if value is None else value.isoformat(" ")


@app.get("/health")
async def health_check():
    """Liveness probe; the URL is advertised by the start-up banner."""
    return {"status": "ok"}


# ==================== User endpoints ====================

@app.post("/api/users/register", tags=["用户管理"])
async def register(user_data: UserCreate):
    """Register a new user.

    Responds with 400 when the username or the e-mail address is already taken.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()

        # Reject a username that already exists
        cursor.execute('SELECT id FROM users WHERE username = ?', (user_data.username,))
        if cursor.fetchone():
            raise HTTPException(status_code=400, detail="Username already exists")

        # Reject an e-mail address that already exists
        cursor.execute('SELECT id FROM users WHERE email = ?', (user_data.email,))
        if cursor.fetchone():
            raise HTTPException(status_code=400, detail="Email already registered")

        # Create the user
        password_hash = hash_password(user_data.password)
        try:
            cursor.execute(
                'INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)',
                (user_data.username, user_data.email, password_hash)
            )
        except sqlite3.IntegrityError as exc:
            # A concurrent registration can slip in between the checks above
            # and this INSERT; the UNIQUE constraints then reject the row.
            # SQLite names the offending column in the error message.
            detail = (
                "Email already registered"
                if "users.email" in str(exc)
                else "Username already exists"
            )
            raise HTTPException(status_code=400, detail=detail) from exc
        conn.commit()
        user_id = cursor.lastrowid
    finally:
        conn.close()

    return {
        "message": "注册成功",
        "user_id": user_id,
        "username": user_data.username
    }


@app.post("/api/users/login", tags=["用户管理"])
async def login(credentials: UserLogin):
    """Log a user in and return a bearer token.

    Responds with 401 when the username is unknown or the password is wrong.
    """
    conn = get_db_connection()
    try:
        # Look the user up
        user = conn.execute(
            'SELECT id, username, password_hash FROM users WHERE username = ?',
            (credentials.username,)
        ).fetchone()
    finally:
        conn.close()

    if not user or not verify_password(credentials.password, user['password_hash']):
        raise HTTPException(status_code=401, detail="Invalid username or password")

    # Issue the token
    access_token = create_access_token(
        data={"sub": str(user['id']), "username": user['username']}
    )

    return {
        "message": "登录成功",
        "access_token": access_token,
        "token_type": "bearer",
        "user_id": user['id'],
        "username": user['username']
    }


# ==================== To-do endpoints ====================

@app.post("/api/todos", tags=["待办事项"], response_model=TodoItemResponse)
async def create_todo(
    todo_data: TodoItemCreate,
    current_user: dict = Depends(get_current_user)
):
    """Create a to-do item owned by the current user and return it."""
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO todo_items (user_id, title, description, priority, due_date)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            current_user["user_id"],
            todo_data.title,
            todo_data.description,
            todo_data.priority,
            _to_db_timestamp(todo_data.due_date)
        ))
        conn.commit()
        todo_id = cursor.lastrowid

        # Read the new row back so DB-generated columns are included
        cursor.execute('SELECT * FROM todo_items WHERE id = ?', (todo_id,))
        todo_row = cursor.fetchone()
    finally:
        conn.close()

    return dict(todo_row)


@app.get("/api/todos", tags=["待办事项"], response_model=List[TodoItemResponse])
async def list_todos(
    skip: int = 0,
    limit: int = 50,
    completed: Optional[bool] = None,
    priority: Optional[int] = None,
    search: Optional[str] = None,
    sort_by: str = "priority",  # priority, due_date, created_at
    current_user: dict = Depends(get_current_user)
):
    """List the current user's to-do items, with filtering, sorting and paging.

    An unrecognised ``sort_by`` falls back to priority (highest first).
    """
    # Build the query
    query = 'SELECT * FROM todo_items WHERE user_id = ?'
    params = [current_user["user_id"]]

    # Filters
    if completed is not None:
        query += ' AND completed = ?'
        params.append(completed)

    if priority is not None:
        query += ' AND priority = ?'
        params.append(priority)

    if search:
        query += ' AND (title LIKE ? OR description LIKE ?)'
        search_term = f'%{search}%'
        params.extend([search_term, search_term])

    # Sorting: only clauses from this whitelist are ever interpolated
    order_map = {
        'priority': 'priority DESC',
        'due_date': 'due_date ASC',
        'created_at': 'created_at DESC'
    }
    order_clause = order_map.get(sort_by, 'priority DESC')
    query += f' ORDER BY {order_clause}'

    # Paging
    # NOTE(review): a negative limit is passed straight to SQLite, which
    # treats it as "no limit" - consider bounding skip/limit at the API layer.
    query += ' LIMIT ? OFFSET ?'
    params.extend([limit, skip])

    conn = get_db_connection()
    try:
        todos = conn.execute(query, params).fetchall()
    finally:
        conn.close()

    return [dict(todo) for todo in todos]


# This fixed path must be registered before "/api/todos/{todo_id}": routes are
# matched in declaration order, and otherwise "stats" would be parsed as an
# integer todo_id and rejected with a validation error.
@app.get("/api/todos/stats", tags=["待办事项"])
async def get_stats(current_user: dict = Depends(get_current_user)):
    """Return total, completed and pending counts for the current user."""
    conn = get_db_connection()
    try:
        # One pass over the user's rows; COALESCE covers the no-rows case,
        # where SUM would otherwise yield NULL.
        row = conn.execute(
            '''
            SELECT COUNT(*) AS total,
                   COALESCE(SUM(CASE WHEN completed = 1 THEN 1 ELSE 0 END), 0) AS completed
            FROM todo_items
            WHERE user_id = ?
            ''',
            (current_user["user_id"],)
        ).fetchone()
    finally:
        conn.close()

    total = row['total']
    completed = row['completed']

    return {
        "total": total,
        "completed": completed,
        "pending": total - completed
    }


@app.get("/api/todos/{todo_id}", tags=["待办事项"], response_model=TodoItemResponse)
async def get_todo(
    todo_id: int,
    current_user: dict = Depends(get_current_user)
):
    """Return one to-do item of the current user, or 404 if it is not found."""
    conn = get_db_connection()
    try:
        todo = conn.execute(
            'SELECT * FROM todo_items WHERE id = ? AND user_id = ?',
            (todo_id, current_user["user_id"])
        ).fetchone()
    finally:
        conn.close()

    if not todo:
        raise HTTPException(status_code=404, detail="Todo item not found")

    return dict(todo)


@app.put("/api/todos/{todo_id}", tags=["待办事项"], response_model=TodoItemResponse)
async def update_todo(
    todo_id: int,
    todo_data: TodoItemUpdate,
    current_user: dict = Depends(get_current_user)
):
    """Update the supplied fields of a to-do item and return the new state.

    Fields that are omitted or null are left unchanged.  Responds with 404
    when the item does not exist or belongs to another user.
    """
    owner_id = current_user["user_id"]
    conn = get_db_connection()
    try:
        cursor = conn.cursor()

        # Make sure the item exists and belongs to the caller
        cursor.execute(
            'SELECT id FROM todo_items WHERE id = ? AND user_id = ?',
            (todo_id, owner_id)
        )
        if not cursor.fetchone():
            raise HTTPException(status_code=404, detail="Todo item not found")

        # Collect the columns to change; keys are fixed column names, so the
        # f-string below never interpolates request data.
        changes = {
            'title': todo_data.title,
            'description': todo_data.description,
            'priority': todo_data.priority,
            'completed': todo_data.completed,
            'due_date': _to_db_timestamp(todo_data.due_date),
        }
        updates = []
        params = []
        for column, value in changes.items():
            if value is not None:
                updates.append(f'{column} = ?')
                params.append(value)

        # Always refresh the modification time
        updates.append('updated_at = CURRENT_TIMESTAMP')

        # Run the update, scoped to the owner as well as the id
        params.extend([todo_id, owner_id])
        query = f'UPDATE todo_items SET {", ".join(updates)} WHERE id = ? AND user_id = ?'
        cursor.execute(query, params)
        conn.commit()

        # Read the updated row back
        cursor.execute(
            'SELECT * FROM todo_items WHERE id = ? AND user_id = ?',
            (todo_id, owner_id)
        )
        todo = cursor.fetchone()
    finally:
        conn.close()

    return dict(todo)


@app.delete("/api/todos/{todo_id}", tags=["待办事项"])
async def delete_todo(
    todo_id: int,
    current_user: dict = Depends(get_current_user)
):
    """Delete a to-do item of the current user, or respond 404 if not found."""
    conn = get_db_connection()
    try:
        # One owner-scoped statement replaces the separate check and delete;
        # rowcount reports whether a matching row existed.
        cursor = conn.execute(
            'DELETE FROM todo_items WHERE id = ? AND user_id = ?',
            (todo_id, current_user["user_id"])
        )
        conn.commit()
        if cursor.rowcount == 0:
            raise HTTPException(status_code=404, detail="Todo item not found")
    finally:
        conn.close()

    return {"message": "删除成功"}


# ==================== Program entry point ====================

if __name__ == "__main__":
    print("=" * 60)
    print("🚀 Todo App API服务启动中...")
    print("=" * 60)
    print("\n📖 API 文档:http://localhost:8000/docs")
    print("📊 健康检查:http://localhost:8000/health\n")

    # NOTE(review): the import string assumes this module is importable as
    # "example_code" - confirm against the actual file name.
    uvicorn.run(
        "example_code:app",
        host="0.0.0.0",
        port=8000,
        reload=True
    )