73 lines
2.5 KiB
Python
73 lines
2.5 KiB
Python
|
|
"""FastAPI dependencies for JWT authentication.
|
||
|
|
|
||
|
|
Usage in a route:
|
||
|
|
from app.api.dependencies.auth import get_current_user, require_role
|
||
|
|
from app.domain.auth.models import UserRole
|
||
|
|
|
||
|
|
@router.get("/protected")
|
||
|
|
async def protected(user: UserClaims = Depends(get_current_user)):
|
||
|
|
return {"user": user.username}
|
||
|
|
|
||
|
|
@router.delete("/admin-only")
|
||
|
|
async def admin_only(user: UserClaims = Depends(require_role(UserRole.ADMIN))):
|
||
|
|
...
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from fastapi import Depends, HTTPException, status
|
||
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||
|
|
|
||
|
|
from app.config.settings import settings
|
||
|
|
from app.domain.auth.models import UserClaims, UserRole
|
||
|
|
from app.shared.bootstrap import get_jwt_handler
|
||
|
|
|
||
|
|
# Use Bearer token scheme — client sends `Authorization: Bearer <token>`.
|
||
|
|
_bearer = HTTPBearer(auto_error=False)
|
||
|
|
|
||
|
|
|
||
|
|
async def get_current_user(
|
||
|
|
credentials: HTTPAuthorizationCredentials | None = Depends(_bearer),
|
||
|
|
) -> UserClaims:
|
||
|
|
"""Extract and validate the JWT from the Authorization header.
|
||
|
|
|
||
|
|
Returns the decoded UserClaims on success.
|
||
|
|
Raises HTTP 401 when the token is missing, expired, or invalid.
|
||
|
|
When auth_enabled=False (development), returns a synthetic admin user.
|
||
|
|
"""
|
||
|
|
if not settings.auth_enabled:
|
||
|
|
# Development bypass — never enable this in production.
|
||
|
|
return UserClaims(user_id="dev", username="dev-admin", role=UserRole.ADMIN)
|
||
|
|
|
||
|
|
if credentials is None:
|
||
|
|
raise HTTPException(
|
||
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
|
|
detail="Missing authentication token",
|
||
|
|
headers={"WWW-Authenticate": "Bearer"},
|
||
|
|
)
|
||
|
|
try:
|
||
|
|
return get_jwt_handler().decode_token(credentials.credentials)
|
||
|
|
except ValueError as exc:
|
||
|
|
raise HTTPException(
|
||
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
|
|
detail=str(exc),
|
||
|
|
headers={"WWW-Authenticate": "Bearer"},
|
||
|
|
) from exc
|
||
|
|
|
||
|
|
|
||
|
|
def require_role(*roles: UserRole):
|
||
|
|
"""Return a dependency that enforces one of the given roles.
|
||
|
|
|
||
|
|
Example:
|
||
|
|
Depends(require_role(UserRole.ADMIN, UserRole.LEGAL))
|
||
|
|
"""
|
||
|
|
async def _check(user: UserClaims = Depends(get_current_user)) -> UserClaims:
|
||
|
|
"""Verify the user holds one of the required roles."""
|
||
|
|
if user.role not in roles:
|
||
|
|
raise HTTPException(
|
||
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
||
|
|
detail=f"Role '{user.role}' is not permitted. Required: {[r.value for r in roles]}",
|
||
|
|
)
|
||
|
|
return user
|
||
|
|
return _check
|