Files
Aegis/backend/app/routers/auth.py
Kitos c5eb6f6dc1 feat(auth): move JWT blacklist to Redis with TTL [FASE-0.2]
Revoke tokens by jti in a dedicated Redis DB, honor TTL from JWT exp on logout, reject revoked tokens in get_current_user, and add FakeRedis-backed API tests.
2026-05-18 13:19:15 +02:00

177 lines
5.5 KiB
Python

"""Authentication router: login, logout and current-user endpoints.
The JWT access token is delivered as an **HttpOnly** cookie
(``aegis_token``) so it is inaccessible to client-side JavaScript,
mitigating XSS token-theft attacks. The JSON response also includes
the token in the body for backwards compatibility and for clients that
cannot use cookies (e.g. Swagger UI).
"""
import os
from fastapi import APIRouter, Cookie, Depends, Request, Response
from fastapi.security import OAuth2PasswordRequestForm
from slowapi import Limiter
from slowapi.util import get_remote_address
from sqlalchemy.orm import Session
from jose import jwt, JWTError
from app.auth import create_access_token, blacklist_token
from app.config import settings
from app.database import get_db
from app.dependencies.auth import get_current_user
from app.domain.unit_of_work import UnitOfWork
from app.models.user import User
from app.services.auth_service import authenticate_user, change_password as auth_change_password
from app.schemas.auth import TokenResponse, UserOut
from app.schemas.user import PasswordChange
# Rate limiter instance (shares backend state via app.state.limiter)
limiter = Limiter(key_func=get_remote_address)
router = APIRouter(prefix="/auth", tags=["auth"])
# Detect whether we're behind HTTPS (production) so the cookie can be Secure
_IS_HTTPS = os.environ.get("AEGIS_ENV", "").lower() == "production"
# Cookie name used to transport the JWT
_COOKIE_NAME = "aegis_token"
# ---------------------------------------------------------------------------
# POST /auth/login
# ---------------------------------------------------------------------------
@router.post("/login", response_model=TokenResponse)
@limiter.limit("5/minute")
def login(
request: Request,
response: Response,
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db),
):
"""Authenticate a user and return a JWT access token.
Rate-limited to **5 attempts per minute per IP** to prevent brute-force
attacks. The token is set as an HttpOnly cookie **and** returned in the
JSON body for API/Swagger compatibility.
"""
user = authenticate_user(
db,
username=form_data.username,
password=form_data.password,
)
access_token = create_access_token(data={"sub": user.username})
# Set HttpOnly cookie — inaccessible from JS
response.set_cookie(
key=_COOKIE_NAME,
value=access_token,
httponly=True,
secure=_IS_HTTPS,
samesite="strict",
max_age=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
path="/",
)
return TokenResponse(access_token=access_token)
# ---------------------------------------------------------------------------
# POST /auth/logout
# ---------------------------------------------------------------------------
@router.post("/logout")
def logout(
request: Request,
response: Response,
aegis_token: str | None = Cookie(None),
):
"""Clear the authentication cookie and revoke the current token.
The token's ``jti`` is added to the Redis blacklist so it cannot
be reused even if the cookie has already been copied elsewhere.
The blacklist entry auto-expires when the token's ``exp`` is reached.
When both HttpOnly cookie and ``Authorization: Bearer`` are present
(typical for API clients), **both** are revoked so the session cannot
survive on whichever credential the next request prefers.
"""
bearer = (
request.headers.get("Authorization")
or request.headers.get("authorization")
or ""
)
bearer = bearer.removeprefix("Bearer ").removeprefix("bearer ").strip()
seen: set[str] = set()
for raw in (aegis_token, bearer):
if not raw or raw in seen:
continue
seen.add(raw)
try:
payload = jwt.decode(
raw,
settings.SECRET_KEY,
algorithms=[settings.ALGORITHM],
)
jti = payload.get("jti")
exp = payload.get("exp", 0)
if jti:
blacklist_token(jti, float(exp))
except JWTError:
pass # token already invalid — nothing to revoke for this raw value
response.delete_cookie(
key=_COOKIE_NAME,
httponly=True,
secure=_IS_HTTPS,
samesite="strict",
path="/",
)
return {"detail": "Logged out"}
# ---------------------------------------------------------------------------
# GET /auth/me
# ---------------------------------------------------------------------------
@router.get("/me", response_model=UserOut)
def read_current_user(current_user: User = Depends(get_current_user)):
"""Return the profile of the currently authenticated user."""
return current_user
# ---------------------------------------------------------------------------
# POST /auth/change-password
# ---------------------------------------------------------------------------
@router.post("/change-password")
def change_password(
body: PasswordChange,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Change the current user's password.
Requires the current password for verification. On success the
``must_change_password`` flag is cleared so the user can proceed
normally.
"""
auth_change_password(
db,
current_user,
current_password=body.current_password,
new_password=body.new_password,
)
with UnitOfWork(db) as uow:
uow.commit()
return {"detail": "Password changed successfully"}