[BUG]Further fix the at_eof problem caused by aiomysql (#9)
* fix at_eof bug * update uv.lock
This commit is contained in:
@@ -210,7 +210,8 @@ class DorisConnectionManager:
|
||||
if not self.config.database.password:
|
||||
self.logger.warning("Database password is empty, this may cause connection issues")
|
||||
|
||||
# Create connection pool with additional parameters for stability
|
||||
# Create connection pool with improved stability parameters
|
||||
# Key change: Set minsize=0 to avoid pre-creation issues that cause at_eof errors
|
||||
self.pool = await aiomysql.create_pool(
|
||||
host=self.config.database.host,
|
||||
port=self.config.database.port,
|
||||
@@ -218,22 +219,22 @@ class DorisConnectionManager:
|
||||
password=self.config.database.password,
|
||||
db=self.config.database.database,
|
||||
charset="utf8",
|
||||
minsize=self.config.database.min_connections or 5,
|
||||
minsize=0, # Avoid pre-creation issues - create connections on demand
|
||||
maxsize=self.config.database.max_connections or 20,
|
||||
autocommit=True,
|
||||
connect_timeout=self.connection_timeout,
|
||||
# Additional parameters for stability
|
||||
pool_recycle=3600, # Recycle connections every hour
|
||||
# Enhanced stability parameters
|
||||
pool_recycle=7200, # Recycle connections every 2 hours
|
||||
echo=False, # Don't echo SQL statements
|
||||
)
|
||||
|
||||
# Test the connection pool
|
||||
if not await self.test_connection():
|
||||
raise RuntimeError("Connection pool test failed")
|
||||
# Test the connection pool with a more robust test
|
||||
if not await self._robust_connection_test():
|
||||
raise RuntimeError("Connection pool robust test failed")
|
||||
|
||||
self.logger.info(
|
||||
f"Connection pool initialized successfully, min connections: {self.config.database.min_connections}, "
|
||||
f"max connections: {self.config.database.max_connections}"
|
||||
f"Connection pool initialized successfully with on-demand connection creation, "
|
||||
f"max connections: {self.config.database.max_connections or 20}"
|
||||
)
|
||||
|
||||
# Start background monitoring tasks
|
||||
@@ -252,63 +253,178 @@ class DorisConnectionManager:
|
||||
self.pool = None
|
||||
raise
|
||||
|
||||
async def _robust_connection_test(self) -> bool:
    """Check that the pool can supply a connection that answers ``SELECT 1``.

    Up to three attempts are made. Each attempt obtains a connection through
    ``_create_raw_connection_with_validation`` and runs ``SELECT 1`` on it.
    The connection is always handed back to the pool before the attempt
    ends; a connection that did not prove healthy is closed first so the
    pool does not reuse it. Without the release, the pool would keep
    counting the connection as in use.

    Returns:
        True as soon as one attempt reads back ``1``; False once every
        attempt has failed.
    """
    max_retries = 3
    for attempt in range(max_retries):
        test_conn = None
        healthy = False
        error = None

        try:
            self.logger.debug(f"Testing connection pool (attempt {attempt + 1}/{max_retries})")

            # Test connection creation and validation
            test_conn = await self._create_raw_connection_with_validation()
            if test_conn:
                # Test basic query execution
                async with test_conn.cursor() as cursor:
                    await cursor.execute("SELECT 1")
                    result = await cursor.fetchone()
                healthy = bool(result) and result[0] == 1
        except Exception as e:
            error = e

        # Give the connection back before sleeping or retrying, so a failed
        # attempt never holds a pool slot.
        if test_conn:
            if not healthy:
                try:
                    await test_conn.ensure_closed()
                except Exception as close_error:
                    self.logger.debug(f"Failed to close test connection: {close_error}")
            if self.pool:
                try:
                    self.pool.release(test_conn)
                except Exception as release_error:
                    self.logger.debug(f"Failed to release test connection: {release_error}")

        if healthy:
            self.logger.debug("Connection pool test successful")
            return True

        if error is not None:
            self.logger.warning(f"Connection test attempt {attempt + 1} failed: {error}")
            if attempt == max_retries - 1:
                self.logger.error("All connection test attempts failed")
                return False
            # Wait before retry (linear backoff)
            await asyncio.sleep(1.0 * (attempt + 1))
        elif test_conn:
            self.logger.warning("Connection test query returned unexpected result")

    return False
|
||||
|
||||
async def _create_raw_connection_with_validation(self, max_retries: int = 3):
    """Acquire a pooled connection and prove it works before handing it out.

    Each attempt acquires a connection from ``self.pool`` and runs
    ``SELECT 1`` on it. A connection that is already closed, raises during
    the query, or returns anything other than ``1`` is closed and released
    back to the pool so its slot is freed, then the next attempt starts.

    Args:
        max_retries: Maximum number of acquire-and-validate attempts.

    Returns:
        A raw pool connection that answered ``SELECT 1``. The caller owns
        it and is responsible for releasing it to the pool.

    Raises:
        RuntimeError: If no attempt produced a working connection, or the
            pool is not initialized.
    """

    async def discard(connection) -> None:
        """Close a rejected connection and give its slot back to the pool."""
        try:
            await connection.ensure_closed()
        except Exception as close_error:
            self.logger.debug(f"Failed to close rejected connection: {close_error}")
        try:
            self.pool.release(connection)
        except Exception as release_error:
            self.logger.debug(f"Failed to release rejected connection: {release_error}")

    for attempt in range(max_retries):
        try:
            if not self.pool:
                raise RuntimeError("Connection pool not initialized")

            # Acquire connection from pool
            raw_connection = await self.pool.acquire()

            # Basic connection validation
            if not raw_connection:
                self.logger.warning(f"Pool returned None connection (attempt {attempt + 1})")
                continue

            if raw_connection.closed:
                self.logger.warning(f"Pool returned closed connection (attempt {attempt + 1})")
                await discard(raw_connection)
                continue

            # Run a real query instead of inspecting internal state:
            # _reader/_transport might not be fully initialized yet.
            try:
                async with raw_connection.cursor() as cursor:
                    await cursor.execute("SELECT 1")
                    result = await cursor.fetchone()
            except Exception as e:
                # Check if this is an at_eof error specifically
                error_str = str(e).lower()
                if 'at_eof' in error_str or 'nonetype' in error_str:
                    self.logger.warning(f"Connection has at_eof issue (attempt {attempt + 1}): {e}")
                else:
                    self.logger.warning(f"Connection test failed (attempt {attempt + 1}): {e}")
                await discard(raw_connection)
                continue

            if result and result[0] == 1:
                self.logger.debug(f"Successfully created and validated raw connection (attempt {attempt + 1})")
                return raw_connection

            self.logger.warning(f"Connection test query failed (attempt {attempt + 1})")
            await discard(raw_connection)

        except Exception as e:
            self.logger.warning(f"Raw connection creation attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                raise RuntimeError(
                    f"Failed to create valid connection after {max_retries} attempts: {e}"
                ) from e
            # Exponential backoff
            await asyncio.sleep(0.5 * (2 ** attempt))

    raise RuntimeError("Failed to create valid connection")
|
||||
|
||||
async def get_connection(self, session_id: str) -> DorisConnection:
|
||||
"""Get database connection
|
||||
"""Get database connection with enhanced reliability
|
||||
|
||||
Supports session-level connection reuse to improve performance and consistency
|
||||
"""
|
||||
# Check if there's an existing session connection
|
||||
if session_id in self.session_connections:
|
||||
conn = self.session_connections[session_id]
|
||||
# Check connection health
|
||||
if await conn.ping():
|
||||
# Enhanced connection health check
|
||||
if await self._comprehensive_connection_health_check(conn):
|
||||
return conn
|
||||
else:
|
||||
# Connection is unhealthy, clean up and create new one
|
||||
self.logger.debug(f"Existing connection unhealthy for session {session_id}, creating new one")
|
||||
await self._cleanup_session_connection(session_id)
|
||||
|
||||
# Create new connection
|
||||
return await self._create_new_connection(session_id)
|
||||
# Create new connection with retry logic
|
||||
return await self._create_new_connection_with_retry(session_id)
|
||||
|
||||
async def _create_new_connection(self, session_id: str) -> DorisConnection:
|
||||
"""Create new database connection"""
|
||||
async def _comprehensive_connection_health_check(self, conn: DorisConnection) -> bool:
|
||||
"""Perform comprehensive connection health check"""
|
||||
try:
|
||||
if not self.pool:
|
||||
raise RuntimeError("Connection pool not initialized")
|
||||
|
||||
# Get connection from pool
|
||||
raw_connection = await self.pool.acquire()
|
||||
# Check basic connection state
|
||||
if not conn.connection or conn.connection.closed:
|
||||
return False
|
||||
|
||||
# Validate the raw connection
|
||||
if not raw_connection:
|
||||
raise RuntimeError(f"Failed to acquire connection from pool for session {session_id}")
|
||||
# Instead of checking internal state, perform a simple ping test
|
||||
# This is more reliable and less dependent on aiomysql internals
|
||||
if not await conn.ping():
|
||||
return False
|
||||
|
||||
# Verify the connection is not closed
|
||||
if raw_connection.closed:
|
||||
raise RuntimeError(f"Acquired connection is already closed for session {session_id}")
|
||||
return True
|
||||
|
||||
# Create wrapped connection
|
||||
doris_conn = DorisConnection(raw_connection, session_id, self.security_manager)
|
||||
|
||||
# Test the connection before storing it
|
||||
if not await doris_conn.ping():
|
||||
# If ping fails, release the connection and raise error
|
||||
if self.pool and raw_connection and not raw_connection.closed:
|
||||
self.pool.release(raw_connection)
|
||||
raise RuntimeError(f"New connection failed ping test for session {session_id}")
|
||||
|
||||
# Store in session connections
|
||||
self.session_connections[session_id] = doris_conn
|
||||
|
||||
self.metrics.total_connections += 1
|
||||
self.logger.debug(f"Created new connection for session: {session_id}")
|
||||
|
||||
return doris_conn
|
||||
|
||||
except Exception as e:
|
||||
self.metrics.connection_errors += 1
|
||||
self.logger.error(f"Failed to create connection for session {session_id}: {e}")
|
||||
raise
|
||||
# Check for at_eof errors specifically
|
||||
error_str = str(e).lower()
|
||||
if 'at_eof' in error_str:
|
||||
self.logger.debug(f"Connection health check failed with at_eof error: {e}")
|
||||
else:
|
||||
self.logger.debug(f"Connection health check failed: {e}")
|
||||
return False
|
||||
|
||||
async def _create_new_connection_with_retry(self, session_id: str, max_retries: int = 3) -> DorisConnection:
    """Create, health-check and register a new connection for a session.

    Each attempt obtains a validated raw connection, wraps it in a
    ``DorisConnection`` and runs ``_comprehensive_connection_health_check``
    on the wrapper. On success the wrapper is stored in
    ``self.session_connections`` and returned. On any failure the wrapper is
    closed and the raw connection is closed and released back to the pool
    before the next attempt, so failed attempts do not hold pool slots.

    Args:
        session_id: Session the connection is created for.
        max_retries: Maximum number of creation attempts.

    Returns:
        The registered ``DorisConnection`` for ``session_id``.

    Raises:
        RuntimeError: If no attempt produced a healthy connection.
    """

    async def discard(wrapper, raw) -> None:
        """Tear down a failed attempt and free its pool slot."""
        if wrapper is not None:
            try:
                await wrapper.close()
            except Exception as close_error:
                self.logger.debug(f"Failed to close connection wrapper for session {session_id}: {close_error}")
        if raw is None:
            return
        # Close before releasing so the pool does not reuse a connection
        # that just failed its health check.
        try:
            await raw.ensure_closed()
        except Exception as close_error:
            self.logger.debug(f"Failed to close raw connection for session {session_id}: {close_error}")
        if self.pool:
            try:
                self.pool.release(raw)
            except Exception as release_error:
                self.logger.debug(f"Failed to release raw connection for session {session_id}: {release_error}")

    for attempt in range(max_retries):
        raw_connection = None
        doris_conn = None
        try:
            # Get validated raw connection
            raw_connection = await self._create_raw_connection_with_validation()

            # Create wrapped connection
            doris_conn = DorisConnection(raw_connection, session_id, self.security_manager)

            # Comprehensive connection test
            if await self._comprehensive_connection_health_check(doris_conn):
                # Store in session connections
                self.session_connections[session_id] = doris_conn
                self.metrics.total_connections += 1
                self.logger.debug(f"Successfully created new connection for session: {session_id}")
                return doris_conn

            # Connection failed health check, clean up and retry
            self.logger.warning(f"New connection failed health check for session {session_id} (attempt {attempt + 1})")
            await discard(doris_conn, raw_connection)

        except Exception as e:
            self.logger.warning(f"Connection creation attempt {attempt + 1} failed for session {session_id}: {e}")
            await discard(doris_conn, raw_connection)
            if attempt == max_retries - 1:
                self.metrics.connection_errors += 1
                raise RuntimeError(
                    f"Failed to create connection for session {session_id} after {max_retries} attempts: {e}"
                ) from e
            # Exponential backoff
            await asyncio.sleep(0.5 * (2 ** attempt))

    # Reached when every attempt failed its health check without raising;
    # count it like the exception path does.
    self.metrics.connection_errors += 1
    raise RuntimeError(f"Unexpected failure in connection creation for session {session_id}")
|
||||
|
||||
async def release_connection(self, session_id: str):
|
||||
"""Release session connection"""
|
||||
@@ -316,26 +432,47 @@ class DorisConnectionManager:
|
||||
await self._cleanup_session_connection(session_id)
|
||||
|
||||
async def _cleanup_session_connection(self, session_id: str):
|
||||
"""Clean up session connection"""
|
||||
"""Clean up session connection with enhanced safety"""
|
||||
if session_id in self.session_connections:
|
||||
conn = self.session_connections[session_id]
|
||||
try:
|
||||
# Return connection to pool only if it's valid and not closed
|
||||
# Simplified connection validation before returning to pool
|
||||
connection_healthy = False
|
||||
|
||||
if (self.pool and
|
||||
conn.connection and
|
||||
not conn.connection.closed and
|
||||
hasattr(conn.connection, '_reader') and
|
||||
conn.connection._reader is not None):
|
||||
not conn.connection.closed):
|
||||
|
||||
# Test if connection is still healthy with a simple check
|
||||
try:
|
||||
# Quick ping test to see if connection is usable
|
||||
async with conn.connection.cursor() as cursor:
|
||||
await cursor.execute("SELECT 1")
|
||||
await cursor.fetchone()
|
||||
connection_healthy = True
|
||||
except Exception as test_error:
|
||||
self.logger.debug(f"Connection health test failed for session {session_id}: {test_error}")
|
||||
connection_healthy = False
|
||||
|
||||
if connection_healthy:
|
||||
# Connection appears healthy, return to pool
|
||||
try:
|
||||
# Try to gracefully return to pool
|
||||
self.pool.release(conn.connection)
|
||||
self.logger.debug(f"Successfully returned connection to pool for session {session_id}")
|
||||
except Exception as pool_error:
|
||||
self.logger.debug(f"Failed to return connection to pool for session {session_id}: {pool_error}")
|
||||
# If pool release fails, try to close the connection directly
|
||||
try:
|
||||
await conn.connection.ensure_closed()
|
||||
except Exception:
|
||||
pass # Ignore errors during forced close
|
||||
pass
|
||||
else:
|
||||
# Connection is unhealthy, force close
|
||||
self.logger.debug(f"Connection unhealthy for session {session_id}, force closing")
|
||||
try:
|
||||
if conn.connection and not conn.connection.closed:
|
||||
await conn.connection.ensure_closed()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Close connection wrapper
|
||||
await conn.close()
|
||||
@@ -365,24 +502,24 @@ class DorisConnectionManager:
|
||||
self.logger.error(f"Health check error: {e}")
|
||||
|
||||
async def _perform_health_check(self):
|
||||
"""Perform health check"""
|
||||
"""Perform enhanced health check"""
|
||||
try:
|
||||
unhealthy_sessions = []
|
||||
|
||||
# First pass: check basic connectivity
|
||||
# Enhanced health check with comprehensive validation
|
||||
for session_id, conn in self.session_connections.items():
|
||||
if not await conn.ping():
|
||||
if not await self._comprehensive_connection_health_check(conn):
|
||||
unhealthy_sessions.append(session_id)
|
||||
|
||||
# Second pass: check for stale connections (over 30 minutes old)
|
||||
# Check for stale connections (over 30 minutes old)
|
||||
current_time = datetime.utcnow()
|
||||
stale_sessions = []
|
||||
for session_id, conn in self.session_connections.items():
|
||||
if session_id not in unhealthy_sessions: # Don't double-check
|
||||
last_used_delta = (current_time - conn.last_used).total_seconds()
|
||||
if last_used_delta > 1800: # 30 minutes
|
||||
# Force a ping check for stale connections
|
||||
if not await conn.ping():
|
||||
# Force a comprehensive health check for stale connections
|
||||
if not await self._comprehensive_connection_health_check(conn):
|
||||
stale_sessions.append(session_id)
|
||||
|
||||
all_problematic_sessions = list(set(unhealthy_sessions + stale_sessions))
|
||||
@@ -453,9 +590,29 @@ class DorisConnectionManager:
|
||||
async def execute_query(
|
||||
self, session_id: str, sql: str, params: tuple | None = None, auth_context=None
|
||||
) -> QueryResult:
|
||||
"""Execute query"""
|
||||
conn = await self.get_connection(session_id)
|
||||
return await conn.execute(sql, params, auth_context)
|
||||
"""Execute query with enhanced error handling and retry logic"""
|
||||
max_retries = 2
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
conn = await self.get_connection(session_id)
|
||||
return await conn.execute(sql, params, auth_context)
|
||||
except Exception as e:
|
||||
error_msg = str(e).lower()
|
||||
# Check for connection-related errors that warrant retry
|
||||
is_connection_error = any(keyword in error_msg for keyword in [
|
||||
'at_eof', 'connection', 'closed', 'nonetype', 'reader', 'transport'
|
||||
])
|
||||
|
||||
if is_connection_error and attempt < max_retries - 1:
|
||||
self.logger.warning(f"Connection error during query execution (attempt {attempt + 1}): {e}")
|
||||
# Clean up the problematic connection
|
||||
await self.release_connection(session_id)
|
||||
# Wait before retry
|
||||
await asyncio.sleep(0.5 * (attempt + 1))
|
||||
continue
|
||||
else:
|
||||
# Not a connection error or final retry - re-raise
|
||||
raise
|
||||
|
||||
@asynccontextmanager
|
||||
async def get_connection_context(self, session_id: str):
|
||||
@@ -500,20 +657,8 @@ class DorisConnectionManager:
|
||||
self.logger.error(f"Error closing connection manager: {e}")
|
||||
|
||||
async def test_connection(self) -> bool:
|
||||
"""Test database connection"""
|
||||
try:
|
||||
if not self.pool:
|
||||
return False
|
||||
|
||||
async with self.pool.acquire() as conn:
|
||||
async with conn.cursor() as cursor:
|
||||
await cursor.execute("SELECT 1")
|
||||
result = await cursor.fetchone()
|
||||
return result is not None
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Connection test failed: {e}")
|
||||
return False
|
||||
"""Test database connection using robust connection test"""
|
||||
return await self._robust_connection_test()
|
||||
|
||||
async def diagnose_connection_health(self) -> Dict[str, Any]:
|
||||
"""Diagnose connection pool and session health"""
|
||||
@@ -680,3 +825,5 @@ class ConnectionPoolMonitor:
|
||||
report["recommendations"].append("Connection pool utilization is high, consider increasing pool size")
|
||||
|
||||
return report
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user