v0.4.1 preview
This commit is contained in:
@@ -142,11 +142,22 @@ class DorisConnection:
|
||||
self.is_healthy = False
|
||||
return False
|
||||
|
||||
# Check if connection has _reader (aiomysql internal state)
|
||||
# This prevents the 'NoneType' object has no attribute 'at_eof' error
|
||||
if not hasattr(self.connection, '_reader') or self.connection._reader is None:
|
||||
self.is_healthy = False
|
||||
return False
|
||||
|
||||
# Additional check for reader's state
|
||||
if hasattr(self.connection._reader, '_transport') and self.connection._reader._transport is None:
|
||||
self.is_healthy = False
|
||||
return False
|
||||
|
||||
# Try to ping the connection
|
||||
await self.connection.ping()
|
||||
self.is_healthy = True
|
||||
return True
|
||||
except Exception as e:
|
||||
except (AttributeError, OSError, ConnectionError, Exception) as e:
|
||||
# Log the specific error for debugging
|
||||
logging.debug(f"Connection ping failed for session {self.session_id}: {e}")
|
||||
self.is_healthy = False
|
||||
@@ -309,15 +320,34 @@ class DorisConnectionManager:
|
||||
if session_id in self.session_connections:
|
||||
conn = self.session_connections[session_id]
|
||||
try:
|
||||
# Return connection to pool
|
||||
if self.pool and conn.connection and not conn.connection.closed:
|
||||
self.pool.release(conn.connection)
|
||||
# Return connection to pool only if it's valid and not closed
|
||||
if (self.pool and
|
||||
conn.connection and
|
||||
not conn.connection.closed and
|
||||
hasattr(conn.connection, '_reader') and
|
||||
conn.connection._reader is not None):
|
||||
try:
|
||||
# Try to gracefully return to pool
|
||||
self.pool.release(conn.connection)
|
||||
except Exception as pool_error:
|
||||
self.logger.debug(f"Failed to return connection to pool for session {session_id}: {pool_error}")
|
||||
# If pool release fails, try to close the connection directly
|
||||
try:
|
||||
await conn.connection.ensure_closed()
|
||||
except Exception:
|
||||
pass # Ignore errors during forced close
|
||||
|
||||
# Close connection wrapper
|
||||
await conn.close()
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error cleaning up connection for session {session_id}: {e}")
|
||||
# Force close if normal cleanup fails
|
||||
try:
|
||||
if conn.connection and not conn.connection.closed:
|
||||
await conn.connection.ensure_closed()
|
||||
except Exception:
|
||||
pass # Ignore errors during forced close
|
||||
finally:
|
||||
# Remove from session connections
|
||||
del self.session_connections[session_id]
|
||||
@@ -339,12 +369,26 @@ class DorisConnectionManager:
|
||||
try:
|
||||
unhealthy_sessions = []
|
||||
|
||||
# First pass: check basic connectivity
|
||||
for session_id, conn in self.session_connections.items():
|
||||
if not await conn.ping():
|
||||
unhealthy_sessions.append(session_id)
|
||||
|
||||
# Clean up unhealthy connections
|
||||
for session_id in unhealthy_sessions:
|
||||
# Second pass: check for stale connections (over 30 minutes old)
|
||||
current_time = datetime.utcnow()
|
||||
stale_sessions = []
|
||||
for session_id, conn in self.session_connections.items():
|
||||
if session_id not in unhealthy_sessions: # Don't double-check
|
||||
last_used_delta = (current_time - conn.last_used).total_seconds()
|
||||
if last_used_delta > 1800: # 30 minutes
|
||||
# Force a ping check for stale connections
|
||||
if not await conn.ping():
|
||||
stale_sessions.append(session_id)
|
||||
|
||||
all_problematic_sessions = list(set(unhealthy_sessions + stale_sessions))
|
||||
|
||||
# Clean up problematic connections
|
||||
for session_id in all_problematic_sessions:
|
||||
await self._cleanup_session_connection(session_id)
|
||||
self.metrics.failed_connections += 1
|
||||
|
||||
@@ -352,11 +396,19 @@ class DorisConnectionManager:
|
||||
await self._update_connection_metrics()
|
||||
self.metrics.last_health_check = datetime.utcnow()
|
||||
|
||||
if unhealthy_sessions:
|
||||
self.logger.warning(f"Cleaned up {len(unhealthy_sessions)} unhealthy connections")
|
||||
if all_problematic_sessions:
|
||||
self.logger.warning(f"Health check: cleaned up {len(unhealthy_sessions)} unhealthy and {len(stale_sessions)} stale connections")
|
||||
else:
|
||||
self.logger.debug(f"Health check: all {len(self.session_connections)} connections healthy")
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Health check failed: {e}")
|
||||
# If health check fails, try to diagnose the issue
|
||||
try:
|
||||
diagnosis = await self.diagnose_connection_health()
|
||||
self.logger.error(f"Connection diagnosis: {diagnosis}")
|
||||
except Exception:
|
||||
pass # Don't let diagnosis failure crash health check
|
||||
|
||||
async def _cleanup_loop(self):
|
||||
"""Background cleanup loop"""
|
||||
@@ -463,6 +515,93 @@ class DorisConnectionManager:
|
||||
self.logger.error(f"Connection test failed: {e}")
|
||||
return False
|
||||
|
||||
async def diagnose_connection_health(self) -> Dict[str, Any]:
|
||||
"""Diagnose connection pool and session health"""
|
||||
diagnosis = {
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
"pool_status": "unknown",
|
||||
"session_connections": {},
|
||||
"problematic_connections": [],
|
||||
"recommendations": []
|
||||
}
|
||||
|
||||
try:
|
||||
# Check pool status
|
||||
if not self.pool:
|
||||
diagnosis["pool_status"] = "not_initialized"
|
||||
diagnosis["recommendations"].append("Initialize connection pool")
|
||||
return diagnosis
|
||||
|
||||
if self.pool.closed:
|
||||
diagnosis["pool_status"] = "closed"
|
||||
diagnosis["recommendations"].append("Recreate connection pool")
|
||||
return diagnosis
|
||||
|
||||
diagnosis["pool_status"] = "healthy"
|
||||
diagnosis["pool_info"] = {
|
||||
"size": self.pool.size,
|
||||
"free_size": self.pool.freesize,
|
||||
"min_size": self.pool.minsize,
|
||||
"max_size": self.pool.maxsize
|
||||
}
|
||||
|
||||
# Check session connections
|
||||
problematic_sessions = []
|
||||
for session_id, conn in self.session_connections.items():
|
||||
conn_status = {
|
||||
"session_id": session_id,
|
||||
"created_at": conn.created_at.isoformat(),
|
||||
"last_used": conn.last_used.isoformat(),
|
||||
"query_count": conn.query_count,
|
||||
"is_healthy": conn.is_healthy
|
||||
}
|
||||
|
||||
# Detailed connection checks
|
||||
if conn.connection:
|
||||
conn_status["connection_closed"] = conn.connection.closed
|
||||
conn_status["has_reader"] = hasattr(conn.connection, '_reader') and conn.connection._reader is not None
|
||||
|
||||
if hasattr(conn.connection, '_reader') and conn.connection._reader:
|
||||
conn_status["reader_transport"] = conn.connection._reader._transport is not None
|
||||
else:
|
||||
conn_status["reader_transport"] = False
|
||||
else:
|
||||
conn_status["connection_closed"] = True
|
||||
conn_status["has_reader"] = False
|
||||
conn_status["reader_transport"] = False
|
||||
|
||||
# Check if connection is problematic
|
||||
if (not conn.is_healthy or
|
||||
conn_status["connection_closed"] or
|
||||
not conn_status["has_reader"] or
|
||||
not conn_status["reader_transport"]):
|
||||
problematic_sessions.append(session_id)
|
||||
diagnosis["problematic_connections"].append(conn_status)
|
||||
|
||||
diagnosis["session_connections"][session_id] = conn_status
|
||||
|
||||
# Generate recommendations
|
||||
if problematic_sessions:
|
||||
diagnosis["recommendations"].append(f"Clean up {len(problematic_sessions)} problematic connections")
|
||||
|
||||
if self.pool.freesize == 0 and self.pool.size >= self.pool.maxsize:
|
||||
diagnosis["recommendations"].append("Connection pool exhausted - consider increasing max_connections")
|
||||
|
||||
# Auto-cleanup problematic connections
|
||||
for session_id in problematic_sessions:
|
||||
try:
|
||||
await self._cleanup_session_connection(session_id)
|
||||
self.logger.info(f"Auto-cleaned problematic connection for session: {session_id}")
|
||||
except Exception as e:
|
||||
self.logger.error(f"Failed to auto-clean session {session_id}: {e}")
|
||||
|
||||
return diagnosis
|
||||
|
||||
except Exception as e:
|
||||
diagnosis["error"] = str(e)
|
||||
diagnosis["recommendations"].append("Manual intervention required")
|
||||
return diagnosis
|
||||
|
||||
|
||||
class ConnectionPoolMonitor:
|
||||
"""Connection pool monitor
|
||||
|
||||
Reference in New Issue
Block a user