From c1ce9a5cc7e1f73221bb9062069a8da5d4d8f4d9 Mon Sep 17 00:00:00 2001 From: Yijia Su <54164178+FreeOnePlus@users.noreply.github.com> Date: Wed, 2 Jul 2025 19:57:45 +0800 Subject: [PATCH] [Config]Delete the minimum data pool variable (#11) * fix at_eof bug * update uv.lock * fix bug and change pool min values --- .env.example | 3 +- doris_mcp_server/utils/config.py | 23 ++--- doris_mcp_server/utils/db.py | 7 +- doris_mcp_server/utils/query_executor.py | 119 ++++++++++------------- 4 files changed, 71 insertions(+), 81 deletions(-) diff --git a/.env.example b/.env.example index 0c632a1..03f9e47 100644 --- a/.env.example +++ b/.env.example @@ -28,7 +28,6 @@ DORIS_BE_WEBSERVER_PORT=8040 # Connection Pool Configuration # ============================================================================= -DORIS_MIN_CONNECTIONS=5 DORIS_MAX_CONNECTIONS=20 DORIS_CONNECTION_TIMEOUT=30 DORIS_HEALTH_CHECK_INTERVAL=60 @@ -86,5 +85,5 @@ ALERT_WEBHOOK_URL= # ============================================================================= SERVER_NAME=doris-mcp-server -SERVER_VERSION=0.4.1 +SERVER_VERSION=0.4.2 SERVER_PORT=3000 diff --git a/doris_mcp_server/utils/config.py b/doris_mcp_server/utils/config.py index 4f566b4..d48d118 100644 --- a/doris_mcp_server/utils/config.py +++ b/doris_mcp_server/utils/config.py @@ -53,12 +53,19 @@ class DatabaseConfig: be_webserver_port: int = 8040 # Connection pool configuration - min_connections: int = 5 + # Note: min_connections is fixed at 0 to avoid at_eof connection issues + # This prevents pre-creation of connections which can cause state problems + _min_connections: int = field(default=0, init=False) # Internal use only, always 0 max_connections: int = 20 connection_timeout: int = 30 health_check_interval: int = 60 max_connection_age: int = 3600 + @property + def min_connections(self) -> int: + """Minimum connections is always 0 to prevent at_eof issues""" + return self._min_connections + @dataclass class SecurityConfig: @@ -248,9 +255,6 @@ class DorisConfig: config.database.be_webserver_port = int(os.getenv("DORIS_BE_WEBSERVER_PORT", str(config.database.be_webserver_port))) # Connection pool configuration - config.database.min_connections = int( - os.getenv("DORIS_MIN_CONNECTIONS", str(config.database.min_connections)) - ) config.database.max_connections = int( os.getenv("DORIS_MAX_CONNECTIONS", str(config.database.max_connections)) ) @@ -414,7 +418,7 @@ class DorisConfig: "fe_http_port": self.database.fe_http_port, "be_hosts": self.database.be_hosts, "be_webserver_port": self.database.be_webserver_port, - "min_connections": self.database.min_connections, + "min_connections": self.database.min_connections, # Always 0, shown for reference "max_connections": self.database.max_connections, "connection_timeout": self.database.connection_timeout, "health_check_interval": self.database.health_check_interval, @@ -492,11 +496,8 @@ class DorisConfig: if not self.database.user: errors.append("Database username cannot be empty") - if self.database.min_connections <= 0: - errors.append("Minimum connections must be greater than 0") - - if self.database.max_connections <= self.database.min_connections: - errors.append("Maximum connections must be greater than minimum connections") + if self.database.max_connections <= 0: + errors.append("Maximum connections must be greater than 0") # Validate security configuration if self.security.auth_type not in ["token", "basic", "oauth"]: @@ -549,7 +550,7 @@ class DorisConfig: return { "server": f"{self.server_name} v{self.server_version}", "database": f"{self.database.host}:{self.database.port}/{self.database.database}", - "connection_pool": f"{self.database.min_connections}-{self.database.max_connections}", + "connection_pool": f"0-{self.database.max_connections} (min fixed at 0 for stability)", "security": { "auth_type": self.security.auth_type, "masking_enabled": self.security.enable_masking, diff --git a/doris_mcp_server/utils/db.py b/doris_mcp_server/utils/db.py index f6cc9f5..e04b4df 100644 --- a/doris_mcp_server/utils/db.py +++ b/doris_mcp_server/utils/db.py @@ -219,7 +219,8 @@ class DorisConnectionManager: password=self.config.database.password, db=self.config.database.database, charset="utf8", - minsize=0, # Avoid pre-creation issues - create connections on demand + minsize=self.config.database.min_connections, # Always 0 per configuration to avoid at_eof issues + maxsize=self.config.database.max_connections or 20, autocommit=True, connect_timeout=self.connection_timeout, @@ -234,6 +235,8 @@ class DorisConnectionManager: self.logger.info( f"Connection pool initialized successfully with on-demand connection creation, " + f"min connections: {self.config.database.min_connections}, " + f"max connections: {self.config.database.max_connections or 20}" ) @@ -472,7 +475,7 @@ class DorisConnectionManager: if conn.connection and not conn.connection.closed: await conn.connection.ensure_closed() except Exception: - pass + pass # Ignore errors during forced close # Close connection wrapper await conn.close() diff --git a/doris_mcp_server/utils/query_executor.py b/doris_mcp_server/utils/query_executor.py index e7b736b..367dd9c 100644 --- a/doris_mcp_server/utils/query_executor.py +++ b/doris_mcp_server/utils/query_executor.py @@ -585,63 +585,57 @@ class DorisQueryExecutor: timeout=timeout, cache_enabled=False # Disable cache for MCP calls to ensure fresh data ) - + # Execute query with retry logic - try: - result = await self.execute_query(query_request, auth_context) - - # Serialize data for JSON response - serialized_data = [] - for row in result.data: - serialized_data.append(self._serialize_row_data(row)) + result = await self.execute_query(query_request, auth_context) + + # Serialize data for JSON response + serialized_data = [] + for row in result.data: + serialized_data.append(self._serialize_row_data(row)) - return { - "success": True, - "data": serialized_data, - "row_count": result.row_count, - "execution_time": result.execution_time, - "metadata": { - "columns": result.metadata.get("columns", []), - "query": sql - } + return { + "success": True, + "data": serialized_data, + "row_count": result.row_count, + "execution_time": result.execution_time, + "metadata": { + "columns": result.metadata.get("columns", []), + "query": sql } - - except Exception as query_error: - # Check if it's a connection-related error that we should retry - error_str = str(query_error).lower() - connection_errors = [ - "at_eof", "connection", "closed", "nonetype", - "transport", "reader", "broken pipe", "connection reset" - ] - - is_connection_error = any(err in error_str for err in connection_errors) - - if is_connection_error and retry_count < max_retries: - retry_count += 1 - self.logger.warning(f"Connection error detected, retrying ({retry_count}/{max_retries}): {query_error}") - - # Release the problematic connection - try: - await self.connection_manager.release_connection(session_id) - except Exception: - pass # Ignore cleanup errors - - # Wait a bit before retry - await asyncio.sleep(0.5 * retry_count) - continue - else: - # Re-raise if not a connection error or max retries exceeded - raise query_error - + } + except Exception as e: error_msg = str(e) + error_str = error_msg.lower() - # If we've exhausted retries or it's not a connection error, return error - if retry_count >= max_retries or "at_eof" not in error_msg.lower(): + # Check if it's a connection-related error that we should retry + connection_errors = [ + "at_eof", "connection", "closed", "nonetype", + "transport", "reader", "broken pipe", "connection reset" + ] + + is_connection_error = any(err in error_str for err in connection_errors) + + if is_connection_error and retry_count < max_retries: + retry_count += 1 + self.logger.warning(f"Connection error detected, retrying ({retry_count}/{max_retries}): {e}") + + # Release the problematic connection + try: + await self.connection_manager.release_connection(session_id) + except Exception: + pass # Ignore cleanup errors + + # Wait a bit before retry + await asyncio.sleep(0.5 * retry_count) + continue + else: + # If we've exhausted retries or it's not a connection error, return error error_analysis = self._analyze_error(error_msg) return { - "success": False, + "success": False, "error": error_analysis.get("user_message", error_msg), "error_type": error_analysis.get("error_type", "general_error"), "data": None, @@ -651,24 +645,17 @@ class DorisQueryExecutor: "retry_count": retry_count } } - else: - # Try one more time for connection errors - retry_count += 1 - if retry_count <= max_retries: - self.logger.warning(f"Retrying query due to connection error ({retry_count}/{max_retries}): {e}") - await asyncio.sleep(0.5 * retry_count) - continue - else: - return { - "success": False, - "error": f"Query failed after {max_retries} retries: {error_msg}", - "data": None, - "metadata": { - "query": sql, - "error_details": error_msg, - "retry_count": retry_count - } - } + + # This should never be reached, but just in case + return { + "success": False, + "error": "Maximum retries exceeded", + "data": None, + "metadata": { + "query": sql, + "retry_count": retry_count + } + } def _serialize_row_data(self, row_data: Dict[str, Any]) -> Dict[str, Any]: """Serialize row data for JSON response"""