From a125a2f5f831595ebfb2814f9fd7519200d90829 Mon Sep 17 00:00:00 2001 From: Yijia Su <54164178+FreeOnePlus@users.noreply.github.com> Date: Tue, 4 Nov 2025 14:45:38 +0800 Subject: [PATCH] [fix]Fixed five known issues, including token authentication and multi-worker operation. (#63) * 0.6.1Version * fix 0.5.1 schema async bug * fix security bug * fix security bug * Add complete Token, JWT, OAuth authentication system * Add complete Token, JWT, OAuth authentication system * Add complete Token, JWT, OAuth authentication system * Add complete Token, JWT, OAuth authentication system * Add a controllable MCP Server DB Pool permission authentication system, connect it with the Doris permission system, and provide it to enterprise-level applications concurrently with the multi-Worker mode. * Add Tokens Management * change version * fix stdio start bug * fix stdio start bug * fix stdio start bug --- doris_mcp_server/main.py | 16 ++- doris_mcp_server/utils/db.py | 59 +++++++-- doris_mcp_server/utils/query_executor.py | 91 ++++++++------ doris_mcp_server/utils/schema_extractor.py | 69 +++++++++-- doris_mcp_server/utils/security.py | 133 +++++++++++++++++---- start_server.sh | 15 +-- 6 files changed, 295 insertions(+), 88 deletions(-) diff --git a/doris_mcp_server/main.py b/doris_mcp_server/main.py index b28cce7..2630aeb 100644 --- a/doris_mcp_server/main.py +++ b/doris_mcp_server/main.py @@ -634,14 +634,24 @@ class DorisServer: try: # Extract authentication information auth_info = await self._extract_auth_info_from_scope(scope, headers) - + # Authenticate the request auth_context = await self.security_manager.authenticate_request(auth_info) self.logger.info(f"MCP request authenticated: token_id={auth_context.token_id}, client_ip={auth_context.client_ip}") - + # Store auth context in scope for potential use by tools/resources scope["auth_context"] = auth_context - + + # FIX for Issue #62 Bug 1: Set auth_context in context variable + # This allows tools to access token information for token-bound database configuration + try: + from contextvars import ContextVar + auth_context_var: ContextVar = ContextVar('mcp_auth_context', default=None) + auth_context_var.set(auth_context) + self.logger.debug(f"Set auth_context in context variable with token: {bool(hasattr(auth_context, 'token') and auth_context.token)}") + except Exception as ctx_error: + self.logger.warning(f"Failed to set auth_context in context variable: {ctx_error}") + except Exception as auth_error: self.logger.error(f"MCP authentication failed: {auth_error}") # Return 401 Unauthorized diff --git a/doris_mcp_server/utils/db.py b/doris_mcp_server/utils/db.py index 69c769b..7d2acb4 100644 --- a/doris_mcp_server/utils/db.py +++ b/doris_mcp_server/utils/db.py @@ -95,12 +95,14 @@ class DorisConnection: await cursor.execute(sql, params) # Check if it's a query statement (statement that returns result set) + # FIX for Issue #62 Bug 5: Added WITH support for Common Table Expressions (CTE) sql_upper = sql.strip().upper() - if (sql_upper.startswith("SELECT") or - sql_upper.startswith("SHOW") or - sql_upper.startswith("DESCRIBE") or - sql_upper.startswith("DESC") or - sql_upper.startswith("EXPLAIN")): + if (sql_upper.startswith("SELECT") or + sql_upper.startswith("SHOW") or + sql_upper.startswith("DESCRIBE") or + sql_upper.startswith("DESC") or + sql_upper.startswith("EXPLAIN") or + sql_upper.startswith("WITH")): # FIX: Support CTE queries data = await cursor.fetchall() row_count = len(data) else: @@ -250,7 +252,16 @@ class DorisConnectionManager: self.logger = get_logger(__name__) self.security_manager = security_manager self.token_manager = token_manager # Token manager for token-bound DB config - self.session_cache = DorisSessionCache(self) + + # FIX for Issue #58 Problem 1: Disable session caching to prevent connection sharing + # Session caching causes multiple threads to share the same MySQL connection, + # leading to race conditions and deadlocks in multi-threaded environments + # By disabling caching, each request gets a fresh connection from the pool + self.session_cache = DorisSessionCache( + self, + cache_system_session=False, # Disabled to prevent multi-thread issues + cache_user_session=False # Disabled to prevent multi-thread issues + ) # Store original database config for fallback self.original_db_config = { @@ -1258,17 +1269,43 @@ class DorisConnectionManager: async def execute_query( self, session_id: str, sql: str, params: tuple | None = None, auth_context=None ) -> QueryResult: - """Execute query - Simplified Strategy with automatic connection management""" + """Execute query - Simplified Strategy with automatic connection management + + FIX for Issue #62 Bug 1: Configure token-bound database before query execution + """ connection = None try: - # Always get fresh connection from pool + # FIX: Configure database for token BEFORE getting connection + # This ensures token-bound database configuration is used instead of global config + if auth_context and hasattr(auth_context, 'token') and auth_context.token: + try: + success, config_source = await self.configure_for_token(auth_context.token) + if success: + self.logger.info(f"Session {session_id}: Using {config_source} database configuration") + else: + self.logger.warning(f"Session {session_id}: Token configuration failed, may use global config") + except Exception as token_config_error: + # SECURITY: If token should have config but configuration fails, don't fallback + # This prevents privilege escalation (using high-privilege default user) + if self.token_manager: + self.logger.error(f"Session {session_id}: Token database configuration failed: {token_config_error}") + raise RuntimeError( + f"Failed to configure database for authenticated token. " + f"This is a security measure to prevent using default high-privilege credentials. " + f"Error: {token_config_error}" + ) + else: + # No token manager, can use global config + self.logger.warning(f"Session {session_id}: No token manager, using global config") + + # Always get fresh connection from pool (with configured database) connection = await self.get_connection(session_id) - + # Execute query result = await connection.execute(sql, params, auth_context) - + return result - + except Exception as e: self.logger.error(f"Query execution failed for session {session_id}: {e}") raise diff --git a/doris_mcp_server/utils/query_executor.py b/doris_mcp_server/utils/query_executor.py index 3d76677..e342a95 100644 --- a/doris_mcp_server/utils/query_executor.py +++ b/doris_mcp_server/utils/query_executor.py @@ -541,17 +541,21 @@ class DorisQueryExecutor: await self.query_cache.clear_all() async def execute_sql_for_mcp( - self, - sql: str, - limit: int = 1000, + self, + sql: str, + limit: int = 1000, timeout: int = 30, session_id: str = "mcp_session", - user_id: str = "mcp_user" + user_id: str = "mcp_user", + auth_context = None # FIX for Issue #62 Bug 1: Accept auth_context with token ) -> Dict[str, Any]: - """Execute SQL query for MCP interface - unified method""" + """Execute SQL query for MCP interface - unified method + + FIX for Issue #62 Bug 1: Now accepts auth_context parameter to support token-bound database configuration + """ max_retries = 2 retry_count = 0 - + while retry_count <= max_retries: try: if not sql: @@ -564,14 +568,20 @@ class DorisQueryExecutor: # Import required security modules from .security import DorisSecurityManager, AuthContext, SecurityLevel - # Create proper auth context with read-only permissions - auth_context = AuthContext( - user_id=user_id, - roles=["read_only_user"], # Restrictive role for MCP interface - permissions=["read_data"], # Only read permissions - session_id=session_id, - security_level=SecurityLevel.INTERNAL - ) + # FIX: Use provided auth_context if available (contains token for DB config) + # Otherwise create default auth context for backward compatibility + if auth_context is None: + auth_context = AuthContext( + user_id=user_id, + roles=["read_only_user"], # Restrictive role for MCP interface + permissions=["read_data"], # Only read permissions + session_id=session_id, + security_level=SecurityLevel.INTERNAL, + token="" # No token in default context + ) + else: + # Use provided auth_context (may contain token for database configuration) + self.logger.debug(f"Using provided auth_context with token: {bool(hasattr(auth_context, 'token') and auth_context.token)}") # Perform SQL security validation if enabled if hasattr(self.connection_manager, 'config') and hasattr(self.connection_manager.config, 'security'): @@ -579,7 +589,7 @@ class DorisQueryExecutor: try: security_manager = DorisSecurityManager(self.connection_manager.config) validation_result = await security_manager.validate_sql_security(sql, auth_context) - + if not validation_result.is_valid: self.logger.warning(f"SQL security validation failed for query: {sql[:100]}...") return { @@ -877,33 +887,42 @@ class QueryPerformanceMonitor: # Unified convenience function for MCP integration async def execute_sql_query(sql: str, connection_manager: DorisConnectionManager, **kwargs) -> Dict[str, Any]: """Execute SQL query - unified convenience function for MCP tools - + This function now includes security validation to ensure safe query execution. All queries are validated against the configured security policies before execution. + + FIX for Issue #62 Bug 1: Now supports auth_context parameter for token-bound database configuration + FIX for Issue #58 Problem 2: Removed executor.close() to prevent ClosedResourceError in multi-worker mode """ try: # Create query executor with the connection manager's configuration executor = DorisQueryExecutor(connection_manager) - - try: - # Extract parameters from kwargs or use defaults - limit = kwargs.get("limit", 1000) - timeout = kwargs.get("timeout", 30) - session_id = kwargs.get("session_id", "mcp_session") - user_id = kwargs.get("user_id", "mcp_user") - - # The execute_sql_for_mcp method now includes security validation - result = await executor.execute_sql_for_mcp( - sql=sql, - limit=limit, - timeout=timeout, - session_id=session_id, - user_id=user_id - ) - return result - finally: - await executor.close() - + + # Extract parameters from kwargs or use defaults + limit = kwargs.get("limit", 1000) + timeout = kwargs.get("timeout", 30) + session_id = kwargs.get("session_id", "mcp_session") + user_id = kwargs.get("user_id", "mcp_user") + auth_context = kwargs.get("auth_context", None) # FIX: Extract auth_context + + # The execute_sql_for_mcp method now includes security validation + result = await executor.execute_sql_for_mcp( + sql=sql, + limit=limit, + timeout=timeout, + session_id=session_id, + user_id=user_id, + auth_context=auth_context # FIX: Pass auth_context with token + ) + + # FIX for Issue #58 Problem 2: Do NOT close executor here + # In multi-worker mode, closing here causes ClosedResourceError + # The executor's resources (cache, background tasks) will be managed + # by the connection_manager lifecycle and Python's garbage collection + # This prevents premature cleanup while MCP session manager is still processing + + return result + except Exception as e: return { "success": False, diff --git a/doris_mcp_server/utils/schema_extractor.py b/doris_mcp_server/utils/schema_extractor.py index 2f0fb79..78eb524 100644 --- a/doris_mcp_server/utils/schema_extractor.py +++ b/doris_mcp_server/utils/schema_extractor.py @@ -1454,32 +1454,83 @@ class MetadataExtractor: return response_data async def exec_query_for_mcp( - self, - sql: str, - db_name: str = None, - catalog_name: str = None, - max_rows: int = 100, + self, + sql: str, + db_name: str = None, + catalog_name: str = None, + max_rows: int = 100, timeout: int = 30 ) -> Dict[str, Any]: """ Execute SQL query and return results, supports catalog federation queries Unified interface for MCP tools + + FIX for Issue #62 Bug 1: Now retrieves auth_context from context variable to support token-bound database configuration + FIX for Issue #62 Bug 3: Now uses db_name and catalog_name parameters to switch database context """ logger.info(f"Executing SQL query: {sql}, DB: {db_name}, Catalog: {catalog_name}, MaxRows: {max_rows}, Timeout: {timeout}") - + try: if not sql: return self._format_response(success=False, error="No SQL statement provided", message="Please provide SQL statement to execute") + # FIX for Issue #62 Bug 3: Build context switching SQL if db_name or catalog_name is specified + final_sql = sql + if catalog_name or db_name: + context_statements = [] + + if catalog_name: + # Switch to specified catalog + context_statements.append(f"USE CATALOG `{catalog_name}`") + logger.debug(f"Switching to catalog: {catalog_name}") + + if db_name: + # Switch to specified database + if catalog_name: + context_statements.append(f"USE `{catalog_name}`.`{db_name}`") + else: + context_statements.append(f"USE `{db_name}`") + logger.debug(f"Switching to database: {db_name}") + + # Combine context switching with original SQL + if context_statements: + # Remove trailing semicolon from context statements if present + context_sql = "; ".join(context_statements) + # Ensure original SQL doesn't start with semicolon + sql_clean = sql.lstrip(";").strip() + final_sql = f"{context_sql}; {sql_clean}" + logger.debug(f"Modified SQL with context switching: {final_sql[:200]}...") + + # FIX: Try to get auth_context from context variable (set by HTTP middleware) + # This allows token-bound database configuration to work + auth_context = None + try: + from contextvars import ContextVar + from .security import AuthContext + + # Try to get auth_context from context variable + # This will be set by the HTTP request handler in main.py + auth_context_var: ContextVar = ContextVar('mcp_auth_context', default=None) + auth_context = auth_context_var.get() + + if auth_context: + logger.debug(f"Retrieved auth_context from context variable with token: {bool(hasattr(auth_context, 'token') and auth_context.token)}") + else: + logger.debug("No auth_context found in context variable, using default") + except Exception as ctx_error: + logger.debug(f"Could not retrieve auth_context from context variable: {ctx_error}") + auth_context = None + # Import query executor from .query_executor import execute_sql_query - # Call execute_sql_query to execute query + # Call execute_sql_query to execute query with auth_context exec_result = await execute_sql_query( - sql=sql, + sql=final_sql, # Use modified SQL with context switching connection_manager=self.connection_manager, limit=max_rows, - timeout=timeout + timeout=timeout, + auth_context=auth_context # FIX: Pass auth_context with token ) return exec_result diff --git a/doris_mcp_server/utils/security.py b/doris_mcp_server/utils/security.py index 1497db2..95471e1 100644 --- a/doris_mcp_server/utils/security.py +++ b/doris_mcp_server/utils/security.py @@ -939,28 +939,69 @@ class SQLSecurityValidator: async def _check_sql_injection( self, sql: str, parsed: Statement ) -> ValidationResult: - """Check SQL injection risks""" - # Check common SQL injection patterns + """Check SQL injection risks with improved pattern detection + + FIX for Issue #62 Bug 2: Improved patterns to reduce false positives + Now better distinguishes between legitimate SQL (like BETWEEN...AND) and injection attempts + """ + # Improved injection patterns that are more specific and less prone to false positives injection_patterns = [ - r"(?i)(? bool: - """Check suspicious quote and comment patterns""" - # Check unmatched quotes - single_quotes = sql.count("'") - double_quotes = sql.count('"') + """Check suspicious quote and comment patterns with improved detection - if single_quotes % 2 != 0 or double_quotes % 2 != 0: - return True + FIX for Issue #62 Bug 2: Improved detection to reduce false positives + Now distinguishes between legitimate comments/strings and injection attempts + """ + try: + # Use sqlparse to parse the SQL and distinguish between code and comments/strings + import sqlparse + from sqlparse.tokens import Comment, String - # Check SQL comments - if "--" in sql or "/*" in sql: - return True + # Parse the SQL + parsed = sqlparse.parse(sql) + if not parsed: + # If parsing fails, be conservative + return True - return False + statement = parsed[0] + + # Check for unmatched quotes ONLY in non-string tokens + # This prevents false positives from legitimate string content + non_string_content = [] + has_string_tokens = False + + for token in statement.flatten(): + if token.ttype in (String.Single, String.Double): + has_string_tokens = True + # Skip string content - quotes inside strings are legitimate + continue + elif token.ttype in (Comment.Single, Comment.Multi): + # Comments are generally OK, but check for suspicious injection patterns + comment_value = str(token).lower() + # Check if comment contains dangerous SQL keywords + dangerous_in_comments = ['drop', 'delete', 'insert', 'update', 'union', 'exec', 'execute'] + if any(keyword in comment_value for keyword in dangerous_in_comments): + self.logger.warning(f"Suspicious SQL keyword in comment: {token}") + return True + # Normal comments are OK + continue + else: + # Accumulate non-string, non-comment content + non_string_content.append(str(token)) + + # Check for unmatched quotes in non-string content + non_string_text = ''.join(non_string_content) + single_quotes = non_string_text.count("'") + double_quotes = non_string_text.count('"') + + # Only flag if there are unmatched quotes in actual SQL code (not in strings) + if single_quotes % 2 != 0 or double_quotes % 2 != 0: + return True + + # FIX: Don't flag legitimate SQL comments + # Comments are OK as long as they don't contain dangerous patterns (already checked above) + + return False + + except Exception as e: + self.logger.debug(f"SQL parsing error in quote/comment check: {e}") + # On parsing error, fall back to conservative check + # But be more lenient than before + return False # Don't flag on parse errors to reduce false positives async def _check_blocked_keywords(self, parsed: Statement) -> ValidationResult: """Check blocked keywords""" diff --git a/start_server.sh b/start_server.sh index 18284af..56290aa 100755 --- a/start_server.sh +++ b/start_server.sh @@ -64,9 +64,10 @@ else fi # Set HTTP-specific environment variables +# FIX for Issue #62 Bug 4: Use SERVER_PORT instead of MCP_PORT for consistency with code export MCP_TRANSPORT_TYPE="http" export MCP_HOST="${MCP_HOST:-0.0.0.0}" -export MCP_PORT="${MCP_PORT:-3000}" +export SERVER_PORT="${SERVER_PORT:-3000}" # Changed from MCP_PORT to SERVER_PORT export WORKERS="${WORKERS:-1}" export ALLOWED_ORIGINS="${ALLOWED_ORIGINS:-*}" export LOG_LEVEL="${LOG_LEVEL:-info}" @@ -77,15 +78,15 @@ export MCP_DEBUG_ADAPTER="true" export PYTHONPATH="$(pwd):$PYTHONPATH" echo -e "${GREEN}Starting MCP server (Streamable HTTP mode)...${NC}" -echo -e "${YELLOW}Service will run on http://${MCP_HOST}:${MCP_PORT}/mcp${NC}" -echo -e "${YELLOW}Health Check: http://${MCP_HOST}:${MCP_PORT}/health${NC}" -echo -e "${YELLOW}MCP Endpoint: http://${MCP_HOST}:${MCP_PORT}/mcp${NC}" -echo -e "${YELLOW}Local access: http://localhost:${MCP_PORT}/mcp${NC}" +echo -e "${YELLOW}Service will run on http://${MCP_HOST}:${SERVER_PORT}/mcp${NC}" +echo -e "${YELLOW}Health Check: http://${MCP_HOST}:${SERVER_PORT}/health${NC}" +echo -e "${YELLOW}MCP Endpoint: http://${MCP_HOST}:${SERVER_PORT}/mcp${NC}" +echo -e "${YELLOW}Local access: http://localhost:${SERVER_PORT}/mcp${NC}" echo -e "${YELLOW}Workers: ${WORKERS}${NC}" echo -e "${YELLOW}Use Ctrl+C to stop the service${NC}" # Start the server in HTTP mode (Streamable HTTP) -python -m doris_mcp_server.main --transport http --host ${MCP_HOST} --port ${MCP_PORT} --workers ${WORKERS} +python -m doris_mcp_server.main --transport http --host ${MCP_HOST} --port ${SERVER_PORT} --workers ${WORKERS} # Check exit status if [ $? -ne 0 ]; then @@ -97,4 +98,4 @@ fi echo -e "${YELLOW}Tip: If the page displays abnormally, please clear your browser cache or use incognito mode${NC}" echo -e "${YELLOW}Chrome browser clear cache shortcut: Ctrl+Shift+Del (Windows) or Cmd+Shift+Del (Mac)${NC}" echo -e "${CYAN}For testing HTTP endpoints, you can use:${NC}" -echo -e "${CYAN} curl -X POST http://localhost:${MCP_PORT}/mcp -H 'Content-Type: application/json' -d '{\"method\":\"tools/list\"}'${NC}" \ No newline at end of file +echo -e "${CYAN} curl -X POST http://localhost:${SERVER_PORT}/mcp -H 'Content-Type: application/json' -d '{\"method\":\"tools/list\"}'${NC}" \ No newline at end of file