Files
doris-mcp-server/doris_mcp_server/tools/tools_manager.py
2025-06-12 19:36:16 +08:00

1085 lines
45 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Apache Doris MCP Tools Manager
Responsible for tool registration, management, scheduling and routing, does not contain specific business logic implementation
"""
import json
import time
from datetime import datetime
from typing import Any, Dict, List
from mcp.types import Tool
from ..utils.db import DorisConnectionManager
from ..utils.query_executor import DorisQueryExecutor
from ..utils.analysis_tools import TableAnalyzer, SQLAnalyzer, MemoryTracker
from ..utils.monitoring_tools import DorisMonitoringTools
from ..utils.schema_extractor import MetadataExtractor
from ..utils.logger import get_logger
logger = get_logger(__name__)
class DorisToolsManager:
"""Apache Doris Tools Manager"""
def __init__(self, connection_manager: DorisConnectionManager):
self.connection_manager = connection_manager
# Initialize business logic processors
self.query_executor = DorisQueryExecutor(connection_manager)
self.table_analyzer = TableAnalyzer(connection_manager)
self.sql_analyzer = SQLAnalyzer(connection_manager)
self.metadata_extractor = MetadataExtractor(connection_manager=connection_manager)
self.monitoring_tools = DorisMonitoringTools(connection_manager)
self.memory_tracker = MemoryTracker(connection_manager)
logger.info("DorisToolsManager initialized with business logic processors")
async def register_tools_with_mcp(self, mcp):
"""Register all tools to MCP server"""
logger.info("Starting to register MCP tools")
# SQL query execution tool (supports catalog federation queries)
@mcp.tool(
"exec_query",
description="""[Function Description]: Execute SQL query and return result command with catalog federation support.
[Parameter Content]:
- sql (string) [Required] - SQL statement to execute. MUST use three-part naming for all table references: 'catalog_name.db_name.table_name'. For internal tables use 'internal.db_name.table_name', for external tables use 'catalog_name.db_name.table_name'
- db_name (string) [Optional] - Target database name, defaults to the current database
- catalog_name (string) [Optional] - Reference catalog name for context, defaults to current catalog
- max_rows (integer) [Optional] - Maximum number of rows to return, default 100
- timeout (integer) [Optional] - Query timeout in seconds, default 30
""",
)
async def exec_query_tool(
sql: str,
db_name: str = None,
catalog_name: str = None,
max_rows: int = 100,
timeout: int = 30,
) -> str:
"""Execute SQL query (supports federation queries)"""
return await self.call_tool("exec_query", {
"sql": sql,
"db_name": db_name,
"catalog_name": catalog_name,
"max_rows": max_rows,
"timeout": timeout
})
# Get table schema tool
@mcp.tool(
"get_table_schema",
description="""[Function Description]: Get detailed structure information of the specified table (columns, types, comments, etc.).
[Parameter Content]:
- table_name (string) [Required] - Name of the table to query
- db_name (string) [Optional] - Target database name, defaults to the current database
- catalog_name (string) [Optional] - Target catalog name for federation queries, defaults to current catalog
""",
)
async def get_table_schema_tool(
table_name: str, db_name: str = None, catalog_name: str = None
) -> str:
"""Get table schema information"""
return await self.call_tool("get_table_schema", {
"table_name": table_name,
"db_name": db_name,
"catalog_name": catalog_name
})
# Get database table list tool
@mcp.tool(
"get_db_table_list",
description="""[Function Description]: Get a list of all table names in the specified database.
[Parameter Content]:
- db_name (string) [Optional] - Target database name, defaults to the current database
- catalog_name (string) [Optional] - Target catalog name for federation queries, defaults to current catalog
""",
)
async def get_db_table_list_tool(
db_name: str = None, catalog_name: str = None
) -> str:
"""Get database table list"""
return await self.call_tool("get_db_table_list", {
"db_name": db_name,
"catalog_name": catalog_name
})
# Get database list tool
@mcp.tool(
"get_db_list",
description="""[Function Description]: Get a list of all database names on the server.
[Parameter Content]:
- catalog_name (string) [Optional] - Target catalog name for federation queries, defaults to current catalog
""",
)
async def get_db_list_tool(catalog_name: str = None) -> str:
"""Get database list"""
return await self.call_tool("get_db_list", {
"catalog_name": catalog_name
})
# Get table comment tool
@mcp.tool(
"get_table_comment",
description="""[Function Description]: Get the comment information for the specified table.
[Parameter Content]:
- table_name (string) [Required] - Name of the table to query
- db_name (string) [Optional] - Target database name, defaults to the current database
- catalog_name (string) [Optional] - Target catalog name for federation queries, defaults to current catalog
""",
)
async def get_table_comment_tool(
table_name: str, db_name: str = None, catalog_name: str = None
) -> str:
"""Get table comment"""
return await self.call_tool("get_table_comment", {
"table_name": table_name,
"db_name": db_name,
"catalog_name": catalog_name
})
# Get table column comments tool
@mcp.tool(
"get_table_column_comments",
description="""[Function Description]: Get comment information for all columns in the specified table.
[Parameter Content]:
- table_name (string) [Required] - Name of the table to query
- db_name (string) [Optional] - Target database name, defaults to the current database
- catalog_name (string) [Optional] - Target catalog name for federation queries, defaults to current catalog
""",
)
async def get_table_column_comments_tool(
table_name: str, db_name: str = None, catalog_name: str = None
) -> str:
"""Get table column comments"""
return await self.call_tool("get_table_column_comments", {
"table_name": table_name,
"db_name": db_name,
"catalog_name": catalog_name
})
# Get table indexes tool
@mcp.tool(
"get_table_indexes",
description="""[Function Description]: Get index information for the specified table.
[Parameter Content]:
- table_name (string) [Required] - Name of the table to query
- db_name (string) [Optional] - Target database name, defaults to the current database
- catalog_name (string) [Optional] - Target catalog name for federation queries, defaults to current catalog
""",
)
async def get_table_indexes_tool(
table_name: str, db_name: str = None, catalog_name: str = None
) -> str:
"""Get table indexes"""
return await self.call_tool("get_table_indexes", {
"table_name": table_name,
"db_name": db_name,
"catalog_name": catalog_name
})
# Get audit logs tool
@mcp.tool(
"get_recent_audit_logs",
description="""[Function Description]: Get audit log records for a recent period.
[Parameter Content]:
- days (integer) [Optional] - Number of recent days of logs to retrieve, default is 7
- limit (integer) [Optional] - Maximum number of records to return, default is 100
""",
)
async def get_recent_audit_logs_tool(
days: int = 7, limit: int = 100
) -> str:
"""Get audit logs"""
return await self.call_tool("get_recent_audit_logs", {
"days": days,
"limit": limit
})
# Get catalog list tool
@mcp.tool(
"get_catalog_list",
description="""[Function Description]: Get a list of all catalog names on the server.
[Parameter Content]:
- random_string (string) [Required] - Unique identifier for the tool call
""",
)
async def get_catalog_list_tool(random_string: str) -> str:
"""Get catalog list"""
return await self.call_tool("get_catalog_list", {
"random_string": random_string
})
# SQL Explain tool
@mcp.tool(
"get_sql_explain",
description="""[Function Description]: Get SQL execution plan using EXPLAIN command based on Doris syntax.
[Parameter Content]:
- sql (string) [Required] - SQL statement to explain
- verbose (boolean) [Optional] - Whether to show verbose information, default is false
- db_name (string) [Optional] - Target database name, defaults to the current database
- catalog_name (string) [Optional] - Target catalog name for federation queries, defaults to current catalog
""",
)
async def get_sql_explain_tool(
sql: str,
verbose: bool = False,
db_name: str = None,
catalog_name: str = None
) -> str:
"""Get SQL execution plan"""
return await self.call_tool("get_sql_explain", {
"sql": sql,
"verbose": verbose,
"db_name": db_name,
"catalog_name": catalog_name
})
# SQL Profile tool
@mcp.tool(
"get_sql_profile",
description="""[Function Description]: Get SQL execution profile by setting trace ID and fetching profile via FE HTTP API.
[Parameter Content]:
- sql (string) [Required] - SQL statement to profile
- db_name (string) [Optional] - Target database name, defaults to the current database
- catalog_name (string) [Optional] - Target catalog name for federation queries, defaults to current catalog
- timeout (integer) [Optional] - Query timeout in seconds, default is 30
""",
)
async def get_sql_profile_tool(
sql: str,
db_name: str = None,
catalog_name: str = None,
timeout: int = 30
) -> str:
"""Get SQL execution profile"""
return await self.call_tool("get_sql_profile", {
"sql": sql,
"db_name": db_name,
"catalog_name": catalog_name,
"timeout": timeout
})
# Table data size tool
@mcp.tool(
"get_table_data_size",
description="""[Function Description]: Get table data size information via FE HTTP API.
[Parameter Content]:
- db_name (string) [Optional] - Database name, if not specified returns all databases
- table_name (string) [Optional] - Table name, if not specified returns all tables in the database
- single_replica (boolean) [Optional] - Whether to get single replica data size, default is false
""",
)
async def get_table_data_size_tool(
db_name: str = None,
table_name: str = None,
single_replica: bool = False
) -> str:
"""Get table data size information"""
return await self.call_tool("get_table_data_size", {
"db_name": db_name,
"table_name": table_name,
"single_replica": single_replica
})
# Monitoring metrics definition tool
@mcp.tool(
"get_monitoring_metrics_info",
description="""[Function Description]: Get Doris monitoring metrics definitions and descriptions without executing queries.
[Parameter Content]:
- role (string) [Optional] - Node role to get metric definitions for, default is "all"
* "fe": Only FE metrics definitions
* "be": Only BE metrics definitions
* "all": Both FE and BE metrics definitions
- monitor_type (string) [Optional] - Type of monitoring metrics, default is "all"
* "process": Process monitoring metrics
* "jvm": JVM monitoring metrics (FE only)
* "machine": Machine monitoring metrics
* "all": All monitoring types
- priority (string) [Optional] - Metric priority level, default is "core"
* "core": Only core essential metrics (10-12 items for production use)
* "p0": Only P0 (highest priority) metrics definitions
* "all": All metrics definitions (P0 and non-P0)
""",
)
async def get_monitoring_metrics_info_tool(
role: str = "all",
monitor_type: str = "all",
priority: str = "core"
) -> str:
"""Get Doris monitoring metrics definitions"""
return await self.call_tool("get_monitoring_metrics_info", {
"role": role,
"monitor_type": monitor_type,
"priority": priority
})
# Monitoring metrics data tool
@mcp.tool(
"get_monitoring_metrics_data",
description="""[Function Description]: Get actual Doris monitoring metrics data from FE and BE nodes via HTTP API.
[Parameter Content]:
- role (string) [Optional] - Node role to monitor, default is "all"
* "fe": Only FE nodes
* "be": Only BE nodes
* "all": Both FE and BE nodes
- monitor_type (string) [Optional] - Type of monitoring metrics, default is "all"
* "process": Process monitoring metrics
* "jvm": JVM monitoring metrics (FE only)
* "machine": Machine monitoring metrics
* "all": All monitoring types
- priority (string) [Optional] - Metric priority level, default is "core"
* "core": Only core essential metrics (10-12 items for production use)
* "p0": Only P0 (highest priority) metrics
* "all": All metrics (P0 and non-P0)
- include_raw_metrics (boolean) [Optional] - Whether to include raw detailed metrics data (can be very large)
""",
)
async def get_monitoring_metrics_data_tool(
role: str = "all",
monitor_type: str = "all",
priority: str = "core",
include_raw_metrics: bool = False
) -> str:
"""Get Doris monitoring metrics data"""
return await self.call_tool("get_monitoring_metrics_data", {
"role": role,
"monitor_type": monitor_type,
"priority": priority,
"include_raw_metrics": include_raw_metrics
})
# Real-time memory tracker tool
@mcp.tool(
"get_realtime_memory_stats",
description="""[Function Description]: Get real-time memory statistics via Doris BE Memory Tracker web interface.
[Parameter Content]:
- tracker_type (string) [Optional] - Type of memory trackers to retrieve, default is "overview"
* "overview": Overview type trackers (process memory, tracked memory summary)
* "global": Global shared memory trackers (cache, metadata)
* "query": Query-related memory trackers
* "load": Load-related memory trackers
* "compaction": Compaction-related memory trackers
* "all": All memory tracker types
- include_details (boolean) [Optional] - Whether to include detailed tracker information and definitions, default is true
""",
)
async def get_realtime_memory_stats_tool(
tracker_type: str = "overview",
include_details: bool = True
) -> str:
"""Get real-time memory statistics tool"""
return await self.call_tool("get_realtime_memory_stats", {
"tracker_type": tracker_type,
"include_details": include_details
})
# Historical memory tracker tool
@mcp.tool(
"get_historical_memory_stats",
description="""[Function Description]: Get historical memory statistics via Doris BE Bvar interface.
[Parameter Content]:
- tracker_names (array) [Optional] - List of specific tracker names to query, if not specified will get common trackers
* Example: ["process_resident_memory", "global", "query", "load", "compaction"]
- time_range (string) [Optional] - Time range for historical data, default is "1h"
* "1h": Last 1 hour
* "6h": Last 6 hours
* "24h": Last 24 hours
""",
)
async def get_historical_memory_stats_tool(
tracker_names: List[str] = None,
time_range: str = "1h"
) -> str:
"""Get historical memory statistics tool"""
return await self.call_tool("get_historical_memory_stats", {
"tracker_names": tracker_names,
"time_range": time_range
})
logger.info("Successfully registered 16 tools to MCP server")
async def list_tools(self) -> List[Tool]:
"""List all available query tools (for stdio mode)"""
tools = [
Tool(
name="exec_query",
description="""[Function Description]: Execute SQL query and return result command with catalog federation support.
[Parameter Content]:
- sql (string) [Required] - SQL statement to execute. MUST use three-part naming for all table references: 'catalog_name.db_name.table_name'. For internal tables use 'internal.db_name.table_name', for external tables use 'catalog_name.db_name.table_name'
- db_name (string) [Optional] - Target database name, defaults to the current database
- catalog_name (string) [Optional] - Reference catalog name for context, defaults to current catalog
- max_rows (integer) [Optional] - Maximum number of rows to return, default 100
- timeout (integer) [Optional] - Query timeout in seconds, default 30
""",
inputSchema={
"type": "object",
"properties": {
"sql": {"type": "string", "description": "SQL statement to execute, must use three-part naming"},
"db_name": {"type": "string", "description": "Target database name"},
"catalog_name": {"type": "string", "description": "Catalog name"},
"max_rows": {"type": "integer", "description": "Maximum number of rows to return", "default": 100},
"timeout": {"type": "integer", "description": "Timeout in seconds", "default": 30},
},
"required": ["sql"],
},
),
Tool(
name="get_table_schema",
description="""[Function Description]: Get detailed structure information of the specified table (columns, types, comments, etc.).
[Parameter Content]:
- table_name (string) [Required] - Name of the table to query
- db_name (string) [Optional] - Target database name, defaults to the current database
- catalog_name (string) [Optional] - Target catalog name for federation queries, defaults to current catalog
""",
inputSchema={
"type": "object",
"properties": {
"table_name": {"type": "string", "description": "Table name"},
"db_name": {"type": "string", "description": "Database name"},
"catalog_name": {"type": "string", "description": "Catalog name"},
},
"required": ["table_name"],
},
),
Tool(
name="get_db_table_list",
description="""[Function Description]: Get a list of all table names in the specified database.
[Parameter Content]:
- db_name (string) [Optional] - Target database name, defaults to the current database
- catalog_name (string) [Optional] - Target catalog name for federation queries, defaults to current catalog
""",
inputSchema={
"type": "object",
"properties": {
"db_name": {"type": "string", "description": "Database name"},
"catalog_name": {"type": "string", "description": "Catalog name"},
},
},
),
Tool(
name="get_db_list",
description="""[Function Description]: Get a list of all database names on the server.
[Parameter Content]:
- catalog_name (string) [Optional] - Target catalog name for federation queries, defaults to current catalog
""",
inputSchema={
"type": "object",
"properties": {
"catalog_name": {"type": "string", "description": "Catalog name"},
},
},
),
Tool(
name="get_table_comment",
description="""[Function Description]: Get the comment information for the specified table.
[Parameter Content]:
- table_name (string) [Required] - Name of the table to query
- db_name (string) [Optional] - Target database name, defaults to the current database
- catalog_name (string) [Optional] - Target catalog name for federation queries, defaults to current catalog
""",
inputSchema={
"type": "object",
"properties": {
"table_name": {"type": "string", "description": "Table name"},
"db_name": {"type": "string", "description": "Database name"},
"catalog_name": {"type": "string", "description": "Catalog name"},
},
"required": ["table_name"],
},
),
Tool(
name="get_table_column_comments",
description="""[Function Description]: Get comment information for all columns in the specified table.
[Parameter Content]:
- table_name (string) [Required] - Name of the table to query
- db_name (string) [Optional] - Target database name, defaults to the current database
- catalog_name (string) [Optional] - Target catalog name for federation queries, defaults to current catalog
""",
inputSchema={
"type": "object",
"properties": {
"table_name": {"type": "string", "description": "Table name"},
"db_name": {"type": "string", "description": "Database name"},
"catalog_name": {"type": "string", "description": "Catalog name"},
},
"required": ["table_name"],
},
),
Tool(
name="get_table_indexes",
description="""[Function Description]: Get index information for the specified table.
[Parameter Content]:
- table_name (string) [Required] - Name of the table to query
- db_name (string) [Optional] - Target database name, defaults to the current database
- catalog_name (string) [Optional] - Target catalog name for federation queries, defaults to current catalog
""",
inputSchema={
"type": "object",
"properties": {
"table_name": {"type": "string", "description": "Table name"},
"db_name": {"type": "string", "description": "Database name"},
"catalog_name": {"type": "string", "description": "Catalog name"},
},
"required": ["table_name"],
},
),
Tool(
name="get_recent_audit_logs",
description="""[Function Description]: Get audit log records for a recent period.
[Parameter Content]:
- days (integer) [Optional] - Number of recent days of logs to retrieve, default is 7
- limit (integer) [Optional] - Maximum number of records to return, default is 100
""",
inputSchema={
"type": "object",
"properties": {
"days": {"type": "integer", "description": "Number of recent days", "default": 7},
"limit": {"type": "integer", "description": "Maximum number of records", "default": 100},
},
},
),
Tool(
name="get_catalog_list",
description="""[Function Description]: Get a list of all catalog names on the server.
[Parameter Content]:
- random_string (string) [Required] - Unique identifier for the tool call
""",
inputSchema={
"type": "object",
"properties": {
"random_string": {"type": "string", "description": "Unique identifier"},
},
"required": ["random_string"],
},
),
Tool(
name="get_sql_explain",
description="""[Function Description]: Get SQL execution plan using EXPLAIN command based on Doris syntax.
[Parameter Content]:
- sql (string) [Required] - SQL statement to explain
- verbose (boolean) [Optional] - Whether to show verbose information, default is false
- db_name (string) [Optional] - Target database name, defaults to the current database
- catalog_name (string) [Optional] - Target catalog name for federation queries, defaults to current catalog
""",
inputSchema={
"type": "object",
"properties": {
"sql": {"type": "string", "description": "SQL statement to explain"},
"verbose": {"type": "boolean", "description": "Whether to show verbose information", "default": False},
"db_name": {"type": "string", "description": "Database name"},
"catalog_name": {"type": "string", "description": "Catalog name"},
},
"required": ["sql"],
},
),
Tool(
name="get_sql_profile",
description="""[Function Description]: Get SQL execution profile by setting trace ID and fetching profile via FE HTTP API.
[Parameter Content]:
- sql (string) [Required] - SQL statement to profile
- db_name (string) [Optional] - Target database name, defaults to the current database
- catalog_name (string) [Optional] - Target catalog name for federation queries, defaults to current catalog
- timeout (integer) [Optional] - Query timeout in seconds, default is 30
""",
inputSchema={
"type": "object",
"properties": {
"sql": {"type": "string", "description": "SQL statement to profile"},
"db_name": {"type": "string", "description": "Database name"},
"catalog_name": {"type": "string", "description": "Catalog name"},
"timeout": {"type": "integer", "description": "Query timeout in seconds", "default": 30},
},
"required": ["sql"],
},
),
Tool(
name="get_table_data_size",
description="""[Function Description]: Get table data size information via FE HTTP API.
[Parameter Content]:
- db_name (string) [Optional] - Database name, if not specified returns all databases
- table_name (string) [Optional] - Table name, if not specified returns all tables in the database
- single_replica (boolean) [Optional] - Whether to get single replica data size, default is false
""",
inputSchema={
"type": "object",
"properties": {
"db_name": {"type": "string", "description": "Database name"},
"table_name": {"type": "string", "description": "Table name"},
"single_replica": {"type": "boolean", "description": "Whether to get single replica data size", "default": False},
},
},
),
Tool(
name="get_monitoring_metrics_info",
description="""[Function Description]: Get Doris monitoring metrics definitions and descriptions without executing queries.
[Parameter Content]:
- role (string) [Optional] - Node role to get metric definitions for, default is "all"
* "fe": Only FE metrics definitions
* "be": Only BE metrics definitions
* "all": Both FE and BE metrics definitions
- monitor_type (string) [Optional] - Type of monitoring metrics, default is "all"
* "process": Process monitoring metrics
* "jvm": JVM monitoring metrics (FE only)
* "machine": Machine monitoring metrics
* "all": All monitoring types
- priority (string) [Optional] - Metric priority level, default is "core"
* "core": Only core essential metrics (10-12 items for production use)
* "p0": Only P0 (highest priority) metrics definitions
* "all": All metrics definitions (P0 and non-P0)
""",
inputSchema={
"type": "object",
"properties": {
"role": {"type": "string", "enum": ["fe", "be", "all"], "description": "Node role to get metric definitions for", "default": "all"},
"monitor_type": {"type": "string", "enum": ["process", "jvm", "machine", "all"], "description": "Type of monitoring metrics", "default": "all"},
"priority": {"type": "string", "enum": ["core", "p0", "all"], "description": "Metric priority level", "default": "core"},
},
},
),
Tool(
name="get_monitoring_metrics_data",
description="""[Function Description]: Get actual Doris monitoring metrics data from FE and BE nodes via HTTP API.
[Parameter Content]:
- role (string) [Optional] - Node role to monitor, default is "all"
* "fe": Only FE nodes
* "be": Only BE nodes
* "all": Both FE and BE nodes
- monitor_type (string) [Optional] - Type of monitoring metrics, default is "all"
* "process": Process monitoring metrics
* "jvm": JVM monitoring metrics (FE only)
* "machine": Machine monitoring metrics
* "all": All monitoring types
- priority (string) [Optional] - Metric priority level, default is "core"
* "core": Only core essential metrics (10-12 items for production use)
* "p0": Only P0 (highest priority) metrics
* "all": All metrics (P0 and non-P0)
- include_raw_metrics (boolean) [Optional] - Whether to include raw detailed metrics data (can be very large)
""",
inputSchema={
"type": "object",
"properties": {
"role": {"type": "string", "enum": ["fe", "be", "all"], "description": "Node role to monitor", "default": "all"},
"monitor_type": {"type": "string", "enum": ["process", "jvm", "machine", "all"], "description": "Type of monitoring metrics", "default": "all"},
"priority": {"type": "string", "enum": ["core", "p0", "all"], "description": "Metric priority level", "default": "core"},
"include_raw_metrics": {"type": "boolean", "description": "Whether to include raw detailed metrics data (can be very large)", "default": False},
},
},
),
Tool(
name="get_realtime_memory_stats",
description="""[Function Description]: Get real-time memory statistics via Doris BE Memory Tracker web interface.
[Parameter Content]:
- tracker_type (string) [Optional] - Type of memory trackers to retrieve, default is "overview"
* "overview": Overview type trackers (process memory, tracked memory summary)
* "global": Global shared memory trackers (cache, metadata)
* "query": Query-related memory trackers
* "load": Load-related memory trackers
* "compaction": Compaction-related memory trackers
* "all": All memory tracker types
- include_details (boolean) [Optional] - Whether to include detailed tracker information and definitions, default is true
""",
inputSchema={
"type": "object",
"properties": {
"tracker_type": {"type": "string", "enum": ["overview", "global", "query", "load", "compaction", "all"], "description": "Type of memory trackers to retrieve", "default": "overview"},
"include_details": {"type": "boolean", "description": "Whether to include detailed tracker information and definitions", "default": True},
},
},
),
Tool(
name="get_historical_memory_stats",
description="""[Function Description]: Get historical memory statistics via Doris BE Bvar interface.
[Parameter Content]:
- tracker_names (array) [Optional] - List of specific tracker names to query, if not specified will get common trackers
* Example: ["process_resident_memory", "global", "query", "load", "compaction"]
- time_range (string) [Optional] - Time range for historical data, default is "1h"
* "1h": Last 1 hour
* "6h": Last 6 hours
* "24h": Last 24 hours
""",
inputSchema={
"type": "object",
"properties": {
"tracker_names": {"type": "array", "items": {"type": "string"}, "description": "List of specific tracker names to query"},
"time_range": {"type": "string", "enum": ["1h", "6h", "24h"], "description": "Time range for historical data", "default": "1h"},
},
},
),
]
return tools
async def call_tool(self, name: str, arguments: Dict[str, Any]) -> str:
"""
Call the specified query tool (tool routing and scheduling center)
"""
try:
start_time = time.time()
# Tool routing - dispatch requests to corresponding business logic processors
if name == "exec_query":
result = await self._exec_query_tool(arguments)
elif name == "get_table_schema":
result = await self._get_table_schema_tool(arguments)
elif name == "get_db_table_list":
result = await self._get_db_table_list_tool(arguments)
elif name == "get_db_list":
result = await self._get_db_list_tool(arguments)
elif name == "get_table_comment":
result = await self._get_table_comment_tool(arguments)
elif name == "get_table_column_comments":
result = await self._get_table_column_comments_tool(arguments)
elif name == "get_table_indexes":
result = await self._get_table_indexes_tool(arguments)
elif name == "get_recent_audit_logs":
result = await self._get_recent_audit_logs_tool(arguments)
elif name == "get_catalog_list":
result = await self._get_catalog_list_tool(arguments)
elif name == "get_sql_explain":
result = await self._get_sql_explain_tool(arguments)
elif name == "get_sql_profile":
result = await self._get_sql_profile_tool(arguments)
elif name == "get_table_data_size":
result = await self._get_table_data_size_tool(arguments)
elif name == "get_monitoring_metrics_info":
result = await self._get_monitoring_metrics_info_tool(arguments)
elif name == "get_monitoring_metrics_data":
result = await self._get_monitoring_metrics_data_tool(arguments)
elif name == "get_realtime_memory_stats":
result = await self._get_realtime_memory_stats_tool(arguments)
elif name == "get_historical_memory_stats":
result = await self._get_historical_memory_stats_tool(arguments)
else:
raise ValueError(f"Unknown tool: {name}")
execution_time = time.time() - start_time
# Add execution information
if isinstance(result, dict):
result["_execution_info"] = {
"tool_name": name,
"execution_time": round(execution_time, 3),
"timestamp": datetime.now().isoformat(),
}
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"Tool call failed {name}: {str(e)}")
error_result = {
"error": str(e),
"tool_name": name,
"arguments": arguments,
"timestamp": datetime.now().isoformat(),
}
return json.dumps(error_result, ensure_ascii=False, indent=2)
async def _exec_query_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""SQL query execution tool routing (supports federation queries)"""
sql = arguments.get("sql")
db_name = arguments.get("db_name")
catalog_name = arguments.get("catalog_name")
max_rows = arguments.get("max_rows", 100)
timeout = arguments.get("timeout", 30)
# Delegate to metadata extractor for processing
return await self.metadata_extractor.exec_query_for_mcp(
sql, db_name, catalog_name, max_rows, timeout
)
async def _get_table_schema_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Get table schema tool routing"""
table_name = arguments.get("table_name")
db_name = arguments.get("db_name")
catalog_name = arguments.get("catalog_name")
# Delegate to metadata extractor for processing
return await self.metadata_extractor.get_table_schema_for_mcp(
table_name, db_name, catalog_name
)
async def _get_db_table_list_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Get database table list tool routing"""
db_name = arguments.get("db_name")
catalog_name = arguments.get("catalog_name")
# Delegate to metadata extractor for processing
return await self.metadata_extractor.get_db_table_list_for_mcp(db_name, catalog_name)
async def _get_db_list_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Get database list tool routing"""
catalog_name = arguments.get("catalog_name")
# Delegate to metadata extractor for processing
return await self.metadata_extractor.get_db_list_for_mcp(catalog_name)
async def _get_table_comment_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Get table comment tool routing"""
table_name = arguments.get("table_name")
db_name = arguments.get("db_name")
catalog_name = arguments.get("catalog_name")
# Delegate to metadata extractor for processing
return await self.metadata_extractor.get_table_comment_for_mcp(
table_name, db_name, catalog_name
)
async def _get_table_column_comments_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Get table column comments tool routing"""
table_name = arguments.get("table_name")
db_name = arguments.get("db_name")
catalog_name = arguments.get("catalog_name")
# Delegate to metadata extractor for processing
return await self.metadata_extractor.get_table_column_comments_for_mcp(
table_name, db_name, catalog_name
)
async def _get_table_indexes_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Get table indexes tool routing"""
table_name = arguments.get("table_name")
db_name = arguments.get("db_name")
catalog_name = arguments.get("catalog_name")
# Delegate to metadata extractor for processing
return await self.metadata_extractor.get_table_indexes_for_mcp(
table_name, db_name, catalog_name
)
async def _get_recent_audit_logs_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Get audit logs tool routing"""
days = arguments.get("days", 7)
limit = arguments.get("limit", 100)
# Delegate to metadata extractor for processing
return await self.metadata_extractor.get_recent_audit_logs_for_mcp(days, limit)
async def _get_catalog_list_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Get catalog list tool routing"""
# random_string parameter is required in the source project, but not actually used in business logic
# Here we ignore it and directly call business logic
# Delegate to metadata extractor for processing
return await self.metadata_extractor.get_catalog_list_for_mcp()
async def _get_sql_explain_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""SQL Explain tool routing"""
sql = arguments.get("sql")
verbose = arguments.get("verbose", False)
db_name = arguments.get("db_name")
catalog_name = arguments.get("catalog_name")
# Delegate to SQL analyzer for processing
return await self.sql_analyzer.get_sql_explain(
sql, verbose, db_name, catalog_name
)
async def _get_sql_profile_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""SQL Profile tool routing"""
sql = arguments.get("sql")
db_name = arguments.get("db_name")
catalog_name = arguments.get("catalog_name")
timeout = arguments.get("timeout", 30)
# Delegate to SQL analyzer for processing
return await self.sql_analyzer.get_sql_profile(
sql, db_name, catalog_name, timeout
)
async def _get_table_data_size_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Table data size tool routing"""
db_name = arguments.get("db_name")
table_name = arguments.get("table_name")
single_replica = arguments.get("single_replica", False)
# Delegate to SQL analyzer for processing
return await self.sql_analyzer.get_table_data_size(
db_name, table_name, single_replica
)
async def _get_monitoring_metrics_info_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Monitoring metrics info tool routing"""
role = arguments.get("role", "all")
monitor_type = arguments.get("monitor_type", "all")
priority = arguments.get("priority", "p0")
# Delegate to monitoring tools for processing (info_only=True)
return await self.monitoring_tools.get_monitoring_metrics(
role, monitor_type, priority, info_only=True, format_type="prometheus"
)
async def _get_monitoring_metrics_data_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Monitoring metrics data tool routing"""
role = arguments.get("role", "all")
monitor_type = arguments.get("monitor_type", "all")
priority = arguments.get("priority", "p0")
include_raw_metrics = arguments.get("include_raw_metrics", False)
# Delegate to monitoring tools for processing (info_only=False)
return await self.monitoring_tools.get_monitoring_metrics(
role, monitor_type, priority, info_only=False, format_type="prometheus", include_raw_metrics=include_raw_metrics
)
async def _get_realtime_memory_stats_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Real-time memory statistics tool routing"""
tracker_type = arguments.get("tracker_type", "overview")
include_details = arguments.get("include_details", True)
# Delegate to memory tracker for processing
return await self.memory_tracker.get_realtime_memory_stats(
tracker_type, include_details
)
async def _get_historical_memory_stats_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Historical memory statistics tool routing"""
tracker_names = arguments.get("tracker_names")
time_range = arguments.get("time_range", "1h")
# Delegate to memory tracker for processing
return await self.memory_tracker.get_historical_memory_stats(
tracker_names, time_range
)