catonline_ai/vw-document-ai-indexer/resilient_http_pool.py

"""
Resilient HTTP Connection Pool Manager
"""
import atexit
from enum import verify
import threading
from contextlib import contextmanager
from typing import Dict, Generator
import httpx
class ResilientConnectionManager:
"""
Elastic Connection Manager
"""
def __init__(self):
self._connection_pools: Dict[str, httpx.Client] = {}
self._pool_lock = threading.Lock()
self._is_closed = False
# Resource cleaning when the registration program exits
atexit.register(self._cleanup_all_pools)
    def get_persistent_client(self, service_profile: str = "standard") -> httpx.Client:
        """
        Get a persistent client - main interface.

        Args:
            service_profile: Service profile name
                - "standard": general-purpose APIs (60s timeout)
                - "cloud_api": cloud APIs (120s timeout, suited to Azure)
                - "ai_inference": AI inference services (180s timeout, suited to OpenAI/VLLM)
                - "batch_processing": batch-processing services (300s timeout)
        """
        if self._is_closed:
            raise RuntimeError("Connection manager is closed")
        if service_profile not in self._connection_pools:
            with self._pool_lock:
                # Double-checked locking: re-test after acquiring the lock
                if service_profile not in self._connection_pools:
                    self._connection_pools[service_profile] = self._create_optimized_client(service_profile)
        return self._connection_pools[service_profile]
    def _create_optimized_client(self, service_profile: str) -> httpx.Client:
        """Create an optimized client based on the service profile."""
        # Service profile mapping
        profile_configs = {
            "standard": {
                "timeout": 60.0,
                "max_keepalive": 10,
                "max_connections": 50,
                "description": "General API services"
            },
            "cloud_api": {
                "timeout": 120.0,
                "max_keepalive": 8,
                "max_connections": 25,
                "description": "Cloud API services (Azure Search, Storage, ...)"
            },
            "ai_inference": {
                "timeout": 180.0,
                "max_keepalive": 5,
                "max_connections": 15,
                "description": "AI inference services (OpenAI, VLLM, ...)"
            },
            "batch_processing": {
                "timeout": 300.0,
                "max_keepalive": 3,
                "max_connections": 10,
                "description": "Batch processing and long-running tasks"
            }
        }
        config = profile_configs.get(service_profile, profile_configs["standard"])
        return httpx.Client(
            timeout=config["timeout"],
            limits=httpx.Limits(
                max_keepalive_connections=config["max_keepalive"],
                max_connections=config["max_connections"],
                keepalive_expiry=300  # keep idle connections alive for 5 minutes
            ),
            follow_redirects=True,
            verify=False  # NOTE: TLS certificate verification is disabled
        )
    @contextmanager
    def resilient_session(self, service_profile: str = "standard"):
        """
        Resilient session context manager - recommended for retry scenarios.

        Usage example:
            with connection_manager.resilient_session("ai_inference") as client:
                for retry in range(3):
                    response = client.post(...)
        """
        client = self.get_persistent_client(service_profile)
        # Yield the pooled client directly; the connection pool owns its lifecycle,
        # so no additional context management is needed here.
        try:
            yield client
        finally:
            # Do not close the client here; keep the connection pool alive
            pass
    def get_pool_statistics(self) -> Dict[str, Dict]:
        """Get connection pool statistics - for monitoring."""
        stats = {}
        with self._pool_lock:
            for profile, client in self._connection_pools.items():
                try:
                    # Reaches into httpx's private connection-pool internals;
                    # wrapped in try/except because these attributes may change
                    pool_info = {
                        "is_closed": client.is_closed,
                        "timeout": str(client.timeout),
                        "max_connections": client._transport._pool._pool_factory.limits.max_connections,  # type: ignore
                        "profile": profile
                    }
                    stats[profile] = pool_info
                except Exception:
                    stats[profile] = {"error": "Unable to retrieve pool statistics"}
        return stats
    def force_refresh_pool(self, service_profile: str):
        """Force-refresh the specified connection pool - for fault recovery."""
        with self._pool_lock:
            if service_profile in self._connection_pools:
                try:
                    self._connection_pools[service_profile].close()
                except Exception:
                    pass
                del self._connection_pools[service_profile]
    def _cleanup_all_pools(self):
        """Close all connection pools - resource safety on shutdown."""
        with self._pool_lock:
            if not self._is_closed:
                for profile, client in list(self._connection_pools.items()):
                    try:
                        client.close()
                    except Exception:
                        pass  # Ignore errors during cleanup
                self._connection_pools.clear()
                self._is_closed = True

# =============================================================================
# Global instance and convenience interfaces
# =============================================================================

# Global resilient connection manager
_resilient_manager = ResilientConnectionManager()


# Main public interface
def get_persistent_http_client(service_profile: str = "standard") -> httpx.Client:
    """
    Get a persistent HTTP client - main interface.

    Recommended service profiles:
        - "standard": generic APIs
        - "cloud_api": Azure/cloud service APIs
        - "ai_inference": AI services such as OpenAI/VLLM
        - "batch_processing": long-running batch-processing tasks
    """
    return _resilient_manager.get_persistent_client(service_profile)


def resilient_http_session(service_profile: str = "standard"):
    """
    Resilient HTTP session context manager - recommended for retry logic.

    Usage example:
        with resilient_http_session("ai_inference") as client:
            for retry in range(3):
                response = client.post(endpoint, json=data)
    """
    return _resilient_manager.resilient_session(service_profile)
def get_connection_pool_stats() -> Dict[str, Dict]:
    """Get connection pool statistics."""
    return _resilient_manager.get_pool_statistics()


def refresh_connection_pool(service_profile: str):
    """Refresh the specified connection pool."""
    _resilient_manager.force_refresh_pool(service_profile)


# =============================================================================
# Convenience per-profile client interfaces - more intuitive naming
# =============================================================================
def get_standard_client() -> httpx.Client:
    """Get the standard client (generic HTTP requests)."""
    return get_persistent_http_client("standard")


def get_cloud_api_client() -> httpx.Client:
    """Get the dedicated cloud-API client (Azure Search, Storage, etc.)."""
    return get_persistent_http_client("cloud_api")


def get_ai_inference_client() -> httpx.Client:
    """Get the dedicated AI-inference client (OpenAI, VLLM, etc.)."""
    return get_persistent_http_client("ai_inference")


def get_batch_processing_client() -> httpx.Client:
    """Get the dedicated batch-processing client (long-running tasks)."""
    return get_persistent_http_client("batch_processing")
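

# =============================================================================
# Illustrative usage sketch (not part of the module's public interface above):
# a minimal example of combining a pooled client with a simple retry loop and a
# pool refresh on failure. The endpoint URL and payload below are placeholders,
# not real services used by this project.
# =============================================================================
if __name__ == "__main__":
    import time

    endpoint = "https://example.invalid/v1/chat/completions"  # placeholder URL
    for attempt in range(3):
        # Re-fetch the client on each attempt so a refreshed pool is picked up
        client = get_ai_inference_client()
        try:
            response = client.post(endpoint, json={"prompt": "ping"})
            response.raise_for_status()
            print(f"Succeeded with status {response.status_code}")
            break
        except httpx.HTTPError:
            # Rebuild the pool before the next attempt, with exponential backoff
            refresh_connection_pool("ai_inference")
            time.sleep(2 ** attempt)

    # Inspect pool state for monitoring/debugging
    print(get_connection_pool_stats())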