diff --git a/doris_mcp_server/utils/data_quality_tools.py b/doris_mcp_server/utils/data_quality_tools.py index 9058f4f..3984bc7 100644 --- a/doris_mcp_server/utils/data_quality_tools.py +++ b/doris_mcp_server/utils/data_quality_tools.py @@ -25,7 +25,7 @@ import time import math import statistics from datetime import datetime, timedelta -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, cast from collections import Counter, defaultdict from .db import DorisConnectionManager @@ -376,6 +376,54 @@ class DataQualityTools: except Exception as e: logger.warning(f"Failed to get table partitions: {str(e)}") return [] + + async def _get_table_bucket_info(self, connection, table_name: str, db_name: Optional[str] = None) -> Optional[Dict]: + """Get table buckets information""" + try: + # Query bucket information + ddl_statement = await self._get_table_ddl(connection, table_name, db_name) + if not ddl_statement: + logger.error(f"Could not retrieve DDL for table {table_name}.") + return None + + pattern = r"DISTRIBUTED BY (HASH\(([^)]+)\)|RANDOM) BUCKETS (\d+|AUTO)" + matches = re.findall(pattern, cast(str, ddl_statement)) + + if matches: + dist_type, columns, buckets = matches[0] + column_list = [col.strip().strip("`") for col in columns.split(",")] + if dist_type.startswith('HASH'): + return { + "type": "HASH", + "columns": column_list, + "bucket_num": buckets, + } + else: + return { + "type": "RANDOM", + "bucket_num": buckets, + } + except Exception as e: + logger.warning(f"Failed to get table buckets: {str(e)}") + return None + + async def _get_table_ddl( + self, connection, table_name: str, db_name: Optional[str] + ) -> Optional[str]: + """Get table DDL statement""" + try: + query = ( + f"SHOW CREATE TABLE {db_name}.{table_name}" + if db_name + else f"SHOW CREATE TABLE {table_name}" + ) + result = await connection.execute(query) + if result.data: + return result.data[0].get("Create Table") + return None + except Exception as e: + logger.error(f"Error getting DDL for table {table_name}: {e}") + return None async def _get_table_size_info(self, connection, table_name: str) -> Dict[str, Any]: """Get table size information""" @@ -1007,8 +1055,14 @@ class DataQualityTools: partition_analysis["balance_score"] = 0.0 else: partition_analysis["balance_score"] = 1.0 if len(row_counts) == 1 else 0.0 + + # Get bucket information + bucket_info = await self._get_table_bucket_info(connection, table_name, db_name) - return partition_analysis + return { + "partition_analysis": partition_analysis, + "bucket_analysis": bucket_info + } except Exception as e: logger.warning(f"Failed to analyze physical distribution: {str(e)}") @@ -1057,4 +1111,4 @@ class DataQualityTools: except Exception as e: logger.warning(f"Failed to analyze storage info: {str(e)}") - return {"error": str(e)} \ No newline at end of file + return {"error": str(e)}