From 6b076f52b25ea06c5d7748c2bd097575b7de1e45 Mon Sep 17 00:00:00 2001 From: Kitos Date: Mon, 18 May 2026 14:16:53 +0200 Subject: [PATCH] feat(auth): audit login success and failure attempts [FASE-3.2] --- backend/app/routers/auth.py | 107 ++++++++++++------------- backend/tests/test_auth_login_audit.py | 58 ++++++++++++++ 2 files changed, 109 insertions(+), 56 deletions(-) create mode 100644 backend/tests/test_auth_login_audit.py diff --git a/backend/app/routers/auth.py b/backend/app/routers/auth.py index 30be77d..b17584a 100644 --- a/backend/app/routers/auth.py +++ b/backend/app/routers/auth.py @@ -11,39 +11,33 @@ import os from fastapi import APIRouter, Cookie, Depends, Request, Response from fastapi.security import OAuth2PasswordRequestForm -from slowapi import Limiter -from slowapi.util import get_remote_address from sqlalchemy.orm import Session from jose import jwt, JWTError -from app.auth import create_access_token, blacklist_token +from app.auth import create_access_token, blacklist_token, verify_password from app.config import settings from app.database import get_db from app.dependencies.auth import get_current_user +from app.domain.errors import BusinessRuleViolation, PermissionViolation from app.domain.unit_of_work import UnitOfWork +from app.limiter import limiter +from app.middleware.request_context import resolve_client_ip from app.models.user import User -from app.services.auth_service import authenticate_user, change_password as auth_change_password +from app.services.auth_service import ( + _DUMMY_HASH, + change_password as auth_change_password, +) +from app.services.audit_service import log_action from app.schemas.auth import TokenResponse, UserOut from app.schemas.user import PasswordChange -# Rate limiter instance (shares backend state via app.state.limiter) -limiter = Limiter(key_func=get_remote_address) - router = APIRouter(prefix="/auth", tags=["auth"]) -# Detect whether we're behind HTTPS (production) so the cookie can be Secure _IS_HTTPS = os.environ.get("AEGIS_ENV", "").lower() == "production" - -# Cookie name used to transport the JWT _COOKIE_NAME = "aegis_token" -# --------------------------------------------------------------------------- -# POST /auth/login -# --------------------------------------------------------------------------- - - @router.post("/login", response_model=TokenResponse) @limiter.limit("5/minute") def login( @@ -54,19 +48,49 @@ def login( ): """Authenticate a user and return a JWT access token. - Rate-limited to **5 attempts per minute per IP** to prevent brute-force - attacks. The token is set as an HttpOnly cookie **and** returned in the - JSON body for API/Swagger compatibility. + Rate-limited to **5 attempts per minute per IP**. Failed and successful + logins are recorded in the audit log (SEC-009). """ - user = authenticate_user( - db, - username=form_data.username, - password=form_data.password, - ) + user = db.query(User).filter(User.username == form_data.username).first() + target_hash = user.hashed_password if user else _DUMMY_HASH + password_valid = verify_password(form_data.password, target_hash) + ip = resolve_client_ip(request) + + if user is None or not password_valid: + with UnitOfWork(db) as uow: + log_action( + db, + user.id if user else None, + "LOGIN_FAILED", + "auth", + None, + details={ + "username": form_data.username, + "ip": ip, + "reason": "invalid_credentials", + }, + ip_address=ip, + ) + uow.commit() + raise BusinessRuleViolation("Incorrect username or password") + + if not user.is_active: + raise PermissionViolation("Account is disabled. Contact an administrator.") access_token = create_access_token(data={"sub": user.username}) - # Set HttpOnly cookie — inaccessible from JS + with UnitOfWork(db) as uow: + log_action( + db, + user.id, + "LOGIN_SUCCESS", + "auth", + str(user.id), + details={"username": user.username, "ip": ip}, + ip_address=ip, + ) + uow.commit() + response.set_cookie( key=_COOKIE_NAME, value=access_token, @@ -80,27 +104,13 @@ def login( return TokenResponse(access_token=access_token) -# --------------------------------------------------------------------------- -# POST /auth/logout -# --------------------------------------------------------------------------- - - @router.post("/logout") def logout( request: Request, response: Response, aegis_token: str | None = Cookie(None), ): - """Clear the authentication cookie and revoke the current token. - - The token's ``jti`` is added to the Redis blacklist so it cannot - be reused even if the cookie has already been copied elsewhere. - The blacklist entry auto-expires when the token's ``exp`` is reached. - - When both HttpOnly cookie and ``Authorization: Bearer`` are present - (typical for API clients), **both** are revoked so the session cannot - survive on whichever credential the next request prefers. - """ + """Clear the authentication cookie and revoke the current token.""" bearer = ( request.headers.get("Authorization") or request.headers.get("authorization") @@ -124,7 +134,7 @@ def logout( if jti: blacklist_token(jti, float(exp)) except JWTError: - pass # token already invalid — nothing to revoke for this raw value + pass response.delete_cookie( key=_COOKIE_NAME, @@ -136,34 +146,19 @@ def logout( return {"detail": "Logged out"} -# --------------------------------------------------------------------------- -# GET /auth/me -# --------------------------------------------------------------------------- - - @router.get("/me", response_model=UserOut) def read_current_user(current_user: User = Depends(get_current_user)): """Return the profile of the currently authenticated user.""" return current_user -# --------------------------------------------------------------------------- -# POST /auth/change-password -# --------------------------------------------------------------------------- - - @router.post("/change-password") def change_password( body: PasswordChange, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): - """Change the current user's password. - - Requires the current password for verification. On success the - ``must_change_password`` flag is cleared so the user can proceed - normally. - """ + """Change the current user's password.""" auth_change_password( db, current_user, diff --git a/backend/tests/test_auth_login_audit.py b/backend/tests/test_auth_login_audit.py new file mode 100644 index 0000000..efafd9f --- /dev/null +++ b/backend/tests/test_auth_login_audit.py @@ -0,0 +1,58 @@ +"""Tests for login attempt auditing (SEC-009).""" + +from app.models.audit import AuditLog + + +def test_login_failed_creates_audit_entry(client, admin_user, db): + response = client.post( + "/api/v1/auth/login", + data={"username": "admin", "password": "wrong"}, + headers={"X-Forwarded-For": "198.51.100.10", "User-Agent": "LoginAuditTest/1.0"}, + ) + assert response.status_code == 400 + + log = ( + db.query(AuditLog) + .filter(AuditLog.action == "LOGIN_FAILED") + .order_by(AuditLog.timestamp.desc()) + .first() + ) + assert log is not None + assert log.entity_type == "auth" + assert log.details["username"] == "admin" + assert log.details["reason"] == "invalid_credentials" + assert log.ip_address == "198.51.100.10" + assert log.user_agent == "LoginAuditTest/1.0" + assert log.integrity_hash + + +def test_login_success_creates_audit_entry(client, admin_user, db): + client.cookies.clear() + response = client.post( + "/api/v1/auth/login", + data={"username": "admin", "password": "admin123"}, + headers={"X-Forwarded-For": "198.51.100.20"}, + ) + assert response.status_code == 200 + + log = ( + db.query(AuditLog) + .filter(AuditLog.action == "LOGIN_SUCCESS") + .order_by(AuditLog.timestamp.desc()) + .first() + ) + assert log is not None + assert log.user_id == admin_user.id + assert log.ip_address == "198.51.100.20" + assert log.integrity_hash + + +def test_login_unknown_user_still_audited(client, db): + response = client.post( + "/api/v1/auth/login", + data={"username": "nobody", "password": "password"}, + ) + assert response.status_code == 400 + log = db.query(AuditLog).filter(AuditLog.action == "LOGIN_FAILED").first() + assert log is not None + assert log.user_id is None