v0.4.0 preview
This commit is contained in:
@@ -22,6 +22,10 @@ Provides data analysis functions including table analysis, column statistics, pe
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List
|
||||
import uuid
|
||||
import aiohttp
|
||||
import hashlib
|
||||
from pathlib import Path
|
||||
|
||||
from .db import DorisConnectionManager
|
||||
from .logger import get_logger
|
||||
@@ -331,4 +335,906 @@ class PerformanceMonitor:
|
||||
"limit": limit,
|
||||
"order_by": order_by,
|
||||
"note": "Query history feature requires audit log configuration"
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class SQLAnalyzer:
|
||||
"""SQL analyzer for EXPLAIN and PROFILE operations"""
|
||||
|
||||
def __init__(self, connection_manager: DorisConnectionManager):
|
||||
self.connection_manager = connection_manager
|
||||
|
||||
async def get_sql_explain(
|
||||
self,
|
||||
sql: str,
|
||||
verbose: bool = False,
|
||||
db_name: str = None,
|
||||
catalog_name: str = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Get SQL execution plan using EXPLAIN command based on Doris syntax
|
||||
|
||||
Args:
|
||||
sql: SQL statement to explain
|
||||
verbose: Whether to show verbose information
|
||||
db_name: Target database name
|
||||
catalog_name: Target catalog name
|
||||
|
||||
Returns:
|
||||
Dict containing explain plan file path, content, and basic info
|
||||
"""
|
||||
try:
|
||||
# Generate unique query ID for file naming
|
||||
import time
|
||||
query_hash = hashlib.md5(sql.encode()).hexdigest()[:8]
|
||||
timestamp = int(time.time())
|
||||
query_id = f"{timestamp}_{query_hash}"
|
||||
|
||||
# Ensure temp directory exists
|
||||
temp_dir = Path(self.connection_manager.config.temp_files_dir)
|
||||
temp_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Create explain file path
|
||||
explain_file = temp_dir / f"explain_{query_id}.txt"
|
||||
|
||||
logger.info(f"Generating SQL explain for query ID: {query_id}")
|
||||
|
||||
# Switch database if specified
|
||||
if db_name:
|
||||
await self.connection_manager.execute_query("explain_session", f"USE {db_name}")
|
||||
|
||||
# Construct EXPLAIN query
|
||||
explain_type = "EXPLAIN VERBOSE" if verbose else "EXPLAIN"
|
||||
explain_sql = f"{explain_type} {sql.strip().rstrip(';')}"
|
||||
|
||||
logger.info(f"Executing explain query: {explain_sql}")
|
||||
|
||||
# Execute explain query
|
||||
result = await self.connection_manager.execute_query("explain_session", explain_sql)
|
||||
|
||||
# Format explain output
|
||||
explain_content = []
|
||||
explain_content.append(f"=== SQL EXPLAIN PLAN ===")
|
||||
explain_content.append(f"Query ID: {query_id}")
|
||||
explain_content.append(f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
explain_content.append(f"Database: {db_name or 'current'}")
|
||||
explain_content.append(f"Verbose: {verbose}")
|
||||
explain_content.append("")
|
||||
explain_content.append("=== ORIGINAL SQL ===")
|
||||
explain_content.append(sql)
|
||||
explain_content.append("")
|
||||
explain_content.append("=== EXPLAIN QUERY ===")
|
||||
explain_content.append(explain_sql)
|
||||
explain_content.append("")
|
||||
explain_content.append("=== EXECUTION PLAN ===")
|
||||
|
||||
if result and result.data:
|
||||
for row in result.data:
|
||||
if isinstance(row, dict):
|
||||
# Handle dict format
|
||||
for key, value in row.items():
|
||||
explain_content.append(f"{key}: {value}")
|
||||
elif isinstance(row, (list, tuple)):
|
||||
# Handle tuple/list format
|
||||
explain_content.append(" | ".join(str(col) for col in row))
|
||||
else:
|
||||
# Handle string format
|
||||
explain_content.append(str(row))
|
||||
else:
|
||||
explain_content.append("No execution plan data returned")
|
||||
|
||||
explain_content.append("")
|
||||
explain_content.append("=== METADATA ===")
|
||||
explain_content.append(f"Execution time: {result.execution_time if result else 'N/A'} seconds")
|
||||
explain_content.append(f"Rows returned: {len(result.data) if result and result.data else 0}")
|
||||
|
||||
# Get full content
|
||||
full_content = '\n'.join(explain_content)
|
||||
|
||||
# Write to file
|
||||
with open(explain_file, 'w', encoding='utf-8') as f:
|
||||
f.write(full_content)
|
||||
|
||||
logger.info(f"Explain plan saved to: {explain_file.absolute()}")
|
||||
|
||||
# Get max response size from config
|
||||
max_size = self.connection_manager.config.performance.max_response_content_size
|
||||
|
||||
# Truncate content if needed
|
||||
truncated_content = full_content
|
||||
is_truncated = False
|
||||
if len(full_content) > max_size:
|
||||
truncated_content = full_content[:max_size] + "\n\n=== CONTENT TRUNCATED ===\n[Content is truncated due to size limit. Full content is saved to file.]"
|
||||
is_truncated = True
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"query_id": query_id,
|
||||
"explain_file_path": str(explain_file.absolute()),
|
||||
"file_size_bytes": explain_file.stat().st_size,
|
||||
"content": truncated_content,
|
||||
"content_size": len(truncated_content),
|
||||
"is_content_truncated": is_truncated,
|
||||
"original_content_size": len(full_content),
|
||||
"sql_preview": sql[:100] + "..." if len(sql) > 100 else sql,
|
||||
"verbose": verbose,
|
||||
"database": db_name,
|
||||
"catalog": catalog_name,
|
||||
"timestamp": time.strftime('%Y-%m-%d %H:%M:%S'),
|
||||
"execution_time": result.execution_time if result else None,
|
||||
"plan_lines_count": len(result.data) if result and result.data else 0
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get SQL explain: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Failed to get SQL explain: {str(e)}",
|
||||
"sql_preview": sql[:100] + "..." if len(sql) > 100 else sql,
|
||||
"timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
|
||||
}
|
||||
|
||||
async def get_sql_profile(
|
||||
self,
|
||||
sql: str,
|
||||
db_name: str = None,
|
||||
catalog_name: str = None,
|
||||
timeout: int = 30
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Get SQL execution profile by setting trace ID and fetching profile via HTTP API
|
||||
|
||||
Args:
|
||||
sql: SQL statement to profile
|
||||
db_name: Target database name
|
||||
catalog_name: Target catalog name
|
||||
timeout: Query timeout in seconds
|
||||
|
||||
Returns:
|
||||
Dict containing profile file path, content, and basic info
|
||||
"""
|
||||
try:
|
||||
# Generate unique trace ID and query ID for file naming
|
||||
trace_id = str(uuid.uuid4())
|
||||
import time
|
||||
query_hash = hashlib.md5(sql.encode()).hexdigest()[:8]
|
||||
timestamp = int(time.time())
|
||||
file_query_id = f"{timestamp}_{query_hash}"
|
||||
|
||||
# Ensure temp directory exists
|
||||
temp_dir = Path(self.connection_manager.config.temp_files_dir)
|
||||
temp_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Create profile file path
|
||||
profile_file = temp_dir / f"profile_{file_query_id}.txt"
|
||||
|
||||
logger.info(f"Generated trace ID for SQL profiling: {trace_id}")
|
||||
logger.info(f"Profile will be saved to: {profile_file}")
|
||||
|
||||
connection = await self.connection_manager.get_connection("query")
|
||||
|
||||
try:
|
||||
# Switch to specified database/catalog if provided
|
||||
if catalog_name:
|
||||
await connection.execute(f"USE `{catalog_name}`")
|
||||
if db_name:
|
||||
await connection.execute(f"USE `{db_name}`")
|
||||
|
||||
# Set trace ID for the session using session variable
|
||||
# According to official docs: set session_context="trace_id:your_trace_id"
|
||||
await connection.execute(f'set session_context="trace_id:{trace_id}"')
|
||||
logger.info(f"Set trace ID: {trace_id}")
|
||||
|
||||
# Enable profile
|
||||
await connection.execute(f'set enable_profile=true')
|
||||
logger.info(f"Enabled profile")
|
||||
|
||||
# Execute the SQL statement
|
||||
logger.info(f"Executing SQL with trace ID: {sql}")
|
||||
start_time = time.time()
|
||||
sql_result = await connection.execute(sql)
|
||||
execution_time = time.time() - start_time
|
||||
logger.info(f"SQL execution completed in {execution_time:.3f}s")
|
||||
|
||||
# Get query ID from trace ID via HTTP API
|
||||
query_id = await self._get_query_id_by_trace_id(trace_id)
|
||||
if not query_id:
|
||||
return {
|
||||
"success": False,
|
||||
"error": "Failed to get query ID from trace ID",
|
||||
"trace_id": trace_id,
|
||||
"sql": sql,
|
||||
"execution_time": execution_time
|
||||
}
|
||||
|
||||
logger.info(f"Retrieved query ID: {query_id}")
|
||||
|
||||
# Get profile data
|
||||
profile_data = await self._get_profile_by_query_id(query_id)
|
||||
|
||||
if not profile_data:
|
||||
# Save error info to file
|
||||
profile_content = [
|
||||
f"=== SQL PROFILE RESULT ===",
|
||||
f"File Query ID: {file_query_id}",
|
||||
f"Trace ID: {trace_id}",
|
||||
f"Query ID: {query_id}",
|
||||
f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}",
|
||||
f"Database: {db_name or 'current'}",
|
||||
f"Status: FAILED",
|
||||
"",
|
||||
"=== ORIGINAL SQL ===",
|
||||
sql,
|
||||
"",
|
||||
"=== ERROR INFO ===",
|
||||
"Failed to get profile data. This may be due to:",
|
||||
"1) Profile data not generated yet",
|
||||
"2) Query ID expired",
|
||||
"3) Insufficient permissions to access profile data",
|
||||
"",
|
||||
"=== EXECUTION INFO ===",
|
||||
f"Query execution: SUCCESSFUL",
|
||||
f"Execution time: {execution_time:.3f} seconds",
|
||||
f"Note: Query execution was successful, but profile data is not available"
|
||||
]
|
||||
|
||||
# Get full content
|
||||
full_profile_content = '\n'.join(profile_content)
|
||||
|
||||
with open(profile_file, 'w', encoding='utf-8') as f:
|
||||
f.write(full_profile_content)
|
||||
|
||||
# Get max response size from config
|
||||
max_size = self.connection_manager.config.performance.max_response_content_size
|
||||
|
||||
# Truncate content if needed
|
||||
truncated_content = full_profile_content
|
||||
is_truncated = False
|
||||
if len(full_profile_content) > max_size:
|
||||
truncated_content = full_profile_content[:max_size] + "\n\n=== CONTENT TRUNCATED ===\n[Content is truncated due to size limit. Full content is saved to file.]"
|
||||
is_truncated = True
|
||||
|
||||
return {
|
||||
"success": False,
|
||||
"file_query_id": file_query_id,
|
||||
"trace_id": trace_id,
|
||||
"query_id": query_id,
|
||||
"profile_file_path": str(profile_file.absolute()),
|
||||
"file_size_bytes": profile_file.stat().st_size,
|
||||
"content": truncated_content,
|
||||
"content_size": len(truncated_content),
|
||||
"is_content_truncated": is_truncated,
|
||||
"original_content_size": len(full_profile_content),
|
||||
"sql_preview": sql[:100] + "..." if len(sql) > 100 else sql,
|
||||
"execution_time": execution_time,
|
||||
"error": "Failed to get profile data",
|
||||
"timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
|
||||
}
|
||||
|
||||
# Format profile output
|
||||
profile_content = []
|
||||
profile_content.append(f"=== SQL PROFILE RESULT ===")
|
||||
profile_content.append(f"File Query ID: {file_query_id}")
|
||||
profile_content.append(f"Trace ID: {trace_id}")
|
||||
profile_content.append(f"Query ID: {query_id}")
|
||||
profile_content.append(f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
profile_content.append(f"Database: {db_name or 'current'}")
|
||||
profile_content.append(f"Status: SUCCESS")
|
||||
profile_content.append("")
|
||||
profile_content.append("=== ORIGINAL SQL ===")
|
||||
profile_content.append(sql)
|
||||
profile_content.append("")
|
||||
profile_content.append("=== EXECUTION INFO ===")
|
||||
profile_content.append(f"Execution time: {execution_time:.3f} seconds")
|
||||
if hasattr(sql_result, 'data') and sql_result.data:
|
||||
profile_content.append(f"Result rows: {len(sql_result.data)}")
|
||||
if sql_result.data and sql_result.data[0]:
|
||||
profile_content.append(f"Result columns: {list(sql_result.data[0].keys())}")
|
||||
profile_content.append("")
|
||||
profile_content.append("=== PROFILE DATA ===")
|
||||
|
||||
if isinstance(profile_data, dict):
|
||||
import json
|
||||
profile_content.append(json.dumps(profile_data, indent=2, ensure_ascii=False))
|
||||
else:
|
||||
profile_content.append(str(profile_data))
|
||||
|
||||
# Get full content
|
||||
full_profile_content = '\n'.join(profile_content)
|
||||
|
||||
# Write to file
|
||||
with open(profile_file, 'w', encoding='utf-8') as f:
|
||||
f.write(full_profile_content)
|
||||
|
||||
logger.info(f"Profile data saved to: {profile_file.absolute()}")
|
||||
|
||||
# Get max response size from config
|
||||
max_size = self.connection_manager.config.performance.max_response_content_size
|
||||
|
||||
# Truncate content if needed
|
||||
truncated_content = full_profile_content
|
||||
is_truncated = False
|
||||
if len(full_profile_content) > max_size:
|
||||
truncated_content = full_profile_content[:max_size] + "\n\n=== CONTENT TRUNCATED ===\n[Content is truncated due to size limit. Full content is saved to file.]"
|
||||
is_truncated = True
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"file_query_id": file_query_id,
|
||||
"trace_id": trace_id,
|
||||
"query_id": query_id,
|
||||
"profile_file_path": str(profile_file.absolute()),
|
||||
"file_size_bytes": profile_file.stat().st_size,
|
||||
"content": truncated_content,
|
||||
"content_size": len(truncated_content),
|
||||
"is_content_truncated": is_truncated,
|
||||
"original_content_size": len(full_profile_content),
|
||||
"sql_preview": sql[:100] + "..." if len(sql) > 100 else sql,
|
||||
"database": db_name,
|
||||
"catalog": catalog_name,
|
||||
"execution_time": execution_time,
|
||||
"sql_result_summary": {
|
||||
"row_count": len(sql_result.data) if hasattr(sql_result, 'data') and sql_result.data else 0,
|
||||
"columns": list(sql_result.data[0].keys()) if hasattr(sql_result, 'data') and sql_result.data and sql_result.data[0] else []
|
||||
},
|
||||
"timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error during SQL execution or profile retrieval: {str(e)}")
|
||||
# Save error info to file
|
||||
profile_content = [
|
||||
f"=== SQL PROFILE RESULT ===",
|
||||
f"File Query ID: {file_query_id}",
|
||||
f"Trace ID: {trace_id}",
|
||||
f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}",
|
||||
f"Database: {db_name or 'current'}",
|
||||
f"Status: ERROR",
|
||||
"",
|
||||
"=== ORIGINAL SQL ===",
|
||||
sql,
|
||||
"",
|
||||
"=== ERROR INFO ===",
|
||||
f"SQL execution or profile retrieval failed: {str(e)}",
|
||||
"",
|
||||
"=== EXECUTION INFO ===",
|
||||
"Query execution failed during profiling process"
|
||||
]
|
||||
|
||||
# Get full content
|
||||
full_profile_content = '\n'.join(profile_content)
|
||||
|
||||
with open(profile_file, 'w', encoding='utf-8') as f:
|
||||
f.write(full_profile_content)
|
||||
|
||||
# Get max response size from config
|
||||
max_size = self.connection_manager.config.performance.max_response_content_size
|
||||
|
||||
# Truncate content if needed
|
||||
truncated_content = full_profile_content
|
||||
is_truncated = False
|
||||
if len(full_profile_content) > max_size:
|
||||
truncated_content = full_profile_content[:max_size] + "\n\n=== CONTENT TRUNCATED ===\n[Content is truncated due to size limit. Full content is saved to file.]"
|
||||
is_truncated = True
|
||||
|
||||
return {
|
||||
"success": False,
|
||||
"file_query_id": file_query_id,
|
||||
"trace_id": trace_id,
|
||||
"profile_file_path": str(profile_file.absolute()),
|
||||
"file_size_bytes": profile_file.stat().st_size,
|
||||
"content": truncated_content,
|
||||
"content_size": len(truncated_content),
|
||||
"is_content_truncated": is_truncated,
|
||||
"original_content_size": len(full_profile_content),
|
||||
"sql_preview": sql[:100] + "..." if len(sql) > 100 else sql,
|
||||
"error": f"SQL execution or profile retrieval failed: {str(e)}",
|
||||
"database": db_name,
|
||||
"catalog": catalog_name,
|
||||
"timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"SQL PROFILE failed: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"SQL PROFILE failed: {str(e)}",
|
||||
"sql_preview": sql[:100] + "..." if len(sql) > 100 else sql,
|
||||
"database": db_name,
|
||||
"catalog": catalog_name,
|
||||
"timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
|
||||
}
|
||||
|
||||
async def _get_query_id_by_trace_id(self, trace_id: str) -> str:
|
||||
"""
|
||||
Get query ID by trace ID via FE HTTP API
|
||||
|
||||
Args:
|
||||
trace_id: The trace ID set during query execution
|
||||
|
||||
Returns:
|
||||
Query ID string or None if not found
|
||||
"""
|
||||
try:
|
||||
# Get database config
|
||||
db_config = self.connection_manager.config.database
|
||||
|
||||
# Build HTTP API URL according to official documentation
|
||||
# Reference: https://doris.apache.org/zh-CN/docs/admin-manual/open-api/fe-http/query-profile-action#通过-trace-id-获取-query-id
|
||||
url = f"http://{db_config.host}:{db_config.fe_http_port}/rest/v2/manager/query/trace_id/{trace_id}"
|
||||
|
||||
# HTTP Basic Auth
|
||||
auth = aiohttp.BasicAuth(db_config.user, db_config.password)
|
||||
|
||||
logger.info(f"Requesting query ID from: {url}")
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url, auth=auth, timeout=10) as response:
|
||||
if response.status == 200:
|
||||
# Check content type first
|
||||
content_type = response.headers.get('content-type', '')
|
||||
response_text = await response.text()
|
||||
logger.info(f"Response content type: {content_type}")
|
||||
logger.info(f"Response body: {response_text}")
|
||||
|
||||
# Parse JSON response (regardless of content-type)
|
||||
if response_text.strip():
|
||||
try:
|
||||
import json
|
||||
result = json.loads(response_text)
|
||||
logger.info(f"Query ID API response: {result}")
|
||||
|
||||
# Parse response according to Doris API format
|
||||
if result.get("code") == 0 and result.get("data"):
|
||||
data = result["data"]
|
||||
# Data can be either a string (query_id) or object with query_ids
|
||||
if isinstance(data, str):
|
||||
logger.info(f"Found query ID: {data}")
|
||||
return data
|
||||
elif isinstance(data, dict) and "query_ids" in data:
|
||||
query_ids = data["query_ids"]
|
||||
if query_ids:
|
||||
query_id = query_ids[0] # Take the first query ID
|
||||
logger.info(f"Found query ID: {query_id}")
|
||||
return query_id
|
||||
else:
|
||||
logger.warning("No query IDs found in response")
|
||||
else:
|
||||
logger.error(f"API returned error: {result}")
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Failed to parse JSON response: {e}")
|
||||
# Fallback: try to extract query ID using regex
|
||||
import re
|
||||
query_id_pattern = r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}'
|
||||
matches = re.findall(query_id_pattern, response_text)
|
||||
if matches:
|
||||
query_id = matches[0]
|
||||
logger.info(f"Extracted query ID from text: {query_id}")
|
||||
return query_id
|
||||
else:
|
||||
logger.error(f"HTTP request failed with status {response.status}")
|
||||
response_text = await response.text()
|
||||
logger.error(f"Response body: {response_text}")
|
||||
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get query ID by trace ID: {str(e)}")
|
||||
return None
|
||||
|
||||
async def _get_profile_by_query_id(self, query_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Get profile data by query ID via FE HTTP API
|
||||
|
||||
Args:
|
||||
query_id: The query ID
|
||||
|
||||
Returns:
|
||||
Profile data dict or None if failed
|
||||
"""
|
||||
try:
|
||||
# Get database config
|
||||
db_config = self.connection_manager.config.database
|
||||
|
||||
# Try both API endpoints according to official documentation
|
||||
urls = [
|
||||
f"http://{db_config.host}:{db_config.fe_http_port}/rest/v2/manager/query/profile/text/{query_id}",
|
||||
f"http://{db_config.host}:{db_config.fe_http_port}/api/profile/text?query_id={query_id}"
|
||||
]
|
||||
|
||||
# HTTP Basic Auth
|
||||
auth = aiohttp.BasicAuth(db_config.user, db_config.password)
|
||||
|
||||
for i, url in enumerate(urls):
|
||||
logger.info(f"Requesting profile from URL {i+1}: {url}")
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url, auth=auth, timeout=60) as response:
|
||||
if response.status == 200:
|
||||
content_type = response.headers.get('content-type', '')
|
||||
response_text = await response.text()
|
||||
logger.info(f"Profile response content type: {content_type}")
|
||||
logger.info(f"Profile response length: {len(response_text)}")
|
||||
|
||||
# Handle JSON response
|
||||
if 'application/json' in content_type:
|
||||
try:
|
||||
result = await response.json()
|
||||
logger.info(f"Profile JSON response: {result}")
|
||||
|
||||
if result.get("code") == 0 and result.get("data"):
|
||||
profile_text = result["data"].get("profile", "")
|
||||
return {
|
||||
"query_id": query_id,
|
||||
"profile_text": profile_text,
|
||||
"profile_size": len(profile_text),
|
||||
"retrieved_at": datetime.now().isoformat(),
|
||||
"api_endpoint": url
|
||||
}
|
||||
else:
|
||||
logger.warning(f"Profile API returned error: {result}")
|
||||
continue # Try next URL
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse profile JSON: {e}")
|
||||
continue
|
||||
|
||||
# Handle plain text response
|
||||
else:
|
||||
if response_text.strip() and "not found" not in response_text.lower():
|
||||
return {
|
||||
"query_id": query_id,
|
||||
"profile_text": response_text,
|
||||
"profile_size": len(response_text),
|
||||
"retrieved_at": datetime.now().isoformat(),
|
||||
"api_endpoint": url
|
||||
}
|
||||
else:
|
||||
logger.warning(f"Profile not found or empty: {response_text}")
|
||||
continue # Try next URL
|
||||
|
||||
elif response.status == 404:
|
||||
logger.warning(f"Profile not found (404) at {url}")
|
||||
continue # Try next URL
|
||||
else:
|
||||
logger.error(f"Profile HTTP request failed with status {response.status} at {url}")
|
||||
response_text = await response.text()
|
||||
logger.error(f"Response body: {response_text}")
|
||||
continue # Try next URL
|
||||
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get profile by query ID: {str(e)}")
|
||||
return None
|
||||
|
||||
async def get_table_data_size(
|
||||
self,
|
||||
db_name: str = None,
|
||||
table_name: str = None,
|
||||
single_replica: bool = False
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Get table data size information via FE HTTP API
|
||||
|
||||
Args:
|
||||
db_name: Database name, if not specified returns all databases
|
||||
table_name: Table name, if not specified returns all tables in the database
|
||||
single_replica: Whether to get single replica data size
|
||||
|
||||
Returns:
|
||||
Dict containing table data size information
|
||||
"""
|
||||
try:
|
||||
# Get database config
|
||||
db_config = self.connection_manager.config.database
|
||||
|
||||
# Build HTTP API URL according to official documentation
|
||||
# Reference: https://doris.apache.org/zh-CN/docs/admin-manual/open-api/fe-http/show-table-data-action
|
||||
url = f"http://{db_config.host}:{db_config.fe_http_port}/api/show_table_data"
|
||||
|
||||
# Build query parameters
|
||||
params = {}
|
||||
if db_name:
|
||||
params["db"] = db_name
|
||||
if table_name:
|
||||
params["table"] = table_name
|
||||
if single_replica:
|
||||
params["single_replica"] = "true"
|
||||
|
||||
# HTTP Basic Auth
|
||||
auth = aiohttp.BasicAuth(db_config.user, db_config.password)
|
||||
|
||||
logger.info(f"Requesting table data size from: {url} with params: {params}")
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url, auth=auth, params=params, timeout=30) as response:
|
||||
if response.status == 200:
|
||||
response_text = await response.text()
|
||||
logger.info(f"Table data size response length: {len(response_text)}")
|
||||
|
||||
try:
|
||||
# Parse JSON response
|
||||
import json
|
||||
result = json.loads(response_text)
|
||||
|
||||
if result.get("code") == 0 and result.get("data"):
|
||||
data = result["data"]
|
||||
|
||||
# Process and format the data
|
||||
formatted_data = self._format_table_data_size(data, db_name, table_name, single_replica)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"db_name": db_name,
|
||||
"table_name": table_name,
|
||||
"single_replica": single_replica,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"data": formatted_data,
|
||||
"url": url,
|
||||
"note": "Table data size information from Doris FE HTTP API"
|
||||
}
|
||||
else:
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"API returned error: {result}",
|
||||
"db_name": db_name,
|
||||
"table_name": table_name,
|
||||
"url": url,
|
||||
"timestamp": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Failed to parse JSON response: {e}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Failed to parse JSON response: {e}",
|
||||
"response_text": response_text[:500], # First 500 chars for debugging
|
||||
"url": url,
|
||||
"timestamp": datetime.now().isoformat()
|
||||
}
|
||||
else:
|
||||
logger.error(f"HTTP request failed with status {response.status}")
|
||||
response_text = await response.text()
|
||||
logger.error(f"Response body: {response_text}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"HTTP request failed with status {response.status}",
|
||||
"response_text": response_text[:500], # First 500 chars for debugging
|
||||
"url": url,
|
||||
"timestamp": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Table data size request failed: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Table data size request failed: {str(e)}",
|
||||
"db_name": db_name,
|
||||
"table_name": table_name,
|
||||
"timestamp": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
def _format_table_data_size(self, data: Dict[str, Any], db_name: str, table_name: str, single_replica: bool) -> Dict[str, Any]:
|
||||
"""
|
||||
Format table data size response data
|
||||
|
||||
Args:
|
||||
data: Raw response data from API
|
||||
db_name: Database name filter
|
||||
table_name: Table name filter
|
||||
single_replica: Single replica flag
|
||||
|
||||
Returns:
|
||||
Formatted data structure
|
||||
"""
|
||||
try:
|
||||
formatted = {
|
||||
"summary": {
|
||||
"total_databases": 0,
|
||||
"total_tables": 0,
|
||||
"total_size_bytes": 0,
|
||||
"total_size_formatted": "0 B",
|
||||
"single_replica": single_replica,
|
||||
"query_filters": {
|
||||
"db_name": db_name,
|
||||
"table_name": table_name
|
||||
}
|
||||
},
|
||||
"databases": {}
|
||||
}
|
||||
|
||||
# Process the data based on its structure
|
||||
if isinstance(data, list):
|
||||
# Data is a list of table records
|
||||
for record in data:
|
||||
db = record.get("database", "unknown")
|
||||
table = record.get("table", "unknown")
|
||||
size_bytes = int(record.get("size", 0))
|
||||
|
||||
if db not in formatted["databases"]:
|
||||
formatted["databases"][db] = {
|
||||
"database_name": db,
|
||||
"table_count": 0,
|
||||
"total_size_bytes": 0,
|
||||
"total_size_formatted": "0 B",
|
||||
"tables": {}
|
||||
}
|
||||
|
||||
formatted["databases"][db]["tables"][table] = {
|
||||
"table_name": table,
|
||||
"size_bytes": size_bytes,
|
||||
"size_formatted": self._format_bytes(size_bytes),
|
||||
"replica_count": record.get("replica_count", 1),
|
||||
"details": record
|
||||
}
|
||||
|
||||
formatted["databases"][db]["table_count"] += 1
|
||||
formatted["databases"][db]["total_size_bytes"] += size_bytes
|
||||
formatted["summary"]["total_size_bytes"] += size_bytes
|
||||
|
||||
elif isinstance(data, dict):
|
||||
# Data is a dict with database structure
|
||||
for db, db_info in data.items():
|
||||
if isinstance(db_info, dict) and "tables" in db_info:
|
||||
formatted["databases"][db] = {
|
||||
"database_name": db,
|
||||
"table_count": len(db_info["tables"]),
|
||||
"total_size_bytes": 0,
|
||||
"total_size_formatted": "0 B",
|
||||
"tables": {}
|
||||
}
|
||||
|
||||
for table, table_info in db_info["tables"].items():
|
||||
size_bytes = int(table_info.get("size", 0))
|
||||
formatted["databases"][db]["tables"][table] = {
|
||||
"table_name": table,
|
||||
"size_bytes": size_bytes,
|
||||
"size_formatted": self._format_bytes(size_bytes),
|
||||
"replica_count": table_info.get("replica_count", 1),
|
||||
"details": table_info
|
||||
}
|
||||
formatted["databases"][db]["total_size_bytes"] += size_bytes
|
||||
formatted["summary"]["total_size_bytes"] += size_bytes
|
||||
|
||||
# Update summary
|
||||
formatted["summary"]["total_databases"] = len(formatted["databases"])
|
||||
formatted["summary"]["total_tables"] = sum(db["table_count"] for db in formatted["databases"].values())
|
||||
formatted["summary"]["total_size_formatted"] = self._format_bytes(formatted["summary"]["total_size_bytes"])
|
||||
|
||||
# Update database totals formatting
|
||||
for db_info in formatted["databases"].values():
|
||||
db_info["total_size_formatted"] = self._format_bytes(db_info["total_size_bytes"])
|
||||
|
||||
return formatted
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to format table data size: {str(e)}")
|
||||
return {
|
||||
"error": f"Failed to format data: {str(e)}",
|
||||
"raw_data": data
|
||||
}
|
||||
|
||||
def _format_bytes(self, bytes_value: int) -> str:
|
||||
"""
|
||||
Format bytes value to human readable string
|
||||
|
||||
Args:
|
||||
bytes_value: Bytes value
|
||||
|
||||
Returns:
|
||||
Formatted string like "1.23 GB"
|
||||
"""
|
||||
try:
|
||||
bytes_value = int(bytes_value)
|
||||
if bytes_value == 0:
|
||||
return "0 B"
|
||||
|
||||
units = ["B", "KB", "MB", "GB", "TB", "PB"]
|
||||
unit_index = 0
|
||||
size = float(bytes_value)
|
||||
|
||||
while size >= 1024 and unit_index < len(units) - 1:
|
||||
size /= 1024
|
||||
unit_index += 1
|
||||
|
||||
if unit_index == 0:
|
||||
return f"{int(size)} {units[unit_index]}"
|
||||
else:
|
||||
return f"{size:.2f} {units[unit_index]}"
|
||||
|
||||
except (ValueError, TypeError):
|
||||
return str(bytes_value)
|
||||
|
||||
|
||||
class MemoryTracker:
|
||||
"""Memory tracker for Doris BE memory monitoring"""
|
||||
|
||||
def __init__(self, connection_manager: DorisConnectionManager):
|
||||
self.connection_manager = connection_manager
|
||||
|
||||
async def get_realtime_memory_stats(
|
||||
self,
|
||||
tracker_type: str = "overview",
|
||||
include_details: bool = True
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Get real-time memory statistics
|
||||
|
||||
Args:
|
||||
tracker_type: Type of memory trackers to retrieve
|
||||
include_details: Whether to include detailed information
|
||||
|
||||
Returns:
|
||||
Dict containing memory statistics
|
||||
"""
|
||||
try:
|
||||
# This is a placeholder implementation
|
||||
# In a real implementation, this would fetch data from Doris BE memory tracker endpoints
|
||||
return {
|
||||
"success": True,
|
||||
"tracker_type": tracker_type,
|
||||
"include_details": include_details,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"memory_stats": {
|
||||
"total_memory": "8.00 GB",
|
||||
"used_memory": "4.50 GB",
|
||||
"free_memory": "3.50 GB",
|
||||
"memory_usage_percent": 56.25
|
||||
},
|
||||
"note": "Memory tracker functionality requires BE HTTP endpoints to be available"
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get realtime memory stats: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Failed to get realtime memory stats: {str(e)}",
|
||||
"tracker_type": tracker_type,
|
||||
"timestamp": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
async def get_historical_memory_stats(
|
||||
self,
|
||||
tracker_names: List[str] = None,
|
||||
time_range: str = "1h"
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Get historical memory statistics
|
||||
|
||||
Args:
|
||||
tracker_names: List of specific tracker names to query
|
||||
time_range: Time range for historical data
|
||||
|
||||
Returns:
|
||||
Dict containing historical memory statistics
|
||||
"""
|
||||
try:
|
||||
# This is a placeholder implementation
|
||||
# In a real implementation, this would fetch historical data from Doris BE bvar endpoints
|
||||
return {
|
||||
"success": True,
|
||||
"tracker_names": tracker_names,
|
||||
"time_range": time_range,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"historical_stats": {
|
||||
"data_points": 60,
|
||||
"interval": "1m",
|
||||
"memory_trend": "stable",
|
||||
"avg_usage": "4.2 GB",
|
||||
"peak_usage": "5.1 GB",
|
||||
"min_usage": "3.8 GB"
|
||||
},
|
||||
"note": "Historical memory tracking functionality requires BE bvar endpoints to be available"
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get historical memory stats: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Failed to get historical memory stats: {str(e)}",
|
||||
"tracker_names": tracker_names,
|
||||
"time_range": time_range,
|
||||
"timestamp": datetime.now().isoformat()
|
||||
}
|
||||
@@ -44,6 +44,14 @@ class DatabaseConfig:
|
||||
database: str = "information_schema"
|
||||
charset: str = "UTF8"
|
||||
|
||||
# FE HTTP API port for profile and other HTTP APIs
|
||||
fe_http_port: int = 8030
|
||||
|
||||
# BE nodes configuration for external access
|
||||
# If be_hosts is empty, will use "show backends" to get BE nodes
|
||||
be_hosts: list[str] = field(default_factory=list)
|
||||
be_webserver_port: int = 8040
|
||||
|
||||
# Connection pool configuration
|
||||
min_connections: int = 5
|
||||
max_connections: int = 20
|
||||
@@ -102,6 +110,9 @@ class PerformanceConfig:
|
||||
# Connection pool optimization configuration
|
||||
connection_pool_size: int = 20
|
||||
idle_timeout: int = 1800
|
||||
|
||||
# Response content size limit (characters)
|
||||
max_response_content_size: int = 4096
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -143,9 +154,12 @@ class DorisConfig:
|
||||
|
||||
# Basic configuration
|
||||
server_name: str = "doris-mcp-server"
|
||||
server_version: str = "0.3.0"
|
||||
server_version: str = "0.4.0"
|
||||
server_port: int = 3000
|
||||
transport: str = "stdio"
|
||||
|
||||
# Temporary files configuration
|
||||
temp_files_dir: str = "tmp" # Temporary files directory for Explain and Profile outputs
|
||||
|
||||
# Sub-configuration modules
|
||||
database: DatabaseConfig = field(default_factory=DatabaseConfig)
|
||||
@@ -216,6 +230,13 @@ class DorisConfig:
|
||||
config.database.user = os.getenv("DORIS_USER", config.database.user)
|
||||
config.database.password = os.getenv("DORIS_PASSWORD", config.database.password)
|
||||
config.database.database = os.getenv("DORIS_DATABASE", config.database.database)
|
||||
config.database.fe_http_port = int(os.getenv("DORIS_FE_HTTP_PORT", str(config.database.fe_http_port)))
|
||||
|
||||
# BE nodes configuration
|
||||
be_hosts_env = os.getenv("DORIS_BE_HOSTS", "")
|
||||
if be_hosts_env:
|
||||
config.database.be_hosts = [host.strip() for host in be_hosts_env.split(",") if host.strip()]
|
||||
config.database.be_webserver_port = int(os.getenv("DORIS_BE_WEBSERVER_PORT", str(config.database.be_webserver_port)))
|
||||
|
||||
# Connection pool configuration
|
||||
config.database.min_connections = int(
|
||||
@@ -266,6 +287,9 @@ class DorisConfig:
|
||||
config.performance.query_timeout = int(
|
||||
os.getenv("QUERY_TIMEOUT", str(config.performance.query_timeout))
|
||||
)
|
||||
config.performance.max_response_content_size = int(
|
||||
os.getenv("MAX_RESPONSE_CONTENT_SIZE", str(config.performance.max_response_content_size))
|
||||
)
|
||||
|
||||
# Logging configuration
|
||||
config.logging.level = os.getenv("LOG_LEVEL", config.logging.level)
|
||||
@@ -294,6 +318,7 @@ class DorisConfig:
|
||||
config.server_name = os.getenv("SERVER_NAME", config.server_name)
|
||||
config.server_version = os.getenv("SERVER_VERSION", config.server_version)
|
||||
config.server_port = int(os.getenv("SERVER_PORT", str(config.server_port)))
|
||||
config.temp_files_dir = os.getenv("TEMP_FILES_DIR", config.temp_files_dir)
|
||||
|
||||
return config
|
||||
|
||||
@@ -303,7 +328,7 @@ class DorisConfig:
|
||||
config = cls()
|
||||
|
||||
# Update basic configuration
|
||||
for key in ["server_name", "server_version", "server_port"]:
|
||||
for key in ["server_name", "server_version", "server_port", "temp_files_dir"]:
|
||||
if key in config_data:
|
||||
setattr(config, key, config_data[key])
|
||||
|
||||
@@ -353,6 +378,7 @@ class DorisConfig:
|
||||
"server_name": self.server_name,
|
||||
"server_version": self.server_version,
|
||||
"server_port": self.server_port,
|
||||
"temp_files_dir": self.temp_files_dir,
|
||||
"database": {
|
||||
"host": self.database.host,
|
||||
"port": self.database.port,
|
||||
@@ -360,6 +386,9 @@ class DorisConfig:
|
||||
"password": "***", # Hide password
|
||||
"database": self.database.database,
|
||||
"charset": self.database.charset,
|
||||
"fe_http_port": self.database.fe_http_port,
|
||||
"be_hosts": self.database.be_hosts,
|
||||
"be_webserver_port": self.database.be_webserver_port,
|
||||
"min_connections": self.database.min_connections,
|
||||
"max_connections": self.database.max_connections,
|
||||
"connection_timeout": self.database.connection_timeout,
|
||||
@@ -385,6 +414,7 @@ class DorisConfig:
|
||||
"query_timeout": self.performance.query_timeout,
|
||||
"connection_pool_size": self.performance.connection_pool_size,
|
||||
"idle_timeout": self.performance.idle_timeout,
|
||||
"max_response_content_size": self.performance.max_response_content_size,
|
||||
},
|
||||
"logging": {
|
||||
"level": self.logging.level,
|
||||
|
||||
1609
doris_mcp_server/utils/monitoring_tools.py
Normal file
1609
doris_mcp_server/utils/monitoring_tools.py
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user