#!/usr/bin/env python3 # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """ OAuth HTTP Handlers Provides HTTP endpoints for OAuth authentication flow """ from typing import Dict, Any from urllib.parse import parse_qs, urlparse import json from starlette.responses import JSONResponse, RedirectResponse, HTMLResponse from starlette.requests import Request from ..utils.logger import get_logger logger = get_logger(__name__) class OAuthHandlers: """OAuth HTTP request handlers""" def __init__(self, security_manager): """Initialize OAuth handlers Args: security_manager: DorisSecurityManager instance """ self.security_manager = security_manager logger.info("OAuth handlers initialized") async def handle_login(self, request: Request) -> JSONResponse: """Handle OAuth login initiation Returns JSON with authorization URL and state """ try: # Check if OAuth is enabled oauth_info = self.security_manager.get_oauth_provider_info() if not oauth_info.get("enabled"): return JSONResponse( {"error": "OAuth authentication is not enabled"}, status_code=400 ) # Get authorization URL authorization_url, state = self.security_manager.get_oauth_authorization_url() return JSONResponse({ "authorization_url": authorization_url, "state": state, "provider": oauth_info.get("provider"), "message": "Navigate to authorization_url to complete OAuth login" }) except Exception as e: logger.error(f"OAuth login initiation failed: {e}") return JSONResponse( {"error": f"OAuth login failed: {str(e)}"}, status_code=500 ) async def handle_callback(self, request: Request) -> JSONResponse: """Handle OAuth callback Processes the OAuth callback and returns authentication result """ try: # Get query parameters query_params = dict(request.query_params) # Check for error in callback if "error" in query_params: error_description = query_params.get("error_description", "Unknown error") logger.warning(f"OAuth callback error: {query_params['error']} - {error_description}") return JSONResponse( { "error": query_params["error"], "error_description": error_description, "error_uri": query_params.get("error_uri") }, status_code=400 ) # Extract required parameters code = query_params.get("code") state = query_params.get("state") if not code or not state: return JSONResponse( {"error": "Missing required parameters: code and state"}, status_code=400 ) # Handle OAuth callback auth_context = await self.security_manager.handle_oauth_callback(code, state) # Return successful authentication response return JSONResponse({ "success": True, "user_id": auth_context.user_id, "roles": auth_context.roles, "permissions": auth_context.permissions, "security_level": auth_context.security_level.value, "session_id": auth_context.session_id, "message": "OAuth authentication successful" }) except Exception as e: logger.error(f"OAuth callback handling failed: {e}") return JSONResponse( {"error": f"OAuth callback failed: {str(e)}"}, status_code=500 ) async def handle_provider_info(self, request: Request) -> JSONResponse: """Handle OAuth provider information request Returns information about the configured OAuth provider """ try: provider_info = self.security_manager.get_oauth_provider_info() return JSONResponse(provider_info) except Exception as e: logger.error(f"Failed to get OAuth provider info: {e}") return JSONResponse( {"error": f"Failed to get provider info: {str(e)}"}, status_code=500 ) async def handle_demo_page(self, request: Request) -> HTMLResponse: """Handle OAuth demo page Returns a simple HTML page for testing OAuth flow """ oauth_info = self.security_manager.get_oauth_provider_info() if not oauth_info.get("enabled"): return HTMLResponse("""
OAuth authentication is not enabled.
Please configure OAuth settings in your security configuration.
""") html_content = f"""Provider: {oauth_info.get('provider', 'N/A')}
Client ID: {oauth_info.get('client_id', 'N/A')}
Scopes: {', '.join(oauth_info.get('scopes', []))}
PKCE Enabled: {oauth_info.get('pkce_enabled', False)}
Click the button below to start OAuth authentication flow:
GET /auth/login - Initiate OAuth loginGET /auth/callback - OAuth callback handlerGET /auth/provider - Provider information