2 Commits
0.5.0 ... 0.5.1

Author SHA1 Message Date
Yijia Su
6d3c128f54 0.5.1 Version (#28)
0.5.1 Version (#28)
2025-07-15 11:56:46 +08:00
Yijia Su
651d524814 [BUG]Optimize and fix the capabilities of 0.5.0 tools (#26)
1. **Unified Naming for CLI Arguments and Environment Variables** 
- All database-related CLI arguments now use the `--doris-*` prefix, and environment variables use `DORIS_*` for consistency and maintainability. 
- Backward compatibility: old `--db-*` arguments are still supported.

2. **Automatic Filtering of System SQL in Slow Query TopN** 
- Slow query analysis now automatically excludes SQL statements involving `__internal_schema`, `information_schema`, and `mysql` system databases, ensuring only business-related slow queries are counted. 
- Filtering is performed at the SQL level using `NOT LIKE` and `state != 'ERR'` for efficiency and safety.

3. **Unified Query Timeout Configuration** 
- If no `timeout` is specified for query execution, the system will use the `config.performance.query_timeout` value as the default, falling back to 30 seconds if not configured.
- This avoids hardcoding and makes timeout management more flexible.

4. **Tool execution optimization**
- Significantly reduce the execution time of some data governance and operation and maintenance tools
- Optimize execution logic and reduce data scanning
- Enable concurrent scanning to speed up retrieval

5. **Log system optimization**
- Fix the Console log printing logic and output the log content correctly
- Add advanced tool execution process log output to facilitate further positioning of error locations

6. **DB Connection optimization**
- Fixed a connection pool acquisition exception caused by deadlock

7. **Other Improvements**
- Help documentation and CLI examples updated to reflect new and legacy parameter compatibility.
- Code comments and documentation further standardized for better team collaboration and open-source community understanding.
2025-07-14 19:04:11 +08:00
11 changed files with 2488 additions and 1471 deletions

View File

@@ -133,7 +133,7 @@ ALERT_WEBHOOK_URL=
# Basic server information
SERVER_NAME=doris-mcp-server
SERVER_VERSION=0.5.0
SERVER_VERSION=0.5.1
SERVER_PORT=3000
# Temporary files directory

View File

@@ -21,7 +21,7 @@ under the License.
Doris MCP (Model Context Protocol) Server is a backend service built with Python and FastAPI. It implements the MCP, allowing clients to interact with it through defined "Tools". It's primarily designed to connect to Apache Doris databases, potentially leveraging Large Language Models (LLMs) for tasks like converting natural language queries to SQL (NL2SQL), executing queries, and performing metadata management and analysis.
## 🚀 What's New in v0.5.0
## 🚀 What's New in v0.5.1
- **🔥 Critical at_eof Connection Fix**: **Complete elimination of at_eof connection pool errors** through redesigned connection pool strategy with zero minimum connections, intelligent health monitoring, automatic retry mechanisms, and self-healing pool recovery - achieving 99.9% connection stability improvement
- **🔧 Revolutionary Logging System**: **Enterprise-grade logging overhaul** with level-based file separation (debug, info, warning, error, critical), automatic cleanup scheduler with 30-day retention, millisecond precision timestamps, dedicated audit trails, and zero-maintenance log management
@@ -32,8 +32,13 @@ Doris MCP (Model Context Protocol) Server is a backend service built with Python
- **⚙️ Enhanced Configuration Management**: Complete ADBC configuration system with environment variable support, dynamic tool registration, and intelligent parameter validation
- **🔒 Security & Compatibility Improvements**: Resolved pandas JSON serialization issues, enhanced enterprise security integration, and maintained full backward compatibility with v0.4.x versions
- **🎯 Modular Architecture**: 6 new specialized tool modules for enterprise analytics with comprehensive English documentation and robust error handling
- **🕒 Global SQL Timeout Configuration Enhancement**: Unified global SQL timeout control via `config/performance/query_timeout`. All SQL executions now use this value by default, with runtime override supported. This ensures consistent timeout behavior across all entry points (MCP tools, API, batch queries, etc.).
- **Bug Fixes for Timeout Application**: Fixed issues where some SQL executions did not correctly apply the global timeout configuration. Now, all SQL executions are consistently controlled by the global timeout setting.
- **Improved Robustness**: Optimized the timeout propagation chain in core classes like `QueryRequest` and `DorisQueryExecutor`, preventing timeout failures due to missing parameters.
- **Documentation & Configuration Updates**: Updated documentation and configuration instructions to clarify the priority and scope of the timeout configuration.
- **Other Bug Fixes & Optimizations**: Various known bug fixes and detail optimizations for improved stability and reliability.
> **🚀 Major Milestone**: This release establishes v0.5.0 as a **production-ready enterprise data governance platform** with **critical stability improvements** (complete at_eof fix + intelligent logging), 23 total tools (14 existing + 7 analytics + 2 ADBC tools), and enterprise-grade system reliability - representing a major advancement in both data intelligence capabilities and operational stability.
> **🚀 Major Milestone**: This release establishes v0.5.1 as a **production-ready enterprise data governance platform** with **critical stability improvements** (complete at_eof fix + intelligent logging + unified SQL timeout), 25 total tools (15 existing + 8 analytics + 2 ADBC tools), and enterprise-grade system reliability - representing a major advancement in both data intelligence capabilities and operational stability.
## Core Features

View File

@@ -618,6 +618,11 @@ Transport Modes:
Examples:
python -m doris_mcp_server --transport stdio
python -m doris_mcp_server --transport http --host 0.0.0.0 --port 3000
python -m doris_mcp_server --transport stdio --doris-host localhost --doris-port 9030
python -m doris_mcp_server --transport http --doris-user admin --doris-database test_db
# Backward compatibility: --db-* parameters are also supported
python -m doris_mcp_server --transport stdio --db-host localhost --db-port 9030
"""
)
@@ -641,26 +646,26 @@ Examples:
)
parser.add_argument(
"--db-host",
"--doris-host", "--db-host",
type=str,
default=os.getenv("DB_HOST", _default_config.database.host),
default=os.getenv("DORIS_HOST", _default_config.database.host),
help=f"Doris database host address (default: {_default_config.database.host})",
)
parser.add_argument(
"--db-port", type=int, default=os.getenv("DB_PORT", _default_config.database.port), help=f"Doris database port number (default: {_default_config.database.port})"
"--doris-port", "--db-port", type=int, default=os.getenv("DORIS_PORT", _default_config.database.port), help=f"Doris database port number (default: {_default_config.database.port})"
)
parser.add_argument(
"--db-user", type=str, default=os.getenv("DB_USER", _default_config.database.user), help=f"Doris database username (default: {_default_config.database.user})"
"--doris-user", "--db-user", type=str, default=os.getenv("DORIS_USER", _default_config.database.user), help=f"Doris database username (default: {_default_config.database.user})"
)
parser.add_argument("--db-password", type=str, default="", help="Doris database password")
parser.add_argument("--doris-password", "--db-password", type=str, default=os.getenv("DORIS_PASSWORD", ""), help="Doris database password")
parser.add_argument(
"--db-database",
"--doris-database", "--db-database",
type=str,
default=os.getenv("DB_DATABASE", _default_config.database.database),
default=os.getenv("DORIS_DATABASE", _default_config.database.database),
help=f"Doris database name (default: {_default_config.database.database})",
)
@@ -684,16 +689,19 @@ async def main():
config = DorisConfig.from_env() # First load from .env file and environment variables
# Command line arguments override configuration (if provided)
if args.db_host != _default_config.database.host: # If not default value, use command line argument
config.database.host = args.db_host
if args.db_port != _default_config.database.port:
config.database.port = args.db_port
if args.db_user != _default_config.database.user:
config.database.user = args.db_user
if args.db_password: # Use password if provided
config.database.password = args.db_password
if args.db_database != _default_config.database.database:
config.database.database = args.db_database
# 🔧 FIX: Set transport from command line arguments
config.transport = args.transport
if args.doris_host != _default_config.database.host: # If not default value, use command line argument
config.database.host = args.doris_host
if args.doris_port != _default_config.database.port:
config.database.port = args.doris_port
if args.doris_user != _default_config.database.user:
config.database.user = args.doris_user
if args.doris_password: # Use password if provided
config.database.password = args.doris_password
if args.doris_database != _default_config.database.database:
config.database.database = args.doris_database
if args.log_level != _default_config.logging.level:
config.logging.level = args.log_level

View File

@@ -61,7 +61,7 @@ class DorisToolsManager:
# Initialize v0.5.0 advanced analytics tools
self.data_governance_tools = DataGovernanceTools(connection_manager)
self.data_exploration_tools = DataExplorationTools(connection_manager)
self.data_quality_tools = DataQualityTools(connection_manager)
self.data_quality_tools = DataQualityTools(connection_manager, connection_manager.config)
self.security_analytics_tools = SecurityAnalyticsTools(connection_manager)
self.dependency_analysis_tools = DependencyAnalysisTools(connection_manager)
self.performance_analytics_tools = PerformanceAnalyticsTools(connection_manager)
@@ -464,41 +464,87 @@ class DorisToolsManager:
# 🔄 Unified Data Quality Analysis Tool (New in v0.5.0)
@mcp.tool(
"analyze_data_quality",
description="""[Function Description]: Comprehensive data quality analysis combining completeness and distribution analysis.
"get_table_basic_info",
description="""[Function Description]: Get basic information about a table including row count, column count, partitions, and size.
[Parameter Content]:
- table_name (string) [Required] - Name of the table to analyze
- analysis_scope (string) [Optional] - Analysis scope, default is "comprehensive"
* "completeness": Only completeness analysis (null rates, business rules)
* "distribution": Only distribution analysis (statistical patterns)
* "comprehensive": Full analysis including both completeness and distribution
- catalog_name (string) [Optional] - Target catalog name
- db_name (string) [Optional] - Target database name
""",
)
async def get_table_basic_info_tool(
table_name: str,
catalog_name: str = None,
db_name: str = None
) -> str:
"""Get table basic information"""
return await self.call_tool("get_table_basic_info", {
"table_name": table_name,
"catalog_name": catalog_name,
"db_name": db_name
})
@mcp.tool(
"analyze_columns",
description="""[Function Description]: Analyze completeness and distribution of specified columns in a table.
[Parameter Content]:
- table_name (string) [Required] - Name of the table to analyze
- columns (array) [Required] - List of column names to analyze
- analysis_types (array) [Optional] - Types of analysis to perform, default is ["both"]
* "completeness": Only completeness analysis (null rates, non-null counts)
* "distribution": Only distribution analysis (statistical patterns by data type)
* "both": Both completeness and distribution analysis
- sample_size (integer) [Optional] - Maximum number of rows to sample, default is 100000
- include_all_columns (boolean) [Optional] - Whether to analyze all columns, default is false
- business_rules (array) [Optional] - Business rule validations in format [{"rule_name": "email_format", "sql_condition": "email REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'"}]
- catalog_name (string) [Optional] - Target catalog name
- db_name (string) [Optional] - Target database name
- detailed_response (boolean) [Optional] - Whether to return detailed response including raw data, default is false
""",
)
async def analyze_data_quality_tool(
async def analyze_columns_tool(
table_name: str,
analysis_scope: str = "comprehensive",
columns: List[str],
analysis_types: List[str] = None,
sample_size: int = 100000,
include_all_columns: bool = False,
business_rules: List[dict] = None,
catalog_name: str = None,
db_name: str = None,
detailed_response: bool = False
) -> str:
"""Unified data quality analysis tool"""
return await self.call_tool("analyze_data_quality", {
"""Analyze table columns"""
return await self.call_tool("analyze_columns", {
"table_name": table_name,
"analysis_scope": analysis_scope,
"columns": columns,
"analysis_types": analysis_types or ["both"],
"sample_size": sample_size,
"include_all_columns": include_all_columns,
"business_rules": business_rules,
"catalog_name": catalog_name,
"db_name": db_name,
"detailed_response": detailed_response
})
@mcp.tool(
"analyze_table_storage",
description="""[Function Description]: Analyze table's physical distribution and storage information.
[Parameter Content]:
- table_name (string) [Required] - Name of the table to analyze
- catalog_name (string) [Optional] - Target catalog name
- db_name (string) [Optional] - Target database name
- detailed_response (boolean) [Optional] - Whether to return detailed response including raw data, default is false
""",
)
async def analyze_table_storage_tool(
table_name: str,
catalog_name: str = None,
db_name: str = None,
detailed_response: bool = False
) -> str:
"""Analyze table storage"""
return await self.call_tool("analyze_table_storage", {
"table_name": table_name,
"catalog_name": catalog_name,
"db_name": db_name,
"detailed_response": detailed_response
@@ -721,7 +767,7 @@ No parameters required. Returns connection status, configuration, and diagnostic
"""Get ADBC connection information and status"""
return await self.call_tool("get_adbc_connection_info", {})
logger.info("Successfully registered 23 tools to MCP server (14 basic + 7 advanced analytics + 2 ADBC tools)")
logger.info("Successfully registered 25 tools to MCP server (14 basic + 9 advanced analytics + 2 ADBC tools)")
async def list_tools(self) -> List[Tool]:
"""List all available query tools (for stdio mode)"""
@@ -1064,20 +1110,14 @@ No parameters required. Returns connection status, configuration, and diagnostic
},
),
# ==================== v0.5.0 Advanced Analytics Tools ====================
# Atomic Data Quality Analysis Tools
Tool(
name="analyze_data_quality",
description="""[Function Description]: Comprehensive data quality analysis combining completeness and distribution analysis.
name="get_table_basic_info",
description="""[Function Description]: Get basic information about a table including row count, column count, partitions, and size.
[Parameter Content]:
- table_name (string) [Required] - Name of the table to analyze
- analysis_scope (string) [Optional] - Analysis scope, default is "comprehensive"
* "completeness": Only completeness analysis (null rates, business rules)
* "distribution": Only distribution analysis (statistical patterns)
* "comprehensive": Full analysis including both completeness and distribution
- sample_size (integer) [Optional] - Maximum number of rows to sample, default is 100000
- include_all_columns (boolean) [Optional] - Whether to analyze all columns, default is false
- business_rules (array) [Optional] - Business rule validations in format [{"rule_name": "email_format", "sql_condition": "email REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'"}]
- catalog_name (string) [Optional] - Target catalog name
- db_name (string) [Optional] - Target database name
""",
@@ -1085,10 +1125,58 @@ No parameters required. Returns connection status, configuration, and diagnostic
"type": "object",
"properties": {
"table_name": {"type": "string", "description": "Name of the table to analyze"},
"analysis_scope": {"type": "string", "enum": ["completeness", "distribution", "comprehensive"], "description": "Analysis scope", "default": "comprehensive"},
"catalog_name": {"type": "string", "description": "Target catalog name"},
"db_name": {"type": "string", "description": "Target database name"},
},
"required": ["table_name"],
},
),
Tool(
name="analyze_columns",
description="""[Function Description]: Analyze completeness and distribution of specified columns in a table.
[Parameter Content]:
- table_name (string) [Required] - Name of the table to analyze
- columns (array) [Required] - List of column names to analyze
- analysis_types (array) [Optional] - Types of analysis to perform, default is ["both"]
* "completeness": Only completeness analysis (null rates, non-null counts)
* "distribution": Only distribution analysis (statistical patterns by data type)
* "both": Both completeness and distribution analysis
- sample_size (integer) [Optional] - Maximum number of rows to sample, default is 100000
- catalog_name (string) [Optional] - Target catalog name
- db_name (string) [Optional] - Target database name
- detailed_response (boolean) [Optional] - Whether to return detailed response including raw data, default is false
""",
inputSchema={
"type": "object",
"properties": {
"table_name": {"type": "string", "description": "Name of the table to analyze"},
"columns": {"type": "array", "items": {"type": "string"}, "description": "List of column names to analyze"},
"analysis_types": {"type": "array", "items": {"type": "string", "enum": ["completeness", "distribution", "both"]}, "description": "Types of analysis to perform", "default": ["both"]},
"sample_size": {"type": "integer", "description": "Maximum number of rows to sample", "default": 100000},
"include_all_columns": {"type": "boolean", "description": "Whether to analyze all columns", "default": False},
"business_rules": {"type": "array", "items": {"type": "object"}, "description": "Business rule validations"},
"catalog_name": {"type": "string", "description": "Target catalog name"},
"db_name": {"type": "string", "description": "Target database name"},
"detailed_response": {"type": "boolean", "description": "Whether to return detailed response including raw data", "default": False},
},
"required": ["table_name", "columns"],
},
),
Tool(
name="analyze_table_storage",
description="""[Function Description]: Analyze table's physical distribution and storage information.
[Parameter Content]:
- table_name (string) [Required] - Name of the table to analyze
- catalog_name (string) [Optional] - Target catalog name
- db_name (string) [Optional] - Target database name
- detailed_response (boolean) [Optional] - Whether to return detailed response including raw data, default is false
""",
inputSchema={
"type": "object",
"properties": {
"table_name": {"type": "string", "description": "Name of the table to analyze"},
"catalog_name": {"type": "string", "description": "Target catalog name"},
"db_name": {"type": "string", "description": "Target database name"},
"detailed_response": {"type": "boolean", "description": "Whether to return detailed response including raw data", "default": False},
@@ -1096,7 +1184,6 @@ No parameters required. Returns connection status, configuration, and diagnostic
"required": ["table_name"],
},
),
Tool(
name="trace_column_lineage",
description="""[Function Description]: Trace data lineage for specified columns through SQL analysis and dependency mapping.
@@ -1323,9 +1410,13 @@ No parameters required. Returns connection status, configuration, and diagnostic
elif name == "get_historical_memory_stats":
arguments["data_type"] = "historical"
result = await self._get_memory_stats_tool(arguments)
# v0.5.0 Advanced Analytics Tools
elif name == "analyze_data_quality":
result = await self._analyze_data_quality_tool(arguments)
# v0.5.0 Advanced Analytics Tools - Atomic Data Quality Tools
elif name == "get_table_basic_info":
result = await self._get_table_basic_info_tool(arguments)
elif name == "analyze_columns":
result = await self._analyze_columns_tool(arguments)
elif name == "analyze_table_storage":
result = await self._analyze_table_storage_tool(arguments)
elif name == "trace_column_lineage":
result = await self._trace_column_lineage_tool(arguments)
elif name == "monitor_data_freshness":
@@ -1595,26 +1686,46 @@ No parameters required. Returns connection status, configuration, and diagnostic
# ==================== v0.5.0 Advanced Analytics Tools Private Methods ====================
async def _analyze_data_quality_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Unified data quality analysis tool routing"""
async def _get_table_basic_info_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Get table basic information tool routing"""
try:
# Extract parameters
table_name = arguments.get("table_name")
analysis_scope = arguments.get("analysis_scope", "comprehensive")
catalog_name = arguments.get("catalog_name")
db_name = arguments.get("db_name")
# Delegate to atomic data quality tools
result = await self.data_quality_tools.get_table_basic_info(
table_name=table_name,
catalog_name=catalog_name,
db_name=db_name
)
return result
except Exception as e:
return {
"error": str(e),
"analysis_type": "table_basic_info",
"timestamp": datetime.now().isoformat()
}
async def _analyze_columns_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze columns tool routing"""
try:
table_name = arguments.get("table_name")
columns = arguments.get("columns")
analysis_types = arguments.get("analysis_types", ["both"])
sample_size = arguments.get("sample_size", 100000)
include_all_columns = arguments.get("include_all_columns", False)
business_rules = arguments.get("business_rules", [])
catalog_name = arguments.get("catalog_name")
db_name = arguments.get("db_name")
detailed_response = arguments.get("detailed_response", False)
# Delegate to the unified data quality tools
result = await self.data_quality_tools.analyze_data_quality(
# Delegate to atomic data quality tools
result = await self.data_quality_tools.analyze_columns(
table_name=table_name,
analysis_scope=analysis_scope,
columns=columns,
analysis_types=analysis_types,
sample_size=sample_size,
include_all_columns=include_all_columns,
business_rules=business_rules,
catalog_name=catalog_name,
db_name=db_name,
detailed_response=detailed_response
@@ -1625,7 +1736,32 @@ No parameters required. Returns connection status, configuration, and diagnostic
except Exception as e:
return {
"error": str(e),
"analysis_type": "unified_data_quality",
"analysis_type": "columns_analysis",
"timestamp": datetime.now().isoformat()
}
async def _analyze_table_storage_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze table storage tool routing"""
try:
table_name = arguments.get("table_name")
catalog_name = arguments.get("catalog_name")
db_name = arguments.get("db_name")
detailed_response = arguments.get("detailed_response", False)
# Delegate to atomic data quality tools
result = await self.data_quality_tools.analyze_table_storage(
table_name=table_name,
catalog_name=catalog_name,
db_name=db_name,
detailed_response=detailed_response
)
return result
except Exception as e:
return {
"error": str(e),
"analysis_type": "table_storage_analysis",
"timestamp": datetime.now().isoformat()
}

View File

@@ -137,6 +137,33 @@ class PerformanceConfig:
max_response_content_size: int = 4096
@dataclass
class DataQualityConfig:
"""Data quality analysis configuration"""
# Column analysis configuration
max_columns_per_batch: int = 20 # Maximum columns to analyze in a single batch
default_sample_size: int = 100000 # Default sample size for analysis
# Sampling strategy configuration
small_table_threshold: int = 100000 # Tables smaller than this use full table analysis
medium_table_threshold: int = 1000000 # Tables smaller than this use simple LIMIT sampling
# Tables larger than medium_table_threshold use systematic sampling
# Performance optimization
enable_batch_analysis: bool = True # Enable batch analysis for multiple columns
batch_timeout: int = 300 # Timeout for batch analysis in seconds
# Accuracy vs Performance trade-off
enable_fast_mode: bool = False # Use approximate algorithms for faster results
fast_mode_sample_size: int = 10000 # Sample size for fast mode
# Statistical analysis configuration
enable_distribution_analysis: bool = True # Enable distribution analysis
histogram_bins: int = 20 # Number of bins for histogram analysis
percentile_levels: list[float] = field(default_factory=lambda: [0.25, 0.5, 0.75, 0.95, 0.99]) # Percentile levels to calculate
@dataclass
class ADBCConfig:
"""ADBC (Arrow Flight SQL) configuration"""
@@ -208,6 +235,7 @@ class DorisConfig:
database: DatabaseConfig = field(default_factory=DatabaseConfig)
security: SecurityConfig = field(default_factory=SecurityConfig)
performance: PerformanceConfig = field(default_factory=PerformanceConfig)
data_quality: DataQualityConfig = field(default_factory=DataQualityConfig)
logging: LoggingConfig = field(default_factory=LoggingConfig)
monitoring: MonitoringConfig = field(default_factory=MonitoringConfig)
adbc: ADBCConfig = field(default_factory=ADBCConfig)
@@ -404,6 +432,38 @@ class DorisConfig:
os.getenv("ADBC_ENABLED", str(config.adbc.enabled).lower()).lower() == "true"
)
# Data quality configuration
config.data_quality.max_columns_per_batch = int(
os.getenv("DATA_QUALITY_MAX_COLUMNS_PER_BATCH", str(config.data_quality.max_columns_per_batch))
)
config.data_quality.default_sample_size = int(
os.getenv("DATA_QUALITY_DEFAULT_SAMPLE_SIZE", str(config.data_quality.default_sample_size))
)
config.data_quality.small_table_threshold = int(
os.getenv("DATA_QUALITY_SMALL_TABLE_THRESHOLD", str(config.data_quality.small_table_threshold))
)
config.data_quality.medium_table_threshold = int(
os.getenv("DATA_QUALITY_MEDIUM_TABLE_THRESHOLD", str(config.data_quality.medium_table_threshold))
)
config.data_quality.enable_batch_analysis = (
os.getenv("DATA_QUALITY_ENABLE_BATCH_ANALYSIS", str(config.data_quality.enable_batch_analysis).lower()).lower() == "true"
)
config.data_quality.batch_timeout = int(
os.getenv("DATA_QUALITY_BATCH_TIMEOUT", str(config.data_quality.batch_timeout))
)
config.data_quality.enable_fast_mode = (
os.getenv("DATA_QUALITY_ENABLE_FAST_MODE", str(config.data_quality.enable_fast_mode).lower()).lower() == "true"
)
config.data_quality.fast_mode_sample_size = int(
os.getenv("DATA_QUALITY_FAST_MODE_SAMPLE_SIZE", str(config.data_quality.fast_mode_sample_size))
)
config.data_quality.enable_distribution_analysis = (
os.getenv("DATA_QUALITY_ENABLE_DISTRIBUTION_ANALYSIS", str(config.data_quality.enable_distribution_analysis).lower()).lower() == "true"
)
config.data_quality.histogram_bins = int(
os.getenv("DATA_QUALITY_HISTOGRAM_BINS", str(config.data_quality.histogram_bins))
)
# Server configuration
config.server_name = os.getenv("SERVER_NAME", config.server_name)
config.server_version = os.getenv("SERVER_VERSION", config.server_version)
@@ -443,6 +503,13 @@ class DorisConfig:
if hasattr(config.performance, key):
setattr(config.performance, key, value)
# Update data quality configuration
if "data_quality" in config_data:
dq_config = config_data["data_quality"]
for key, value in dq_config.items():
if hasattr(config.data_quality, key):
setattr(config.data_quality, key, value)
# Update logging configuration
if "logging" in config_data:
log_config = config_data["logging"]
@@ -516,6 +583,19 @@ class DorisConfig:
"idle_timeout": self.performance.idle_timeout,
"max_response_content_size": self.performance.max_response_content_size,
},
"data_quality": {
"max_columns_per_batch": self.data_quality.max_columns_per_batch,
"default_sample_size": self.data_quality.default_sample_size,
"small_table_threshold": self.data_quality.small_table_threshold,
"medium_table_threshold": self.data_quality.medium_table_threshold,
"enable_batch_analysis": self.data_quality.enable_batch_analysis,
"batch_timeout": self.data_quality.batch_timeout,
"enable_fast_mode": self.data_quality.enable_fast_mode,
"fast_mode_sample_size": self.data_quality.fast_mode_sample_size,
"enable_distribution_analysis": self.data_quality.enable_distribution_analysis,
"histogram_bins": self.data_quality.histogram_bins,
"percentile_levels": self.data_quality.percentile_levels,
},
"logging": {
"level": self.logging.level,
"format": self.logging.format,
@@ -602,6 +682,31 @@ class DorisConfig:
if self.performance.query_timeout <= 0:
errors.append("Query timeout must be greater than 0")
# Validate data quality configuration
if self.data_quality.max_columns_per_batch <= 0:
errors.append("Max columns per batch must be greater than 0")
if self.data_quality.default_sample_size <= 0:
errors.append("Default sample size must be greater than 0")
if self.data_quality.small_table_threshold <= 0:
errors.append("Small table threshold must be greater than 0")
if self.data_quality.medium_table_threshold <= 0:
errors.append("Medium table threshold must be greater than 0")
if self.data_quality.small_table_threshold >= self.data_quality.medium_table_threshold:
errors.append("Small table threshold must be less than medium table threshold")
if self.data_quality.batch_timeout <= 0:
errors.append("Batch timeout must be greater than 0")
if self.data_quality.fast_mode_sample_size <= 0:
errors.append("Fast mode sample size must be greater than 0")
if self.data_quality.histogram_bins <= 0:
errors.append("Histogram bins must be greater than 0")
# Validate logging configuration
if self.logging.level not in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]:
errors.append("Log level must be one of DEBUG, INFO, WARNING, ERROR, or CRITICAL")

View File

@@ -59,29 +59,57 @@ class DataGovernanceTools:
"""
try:
start_time = time.time()
# 🚀 PROGRESS: Initialize column lineage tracing
logger.info("=" * 60)
logger.info(f"🔍 Starting Column Lineage Tracing")
logger.info(f"📊 Target: {table_name}.{column_name}")
logger.info(f"🎯 Trace depth: {depth}")
logger.info("=" * 60)
connection = await self.connection_manager.get_connection("query")
full_table_name = self._build_full_table_name(table_name, catalog_name, db_name)
target_column = f"{full_table_name}.{column_name}"
# 1. Verify target column exists
logger.info(f"📝 Full target: {target_column}")
# 🚀 PROGRESS: Step 1 - Verify target column exists
logger.info("🔍 Step 1/4: Verifying target column exists...")
verify_start = time.time()
if not await self._verify_column_exists(connection, full_table_name, column_name):
logger.error(f"❌ Column {column_name} not found in table {full_table_name}")
return {"error": f"Column {column_name} not found in table {full_table_name}"}
# 2. Analyze SQL logs to get lineage relationships
verify_time = time.time() - verify_start
logger.info(f"✅ Column verified in {verify_time:.2f}s")
# 🚀 PROGRESS: Step 2 - Analyze SQL logs for lineage relationships
logger.info(f"📊 Step 2/4: Analyzing SQL logs for lineage (depth={depth})...")
lineage_start = time.time()
source_chain = await self._analyze_sql_logs_for_lineage(
connection, full_table_name, column_name, depth
)
lineage_time = time.time() - lineage_start
logger.info(f"✅ Found {len(source_chain)} lineage relationships in {lineage_time:.2f}s")
# 3. Analyze downstream usage
# 🚀 PROGRESS: Step 3 - Analyze downstream usage
logger.info("⬇️ Step 3/4: Analyzing downstream column usage...")
downstream_start = time.time()
downstream_usage = await self._analyze_downstream_column_usage(
connection, full_table_name, column_name
)
downstream_time = time.time() - downstream_start
logger.info(f"✅ Found {len(downstream_usage)} downstream usages in {downstream_time:.2f}s")
# 4. Analyze field transformation rules
# 🚀 PROGRESS: Step 4 - Extract transformation rules
logger.info("🔄 Step 4/4: Extracting transformation rules...")
transform_start = time.time()
transformation_rules = await self._extract_transformation_rules(
connection, full_table_name, column_name
)
transform_time = time.time() - transform_start
logger.info(f"✅ Found {len(transformation_rules)} transformation rules in {transform_time:.2f}s")
execution_time = time.time() - start_time

File diff suppressed because it is too large Load Diff

View File

@@ -192,33 +192,35 @@ class DorisConnection:
class DorisConnectionManager:
"""Doris database connection manager - Simplified Strategy
"""Doris database connection manager - Enhanced Strategy
Uses direct connection pool management without session-level caching
Uses direct connection pool management with proper synchronization
Implements connection pool health monitoring and proactive cleanup
"""
def __init__(self, config, security_manager=None):
self.config = config
self.security_manager = security_manager
self.pool: Pool | None = None
self.logger = get_logger(__name__)
self.metrics = ConnectionMetrics()
self.security_manager = security_manager
# Remove session-level connection management
# self.session_connections = {} # REMOVED
# Pool health monitoring
self.health_check_interval = 30 # seconds
self.pool_warmup_size = 3 # connections to maintain
# Connection pool state management
self.pool_recovering = False
self.pool_health_check_task = None
self.pool_cleanup_task = None
# Pool recovery lock to prevent race conditions
self.pool_recovery_lock = asyncio.Lock()
self.pool_recovering = False
# Metrics tracking
self.metrics = ConnectionMetrics()
# 🔧 FIX: Add connection acquisition lock to prevent race conditions
self._connection_lock = asyncio.Lock()
self._recovery_lock = asyncio.Lock()
# 🔧 FIX: Add connection acquisition queue to serialize requests
self._connection_semaphore = asyncio.Semaphore(value=20) # Max concurrent acquisitions
# Database connection parameters from config.database
self.pool_recovery_lock = self._recovery_lock # Compatibility alias
self.host = config.database.host
self.port = config.database.port
self.user = config.database.user
@@ -231,9 +233,13 @@ class DorisConnectionManager:
# Connection pool parameters - more conservative settings
self.minsize = config.database.min_connections # This is always 0
self.maxsize = config.database.max_connections or 10
self.maxsize = config.database.max_connections or 20
self.pool_recycle = config.database.max_connection_age or 3600 # 1 hour, more conservative
# 🔧 FIX: Add missing monitoring parameters that were removed during refactoring
self.health_check_interval = 30 # seconds
self.pool_warmup_size = 3 # connections to maintain
async def initialize(self):
"""Initialize connection pool with health monitoring"""
try:
@@ -506,29 +512,36 @@ class DorisConnectionManager:
finally:
self.pool_recovering = False
async def get_connection(self, session_id: str) -> DorisConnection:
"""Get database connection - Simplified Strategy with pool validation
async def _recover_pool_with_lock(self):
"""🔧 FIX: Recovery method that uses the new recovery lock to prevent races"""
async with self._recovery_lock:
if not self.pool_recovering: # Only recover if not already in progress
await self._recover_pool()
Always acquire fresh connection from pool, no session caching
async def get_connection(self, session_id: str) -> DorisConnection:
"""🔧 FIX: Simplified connection acquisition without double locking
Uses only semaphore to prevent too many concurrent acquisitions
"""
# 🔧 FIX: Use only semaphore to limit concurrent acquisitions (remove double locking)
async with self._connection_semaphore:
try:
# Wait for any ongoing recovery to complete
if self.pool_recovering:
self.logger.debug(f"Pool recovery in progress, waiting for completion...")
# Wait for recovery to complete (max 10 seconds)
for _ in range(10):
if not self.pool_recovering:
break
await asyncio.sleep(0.5)
start_wait = time.time()
while self.pool_recovering and (time.time() - start_wait) < 10:
await asyncio.sleep(0.1) # More frequent checks
if self.pool_recovering:
self.logger.error("Pool recovery is taking too long, proceeding anyway")
# Don't raise error, try to continue
# Continue but log the issue
# Check if pool is available
if not self.pool:
self.logger.warning("Connection pool is not available, attempting recovery...")
await self._recover_pool()
await self._recover_pool_with_lock()
if not self.pool:
raise RuntimeError("Connection pool is not available and recovery failed")
@@ -536,19 +549,36 @@ class DorisConnectionManager:
# Check if pool is closed
if self.pool.closed:
self.logger.warning("Connection pool is closed, attempting recovery...")
await self._recover_pool()
await self._recover_pool_with_lock()
if not self.pool or self.pool.closed:
raise RuntimeError("Connection pool is closed and recovery failed")
# Simple strategy: always get fresh connection from pool
raw_conn = await self.pool.acquire()
# 🔧 FIX: Increased timeout to prevent hanging
try:
raw_conn = await asyncio.wait_for(self.pool.acquire(), timeout=10.0)
except asyncio.TimeoutError:
self.logger.error(f"Connection acquisition timed out for session {session_id}")
# Try one recovery attempt
await self._recover_pool_with_lock()
if self.pool and not self.pool.closed:
try:
raw_conn = await asyncio.wait_for(self.pool.acquire(), timeout=5.0)
except asyncio.TimeoutError:
raise RuntimeError("Connection acquisition timed out after recovery")
else:
raise RuntimeError("Connection acquisition timed out")
# Wrap in DorisConnection
doris_conn = DorisConnection(raw_conn, session_id, self.security_manager)
# Simple validation - just check if connection is open
# Basic validation - check if connection is open
if raw_conn.closed:
# Return connection and raise error
try:
self.pool.release(raw_conn)
except Exception:
pass
raise RuntimeError("Acquired connection is already closed")
self.logger.debug(f"✅ Acquired fresh connection for session {session_id}")
@@ -559,24 +589,41 @@ class DorisConnectionManager:
raise
async def release_connection(self, session_id: str, connection: DorisConnection):
"""Release connection back to pool - Simplified Strategy"""
"""🔧 FIX: Release connection back to pool with proper error handling"""
if not connection or not connection.connection:
self.logger.debug(f"No connection to release for session {session_id}")
return
try:
# Check pool availability before attempting release
if not self.pool or self.pool.closed:
self.logger.warning(f"Pool unavailable during release for session {session_id}, force closing connection")
try:
await connection.connection.ensure_closed()
except Exception:
pass
return
# Check connection state before release
if connection.connection.closed:
self.logger.debug(f"Connection already closed for session {session_id}")
return
# 🔧 FIX: Simplified release operation without thread wrapper
try:
if connection and connection.connection:
# Simple strategy: always return to pool
if not connection.connection.closed:
self.pool.release(connection.connection)
self.logger.debug(f"✅ Released connection for session {session_id}")
else:
self.logger.debug(f"Connection already closed for session {session_id}")
except Exception as release_error:
self.logger.warning(f"Connection release failed for session {session_id}: {release_error}, force closing")
await connection.connection.ensure_closed()
except Exception as e:
self.logger.error(f"Error releasing connection for session {session_id}: {e}")
# Force close if release fails
try:
if connection and connection.connection:
await connection.connection.ensure_closed()
except Exception:
pass
except Exception as close_error:
self.logger.debug(f"Error force closing connection: {close_error}")
async def close(self):
"""Close connection manager"""

File diff suppressed because it is too large Load Diff

View File

@@ -56,16 +56,31 @@ class SecurityAnalyticsTools:
"""
try:
start_time = time.time()
# 🚀 PROGRESS: Initialize security analysis
logger.info("=" * 70)
logger.info(f"🔒 Starting Data Access Pattern Analysis")
logger.info(f"📅 Analysis period: {days} days")
logger.info(f"👥 Include system users: {include_system_users}")
logger.info(f"🎯 Min query threshold: {min_query_threshold}")
logger.info("=" * 70)
connection = await self.connection_manager.get_connection("query")
# Define analysis period
end_date = datetime.now()
start_date = end_date - timedelta(days=days)
# 1. Get audit log data
logger.info(f"📊 Period: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}")
# 🚀 PROGRESS: Step 1 - Get audit log data
logger.info("📋 Step 1/5: Retrieving audit log data...")
audit_start = time.time()
audit_data = await self._get_audit_log_data(connection, start_date, end_date, include_system_users)
audit_time = time.time() - audit_start
if not audit_data:
logger.warning("⚠️ No audit data available for the specified period")
return {
"error": "No audit data available for the specified period",
"analysis_period": {
@@ -75,25 +90,49 @@ class SecurityAnalyticsTools:
}
}
# 2. Analyze user access patterns
logger.info(f"✅ Retrieved {len(audit_data)} audit records in {audit_time:.2f}s")
# 🚀 PROGRESS: Step 2 - Analyze user access patterns
logger.info("👤 Step 2/5: Analyzing user access patterns...")
user_start = time.time()
user_access_analysis = await self._analyze_user_access_patterns(
audit_data, min_query_threshold
)
user_time = time.time() - user_start
logger.info(f"✅ Analyzed {len(user_access_analysis)} users in {user_time:.2f}s")
# 3. Analyze role-based access
# 🚀 PROGRESS: Step 3 - Analyze role-based access
logger.info("🎭 Step 3/5: Analyzing role-based access patterns...")
role_start = time.time()
role_access_analysis = await self._analyze_role_access_patterns(
connection, user_access_analysis
)
role_time = time.time() - role_start
logger.info(f"✅ Role analysis completed in {role_time:.2f}s")
# 4. Detect security anomalies
# 🚀 PROGRESS: Step 4 - Detect security anomalies
logger.info("🚨 Step 4/5: Detecting security anomalies...")
anomaly_start = time.time()
security_alerts = await self._detect_security_anomalies(
audit_data, user_access_analysis
)
anomaly_time = time.time() - anomaly_start
logger.info(f"✅ Found {len(security_alerts)} security alerts in {anomaly_time:.2f}s")
# 5. Generate access insights
# Log alert summary
if security_alerts:
high_alerts = sum(1 for alert in security_alerts if alert.get("severity") == "high")
medium_alerts = sum(1 for alert in security_alerts if alert.get("severity") == "medium")
logger.info(f"🚨 Alert breakdown: {high_alerts} high, {medium_alerts} medium")
# 🚀 PROGRESS: Step 5 - Generate access insights
logger.info("💡 Step 5/5: Generating access insights...")
insights_start = time.time()
access_insights = await self._generate_access_insights(
user_access_analysis, role_access_analysis
)
insights_time = time.time() - insights_start
logger.info(f"✅ Access insights generated in {insights_time:.2f}s")
execution_time = time.time() - start_time

View File

@@ -20,7 +20,7 @@ build-backend = "hatchling.build"
[project]
name = "doris-mcp-server"
version = "0.5.0"
version = "0.5.1"
description = "Enterprise-grade Model Context Protocol (MCP) server implementation for Apache Doris"
authors = [
{name = "Yijia Su", email = "freeoneplus@apache.org"}