Files
TERES_fastapi_backend/api/apps/mcp_server_app.py

562 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import List, Optional, Dict, Any
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from api.db import VALID_MCP_SERVER_TYPES
from api.db.db_models import MCPServer
from api.db.services.mcp_server_service import MCPServerService
from api.db.services.user_service import TenantService
from api.settings import RetCode
from api.utils import get_uuid
from api.utils.api_utils import get_data_error_result, get_json_result, server_error_response, \
get_mcp_tools
from api.utils.web_utils import get_float, safe_json_parse
from rag.utils.mcp_tool_call_conn import MCPToolCallSession, close_multiple_mcp_toolcall_sessions
from pydantic import BaseModel
# Security
security = HTTPBearer()
# Pydantic models for request/response
class ListMCPRequest(BaseModel):
mcp_ids: List[str] = []
class CreateMCPRequest(BaseModel):
name: str
url: str
server_type: str
headers: Optional[Dict[str, Any]] = {}
variables: Optional[Dict[str, Any]] = {}
timeout: Optional[float] = 10
class UpdateMCPRequest(BaseModel):
mcp_id: str
name: Optional[str] = None
url: Optional[str] = None
server_type: Optional[str] = None
headers: Optional[Dict[str, Any]] = None
variables: Optional[Dict[str, Any]] = None
timeout: Optional[float] = None
class RemoveMCPRequest(BaseModel):
mcp_ids: List[str]
class ImportMCPRequest(BaseModel):
mcpServers: Dict[str, Dict[str, Any]]
timeout: Optional[float] = 10
class ExportMCPRequest(BaseModel):
mcp_ids: List[str]
class ListToolsRequest(BaseModel):
mcp_ids: List[str]
timeout: Optional[float] = 10
class TestToolRequest(BaseModel):
mcp_id: str
tool_name: str
arguments: Dict[str, Any]
timeout: Optional[float] = 10
class CacheToolsRequest(BaseModel):
mcp_id: str
tools: List[Dict[str, Any]]
class TestMCPRequest(BaseModel):
url: str
server_type: str
headers: Optional[Dict[str, Any]] = {}
variables: Optional[Dict[str, Any]] = {}
timeout: Optional[float] = 10
# Dependency injection
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""获取当前用户"""
from api.db import StatusEnum
from api.db.services.user_service import UserService
from fastapi import HTTPException, status
import logging
try:
from itsdangerous.url_safe import URLSafeTimedSerializer as Serializer
except ImportError:
# 如果没有itsdangerous使用jwt作为替代
import jwt
Serializer = jwt
jwt = Serializer(secret_key=settings.SECRET_KEY)
authorization = credentials.credentials
if authorization:
try:
access_token = str(jwt.loads(authorization))
if not access_token or not access_token.strip():
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication attempt with empty access token"
)
# Access tokens should be UUIDs (32 hex characters)
if len(access_token.strip()) < 32:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Authentication attempt with invalid token format: {len(access_token)} chars"
)
user = UserService.query(
access_token=access_token, status=StatusEnum.VALID.value
)
if user:
if not user[0].access_token or not user[0].access_token.strip():
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"User {user[0].email} has empty access_token in database"
)
return user[0]
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid access token"
)
except Exception as e:
logging.warning(f"load_user got exception {e}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid access token"
)
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authorization header required"
)
# Create router
router = APIRouter()
@router.post("/list")
async def list_mcp(
keywords: str = Query(""),
page: int = Query(0),
page_size: int = Query(0),
orderby: str = Query("create_time"),
desc: bool = Query(True),
req: ListMCPRequest = None,
current_user = Depends(get_current_user)
):
mcp_ids = req.mcp_ids if req else []
try:
servers = MCPServerService.get_servers(current_user.id, mcp_ids, 0, 0, orderby, desc, keywords) or []
total = len(servers)
if page and page_size:
servers = servers[(page - 1) * page_size : page * page_size]
return get_json_result(data={"mcp_servers": servers, "total": total})
except Exception as e:
return server_error_response(e)
@router.get("/detail")
async def detail(
mcp_id: str = Query(...),
current_user = Depends(get_current_user)
):
try:
mcp_server = MCPServerService.get_or_none(id=mcp_id, tenant_id=current_user.id)
if mcp_server is None:
return get_json_result(code=RetCode.NOT_FOUND, data=None)
return get_json_result(data=mcp_server.to_dict())
except Exception as e:
return server_error_response(e)
@router.post("/create")
async def create(
req: CreateMCPRequest,
current_user = Depends(get_current_user)
):
server_type = req.server_type
if server_type not in VALID_MCP_SERVER_TYPES:
return get_data_error_result(message="Unsupported MCP server type.")
server_name = req.name
if not server_name or len(server_name.encode("utf-8")) > 255:
return get_data_error_result(message=f"Invalid MCP name or length is {len(server_name)} which is large than 255.")
e, _ = MCPServerService.get_by_name_and_tenant(name=server_name, tenant_id=current_user.id)
if e:
return get_data_error_result(message="Duplicated MCP server name.")
url = req.url
if not url:
return get_data_error_result(message="Invalid url.")
headers = safe_json_parse(req.headers)
variables = safe_json_parse(req.variables)
variables.pop("tools", None)
timeout = req.timeout
try:
req_dict = req.dict()
req_dict["id"] = get_uuid()
req_dict["tenant_id"] = current_user.id
e, _ = TenantService.get_by_id(current_user.id)
if not e:
return get_data_error_result(message="Tenant not found.")
mcp_server = MCPServer(id=server_name, name=server_name, url=url, server_type=server_type, variables=variables, headers=headers)
server_tools, err_message = get_mcp_tools([mcp_server], timeout)
if err_message:
return get_data_error_result(err_message)
tools = server_tools[server_name]
tools = {tool["name"]: tool for tool in tools if isinstance(tool, dict) and "name" in tool}
variables["tools"] = tools
req_dict["variables"] = variables
if not MCPServerService.insert(**req_dict):
return get_data_error_result("Failed to create MCP server.")
return get_json_result(data=req_dict)
except Exception as e:
return server_error_response(e)
@router.post("/update")
async def update(
req: UpdateMCPRequest,
current_user = Depends(get_current_user)
):
mcp_id = req.mcp_id
e, mcp_server = MCPServerService.get_by_id(mcp_id)
if not e or mcp_server.tenant_id != current_user.id:
return get_data_error_result(message=f"Cannot find MCP server {mcp_id} for user {current_user.id}")
server_type = req.server_type if req.server_type is not None else mcp_server.server_type
if server_type and server_type not in VALID_MCP_SERVER_TYPES:
return get_data_error_result(message="Unsupported MCP server type.")
server_name = req.name if req.name is not None else mcp_server.name
if server_name and len(server_name.encode("utf-8")) > 255:
return get_data_error_result(message=f"Invalid MCP name or length is {len(server_name)} which is large than 255.")
url = req.url if req.url is not None else mcp_server.url
if not url:
return get_data_error_result(message="Invalid url.")
headers = safe_json_parse(req.headers if req.headers is not None else mcp_server.headers)
variables = safe_json_parse(req.variables if req.variables is not None else mcp_server.variables)
variables.pop("tools", None)
timeout = req.timeout if req.timeout is not None else 10
try:
req_dict = req.dict(exclude_unset=True)
req_dict["tenant_id"] = current_user.id
req_dict.pop("mcp_id", None)
req_dict["id"] = mcp_id
mcp_server = MCPServer(id=server_name, name=server_name, url=url, server_type=server_type, variables=variables, headers=headers)
server_tools, err_message = get_mcp_tools([mcp_server], timeout)
if err_message:
return get_data_error_result(err_message)
tools = server_tools[server_name]
tools = {tool["name"]: tool for tool in tools if isinstance(tool, dict) and "name" in tool}
variables["tools"] = tools
req_dict["variables"] = variables
if not MCPServerService.filter_update([MCPServer.id == mcp_id, MCPServer.tenant_id == current_user.id], req_dict):
return get_data_error_result(message="Failed to updated MCP server.")
e, updated_mcp = MCPServerService.get_by_id(req_dict["id"])
if not e:
return get_data_error_result(message="Failed to fetch updated MCP server.")
return get_json_result(data=updated_mcp.to_dict())
except Exception as e:
return server_error_response(e)
@manager.route("/rm", methods=["POST"]) # noqa: F821
@login_required
@validate_request("mcp_ids")
def rm() -> Response:
req = request.get_json()
mcp_ids = req.get("mcp_ids", [])
try:
req["tenant_id"] = current_user.id
if not MCPServerService.delete_by_ids(mcp_ids):
return get_data_error_result(message=f"Failed to delete MCP servers {mcp_ids}")
return get_json_result(data=True)
except Exception as e:
return server_error_response(e)
@manager.route("/import", methods=["POST"]) # noqa: F821
@login_required
@validate_request("mcpServers")
def import_multiple() -> Response:
req = request.get_json()
servers = req.get("mcpServers", {})
if not servers:
return get_data_error_result(message="No MCP servers provided.")
timeout = get_float(req, "timeout", 10)
results = []
try:
for server_name, config in servers.items():
if not all(key in config for key in {"type", "url"}):
results.append({"server": server_name, "success": False, "message": "Missing required fields (type or url)"})
continue
if not server_name or len(server_name.encode("utf-8")) > 255:
results.append({"server": server_name, "success": False, "message": f"Invalid MCP name or length is {len(server_name)} which is large than 255."})
continue
base_name = server_name
new_name = base_name
counter = 0
while True:
e, _ = MCPServerService.get_by_name_and_tenant(name=new_name, tenant_id=current_user.id)
if not e:
break
new_name = f"{base_name}_{counter}"
counter += 1
create_data = {
"id": get_uuid(),
"tenant_id": current_user.id,
"name": new_name,
"url": config["url"],
"server_type": config["type"],
"variables": {"authorization_token": config.get("authorization_token", "")},
}
headers = {"authorization_token": config["authorization_token"]} if "authorization_token" in config else {}
variables = {k: v for k, v in config.items() if k not in {"type", "url", "headers"}}
mcp_server = MCPServer(id=new_name, name=new_name, url=config["url"], server_type=config["type"], variables=variables, headers=headers)
server_tools, err_message = get_mcp_tools([mcp_server], timeout)
if err_message:
results.append({"server": base_name, "success": False, "message": err_message})
continue
tools = server_tools[new_name]
tools = {tool["name"]: tool for tool in tools if isinstance(tool, dict) and "name" in tool}
create_data["variables"]["tools"] = tools
if MCPServerService.insert(**create_data):
result = {"server": server_name, "success": True, "action": "created", "id": create_data["id"], "new_name": new_name}
if new_name != base_name:
result["message"] = f"Renamed from '{base_name}' to '{new_name}' avoid duplication"
results.append(result)
else:
results.append({"server": server_name, "success": False, "message": "Failed to create MCP server."})
return get_json_result(data={"results": results})
except Exception as e:
return server_error_response(e)
@manager.route("/export", methods=["POST"]) # noqa: F821
@login_required
@validate_request("mcp_ids")
def export_multiple() -> Response:
req = request.get_json()
mcp_ids = req.get("mcp_ids", [])
if not mcp_ids:
return get_data_error_result(message="No MCP server IDs provided.")
try:
exported_servers = {}
for mcp_id in mcp_ids:
e, mcp_server = MCPServerService.get_by_id(mcp_id)
if e and mcp_server.tenant_id == current_user.id:
server_key = mcp_server.name
exported_servers[server_key] = {
"type": mcp_server.server_type,
"url": mcp_server.url,
"name": mcp_server.name,
"authorization_token": mcp_server.variables.get("authorization_token", ""),
"tools": mcp_server.variables.get("tools", {}),
}
return get_json_result(data={"mcpServers": exported_servers})
except Exception as e:
return server_error_response(e)
@manager.route("/list_tools", methods=["POST"]) # noqa: F821
@login_required
@validate_request("mcp_ids")
def list_tools() -> Response:
req = request.get_json()
mcp_ids = req.get("mcp_ids", [])
if not mcp_ids:
return get_data_error_result(message="No MCP server IDs provided.")
timeout = get_float(req, "timeout", 10)
results = {}
tool_call_sessions = []
try:
for mcp_id in mcp_ids:
e, mcp_server = MCPServerService.get_by_id(mcp_id)
if e and mcp_server.tenant_id == current_user.id:
server_key = mcp_server.id
cached_tools = mcp_server.variables.get("tools", {})
tool_call_session = MCPToolCallSession(mcp_server, mcp_server.variables)
tool_call_sessions.append(tool_call_session)
try:
tools = tool_call_session.get_tools(timeout)
except Exception as e:
tools = []
return get_data_error_result(message=f"MCP list tools error: {e}")
results[server_key] = []
for tool in tools:
tool_dict = tool.model_dump()
cached_tool = cached_tools.get(tool_dict["name"], {})
tool_dict["enabled"] = cached_tool.get("enabled", True)
results[server_key].append(tool_dict)
return get_json_result(data=results)
except Exception as e:
return server_error_response(e)
finally:
# PERF: blocking call to close sessions — consider moving to background thread or task queue
close_multiple_mcp_toolcall_sessions(tool_call_sessions)
@manager.route("/test_tool", methods=["POST"]) # noqa: F821
@login_required
@validate_request("mcp_id", "tool_name", "arguments")
def test_tool() -> Response:
req = request.get_json()
mcp_id = req.get("mcp_id", "")
if not mcp_id:
return get_data_error_result(message="No MCP server ID provided.")
timeout = get_float(req, "timeout", 10)
tool_name = req.get("tool_name", "")
arguments = req.get("arguments", {})
if not all([tool_name, arguments]):
return get_data_error_result(message="Require provide tool name and arguments.")
tool_call_sessions = []
try:
e, mcp_server = MCPServerService.get_by_id(mcp_id)
if not e or mcp_server.tenant_id != current_user.id:
return get_data_error_result(message=f"Cannot find MCP server {mcp_id} for user {current_user.id}")
tool_call_session = MCPToolCallSession(mcp_server, mcp_server.variables)
tool_call_sessions.append(tool_call_session)
result = tool_call_session.tool_call(tool_name, arguments, timeout)
# PERF: blocking call to close sessions — consider moving to background thread or task queue
close_multiple_mcp_toolcall_sessions(tool_call_sessions)
return get_json_result(data=result)
except Exception as e:
return server_error_response(e)
@manager.route("/cache_tools", methods=["POST"]) # noqa: F821
@login_required
@validate_request("mcp_id", "tools")
def cache_tool() -> Response:
req = request.get_json()
mcp_id = req.get("mcp_id", "")
if not mcp_id:
return get_data_error_result(message="No MCP server ID provided.")
tools = req.get("tools", [])
e, mcp_server = MCPServerService.get_by_id(mcp_id)
if not e or mcp_server.tenant_id != current_user.id:
return get_data_error_result(message=f"Cannot find MCP server {mcp_id} for user {current_user.id}")
variables = mcp_server.variables
tools = {tool["name"]: tool for tool in tools if isinstance(tool, dict) and "name" in tool}
variables["tools"] = tools
if not MCPServerService.filter_update([MCPServer.id == mcp_id, MCPServer.tenant_id == current_user.id], {"variables": variables}):
return get_data_error_result(message="Failed to updated MCP server.")
return get_json_result(data=tools)
@manager.route("/test_mcp", methods=["POST"]) # noqa: F821
@validate_request("url", "server_type")
def test_mcp() -> Response:
req = request.get_json()
url = req.get("url", "")
if not url:
return get_data_error_result(message="Invalid MCP url.")
server_type = req.get("server_type", "")
if server_type not in VALID_MCP_SERVER_TYPES:
return get_data_error_result(message="Unsupported MCP server type.")
timeout = get_float(req, "timeout", 10)
headers = safe_json_parse(req.get("headers", {}))
variables = safe_json_parse(req.get("variables", {}))
mcp_server = MCPServer(id=f"{server_type}: {url}", server_type=server_type, url=url, headers=headers, variables=variables)
result = []
try:
tool_call_session = MCPToolCallSession(mcp_server, mcp_server.variables)
try:
tools = tool_call_session.get_tools(timeout)
except Exception as e:
tools = []
return get_data_error_result(message=f"Test MCP error: {e}")
finally:
# PERF: blocking call to close sessions — consider moving to background thread or task queue
close_multiple_mcp_toolcall_sessions([tool_call_session])
for tool in tools:
tool_dict = tool.model_dump()
tool_dict["enabled"] = True
result.append(tool_dict)
return get_json_result(data=result)
except Exception as e:
return server_error_response(e)