security: fix 6 vulnerabilities identified in SDLC audit
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled

- fix(auth): enforce API key scopes in require_role/require_any_role;
  attach _api_key_scopes to user on API key auth; add require_scope()
  dependency — scopes were stored but never enforced (CWE-285)

- fix(sso): read SECURE_COOKIES env var for SSO cookie instead of
  hardcoded secure=False — SAML sessions now respect HTTPS config (CWE-614)

- fix(webhooks): SSRF prevention — validate webhook URLs against private
  and reserved CIDRs at creation/update time (CWE-918)

- fix(knowledge): restrict playbook/lesson create, update and restore
  to admin/red_lead/blue_lead roles — was open to any authenticated user (CWE-284)

- fix(alerts): restrict alert acknowledge/resolve/dismiss to admin/lead
  roles — any user could silence security alerts (CWE-284)

- security: delete get_admin_creds.py, check_auth.py, deploy.py scripts
  containing hardcoded root SSH credentials and production DB access;
  add scripts/.gitignore to prevent reintroduction (CWE-798)
This commit is contained in:
kitos
2026-05-22 09:46:29 +02:00
parent f36c633d16
commit 6f4901b611
7 changed files with 145 additions and 14 deletions

View File

@@ -123,11 +123,27 @@ async def require_password_changed(
return current_user return current_user
def _check_api_key_scope(user: User, required_scope: str) -> None:
"""Raise 403 if the request was authenticated via an API key that lacks *required_scope*.
When authenticated via JWT (browser session), ``_api_key_scopes`` is not set
and the check is skipped — full access is granted based on role alone.
"""
key_scopes = getattr(user, "_api_key_scopes", None)
if key_scopes is not None and required_scope not in key_scopes:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"API key scope '{required_scope}' required for this operation",
)
def require_role(required_role: str): def require_role(required_role: str):
"""Return a FastAPI dependency that enforces *required_role*. """Return a FastAPI dependency that enforces *required_role*.
The dependency allows the request to proceed when The dependency allows the request to proceed when
``user.role == required_role`` **or** ``user.role == "admin"``. ``user.role == required_role`` **or** ``user.role == "admin"``.
Also enforces API key scopes: admin-role endpoints require the ``admin``
scope; all other role-restricted endpoints require ``write``.
Otherwise it raises :class:`~fastapi.HTTPException` **403**. Otherwise it raises :class:`~fastapi.HTTPException` **403**.
""" """
@@ -139,6 +155,8 @@ def require_role(required_role: str):
status_code=status.HTTP_403_FORBIDDEN, status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions", detail="Not enough permissions",
) )
scope = "admin" if required_role == "admin" else "write"
_check_api_key_scope(current_user, scope)
return current_user return current_user
return role_checker return role_checker
@@ -147,7 +165,11 @@ def require_role(required_role: str):
def require_any_role(*roles: str): def require_any_role(*roles: str):
"""Return a FastAPI dependency that enforces **any** of the given *roles*. """Return a FastAPI dependency that enforces **any** of the given *roles*.
Admins always pass. Usage example:: Admins always pass. Also enforces API key scopes: if the only accepted
role is ``admin``, the key must carry the ``admin`` scope; otherwise the
``write`` scope is required.
Usage example::
@router.patch("/resource", dependencies=[Depends(require_any_role("red_lead", "blue_lead"))]) @router.patch("/resource", dependencies=[Depends(require_any_role("red_lead", "blue_lead"))])
""" """
@@ -160,6 +182,27 @@ def require_any_role(*roles: str):
status_code=status.HTTP_403_FORBIDDEN, status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions", detail="Not enough permissions",
) )
scope = "admin" if set(roles) == {"admin"} else "write"
_check_api_key_scope(current_user, scope)
return current_user return current_user
return role_checker return role_checker
def require_scope(scope: str):
"""Return a dependency that enforces the API key carries *scope*.
JWT-authenticated requests (browser sessions) bypass this check entirely.
Use on mutation endpoints that don't already use ``require_role`` /
``require_any_role``::
@router.post("/resource", dependencies=[Depends(require_scope("write"))])
"""
async def scope_checker(
current_user: User = Depends(get_current_user),
) -> User:
_check_api_key_scope(current_user, scope)
return current_user
return scope_checker

View File

@@ -42,7 +42,7 @@ def list_playbooks(
def create_playbook( def create_playbook(
body: PlaybookCreate, body: PlaybookCreate,
db: Session = Depends(get_db), db: Session = Depends(get_db),
user=Depends(get_current_user), user=Depends(require_any_role("admin", "red_lead", "blue_lead")),
): ):
return pb_svc.create_playbook(db, body.model_dump(), user.id) return pb_svc.create_playbook(db, body.model_dump(), user.id)
@@ -61,7 +61,7 @@ def update_playbook(
playbook_id: UUID, playbook_id: UUID,
body: PlaybookUpdate, body: PlaybookUpdate,
db: Session = Depends(get_db), db: Session = Depends(get_db),
user=Depends(get_current_user), user=Depends(require_any_role("admin", "red_lead", "blue_lead")),
): ):
return pb_svc.update_playbook(db, playbook_id, body.model_dump(exclude_unset=True), user.id) return pb_svc.update_playbook(db, playbook_id, body.model_dump(exclude_unset=True), user.id)
@@ -91,7 +91,7 @@ def restore_version(
playbook_id: UUID, playbook_id: UUID,
version: int, version: int,
db: Session = Depends(get_db), db: Session = Depends(get_db),
user=Depends(get_current_user), user=Depends(require_any_role("admin", "red_lead", "blue_lead")),
): ):
"""Roll the playbook back to a specific historical version.""" """Roll the playbook back to a specific historical version."""
return pb_svc.restore_version(db, playbook_id, version, user.id) return pb_svc.restore_version(db, playbook_id, version, user.id)
@@ -159,7 +159,7 @@ def list_lessons(
def create_lesson( def create_lesson(
body: LessonLearnedCreate, body: LessonLearnedCreate,
db: Session = Depends(get_db), db: Session = Depends(get_db),
user=Depends(get_current_user), user=Depends(require_any_role("admin", "red_lead", "blue_lead")),
): ):
return ll_svc.create_lesson_learned(db, body.model_dump(), user.id) return ll_svc.create_lesson_learned(db, body.model_dump(), user.id)
@@ -178,7 +178,7 @@ def update_lesson(
lesson_id: UUID, lesson_id: UUID,
body: LessonLearnedUpdate, body: LessonLearnedUpdate,
db: Session = Depends(get_db), db: Session = Depends(get_db),
user=Depends(get_current_user), user=Depends(require_any_role("admin", "red_lead", "blue_lead")),
): ):
return ll_svc.update_lesson_learned( return ll_svc.update_lesson_learned(
db, lesson_id, body.model_dump(exclude_unset=True), user.id db, lesson_id, body.model_dump(exclude_unset=True), user.id

View File

@@ -88,9 +88,9 @@ def get_alert(
def acknowledge_alert( def acknowledge_alert(
alert_id: UUID, alert_id: UUID,
db: Session = Depends(get_db), db: Session = Depends(get_db),
current_user: User = Depends(get_current_user), current_user: User = Depends(require_any_role("admin", "red_lead", "blue_lead")),
): ):
"""Acknowledge an open alert.""" """Acknowledge an open alert (admin / lead roles only)."""
return svc.acknowledge(db, alert_id, current_user.id) return svc.acknowledge(db, alert_id, current_user.id)
@@ -98,9 +98,9 @@ def acknowledge_alert(
def resolve_alert( def resolve_alert(
alert_id: UUID, alert_id: UUID,
db: Session = Depends(get_db), db: Session = Depends(get_db),
current_user: User = Depends(get_current_user), current_user: User = Depends(require_any_role("admin", "red_lead", "blue_lead")),
): ):
"""Mark an alert as resolved.""" """Mark an alert as resolved (admin / lead roles only)."""
return svc.resolve(db, alert_id, current_user.id) return svc.resolve(db, alert_id, current_user.id)
@@ -108,9 +108,9 @@ def resolve_alert(
def dismiss_alert( def dismiss_alert(
alert_id: UUID, alert_id: UUID,
db: Session = Depends(get_db), db: Session = Depends(get_db),
current_user: User = Depends(get_current_user), current_user: User = Depends(require_any_role("admin", "red_lead", "blue_lead")),
): ):
"""Dismiss an alert (won't re-fire until cooldown resets).""" """Dismiss an alert (admin / lead roles only — won't re-fire until cooldown resets)."""
return svc.dismiss(db, alert_id, current_user.id) return svc.dismiss(db, alert_id, current_user.id)

View File

@@ -1,5 +1,7 @@
"""Phase 14: SSO / SAML 2.0 router.""" """Phase 14: SSO / SAML 2.0 router."""
import os
from fastapi import APIRouter, Depends, HTTPException, Request, Response, status from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
from fastapi.responses import RedirectResponse from fastapi.responses import RedirectResponse
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
@@ -15,7 +17,19 @@ import app.services.sso_service as svc
router = APIRouter(prefix="/sso", tags=["SSO"]) router = APIRouter(prefix="/sso", tags=["SSO"])
_COOKIE_NAME = "aegis_token" _COOKIE_NAME = "aegis_token"
_COOKIE_OPTS = {"httponly": True, "samesite": "lax", "secure": False}
# Mirror the same SECURE_COOKIES logic used in the auth router so that
# SAML-authenticated sessions respect the deployment's HTTPS configuration.
_aegis_env = os.environ.get("AEGIS_ENV", "development").lower()
_secure_cookie_env = os.environ.get("SECURE_COOKIES", "auto").lower()
if _secure_cookie_env == "false":
_IS_HTTPS = False
elif _secure_cookie_env == "true":
_IS_HTTPS = True
else: # "auto" — active only when AEGIS_ENV=production
_IS_HTTPS = _aegis_env == "production"
_COOKIE_OPTS = {"httponly": True, "samesite": "lax", "secure": _IS_HTTPS}
# ── Public ──────────────────────────────────────────────────────────────────── # ── Public ────────────────────────────────────────────────────────────────────

View File

@@ -1,8 +1,55 @@
"""Pydantic schemas for Webhook endpoints.""" """Pydantic schemas for Webhook endpoints."""
import ipaddress
import socket
import uuid import uuid
from datetime import datetime from datetime import datetime
from typing import Any from typing import Any
from pydantic import BaseModel, HttpUrl, ConfigDict from urllib.parse import urlparse
from pydantic import BaseModel, ConfigDict, field_validator
# RFC-5735 / RFC-1918 / RFC-3927 — ranges that must never be webhook targets
_BLOCKED_NETWORKS = [
ipaddress.ip_network("10.0.0.0/8"),
ipaddress.ip_network("172.16.0.0/12"),
ipaddress.ip_network("192.168.0.0/16"),
ipaddress.ip_network("169.254.0.0/16"), # link-local / AWS IMDS
ipaddress.ip_network("127.0.0.0/8"), # loopback
ipaddress.ip_network("::1/128"), # IPv6 loopback
ipaddress.ip_network("fc00::/7"), # IPv6 ULA
]
def _validate_webhook_url(url: str) -> str:
"""Reject URLs that point to private/reserved addresses (SSRF prevention)."""
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
raise ValueError("Webhook URL must use http or https")
hostname = parsed.hostname
if not hostname:
raise ValueError("Webhook URL must include a hostname")
# Resolve hostname to IP(s) and reject any private/reserved address
try:
infos = socket.getaddrinfo(hostname, None)
for info in infos:
raw_ip = info[4][0]
try:
ip_obj = ipaddress.ip_address(raw_ip)
except ValueError:
continue
for network in _BLOCKED_NETWORKS:
if ip_obj in network:
raise ValueError(
f"Webhook URL resolves to a private/reserved address ({raw_ip}) "
"and cannot be used"
)
except OSError:
# DNS resolution failure — allow (will fail at dispatch time)
pass
return url
class WebhookConfigCreate(BaseModel): class WebhookConfigCreate(BaseModel):
name: str name: str
@@ -11,6 +58,12 @@ class WebhookConfigCreate(BaseModel):
events: list[str] = [] events: list[str] = []
is_active: bool = True is_active: bool = True
@field_validator("url")
@classmethod
def url_must_be_external(cls, v: str) -> str:
return _validate_webhook_url(v)
class WebhookConfigUpdate(BaseModel): class WebhookConfigUpdate(BaseModel):
name: str | None = None name: str | None = None
url: str | None = None url: str | None = None
@@ -18,6 +71,13 @@ class WebhookConfigUpdate(BaseModel):
events: list[str] | None = None events: list[str] | None = None
is_active: bool | None = None is_active: bool | None = None
@field_validator("url")
@classmethod
def url_must_be_external(cls, v: str | None) -> str | None:
if v is None:
return v
return _validate_webhook_url(v)
class WebhookConfigOut(BaseModel): class WebhookConfigOut(BaseModel):
id: uuid.UUID id: uuid.UUID
name: str name: str

View File

@@ -148,4 +148,8 @@ def authenticate_raw_key(db: Session, raw_key: str) -> Optional[User]:
if user is None or not user.is_active: if user is None or not user.is_active:
return None return None
# Attach the key's scopes to the user instance so scope-enforcement
# dependencies can verify them without an additional DB query.
# _api_key_scopes=None means "full user access" (JWT path).
user._api_key_scopes = key.scopes or []
return user return user

10
scripts/.gitignore vendored Normal file
View File

@@ -0,0 +1,10 @@
# ─── Never commit scripts that contain server credentials ───────────────────
# These files connect directly to production infrastructure and must never
# enter version control. They are excluded here as a defence-in-depth measure.
deploy.py
check_auth.py
get_admin_creds.py
# Also ignore any ad-hoc debug / one-off scripts
debug_*.py
reset_*.py