Files
AIRegulation-DocAnalysis/backend/app/api/dependencies/auth.py
wangwei 9fea9c6a53 1. Add 登陆功能
2. 调整字体大小
3. 新增部分功能
2026-06-05 18:00:31 +08:00

73 lines
2.5 KiB
Python

"""FastAPI dependencies for JWT authentication.
Usage in a route:
from app.api.dependencies.auth import get_current_user, require_role
from app.domain.auth.models import UserRole
@router.get("/protected")
async def protected(user: UserClaims = Depends(get_current_user)):
return {"user": user.username}
@router.delete("/admin-only")
async def admin_only(user: UserClaims = Depends(require_role(UserRole.ADMIN))):
...
"""
from __future__ import annotations
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from app.config.settings import settings
from app.domain.auth.models import UserClaims, UserRole
from app.shared.bootstrap import get_jwt_handler
# Use Bearer token scheme — client sends `Authorization: Bearer <token>`.
_bearer = HTTPBearer(auto_error=False)
async def get_current_user(
credentials: HTTPAuthorizationCredentials | None = Depends(_bearer),
) -> UserClaims:
"""Extract and validate the JWT from the Authorization header.
Returns the decoded UserClaims on success.
Raises HTTP 401 when the token is missing, expired, or invalid.
When auth_enabled=False (development), returns a synthetic admin user.
"""
if not settings.auth_enabled:
# Development bypass — never enable this in production.
return UserClaims(user_id="dev", username="dev-admin", role=UserRole.ADMIN)
if credentials is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing authentication token",
headers={"WWW-Authenticate": "Bearer"},
)
try:
return get_jwt_handler().decode_token(credentials.credentials)
except ValueError as exc:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=str(exc),
headers={"WWW-Authenticate": "Bearer"},
) from exc
def require_role(*roles: UserRole):
"""Return a dependency that enforces one of the given roles.
Example:
Depends(require_role(UserRole.ADMIN, UserRole.LEGAL))
"""
async def _check(user: UserClaims = Depends(get_current_user)) -> UserClaims:
"""Verify the user holds one of the required roles."""
if user.role not in roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Role '{user.role}' is not permitted. Required: {[r.value for r in roles]}",
)
return user
return _check