Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6d3c128f54 | ||
|
|
651d524814 |
@@ -133,7 +133,7 @@ ALERT_WEBHOOK_URL=
|
|||||||
|
|
||||||
# Basic server information
|
# Basic server information
|
||||||
SERVER_NAME=doris-mcp-server
|
SERVER_NAME=doris-mcp-server
|
||||||
SERVER_VERSION=0.5.0
|
SERVER_VERSION=0.5.1
|
||||||
SERVER_PORT=3000
|
SERVER_PORT=3000
|
||||||
|
|
||||||
# Temporary files directory
|
# Temporary files directory
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ under the License.
|
|||||||
|
|
||||||
Doris MCP (Model Context Protocol) Server is a backend service built with Python and FastAPI. It implements the MCP, allowing clients to interact with it through defined "Tools". It's primarily designed to connect to Apache Doris databases, potentially leveraging Large Language Models (LLMs) for tasks like converting natural language queries to SQL (NL2SQL), executing queries, and performing metadata management and analysis.
|
Doris MCP (Model Context Protocol) Server is a backend service built with Python and FastAPI. It implements the MCP, allowing clients to interact with it through defined "Tools". It's primarily designed to connect to Apache Doris databases, potentially leveraging Large Language Models (LLMs) for tasks like converting natural language queries to SQL (NL2SQL), executing queries, and performing metadata management and analysis.
|
||||||
|
|
||||||
## 🚀 What's New in v0.5.0
|
## 🚀 What's New in v0.5.1
|
||||||
|
|
||||||
- **🔥 Critical at_eof Connection Fix**: **Complete elimination of at_eof connection pool errors** through redesigned connection pool strategy with zero minimum connections, intelligent health monitoring, automatic retry mechanisms, and self-healing pool recovery - achieving 99.9% connection stability improvement
|
- **🔥 Critical at_eof Connection Fix**: **Complete elimination of at_eof connection pool errors** through redesigned connection pool strategy with zero minimum connections, intelligent health monitoring, automatic retry mechanisms, and self-healing pool recovery - achieving 99.9% connection stability improvement
|
||||||
- **🔧 Revolutionary Logging System**: **Enterprise-grade logging overhaul** with level-based file separation (debug, info, warning, error, critical), automatic cleanup scheduler with 30-day retention, millisecond precision timestamps, dedicated audit trails, and zero-maintenance log management
|
- **🔧 Revolutionary Logging System**: **Enterprise-grade logging overhaul** with level-based file separation (debug, info, warning, error, critical), automatic cleanup scheduler with 30-day retention, millisecond precision timestamps, dedicated audit trails, and zero-maintenance log management
|
||||||
@@ -32,8 +32,13 @@ Doris MCP (Model Context Protocol) Server is a backend service built with Python
|
|||||||
- **⚙️ Enhanced Configuration Management**: Complete ADBC configuration system with environment variable support, dynamic tool registration, and intelligent parameter validation
|
- **⚙️ Enhanced Configuration Management**: Complete ADBC configuration system with environment variable support, dynamic tool registration, and intelligent parameter validation
|
||||||
- **🔒 Security & Compatibility Improvements**: Resolved pandas JSON serialization issues, enhanced enterprise security integration, and maintained full backward compatibility with v0.4.x versions
|
- **🔒 Security & Compatibility Improvements**: Resolved pandas JSON serialization issues, enhanced enterprise security integration, and maintained full backward compatibility with v0.4.x versions
|
||||||
- **🎯 Modular Architecture**: 6 new specialized tool modules for enterprise analytics with comprehensive English documentation and robust error handling
|
- **🎯 Modular Architecture**: 6 new specialized tool modules for enterprise analytics with comprehensive English documentation and robust error handling
|
||||||
|
- **🕒 Global SQL Timeout Configuration Enhancement**: Unified global SQL timeout control via `config/performance/query_timeout`. All SQL executions now use this value by default, with runtime override supported. This ensures consistent timeout behavior across all entry points (MCP tools, API, batch queries, etc.).
|
||||||
|
- **Bug Fixes for Timeout Application**: Fixed issues where some SQL executions did not correctly apply the global timeout configuration. Now, all SQL executions are consistently controlled by the global timeout setting.
|
||||||
|
- **Improved Robustness**: Optimized the timeout propagation chain in core classes like `QueryRequest` and `DorisQueryExecutor`, preventing timeout failures due to missing parameters.
|
||||||
|
- **Documentation & Configuration Updates**: Updated documentation and configuration instructions to clarify the priority and scope of the timeout configuration.
|
||||||
|
- **Other Bug Fixes & Optimizations**: Various known bug fixes and detail optimizations for improved stability and reliability.
|
||||||
|
|
||||||
> **🚀 Major Milestone**: This release establishes v0.5.0 as a **production-ready enterprise data governance platform** with **critical stability improvements** (complete at_eof fix + intelligent logging), 23 total tools (14 existing + 7 analytics + 2 ADBC tools), and enterprise-grade system reliability - representing a major advancement in both data intelligence capabilities and operational stability.
|
> **🚀 Major Milestone**: This release establishes v0.5.1 as a **production-ready enterprise data governance platform** with **critical stability improvements** (complete at_eof fix + intelligent logging + unified SQL timeout), 25 total tools (15 existing + 8 analytics + 2 ADBC tools), and enterprise-grade system reliability - representing a major advancement in both data intelligence capabilities and operational stability.
|
||||||
|
|
||||||
## Core Features
|
## Core Features
|
||||||
|
|
||||||
|
|||||||
@@ -618,6 +618,11 @@ Transport Modes:
|
|||||||
Examples:
|
Examples:
|
||||||
python -m doris_mcp_server --transport stdio
|
python -m doris_mcp_server --transport stdio
|
||||||
python -m doris_mcp_server --transport http --host 0.0.0.0 --port 3000
|
python -m doris_mcp_server --transport http --host 0.0.0.0 --port 3000
|
||||||
|
python -m doris_mcp_server --transport stdio --doris-host localhost --doris-port 9030
|
||||||
|
python -m doris_mcp_server --transport http --doris-user admin --doris-database test_db
|
||||||
|
|
||||||
|
# Backward compatibility: --db-* parameters are also supported
|
||||||
|
python -m doris_mcp_server --transport stdio --db-host localhost --db-port 9030
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -641,26 +646,26 @@ Examples:
|
|||||||
)
|
)
|
||||||
|
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--db-host",
|
"--doris-host", "--db-host",
|
||||||
type=str,
|
type=str,
|
||||||
default=os.getenv("DB_HOST", _default_config.database.host),
|
default=os.getenv("DORIS_HOST", _default_config.database.host),
|
||||||
help=f"Doris database host address (default: {_default_config.database.host})",
|
help=f"Doris database host address (default: {_default_config.database.host})",
|
||||||
)
|
)
|
||||||
|
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--db-port", type=int, default=os.getenv("DB_PORT", _default_config.database.port), help=f"Doris database port number (default: {_default_config.database.port})"
|
"--doris-port", "--db-port", type=int, default=os.getenv("DORIS_PORT", _default_config.database.port), help=f"Doris database port number (default: {_default_config.database.port})"
|
||||||
)
|
)
|
||||||
|
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--db-user", type=str, default=os.getenv("DB_USER", _default_config.database.user), help=f"Doris database username (default: {_default_config.database.user})"
|
"--doris-user", "--db-user", type=str, default=os.getenv("DORIS_USER", _default_config.database.user), help=f"Doris database username (default: {_default_config.database.user})"
|
||||||
)
|
)
|
||||||
|
|
||||||
parser.add_argument("--db-password", type=str, default="", help="Doris database password")
|
parser.add_argument("--doris-password", "--db-password", type=str, default=os.getenv("DORIS_PASSWORD", ""), help="Doris database password")
|
||||||
|
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--db-database",
|
"--doris-database", "--db-database",
|
||||||
type=str,
|
type=str,
|
||||||
default=os.getenv("DB_DATABASE", _default_config.database.database),
|
default=os.getenv("DORIS_DATABASE", _default_config.database.database),
|
||||||
help=f"Doris database name (default: {_default_config.database.database})",
|
help=f"Doris database name (default: {_default_config.database.database})",
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -684,16 +689,19 @@ async def main():
|
|||||||
config = DorisConfig.from_env() # First load from .env file and environment variables
|
config = DorisConfig.from_env() # First load from .env file and environment variables
|
||||||
|
|
||||||
# Command line arguments override configuration (if provided)
|
# Command line arguments override configuration (if provided)
|
||||||
if args.db_host != _default_config.database.host: # If not default value, use command line argument
|
# 🔧 FIX: Set transport from command line arguments
|
||||||
config.database.host = args.db_host
|
config.transport = args.transport
|
||||||
if args.db_port != _default_config.database.port:
|
|
||||||
config.database.port = args.db_port
|
if args.doris_host != _default_config.database.host: # If not default value, use command line argument
|
||||||
if args.db_user != _default_config.database.user:
|
config.database.host = args.doris_host
|
||||||
config.database.user = args.db_user
|
if args.doris_port != _default_config.database.port:
|
||||||
if args.db_password: # Use password if provided
|
config.database.port = args.doris_port
|
||||||
config.database.password = args.db_password
|
if args.doris_user != _default_config.database.user:
|
||||||
if args.db_database != _default_config.database.database:
|
config.database.user = args.doris_user
|
||||||
config.database.database = args.db_database
|
if args.doris_password: # Use password if provided
|
||||||
|
config.database.password = args.doris_password
|
||||||
|
if args.doris_database != _default_config.database.database:
|
||||||
|
config.database.database = args.doris_database
|
||||||
if args.log_level != _default_config.logging.level:
|
if args.log_level != _default_config.logging.level:
|
||||||
config.logging.level = args.log_level
|
config.logging.level = args.log_level
|
||||||
|
|
||||||
|
|||||||
@@ -61,7 +61,7 @@ class DorisToolsManager:
|
|||||||
# Initialize v0.5.0 advanced analytics tools
|
# Initialize v0.5.0 advanced analytics tools
|
||||||
self.data_governance_tools = DataGovernanceTools(connection_manager)
|
self.data_governance_tools = DataGovernanceTools(connection_manager)
|
||||||
self.data_exploration_tools = DataExplorationTools(connection_manager)
|
self.data_exploration_tools = DataExplorationTools(connection_manager)
|
||||||
self.data_quality_tools = DataQualityTools(connection_manager)
|
self.data_quality_tools = DataQualityTools(connection_manager, connection_manager.config)
|
||||||
self.security_analytics_tools = SecurityAnalyticsTools(connection_manager)
|
self.security_analytics_tools = SecurityAnalyticsTools(connection_manager)
|
||||||
self.dependency_analysis_tools = DependencyAnalysisTools(connection_manager)
|
self.dependency_analysis_tools = DependencyAnalysisTools(connection_manager)
|
||||||
self.performance_analytics_tools = PerformanceAnalyticsTools(connection_manager)
|
self.performance_analytics_tools = PerformanceAnalyticsTools(connection_manager)
|
||||||
@@ -464,41 +464,87 @@ class DorisToolsManager:
|
|||||||
|
|
||||||
# 🔄 Unified Data Quality Analysis Tool (New in v0.5.0)
|
# 🔄 Unified Data Quality Analysis Tool (New in v0.5.0)
|
||||||
@mcp.tool(
|
@mcp.tool(
|
||||||
"analyze_data_quality",
|
"get_table_basic_info",
|
||||||
description="""[Function Description]: Comprehensive data quality analysis combining completeness and distribution analysis.
|
description="""[Function Description]: Get basic information about a table including row count, column count, partitions, and size.
|
||||||
|
|
||||||
[Parameter Content]:
|
[Parameter Content]:
|
||||||
|
|
||||||
- table_name (string) [Required] - Name of the table to analyze
|
- table_name (string) [Required] - Name of the table to analyze
|
||||||
- analysis_scope (string) [Optional] - Analysis scope, default is "comprehensive"
|
- catalog_name (string) [Optional] - Target catalog name
|
||||||
* "completeness": Only completeness analysis (null rates, business rules)
|
- db_name (string) [Optional] - Target database name
|
||||||
* "distribution": Only distribution analysis (statistical patterns)
|
""",
|
||||||
* "comprehensive": Full analysis including both completeness and distribution
|
)
|
||||||
|
async def get_table_basic_info_tool(
|
||||||
|
table_name: str,
|
||||||
|
catalog_name: str = None,
|
||||||
|
db_name: str = None
|
||||||
|
) -> str:
|
||||||
|
"""Get table basic information"""
|
||||||
|
return await self.call_tool("get_table_basic_info", {
|
||||||
|
"table_name": table_name,
|
||||||
|
"catalog_name": catalog_name,
|
||||||
|
"db_name": db_name
|
||||||
|
})
|
||||||
|
|
||||||
|
@mcp.tool(
|
||||||
|
"analyze_columns",
|
||||||
|
description="""[Function Description]: Analyze completeness and distribution of specified columns in a table.
|
||||||
|
|
||||||
|
[Parameter Content]:
|
||||||
|
|
||||||
|
- table_name (string) [Required] - Name of the table to analyze
|
||||||
|
- columns (array) [Required] - List of column names to analyze
|
||||||
|
- analysis_types (array) [Optional] - Types of analysis to perform, default is ["both"]
|
||||||
|
* "completeness": Only completeness analysis (null rates, non-null counts)
|
||||||
|
* "distribution": Only distribution analysis (statistical patterns by data type)
|
||||||
|
* "both": Both completeness and distribution analysis
|
||||||
- sample_size (integer) [Optional] - Maximum number of rows to sample, default is 100000
|
- sample_size (integer) [Optional] - Maximum number of rows to sample, default is 100000
|
||||||
- include_all_columns (boolean) [Optional] - Whether to analyze all columns, default is false
|
|
||||||
- business_rules (array) [Optional] - Business rule validations in format [{"rule_name": "email_format", "sql_condition": "email REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'"}]
|
|
||||||
- catalog_name (string) [Optional] - Target catalog name
|
- catalog_name (string) [Optional] - Target catalog name
|
||||||
- db_name (string) [Optional] - Target database name
|
- db_name (string) [Optional] - Target database name
|
||||||
- detailed_response (boolean) [Optional] - Whether to return detailed response including raw data, default is false
|
- detailed_response (boolean) [Optional] - Whether to return detailed response including raw data, default is false
|
||||||
""",
|
""",
|
||||||
)
|
)
|
||||||
async def analyze_data_quality_tool(
|
async def analyze_columns_tool(
|
||||||
table_name: str,
|
table_name: str,
|
||||||
analysis_scope: str = "comprehensive",
|
columns: List[str],
|
||||||
|
analysis_types: List[str] = None,
|
||||||
sample_size: int = 100000,
|
sample_size: int = 100000,
|
||||||
include_all_columns: bool = False,
|
|
||||||
business_rules: List[dict] = None,
|
|
||||||
catalog_name: str = None,
|
catalog_name: str = None,
|
||||||
db_name: str = None,
|
db_name: str = None,
|
||||||
detailed_response: bool = False
|
detailed_response: bool = False
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Unified data quality analysis tool"""
|
"""Analyze table columns"""
|
||||||
return await self.call_tool("analyze_data_quality", {
|
return await self.call_tool("analyze_columns", {
|
||||||
"table_name": table_name,
|
"table_name": table_name,
|
||||||
"analysis_scope": analysis_scope,
|
"columns": columns,
|
||||||
|
"analysis_types": analysis_types or ["both"],
|
||||||
"sample_size": sample_size,
|
"sample_size": sample_size,
|
||||||
"include_all_columns": include_all_columns,
|
"catalog_name": catalog_name,
|
||||||
"business_rules": business_rules,
|
"db_name": db_name,
|
||||||
|
"detailed_response": detailed_response
|
||||||
|
})
|
||||||
|
|
||||||
|
@mcp.tool(
|
||||||
|
"analyze_table_storage",
|
||||||
|
description="""[Function Description]: Analyze table's physical distribution and storage information.
|
||||||
|
|
||||||
|
[Parameter Content]:
|
||||||
|
|
||||||
|
- table_name (string) [Required] - Name of the table to analyze
|
||||||
|
- catalog_name (string) [Optional] - Target catalog name
|
||||||
|
- db_name (string) [Optional] - Target database name
|
||||||
|
- detailed_response (boolean) [Optional] - Whether to return detailed response including raw data, default is false
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
async def analyze_table_storage_tool(
|
||||||
|
table_name: str,
|
||||||
|
catalog_name: str = None,
|
||||||
|
db_name: str = None,
|
||||||
|
detailed_response: bool = False
|
||||||
|
) -> str:
|
||||||
|
"""Analyze table storage"""
|
||||||
|
return await self.call_tool("analyze_table_storage", {
|
||||||
|
"table_name": table_name,
|
||||||
"catalog_name": catalog_name,
|
"catalog_name": catalog_name,
|
||||||
"db_name": db_name,
|
"db_name": db_name,
|
||||||
"detailed_response": detailed_response
|
"detailed_response": detailed_response
|
||||||
@@ -721,7 +767,7 @@ No parameters required. Returns connection status, configuration, and diagnostic
|
|||||||
"""Get ADBC connection information and status"""
|
"""Get ADBC connection information and status"""
|
||||||
return await self.call_tool("get_adbc_connection_info", {})
|
return await self.call_tool("get_adbc_connection_info", {})
|
||||||
|
|
||||||
logger.info("Successfully registered 23 tools to MCP server (14 basic + 7 advanced analytics + 2 ADBC tools)")
|
logger.info("Successfully registered 25 tools to MCP server (14 basic + 9 advanced analytics + 2 ADBC tools)")
|
||||||
|
|
||||||
async def list_tools(self) -> List[Tool]:
|
async def list_tools(self) -> List[Tool]:
|
||||||
"""List all available query tools (for stdio mode)"""
|
"""List all available query tools (for stdio mode)"""
|
||||||
@@ -1064,20 +1110,14 @@ No parameters required. Returns connection status, configuration, and diagnostic
|
|||||||
},
|
},
|
||||||
),
|
),
|
||||||
# ==================== v0.5.0 Advanced Analytics Tools ====================
|
# ==================== v0.5.0 Advanced Analytics Tools ====================
|
||||||
|
# Atomic Data Quality Analysis Tools
|
||||||
Tool(
|
Tool(
|
||||||
name="analyze_data_quality",
|
name="get_table_basic_info",
|
||||||
description="""[Function Description]: Comprehensive data quality analysis combining completeness and distribution analysis.
|
description="""[Function Description]: Get basic information about a table including row count, column count, partitions, and size.
|
||||||
|
|
||||||
[Parameter Content]:
|
[Parameter Content]:
|
||||||
|
|
||||||
- table_name (string) [Required] - Name of the table to analyze
|
- table_name (string) [Required] - Name of the table to analyze
|
||||||
- analysis_scope (string) [Optional] - Analysis scope, default is "comprehensive"
|
|
||||||
* "completeness": Only completeness analysis (null rates, business rules)
|
|
||||||
* "distribution": Only distribution analysis (statistical patterns)
|
|
||||||
* "comprehensive": Full analysis including both completeness and distribution
|
|
||||||
- sample_size (integer) [Optional] - Maximum number of rows to sample, default is 100000
|
|
||||||
- include_all_columns (boolean) [Optional] - Whether to analyze all columns, default is false
|
|
||||||
- business_rules (array) [Optional] - Business rule validations in format [{"rule_name": "email_format", "sql_condition": "email REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'"}]
|
|
||||||
- catalog_name (string) [Optional] - Target catalog name
|
- catalog_name (string) [Optional] - Target catalog name
|
||||||
- db_name (string) [Optional] - Target database name
|
- db_name (string) [Optional] - Target database name
|
||||||
""",
|
""",
|
||||||
@@ -1085,10 +1125,58 @@ No parameters required. Returns connection status, configuration, and diagnostic
|
|||||||
"type": "object",
|
"type": "object",
|
||||||
"properties": {
|
"properties": {
|
||||||
"table_name": {"type": "string", "description": "Name of the table to analyze"},
|
"table_name": {"type": "string", "description": "Name of the table to analyze"},
|
||||||
"analysis_scope": {"type": "string", "enum": ["completeness", "distribution", "comprehensive"], "description": "Analysis scope", "default": "comprehensive"},
|
"catalog_name": {"type": "string", "description": "Target catalog name"},
|
||||||
|
"db_name": {"type": "string", "description": "Target database name"},
|
||||||
|
},
|
||||||
|
"required": ["table_name"],
|
||||||
|
},
|
||||||
|
),
|
||||||
|
Tool(
|
||||||
|
name="analyze_columns",
|
||||||
|
description="""[Function Description]: Analyze completeness and distribution of specified columns in a table.
|
||||||
|
|
||||||
|
[Parameter Content]:
|
||||||
|
|
||||||
|
- table_name (string) [Required] - Name of the table to analyze
|
||||||
|
- columns (array) [Required] - List of column names to analyze
|
||||||
|
- analysis_types (array) [Optional] - Types of analysis to perform, default is ["both"]
|
||||||
|
* "completeness": Only completeness analysis (null rates, non-null counts)
|
||||||
|
* "distribution": Only distribution analysis (statistical patterns by data type)
|
||||||
|
* "both": Both completeness and distribution analysis
|
||||||
|
- sample_size (integer) [Optional] - Maximum number of rows to sample, default is 100000
|
||||||
|
- catalog_name (string) [Optional] - Target catalog name
|
||||||
|
- db_name (string) [Optional] - Target database name
|
||||||
|
- detailed_response (boolean) [Optional] - Whether to return detailed response including raw data, default is false
|
||||||
|
""",
|
||||||
|
inputSchema={
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"table_name": {"type": "string", "description": "Name of the table to analyze"},
|
||||||
|
"columns": {"type": "array", "items": {"type": "string"}, "description": "List of column names to analyze"},
|
||||||
|
"analysis_types": {"type": "array", "items": {"type": "string", "enum": ["completeness", "distribution", "both"]}, "description": "Types of analysis to perform", "default": ["both"]},
|
||||||
"sample_size": {"type": "integer", "description": "Maximum number of rows to sample", "default": 100000},
|
"sample_size": {"type": "integer", "description": "Maximum number of rows to sample", "default": 100000},
|
||||||
"include_all_columns": {"type": "boolean", "description": "Whether to analyze all columns", "default": False},
|
"catalog_name": {"type": "string", "description": "Target catalog name"},
|
||||||
"business_rules": {"type": "array", "items": {"type": "object"}, "description": "Business rule validations"},
|
"db_name": {"type": "string", "description": "Target database name"},
|
||||||
|
"detailed_response": {"type": "boolean", "description": "Whether to return detailed response including raw data", "default": False},
|
||||||
|
},
|
||||||
|
"required": ["table_name", "columns"],
|
||||||
|
},
|
||||||
|
),
|
||||||
|
Tool(
|
||||||
|
name="analyze_table_storage",
|
||||||
|
description="""[Function Description]: Analyze table's physical distribution and storage information.
|
||||||
|
|
||||||
|
[Parameter Content]:
|
||||||
|
|
||||||
|
- table_name (string) [Required] - Name of the table to analyze
|
||||||
|
- catalog_name (string) [Optional] - Target catalog name
|
||||||
|
- db_name (string) [Optional] - Target database name
|
||||||
|
- detailed_response (boolean) [Optional] - Whether to return detailed response including raw data, default is false
|
||||||
|
""",
|
||||||
|
inputSchema={
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"table_name": {"type": "string", "description": "Name of the table to analyze"},
|
||||||
"catalog_name": {"type": "string", "description": "Target catalog name"},
|
"catalog_name": {"type": "string", "description": "Target catalog name"},
|
||||||
"db_name": {"type": "string", "description": "Target database name"},
|
"db_name": {"type": "string", "description": "Target database name"},
|
||||||
"detailed_response": {"type": "boolean", "description": "Whether to return detailed response including raw data", "default": False},
|
"detailed_response": {"type": "boolean", "description": "Whether to return detailed response including raw data", "default": False},
|
||||||
@@ -1096,7 +1184,6 @@ No parameters required. Returns connection status, configuration, and diagnostic
|
|||||||
"required": ["table_name"],
|
"required": ["table_name"],
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
|
|
||||||
Tool(
|
Tool(
|
||||||
name="trace_column_lineage",
|
name="trace_column_lineage",
|
||||||
description="""[Function Description]: Trace data lineage for specified columns through SQL analysis and dependency mapping.
|
description="""[Function Description]: Trace data lineage for specified columns through SQL analysis and dependency mapping.
|
||||||
@@ -1323,9 +1410,13 @@ No parameters required. Returns connection status, configuration, and diagnostic
|
|||||||
elif name == "get_historical_memory_stats":
|
elif name == "get_historical_memory_stats":
|
||||||
arguments["data_type"] = "historical"
|
arguments["data_type"] = "historical"
|
||||||
result = await self._get_memory_stats_tool(arguments)
|
result = await self._get_memory_stats_tool(arguments)
|
||||||
# v0.5.0 Advanced Analytics Tools
|
# v0.5.0 Advanced Analytics Tools - Atomic Data Quality Tools
|
||||||
elif name == "analyze_data_quality":
|
elif name == "get_table_basic_info":
|
||||||
result = await self._analyze_data_quality_tool(arguments)
|
result = await self._get_table_basic_info_tool(arguments)
|
||||||
|
elif name == "analyze_columns":
|
||||||
|
result = await self._analyze_columns_tool(arguments)
|
||||||
|
elif name == "analyze_table_storage":
|
||||||
|
result = await self._analyze_table_storage_tool(arguments)
|
||||||
elif name == "trace_column_lineage":
|
elif name == "trace_column_lineage":
|
||||||
result = await self._trace_column_lineage_tool(arguments)
|
result = await self._trace_column_lineage_tool(arguments)
|
||||||
elif name == "monitor_data_freshness":
|
elif name == "monitor_data_freshness":
|
||||||
@@ -1595,26 +1686,46 @@ No parameters required. Returns connection status, configuration, and diagnostic
|
|||||||
|
|
||||||
# ==================== v0.5.0 Advanced Analytics Tools Private Methods ====================
|
# ==================== v0.5.0 Advanced Analytics Tools Private Methods ====================
|
||||||
|
|
||||||
async def _analyze_data_quality_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
|
async def _get_table_basic_info_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
"""Unified data quality analysis tool routing"""
|
"""Get table basic information tool routing"""
|
||||||
try:
|
try:
|
||||||
# Extract parameters
|
|
||||||
table_name = arguments.get("table_name")
|
table_name = arguments.get("table_name")
|
||||||
analysis_scope = arguments.get("analysis_scope", "comprehensive")
|
catalog_name = arguments.get("catalog_name")
|
||||||
|
db_name = arguments.get("db_name")
|
||||||
|
|
||||||
|
# Delegate to atomic data quality tools
|
||||||
|
result = await self.data_quality_tools.get_table_basic_info(
|
||||||
|
table_name=table_name,
|
||||||
|
catalog_name=catalog_name,
|
||||||
|
db_name=db_name
|
||||||
|
)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
return {
|
||||||
|
"error": str(e),
|
||||||
|
"analysis_type": "table_basic_info",
|
||||||
|
"timestamp": datetime.now().isoformat()
|
||||||
|
}
|
||||||
|
|
||||||
|
async def _analyze_columns_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
|
"""Analyze columns tool routing"""
|
||||||
|
try:
|
||||||
|
table_name = arguments.get("table_name")
|
||||||
|
columns = arguments.get("columns")
|
||||||
|
analysis_types = arguments.get("analysis_types", ["both"])
|
||||||
sample_size = arguments.get("sample_size", 100000)
|
sample_size = arguments.get("sample_size", 100000)
|
||||||
include_all_columns = arguments.get("include_all_columns", False)
|
|
||||||
business_rules = arguments.get("business_rules", [])
|
|
||||||
catalog_name = arguments.get("catalog_name")
|
catalog_name = arguments.get("catalog_name")
|
||||||
db_name = arguments.get("db_name")
|
db_name = arguments.get("db_name")
|
||||||
detailed_response = arguments.get("detailed_response", False)
|
detailed_response = arguments.get("detailed_response", False)
|
||||||
|
|
||||||
# Delegate to the unified data quality tools
|
# Delegate to atomic data quality tools
|
||||||
result = await self.data_quality_tools.analyze_data_quality(
|
result = await self.data_quality_tools.analyze_columns(
|
||||||
table_name=table_name,
|
table_name=table_name,
|
||||||
analysis_scope=analysis_scope,
|
columns=columns,
|
||||||
|
analysis_types=analysis_types,
|
||||||
sample_size=sample_size,
|
sample_size=sample_size,
|
||||||
include_all_columns=include_all_columns,
|
|
||||||
business_rules=business_rules,
|
|
||||||
catalog_name=catalog_name,
|
catalog_name=catalog_name,
|
||||||
db_name=db_name,
|
db_name=db_name,
|
||||||
detailed_response=detailed_response
|
detailed_response=detailed_response
|
||||||
@@ -1625,7 +1736,32 @@ No parameters required. Returns connection status, configuration, and diagnostic
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
return {
|
return {
|
||||||
"error": str(e),
|
"error": str(e),
|
||||||
"analysis_type": "unified_data_quality",
|
"analysis_type": "columns_analysis",
|
||||||
|
"timestamp": datetime.now().isoformat()
|
||||||
|
}
|
||||||
|
|
||||||
|
async def _analyze_table_storage_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
|
"""Analyze table storage tool routing"""
|
||||||
|
try:
|
||||||
|
table_name = arguments.get("table_name")
|
||||||
|
catalog_name = arguments.get("catalog_name")
|
||||||
|
db_name = arguments.get("db_name")
|
||||||
|
detailed_response = arguments.get("detailed_response", False)
|
||||||
|
|
||||||
|
# Delegate to atomic data quality tools
|
||||||
|
result = await self.data_quality_tools.analyze_table_storage(
|
||||||
|
table_name=table_name,
|
||||||
|
catalog_name=catalog_name,
|
||||||
|
db_name=db_name,
|
||||||
|
detailed_response=detailed_response
|
||||||
|
)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
return {
|
||||||
|
"error": str(e),
|
||||||
|
"analysis_type": "table_storage_analysis",
|
||||||
"timestamp": datetime.now().isoformat()
|
"timestamp": datetime.now().isoformat()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -137,6 +137,33 @@ class PerformanceConfig:
|
|||||||
max_response_content_size: int = 4096
|
max_response_content_size: int = 4096
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class DataQualityConfig:
|
||||||
|
"""Data quality analysis configuration"""
|
||||||
|
|
||||||
|
# Column analysis configuration
|
||||||
|
max_columns_per_batch: int = 20 # Maximum columns to analyze in a single batch
|
||||||
|
default_sample_size: int = 100000 # Default sample size for analysis
|
||||||
|
|
||||||
|
# Sampling strategy configuration
|
||||||
|
small_table_threshold: int = 100000 # Tables smaller than this use full table analysis
|
||||||
|
medium_table_threshold: int = 1000000 # Tables smaller than this use simple LIMIT sampling
|
||||||
|
# Tables larger than medium_table_threshold use systematic sampling
|
||||||
|
|
||||||
|
# Performance optimization
|
||||||
|
enable_batch_analysis: bool = True # Enable batch analysis for multiple columns
|
||||||
|
batch_timeout: int = 300 # Timeout for batch analysis in seconds
|
||||||
|
|
||||||
|
# Accuracy vs Performance trade-off
|
||||||
|
enable_fast_mode: bool = False # Use approximate algorithms for faster results
|
||||||
|
fast_mode_sample_size: int = 10000 # Sample size for fast mode
|
||||||
|
|
||||||
|
# Statistical analysis configuration
|
||||||
|
enable_distribution_analysis: bool = True # Enable distribution analysis
|
||||||
|
histogram_bins: int = 20 # Number of bins for histogram analysis
|
||||||
|
percentile_levels: list[float] = field(default_factory=lambda: [0.25, 0.5, 0.75, 0.95, 0.99]) # Percentile levels to calculate
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class ADBCConfig:
|
class ADBCConfig:
|
||||||
"""ADBC (Arrow Flight SQL) configuration"""
|
"""ADBC (Arrow Flight SQL) configuration"""
|
||||||
@@ -208,6 +235,7 @@ class DorisConfig:
|
|||||||
database: DatabaseConfig = field(default_factory=DatabaseConfig)
|
database: DatabaseConfig = field(default_factory=DatabaseConfig)
|
||||||
security: SecurityConfig = field(default_factory=SecurityConfig)
|
security: SecurityConfig = field(default_factory=SecurityConfig)
|
||||||
performance: PerformanceConfig = field(default_factory=PerformanceConfig)
|
performance: PerformanceConfig = field(default_factory=PerformanceConfig)
|
||||||
|
data_quality: DataQualityConfig = field(default_factory=DataQualityConfig)
|
||||||
logging: LoggingConfig = field(default_factory=LoggingConfig)
|
logging: LoggingConfig = field(default_factory=LoggingConfig)
|
||||||
monitoring: MonitoringConfig = field(default_factory=MonitoringConfig)
|
monitoring: MonitoringConfig = field(default_factory=MonitoringConfig)
|
||||||
adbc: ADBCConfig = field(default_factory=ADBCConfig)
|
adbc: ADBCConfig = field(default_factory=ADBCConfig)
|
||||||
@@ -404,6 +432,38 @@ class DorisConfig:
|
|||||||
os.getenv("ADBC_ENABLED", str(config.adbc.enabled).lower()).lower() == "true"
|
os.getenv("ADBC_ENABLED", str(config.adbc.enabled).lower()).lower() == "true"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Data quality configuration
|
||||||
|
config.data_quality.max_columns_per_batch = int(
|
||||||
|
os.getenv("DATA_QUALITY_MAX_COLUMNS_PER_BATCH", str(config.data_quality.max_columns_per_batch))
|
||||||
|
)
|
||||||
|
config.data_quality.default_sample_size = int(
|
||||||
|
os.getenv("DATA_QUALITY_DEFAULT_SAMPLE_SIZE", str(config.data_quality.default_sample_size))
|
||||||
|
)
|
||||||
|
config.data_quality.small_table_threshold = int(
|
||||||
|
os.getenv("DATA_QUALITY_SMALL_TABLE_THRESHOLD", str(config.data_quality.small_table_threshold))
|
||||||
|
)
|
||||||
|
config.data_quality.medium_table_threshold = int(
|
||||||
|
os.getenv("DATA_QUALITY_MEDIUM_TABLE_THRESHOLD", str(config.data_quality.medium_table_threshold))
|
||||||
|
)
|
||||||
|
config.data_quality.enable_batch_analysis = (
|
||||||
|
os.getenv("DATA_QUALITY_ENABLE_BATCH_ANALYSIS", str(config.data_quality.enable_batch_analysis).lower()).lower() == "true"
|
||||||
|
)
|
||||||
|
config.data_quality.batch_timeout = int(
|
||||||
|
os.getenv("DATA_QUALITY_BATCH_TIMEOUT", str(config.data_quality.batch_timeout))
|
||||||
|
)
|
||||||
|
config.data_quality.enable_fast_mode = (
|
||||||
|
os.getenv("DATA_QUALITY_ENABLE_FAST_MODE", str(config.data_quality.enable_fast_mode).lower()).lower() == "true"
|
||||||
|
)
|
||||||
|
config.data_quality.fast_mode_sample_size = int(
|
||||||
|
os.getenv("DATA_QUALITY_FAST_MODE_SAMPLE_SIZE", str(config.data_quality.fast_mode_sample_size))
|
||||||
|
)
|
||||||
|
config.data_quality.enable_distribution_analysis = (
|
||||||
|
os.getenv("DATA_QUALITY_ENABLE_DISTRIBUTION_ANALYSIS", str(config.data_quality.enable_distribution_analysis).lower()).lower() == "true"
|
||||||
|
)
|
||||||
|
config.data_quality.histogram_bins = int(
|
||||||
|
os.getenv("DATA_QUALITY_HISTOGRAM_BINS", str(config.data_quality.histogram_bins))
|
||||||
|
)
|
||||||
|
|
||||||
# Server configuration
|
# Server configuration
|
||||||
config.server_name = os.getenv("SERVER_NAME", config.server_name)
|
config.server_name = os.getenv("SERVER_NAME", config.server_name)
|
||||||
config.server_version = os.getenv("SERVER_VERSION", config.server_version)
|
config.server_version = os.getenv("SERVER_VERSION", config.server_version)
|
||||||
@@ -443,6 +503,13 @@ class DorisConfig:
|
|||||||
if hasattr(config.performance, key):
|
if hasattr(config.performance, key):
|
||||||
setattr(config.performance, key, value)
|
setattr(config.performance, key, value)
|
||||||
|
|
||||||
|
# Update data quality configuration
|
||||||
|
if "data_quality" in config_data:
|
||||||
|
dq_config = config_data["data_quality"]
|
||||||
|
for key, value in dq_config.items():
|
||||||
|
if hasattr(config.data_quality, key):
|
||||||
|
setattr(config.data_quality, key, value)
|
||||||
|
|
||||||
# Update logging configuration
|
# Update logging configuration
|
||||||
if "logging" in config_data:
|
if "logging" in config_data:
|
||||||
log_config = config_data["logging"]
|
log_config = config_data["logging"]
|
||||||
@@ -516,6 +583,19 @@ class DorisConfig:
|
|||||||
"idle_timeout": self.performance.idle_timeout,
|
"idle_timeout": self.performance.idle_timeout,
|
||||||
"max_response_content_size": self.performance.max_response_content_size,
|
"max_response_content_size": self.performance.max_response_content_size,
|
||||||
},
|
},
|
||||||
|
"data_quality": {
|
||||||
|
"max_columns_per_batch": self.data_quality.max_columns_per_batch,
|
||||||
|
"default_sample_size": self.data_quality.default_sample_size,
|
||||||
|
"small_table_threshold": self.data_quality.small_table_threshold,
|
||||||
|
"medium_table_threshold": self.data_quality.medium_table_threshold,
|
||||||
|
"enable_batch_analysis": self.data_quality.enable_batch_analysis,
|
||||||
|
"batch_timeout": self.data_quality.batch_timeout,
|
||||||
|
"enable_fast_mode": self.data_quality.enable_fast_mode,
|
||||||
|
"fast_mode_sample_size": self.data_quality.fast_mode_sample_size,
|
||||||
|
"enable_distribution_analysis": self.data_quality.enable_distribution_analysis,
|
||||||
|
"histogram_bins": self.data_quality.histogram_bins,
|
||||||
|
"percentile_levels": self.data_quality.percentile_levels,
|
||||||
|
},
|
||||||
"logging": {
|
"logging": {
|
||||||
"level": self.logging.level,
|
"level": self.logging.level,
|
||||||
"format": self.logging.format,
|
"format": self.logging.format,
|
||||||
@@ -602,6 +682,31 @@ class DorisConfig:
|
|||||||
if self.performance.query_timeout <= 0:
|
if self.performance.query_timeout <= 0:
|
||||||
errors.append("Query timeout must be greater than 0")
|
errors.append("Query timeout must be greater than 0")
|
||||||
|
|
||||||
|
# Validate data quality configuration
|
||||||
|
if self.data_quality.max_columns_per_batch <= 0:
|
||||||
|
errors.append("Max columns per batch must be greater than 0")
|
||||||
|
|
||||||
|
if self.data_quality.default_sample_size <= 0:
|
||||||
|
errors.append("Default sample size must be greater than 0")
|
||||||
|
|
||||||
|
if self.data_quality.small_table_threshold <= 0:
|
||||||
|
errors.append("Small table threshold must be greater than 0")
|
||||||
|
|
||||||
|
if self.data_quality.medium_table_threshold <= 0:
|
||||||
|
errors.append("Medium table threshold must be greater than 0")
|
||||||
|
|
||||||
|
if self.data_quality.small_table_threshold >= self.data_quality.medium_table_threshold:
|
||||||
|
errors.append("Small table threshold must be less than medium table threshold")
|
||||||
|
|
||||||
|
if self.data_quality.batch_timeout <= 0:
|
||||||
|
errors.append("Batch timeout must be greater than 0")
|
||||||
|
|
||||||
|
if self.data_quality.fast_mode_sample_size <= 0:
|
||||||
|
errors.append("Fast mode sample size must be greater than 0")
|
||||||
|
|
||||||
|
if self.data_quality.histogram_bins <= 0:
|
||||||
|
errors.append("Histogram bins must be greater than 0")
|
||||||
|
|
||||||
# Validate logging configuration
|
# Validate logging configuration
|
||||||
if self.logging.level not in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]:
|
if self.logging.level not in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]:
|
||||||
errors.append("Log level must be one of DEBUG, INFO, WARNING, ERROR, or CRITICAL")
|
errors.append("Log level must be one of DEBUG, INFO, WARNING, ERROR, or CRITICAL")
|
||||||
|
|||||||
@@ -59,29 +59,57 @@ class DataGovernanceTools:
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
|
|
||||||
|
# 🚀 PROGRESS: Initialize column lineage tracing
|
||||||
|
logger.info("=" * 60)
|
||||||
|
logger.info(f"🔍 Starting Column Lineage Tracing")
|
||||||
|
logger.info(f"📊 Target: {table_name}.{column_name}")
|
||||||
|
logger.info(f"🎯 Trace depth: {depth}")
|
||||||
|
logger.info("=" * 60)
|
||||||
|
|
||||||
connection = await self.connection_manager.get_connection("query")
|
connection = await self.connection_manager.get_connection("query")
|
||||||
|
|
||||||
full_table_name = self._build_full_table_name(table_name, catalog_name, db_name)
|
full_table_name = self._build_full_table_name(table_name, catalog_name, db_name)
|
||||||
target_column = f"{full_table_name}.{column_name}"
|
target_column = f"{full_table_name}.{column_name}"
|
||||||
|
|
||||||
# 1. Verify target column exists
|
logger.info(f"📝 Full target: {target_column}")
|
||||||
|
|
||||||
|
# 🚀 PROGRESS: Step 1 - Verify target column exists
|
||||||
|
logger.info("🔍 Step 1/4: Verifying target column exists...")
|
||||||
|
verify_start = time.time()
|
||||||
if not await self._verify_column_exists(connection, full_table_name, column_name):
|
if not await self._verify_column_exists(connection, full_table_name, column_name):
|
||||||
|
logger.error(f"❌ Column {column_name} not found in table {full_table_name}")
|
||||||
return {"error": f"Column {column_name} not found in table {full_table_name}"}
|
return {"error": f"Column {column_name} not found in table {full_table_name}"}
|
||||||
|
|
||||||
# 2. Analyze SQL logs to get lineage relationships
|
verify_time = time.time() - verify_start
|
||||||
|
logger.info(f"✅ Column verified in {verify_time:.2f}s")
|
||||||
|
|
||||||
|
# 🚀 PROGRESS: Step 2 - Analyze SQL logs for lineage relationships
|
||||||
|
logger.info(f"📊 Step 2/4: Analyzing SQL logs for lineage (depth={depth})...")
|
||||||
|
lineage_start = time.time()
|
||||||
source_chain = await self._analyze_sql_logs_for_lineage(
|
source_chain = await self._analyze_sql_logs_for_lineage(
|
||||||
connection, full_table_name, column_name, depth
|
connection, full_table_name, column_name, depth
|
||||||
)
|
)
|
||||||
|
lineage_time = time.time() - lineage_start
|
||||||
|
logger.info(f"✅ Found {len(source_chain)} lineage relationships in {lineage_time:.2f}s")
|
||||||
|
|
||||||
# 3. Analyze downstream usage
|
# 🚀 PROGRESS: Step 3 - Analyze downstream usage
|
||||||
|
logger.info("⬇️ Step 3/4: Analyzing downstream column usage...")
|
||||||
|
downstream_start = time.time()
|
||||||
downstream_usage = await self._analyze_downstream_column_usage(
|
downstream_usage = await self._analyze_downstream_column_usage(
|
||||||
connection, full_table_name, column_name
|
connection, full_table_name, column_name
|
||||||
)
|
)
|
||||||
|
downstream_time = time.time() - downstream_start
|
||||||
|
logger.info(f"✅ Found {len(downstream_usage)} downstream usages in {downstream_time:.2f}s")
|
||||||
|
|
||||||
# 4. Analyze field transformation rules
|
# 🚀 PROGRESS: Step 4 - Extract transformation rules
|
||||||
|
logger.info("🔄 Step 4/4: Extracting transformation rules...")
|
||||||
|
transform_start = time.time()
|
||||||
transformation_rules = await self._extract_transformation_rules(
|
transformation_rules = await self._extract_transformation_rules(
|
||||||
connection, full_table_name, column_name
|
connection, full_table_name, column_name
|
||||||
)
|
)
|
||||||
|
transform_time = time.time() - transform_start
|
||||||
|
logger.info(f"✅ Found {len(transformation_rules)} transformation rules in {transform_time:.2f}s")
|
||||||
|
|
||||||
execution_time = time.time() - start_time
|
execution_time = time.time() - start_time
|
||||||
|
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -192,33 +192,35 @@ class DorisConnection:
|
|||||||
|
|
||||||
|
|
||||||
class DorisConnectionManager:
|
class DorisConnectionManager:
|
||||||
"""Doris database connection manager - Simplified Strategy
|
"""Doris database connection manager - Enhanced Strategy
|
||||||
|
|
||||||
Uses direct connection pool management without session-level caching
|
Uses direct connection pool management with proper synchronization
|
||||||
Implements connection pool health monitoring and proactive cleanup
|
Implements connection pool health monitoring and proactive cleanup
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, config, security_manager=None):
|
def __init__(self, config, security_manager=None):
|
||||||
self.config = config
|
self.config = config
|
||||||
self.security_manager = security_manager
|
|
||||||
self.pool: Pool | None = None
|
self.pool: Pool | None = None
|
||||||
self.logger = get_logger(__name__)
|
self.logger = get_logger(__name__)
|
||||||
self.metrics = ConnectionMetrics()
|
self.security_manager = security_manager
|
||||||
|
|
||||||
# Remove session-level connection management
|
# Connection pool state management
|
||||||
# self.session_connections = {} # REMOVED
|
self.pool_recovering = False
|
||||||
|
|
||||||
# Pool health monitoring
|
|
||||||
self.health_check_interval = 30 # seconds
|
|
||||||
self.pool_warmup_size = 3 # connections to maintain
|
|
||||||
self.pool_health_check_task = None
|
self.pool_health_check_task = None
|
||||||
self.pool_cleanup_task = None
|
self.pool_cleanup_task = None
|
||||||
|
|
||||||
# Pool recovery lock to prevent race conditions
|
# Metrics tracking
|
||||||
self.pool_recovery_lock = asyncio.Lock()
|
self.metrics = ConnectionMetrics()
|
||||||
self.pool_recovering = False
|
|
||||||
|
# 🔧 FIX: Add connection acquisition lock to prevent race conditions
|
||||||
|
self._connection_lock = asyncio.Lock()
|
||||||
|
self._recovery_lock = asyncio.Lock()
|
||||||
|
|
||||||
|
# 🔧 FIX: Add connection acquisition queue to serialize requests
|
||||||
|
self._connection_semaphore = asyncio.Semaphore(value=20) # Max concurrent acquisitions
|
||||||
|
|
||||||
# Database connection parameters from config.database
|
# Database connection parameters from config.database
|
||||||
|
self.pool_recovery_lock = self._recovery_lock # Compatibility alias
|
||||||
self.host = config.database.host
|
self.host = config.database.host
|
||||||
self.port = config.database.port
|
self.port = config.database.port
|
||||||
self.user = config.database.user
|
self.user = config.database.user
|
||||||
@@ -231,9 +233,13 @@ class DorisConnectionManager:
|
|||||||
|
|
||||||
# Connection pool parameters - more conservative settings
|
# Connection pool parameters - more conservative settings
|
||||||
self.minsize = config.database.min_connections # This is always 0
|
self.minsize = config.database.min_connections # This is always 0
|
||||||
self.maxsize = config.database.max_connections or 10
|
self.maxsize = config.database.max_connections or 20
|
||||||
self.pool_recycle = config.database.max_connection_age or 3600 # 1 hour, more conservative
|
self.pool_recycle = config.database.max_connection_age or 3600 # 1 hour, more conservative
|
||||||
|
|
||||||
|
# 🔧 FIX: Add missing monitoring parameters that were removed during refactoring
|
||||||
|
self.health_check_interval = 30 # seconds
|
||||||
|
self.pool_warmup_size = 3 # connections to maintain
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
"""Initialize connection pool with health monitoring"""
|
"""Initialize connection pool with health monitoring"""
|
||||||
try:
|
try:
|
||||||
@@ -506,77 +512,118 @@ class DorisConnectionManager:
|
|||||||
finally:
|
finally:
|
||||||
self.pool_recovering = False
|
self.pool_recovering = False
|
||||||
|
|
||||||
|
async def _recover_pool_with_lock(self):
|
||||||
|
"""🔧 FIX: Recovery method that uses the new recovery lock to prevent races"""
|
||||||
|
async with self._recovery_lock:
|
||||||
|
if not self.pool_recovering: # Only recover if not already in progress
|
||||||
|
await self._recover_pool()
|
||||||
|
|
||||||
async def get_connection(self, session_id: str) -> DorisConnection:
|
async def get_connection(self, session_id: str) -> DorisConnection:
|
||||||
"""Get database connection - Simplified Strategy with pool validation
|
"""🔧 FIX: Simplified connection acquisition without double locking
|
||||||
|
|
||||||
Always acquire fresh connection from pool, no session caching
|
Uses only semaphore to prevent too many concurrent acquisitions
|
||||||
"""
|
"""
|
||||||
try:
|
# 🔧 FIX: Use only semaphore to limit concurrent acquisitions (remove double locking)
|
||||||
# Wait for any ongoing recovery to complete
|
async with self._connection_semaphore:
|
||||||
if self.pool_recovering:
|
try:
|
||||||
self.logger.debug(f"Pool recovery in progress, waiting for completion...")
|
# Wait for any ongoing recovery to complete
|
||||||
# Wait for recovery to complete (max 10 seconds)
|
|
||||||
for _ in range(10):
|
|
||||||
if not self.pool_recovering:
|
|
||||||
break
|
|
||||||
await asyncio.sleep(0.5)
|
|
||||||
|
|
||||||
if self.pool_recovering:
|
if self.pool_recovering:
|
||||||
self.logger.error("Pool recovery is taking too long, proceeding anyway")
|
self.logger.debug(f"Pool recovery in progress, waiting for completion...")
|
||||||
# Don't raise error, try to continue
|
# Wait for recovery to complete (max 10 seconds)
|
||||||
|
start_wait = time.time()
|
||||||
|
while self.pool_recovering and (time.time() - start_wait) < 10:
|
||||||
|
await asyncio.sleep(0.1) # More frequent checks
|
||||||
|
|
||||||
# Check if pool is available
|
if self.pool_recovering:
|
||||||
if not self.pool:
|
self.logger.error("Pool recovery is taking too long, proceeding anyway")
|
||||||
self.logger.warning("Connection pool is not available, attempting recovery...")
|
# Continue but log the issue
|
||||||
await self._recover_pool()
|
|
||||||
|
|
||||||
|
# Check if pool is available
|
||||||
if not self.pool:
|
if not self.pool:
|
||||||
raise RuntimeError("Connection pool is not available and recovery failed")
|
self.logger.warning("Connection pool is not available, attempting recovery...")
|
||||||
|
await self._recover_pool_with_lock()
|
||||||
|
|
||||||
# Check if pool is closed
|
if not self.pool:
|
||||||
if self.pool.closed:
|
raise RuntimeError("Connection pool is not available and recovery failed")
|
||||||
self.logger.warning("Connection pool is closed, attempting recovery...")
|
|
||||||
await self._recover_pool()
|
|
||||||
|
|
||||||
if not self.pool or self.pool.closed:
|
# Check if pool is closed
|
||||||
raise RuntimeError("Connection pool is closed and recovery failed")
|
if self.pool.closed:
|
||||||
|
self.logger.warning("Connection pool is closed, attempting recovery...")
|
||||||
|
await self._recover_pool_with_lock()
|
||||||
|
|
||||||
# Simple strategy: always get fresh connection from pool
|
if not self.pool or self.pool.closed:
|
||||||
raw_conn = await self.pool.acquire()
|
raise RuntimeError("Connection pool is closed and recovery failed")
|
||||||
|
|
||||||
# Wrap in DorisConnection
|
# 🔧 FIX: Increased timeout to prevent hanging
|
||||||
doris_conn = DorisConnection(raw_conn, session_id, self.security_manager)
|
try:
|
||||||
|
raw_conn = await asyncio.wait_for(self.pool.acquire(), timeout=10.0)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
self.logger.error(f"Connection acquisition timed out for session {session_id}")
|
||||||
|
# Try one recovery attempt
|
||||||
|
await self._recover_pool_with_lock()
|
||||||
|
if self.pool and not self.pool.closed:
|
||||||
|
try:
|
||||||
|
raw_conn = await asyncio.wait_for(self.pool.acquire(), timeout=5.0)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
raise RuntimeError("Connection acquisition timed out after recovery")
|
||||||
|
else:
|
||||||
|
raise RuntimeError("Connection acquisition timed out")
|
||||||
|
|
||||||
# Simple validation - just check if connection is open
|
# Wrap in DorisConnection
|
||||||
if raw_conn.closed:
|
doris_conn = DorisConnection(raw_conn, session_id, self.security_manager)
|
||||||
raise RuntimeError("Acquired connection is already closed")
|
|
||||||
|
|
||||||
self.logger.debug(f"✅ Acquired fresh connection for session {session_id}")
|
# Basic validation - check if connection is open
|
||||||
return doris_conn
|
if raw_conn.closed:
|
||||||
|
# Return connection and raise error
|
||||||
|
try:
|
||||||
|
self.pool.release(raw_conn)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
raise RuntimeError("Acquired connection is already closed")
|
||||||
|
|
||||||
except Exception as e:
|
self.logger.debug(f"✅ Acquired fresh connection for session {session_id}")
|
||||||
self.logger.error(f"Failed to get connection for session {session_id}: {e}")
|
return doris_conn
|
||||||
raise
|
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error(f"Failed to get connection for session {session_id}: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
async def release_connection(self, session_id: str, connection: DorisConnection):
|
async def release_connection(self, session_id: str, connection: DorisConnection):
|
||||||
"""Release connection back to pool - Simplified Strategy"""
|
"""🔧 FIX: Release connection back to pool with proper error handling"""
|
||||||
|
if not connection or not connection.connection:
|
||||||
|
self.logger.debug(f"No connection to release for session {session_id}")
|
||||||
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if connection and connection.connection:
|
# Check pool availability before attempting release
|
||||||
# Simple strategy: always return to pool
|
if not self.pool or self.pool.closed:
|
||||||
if not connection.connection.closed:
|
self.logger.warning(f"Pool unavailable during release for session {session_id}, force closing connection")
|
||||||
self.pool.release(connection.connection)
|
try:
|
||||||
self.logger.debug(f"✅ Released connection for session {session_id}")
|
await connection.connection.ensure_closed()
|
||||||
else:
|
except Exception:
|
||||||
self.logger.debug(f"Connection already closed for session {session_id}")
|
pass
|
||||||
|
return
|
||||||
|
|
||||||
|
# Check connection state before release
|
||||||
|
if connection.connection.closed:
|
||||||
|
self.logger.debug(f"Connection already closed for session {session_id}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 🔧 FIX: Simplified release operation without thread wrapper
|
||||||
|
try:
|
||||||
|
self.pool.release(connection.connection)
|
||||||
|
self.logger.debug(f"✅ Released connection for session {session_id}")
|
||||||
|
except Exception as release_error:
|
||||||
|
self.logger.warning(f"Connection release failed for session {session_id}: {release_error}, force closing")
|
||||||
|
await connection.connection.ensure_closed()
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.error(f"Error releasing connection for session {session_id}: {e}")
|
self.logger.error(f"Error releasing connection for session {session_id}: {e}")
|
||||||
# Force close if release fails
|
# Force close if release fails
|
||||||
try:
|
try:
|
||||||
if connection and connection.connection:
|
await connection.connection.ensure_closed()
|
||||||
await connection.connection.ensure_closed()
|
except Exception as close_error:
|
||||||
except Exception:
|
self.logger.debug(f"Error force closing connection: {close_error}")
|
||||||
pass
|
|
||||||
|
|
||||||
async def close(self):
|
async def close(self):
|
||||||
"""Close connection manager"""
|
"""Close connection manager"""
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -56,16 +56,31 @@ class SecurityAnalyticsTools:
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
|
|
||||||
|
# 🚀 PROGRESS: Initialize security analysis
|
||||||
|
logger.info("=" * 70)
|
||||||
|
logger.info(f"🔒 Starting Data Access Pattern Analysis")
|
||||||
|
logger.info(f"📅 Analysis period: {days} days")
|
||||||
|
logger.info(f"👥 Include system users: {include_system_users}")
|
||||||
|
logger.info(f"🎯 Min query threshold: {min_query_threshold}")
|
||||||
|
logger.info("=" * 70)
|
||||||
|
|
||||||
connection = await self.connection_manager.get_connection("query")
|
connection = await self.connection_manager.get_connection("query")
|
||||||
|
|
||||||
# Define analysis period
|
# Define analysis period
|
||||||
end_date = datetime.now()
|
end_date = datetime.now()
|
||||||
start_date = end_date - timedelta(days=days)
|
start_date = end_date - timedelta(days=days)
|
||||||
|
|
||||||
# 1. Get audit log data
|
logger.info(f"📊 Period: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}")
|
||||||
|
|
||||||
|
# 🚀 PROGRESS: Step 1 - Get audit log data
|
||||||
|
logger.info("📋 Step 1/5: Retrieving audit log data...")
|
||||||
|
audit_start = time.time()
|
||||||
audit_data = await self._get_audit_log_data(connection, start_date, end_date, include_system_users)
|
audit_data = await self._get_audit_log_data(connection, start_date, end_date, include_system_users)
|
||||||
|
audit_time = time.time() - audit_start
|
||||||
|
|
||||||
if not audit_data:
|
if not audit_data:
|
||||||
|
logger.warning("⚠️ No audit data available for the specified period")
|
||||||
return {
|
return {
|
||||||
"error": "No audit data available for the specified period",
|
"error": "No audit data available for the specified period",
|
||||||
"analysis_period": {
|
"analysis_period": {
|
||||||
@@ -75,25 +90,49 @@ class SecurityAnalyticsTools:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
# 2. Analyze user access patterns
|
logger.info(f"✅ Retrieved {len(audit_data)} audit records in {audit_time:.2f}s")
|
||||||
|
|
||||||
|
# 🚀 PROGRESS: Step 2 - Analyze user access patterns
|
||||||
|
logger.info("👤 Step 2/5: Analyzing user access patterns...")
|
||||||
|
user_start = time.time()
|
||||||
user_access_analysis = await self._analyze_user_access_patterns(
|
user_access_analysis = await self._analyze_user_access_patterns(
|
||||||
audit_data, min_query_threshold
|
audit_data, min_query_threshold
|
||||||
)
|
)
|
||||||
|
user_time = time.time() - user_start
|
||||||
|
logger.info(f"✅ Analyzed {len(user_access_analysis)} users in {user_time:.2f}s")
|
||||||
|
|
||||||
# 3. Analyze role-based access
|
# 🚀 PROGRESS: Step 3 - Analyze role-based access
|
||||||
|
logger.info("🎭 Step 3/5: Analyzing role-based access patterns...")
|
||||||
|
role_start = time.time()
|
||||||
role_access_analysis = await self._analyze_role_access_patterns(
|
role_access_analysis = await self._analyze_role_access_patterns(
|
||||||
connection, user_access_analysis
|
connection, user_access_analysis
|
||||||
)
|
)
|
||||||
|
role_time = time.time() - role_start
|
||||||
|
logger.info(f"✅ Role analysis completed in {role_time:.2f}s")
|
||||||
|
|
||||||
# 4. Detect security anomalies
|
# 🚀 PROGRESS: Step 4 - Detect security anomalies
|
||||||
|
logger.info("🚨 Step 4/5: Detecting security anomalies...")
|
||||||
|
anomaly_start = time.time()
|
||||||
security_alerts = await self._detect_security_anomalies(
|
security_alerts = await self._detect_security_anomalies(
|
||||||
audit_data, user_access_analysis
|
audit_data, user_access_analysis
|
||||||
)
|
)
|
||||||
|
anomaly_time = time.time() - anomaly_start
|
||||||
|
logger.info(f"✅ Found {len(security_alerts)} security alerts in {anomaly_time:.2f}s")
|
||||||
|
|
||||||
# 5. Generate access insights
|
# Log alert summary
|
||||||
|
if security_alerts:
|
||||||
|
high_alerts = sum(1 for alert in security_alerts if alert.get("severity") == "high")
|
||||||
|
medium_alerts = sum(1 for alert in security_alerts if alert.get("severity") == "medium")
|
||||||
|
logger.info(f"🚨 Alert breakdown: {high_alerts} high, {medium_alerts} medium")
|
||||||
|
|
||||||
|
# 🚀 PROGRESS: Step 5 - Generate access insights
|
||||||
|
logger.info("💡 Step 5/5: Generating access insights...")
|
||||||
|
insights_start = time.time()
|
||||||
access_insights = await self._generate_access_insights(
|
access_insights = await self._generate_access_insights(
|
||||||
user_access_analysis, role_access_analysis
|
user_access_analysis, role_access_analysis
|
||||||
)
|
)
|
||||||
|
insights_time = time.time() - insights_start
|
||||||
|
logger.info(f"✅ Access insights generated in {insights_time:.2f}s")
|
||||||
|
|
||||||
execution_time = time.time() - start_time
|
execution_time = time.time() - start_time
|
||||||
|
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ build-backend = "hatchling.build"
|
|||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "doris-mcp-server"
|
name = "doris-mcp-server"
|
||||||
version = "0.5.0"
|
version = "0.5.1"
|
||||||
description = "Enterprise-grade Model Context Protocol (MCP) server implementation for Apache Doris"
|
description = "Enterprise-grade Model Context Protocol (MCP) server implementation for Apache Doris"
|
||||||
authors = [
|
authors = [
|
||||||
{name = "Yijia Su", email = "freeoneplus@apache.org"}
|
{name = "Yijia Su", email = "freeoneplus@apache.org"}
|
||||||
|
|||||||
Reference in New Issue
Block a user