From e57c7350fb6fccbb4deed30d84e71050478d6e26 Mon Sep 17 00:00:00 2001 From: dangzerong <429714019@qq.com> Date: Tue, 21 Oct 2025 09:34:16 +0800 Subject: [PATCH] =?UTF-8?q?mcp=E6=8E=A5=E5=8F=A3flask=E6=94=B9=E6=88=90fas?= =?UTF-8?q?tapi?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/apps/mcp_server_app.py | 237 +++++++++++++++++++++++++++---------- 1 file changed, 177 insertions(+), 60 deletions(-) diff --git a/api/apps/mcp_server_app.py b/api/apps/mcp_server_app.py index f992282..33eb952 100644 --- a/api/apps/mcp_server_app.py +++ b/api/apps/mcp_server_app.py @@ -13,8 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from flask import Response, request -from flask_login import current_user, login_required +from typing import List, Optional, Dict, Any +from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from api.db import VALID_MCP_SERVER_TYPES from api.db.db_models import MCPServer @@ -23,42 +24,161 @@ from api.db.services.user_service import TenantService from api.settings import RetCode from api.utils import get_uuid -from api.utils.api_utils import get_data_error_result, get_json_result, server_error_response, validate_request, \ +from api.utils.api_utils import get_data_error_result, get_json_result, server_error_response, \ get_mcp_tools from api.utils.web_utils import get_float, safe_json_parse from rag.utils.mcp_tool_call_conn import MCPToolCallSession, close_multiple_mcp_toolcall_sessions +from pydantic import BaseModel +# Security +security = HTTPBearer() -@manager.route("/list", methods=["POST"]) # noqa: F821 -@login_required -def list_mcp() -> Response: - keywords = request.args.get("keywords", "") - page_number = int(request.args.get("page", 0)) - items_per_page = int(request.args.get("page_size", 0)) - orderby = request.args.get("orderby", "create_time") - if request.args.get("desc", "true").lower() == "false": - desc = False +# Pydantic models for request/response +class ListMCPRequest(BaseModel): + mcp_ids: List[str] = [] + +class CreateMCPRequest(BaseModel): + name: str + url: str + server_type: str + headers: Optional[Dict[str, Any]] = {} + variables: Optional[Dict[str, Any]] = {} + timeout: Optional[float] = 10 + +class UpdateMCPRequest(BaseModel): + mcp_id: str + name: Optional[str] = None + url: Optional[str] = None + server_type: Optional[str] = None + headers: Optional[Dict[str, Any]] = None + variables: Optional[Dict[str, Any]] = None + timeout: Optional[float] = None + +class RemoveMCPRequest(BaseModel): + mcp_ids: List[str] + +class ImportMCPRequest(BaseModel): + mcpServers: Dict[str, Dict[str, Any]] + timeout: Optional[float] = 10 + +class ExportMCPRequest(BaseModel): + mcp_ids: List[str] + +class ListToolsRequest(BaseModel): + mcp_ids: List[str] + timeout: Optional[float] = 10 + +class TestToolRequest(BaseModel): + mcp_id: str + tool_name: str + arguments: Dict[str, Any] + timeout: Optional[float] = 10 + +class CacheToolsRequest(BaseModel): + mcp_id: str + tools: List[Dict[str, Any]] + +class TestMCPRequest(BaseModel): + url: str + server_type: str + headers: Optional[Dict[str, Any]] = {} + variables: Optional[Dict[str, Any]] = {} + timeout: Optional[float] = 10 + +# Dependency injection +async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)): + """获取当前用户""" + from api.db import StatusEnum + from api.db.services.user_service import UserService + from fastapi import HTTPException, status + import logging + + try: + from itsdangerous.url_safe import URLSafeTimedSerializer as Serializer + except ImportError: + # 如果没有itsdangerous,使用jwt作为替代 + import jwt + Serializer = jwt + + jwt = Serializer(secret_key=settings.SECRET_KEY) + authorization = credentials.credentials + + if authorization: + try: + access_token = str(jwt.loads(authorization)) + + if not access_token or not access_token.strip(): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Authentication attempt with empty access token" + ) + + # Access tokens should be UUIDs (32 hex characters) + if len(access_token.strip()) < 32: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail=f"Authentication attempt with invalid token format: {len(access_token)} chars" + ) + + user = UserService.query( + access_token=access_token, status=StatusEnum.VALID.value + ) + if user: + if not user[0].access_token or not user[0].access_token.strip(): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail=f"User {user[0].email} has empty access_token in database" + ) + return user[0] + else: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid access token" + ) + except Exception as e: + logging.warning(f"load_user got exception {e}") + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid access token" + ) else: - desc = True + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Authorization header required" + ) - req = request.get_json() - mcp_ids = req.get("mcp_ids", []) +# Create router +router = APIRouter() + + +@router.post("/list") +async def list_mcp( + keywords: str = Query(""), + page: int = Query(0), + page_size: int = Query(0), + orderby: str = Query("create_time"), + desc: bool = Query(True), + req: ListMCPRequest = None, + current_user = Depends(get_current_user) +): + mcp_ids = req.mcp_ids if req else [] try: servers = MCPServerService.get_servers(current_user.id, mcp_ids, 0, 0, orderby, desc, keywords) or [] total = len(servers) - if page_number and items_per_page: - servers = servers[(page_number - 1) * items_per_page : page_number * items_per_page] + if page and page_size: + servers = servers[(page - 1) * page_size : page * page_size] return get_json_result(data={"mcp_servers": servers, "total": total}) except Exception as e: return server_error_response(e) -@manager.route("/detail", methods=["GET"]) # noqa: F821 -@login_required -def detail() -> Response: - mcp_id = request.args["mcp_id"] +@router.get("/detail") +async def detail( + mcp_id: str = Query(...), + current_user = Depends(get_current_user) +): try: mcp_server = MCPServerService.get_or_none(id=mcp_id, tenant_id=current_user.id) @@ -70,17 +190,16 @@ def detail() -> Response: return server_error_response(e) -@manager.route("/create", methods=["POST"]) # noqa: F821 -@login_required -@validate_request("name", "url", "server_type") -def create() -> Response: - req = request.get_json() - - server_type = req.get("server_type", "") +@router.post("/create") +async def create( + req: CreateMCPRequest, + current_user = Depends(get_current_user) +): + server_type = req.server_type if server_type not in VALID_MCP_SERVER_TYPES: return get_data_error_result(message="Unsupported MCP server type.") - server_name = req.get("name", "") + server_name = req.name if not server_name or len(server_name.encode("utf-8")) > 255: return get_data_error_result(message=f"Invalid MCP name or length is {len(server_name)} which is large than 255.") @@ -88,20 +207,20 @@ def create() -> Response: if e: return get_data_error_result(message="Duplicated MCP server name.") - url = req.get("url", "") + url = req.url if not url: return get_data_error_result(message="Invalid url.") - headers = safe_json_parse(req.get("headers", {})) - req["headers"] = headers - variables = safe_json_parse(req.get("variables", {})) + headers = safe_json_parse(req.headers) + variables = safe_json_parse(req.variables) variables.pop("tools", None) - timeout = get_float(req, "timeout", 10) + timeout = req.timeout try: - req["id"] = get_uuid() - req["tenant_id"] = current_user.id + req_dict = req.dict() + req_dict["id"] = get_uuid() + req_dict["tenant_id"] = current_user.id e, _ = TenantService.get_by_id(current_user.id) if not e: @@ -115,49 +234,47 @@ def create() -> Response: tools = server_tools[server_name] tools = {tool["name"]: tool for tool in tools if isinstance(tool, dict) and "name" in tool} variables["tools"] = tools - req["variables"] = variables + req_dict["variables"] = variables - if not MCPServerService.insert(**req): + if not MCPServerService.insert(**req_dict): return get_data_error_result("Failed to create MCP server.") - return get_json_result(data=req) + return get_json_result(data=req_dict) except Exception as e: return server_error_response(e) -@manager.route("/update", methods=["POST"]) # noqa: F821 -@login_required -@validate_request("mcp_id") -def update() -> Response: - req = request.get_json() - - mcp_id = req.get("mcp_id", "") +@router.post("/update") +async def update( + req: UpdateMCPRequest, + current_user = Depends(get_current_user) +): + mcp_id = req.mcp_id e, mcp_server = MCPServerService.get_by_id(mcp_id) if not e or mcp_server.tenant_id != current_user.id: return get_data_error_result(message=f"Cannot find MCP server {mcp_id} for user {current_user.id}") - server_type = req.get("server_type", mcp_server.server_type) + server_type = req.server_type if req.server_type is not None else mcp_server.server_type if server_type and server_type not in VALID_MCP_SERVER_TYPES: return get_data_error_result(message="Unsupported MCP server type.") - server_name = req.get("name", mcp_server.name) + server_name = req.name if req.name is not None else mcp_server.name if server_name and len(server_name.encode("utf-8")) > 255: return get_data_error_result(message=f"Invalid MCP name or length is {len(server_name)} which is large than 255.") - url = req.get("url", mcp_server.url) + url = req.url if req.url is not None else mcp_server.url if not url: return get_data_error_result(message="Invalid url.") - headers = safe_json_parse(req.get("headers", mcp_server.headers)) - req["headers"] = headers - - variables = safe_json_parse(req.get("variables", mcp_server.variables)) + headers = safe_json_parse(req.headers if req.headers is not None else mcp_server.headers) + variables = safe_json_parse(req.variables if req.variables is not None else mcp_server.variables) variables.pop("tools", None) - timeout = get_float(req, "timeout", 10) + timeout = req.timeout if req.timeout is not None else 10 try: - req["tenant_id"] = current_user.id - req.pop("mcp_id", None) - req["id"] = mcp_id + req_dict = req.dict(exclude_unset=True) + req_dict["tenant_id"] = current_user.id + req_dict.pop("mcp_id", None) + req_dict["id"] = mcp_id mcp_server = MCPServer(id=server_name, name=server_name, url=url, server_type=server_type, variables=variables, headers=headers) server_tools, err_message = get_mcp_tools([mcp_server], timeout) @@ -167,12 +284,12 @@ def update() -> Response: tools = server_tools[server_name] tools = {tool["name"]: tool for tool in tools if isinstance(tool, dict) and "name" in tool} variables["tools"] = tools - req["variables"] = variables + req_dict["variables"] = variables - if not MCPServerService.filter_update([MCPServer.id == mcp_id, MCPServer.tenant_id == current_user.id], req): + if not MCPServerService.filter_update([MCPServer.id == mcp_id, MCPServer.tenant_id == current_user.id], req_dict): return get_data_error_result(message="Failed to updated MCP server.") - e, updated_mcp = MCPServerService.get_by_id(req["id"]) + e, updated_mcp = MCPServerService.get_by_id(req_dict["id"]) if not e: return get_data_error_result(message="Failed to fetch updated MCP server.")