83 lines
2.6 KiB
Python
83 lines
2.6 KiB
Python
"""JWT access token creation and decoding.
|
|
|
|
Uses python-jose for HS256 token signing. Token expiry is enforced at
|
|
decode time so expired tokens are rejected even if the signature is valid.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import UTC, datetime, timedelta
|
|
from typing import Any
|
|
|
|
from jose import JWTError, jwt
|
|
from loguru import logger
|
|
|
|
from app.domain.auth.models import UserClaims, UserRole
|
|
|
|
|
|
class JWTHandler:
|
|
"""Create and validate HS256 JWT access tokens.
|
|
|
|
A single shared instance is wired by bootstrap.py. Use
|
|
get_jwt_handler() from shared.bootstrap for all token operations.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
secret_key: str,
|
|
algorithm: str = "HS256",
|
|
expire_minutes: int = 480,
|
|
) -> None:
|
|
"""Initialise the handler with signing credentials and token lifetime."""
|
|
self._secret = secret_key
|
|
self._algorithm = algorithm
|
|
self._expire_minutes = expire_minutes
|
|
|
|
def create_access_token(
|
|
self,
|
|
*,
|
|
user_id: str,
|
|
username: str,
|
|
role: str,
|
|
) -> str:
|
|
"""Return a signed JWT containing user identity and role claims."""
|
|
now = datetime.now(UTC)
|
|
payload: dict[str, Any] = {
|
|
"sub": user_id,
|
|
"username": username,
|
|
"role": role,
|
|
"iat": now,
|
|
"exp": now + timedelta(minutes=self._expire_minutes),
|
|
}
|
|
return jwt.encode(payload, self._secret, algorithm=self._algorithm)
|
|
|
|
def decode_token(self, token: str) -> UserClaims:
|
|
"""Decode and validate a JWT, returning UserClaims.
|
|
|
|
Raises ValueError with a descriptive message on expiry, tampering,
|
|
or any other validation failure so callers do not need to know jose.
|
|
"""
|
|
try:
|
|
payload = jwt.decode(token, self._secret, algorithms=[self._algorithm])
|
|
except JWTError as exc:
|
|
msg = str(exc).lower()
|
|
if "expired" in msg:
|
|
raise ValueError("Token expired") from exc
|
|
raise ValueError(f"Invalid token: {exc}") from exc
|
|
|
|
user_id = payload.get("sub")
|
|
username = payload.get("username", "")
|
|
role_str = payload.get("role", UserRole.READONLY.value)
|
|
|
|
if not user_id:
|
|
raise ValueError("Token missing subject claim")
|
|
|
|
try:
|
|
role = UserRole(role_str)
|
|
except ValueError:
|
|
logger.warning("Unknown role in token: {}, defaulting to readonly", role_str)
|
|
role = UserRole.READONLY
|
|
|
|
return UserClaims(user_id=user_id, username=username, role=role)
|