""" Authentication and RBAC dependencies for FastAPI. Provides: - ``get_current_user``: decodes JWT, fetches user from DB, raises 401 on failure. - ``require_role``: factory that returns a dependency enforcing a specific role (admins always pass). """ from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt from sqlalchemy.orm import Session from app.config import settings from app.database import get_db from app.models.user import User # --------------------------------------------------------------------------- # OAuth2 scheme # --------------------------------------------------------------------------- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login") # --------------------------------------------------------------------------- # Current-user dependency # --------------------------------------------------------------------------- async def get_current_user( token: str = Depends(oauth2_scheme), db: Session = Depends(get_db), ) -> User: """Decode the JWT *token*, look up the user in *db*, and return it. Raises :class:`~fastapi.HTTPException` **401** when: - the token cannot be decoded, - the ``sub`` claim is missing, or - no matching active user exists in the database. """ credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode( token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM], ) username: str | None = payload.get("sub") if username is None: raise credentials_exception except JWTError: raise credentials_exception user = db.query(User).filter(User.username == username).first() if user is None: raise credentials_exception return user # --------------------------------------------------------------------------- # Role-based access control dependency # --------------------------------------------------------------------------- def require_role(required_role: str): """Return a FastAPI dependency that enforces *required_role*. The dependency allows the request to proceed when ``user.role == required_role`` **or** ``user.role == "admin"``. Otherwise it raises :class:`~fastapi.HTTPException` **403**. """ async def role_checker( current_user: User = Depends(get_current_user), ) -> User: if current_user.role != required_role and current_user.role != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions", ) return current_user return role_checker