"""Authentication router: login, logout and current-user endpoints. The JWT access token is delivered as an **HttpOnly** cookie (``aegis_token``) so it is inaccessible to client-side JavaScript, mitigating XSS token-theft attacks. The JSON response also includes the token in the body for backwards compatibility and for clients that cannot use cookies (e.g. Swagger UI). """ import os from fastapi import APIRouter, Cookie, Depends, Request, Response from fastapi.security import OAuth2PasswordRequestForm from slowapi import Limiter from slowapi.util import get_remote_address from sqlalchemy.orm import Session from jose import jwt, JWTError from app.auth import create_access_token, blacklist_token from app.config import settings from app.database import get_db from app.dependencies.auth import get_current_user from app.domain.unit_of_work import UnitOfWork from app.models.user import User from app.services.auth_service import authenticate_user, change_password as auth_change_password from app.schemas.auth import TokenResponse, UserOut from app.schemas.user import PasswordChange # Rate limiter instance (shares backend state via app.state.limiter) limiter = Limiter(key_func=get_remote_address) router = APIRouter(prefix="/auth", tags=["auth"]) # Detect whether we're behind HTTPS (production) so the cookie can be Secure _IS_HTTPS = os.environ.get("AEGIS_ENV", "").lower() == "production" # Cookie name used to transport the JWT _COOKIE_NAME = "aegis_token" # --------------------------------------------------------------------------- # POST /auth/login # --------------------------------------------------------------------------- @router.post("/login", response_model=TokenResponse) @limiter.limit("5/minute") def login( request: Request, response: Response, form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db), ): """Authenticate a user and return a JWT access token. Rate-limited to **5 attempts per minute per IP** to prevent brute-force attacks. The token is set as an HttpOnly cookie **and** returned in the JSON body for API/Swagger compatibility. """ user = authenticate_user( db, username=form_data.username, password=form_data.password, ) access_token = create_access_token(data={"sub": user.username}) # Set HttpOnly cookie — inaccessible from JS response.set_cookie( key=_COOKIE_NAME, value=access_token, httponly=True, secure=_IS_HTTPS, samesite="strict", max_age=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60, path="/", ) return TokenResponse(access_token=access_token) # --------------------------------------------------------------------------- # POST /auth/logout # --------------------------------------------------------------------------- @router.post("/logout") def logout( request: Request, response: Response, aegis_token: str | None = Cookie(None), ): """Clear the authentication cookie and revoke the current token. The token's ``jti`` is added to the Redis blacklist so it cannot be reused even if the cookie has already been copied elsewhere. The blacklist entry auto-expires when the token's ``exp`` is reached. When both HttpOnly cookie and ``Authorization: Bearer`` are present (typical for API clients), **both** are revoked so the session cannot survive on whichever credential the next request prefers. """ bearer = ( request.headers.get("Authorization") or request.headers.get("authorization") or "" ) bearer = bearer.removeprefix("Bearer ").removeprefix("bearer ").strip() seen: set[str] = set() for raw in (aegis_token, bearer): if not raw or raw in seen: continue seen.add(raw) try: payload = jwt.decode( raw, settings.SECRET_KEY, algorithms=[settings.ALGORITHM], ) jti = payload.get("jti") exp = payload.get("exp", 0) if jti: blacklist_token(jti, float(exp)) except JWTError: pass # token already invalid — nothing to revoke for this raw value response.delete_cookie( key=_COOKIE_NAME, httponly=True, secure=_IS_HTTPS, samesite="strict", path="/", ) return {"detail": "Logged out"} # --------------------------------------------------------------------------- # GET /auth/me # --------------------------------------------------------------------------- @router.get("/me", response_model=UserOut) def read_current_user(current_user: User = Depends(get_current_user)): """Return the profile of the currently authenticated user.""" return current_user # --------------------------------------------------------------------------- # POST /auth/change-password # --------------------------------------------------------------------------- @router.post("/change-password") def change_password( body: PasswordChange, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Change the current user's password. Requires the current password for verification. On success the ``must_change_password`` flag is cleared so the user can proceed normally. """ auth_change_password( db, current_user, current_password=body.current_password, new_password=body.new_password, ) with UnitOfWork(db) as uow: uow.commit() return {"detail": "Password changed successfully"}