[BUG]Optimize and fix the capabilities of 0.5.0 tools (#26)

1. **Unified Naming for CLI Arguments and Environment Variables** 
- All database-related CLI arguments now use the `--doris-*` prefix, and environment variables use `DORIS_*` for consistency and maintainability. 
- Backward compatibility: old `--db-*` arguments are still supported.

2. **Automatic Filtering of System SQL in Slow Query TopN** 
- Slow query analysis now automatically excludes SQL statements involving `__internal_schema`, `information_schema`, and `mysql` system databases, ensuring only business-related slow queries are counted. 
- Filtering is performed at the SQL level using `NOT LIKE` and `state != 'ERR'` for efficiency and safety.

3. **Unified Query Timeout Configuration** 
- If no `timeout` is specified for query execution, the system will use the `config.performance.query_timeout` value as the default, falling back to 30 seconds if not configured.
- This avoids hardcoding and makes timeout management more flexible.

4. **Tool execution optimization**
- Significantly reduce the execution time of some data governance and operation and maintenance tools
- Optimize execution logic and reduce data scanning
- Enable concurrent scanning to speed up retrieval

5. **Log system optimization**
- Fix the Console log printing logic and output the log content correctly
- Add advanced tool execution process log output to facilitate further positioning of error locations

6. **DB Connection optimization**
- Fixed a connection pool acquisition exception caused by deadlock

7. **Other Improvements**
- Help documentation and CLI examples updated to reflect new and legacy parameter compatibility.
- Code comments and documentation further standardized for better team collaboration and open-source community understanding.
This commit is contained in:
Yijia Su
2025-07-14 19:04:11 +08:00
committed by GitHub
parent 54572d0861
commit 651d524814
8 changed files with 2479 additions and 1467 deletions

View File

@@ -137,6 +137,33 @@ class PerformanceConfig:
max_response_content_size: int = 4096
@dataclass
class DataQualityConfig:
"""Data quality analysis configuration"""
# Column analysis configuration
max_columns_per_batch: int = 20 # Maximum columns to analyze in a single batch
default_sample_size: int = 100000 # Default sample size for analysis
# Sampling strategy configuration
small_table_threshold: int = 100000 # Tables smaller than this use full table analysis
medium_table_threshold: int = 1000000 # Tables smaller than this use simple LIMIT sampling
# Tables larger than medium_table_threshold use systematic sampling
# Performance optimization
enable_batch_analysis: bool = True # Enable batch analysis for multiple columns
batch_timeout: int = 300 # Timeout for batch analysis in seconds
# Accuracy vs Performance trade-off
enable_fast_mode: bool = False # Use approximate algorithms for faster results
fast_mode_sample_size: int = 10000 # Sample size for fast mode
# Statistical analysis configuration
enable_distribution_analysis: bool = True # Enable distribution analysis
histogram_bins: int = 20 # Number of bins for histogram analysis
percentile_levels: list[float] = field(default_factory=lambda: [0.25, 0.5, 0.75, 0.95, 0.99]) # Percentile levels to calculate
@dataclass
class ADBCConfig:
"""ADBC (Arrow Flight SQL) configuration"""
@@ -208,6 +235,7 @@ class DorisConfig:
database: DatabaseConfig = field(default_factory=DatabaseConfig)
security: SecurityConfig = field(default_factory=SecurityConfig)
performance: PerformanceConfig = field(default_factory=PerformanceConfig)
data_quality: DataQualityConfig = field(default_factory=DataQualityConfig)
logging: LoggingConfig = field(default_factory=LoggingConfig)
monitoring: MonitoringConfig = field(default_factory=MonitoringConfig)
adbc: ADBCConfig = field(default_factory=ADBCConfig)
@@ -404,6 +432,38 @@ class DorisConfig:
os.getenv("ADBC_ENABLED", str(config.adbc.enabled).lower()).lower() == "true"
)
# Data quality configuration
config.data_quality.max_columns_per_batch = int(
os.getenv("DATA_QUALITY_MAX_COLUMNS_PER_BATCH", str(config.data_quality.max_columns_per_batch))
)
config.data_quality.default_sample_size = int(
os.getenv("DATA_QUALITY_DEFAULT_SAMPLE_SIZE", str(config.data_quality.default_sample_size))
)
config.data_quality.small_table_threshold = int(
os.getenv("DATA_QUALITY_SMALL_TABLE_THRESHOLD", str(config.data_quality.small_table_threshold))
)
config.data_quality.medium_table_threshold = int(
os.getenv("DATA_QUALITY_MEDIUM_TABLE_THRESHOLD", str(config.data_quality.medium_table_threshold))
)
config.data_quality.enable_batch_analysis = (
os.getenv("DATA_QUALITY_ENABLE_BATCH_ANALYSIS", str(config.data_quality.enable_batch_analysis).lower()).lower() == "true"
)
config.data_quality.batch_timeout = int(
os.getenv("DATA_QUALITY_BATCH_TIMEOUT", str(config.data_quality.batch_timeout))
)
config.data_quality.enable_fast_mode = (
os.getenv("DATA_QUALITY_ENABLE_FAST_MODE", str(config.data_quality.enable_fast_mode).lower()).lower() == "true"
)
config.data_quality.fast_mode_sample_size = int(
os.getenv("DATA_QUALITY_FAST_MODE_SAMPLE_SIZE", str(config.data_quality.fast_mode_sample_size))
)
config.data_quality.enable_distribution_analysis = (
os.getenv("DATA_QUALITY_ENABLE_DISTRIBUTION_ANALYSIS", str(config.data_quality.enable_distribution_analysis).lower()).lower() == "true"
)
config.data_quality.histogram_bins = int(
os.getenv("DATA_QUALITY_HISTOGRAM_BINS", str(config.data_quality.histogram_bins))
)
# Server configuration
config.server_name = os.getenv("SERVER_NAME", config.server_name)
config.server_version = os.getenv("SERVER_VERSION", config.server_version)
@@ -443,6 +503,13 @@ class DorisConfig:
if hasattr(config.performance, key):
setattr(config.performance, key, value)
# Update data quality configuration
if "data_quality" in config_data:
dq_config = config_data["data_quality"]
for key, value in dq_config.items():
if hasattr(config.data_quality, key):
setattr(config.data_quality, key, value)
# Update logging configuration
if "logging" in config_data:
log_config = config_data["logging"]
@@ -516,6 +583,19 @@ class DorisConfig:
"idle_timeout": self.performance.idle_timeout,
"max_response_content_size": self.performance.max_response_content_size,
},
"data_quality": {
"max_columns_per_batch": self.data_quality.max_columns_per_batch,
"default_sample_size": self.data_quality.default_sample_size,
"small_table_threshold": self.data_quality.small_table_threshold,
"medium_table_threshold": self.data_quality.medium_table_threshold,
"enable_batch_analysis": self.data_quality.enable_batch_analysis,
"batch_timeout": self.data_quality.batch_timeout,
"enable_fast_mode": self.data_quality.enable_fast_mode,
"fast_mode_sample_size": self.data_quality.fast_mode_sample_size,
"enable_distribution_analysis": self.data_quality.enable_distribution_analysis,
"histogram_bins": self.data_quality.histogram_bins,
"percentile_levels": self.data_quality.percentile_levels,
},
"logging": {
"level": self.logging.level,
"format": self.logging.format,
@@ -602,6 +682,31 @@ class DorisConfig:
if self.performance.query_timeout <= 0:
errors.append("Query timeout must be greater than 0")
# Validate data quality configuration
if self.data_quality.max_columns_per_batch <= 0:
errors.append("Max columns per batch must be greater than 0")
if self.data_quality.default_sample_size <= 0:
errors.append("Default sample size must be greater than 0")
if self.data_quality.small_table_threshold <= 0:
errors.append("Small table threshold must be greater than 0")
if self.data_quality.medium_table_threshold <= 0:
errors.append("Medium table threshold must be greater than 0")
if self.data_quality.small_table_threshold >= self.data_quality.medium_table_threshold:
errors.append("Small table threshold must be less than medium table threshold")
if self.data_quality.batch_timeout <= 0:
errors.append("Batch timeout must be greater than 0")
if self.data_quality.fast_mode_sample_size <= 0:
errors.append("Fast mode sample size must be greater than 0")
if self.data_quality.histogram_bins <= 0:
errors.append("Histogram bins must be greater than 0")
# Validate logging configuration
if self.logging.level not in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]:
errors.append("Log level must be one of DEBUG, INFO, WARNING, ERROR, or CRITICAL")

View File

@@ -59,29 +59,57 @@ class DataGovernanceTools:
"""
try:
start_time = time.time()
# 🚀 PROGRESS: Initialize column lineage tracing
logger.info("=" * 60)
logger.info(f"🔍 Starting Column Lineage Tracing")
logger.info(f"📊 Target: {table_name}.{column_name}")
logger.info(f"🎯 Trace depth: {depth}")
logger.info("=" * 60)
connection = await self.connection_manager.get_connection("query")
full_table_name = self._build_full_table_name(table_name, catalog_name, db_name)
target_column = f"{full_table_name}.{column_name}"
# 1. Verify target column exists
logger.info(f"📝 Full target: {target_column}")
# 🚀 PROGRESS: Step 1 - Verify target column exists
logger.info("🔍 Step 1/4: Verifying target column exists...")
verify_start = time.time()
if not await self._verify_column_exists(connection, full_table_name, column_name):
logger.error(f"❌ Column {column_name} not found in table {full_table_name}")
return {"error": f"Column {column_name} not found in table {full_table_name}"}
# 2. Analyze SQL logs to get lineage relationships
verify_time = time.time() - verify_start
logger.info(f"✅ Column verified in {verify_time:.2f}s")
# 🚀 PROGRESS: Step 2 - Analyze SQL logs for lineage relationships
logger.info(f"📊 Step 2/4: Analyzing SQL logs for lineage (depth={depth})...")
lineage_start = time.time()
source_chain = await self._analyze_sql_logs_for_lineage(
connection, full_table_name, column_name, depth
)
lineage_time = time.time() - lineage_start
logger.info(f"✅ Found {len(source_chain)} lineage relationships in {lineage_time:.2f}s")
# 3. Analyze downstream usage
# 🚀 PROGRESS: Step 3 - Analyze downstream usage
logger.info("⬇️ Step 3/4: Analyzing downstream column usage...")
downstream_start = time.time()
downstream_usage = await self._analyze_downstream_column_usage(
connection, full_table_name, column_name
)
downstream_time = time.time() - downstream_start
logger.info(f"✅ Found {len(downstream_usage)} downstream usages in {downstream_time:.2f}s")
# 4. Analyze field transformation rules
# 🚀 PROGRESS: Step 4 - Extract transformation rules
logger.info("🔄 Step 4/4: Extracting transformation rules...")
transform_start = time.time()
transformation_rules = await self._extract_transformation_rules(
connection, full_table_name, column_name
)
transform_time = time.time() - transform_start
logger.info(f"✅ Found {len(transformation_rules)} transformation rules in {transform_time:.2f}s")
execution_time = time.time() - start_time

File diff suppressed because it is too large Load Diff

View File

@@ -175,7 +175,7 @@ class DorisConnection:
self.logger.debug(f"Connection {self.session_id} ping failed: {query_error}")
self.is_healthy = False
return False
except Exception as e:
# Catch any other unexpected errors
self.logger.debug(f"Connection {self.session_id} ping failed with unexpected error: {e}")
@@ -192,33 +192,35 @@ class DorisConnection:
class DorisConnectionManager:
"""Doris database connection manager - Simplified Strategy
"""Doris database connection manager - Enhanced Strategy
Uses direct connection pool management without session-level caching
Uses direct connection pool management with proper synchronization
Implements connection pool health monitoring and proactive cleanup
"""
def __init__(self, config, security_manager=None):
self.config = config
self.security_manager = security_manager
self.pool: Pool | None = None
self.logger = get_logger(__name__)
self.metrics = ConnectionMetrics()
# Remove session-level connection management
# self.session_connections = {} # REMOVED
# Pool health monitoring
self.health_check_interval = 30 # seconds
self.pool_warmup_size = 3 # connections to maintain
self.security_manager = security_manager
# Connection pool state management
self.pool_recovering = False
self.pool_health_check_task = None
self.pool_cleanup_task = None
# Pool recovery lock to prevent race conditions
self.pool_recovery_lock = asyncio.Lock()
self.pool_recovering = False
# Metrics tracking
self.metrics = ConnectionMetrics()
# 🔧 FIX: Add connection acquisition lock to prevent race conditions
self._connection_lock = asyncio.Lock()
self._recovery_lock = asyncio.Lock()
# 🔧 FIX: Add connection acquisition queue to serialize requests
self._connection_semaphore = asyncio.Semaphore(value=20) # Max concurrent acquisitions
# Database connection parameters from config.database
self.pool_recovery_lock = self._recovery_lock # Compatibility alias
self.host = config.database.host
self.port = config.database.port
self.user = config.database.user
@@ -231,8 +233,12 @@ class DorisConnectionManager:
# Connection pool parameters - more conservative settings
self.minsize = config.database.min_connections # This is always 0
self.maxsize = config.database.max_connections or 10
self.maxsize = config.database.max_connections or 20
self.pool_recycle = config.database.max_connection_age or 3600 # 1 hour, more conservative
# 🔧 FIX: Add missing monitoring parameters that were removed during refactoring
self.health_check_interval = 30 # seconds
self.pool_warmup_size = 3 # connections to maintain
async def initialize(self):
"""Initialize connection pool with health monitoring"""
@@ -257,7 +263,7 @@ class DorisConnectionManager:
# Test initial connection
if not await self._test_pool_health():
raise RuntimeError("Connection pool health check failed")
# Start background monitoring tasks
self.pool_health_check_task = asyncio.create_task(self._pool_health_monitor())
self.pool_cleanup_task = asyncio.create_task(self._pool_cleanup_monitor())
@@ -307,7 +313,7 @@ class DorisConnectionManager:
self.logger.warning(f"Failed to release warmup connection: {e}")
self.logger.info(f"✅ Pool warmup completed, {len(warmup_connections)} connections created")
except Exception as e:
self.logger.error(f"Pool warmup failed: {e}")
# Clean up any remaining connections
@@ -505,78 +511,119 @@ class DorisConnectionManager:
finally:
self.pool_recovering = False
async def _recover_pool_with_lock(self):
"""🔧 FIX: Recovery method that uses the new recovery lock to prevent races"""
async with self._recovery_lock:
if not self.pool_recovering: # Only recover if not already in progress
await self._recover_pool()
async def get_connection(self, session_id: str) -> DorisConnection:
"""Get database connection - Simplified Strategy with pool validation
"""🔧 FIX: Simplified connection acquisition without double locking
Always acquire fresh connection from pool, no session caching
Uses only semaphore to prevent too many concurrent acquisitions
"""
try:
# Wait for any ongoing recovery to complete
if self.pool_recovering:
self.logger.debug(f"Pool recovery in progress, waiting for completion...")
# Wait for recovery to complete (max 10 seconds)
for _ in range(10):
if not self.pool_recovering:
break
await asyncio.sleep(0.5)
# 🔧 FIX: Use only semaphore to limit concurrent acquisitions (remove double locking)
async with self._connection_semaphore:
try:
# Wait for any ongoing recovery to complete
if self.pool_recovering:
self.logger.error("Pool recovery is taking too long, proceeding anyway")
# Don't raise error, try to continue
# Check if pool is available
if not self.pool:
self.logger.warning("Connection pool is not available, attempting recovery...")
await self._recover_pool()
self.logger.debug(f"Pool recovery in progress, waiting for completion...")
# Wait for recovery to complete (max 10 seconds)
start_wait = time.time()
while self.pool_recovering and (time.time() - start_wait) < 10:
await asyncio.sleep(0.1) # More frequent checks
if self.pool_recovering:
self.logger.error("Pool recovery is taking too long, proceeding anyway")
# Continue but log the issue
# Check if pool is available
if not self.pool:
raise RuntimeError("Connection pool is not available and recovery failed")
# Check if pool is closed
if self.pool.closed:
self.logger.warning("Connection pool is closed, attempting recovery...")
await self._recover_pool()
self.logger.warning("Connection pool is not available, attempting recovery...")
await self._recover_pool_with_lock()
if not self.pool:
raise RuntimeError("Connection pool is not available and recovery failed")
if not self.pool or self.pool.closed:
raise RuntimeError("Connection pool is closed and recovery failed")
# Simple strategy: always get fresh connection from pool
raw_conn = await self.pool.acquire()
# Wrap in DorisConnection
doris_conn = DorisConnection(raw_conn, session_id, self.security_manager)
# Simple validation - just check if connection is open
if raw_conn.closed:
raise RuntimeError("Acquired connection is already closed")
self.logger.debug(f"✅ Acquired fresh connection for session {session_id}")
return doris_conn
except Exception as e:
self.logger.error(f"Failed to get connection for session {session_id}: {e}")
raise
# Check if pool is closed
if self.pool.closed:
self.logger.warning("Connection pool is closed, attempting recovery...")
await self._recover_pool_with_lock()
if not self.pool or self.pool.closed:
raise RuntimeError("Connection pool is closed and recovery failed")
# 🔧 FIX: Increased timeout to prevent hanging
try:
raw_conn = await asyncio.wait_for(self.pool.acquire(), timeout=10.0)
except asyncio.TimeoutError:
self.logger.error(f"Connection acquisition timed out for session {session_id}")
# Try one recovery attempt
await self._recover_pool_with_lock()
if self.pool and not self.pool.closed:
try:
raw_conn = await asyncio.wait_for(self.pool.acquire(), timeout=5.0)
except asyncio.TimeoutError:
raise RuntimeError("Connection acquisition timed out after recovery")
else:
raise RuntimeError("Connection acquisition timed out")
# Wrap in DorisConnection
doris_conn = DorisConnection(raw_conn, session_id, self.security_manager)
# Basic validation - check if connection is open
if raw_conn.closed:
# Return connection and raise error
try:
self.pool.release(raw_conn)
except Exception:
pass
raise RuntimeError("Acquired connection is already closed")
self.logger.debug(f"✅ Acquired fresh connection for session {session_id}")
return doris_conn
except Exception as e:
self.logger.error(f"Failed to get connection for session {session_id}: {e}")
raise
async def release_connection(self, session_id: str, connection: DorisConnection):
"""Release connection back to pool - Simplified Strategy"""
try:
if connection and connection.connection:
# Simple strategy: always return to pool
if not connection.connection.closed:
self.pool.release(connection.connection)
self.logger.debug(f"✅ Released connection for session {session_id}")
else:
self.logger.debug(f"Connection already closed for session {session_id}")
"""🔧 FIX: Release connection back to pool with proper error handling"""
if not connection or not connection.connection:
self.logger.debug(f"No connection to release for session {session_id}")
return
try:
# Check pool availability before attempting release
if not self.pool or self.pool.closed:
self.logger.warning(f"Pool unavailable during release for session {session_id}, force closing connection")
try:
await connection.connection.ensure_closed()
except Exception:
pass
return
# Check connection state before release
if connection.connection.closed:
self.logger.debug(f"Connection already closed for session {session_id}")
return
# 🔧 FIX: Simplified release operation without thread wrapper
try:
self.pool.release(connection.connection)
self.logger.debug(f"✅ Released connection for session {session_id}")
except Exception as release_error:
self.logger.warning(f"Connection release failed for session {session_id}: {release_error}, force closing")
await connection.connection.ensure_closed()
except Exception as e:
self.logger.error(f"Error releasing connection for session {session_id}: {e}")
# Force close if release fails
try:
if connection and connection.connection:
await connection.connection.ensure_closed()
except Exception:
pass
await connection.connection.ensure_closed()
except Exception as close_error:
self.logger.debug(f"Error force closing connection: {close_error}")
async def close(self):
"""Close connection manager"""

File diff suppressed because it is too large Load Diff

View File

@@ -56,16 +56,31 @@ class SecurityAnalyticsTools:
"""
try:
start_time = time.time()
# 🚀 PROGRESS: Initialize security analysis
logger.info("=" * 70)
logger.info(f"🔒 Starting Data Access Pattern Analysis")
logger.info(f"📅 Analysis period: {days} days")
logger.info(f"👥 Include system users: {include_system_users}")
logger.info(f"🎯 Min query threshold: {min_query_threshold}")
logger.info("=" * 70)
connection = await self.connection_manager.get_connection("query")
# Define analysis period
end_date = datetime.now()
start_date = end_date - timedelta(days=days)
# 1. Get audit log data
logger.info(f"📊 Period: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}")
# 🚀 PROGRESS: Step 1 - Get audit log data
logger.info("📋 Step 1/5: Retrieving audit log data...")
audit_start = time.time()
audit_data = await self._get_audit_log_data(connection, start_date, end_date, include_system_users)
audit_time = time.time() - audit_start
if not audit_data:
logger.warning("⚠️ No audit data available for the specified period")
return {
"error": "No audit data available for the specified period",
"analysis_period": {
@@ -75,25 +90,49 @@ class SecurityAnalyticsTools:
}
}
# 2. Analyze user access patterns
logger.info(f"✅ Retrieved {len(audit_data)} audit records in {audit_time:.2f}s")
# 🚀 PROGRESS: Step 2 - Analyze user access patterns
logger.info("👤 Step 2/5: Analyzing user access patterns...")
user_start = time.time()
user_access_analysis = await self._analyze_user_access_patterns(
audit_data, min_query_threshold
)
user_time = time.time() - user_start
logger.info(f"✅ Analyzed {len(user_access_analysis)} users in {user_time:.2f}s")
# 3. Analyze role-based access
# 🚀 PROGRESS: Step 3 - Analyze role-based access
logger.info("🎭 Step 3/5: Analyzing role-based access patterns...")
role_start = time.time()
role_access_analysis = await self._analyze_role_access_patterns(
connection, user_access_analysis
)
role_time = time.time() - role_start
logger.info(f"✅ Role analysis completed in {role_time:.2f}s")
# 4. Detect security anomalies
# 🚀 PROGRESS: Step 4 - Detect security anomalies
logger.info("🚨 Step 4/5: Detecting security anomalies...")
anomaly_start = time.time()
security_alerts = await self._detect_security_anomalies(
audit_data, user_access_analysis
)
anomaly_time = time.time() - anomaly_start
logger.info(f"✅ Found {len(security_alerts)} security alerts in {anomaly_time:.2f}s")
# 5. Generate access insights
# Log alert summary
if security_alerts:
high_alerts = sum(1 for alert in security_alerts if alert.get("severity") == "high")
medium_alerts = sum(1 for alert in security_alerts if alert.get("severity") == "medium")
logger.info(f"🚨 Alert breakdown: {high_alerts} high, {medium_alerts} medium")
# 🚀 PROGRESS: Step 5 - Generate access insights
logger.info("💡 Step 5/5: Generating access insights...")
insights_start = time.time()
access_insights = await self._generate_access_insights(
user_access_analysis, role_access_analysis
)
insights_time = time.time() - insights_start
logger.info(f"✅ Access insights generated in {insights_time:.2f}s")
execution_time = time.time() - start_time