[improvement] Add bucket information to the output of analyze_table_storage (#33)
This commit is contained in:
@@ -25,7 +25,7 @@ import time
|
||||
import math
|
||||
import statistics
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any, Dict, List, Optional
|
||||
from typing import Any, Dict, List, Optional, cast
|
||||
from collections import Counter, defaultdict
|
||||
|
||||
from .db import DorisConnectionManager
|
||||
@@ -376,6 +376,54 @@ class DataQualityTools:
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to get table partitions: {str(e)}")
|
||||
return []
|
||||
|
||||
async def _get_table_bucket_info(self, connection, table_name: str, db_name: Optional[str] = None) -> Optional[Dict]:
    """Extract the bucketing clause from a table's DDL.

    The DDL text is obtained from ``self._get_table_ddl`` and searched for a
    ``DISTRIBUTED BY {HASH(cols) | RANDOM} BUCKETS {n | AUTO}`` clause.

    Args:
        connection: Connection object forwarded to ``_get_table_ddl``.
        table_name: Name of the table to inspect.
        db_name: Optional database name forwarded to ``_get_table_ddl``.

    Returns:
        ``{"type": "HASH", "columns": [...], "bucket_num": ...}`` for hash
        distribution, ``{"type": "RANDOM", "bucket_num": ...}`` for random
        distribution, or ``None`` when the DDL is unavailable, contains no
        recognizable clause, or an error occurs. ``bucket_num`` is the
        string captured from the DDL (digits or ``"AUTO"``), not an int.
    """
    try:
        ddl_statement = await self._get_table_ddl(connection, table_name, db_name)
        if not ddl_statement:
            logger.error(f"Could not retrieve DDL for table {table_name}.")
            return None

        # \s+ and IGNORECASE tolerate line breaks, repeated spaces and
        # lower-case keywords inside the clause; only the first occurrence
        # in the DDL is used.
        clause = re.search(
            r"DISTRIBUTED\s+BY\s+(?:HASH\s*\((?P<columns>[^)]+)\)|RANDOM)"
            r"\s+BUCKETS\s+(?P<buckets>\d+|AUTO)",
            ddl_statement,
            re.IGNORECASE,
        )
        if clause is None:
            return None

        # Normalise "auto" -> "AUTO"; digit strings are unaffected.
        bucket_num = clause.group("buckets").upper()

        # The columns group only participates in the HASH alternative.
        columns = clause.group("columns")
        if columns is None:
            return {
                "type": "RANDOM",
                "bucket_num": bucket_num,
            }

        return {
            "type": "HASH",
            "columns": [col.strip().strip("`") for col in columns.split(",")],
            "bucket_num": bucket_num,
        }
    except Exception as e:
        # Best-effort lookup: report the failure and let the caller continue.
        logger.warning(f"Failed to get table buckets: {str(e)}")
        return None
|
||||
|
||||
async def _get_table_ddl(
    self, connection, table_name: str, db_name: Optional[str]
) -> Optional[str]:
    """Run ``SHOW CREATE TABLE`` and return the DDL text.

    Args:
        connection: Object exposing an awaitable ``execute(query)`` whose
            result carries a ``data`` sequence of row mappings.
        table_name: Table to describe.
        db_name: When truthy, used to qualify the table as
            ``db_name.table_name``.

    Returns:
        The ``"Create Table"`` value of the first result row, or ``None``
        when no rows come back or the query raises.
    """
    try:
        # NOTE(review): names are interpolated into the statement unquoted;
        # confirm callers validate them.
        target = f"{db_name}.{table_name}" if db_name else f"{table_name}"
        response = await connection.execute(f"SHOW CREATE TABLE {target}")
        if not response.data:
            return None
        first_row = response.data[0]
        return first_row.get("Create Table")
    except Exception as e:
        logger.error(f"Error getting DDL for table {table_name}: {e}")
        return None
|
||||
|
||||
async def _get_table_size_info(self, connection, table_name: str) -> Dict[str, Any]:
|
||||
"""Get table size information"""
|
||||
@@ -1007,8 +1055,14 @@ class DataQualityTools:
|
||||
partition_analysis["balance_score"] = 0.0
|
||||
else:
|
||||
partition_analysis["balance_score"] = 1.0 if len(row_counts) == 1 else 0.0
|
||||
|
||||
# Get bucket information
|
||||
bucket_info = await self._get_table_bucket_info(connection, table_name, db_name)
|
||||
|
||||
return partition_analysis
|
||||
return {
|
||||
"partition_analysis": partition_analysis,
|
||||
"bucket_analysis": bucket_info
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to analyze physical distribution: {str(e)}")
|
||||
@@ -1057,4 +1111,4 @@ class DataQualityTools:
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to analyze storage info: {str(e)}")
|
||||
return {"error": str(e)}
|
||||
return {"error": str(e)}
|
||||
|
||||
Reference in New Issue
Block a user