Files
Aegis/backend/app/dependencies/auth.py
kitos 6f4901b611
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
security: fix 6 vulnerabilities identified in SDLC audit
- fix(auth): enforce API key scopes in require_role/require_any_role;
  attach _api_key_scopes to user on API key auth; add require_scope()
  dependency — scopes were stored but never enforced (CWE-285)

- fix(sso): read SECURE_COOKIES env var for SSO cookie instead of
  hardcoded secure=False — SAML sessions now respect HTTPS config (CWE-614)

- fix(webhooks): SSRF prevention — validate webhook URLs against private
  and reserved CIDRs at creation/update time (CWE-918)

- fix(knowledge): restrict playbook/lesson create, update and restore
  to admin/red_lead/blue_lead roles — was open to any authenticated user (CWE-284)

- fix(alerts): restrict alert acknowledge/resolve/dismiss to admin/lead
  roles — any user could silence security alerts (CWE-284)

- security: delete get_admin_creds.py, check_auth.py, deploy.py scripts
  containing hardcoded root SSH credentials and production DB access;
  add scripts/.gitignore to prevent reintroduction (CWE-798)
2026-05-22 09:46:29 +02:00

209 lines
7.4 KiB
Python

"""
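Authentication and RBAC dependencies for FastAPI.

Provides:

- ``get_current_user``: decodes the JWT from the HttpOnly cookie (preferred) or
  the Authorization header (fallback), fetches the user from the DB, and raises
  401 on failure. Also accepts Aegis API keys (``aegis_…`` prefix) as Bearer
  tokens.
- ``require_password_changed``: rejects the request with 403 while the user is
  still flagged as having to change their password.
- ``require_role``: factory that returns a dependency enforcing a specific role
  (admins always pass).
- ``require_any_role``: factory that returns a dependency accepting any one of
  several roles (admins always pass).
- ``require_scope``: factory that returns a dependency enforcing an API key
  scope (JWT-authenticated requests are not scope-restricted).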
"""
from typing import Optional
from fastapi import Cookie, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from app import auth as auth_lib
from app.config import settings
from app.database import get_db
from app.models.user import User
from app.models.api_key import KEY_PREFIX
# ---------------------------------------------------------------------------
# OAuth2 scheme (reads Authorization header — used as fallback / Swagger UI)
# ---------------------------------------------------------------------------
# auto_error=False makes the scheme yield None instead of raising its own 401
# when the Authorization header is absent, so get_current_user can still fall
# back to the cookie and raise a uniform error itself.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login", auto_error=False)
# Cookie name — must match the one set in the auth router.
# NOTE(review): this constant is not referenced anywhere in the visible code;
# the cookie that is actually read is selected by the ``aegis_token`` parameter
# name of get_current_user, so the two have to be kept in sync by hand.
_COOKIE_NAME = "aegis_token"
# ---------------------------------------------------------------------------
# Current-user dependency
# ---------------------------------------------------------------------------
async def get_current_user(
    aegis_token: Optional[str] = Cookie(None),
    bearer_token: Optional[str] = Depends(oauth2_scheme),
    db: Session = Depends(get_db),
) -> User:
    """Resolve the request's credentials to an active user and return it.

    Token resolution order:

    1. ``aegis_token`` **HttpOnly cookie** (preferred — immune to XSS).
    2. ``Authorization: Bearer <token>`` header (fallback for API clients
       and Swagger UI).

    A token that starts with ``KEY_PREFIX`` is treated as an Aegis API key and
    handed to ``authenticate_raw_key``; any other token is decoded as a JWT
    and its ``sub`` claim is looked up as a username in *db*.

    Raises :class:`~fastapi.HTTPException` **401** when:

    - no token is found in either location,
    - the API key is not accepted,
    - the JWT cannot be decoded,
    - the ``sub`` claim is missing,
    - the JWT's ``jti`` is on the blacklist (detail ``Token has been revoked``), or
    - no matching active user exists.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Prefer cookie, fall back to header. An empty cookie value is falsy and
    # therefore also falls through to the header.
    token = aegis_token or bearer_token
    if token is None:
        raise credentials_exception

    # ── API Key path (token starts with KEY_PREFIX) ───────────────────────
    if token.startswith(KEY_PREFIX):
        # Imported at call time rather than at module level — presumably to
        # avoid an import cycle with the service layer; TODO confirm.
        from app.services.api_key_service import authenticate_raw_key

        # NOTE(review): the scope checks elsewhere in this module read
        # ``user._api_key_scopes``; nothing here sets it, so it is presumably
        # attached inside authenticate_raw_key — verify.
        user = authenticate_raw_key(db, token)
        # Apply the same "active user" rule as the JWT path so a deactivated
        # account cannot keep authenticating through an API key.
        if user is None or not user.is_active:
            raise credentials_exception
        return user

    # ── JWT path ──────────────────────────────────────────────────────────
    # Only the decode itself can raise JWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(
            token,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
    except JWTError as exc:
        # Chain the cause so server-side tracebacks show why decoding failed.
        raise credentials_exception from exc

    username: str | None = payload.get("sub")
    if username is None:
        raise credentials_exception

    # Check token blacklist (revoked tokens). Tokens without a ``jti`` claim
    # skip the lookup.
    jti: str | None = payload.get("jti")
    if jti and auth_lib.is_token_blacklisted(jti):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has been revoked",
            headers={"WWW-Authenticate": "Bearer"},
        )

    user = db.query(User).filter(User.username == username).first()
    if user is None or not user.is_active:
        raise credentials_exception
    return user
# ---------------------------------------------------------------------------
# Password-change gate and role / API-key-scope access control dependencies
# ---------------------------------------------------------------------------
async def require_password_changed(
    current_user: User = Depends(get_current_user),
) -> User:
    """Reject the request while the user is flagged for a password change.

    Endpoints that must remain reachable in that state
    (``/auth/change-password`` and ``/auth/me``) stay exempt simply by **not**
    depending on this function.

    Returns the authenticated user unchanged when no change is pending.

    Raises :class:`~fastapi.HTTPException` **403** with detail
    ``PASSWORD_CHANGE_REQUIRED`` otherwise.
    """
    # A user object without the attribute is treated as "no change pending".
    change_pending = getattr(current_user, "must_change_password", False)
    if not change_pending:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="PASSWORD_CHANGE_REQUIRED",
    )
def _check_api_key_scope(user: User, required_scope: str) -> None:
    """Ensure an API-key-authenticated request carries *required_scope*.

    The scopes are read from the ``_api_key_scopes`` attribute of *user*.
    When that attribute is absent or ``None`` (the JWT / browser-session
    case) no scope restriction applies and access is decided by role alone.

    Raises :class:`~fastapi.HTTPException` **403** when scopes are present
    and *required_scope* is not among them.
    """
    granted = getattr(user, "_api_key_scopes", None)

    # No scopes attached: not an API-key request, nothing to enforce.
    if granted is None:
        return

    # NOTE(review): assumes ``granted`` is a collection of scope names; if it
    # were a plain string, ``in`` would do substring matching — confirm.
    if required_scope in granted:
        return

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=f"API key scope '{required_scope}' required for this operation",
    )
def require_role(required_role: str):
    """Build a FastAPI dependency that admits *required_role* or ``admin``.

    The returned dependency lets the request through when the current user's
    role equals *required_role* or is ``"admin"``, and raises
    :class:`~fastapi.HTTPException` **403** (``Not enough permissions``)
    otherwise.

    API key scopes are enforced on top of the role check: when
    *required_role* is ``"admin"`` the key must carry the ``admin`` scope;
    for every other role the ``write`` scope is required.
    """
    # Both values depend only on the factory argument, so compute them once.
    permitted_roles = (required_role, "admin")
    needed_scope = "write" if required_role != "admin" else "admin"

    async def role_checker(
        current_user: User = Depends(get_current_user),
    ) -> User:
        if current_user.role not in permitted_roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not enough permissions",
            )
        # Role is fine; now make sure an API key (if any) is scoped for it.
        _check_api_key_scope(current_user, needed_scope)
        return current_user

    return role_checker
def require_any_role(*roles: str):
    """Return a FastAPI dependency that enforces **any** of the given *roles*.

    Admins always pass; any other user must have a role listed in *roles*,
    otherwise :class:`~fastapi.HTTPException` **403**
    (``Not enough permissions``) is raised.

    API key scopes are enforced as well: when ``admin`` is the only role that
    can pass — *roles* is ``("admin",)`` or empty — the key must carry the
    ``admin`` scope; otherwise the ``write`` scope is required.

    Usage example::

        @router.patch("/resource", dependencies=[Depends(require_any_role("red_lead", "blue_lead"))])
    """
    accepted = frozenset(roles)
    # Subset test rather than equality: with no roles given the endpoint is
    # still admin-only by role, so it must not be reachable with a key that
    # only has the ``write`` scope.
    scope = "admin" if accepted <= {"admin"} else "write"

    async def role_checker(
        current_user: User = Depends(get_current_user),
    ) -> User:
        if current_user.role != "admin" and current_user.role not in accepted:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not enough permissions",
            )
        _check_api_key_scope(current_user, scope)
        return current_user

    return role_checker
def require_scope(scope: str):
    """Build a dependency that requires the API key to carry *scope*.

    Only API-key-authenticated requests are restricted; requests
    authenticated via JWT (browser sessions) pass through unchecked.
    Intended for mutation endpoints that are not already guarded by
    ``require_role`` / ``require_any_role``::

        @router.post("/resource", dependencies=[Depends(require_scope("write"))])

    The dependency returns the authenticated user, or raises
    :class:`~fastapi.HTTPException` **403** when the key lacks *scope*.
    """

    async def scope_checker(
        current_user: User = Depends(get_current_user),
    ) -> User:
        # Delegates entirely to the shared scope check; no role is involved.
        _check_api_key_scope(user=current_user, required_scope=scope)
        return current_user

    return scope_checker