95 lines
2.8 KiB
Python
95 lines
2.8 KiB
Python
import logging
|
|
import json
|
|
import time
|
|
from typing import Dict, Any, Optional
|
|
from datetime import datetime
|
|
|
|
|
|
def setup_logging(level: str = "INFO", format_type: str = "json") -> None:
    """Configure the root logger with one stream handler.

    Calling this function more than once replaces the handler installed by
    the previous call instead of stacking a new one, so log lines are not
    emitted multiple times. Handlers attached to the root logger by other
    code are left alone.

    Args:
        level: Name of a ``logging`` level (case-insensitive), e.g. ``"info"``.
        format_type: ``"json"`` selects ``JsonFormatter``; any other value
            selects a plain-text ``logging.Formatter``.

    Raises:
        AttributeError: If ``level`` does not name an attribute of the
            ``logging`` module.
    """
    # Name used to recognise the handler this function installed earlier.
    handler_name = "setup_logging.stream"

    if format_type == "json":
        formatter = JsonFormatter()
    else:
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

    # Resolve the level before touching the root logger so that an invalid
    # level name leaves the existing configuration untouched.
    numeric_level = getattr(logging, level.upper())

    root_logger = logging.getLogger()

    # Iterate over a copy: removeHandler() mutates root_logger.handlers.
    for existing in list(root_logger.handlers):
        if existing.get_name() == handler_name:
            root_logger.removeHandler(existing)
            existing.close()

    handler = logging.StreamHandler()
    handler.set_name(handler_name)
    handler.setFormatter(formatter)

    root_logger.setLevel(numeric_level)
    root_logger.addHandler(handler)
|
|
|
|
|
|
class JsonFormatter(logging.Formatter):
    """Log formatter that renders each record as a single JSON object.

    The object always contains ``timestamp``, ``level``, ``logger`` and
    ``message``. The attributes listed in ``_EXTRA_FIELDS`` are copied through
    when they are present on the record, and an ``exception`` key is added
    when the record carries exception information.
    """

    # Optional record attributes that are copied into the output when set.
    _EXTRA_FIELDS = ("request_id", "session_id", "duration_ms")

    def format(self, record: logging.LogRecord) -> str:
        """Return ``record`` serialised as a JSON string.

        Args:
            record: The log record to serialise.

        Returns:
            A JSON document describing the record.
        """
        # Local import: the module only imports ``datetime`` from ``datetime``.
        from datetime import timezone

        # Use the record's own creation time rather than "now" so the
        # timestamp stays correct when formatting happens later than the log
        # call. tzinfo is dropped to keep the naive-UTC ISO format unchanged.
        event_time = datetime.fromtimestamp(record.created, tz=timezone.utc)

        log_data = {
            "timestamp": event_time.replace(tzinfo=None).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }

        # Add extra fields
        for field in self._EXTRA_FIELDS:
            if hasattr(record, field):
                log_data[field] = getattr(record, field)

        # Same caching convention as logging.Formatter.format(): render the
        # traceback once and store it on the record.
        if record.exc_info and not record.exc_text:
            record.exc_text = self.formatException(record.exc_info)
        if record.exc_text:
            log_data["exception"] = record.exc_text

        # default=str is deliberate: a value that JSON cannot encode is
        # stringified instead of raising and losing the whole log line.
        return json.dumps(log_data, default=str)
|
|
|
|
|
|
class Timer:
    """Context manager that measures how long its ``with`` body takes.

    ``start_time`` and ``end_time`` hold wall-clock ``time.time()`` values
    taken on entry and exit. The duration reported by ``elapsed_ms`` is
    computed from ``time.perf_counter()`` readings so that system clock
    adjustments during the measurement cannot distort it.
    """

    def __init__(self):
        # Wall-clock timestamps (seconds since the epoch), None until set.
        self.start_time = None
        self.end_time = None
        # perf_counter readings used for the duration itself.
        self._started_at = None
        self._stopped_at = None

    def __enter__(self):
        # Clear the end marks first so a reused timer never pairs a fresh
        # start with the previous run's end.
        self.end_time = None
        self._stopped_at = None
        self.start_time = time.time()
        self._started_at = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so an exception raised in the body propagates.
        self._stopped_at = time.perf_counter()
        self.end_time = time.time()

    @property
    def elapsed_ms(self) -> int:
        """Whole milliseconds between entry and exit; 0 until both happened."""
        # Compare against None explicitly: a reading of 0.0 is a valid value.
        if self._started_at is None or self._stopped_at is None:
            return 0
        return int((self._stopped_at - self._started_at) * 1000)
|
|
|
|
|
|
def redact_secrets(data: Dict[str, Any], secret_keys: list[str] | None = None) -> Dict[str, Any]:
|
|
"""Redact sensitive information from logs"""
|
|
if secret_keys is None:
|
|
secret_keys = ["api_key", "password", "token", "secret", "key"]
|
|
|
|
redacted = {}
|
|
for key, value in data.items():
|
|
if any(secret in key.lower() for secret in secret_keys):
|
|
redacted[key] = "***REDACTED***"
|
|
elif isinstance(value, dict):
|
|
redacted[key] = redact_secrets(value, secret_keys)
|
|
else:
|
|
redacted[key] = value
|
|
|
|
return redacted
|
|
|
|
|
|
def generate_request_id() -> str:
    """Generate a request ID of the form ``req_<epoch-ms>_<4 digits>``.

    The first part is the current time in whole milliseconds. The trailing
    four digits are drawn from the operating system's random source, so two
    IDs created in the same millisecond (including in different processes)
    are not forced to be identical.

    Returns:
        The request ID string.
    """
    # Local import keeps this function self-contained; the module does not
    # import ``secrets`` at the top level.
    import secrets

    timestamp_ms = int(time.time() * 1000)
    suffix = secrets.randbelow(10000)
    return f"req_{timestamp_ms}_{suffix:04d}"
|
|
|
|
|
|
def truncate_text(text: str, max_length: int = 1000, suffix: str = "...") -> str:
    """Shorten ``text`` so that the result is at most ``max_length`` characters.

    Text that already fits is returned unchanged. Otherwise the text is cut
    and ``suffix`` is appended, with the suffix counted against the limit.
    If the limit leaves no room for any text before the suffix, the text is
    cut to ``max_length`` characters and no suffix is added.

    Args:
        text: The string to shorten.
        max_length: Maximum length of the returned string. Values of zero or
            less yield an empty string.
        suffix: Marker appended to truncated text.

    Returns:
        A string no longer than ``max(max_length, 0)`` characters.
    """
    if max_length <= 0:
        return ""
    if len(text) <= max_length:
        return text

    keep = max_length - len(suffix)
    if keep <= 0:
        # A non-positive slice bound would cut from the wrong end (or keep
        # nothing but the suffix); fall back to a plain hard cut.
        return text[:max_length]
    return text[:keep] + suffix
|