import logging import json import time from typing import Dict, Any, Optional from datetime import datetime def setup_logging(level: str = "INFO", format_type: str = "json") -> None: """Setup structured logging""" if format_type == "json": formatter = JsonFormatter() else: formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) handler = logging.StreamHandler() handler.setFormatter(formatter) root_logger = logging.getLogger() root_logger.setLevel(getattr(logging, level.upper())) root_logger.addHandler(handler) class JsonFormatter(logging.Formatter): """JSON log formatter""" def format(self, record: logging.LogRecord) -> str: log_data = { "timestamp": datetime.utcnow().isoformat(), "level": record.levelname, "logger": record.name, "message": record.getMessage(), } # Add extra fields if hasattr(record, "request_id"): log_data["request_id"] = getattr(record, "request_id") if hasattr(record, "session_id"): log_data["session_id"] = getattr(record, "session_id") if hasattr(record, "duration_ms"): log_data["duration_ms"] = getattr(record, "duration_ms") return json.dumps(log_data) class Timer: """Simple timer context manager""" def __init__(self): self.start_time = None self.end_time = None def __enter__(self): self.start_time = time.time() return self def __exit__(self, exc_type, exc_val, exc_tb): self.end_time = time.time() @property def elapsed_ms(self) -> int: if self.start_time and self.end_time: return int((self.end_time - self.start_time) * 1000) return 0 def redact_secrets(data: Dict[str, Any], secret_keys: list[str] | None = None) -> Dict[str, Any]: """Redact sensitive information from logs""" if secret_keys is None: secret_keys = ["api_key", "password", "token", "secret", "key"] redacted = {} for key, value in data.items(): if any(secret in key.lower() for secret in secret_keys): redacted[key] = "***REDACTED***" elif isinstance(value, dict): redacted[key] = redact_secrets(value, secret_keys) else: redacted[key] = value return redacted def generate_request_id() -> str: """Generate unique request ID""" return f"req_{int(time.time() * 1000)}_{hash(time.time()) % 10000:04d}" def truncate_text(text: str, max_length: int = 1000, suffix: str = "...") -> str: """Truncate text to maximum length""" if len(text) <= max_length: return text return text[:max_length - len(suffix)] + suffix