From 5923cc1c8973069a6d54eca1948a10488cbf409e Mon Sep 17 00:00:00 2001 From: Yijia Su <54164178+FreeOnePlus@users.noreply.github.com> Date: Fri, 29 Aug 2025 08:48:38 +0800 Subject: [PATCH] [BUG]Fix security bug (#50) * 0.5.1 Version * fix 0.5.1 schema async bug * fix security bug --- doris_mcp_server/utils/query_executor.py | 83 ++++++++++++++++++++---- 1 file changed, 69 insertions(+), 14 deletions(-) diff --git a/doris_mcp_server/utils/query_executor.py b/doris_mcp_server/utils/query_executor.py index 027ae06..63f5d54 100644 --- a/doris_mcp_server/utils/query_executor.py +++ b/doris_mcp_server/utils/query_executor.py @@ -557,22 +557,67 @@ class DorisQueryExecutor: "data": None } + # Import required security modules + from .security import DorisSecurityManager, AuthContext, SecurityLevel + + # Create proper auth context with read-only permissions + auth_context = AuthContext( + user_id=user_id, + roles=["read_only_user"], # Restrictive role for MCP interface + permissions=["read_data"], # Only read permissions + session_id=session_id, + security_level=SecurityLevel.INTERNAL + ) + + # Perform SQL security validation if enabled + if hasattr(self.connection_manager, 'config') and hasattr(self.connection_manager.config, 'security'): + if self.connection_manager.config.security.enable_security_check: + try: + security_manager = DorisSecurityManager(self.connection_manager.config) + validation_result = await security_manager.validate_sql_security(sql, auth_context) + + if not validation_result.is_valid: + self.logger.warning(f"SQL security validation failed for query: {sql[:100]}...") + return { + "success": False, + "error": f"SQL security validation failed: {validation_result.error_message}", + "error_type": "security_violation", + "blocked_operations": validation_result.blocked_operations, + "risk_level": validation_result.risk_level, + "data": None, + "metadata": { + "query": sql, + "validation_details": { + "blocked_operations": validation_result.blocked_operations, + "risk_level": validation_result.risk_level + } + } + } + else: + self.logger.debug(f"SQL security validation passed for query: {sql[:100]}...") + except Exception as security_error: + self.logger.error(f"Security validation error: {str(security_error)}") + # In case of security validation error, fail safe + return { + "success": False, + "error": f"Security validation system error: {str(security_error)}", + "error_type": "security_system_error", + "data": None, + "metadata": { + "query": sql, + "security_error": str(security_error) + } + } + else: + self.logger.info("SQL security check is disabled in configuration") + else: + self.logger.warning("Security configuration not found, proceeding without validation") + # Add LIMIT if not present and it's a SELECT query if sql.upper().startswith("SELECT") and "LIMIT" not in sql.upper(): if sql.endswith(";"): sql = sql[:-1] sql = f"{sql} LIMIT {limit}" - - # Create auth context for MCP calls - class MockAuthContext: - def __init__(self): - self.user_id = user_id - self.roles = ["data_analyst"] - self.permissions = ["read_data", "execute_query"] - self.session_id = session_id - self.security_level = "internal" - - auth_context = MockAuthContext() # Create query request query_request = QueryRequest( @@ -827,9 +872,13 @@ class QueryPerformanceMonitor: # Unified convenience function for MCP integration async def execute_sql_query(sql: str, connection_manager: DorisConnectionManager, **kwargs) -> Dict[str, Any]: - """Execute SQL query - unified convenience function for MCP tools""" + """Execute SQL query - unified convenience function for MCP tools + + This function now includes security validation to ensure safe query execution. + All queries are validated against the configured security policies before execution. + """ try: - # Create query executor + # Create query executor with the connection manager's configuration executor = DorisQueryExecutor(connection_manager) try: @@ -839,6 +888,7 @@ async def execute_sql_query(sql: str, connection_manager: DorisConnectionManager session_id = kwargs.get("session_id", "mcp_session") user_id = kwargs.get("user_id", "mcp_user") + # The execute_sql_for_mcp method now includes security validation result = await executor.execute_sql_for_mcp( sql=sql, limit=limit, @@ -854,5 +904,10 @@ async def execute_sql_query(sql: str, connection_manager: DorisConnectionManager return { "success": False, "error": f"Query execution failed: {str(e)}", - "data": None + "error_type": "execution_error", + "data": None, + "metadata": { + "query": sql, + "execution_error": str(e) + } }