feat(auth): audit login success and failure attempts [FASE-3.2]

This commit is contained in:
2026-05-18 14:16:53 +02:00
parent c0aff4cbeb
commit 6b076f52b2
2 changed files with 109 additions and 56 deletions

View File

@@ -11,39 +11,33 @@ import os
from fastapi import APIRouter, Cookie, Depends, Request, Response from fastapi import APIRouter, Cookie, Depends, Request, Response
from fastapi.security import OAuth2PasswordRequestForm from fastapi.security import OAuth2PasswordRequestForm
from slowapi import Limiter
from slowapi.util import get_remote_address
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from jose import jwt, JWTError from jose import jwt, JWTError
from app.auth import create_access_token, blacklist_token from app.auth import create_access_token, blacklist_token, verify_password
from app.config import settings from app.config import settings
from app.database import get_db from app.database import get_db
from app.dependencies.auth import get_current_user from app.dependencies.auth import get_current_user
from app.domain.errors import BusinessRuleViolation, PermissionViolation
from app.domain.unit_of_work import UnitOfWork from app.domain.unit_of_work import UnitOfWork
from app.limiter import limiter
from app.middleware.request_context import resolve_client_ip
from app.models.user import User from app.models.user import User
from app.services.auth_service import authenticate_user, change_password as auth_change_password from app.services.auth_service import (
_DUMMY_HASH,
change_password as auth_change_password,
)
from app.services.audit_service import log_action
from app.schemas.auth import TokenResponse, UserOut from app.schemas.auth import TokenResponse, UserOut
from app.schemas.user import PasswordChange from app.schemas.user import PasswordChange
# Rate limiter instance (shares backend state via app.state.limiter)
limiter = Limiter(key_func=get_remote_address)
router = APIRouter(prefix="/auth", tags=["auth"]) router = APIRouter(prefix="/auth", tags=["auth"])
# Detect whether we're behind HTTPS (production) so the cookie can be Secure
_IS_HTTPS = os.environ.get("AEGIS_ENV", "").lower() == "production" _IS_HTTPS = os.environ.get("AEGIS_ENV", "").lower() == "production"
# Cookie name used to transport the JWT
_COOKIE_NAME = "aegis_token" _COOKIE_NAME = "aegis_token"
# ---------------------------------------------------------------------------
# POST /auth/login
# ---------------------------------------------------------------------------
@router.post("/login", response_model=TokenResponse) @router.post("/login", response_model=TokenResponse)
@limiter.limit("5/minute") @limiter.limit("5/minute")
def login( def login(
@@ -54,19 +48,49 @@ def login(
): ):
"""Authenticate a user and return a JWT access token. """Authenticate a user and return a JWT access token.
Rate-limited to **5 attempts per minute per IP** to prevent brute-force Rate-limited to **5 attempts per minute per IP**. Failed and successful
attacks. The token is set as an HttpOnly cookie **and** returned in the logins are recorded in the audit log (SEC-009).
JSON body for API/Swagger compatibility.
""" """
user = authenticate_user( user = db.query(User).filter(User.username == form_data.username).first()
target_hash = user.hashed_password if user else _DUMMY_HASH
password_valid = verify_password(form_data.password, target_hash)
ip = resolve_client_ip(request)
if user is None or not password_valid:
with UnitOfWork(db) as uow:
log_action(
db, db,
username=form_data.username, user.id if user else None,
password=form_data.password, "LOGIN_FAILED",
"auth",
None,
details={
"username": form_data.username,
"ip": ip,
"reason": "invalid_credentials",
},
ip_address=ip,
) )
uow.commit()
raise BusinessRuleViolation("Incorrect username or password")
if not user.is_active:
raise PermissionViolation("Account is disabled. Contact an administrator.")
access_token = create_access_token(data={"sub": user.username}) access_token = create_access_token(data={"sub": user.username})
# Set HttpOnly cookie — inaccessible from JS with UnitOfWork(db) as uow:
log_action(
db,
user.id,
"LOGIN_SUCCESS",
"auth",
str(user.id),
details={"username": user.username, "ip": ip},
ip_address=ip,
)
uow.commit()
response.set_cookie( response.set_cookie(
key=_COOKIE_NAME, key=_COOKIE_NAME,
value=access_token, value=access_token,
@@ -80,27 +104,13 @@ def login(
return TokenResponse(access_token=access_token) return TokenResponse(access_token=access_token)
# ---------------------------------------------------------------------------
# POST /auth/logout
# ---------------------------------------------------------------------------
@router.post("/logout") @router.post("/logout")
def logout( def logout(
request: Request, request: Request,
response: Response, response: Response,
aegis_token: str | None = Cookie(None), aegis_token: str | None = Cookie(None),
): ):
"""Clear the authentication cookie and revoke the current token. """Clear the authentication cookie and revoke the current token."""
The token's ``jti`` is added to the Redis blacklist so it cannot
be reused even if the cookie has already been copied elsewhere.
The blacklist entry auto-expires when the token's ``exp`` is reached.
When both HttpOnly cookie and ``Authorization: Bearer`` are present
(typical for API clients), **both** are revoked so the session cannot
survive on whichever credential the next request prefers.
"""
bearer = ( bearer = (
request.headers.get("Authorization") request.headers.get("Authorization")
or request.headers.get("authorization") or request.headers.get("authorization")
@@ -124,7 +134,7 @@ def logout(
if jti: if jti:
blacklist_token(jti, float(exp)) blacklist_token(jti, float(exp))
except JWTError: except JWTError:
pass # token already invalid — nothing to revoke for this raw value pass
response.delete_cookie( response.delete_cookie(
key=_COOKIE_NAME, key=_COOKIE_NAME,
@@ -136,34 +146,19 @@ def logout(
return {"detail": "Logged out"} return {"detail": "Logged out"}
# ---------------------------------------------------------------------------
# GET /auth/me
# ---------------------------------------------------------------------------
@router.get("/me", response_model=UserOut) @router.get("/me", response_model=UserOut)
def read_current_user(current_user: User = Depends(get_current_user)): def read_current_user(current_user: User = Depends(get_current_user)):
"""Return the profile of the currently authenticated user.""" """Return the profile of the currently authenticated user."""
return current_user return current_user
# ---------------------------------------------------------------------------
# POST /auth/change-password
# ---------------------------------------------------------------------------
@router.post("/change-password") @router.post("/change-password")
def change_password( def change_password(
body: PasswordChange, body: PasswordChange,
db: Session = Depends(get_db), db: Session = Depends(get_db),
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
): ):
"""Change the current user's password. """Change the current user's password."""
Requires the current password for verification. On success the
``must_change_password`` flag is cleared so the user can proceed
normally.
"""
auth_change_password( auth_change_password(
db, db,
current_user, current_user,

View File

@@ -0,0 +1,58 @@
"""Tests for login attempt auditing (SEC-009)."""
from app.models.audit import AuditLog
def test_login_failed_creates_audit_entry(client, admin_user, db):
response = client.post(
"/api/v1/auth/login",
data={"username": "admin", "password": "wrong"},
headers={"X-Forwarded-For": "198.51.100.10", "User-Agent": "LoginAuditTest/1.0"},
)
assert response.status_code == 400
log = (
db.query(AuditLog)
.filter(AuditLog.action == "LOGIN_FAILED")
.order_by(AuditLog.timestamp.desc())
.first()
)
assert log is not None
assert log.entity_type == "auth"
assert log.details["username"] == "admin"
assert log.details["reason"] == "invalid_credentials"
assert log.ip_address == "198.51.100.10"
assert log.user_agent == "LoginAuditTest/1.0"
assert log.integrity_hash
def test_login_success_creates_audit_entry(client, admin_user, db):
client.cookies.clear()
response = client.post(
"/api/v1/auth/login",
data={"username": "admin", "password": "admin123"},
headers={"X-Forwarded-For": "198.51.100.20"},
)
assert response.status_code == 200
log = (
db.query(AuditLog)
.filter(AuditLog.action == "LOGIN_SUCCESS")
.order_by(AuditLog.timestamp.desc())
.first()
)
assert log is not None
assert log.user_id == admin_user.id
assert log.ip_address == "198.51.100.20"
assert log.integrity_hash
def test_login_unknown_user_still_audited(client, db):
response = client.post(
"/api/v1/auth/login",
data={"username": "nobody", "password": "password"},
)
assert response.status_code == 400
log = db.query(AuditLog).filter(AuditLog.action == "LOGIN_FAILED").first()
assert log is not None
assert log.user_id is None