#!/usr/bin/env python3
"""
Token Authentication HTTP Handlers

Provides HTTP endpoints for token management including creation, revocation,
listing, and statistics. Used for administrative token management in HTTP mode.
"""

import html
import json
from typing import Dict, Any

from starlette.requests import Request
from starlette.responses import JSONResponse, HTMLResponse

from ..utils.logger import get_logger
from ..utils.security import SecurityLevel


class TokenHandlers:
    """Token Authentication HTTP Handlers.

    Each handler first checks that ``security_manager.auth_provider`` has a
    token manager configured, then delegates to ``security_manager`` and wraps
    the outcome in a JSON (or, for the demo page, HTML) response. Unexpected
    exceptions are logged and reported as HTTP 500.
    """

    def __init__(self, security_manager):
        """Store the security manager and create a module-named logger."""
        self.security_manager = security_manager
        self.logger = get_logger(__name__)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _token_auth_enabled(self) -> bool:
        """Return True when the auth provider has a (truthy) token manager."""
        return bool(self.security_manager.auth_provider.token_manager)

    @staticmethod
    def _disabled_response() -> JSONResponse:
        """Build the 503 response used when token authentication is off."""
        return JSONResponse(
            {"error": "Token authentication is not enabled"},
            status_code=503,
        )

    @staticmethod
    def _as_name_list(value: Any) -> list:
        """Normalize a roles/permissions value to a list.

        ``None`` becomes ``[]``; a string is split on commas with surrounding
        whitespace stripped and empty entries dropped; a list or tuple is
        copied unchanged.

        Raises:
            ValueError: if ``value`` is any other type.
        """
        if value is None:
            return []
        if isinstance(value, str):
            return [part.strip() for part in value.split(",") if part.strip()]
        if isinstance(value, (list, tuple)):
            return list(value)
        raise ValueError("expected a list or a comma-separated string")

    # ------------------------------------------------------------------
    # JSON endpoints
    # ------------------------------------------------------------------

    async def handle_create_token(self, request: Request) -> JSONResponse:
        """Handle token creation request.

        GET requests read their fields from the query string; every other
        method reads them from a JSON object body. Recognized fields are
        ``token_id`` and ``user_id`` (both required), ``roles``,
        ``permissions``, ``security_level`` (default ``"internal"``),
        ``expires_hours``, ``description`` and ``custom_token``.

        Returns:
            200 with the created token on success; 400 for an invalid body,
            missing or malformed fields, or a failure raised by
            ``security_manager.create_token``; 503 when token authentication
            is not enabled; 500 for any other unexpected error.
        """
        try:
            if not self._token_auth_enabled():
                return self._disabled_response()

            # Collect the raw fields from whichever carrier the method uses.
            if request.method == "GET":
                params: Dict[str, Any] = dict(request.query_params)
            else:
                try:
                    params = await request.json()
                except ValueError:
                    # json.JSONDecodeError and UnicodeDecodeError are both
                    # ValueError subclasses. A bare ``except`` here would also
                    # swallow task cancellation.
                    return JSONResponse(
                        {"error": "Invalid JSON body"}, status_code=400
                    )
                if not isinstance(params, dict):
                    # Valid JSON that is not an object (e.g. a list) has no
                    # fields to read.
                    return JSONResponse(
                        {"error": "JSON body must be an object"},
                        status_code=400,
                    )

            token_id = params.get("token_id")
            user_id = params.get("user_id")
            security_level_str = params.get("security_level", "internal")
            expires_hours_str = params.get("expires_hours")
            description = params.get("description", "")
            custom_token = params.get("custom_token")

            # Validate required fields
            if not token_id or not user_id:
                return JSONResponse(
                    {"error": "token_id and user_id are required"},
                    status_code=400,
                )

            try:
                roles = self._as_name_list(params.get("roles"))
                permissions = self._as_name_list(params.get("permissions"))
            except ValueError:
                return JSONResponse(
                    {
                        "error": "roles and permissions must be lists or "
                                 "comma-separated strings"
                    },
                    status_code=400,
                )

            # Unrecognized levels fall back to INTERNAL rather than failing.
            # str() keeps that fallback working when a JSON body supplies a
            # non-string value.
            try:
                security_level = SecurityLevel(str(security_level_str).lower())
            except ValueError:
                security_level = SecurityLevel.INTERNAL

            # Parse expires_hours; a falsy value means "not specified".
            expires_hours = None
            if expires_hours_str:
                try:
                    expires_hours = int(expires_hours_str)
                except (TypeError, ValueError):
                    # TypeError covers JSON values such as lists or objects.
                    return JSONResponse(
                        {"error": "expires_hours must be an integer"},
                        status_code=400,
                    )

            # Create token. The response is built inside the same ``try`` so a
            # failure while serializing the result is reported the same way as
            # a failure in ``create_token``.
            try:
                token = await self.security_manager.create_token(
                    token_id=token_id,
                    user_id=user_id,
                    roles=roles,
                    permissions=permissions,
                    security_level=security_level,
                    expires_hours=expires_hours,
                    description=description,
                    custom_token=custom_token,
                )
                return JSONResponse({
                    "success": True,
                    "token_id": token_id,
                    "user_id": user_id,
                    "token": token,
                    "roles": roles,
                    "permissions": permissions,
                    "security_level": security_level.value,
                    "expires_hours": expires_hours,
                    "description": description,
                    "message": "Token created successfully",
                })
            except Exception as e:
                self.logger.error(f"Token creation failed: {e}")
                return JSONResponse(
                    {"error": f"Token creation failed: {str(e)}"},
                    status_code=400,
                )

        except Exception as e:
            self.logger.error(f"Error in handle_create_token: {e}")
            return JSONResponse(
                {"error": f"Internal server error: {str(e)}"},
                status_code=500,
            )

    async def handle_revoke_token(self, request: Request) -> JSONResponse:
        """Handle token revocation request.

        The token id is taken from the ``token_id`` query parameter. For a
        DELETE request without that parameter, the last path segment is used
        when the path has at least four ``/``-separated parts
        (``/token/revoke/{token_id}``).

        Returns:
            200 when the token was revoked, 404 when ``revoke_token`` reports
            failure, 400 when no token id was supplied, 503 when token
            authentication is not enabled, 500 on unexpected errors.
        """
        try:
            if not self._token_auth_enabled():
                return self._disabled_response()

            token_id = request.query_params.get("token_id")
            if not token_id and request.method == "DELETE":
                # Try to get from path: /token/revoke/{token_id}
                path_parts = str(request.url.path).split("/")
                if len(path_parts) >= 4:
                    token_id = path_parts[-1]

            if not token_id:
                return JSONResponse(
                    {"error": "token_id is required"}, status_code=400
                )

            revoked = await self.security_manager.revoke_token(token_id)
            if not revoked:
                return JSONResponse(
                    {
                        "success": False,
                        "token_id": token_id,
                        "message": "Token not found or already revoked",
                    },
                    status_code=404,
                )

            return JSONResponse({
                "success": True,
                "token_id": token_id,
                "message": "Token revoked successfully",
            })

        except Exception as e:
            self.logger.error(f"Error in handle_revoke_token: {e}")
            return JSONResponse(
                {"error": f"Internal server error: {str(e)}"},
                status_code=500,
            )

    async def handle_list_tokens(self, request: Request) -> JSONResponse:
        """Handle token listing request.

        Returns:
            200 with ``tokens`` (the result of ``list_tokens``) and its
            ``count``; 503 when token authentication is not enabled; 500 on
            unexpected errors.
        """
        try:
            if not self._token_auth_enabled():
                return self._disabled_response()

            tokens = await self.security_manager.list_tokens()
            return JSONResponse({
                "success": True,
                "count": len(tokens),
                "tokens": tokens,
            })

        except Exception as e:
            self.logger.error(f"Error in handle_list_tokens: {e}")
            return JSONResponse(
                {"error": f"Internal server error: {str(e)}"},
                status_code=500,
            )

    async def handle_token_stats(self, request: Request) -> JSONResponse:
        """Handle token statistics request.

        Returns:
            200 with ``stats`` from ``get_token_stats``; 503 when token
            authentication is not enabled; 500 on unexpected errors.
        """
        try:
            if not self._token_auth_enabled():
                return self._disabled_response()

            # Unlike the other security-manager calls, this one is not awaited.
            stats = self.security_manager.get_token_stats()
            return JSONResponse({"success": True, "stats": stats})

        except Exception as e:
            self.logger.error(f"Error in handle_token_stats: {e}")
            return JSONResponse(
                {"error": f"Internal server error: {str(e)}"},
                status_code=500,
            )

    async def handle_cleanup_tokens(self, request: Request) -> JSONResponse:
        """Handle expired tokens cleanup request.

        Returns:
            200 with ``cleaned_count`` from ``cleanup_expired_tokens``; 503
            when token authentication is not enabled; 500 on unexpected
            errors.
        """
        try:
            if not self._token_auth_enabled():
                return self._disabled_response()

            cleaned_count = await self.security_manager.cleanup_expired_tokens()
            return JSONResponse({
                "success": True,
                "cleaned_count": cleaned_count,
                "message": f"Cleaned up {cleaned_count} expired tokens",
            })

        except Exception as e:
            self.logger.error(f"Error in handle_cleanup_tokens: {e}")
            return JSONResponse(
                {"error": f"Internal server error: {str(e)}"},
                status_code=500,
            )

    # ------------------------------------------------------------------
    # HTML demo page
    # ------------------------------------------------------------------

    async def handle_demo_page(self, request: Request) -> HTMLResponse:
        """Handle token management demo page.

        Renders the current token statistics, or a notice when token
        authentication is not enabled. Every interpolated value is HTML-escaped
        before being placed in the page.

        NOTE(review): the page strings below contain text only, with no HTML
        markup, and several sections are headings without content. Confirm
        whether markup was lost from this file.

        Returns:
            200 with the page (also used for the "not enabled" notice);
            500 with an error page on unexpected errors.
        """
        try:
            if not self._token_auth_enabled():
                html_content = """ Token Management - Not Available

Token Management

Token authentication is not enabled on this server.
"""
                return HTMLResponse(html_content)

            # Get current stats for demo
            stats = self.security_manager.get_token_stats()

            # Escape once up front so the template stays readable.
            total_tokens = html.escape(str(stats.get('total_tokens', 0)))
            active_tokens = html.escape(str(stats.get('active_tokens', 0)))
            expired_tokens = html.escape(str(stats.get('expired_tokens', 0)))
            default_expiry = html.escape(str(stats.get('default_expiry_hours', 0)))
            expiry_state = 'Enabled' if stats.get('expiry_enabled') else 'Disabled'

            html_content = f""" Doris MCP Server - Token Management

🔐 Doris MCP Server - Token Management

📊 Token Statistics

{total_tokens}
Total Tokens
{active_tokens}
Active Tokens
{expired_tokens}
Expired Tokens

Token Expiry: {expiry_state}

Default Expiry: {default_expiry} hours

➕ Create New Token

📋 Token Management

Revoke Token

🔧 API Endpoints

Use these endpoints for programmatic token management:

"""
            return HTMLResponse(html_content)

        except Exception as e:
            self.logger.error(f"Error in handle_demo_page: {e}")
            # The exception text may contain arbitrary characters; escape it
            # before embedding it in HTML.
            error_text = html.escape(str(e))
            error_html = f""" Token Management Error

Token Management Error

Error loading token management page: {error_text}

"""
            return HTMLResponse(error_html, status_code=500)