1277 lines
55 KiB
Python
1277 lines
55 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
"""
|
|
Data Analysis Tools Module
|
|
Provides data analysis functions including table analysis, column statistics, performance monitoring, etc.
|
|
"""
|
|
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Any, Dict, List
|
|
import uuid
|
|
import aiohttp
|
|
import hashlib
|
|
from pathlib import Path
|
|
|
|
from .db import DorisConnectionManager
|
|
from .logger import get_logger
|
|
from .sql_security_utils import (
|
|
SQLSecurityError,
|
|
validate_identifier,
|
|
quote_identifier,
|
|
build_table_reference,
|
|
get_auth_context
|
|
)
|
|
|
|
# Module-level logger obtained from the project's .logger factory; shared by every analyzer class below.
logger = get_logger(__name__)
|
|
|
|
|
|
class TableAnalyzer:
    """Table-level analysis over the current database (``DATABASE()``).

    Provides table summaries, per-column statistics and a list of candidate
    related tables. Identifiers that must appear in SQL text are checked with
    ``validate_identifier`` and quoted with ``quote_identifier``; plain values
    are passed as bound query parameters.
    """

    def __init__(self, connection_manager: DorisConnectionManager):
        # Supplies the "query" and "system" connections used by the methods below.
        self.connection_manager = connection_manager

    @staticmethod
    def _require_identifier(value: str, kind: str) -> None:
        """Validate *value* as a SQL identifier, re-raising failures as ValueError."""
        try:
            validate_identifier(value, kind)
        except SQLSecurityError as e:
            raise ValueError(f"Invalid {kind}: {e}") from e

    async def get_table_summary(
        self,
        table_name: str,
        include_sample: bool = True,
        sample_size: int = 10
    ) -> Dict[str, Any]:
        """Return metadata, column list and optional sample rows for a table.

        Args:
            table_name: Table in the current database.
            include_sample: Whether to fetch sample rows.
            sample_size: Maximum number of sample rows; values <= 0 skip sampling.

        Returns:
            Dict with ``table_name``, ``comment``, ``row_count``, ``create_time``,
            ``engine``, ``column_count``, ``columns`` and, when sampled,
            ``sample_data``.

        Raises:
            ValueError: If the table name is invalid, the table does not exist,
                or ``sample_size`` cannot be converted to an integer.
        """
        self._require_identifier(table_name, "table name")

        auth_context = get_auth_context()
        connection = await self.connection_manager.get_connection("query")

        table_info_sql = """
        SELECT
            table_name,
            table_comment,
            table_rows,
            create_time,
            engine
        FROM information_schema.tables
        WHERE table_schema = DATABASE()
        AND table_name = %s
        """
        table_info_result = await connection.execute(
            table_info_sql, params=(table_name,), auth_context=auth_context
        )
        if not table_info_result.data:
            raise ValueError(f"Table {table_name} does not exist")
        table_info = table_info_result.data[0]

        columns_sql = """
        SELECT
            column_name,
            data_type,
            is_nullable,
            column_comment
        FROM information_schema.columns
        WHERE table_schema = DATABASE()
        AND table_name = %s
        ORDER BY ordinal_position
        """
        columns_result = await connection.execute(
            columns_sql, params=(table_name,), auth_context=auth_context
        )

        summary = {
            "table_name": table_info["table_name"],
            "comment": table_info.get("table_comment"),
            "row_count": table_info.get("table_rows", 0),
            "create_time": str(table_info.get("create_time")),
            "engine": table_info.get("engine"),
            "column_count": len(columns_result.data),
            "columns": columns_result.data,
        }

        if include_sample:
            # LIMIT is interpolated into the SQL text, so force an integer first.
            limit = int(sample_size)
            if limit > 0:
                quoted_table = quote_identifier(table_name, "table name")
                sample_sql = f"SELECT * FROM {quoted_table} LIMIT {limit}"
                sample_result = await connection.execute(sample_sql, auth_context=auth_context)
                summary["sample_data"] = sample_result.data

        return summary

    async def analyze_column(
        self,
        table_name: str,
        column_name: str,
        analysis_type: str = "basic"
    ) -> Dict[str, Any]:
        """Compute statistics for one column.

        Args:
            table_name: Table containing the column.
            column_name: Column to analyze.
            analysis_type: "basic" (counts only), "distribution" (adds the 20
                most frequent non-null values) or "detailed" (distribution plus
                MIN/MAX/AVG when the column type supports them).

        Returns:
            Dict with ``success``; on success also ``column_name``,
            ``total_count``, ``non_null_count``, ``distinct_count``,
            ``analysis_type`` and the optional extra statistics. Failures
            (including invalid identifiers) are returned as
            ``{"success": False, "error": ...}`` rather than raised.
        """
        try:
            # Identifiers cannot be bound as parameters: validate, then quote.
            self._require_identifier(table_name, "table name")
            self._require_identifier(column_name, "column name")
            quoted_table = quote_identifier(table_name, "table name")
            quoted_column = quote_identifier(column_name, "column name")

            connection = await self.connection_manager.get_connection("query")
            auth_context = get_auth_context()

            basic_stats_sql = f"""
            SELECT
                COUNT(*) as total_count,
                COUNT({quoted_column}) as non_null_count,
                COUNT(DISTINCT {quoted_column}) as distinct_count
            FROM {quoted_table}
            """
            basic_result = await connection.execute(basic_stats_sql, auth_context=auth_context)
            if not basic_result.data:
                return {
                    "success": False,
                    "error": f"Unable to get statistics for table {table_name} column {column_name}"
                }

            # The column name is added here instead of echoing it back through SQL text.
            analysis: Dict[str, Any] = {"column_name": column_name}
            analysis.update(basic_result.data[0])
            analysis["success"] = True
            analysis["analysis_type"] = analysis_type

            if analysis_type in ("distribution", "detailed"):
                distribution_sql = f"""
                SELECT
                    {quoted_column} as value,
                    COUNT(*) as frequency
                FROM {quoted_table}
                WHERE {quoted_column} IS NOT NULL
                GROUP BY {quoted_column}
                ORDER BY frequency DESC
                LIMIT 20
                """
                distribution_result = await connection.execute(distribution_sql, auth_context=auth_context)
                analysis["value_distribution"] = distribution_result.data

            if analysis_type == "detailed":
                numeric_stats_sql = f"""
                SELECT
                    MIN({quoted_column}) as min_value,
                    MAX({quoted_column}) as max_value,
                    AVG({quoted_column}) as avg_value
                FROM {quoted_table}
                WHERE {quoted_column} IS NOT NULL
                """
                try:
                    numeric_result = await connection.execute(numeric_stats_sql, auth_context=auth_context)
                except Exception as e:
                    # Best effort: numeric aggregates fail on some column types; keep the other stats.
                    logger.debug(f"Numeric statistics skipped for {table_name}.{column_name}: {e}")
                else:
                    if numeric_result.data:
                        analysis.update(numeric_result.data[0])

            return analysis

        except Exception as e:
            logger.error(f"Column analysis failed: {e}")
            return {
                "success": False,
                "error": str(e),
                "column_name": column_name,
                "table_name": table_name
            }

    async def analyze_table_relationships(
        self,
        table_name: str,
        depth: int = 2
    ) -> Dict[str, Any]:
        """List the other base tables of the current database as relationship candidates.

        Args:
            table_name: The center table.
            depth: Echoed back in the result; not used to limit the search.

        Returns:
            Dict with ``center_table`` (its information_schema row),
            ``related_tables`` (every other base table), ``depth`` and ``note``.

        Raises:
            ValueError: If the table does not exist.
        """
        connection = await self.connection_manager.get_connection("system")
        auth_context = get_auth_context()

        # table_name is a bound parameter, never interpolated into the SQL text.
        table_info_sql = """
        SELECT
            table_name,
            table_comment,
            table_rows
        FROM information_schema.tables
        WHERE table_schema = DATABASE()
        AND table_name = %s
        """
        table_result = await connection.execute(
            table_info_sql, params=(table_name,), auth_context=auth_context
        )
        if not table_result.data:
            raise ValueError(f"Table {table_name} does not exist")

        all_tables_sql = """
        SELECT
            table_name,
            table_comment
        FROM information_schema.tables
        WHERE table_schema = DATABASE()
        AND table_type = 'BASE TABLE'
        AND table_name != %s
        """
        all_tables_result = await connection.execute(
            all_tables_sql, params=(table_name,), auth_context=auth_context
        )

        return {
            "center_table": table_result.data[0],
            "related_tables": all_tables_result.data,
            "depth": depth,
            "note": "Table relationship analysis based on column name similarity and business logic inference",
        }
|
|
|
|
|
class PerformanceMonitor:
    """Reports connection-pool and table metrics, plus placeholder query/system metrics."""

    # information_schema query backing the "tables" metric type.
    _TABLES_SQL = """
    SELECT
        table_name,
        table_rows,
        data_length,
        index_length,
        create_time,
        update_time
    FROM information_schema.tables
    WHERE table_schema = DATABASE()
    AND table_type = 'BASE TABLE'
    ORDER BY table_rows DESC
    LIMIT 20
    """

    def __init__(self, connection_manager: DorisConnectionManager):
        # Provides get_metrics() for "connections" and a "system" connection for "tables".
        self.connection_manager = connection_manager

    async def get_performance_stats(
        self,
        metric_type: str = "queries",
        time_range: str = "1h"
    ) -> Dict[str, Any]:
        """Get performance statistics for one metric category.

        Args:
            metric_type: One of "queries", "connections", "tables" or "system".
            time_range: Label echoed back in the result. It is not used to
                filter any of the collected data.

        Returns:
            Dict starting with ``metric_type``, ``time_range`` and
            ``timestamp``, followed by category-specific fields. "queries" and
            "system" return placeholder values (see their ``note`` field).

        Raises:
            ValueError: If ``metric_type`` is not supported.
        """
        if metric_type == "queries":
            details = {
                "total_queries": 0,
                "avg_execution_time": 0.0,
                "slow_queries": 0,
                "error_queries": 0,
                "note": "Query performance statistics (simulated data)"
            }

        elif metric_type == "connections":
            metrics = await self.connection_manager.get_metrics()
            details = {
                "total_connections": metrics.total_connections,
                "active_connections": metrics.active_connections,
                "idle_connections": metrics.idle_connections,
                "failed_connections": metrics.failed_connections,
                "connection_errors": metrics.connection_errors,
                "avg_connection_time": metrics.avg_connection_time,
                "last_health_check": metrics.last_health_check.isoformat() if metrics.last_health_check else None
            }

        elif metric_type == "tables":
            # Only this category talks to Doris, so the connection is acquired here, not up front.
            connection = await self.connection_manager.get_connection("system")
            auth_context = get_auth_context()
            tables_result = await connection.execute(self._TABLES_SQL, auth_context=auth_context)
            details = {
                "table_count": len(tables_result.data),
                "tables": tables_result.data
            }

        elif metric_type == "system":
            details = {
                "cpu_usage": 45.2,
                "memory_usage": 68.5,
                "disk_usage": 72.1,
                "network_io": {
                    "bytes_sent": 1024000,
                    "bytes_received": 2048000
                },
                "note": "System metrics (simulated data)"
            }

        else:
            raise ValueError(f"Unsupported metric type: {metric_type}")

        # Timestamp is taken after any data collection, as before.
        return {
            "metric_type": metric_type,
            "time_range": time_range,
            "timestamp": datetime.now().isoformat(),
            **details,
        }

    async def get_query_history(
        self,
        limit: int = 50,
        order_by: str = "time"
    ) -> Dict[str, Any]:
        """Return an empty query-history placeholder.

        Doris has no built-in query history table, so no queries are returned;
        ``limit`` and ``order_by`` are only echoed back.
        """
        return {
            "total_queries": 0,
            "queries": [],
            "limit": limit,
            "order_by": order_by,
            "note": "Query history feature requires audit log configuration"
        }
|
|
|
|
|
class SQLAnalyzer:
|
|
"""SQL analyzer for EXPLAIN and PROFILE operations"""
|
|
|
|
def __init__(self, connection_manager: DorisConnectionManager):
    """Store the connection manager used for query execution and configuration lookups."""
    # Read later for config.temp_files_dir, config.performance and config.database (FE HTTP settings).
    self.connection_manager = connection_manager
|
|
|
|
async def get_sql_explain(
    self,
    sql: str,
    verbose: bool = False,
    db_name: str = None,
    catalog_name: str = None
) -> Dict[str, Any]:
    """
    Get the execution plan of a SQL statement using Doris ``EXPLAIN``.

    The formatted plan is written to ``explain_<unix-seconds>_<md5-prefix>.txt``
    under ``config.temp_files_dir`` and also returned inline, truncated to
    ``config.performance.max_response_content_size`` characters. Statements
    run on the connection manager's "explain_session".

    Args:
        sql: SQL statement to explain; surrounding whitespace and trailing
            ``;`` are stripped.
        verbose: Use ``EXPLAIN VERBOSE`` instead of ``EXPLAIN``.
        db_name: Database to ``USE`` first (validated, then quoted).
        catalog_name: Catalog to ``SWITCH`` to first (validated, then quoted).

    Returns:
        Dict with ``success``. On success it also holds the file path and size,
        the (possibly truncated) content, and plan metadata. On failure
        ``error`` describes the problem.
    """
    if not isinstance(sql, str) or not sql.strip():
        return {
            "success": False,
            "error": "SQL statement must be a non-empty string",
            "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
        }

    sql_preview = sql[:100] + "..." if len(sql) > 100 else sql

    try:
        # File-name ID: <unix seconds>_<first 8 hex chars of md5(sql)>.
        query_hash = hashlib.md5(sql.encode()).hexdigest()[:8]
        query_id = f"{int(time.time())}_{query_hash}"

        temp_dir = Path(self.connection_manager.config.temp_files_dir)
        temp_dir.mkdir(parents=True, exist_ok=True)
        explain_file = temp_dir / f"explain_{query_id}.txt"

        logger.info(f"Generating SQL explain for query ID: {query_id}")

        # Identifiers cannot be bound as parameters: validate, then quote.
        if catalog_name:
            try:
                validate_identifier(catalog_name, "catalog name")
            except SQLSecurityError as e:
                return {"success": False, "error": f"Invalid catalog name: {e}"}
            safe_catalog = quote_identifier(catalog_name, "catalog name")
            await self.connection_manager.execute_query("explain_session", f"SWITCH {safe_catalog}")

        if db_name:
            try:
                validate_identifier(db_name, "database name")
            except SQLSecurityError as e:
                return {"success": False, "error": f"Invalid database name: {e}"}
            safe_db = quote_identifier(db_name, "database name")
            await self.connection_manager.execute_query("explain_session", f"USE {safe_db}")

        explain_type = "EXPLAIN VERBOSE" if verbose else "EXPLAIN"
        explain_sql = f"{explain_type} {sql.strip().rstrip(';')}"
        logger.info(f"Executing explain query: {explain_sql}")

        result = await self.connection_manager.execute_query("explain_session", explain_sql)
        plan_rows = result.data if result and result.data else []

        explain_content = [
            "=== SQL EXPLAIN PLAN ===",
            f"Query ID: {query_id}",
            f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}",
            f"Database: {db_name or 'current'}",
            f"Catalog: {catalog_name or 'current'}",
            f"Verbose: {verbose}",
            "",
            "=== ORIGINAL SQL ===",
            sql,
            "",
            "=== EXPLAIN QUERY ===",
            explain_sql,
            "",
            "=== EXECUTION PLAN ===",
        ]

        if plan_rows:
            # Rows may arrive as dicts, sequences or scalars depending on the driver.
            for row in plan_rows:
                if isinstance(row, dict):
                    explain_content.extend(f"{key}: {value}" for key, value in row.items())
                elif isinstance(row, (list, tuple)):
                    explain_content.append(" | ".join(str(col) for col in row))
                else:
                    explain_content.append(str(row))
        else:
            explain_content.append("No execution plan data returned")

        explain_content.extend([
            "",
            "=== METADATA ===",
            f"Execution time: {result.execution_time if result else 'N/A'} seconds",
            f"Rows returned: {len(plan_rows)}",
        ])

        full_content = '\n'.join(explain_content)
        explain_file.write_text(full_content, encoding='utf-8')
        logger.info(f"Explain plan saved to: {explain_file.absolute()}")

        # The file always holds the full plan; only the inline copy is truncated.
        max_size = self.connection_manager.config.performance.max_response_content_size
        truncated_content = full_content
        is_truncated = len(full_content) > max_size
        if is_truncated:
            truncated_content = full_content[:max_size] + "\n\n=== CONTENT TRUNCATED ===\n[Content is truncated due to size limit. Full content is saved to file.]"

        return {
            "success": True,
            "query_id": query_id,
            "explain_file_path": str(explain_file.absolute()),
            "file_size_bytes": explain_file.stat().st_size,
            "content": truncated_content,
            "content_size": len(truncated_content),
            "is_content_truncated": is_truncated,
            "original_content_size": len(full_content),
            "sql_preview": sql_preview,
            "verbose": verbose,
            "database": db_name,
            "catalog": catalog_name,
            "timestamp": time.strftime('%Y-%m-%d %H:%M:%S'),
            "execution_time": result.execution_time if result else None,
            "plan_lines_count": len(plan_rows)
        }

    except Exception as e:
        logger.error(f"Failed to get SQL explain: {str(e)}")
        return {
            "success": False,
            "error": f"Failed to get SQL explain: {str(e)}",
            "sql_preview": sql_preview,
            "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
        }
|
|
|
|
async def get_sql_profile(
    self,
    sql: str,
    db_name: str = None,
    catalog_name: str = None,
    timeout: int = 30
) -> Dict[str, Any]:
    """
    Execute a SQL statement with profiling enabled and fetch its profile.

    A fresh UUID trace ID is attached to the session with
    ``set session_context="trace_id:<id>"`` and ``enable_profile`` is turned
    on. After the statement runs, the trace ID is resolved to a query ID and
    the profile text is fetched through the FE HTTP API. A report is always
    written to ``profile_<unix-seconds>_<md5-prefix>.txt`` under
    ``config.temp_files_dir``, whether the profile was found, was missing,
    or an error occurred.

    Args:
        sql: SQL statement to execute and profile.
        db_name: Database to ``USE`` first (validated, then quoted).
        catalog_name: Catalog to ``SWITCH`` to first (validated, then quoted).
        timeout: Accepted for API compatibility. NOTE(review): it is not
            currently applied to the session or the statement.

    Returns:
        Dict with ``success`` plus, where available, the trace and query IDs,
        the report file path, the report content (truncated to
        ``config.performance.max_response_content_size`` characters) and
        execution metadata. ``error`` is set on failure.
    """
    import json

    if not isinstance(sql, str) or not sql.strip():
        return {
            "success": False,
            "error": "SQL statement must be a non-empty string",
            "database": db_name,
            "catalog": catalog_name,
            "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
        }

    sql_preview = sql[:100] + "..." if len(sql) > 100 else sql

    try:
        trace_id = str(uuid.uuid4())
        # File-name ID: <unix seconds>_<first 8 hex chars of md5(sql)>.
        query_hash = hashlib.md5(sql.encode()).hexdigest()[:8]
        file_query_id = f"{int(time.time())}_{query_hash}"

        temp_dir = Path(self.connection_manager.config.temp_files_dir)
        temp_dir.mkdir(parents=True, exist_ok=True)
        profile_file = temp_dir / f"profile_{file_query_id}.txt"

        logger.info(f"Generated trace ID for SQL profiling: {trace_id}")
        logger.info(f"Profile will be saved to: {profile_file}")

        def write_report(lines):
            """Write the report file and return its file/content result fields."""
            full_text = '\n'.join(lines)
            profile_file.write_text(full_text, encoding='utf-8')
            max_size = self.connection_manager.config.performance.max_response_content_size
            inline_text = full_text
            truncated = len(full_text) > max_size
            if truncated:
                inline_text = full_text[:max_size] + "\n\n=== CONTENT TRUNCATED ===\n[Content is truncated due to size limit. Full content is saved to file.]"
            return {
                "profile_file_path": str(profile_file.absolute()),
                "file_size_bytes": profile_file.stat().st_size,
                "content": inline_text,
                "content_size": len(inline_text),
                "is_content_truncated": truncated,
                "original_content_size": len(full_text),
            }

        connection = await self.connection_manager.get_connection("query")

        try:
            # Fetched unconditionally because every statement below needs it. It was
            # previously assigned only inside the catalog branch, so any call without
            # catalog_name failed with UnboundLocalError.
            auth_context = get_auth_context()

            # Identifiers cannot be bound as parameters: validate, then quote.
            if catalog_name:
                try:
                    validate_identifier(catalog_name, "catalog name")
                except SQLSecurityError as e:
                    return {"success": False, "error": f"Invalid catalog name: {e}"}
                safe_catalog = quote_identifier(catalog_name, "catalog name")
                await connection.execute(f"SWITCH {safe_catalog}", auth_context=auth_context)
            if db_name:
                try:
                    validate_identifier(db_name, "database name")
                except SQLSecurityError as e:
                    return {"success": False, "error": f"Invalid database name: {e}"}
                safe_db = quote_identifier(db_name, "database name")
                await connection.execute(f"USE {safe_db}", auth_context=auth_context)

            # Per the Doris docs: set session_context="trace_id:<id>". trace_id is a uuid4 string.
            # NOTE(review): session_context/enable_profile stay set on this connection afterwards;
            # confirm whether the pool resets session state.
            await connection.execute(f'set session_context="trace_id:{trace_id}"', auth_context=auth_context)
            logger.info(f"Set trace ID: {trace_id}")

            await connection.execute('set enable_profile=true', auth_context=auth_context)
            logger.info("Enabled profile")

            logger.info(f"Executing SQL with trace ID: {sql}")
            start_time = time.perf_counter()
            sql_result = await connection.execute(sql, auth_context=auth_context)
            execution_time = time.perf_counter() - start_time
            logger.info(f"SQL execution completed in {execution_time:.3f}s")

            query_id = await self._get_query_id_by_trace_id(trace_id)
            if not query_id:
                return {
                    "success": False,
                    "error": "Failed to get query ID from trace ID",
                    "trace_id": trace_id,
                    "sql": sql,
                    "execution_time": execution_time
                }
            logger.info(f"Retrieved query ID: {query_id}")

            profile_data = await self._get_profile_by_query_id(query_id)

            if not profile_data:
                report = write_report([
                    "=== SQL PROFILE RESULT ===",
                    f"File Query ID: {file_query_id}",
                    f"Trace ID: {trace_id}",
                    f"Query ID: {query_id}",
                    f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}",
                    f"Database: {db_name or 'current'}",
                    "Status: FAILED",
                    "",
                    "=== ORIGINAL SQL ===",
                    sql,
                    "",
                    "=== ERROR INFO ===",
                    "Failed to get profile data. This may be due to:",
                    "1) Profile data not generated yet",
                    "2) Query ID expired",
                    "3) Insufficient permissions to access profile data",
                    "",
                    "=== EXECUTION INFO ===",
                    "Query execution: SUCCESSFUL",
                    f"Execution time: {execution_time:.3f} seconds",
                    "Note: Query execution was successful, but profile data is not available"
                ])
                return {
                    "success": False,
                    "file_query_id": file_query_id,
                    "trace_id": trace_id,
                    "query_id": query_id,
                    **report,
                    "sql_preview": sql_preview,
                    "execution_time": execution_time,
                    "error": "Failed to get profile data",
                    "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
                }

            # Rows are not guaranteed to be dicts, so only read column names from dict rows.
            result_rows = getattr(sql_result, 'data', None) or []
            first_row = result_rows[0] if result_rows else None
            result_columns = list(first_row.keys()) if isinstance(first_row, dict) else []

            profile_content = [
                "=== SQL PROFILE RESULT ===",
                f"File Query ID: {file_query_id}",
                f"Trace ID: {trace_id}",
                f"Query ID: {query_id}",
                f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}",
                f"Database: {db_name or 'current'}",
                "Status: SUCCESS",
                "",
                "=== ORIGINAL SQL ===",
                sql,
                "",
                "=== EXECUTION INFO ===",
                f"Execution time: {execution_time:.3f} seconds",
            ]
            if result_rows:
                profile_content.append(f"Result rows: {len(result_rows)}")
                if result_columns:
                    profile_content.append(f"Result columns: {result_columns}")
            profile_content.append("")
            profile_content.append("=== PROFILE DATA ===")
            if isinstance(profile_data, dict):
                profile_content.append(json.dumps(profile_data, indent=2, ensure_ascii=False))
            else:
                profile_content.append(str(profile_data))

            report = write_report(profile_content)
            logger.info(f"Profile data saved to: {profile_file.absolute()}")

            return {
                "success": True,
                "file_query_id": file_query_id,
                "trace_id": trace_id,
                "query_id": query_id,
                **report,
                "sql_preview": sql_preview,
                "database": db_name,
                "catalog": catalog_name,
                "execution_time": execution_time,
                "sql_result_summary": {
                    "row_count": len(result_rows),
                    "columns": result_columns
                },
                "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
            }

        except Exception as e:
            logger.error(f"Error during SQL execution or profile retrieval: {str(e)}")
            report = write_report([
                "=== SQL PROFILE RESULT ===",
                f"File Query ID: {file_query_id}",
                f"Trace ID: {trace_id}",
                f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}",
                f"Database: {db_name or 'current'}",
                "Status: ERROR",
                "",
                "=== ORIGINAL SQL ===",
                sql,
                "",
                "=== ERROR INFO ===",
                f"SQL execution or profile retrieval failed: {str(e)}",
                "",
                "=== EXECUTION INFO ===",
                "Query execution failed during profiling process"
            ])
            return {
                "success": False,
                "file_query_id": file_query_id,
                "trace_id": trace_id,
                **report,
                "sql_preview": sql_preview,
                "error": f"SQL execution or profile retrieval failed: {str(e)}",
                "database": db_name,
                "catalog": catalog_name,
                "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
            }

    except Exception as e:
        logger.error(f"SQL PROFILE failed: {str(e)}")
        return {
            "success": False,
            "error": f"SQL PROFILE failed: {str(e)}",
            "sql_preview": sql_preview,
            "database": db_name,
            "catalog": catalog_name,
            "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
        }
|
|
|
|
async def _get_query_id_by_trace_id(self, trace_id: str) -> str:
    """
    Resolve a trace ID to a Doris query ID via the FE HTTP API.

    Calls ``/rest/v2/manager/query/trace_id/{trace_id}`` with HTTP Basic auth
    built from ``config.database``. A JSON ``data`` field is accepted either
    as a query-ID string or as an object with a ``query_ids`` list (the first
    entry is used). If the body is not JSON, a UUID-shaped token other than
    the trace ID itself is extracted as a fallback.

    Args:
        trace_id: The trace ID set via ``session_context`` before execution.

    Returns:
        The query ID string, or None if it could not be determined. Errors are
        logged, not raised.
    """
    import json
    import re

    try:
        db_config = self.connection_manager.config.database

        # Reference: https://doris.apache.org/zh-CN/docs/admin-manual/open-api/fe-http/query-profile-action#通过-trace-id-获取-query-id
        url = f"http://{db_config.host}:{db_config.fe_http_port}/rest/v2/manager/query/trace_id/{trace_id}"
        auth = aiohttp.BasicAuth(db_config.user, db_config.password)

        logger.info(f"Requesting query ID from: {url}")

        async with aiohttp.ClientSession() as session:
            async with session.get(url, auth=auth, timeout=aiohttp.ClientTimeout(total=10)) as response:
                status = response.status
                content_type = response.headers.get('content-type', '')
                response_text = await response.text()

        if status != 200:
            logger.error(f"HTTP request failed with status {status}")
            logger.error(f"Response body: {response_text}")
            return None

        logger.info(f"Response content type: {content_type}")
        logger.info(f"Response body: {response_text}")

        if not response_text.strip():
            return None

        # Parse as JSON regardless of the declared content type.
        try:
            result = json.loads(response_text)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON response: {e}")
            # The trace ID is itself a UUID and may be echoed in error text, so skip it.
            query_id_pattern = r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}'
            for candidate in re.findall(query_id_pattern, response_text):
                if candidate != trace_id:
                    logger.info(f"Extracted query ID from text: {candidate}")
                    return candidate
            return None

        logger.info(f"Query ID API response: {result}")

        if not isinstance(result, dict) or result.get("code") != 0 or not result.get("data"):
            logger.error(f"API returned error: {result}")
            return None

        data = result["data"]
        if isinstance(data, str):
            logger.info(f"Found query ID: {data}")
            return data
        if isinstance(data, dict) and "query_ids" in data:
            query_ids = data["query_ids"]
            if query_ids:
                query_id = query_ids[0]  # Take the first query ID
                logger.info(f"Found query ID: {query_id}")
                return query_id
            logger.warning("No query IDs found in response")
        return None

    except Exception as e:
        logger.error(f"Failed to get query ID by trace ID: {str(e)}")
        return None
|
|
|
|
async def _get_profile_by_query_id(self, query_id: str) -> Dict[str, Any]:
    """
    Fetch the text profile of a query from the FE HTTP API.

    Two endpoints are tried in order over one HTTP session:
    ``/rest/v2/manager/query/profile/text/{query_id}`` (JSON whose ``data``
    carries the profile text) and ``/api/profile/text?query_id=...``. A
    non-JSON body is taken as the profile text unless it is empty or
    contains "not found". Any failure on one endpoint, including a transport
    error, falls through to the next.

    Args:
        query_id: The Doris query ID.

    Returns:
        Dict with ``query_id``, ``profile_text``, ``profile_size``,
        ``retrieved_at`` and ``api_endpoint``, or None if no endpoint returned
        a non-empty profile. Errors are logged, not raised.
    """
    import asyncio
    import json

    try:
        db_config = self.connection_manager.config.database
        base_url = f"http://{db_config.host}:{db_config.fe_http_port}"

        # (url, query parameters) pairs tried in order, per the official documentation.
        endpoints = [
            (f"{base_url}/rest/v2/manager/query/profile/text/{query_id}", None),
            (f"{base_url}/api/profile/text", {"query_id": query_id}),
        ]
        auth = aiohttp.BasicAuth(db_config.user, db_config.password)
        timeout = aiohttp.ClientTimeout(total=60)

        def build_result(profile_text, endpoint):
            """Shape a successful lookup into the returned dict."""
            return {
                "query_id": query_id,
                "profile_text": profile_text,
                "profile_size": len(profile_text),
                "retrieved_at": datetime.now().isoformat(),
                "api_endpoint": endpoint
            }

        async with aiohttp.ClientSession() as session:
            for attempt, (url, params) in enumerate(endpoints, start=1):
                logger.info(f"Requesting profile from URL {attempt}: {url} (params: {params})")
                try:
                    async with session.get(url, auth=auth, params=params, timeout=timeout) as response:
                        status = response.status
                        content_type = response.headers.get('content-type', '')
                        endpoint = str(response.url)  # includes the encoded query string
                        response_text = await response.text()
                except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                    # A transport failure on one endpoint should not stop us from trying the next.
                    logger.error(f"Profile request to {url} failed: {e}")
                    continue

                if status == 404:
                    logger.warning(f"Profile not found (404) at {url}")
                    continue
                if status != 200:
                    logger.error(f"Profile HTTP request failed with status {status} at {url}")
                    logger.error(f"Response body: {response_text}")
                    continue

                logger.info(f"Profile response content type: {content_type}")
                logger.info(f"Profile response length: {len(response_text)}")

                if 'application/json' in content_type:
                    try:
                        result = json.loads(response_text)
                    except ValueError as e:
                        logger.error(f"Failed to parse profile JSON: {e}")
                        continue

                    data = result.get("data") if isinstance(result, dict) else None
                    if isinstance(data, dict):
                        profile_text = data.get("profile") or ""
                    elif isinstance(data, str):
                        profile_text = data
                    else:
                        profile_text = ""

                    if isinstance(result, dict) and result.get("code") == 0 and profile_text:
                        return build_result(profile_text, endpoint)
                    # An empty profile is treated like "not found" so the next endpoint is tried.
                    logger.warning(f"Profile API returned error: {result}")
                    continue

                # Plain-text response
                if response_text.strip() and "not found" not in response_text.lower():
                    return build_result(response_text, endpoint)
                logger.warning(f"Profile not found or empty: {response_text}")

        return None

    except Exception as e:
        logger.error(f"Failed to get profile by query ID: {str(e)}")
        return None
|
|
|
|
async def get_table_data_size(
    self,
    db_name: str = None,
    table_name: str = None,
    single_replica: bool = False
) -> Dict[str, Any]:
    """
    Get table data size information via FE HTTP API

    Sends ``GET /api/show_table_data`` to the FE host / HTTP port taken from
    the connection config, using HTTP Basic auth with the configured user,
    and reshapes the ``data`` payload with ``_format_table_data_size``.

    Args:
        db_name: Database name, if not specified returns all databases
        table_name: Table name, if not specified returns all tables in the database
        single_replica: Whether to get single replica data size

    Returns:
        Dict containing table data size information. ``success`` is True
        when the FE answers HTTP 200 with ``"code": 0``; an empty or missing
        ``data`` payload then yields an empty summary rather than an error.
        Otherwise ``success`` is False and ``error`` describes the failure.
        HTTP and JSON failures also carry the first 500 characters of the
        body in ``response_text``.
    """
    import json  # stdlib; imported locally so this method stays self-contained

    try:
        # Get database config
        db_config = self.connection_manager.config.database

        # Build HTTP API URL according to official documentation
        # Reference: https://doris.apache.org/zh-CN/docs/admin-manual/open-api/fe-http/show-table-data-action
        url = f"http://{db_config.host}:{db_config.fe_http_port}/api/show_table_data"

        # Only send the filters the caller actually supplied.
        params = {}
        if db_name:
            params["db"] = db_name
        if table_name:
            params["table"] = table_name
        if single_replica:
            params["single_replica"] = "true"

        # aiohttp.BasicAuth raises ValueError for a None password. An unset
        # password (e.g. a root account without one) is sent as "" instead.
        auth = aiohttp.BasicAuth(db_config.user, db_config.password or "")
        timeout = aiohttp.ClientTimeout(total=30)

        logger.info(f"Requesting table data size from: {url} with params: {params}")

        async with aiohttp.ClientSession() as session:
            async with session.get(url, auth=auth, params=params, timeout=timeout) as response:
                status = response.status
                response_text = await response.text()

        if status != 200:
            logger.error(f"HTTP request failed with status {status}")
            logger.error(f"Response body: {response_text}")
            return {
                "success": False,
                "error": f"HTTP request failed with status {status}",
                "response_text": response_text[:500],  # First 500 chars for debugging
                "url": url,
                "timestamp": datetime.now().isoformat()
            }

        logger.info(f"Table data size response length: {len(response_text)}")

        try:
            result = json.loads(response_text)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON response: {e}")
            return {
                "success": False,
                "error": f"Failed to parse JSON response: {e}",
                "response_text": response_text[:500],  # First 500 chars for debugging
                "url": url,
                "timestamp": datetime.now().isoformat()
            }

        # "code": 0 is the FE success marker; anything else (or a non-object
        # body) is reported back verbatim.
        if not isinstance(result, dict) or result.get("code") != 0:
            return {
                "success": False,
                "error": f"API returned error: {result}",
                "db_name": db_name,
                "table_name": table_name,
                "url": url,
                "timestamp": datetime.now().isoformat()
            }

        # A successful call with an empty payload simply has nothing to report.
        data = result.get("data") or {}
        formatted_data = self._format_table_data_size(data, db_name, table_name, single_replica)

        return {
            "success": True,
            "db_name": db_name,
            "table_name": table_name,
            "single_replica": single_replica,
            "timestamp": datetime.now().isoformat(),
            "data": formatted_data,
            "url": url,
            "note": "Table data size information from Doris FE HTTP API"
        }

    except Exception as e:
        logger.error(f"Table data size request failed: {str(e)}")
        return {
            "success": False,
            "error": f"Table data size request failed: {str(e)}",
            "db_name": db_name,
            "table_name": table_name,
            "timestamp": datetime.now().isoformat()
        }
|
|
|
|
def _format_table_data_size(self, data: Dict[str, Any], db_name: str, table_name: str, single_replica: bool) -> Dict[str, Any]:
|
|
"""
|
|
Format table data size response data
|
|
|
|
Args:
|
|
data: Raw response data from API
|
|
db_name: Database name filter
|
|
table_name: Table name filter
|
|
single_replica: Single replica flag
|
|
|
|
Returns:
|
|
Formatted data structure
|
|
"""
|
|
try:
|
|
formatted = {
|
|
"summary": {
|
|
"total_databases": 0,
|
|
"total_tables": 0,
|
|
"total_size_bytes": 0,
|
|
"total_size_formatted": "0 B",
|
|
"single_replica": single_replica,
|
|
"query_filters": {
|
|
"db_name": db_name,
|
|
"table_name": table_name
|
|
}
|
|
},
|
|
"databases": {}
|
|
}
|
|
|
|
# Process the data based on its structure
|
|
if isinstance(data, list):
|
|
# Data is a list of table records
|
|
for record in data:
|
|
db = record.get("database", "unknown")
|
|
table = record.get("table", "unknown")
|
|
size_bytes = int(record.get("size", 0))
|
|
|
|
if db not in formatted["databases"]:
|
|
formatted["databases"][db] = {
|
|
"database_name": db,
|
|
"table_count": 0,
|
|
"total_size_bytes": 0,
|
|
"total_size_formatted": "0 B",
|
|
"tables": {}
|
|
}
|
|
|
|
formatted["databases"][db]["tables"][table] = {
|
|
"table_name": table,
|
|
"size_bytes": size_bytes,
|
|
"size_formatted": self._format_bytes(size_bytes),
|
|
"replica_count": record.get("replica_count", 1),
|
|
"details": record
|
|
}
|
|
|
|
formatted["databases"][db]["table_count"] += 1
|
|
formatted["databases"][db]["total_size_bytes"] += size_bytes
|
|
formatted["summary"]["total_size_bytes"] += size_bytes
|
|
|
|
elif isinstance(data, dict):
|
|
# Data is a dict with database structure
|
|
for db, db_info in data.items():
|
|
if isinstance(db_info, dict) and "tables" in db_info:
|
|
formatted["databases"][db] = {
|
|
"database_name": db,
|
|
"table_count": len(db_info["tables"]),
|
|
"total_size_bytes": 0,
|
|
"total_size_formatted": "0 B",
|
|
"tables": {}
|
|
}
|
|
|
|
for table, table_info in db_info["tables"].items():
|
|
size_bytes = int(table_info.get("size", 0))
|
|
formatted["databases"][db]["tables"][table] = {
|
|
"table_name": table,
|
|
"size_bytes": size_bytes,
|
|
"size_formatted": self._format_bytes(size_bytes),
|
|
"replica_count": table_info.get("replica_count", 1),
|
|
"details": table_info
|
|
}
|
|
formatted["databases"][db]["total_size_bytes"] += size_bytes
|
|
formatted["summary"]["total_size_bytes"] += size_bytes
|
|
|
|
# Update summary
|
|
formatted["summary"]["total_databases"] = len(formatted["databases"])
|
|
formatted["summary"]["total_tables"] = sum(db["table_count"] for db in formatted["databases"].values())
|
|
formatted["summary"]["total_size_formatted"] = self._format_bytes(formatted["summary"]["total_size_bytes"])
|
|
|
|
# Update database totals formatting
|
|
for db_info in formatted["databases"].values():
|
|
db_info["total_size_formatted"] = self._format_bytes(db_info["total_size_bytes"])
|
|
|
|
return formatted
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to format table data size: {str(e)}")
|
|
return {
|
|
"error": f"Failed to format data: {str(e)}",
|
|
"raw_data": data
|
|
}
|
|
|
|
def _format_bytes(self, bytes_value: int) -> str:
    """
    Render a byte count as a short human-readable string.

    The value is scaled by powers of 1024 through B, KB, MB, GB, TB and PB.
    PB is the ceiling, so larger values read as more than 1024 PB. Counts
    that stay in plain bytes are printed as whole numbers; scaled values
    get two decimals.

    Args:
        bytes_value: Byte count, or anything ``int()`` accepts
            (e.g. a numeric string).

    Returns:
        A string such as ``"512 B"`` or ``"1.23 GB"``; ``"0 B"`` for zero;
        ``str(bytes_value)`` unchanged when it cannot be converted to int.
    """
    try:
        whole_bytes = int(bytes_value)
    except (ValueError, TypeError):
        return str(bytes_value)

    if not whole_bytes:
        return "0 B"

    unit_names = ("B", "KB", "MB", "GB", "TB", "PB")
    magnitude = float(whole_bytes)
    unit = unit_names[0]
    for larger_unit in unit_names[1:]:
        if magnitude < 1024:
            break
        magnitude /= 1024
        unit = larger_unit

    # Never scaled: still plain bytes, so print without decimals.
    if unit == unit_names[0]:
        return f"{int(magnitude)} {unit}"
    return f"{magnitude:.2f} {unit}"
|
|
|
|
|
|
class MemoryTracker:
|
|
"""Memory tracker for Doris BE memory monitoring"""
|
|
|
|
def __init__(self, connection_manager: DorisConnectionManager):
    """
    Create a memory tracker bound to a Doris connection manager.

    Args:
        connection_manager: Stored unchanged as ``self.connection_manager``.
    """
    self.connection_manager = connection_manager
|
|
|
|
async def get_realtime_memory_stats(
    self,
    tracker_type: str = "overview",
    include_details: bool = True
) -> Dict[str, Any]:
    """
    Get real-time memory statistics

    NOTE: placeholder implementation. No Doris BE endpoint is queried, and
    ``memory_stats`` always contains the same fixed sample values. The
    result carries ``"is_placeholder": True`` so callers can tell these
    numbers apart from real measurements.

    Args:
        tracker_type: Type of memory trackers to retrieve (only echoed back
            in the result for now)
        include_details: Whether to include detailed information (only
            echoed back in the result for now)

    Returns:
        Dict with ``success``, the echoed arguments, ``timestamp``,
        ``memory_stats``, ``is_placeholder`` and ``note``. On failure,
        ``success`` is False and ``error`` describes the problem.
    """
    try:
        # Fixed sample values until BE memory tracker endpoints are wired in.
        return {
            "success": True,
            "tracker_type": tracker_type,
            "include_details": include_details,
            "timestamp": datetime.now().isoformat(),
            "memory_stats": {
                "total_memory": "8.00 GB",
                "used_memory": "4.50 GB",
                "free_memory": "3.50 GB",
                "memory_usage_percent": 56.25
            },
            # Explicit marker: the figures above are not measured values.
            "is_placeholder": True,
            "note": "Memory tracker functionality requires BE HTTP endpoints to be available"
        }

    except Exception as e:
        logger.error(f"Failed to get realtime memory stats: {str(e)}")
        return {
            "success": False,
            "error": f"Failed to get realtime memory stats: {str(e)}",
            "tracker_type": tracker_type,
            "timestamp": datetime.now().isoformat()
        }
|
|
|
|
async def get_historical_memory_stats(
    self,
    tracker_names: List[str] = None,
    time_range: str = "1h"
) -> Dict[str, Any]:
    """
    Get historical memory statistics

    NOTE: placeholder implementation. No Doris BE bvar endpoint is queried,
    and ``historical_stats`` always contains the same fixed sample values
    regardless of the arguments. The result carries
    ``"is_placeholder": True`` so callers can tell these numbers apart from
    real measurements.

    Args:
        tracker_names: List of specific tracker names to query (only echoed
            back in the result for now); None means no filter was given
        time_range: Time range for historical data (only echoed back in the
            result for now)

    Returns:
        Dict with ``success``, the echoed arguments, ``timestamp``,
        ``historical_stats``, ``is_placeholder`` and ``note``. On failure,
        ``success`` is False and ``error`` describes the problem.
    """
    try:
        # Fixed sample values until BE bvar endpoints are wired in.
        return {
            "success": True,
            "tracker_names": tracker_names,
            "time_range": time_range,
            "timestamp": datetime.now().isoformat(),
            "historical_stats": {
                "data_points": 60,
                "interval": "1m",
                "memory_trend": "stable",
                "avg_usage": "4.2 GB",
                "peak_usage": "5.1 GB",
                "min_usage": "3.8 GB"
            },
            # Explicit marker: the figures above are not measured values.
            "is_placeholder": True,
            "note": "Historical memory tracking functionality requires BE bvar endpoints to be available"
        }

    except Exception as e:
        logger.error(f"Failed to get historical memory stats: {str(e)}")
        return {
            "success": False,
            "error": f"Failed to get historical memory stats: {str(e)}",
            "tracker_names": tracker_names,
            "time_range": time_range,
            "timestamp": datetime.now().isoformat()
        }
|