Files
Aegis/backend/app/routers/auth.py
Kitos 64d64080e0 fix: resolve 20 security vulnerabilities from comprehensive audit
Critical (1-3):
- Replace hardcoded admin credentials with secure auto-generation (seed.py)
- Enforce SECRET_KEY configuration, fail in production if missing (config.py)
- Add Zip Slip and Zip Bomb protection to all ZIP import services

High/Medium (4-9):
- Add 50MB file size limit and extension whitelist to evidence uploads
- Configure CORS origins via environment variable instead of hardcoded
- Migrate JWT storage from localStorage to HttpOnly cookies (frontend+backend)
- Add rate limiting (5/min) on login endpoint via slowapi
- Replace generic dict payloads with Pydantic schemas (mass assignment)

Medium (10-17):
- Check is_active on login to prevent disabled users from authenticating
- Sanitize exception messages in API responses (system, data_sources)
- Escape LIKE wildcards in all ilike search filters across 8 routers
- Run Docker container as non-root user (appuser)
- Make MINIO_SECURE configurable via environment variable
- Add password complexity policy (12+ chars, upper/lower/digit/special)
- Implement JWT token revocation via in-memory blacklist + reduce TTL to 15min
- Replace xml.etree with defusedxml to prevent Billion Laughs attacks

Low (18-20):
- Add security headers to Nginx (CSP, X-Frame-Options, HSTS-ready, etc.)
- Disable Swagger UI/ReDoc/OpenAPI in production
- Restrict /health endpoint to internal networks via Nginx ACL

Also: rewrite install.sh as interactive wizard for guided deployment,
fix test-from-template validation error (technique_id UUID vs MITRE ID)
2026-02-11 08:56:26 +01:00

139 lines
4.5 KiB
Python

"""Authentication router: login, logout and current-user endpoints.
The JWT access token is delivered as an **HttpOnly** cookie
(``aegis_token``) so it is inaccessible to client-side JavaScript,
mitigating XSS token-theft attacks. The JSON response also includes
the token in the body for backwards compatibility and for clients that
cannot use cookies (e.g. Swagger UI).
"""
import os
from fastapi import APIRouter, Cookie, Depends, HTTPException, Request, Response, status
from fastapi.security import OAuth2PasswordRequestForm
from slowapi import Limiter
from slowapi.util import get_remote_address
from sqlalchemy.orm import Session
from jose import jwt, JWTError
from app.auth import verify_password, create_access_token, blacklist_token
from app.config import settings
from app.database import get_db
from app.dependencies.auth import get_current_user
from app.models.user import User
from app.schemas.auth import TokenResponse, UserOut
# Name of the cookie that carries the JWT access token.
_COOKIE_NAME = "aegis_token"
# The cookie is only flagged Secure when AEGIS_ENV is "production" (any
# letter case); production is assumed to be served over HTTPS.
_IS_HTTPS = os.getenv("AEGIS_ENV", "").lower() == "production"
# All endpoints in this module are mounted under the /auth prefix.
router = APIRouter(prefix="/auth", tags=["auth"])
# Rate limiter keyed on the client's remote address.
# NOTE(review): presumably this is the instance registered as
# app.state.limiter during application setup -- confirm there.
limiter = Limiter(key_func=get_remote_address)
# ---------------------------------------------------------------------------
# POST /auth/login
# ---------------------------------------------------------------------------
@router.post("/login", response_model=TokenResponse)
@limiter.limit("5/minute")
def login(
    request: Request,
    response: Response,
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db),
):
    """Verify the submitted credentials and issue a JWT access token.

    The endpoint is limited to 5 requests per minute by the module-level
    limiter. On success the token is written to an HttpOnly cookie and is
    also returned in the JSON body.

    Args:
        request: Incoming request; not used in the body, but accepted so the
            rate-limit decorator can inspect it.
        response: Outgoing response on which the auth cookie is set.
        form_data: OAuth2 password form carrying ``username`` and ``password``.
        db: Database session used to look up the user.

    Returns:
        A ``TokenResponse`` wrapping the freshly created access token.

    Raises:
        HTTPException: 400 when the username is unknown or the password does
            not verify; 403 when the matching account is not active.
    """
    account = (
        db.query(User)
        .filter(User.username == form_data.username)
        .first()
    )

    # The password is only verified when a matching account exists.
    credentials_ok = account is not None and verify_password(
        form_data.password, account.hashed_password
    )
    if not credentials_ok:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Incorrect username or password",
        )

    # Checked after the password so the disabled state is only disclosed to
    # callers who presented valid credentials.
    if not account.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Account is disabled. Contact an administrator.",
        )

    token = create_access_token(data={"sub": account.username})

    # HttpOnly keeps the cookie out of reach of client-side JavaScript; its
    # lifetime mirrors the configured token expiry (minutes -> seconds).
    cookie_options = {
        "key": _COOKIE_NAME,
        "value": token,
        "httponly": True,
        "secure": _IS_HTTPS,
        "samesite": "strict",
        "max_age": settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
        "path": "/",
    }
    response.set_cookie(**cookie_options)

    return TokenResponse(access_token=token)
# ---------------------------------------------------------------------------
# POST /auth/logout
# ---------------------------------------------------------------------------
def _bearer_token_from_header(header_value: str) -> str:
    """Extract the credential from an ``Authorization`` header value.

    The auth scheme is compared case-insensitively, so ``bearer <token>``
    and ``Bearer <token>`` are handled alike. A value that does not use the
    Bearer scheme is returned stripped but otherwise unchanged, which keeps
    the earlier behaviour of accepting a bare token.

    Args:
        header_value: Raw header value; may be an empty string.

    Returns:
        The candidate token, or an empty string when there is none.
    """
    cleaned = header_value.strip()
    scheme, _, credential = cleaned.partition(" ")
    if scheme.lower() == "bearer":
        return credential.strip()
    return cleaned


@router.post("/logout")
def logout(
    request: Request,
    response: Response,
    aegis_token: str | None = Cookie(None),
):
    """Clear the authentication cookie and revoke the presented token.

    The token is taken from the ``aegis_token`` cookie when present,
    otherwise from the ``Authorization`` header. If it decodes successfully
    and carries a ``jti`` claim, that ``jti`` is passed to
    ``blacklist_token`` together with the token's ``exp`` value. Revocation
    is best-effort: a token that fails to decode is ignored, and the cookie
    is cleared in every case.

    Args:
        request: Incoming request, used to read the ``Authorization`` header.
        response: Outgoing response on which the cookie is deleted.
        aegis_token: Value of the auth cookie, if the client sent one.

    Returns:
        ``{"detail": "Logged out"}``.
    """
    # The cookie takes precedence; the header is only consulted as a fallback.
    token = aegis_token or _bearer_token_from_header(
        request.headers.get("Authorization", "")
    )
    if token:
        try:
            payload = jwt.decode(
                token,
                settings.SECRET_KEY,
                algorithms=[settings.ALGORITHM],
            )
        except JWTError:
            pass  # token already invalid -- nothing to revoke
        else:
            jti = payload.get("jti")
            if jti:
                blacklist_token(jti, float(payload.get("exp", 0)))

    # Same attributes as the cookie set at login, so the deletion targets
    # that exact cookie.
    response.delete_cookie(
        key=_COOKIE_NAME,
        httponly=True,
        secure=_IS_HTTPS,
        samesite="strict",
        path="/",
    )
    return {"detail": "Logged out"}
# ---------------------------------------------------------------------------
# GET /auth/me
# ---------------------------------------------------------------------------
@router.get("/me", response_model=UserOut)
def read_current_user(
    current_user: User = Depends(get_current_user),
):
    """Expose the authenticated caller's own user record.

    The user is resolved by the ``get_current_user`` dependency and is
    returned as-is; the route's ``response_model`` is ``UserOut``.
    """
    return current_user