"""JWT access token creation and decoding.

Uses python-jose for HS256 token signing. Token expiry is enforced at decode
time so expired tokens are rejected even if the signature is valid.
"""

from __future__ import annotations

from datetime import UTC, datetime, timedelta
from typing import Any

from jose import ExpiredSignatureError, JWTError, jwt
from loguru import logger

from app.domain.auth.models import UserClaims, UserRole


class JWTHandler:
    """Create and validate HS256 JWT access tokens.

    A single shared instance is wired by bootstrap.py. Use get_jwt_handler()
    from shared.bootstrap for all token operations.
    """

    def __init__(
        self,
        *,
        secret_key: str,
        algorithm: str = "HS256",
        expire_minutes: int = 480,
    ) -> None:
        """Initialise the handler with signing credentials and token lifetime.

        Args:
            secret_key: Key used both to sign and to verify tokens.
            algorithm: Signing algorithm name handed to python-jose.
            expire_minutes: Minutes between a token's ``iat`` and ``exp``.
        """
        self._secret = secret_key
        self._algorithm = algorithm
        self._expire_minutes = expire_minutes

    def create_access_token(
        self,
        *,
        user_id: str,
        username: str,
        role: str,
    ) -> str:
        """Return a signed JWT containing user identity and role claims.

        The payload carries ``sub`` (user id), ``username``, ``role``, ``iat``
        (current UTC time) and ``exp`` (``iat`` plus the configured lifetime).
        """
        now = datetime.now(UTC)
        payload: dict[str, Any] = {
            "sub": user_id,
            "username": username,
            "role": role,
            "iat": now,
            "exp": now + timedelta(minutes=self._expire_minutes),
        }
        return jwt.encode(payload, self._secret, algorithm=self._algorithm)

    def decode_token(self, token: str) -> UserClaims:
        """Decode and validate a JWT, returning UserClaims.

        A missing ``username`` claim becomes ``""``. A missing ``role`` claim
        falls back to the read-only role; an unrecognised one does the same
        and additionally logs a warning.

        Raises:
            ValueError: ``"Token expired"`` when the token has expired,
                ``"Invalid token: ..."`` for any other python-jose validation
                failure, and ``"Token missing subject claim"`` when ``sub`` is
                absent or empty. Callers therefore do not need to know jose.
        """
        try:
            payload = jwt.decode(token, self._secret, algorithms=[self._algorithm])
        except ExpiredSignatureError as exc:
            # Matched by exception type rather than message text, and listed
            # first because ExpiredSignatureError is a subclass of JWTError.
            raise ValueError("Token expired") from exc
        except JWTError as exc:
            raise ValueError(f"Invalid token: {exc}") from exc

        user_id = payload.get("sub")
        username = payload.get("username", "")
        role_str = payload.get("role", UserRole.READONLY.value)

        if not user_id:
            raise ValueError("Token missing subject claim")

        try:
            role = UserRole(role_str)
        except ValueError:
            # Deliberate best-effort: an unknown role downgrades to the least
            # privileged role instead of rejecting the token.
            logger.warning("Unknown role in token: {}, defaulting to readonly", role_str)
            role = UserRole.READONLY

        return UserClaims(user_id=user_id, username=username, role=role)