"""
|
|
Resilient HTTP Connection Pool Manager
|
|
"""
|
|
import atexit
|
|
from enum import verify
|
|
import threading
|
|
from contextlib import contextmanager
|
|
from typing import Dict, Generator
|
|
import httpx
|
|
|
|
class ResilientConnectionManager:
    """Thread-safe manager for long-lived, per-profile httpx connection pools.

    Clients are created lazily, cached by profile name, and closed
    automatically at interpreter exit via the ``atexit`` hook registered
    in ``__init__``.
    """

    # Per-profile client settings.  Hoisted to a class-level constant so that
    # get_pool_statistics() can report configured limits without reaching into
    # httpx private internals.
    _PROFILE_CONFIGS: Dict[str, Dict] = {
        "standard": {
            "timeout": 60.0,
            "max_keepalive": 10,
            "max_connections": 50,
            "description": "General API Services",
        },
        "cloud_api": {
            "timeout": 120.0,
            "max_keepalive": 8,
            "max_connections": 25,
            "description": "Cloud API Services (Azure Search, Storage ...)",
        },
        "ai_inference": {
            "timeout": 180.0,
            "max_keepalive": 5,
            "max_connections": 15,
            "description": "AI Reasoning Services (OpenAI, VLLM ...)",
        },
        "batch_processing": {
            "timeout": 300.0,
            "max_keepalive": 3,
            "max_connections": 10,
            "description": "Batch processing and long-term tasks",
        },
    }

    def __init__(self):
        # profile name -> cached httpx.Client
        self._connection_pools: Dict[str, httpx.Client] = {}
        self._pool_lock = threading.Lock()
        self._is_closed = False

        # Register resource cleanup to run when the program exits.
        atexit.register(self._cleanup_all_pools)

    def get_persistent_client(self, service_profile: str = "standard") -> httpx.Client:
        """Get a persistent client - main interface.

        Args:
            service_profile: Service configuration profile
                - "standard": General API (60s timeout)
                - "cloud_api": Cloud API (120s timeout, suitable for Azure)
                - "ai_inference": AI inference services (180s timeout, suitable for OpenAI/VLLM)
                - "batch_processing": Batch processing services (300s timeout)

        Raises:
            RuntimeError: if the manager has already been closed.
        """
        if self._is_closed:
            raise RuntimeError("Connection manager is closed")

        # Lookup and creation both happen under the lock.  The previous
        # double-checked pattern returned the client through a second dict
        # lookup *outside* the lock, which could raise KeyError if
        # force_refresh_pool() removed the entry concurrently.  An
        # uncontended Lock acquire is cheap, so no fast path is needed.
        with self._pool_lock:
            client = self._connection_pools.get(service_profile)
            if client is None:
                client = self._create_optimized_client(service_profile)
                self._connection_pools[service_profile] = client
        return client

    def _create_optimized_client(self, service_profile: str) -> httpx.Client:
        """Create an httpx client tuned for *service_profile*.

        Unknown profiles fall back to the "standard" configuration.
        """
        config = self._PROFILE_CONFIGS.get(
            service_profile, self._PROFILE_CONFIGS["standard"]
        )

        return httpx.Client(
            timeout=config["timeout"],
            limits=httpx.Limits(
                max_keepalive_connections=config["max_keepalive"],
                max_connections=config["max_connections"],
                keepalive_expiry=300,  # keep idle connections alive for 5 minutes
            ),
            follow_redirects=True,
            # SECURITY(review): TLS certificate verification is disabled.
            # This preserves the original behavior, but it exposes every
            # request to man-in-the-middle attacks.  Prefer verify=True or a
            # CA bundle unless self-signed endpoints genuinely require this.
            verify=False,
        )

    @contextmanager
    def resilient_session(self, service_profile: str = "standard") -> Generator[httpx.Client, None, None]:
        """Elastic session context manager - recommended for retry scenarios.

        Example of usage:
            with connection_manager.resilient_session("ai_inference") as client:
                for retry in range(3):
                    response = client.post(...)
        """
        client = self.get_persistent_client(service_profile)
        # The client lives in the shared pool, so this context manager must
        # NOT close it on exit; it only scopes the borrow.
        try:
            yield client
        finally:
            # Intentionally no close: keep the pooled connections alive.
            pass

    def get_pool_statistics(self) -> Dict[str, Dict]:
        """Get connection pool statistics - for monitoring."""
        stats: Dict[str, Dict] = {}
        with self._pool_lock:
            for profile, client in self._connection_pools.items():
                try:
                    config = self._PROFILE_CONFIGS.get(
                        profile, self._PROFILE_CONFIGS["standard"]
                    )
                    stats[profile] = {
                        "is_closed": client.is_closed,
                        "timeout": str(client.timeout),
                        # Report the configured limit rather than dereferencing
                        # httpx private attributes: the old
                        # `client._transport._pool._pool_factory` chain does not
                        # exist in current httpx releases, so every entry fell
                        # through to the error branch.
                        "max_connections": config["max_connections"],
                        "profile": profile,
                    }
                except Exception:
                    stats[profile] = {"error": "Statistical information cannot be obtained"}
        return stats

    def force_refresh_pool(self, service_profile: str):
        """Force refresh the specified connection pool - for fault recovery."""
        with self._pool_lock:
            if service_profile in self._connection_pools:
                try:
                    self._connection_pools[service_profile].close()
                except Exception:
                    pass  # best effort: a failed close must not block the refresh
                del self._connection_pools[service_profile]

    def _cleanup_all_pools(self):
        """Close every pooled client and mark the manager closed (atexit hook)."""
        with self._pool_lock:
            if not self._is_closed:
                for profile, client in list(self._connection_pools.items()):
                    try:
                        client.close()
                    except Exception:
                        pass  # ignore errors during cleanup

                self._connection_pools.clear()
                self._is_closed = True
|
|
|
|
|
|
# =============================================================================
|
|
# Global instances and convenient interfaces
|
|
# =============================================================================
|
|
|
|
# Global Elastic Connection Manager
# Module-level singleton shared by every helper function below; created at
# import time, cleaned up by the atexit hook registered in its __init__.
_resilient_manager = ResilientConnectionManager()
|
|
|
|
# Main public interface
|
|
def get_persistent_http_client(service_profile: str = "standard") -> httpx.Client:
    """Return the shared persistent HTTP client for *service_profile*.

    Recommended service configuration profiles:
      - "standard": generic API
      - "cloud_api": Azure/cloud service API
      - "ai_inference": OpenAI/VLLM etc. AI services
      - "batch_processing": long-term batch processing tasks
    """
    # Delegate to the module-level singleton, which caches one client per profile.
    return _resilient_manager.get_persistent_client(service_profile)
|
|
|
|
def resilient_http_session(service_profile: str = "standard"):
    """Context manager yielding a pooled HTTP client - recommended for retry logic.

    The yielded client stays open after the block exits, so it can be
    reused across retry attempts without re-establishing connections.

    Example of usage:
        with resilient_http_session("ai_inference") as client:
            for retry in range(3):
                response = client.post(endpoint, json=data)
    """
    return _resilient_manager.resilient_session(service_profile)
|
|
|
|
def get_connection_pool_stats() -> Dict[str, Dict]:
    """Return a per-profile snapshot of connection pool state, for monitoring."""
    return _resilient_manager.get_pool_statistics()
|
|
|
|
def refresh_connection_pool(service_profile: str):
    """Close and drop the pool cached under *service_profile* (fault recovery)."""
    _resilient_manager.force_refresh_pool(service_profile)
|
|
|
|
|
|
# =============================================================================
|
|
# Convenient dedicated client interfaces - more intuitive naming
|
|
# =============================================================================
|
|
|
|
def get_standard_client() -> httpx.Client:
    """Client for generic HTTP requests (the "standard" profile)."""
    return get_persistent_http_client("standard")
|
|
|
|
def get_cloud_api_client() -> httpx.Client:
    """Client tuned for cloud APIs such as Azure Search and Storage."""
    return get_persistent_http_client("cloud_api")
|
|
|
|
def get_ai_inference_client() -> httpx.Client:
    """Client tuned for AI inference backends such as OpenAI and VLLM."""
    return get_persistent_http_client("ai_inference")
|
|
|
|
def get_batch_processing_client() -> httpx.Client:
    """Client tuned for long-running batch-processing tasks."""
    return get_persistent_http_client("batch_processing")
|