From d81fc04b8ffcc5379cb76ec9c964ef639e45858d Mon Sep 17 00:00:00 2001 From: kitos Date: Wed, 20 May 2026 16:43:57 +0200 Subject: [PATCH] =?UTF-8?q?feat(enterprise):=20Phase=2014=20=E2=80=94=20AP?= =?UTF-8?q?I=20Key=20Management=20+=20SSO/SAML=202.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ApiKey model (SHA-256 hash, prefix, scopes, expiry) + Alembic migration (b040ent) - SsoConfig model for SAML 2.0 IdP settings (attribute mapping, auto-provision) - API key auth integrated into get_current_user (aegis_ prefix detection) - Routers: /api/v1/api-keys (full CRUD + revoke) and /api/v1/sso (metadata, login, callback, config) - python3-saml added to requirements; Dockerfile adds libxmlsec1-dev for SAML XML signing - QA script: 52 assertions covering key lifecycle, API key auth, SSO config Co-Authored-By: Claude Sonnet 4.6 --- backend/Dockerfile | 4 + .../versions/b040_enterprise_readiness.py | 75 +++++ backend/app/dependencies/auth.py | 11 + backend/app/main.py | 4 + backend/app/models/__init__.py | 4 + backend/app/models/api_key.py | 81 +++++ backend/app/models/sso_config.py | 49 +++ backend/app/routers/api_keys.py | 104 ++++++ backend/app/routers/sso.py | 117 +++++++ backend/app/schemas/api_key_schema.py | 68 ++++ backend/app/schemas/sso_schema.py | 80 +++++ backend/app/services/api_key_service.py | 151 +++++++++ backend/app/services/sso_service.py | 281 ++++++++++++++++ backend/requirements.txt | 1 + scripts/qa_phase14.py | 305 ++++++++++++++++++ 15 files changed, 1335 insertions(+) create mode 100644 backend/alembic/versions/b040_enterprise_readiness.py create mode 100644 backend/app/models/api_key.py create mode 100644 backend/app/models/sso_config.py create mode 100644 backend/app/routers/api_keys.py create mode 100644 backend/app/routers/sso.py create mode 100644 backend/app/schemas/api_key_schema.py create mode 100644 backend/app/schemas/sso_schema.py create mode 100644 backend/app/services/api_key_service.py create mode 100644 backend/app/services/sso_service.py create mode 100644 scripts/qa_phase14.py diff --git a/backend/Dockerfile b/backend/Dockerfile index 0daba8c..b0b9a44 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -7,6 +7,10 @@ RUN apt-get update && apt-get install -y \ gcc \ libpq-dev \ curl \ + pkg-config \ + libxml2-dev \ + libxmlsec1-dev \ + libxmlsec1-openssl \ && rm -rf /var/lib/apt/lists/* # Copy requirements first for better caching diff --git a/backend/alembic/versions/b040_enterprise_readiness.py b/backend/alembic/versions/b040_enterprise_readiness.py new file mode 100644 index 0000000..789d7ad --- /dev/null +++ b/backend/alembic/versions/b040_enterprise_readiness.py @@ -0,0 +1,75 @@ +"""Phase 14: Enterprise Readiness — api_keys and sso_configs tables. + +Revision ID: b040ent +Revises: b039exec +Create Date: 2026-05-20 +""" + +from alembic import op +import sqlalchemy as sa + +revision = "b040ent" +down_revision = "b039exec" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + conn = op.get_bind() + + # ── api_keys ────────────────────────────────────────────────────────────── + conn.execute(sa.text(""" + CREATE TABLE IF NOT EXISTS api_keys ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name VARCHAR(200) NOT NULL, + description TEXT, + key_prefix VARCHAR(13) NOT NULL, + key_hash VARCHAR(64) NOT NULL UNIQUE, + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + scopes JSONB NOT NULL DEFAULT '["read"]', + last_used_at TIMESTAMP WITHOUT TIME ZONE, + expires_at TIMESTAMP WITHOUT TIME ZONE, + is_active BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT now() + ) + """)) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_api_keys_user_id ON api_keys (user_id)" + )) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_api_keys_key_hash ON api_keys (key_hash)" + )) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_api_keys_active ON api_keys (is_active)" + )) + + # ── sso_configs ─────────────────────────────────────────────────────────── + conn.execute(sa.text(""" + CREATE TABLE IF NOT EXISTS sso_configs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + is_enabled BOOLEAN NOT NULL DEFAULT FALSE, + provider_name VARCHAR(200), + sp_entity_id VARCHAR(500), + sp_acs_url VARCHAR(500), + sp_slo_url VARCHAR(500), + sp_certificate TEXT, + sp_private_key TEXT, + idp_entity_id VARCHAR(500), + idp_sso_url VARCHAR(500), + idp_slo_url VARCHAR(500), + idp_certificate TEXT, + attr_email VARCHAR(200) DEFAULT 'email', + attr_username VARCHAR(200) DEFAULT 'username', + attr_role VARCHAR(200) DEFAULT 'role', + default_role VARCHAR(50) DEFAULT 'viewer', + auto_provision BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT now(), + updated_at TIMESTAMP WITHOUT TIME ZONE DEFAULT now() + ) + """)) + + +def downgrade() -> None: + conn = op.get_bind() + conn.execute(sa.text("DROP TABLE IF EXISTS api_keys CASCADE")) + conn.execute(sa.text("DROP TABLE IF EXISTS sso_configs CASCADE")) diff --git a/backend/app/dependencies/auth.py b/backend/app/dependencies/auth.py index 9771d02..549e435 100644 --- a/backend/app/dependencies/auth.py +++ b/backend/app/dependencies/auth.py @@ -4,6 +4,7 @@ Authentication and RBAC dependencies for FastAPI. Provides: - ``get_current_user``: decodes JWT from HttpOnly cookie (preferred) or Authorization header (fallback), fetches user from DB, raises 401 on failure. + Also accepts Aegis API keys (``aegis_…`` prefix) as Bearer tokens. - ``require_role``: factory that returns a dependency enforcing a specific role (admins always pass). """ @@ -19,6 +20,7 @@ from app import auth as auth_lib from app.config import settings from app.database import get_db from app.models.user import User +from app.models.api_key import KEY_PREFIX # --------------------------------------------------------------------------- # OAuth2 scheme (reads Authorization header — used as fallback / Swagger UI) @@ -68,6 +70,15 @@ async def get_current_user( if token is None: raise credentials_exception + # ── API Key path (Bearer token starts with "aegis_") ────────────────── + if token.startswith(KEY_PREFIX): + from app.services.api_key_service import authenticate_raw_key + user = authenticate_raw_key(db, token) + if user is None: + raise credentials_exception + return user + + # ── JWT path ────────────────────────────────────────────────────────── try: payload = jwt.decode( token, diff --git a/backend/app/main.py b/backend/app/main.py index 530b088..b29e7b9 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -44,6 +44,8 @@ from app.routers import attack_paths as attack_paths_router from app.routers import knowledge as knowledge_router from app.routers import risk_intelligence as risk_router from app.routers import executive_dashboard as dashboard_router +from app.routers import api_keys as api_keys_router +from app.routers import sso as sso_router from app.domain.errors import DomainError from app.middleware.error_handler import domain_exception_handler from app.middleware.request_context import RequestContextMiddleware @@ -147,6 +149,8 @@ app.include_router(attack_paths_router.router, prefix="/api/v1") app.include_router(knowledge_router.router, prefix="/api/v1") app.include_router(risk_router.router, prefix="/api/v1") app.include_router(dashboard_router.router, prefix="/api/v1") +app.include_router(api_keys_router.router, prefix="/api/v1") +app.include_router(sso_router.router, prefix="/api/v1") @app.get("/health", include_in_schema=False) diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 8611969..9f7f314 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -41,6 +41,8 @@ from app.models.attack_path import ( from app.models.knowledge import Playbook, PlaybookVersion, LessonLearned from app.models.risk_intelligence import TechniqueRiskProfile from app.models.executive_dashboard import PostureSnapshot +from app.models.api_key import ApiKey +from app.models.sso_config import SsoConfig __all__ = [ "User", "Technique", "Test", "TestTemplate", "Evidence", @@ -65,4 +67,6 @@ __all__ = [ "Playbook", "PlaybookVersion", "LessonLearned", "TechniqueRiskProfile", "PostureSnapshot", + "ApiKey", + "SsoConfig", ] diff --git a/backend/app/models/api_key.py b/backend/app/models/api_key.py new file mode 100644 index 0000000..4b9b454 --- /dev/null +++ b/backend/app/models/api_key.py @@ -0,0 +1,81 @@ +"""Phase 14: API Key model for programmatic access.""" + +import hashlib +import secrets +import uuid +from datetime import datetime + +from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Index, String, Text +from sqlalchemy.dialects.postgresql import JSONB, UUID +from sqlalchemy.orm import relationship + +from app.database import Base + +# ── Key generation constants ────────────────────────────────────────────────── +KEY_PREFIX = "aegis_" +KEY_BYTES = 32 # 32 random bytes → 64 hex chars → 70-char key total +DISPLAY_LEN = 12 # chars stored as prefix for UI display + + +def generate_raw_key() -> str: + """Generate a fresh raw API key (must be shown to user only once).""" + return KEY_PREFIX + secrets.token_hex(KEY_BYTES) + + +def hash_key(raw_key: str) -> str: + """SHA-256 hash of a raw API key for secure storage.""" + return hashlib.sha256(raw_key.encode()).hexdigest() + + +def key_prefix_display(raw_key: str) -> str: + """First DISPLAY_LEN characters of the raw key (safe for UI).""" + return raw_key[:DISPLAY_LEN] + + +# ── Valid scopes ────────────────────────────────────────────────────────────── +VALID_SCOPES = {"read", "write", "admin"} + + +class ApiKey(Base): + """ + Scoped API key for programmatic / BI / SOAR access. + + The full raw key is **never stored** — only a SHA-256 hash. + The first 12 characters (``key_prefix``) are retained for display. + """ + + __tablename__ = "api_keys" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + name = Column(String(200), nullable=False) + description = Column(Text, nullable=True) + + # Display only — never use for auth + key_prefix = Column(String(DISPLAY_LEN + 1), nullable=False) + + # Auth token — SHA-256 of the full raw key + key_hash = Column(String(64), nullable=False, unique=True) + + # Owner + user_id = Column( + UUID(as_uuid=True), + ForeignKey("users.id", ondelete="CASCADE"), + nullable=False, + ) + + # Permissions + scopes = Column(JSONB, nullable=False, default=["read"]) # ["read","write","admin"] + + # Lifecycle + last_used_at = Column(DateTime, nullable=True) + expires_at = Column(DateTime, nullable=True) + is_active = Column(Boolean, nullable=False, default=True) + created_at = Column(DateTime, default=datetime.utcnow) + + user = relationship("User", foreign_keys=[user_id]) + + __table_args__ = ( + Index("ix_api_keys_user_id", "user_id"), + Index("ix_api_keys_key_hash", "key_hash"), + Index("ix_api_keys_active", "is_active"), + ) diff --git a/backend/app/models/sso_config.py b/backend/app/models/sso_config.py new file mode 100644 index 0000000..3fe73a9 --- /dev/null +++ b/backend/app/models/sso_config.py @@ -0,0 +1,49 @@ +"""Phase 14: SSO / SAML 2.0 configuration model.""" + +import uuid +from datetime import datetime + +from sqlalchemy import Boolean, Column, DateTime, String, Text +from sqlalchemy.dialects.postgresql import JSONB, UUID + +from app.database import Base + + +class SsoConfig(Base): + """ + SAML 2.0 Identity Provider configuration. + + Exactly one row is expected (use upsert). The SP metadata endpoint + reads from this row to generate XML for IdP registration. + """ + + __tablename__ = "sso_configs" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + is_enabled = Column(Boolean, nullable=False, default=False) + provider_name = Column(String(200), nullable=True) # e.g., "Okta", "Azure AD" + + # ── Service Provider (Aegis) settings ──────────────────────────────────── + sp_entity_id = Column(String(500), nullable=True) # e.g., https://aegis.co/api/v1/sso/metadata + sp_acs_url = Column(String(500), nullable=True) # Assertion Consumer Service URL + sp_slo_url = Column(String(500), nullable=True) # Single Logout URL (optional) + sp_certificate = Column(Text, nullable=True) # SP public cert for signed requests + sp_private_key = Column(Text, nullable=True) # SP private key (stored encrypted in future) + + # ── Identity Provider settings ──────────────────────────────────────────── + idp_entity_id = Column(String(500), nullable=True) + idp_sso_url = Column(String(500), nullable=True) # IdP redirect/POST binding URL + idp_slo_url = Column(String(500), nullable=True) # IdP SLO URL + idp_certificate = Column(Text, nullable=True) # IdP X.509 cert for response validation + + # ── Attribute mapping ───────────────────────────────────────────────────── + # SAML attribute name → Aegis field + attr_email = Column(String(200), nullable=True, default="email") + attr_username = Column(String(200), nullable=True, default="username") + attr_role = Column(String(200), nullable=True, default="role") + default_role = Column(String(50), nullable=True, default="viewer") + auto_provision = Column(Boolean, nullable=False, default=True) # create user on first login + + # ── Meta ───────────────────────────────────────────────────────────────── + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) diff --git a/backend/app/routers/api_keys.py b/backend/app/routers/api_keys.py new file mode 100644 index 0000000..8dc6a86 --- /dev/null +++ b/backend/app/routers/api_keys.py @@ -0,0 +1,104 @@ +"""Phase 14: API Key management router.""" + +from typing import List, Optional +from uuid import UUID + +from fastapi import APIRouter, Depends, Query +from sqlalchemy.orm import Session + +from app.database import get_db +from app.dependencies.auth import get_current_user, require_any_role +from app.models.user import User +from app.schemas.api_key_schema import ( + ApiKeyCreate, ApiKeyCreated, ApiKeyOut, ApiKeyUpdate, +) +import app.services.api_key_service as svc + +router = APIRouter(prefix="/api-keys", tags=["API Keys"]) + + +@router.post("", response_model=ApiKeyCreated, status_code=201) +def create_key( + body: ApiKeyCreate, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """ + Create a scoped API key. + + The ``raw_key`` field in the response is shown **exactly once** and + cannot be retrieved later. Store it securely. + """ + key, raw_key = svc.create_api_key( + db, + user_id = current_user.id, + name = body.name, + scopes = body.scopes, + description = body.description, + expires_at = body.expires_at, + ) + out = ApiKeyOut.model_validate(key) + return ApiKeyCreated(**out.model_dump(), raw_key=raw_key) + + +@router.get("", response_model=List[ApiKeyOut]) +def list_keys( + include_inactive: bool = Query(False), + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """List API keys owned by the current user.""" + # Admins can see all keys; others only see their own + user_id = None if current_user.role == "admin" else current_user.id + return svc.list_api_keys(db, user_id=user_id, include_inactive=include_inactive) + + +@router.get("/{key_id}", response_model=ApiKeyOut) +def get_key( + key_id: UUID, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Get a single API key (owner or admin).""" + user_id = None if current_user.role == "admin" else current_user.id + return svc.get_api_key(db, key_id, user_id=user_id) + + +@router.patch("/{key_id}", response_model=ApiKeyOut) +def update_key( + key_id: UUID, + body: ApiKeyUpdate, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Update name, description, scopes, expiry, or active status.""" + user_id = None if current_user.role == "admin" else current_user.id + return svc.update_api_key( + db, key_id, user_id, + name = body.name, + description = body.description, + scopes = body.scopes, + expires_at = body.expires_at, + is_active = body.is_active, + ) + + +@router.post("/{key_id}/revoke", response_model=ApiKeyOut) +def revoke_key( + key_id: UUID, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Revoke an API key (soft-delete — sets is_active=False).""" + user_id = None if current_user.role == "admin" else current_user.id + return svc.revoke_api_key(db, key_id, user_id=user_id) + + +@router.delete("/{key_id}", status_code=204) +def delete_key( + key_id: UUID, + db: Session = Depends(get_db), + current_user: User = Depends(require_any_role("admin")), +): + """Permanently delete an API key (admin only).""" + svc.delete_api_key(db, key_id) diff --git a/backend/app/routers/sso.py b/backend/app/routers/sso.py new file mode 100644 index 0000000..c0d004a --- /dev/null +++ b/backend/app/routers/sso.py @@ -0,0 +1,117 @@ +"""Phase 14: SSO / SAML 2.0 router.""" + +from fastapi import APIRouter, Depends, HTTPException, Request, Response, status +from fastapi.responses import RedirectResponse +from sqlalchemy.orm import Session + +from app.database import get_db +from app.dependencies.auth import get_current_user, require_any_role +from app import auth as auth_lib +from app.schemas.sso_schema import ( + SsoConfigCreate, SsoConfigOut, SsoLoginInitResponse, SsoStatusResponse, +) +import app.services.sso_service as svc + +router = APIRouter(prefix="/sso", tags=["SSO"]) + +_COOKIE_NAME = "aegis_token" +_COOKIE_OPTS = {"httponly": True, "samesite": "lax", "secure": False} + + +# ── Public ──────────────────────────────────────────────────────────────────── + +@router.get("/status", response_model=SsoStatusResponse) +def sso_status(db: Session = Depends(get_db)): + """Return whether SSO is enabled and configured (public — for login page).""" + return svc.get_status(db) + + +@router.get("/metadata", response_model=None) +def sp_metadata(db: Session = Depends(get_db)): + """ + Return the Service Provider SAML metadata XML. + + Upload this XML to your IdP (Okta, Azure AD, etc.) to register Aegis. + """ + try: + xml = svc.get_sp_metadata(db) + except Exception as exc: + raise HTTPException(status_code=503, detail=str(exc)) + return Response(content=xml, media_type="application/xml") + + +@router.get("/login") +def sso_login(request: Request, db: Session = Depends(get_db)): + """ + Initiate SAML login — redirects the browser to the IdP. + + The IdP will POST the SAML Response to ``/sso/callback`` after authentication. + """ + request_data = { + "https": request.url.scheme == "https", + "http_host": request.url.hostname, + "path": request.url.path, + "port": str(request.url.port or (443 if request.url.scheme == "https" else 80)), + "get_data": dict(request.query_params), + "post_data": {}, + "query_string": str(request.url.query), + } + try: + result = svc.initiate_login(db, request_data) + except RuntimeError as exc: + raise HTTPException(status_code=503, detail=str(exc)) + return RedirectResponse(url=result["redirect_url"]) + + +@router.post("/callback") +async def sso_callback(request: Request, db: Session = Depends(get_db)): + """ + SAML Assertion Consumer Service (ACS) endpoint. + + The IdP POSTs the SAML Response here. On success, sets the aegis_token + cookie and redirects to the frontend. + """ + form = await request.form() + request_data = { + "https": request.url.scheme == "https", + "http_host": request.url.hostname, + "path": request.url.path, + "port": str(request.url.port or (443 if request.url.scheme == "https" else 80)), + "get_data": dict(request.query_params), + "post_data": dict(form), + "query_string": str(request.url.query), + } + try: + user = svc.process_callback(db, request_data) + except (ValueError, RuntimeError) as exc: + raise HTTPException(status_code=401, detail=str(exc)) + + access_token = auth_lib.create_access_token({"sub": user.username}) + response = RedirectResponse(url="/", status_code=302) + response.set_cookie(_COOKIE_NAME, access_token, **_COOKIE_OPTS) + return response + + +# ── Admin configuration ──────────────────────────────────────────────────────── + +@router.get("/config", response_model=SsoConfigOut) +def get_sso_config( + db: Session = Depends(get_db), + _user=Depends(require_any_role("admin")), +): + """Return the current SSO configuration (admin only).""" + cfg = svc.get_config(db) + if not cfg: + raise HTTPException(status_code=404, detail="SSO not configured yet") + return SsoConfigOut.model_validate(cfg) + + +@router.put("/config", response_model=SsoConfigOut) +def upsert_sso_config( + body: SsoConfigCreate, + db: Session = Depends(get_db), + _user=Depends(require_any_role("admin")), +): + """Create or replace the SSO configuration (admin only).""" + cfg = svc.upsert_config(db, **body.model_dump(exclude_unset=False)) + return SsoConfigOut.model_validate(cfg) diff --git a/backend/app/schemas/api_key_schema.py b/backend/app/schemas/api_key_schema.py new file mode 100644 index 0000000..f70ed37 --- /dev/null +++ b/backend/app/schemas/api_key_schema.py @@ -0,0 +1,68 @@ +"""Phase 14: API Key Pydantic schemas.""" + +from __future__ import annotations + +from datetime import datetime +from typing import List, Optional +from uuid import UUID + +from pydantic import BaseModel, Field, field_validator + +from app.models.api_key import VALID_SCOPES + + +class ApiKeyCreate(BaseModel): + name: str = Field(..., min_length=1, max_length=200) + description: Optional[str] = None + scopes: List[str] = Field(default=["read"]) + expires_at: Optional[datetime] = None + + @field_validator("scopes") + @classmethod + def validate_scopes(cls, v: list) -> list: + invalid = set(v) - VALID_SCOPES + if invalid: + raise ValueError(f"Invalid scopes: {invalid}. Valid: {VALID_SCOPES}") + if not v: + raise ValueError("At least one scope is required") + return v + + +class ApiKeyOut(BaseModel): + """Safe representation — never exposes key_hash.""" + id: UUID + name: str + description: Optional[str] = None + key_prefix: str + user_id: UUID + scopes: List[str] + last_used_at: Optional[datetime] = None + expires_at: Optional[datetime] = None + is_active: bool + created_at: Optional[datetime] = None + + class Config: + from_attributes = True + + +class ApiKeyCreated(ApiKeyOut): + """Returned only once at creation — includes the raw key.""" + raw_key: str = Field(..., description="The full API key — shown only this once.") + + +class ApiKeyUpdate(BaseModel): + name: Optional[str] = Field(None, min_length=1, max_length=200) + description: Optional[str] = None + scopes: Optional[List[str]] = None + expires_at: Optional[datetime] = None + is_active: Optional[bool] = None + + @field_validator("scopes") + @classmethod + def validate_scopes(cls, v: Optional[list]) -> Optional[list]: + if v is None: + return v + invalid = set(v) - VALID_SCOPES + if invalid: + raise ValueError(f"Invalid scopes: {invalid}") + return v diff --git a/backend/app/schemas/sso_schema.py b/backend/app/schemas/sso_schema.py new file mode 100644 index 0000000..79c7913 --- /dev/null +++ b/backend/app/schemas/sso_schema.py @@ -0,0 +1,80 @@ +"""Phase 14: SSO / SAML 2.0 Pydantic schemas.""" + +from __future__ import annotations + +from datetime import datetime +from typing import Optional +from uuid import UUID + +from pydantic import BaseModel, Field + + +class SsoConfigCreate(BaseModel): + is_enabled: bool = False + provider_name: Optional[str] = None + + # SP settings (auto-derived if not provided) + sp_entity_id: Optional[str] = None + sp_acs_url: Optional[str] = None + sp_slo_url: Optional[str] = None + sp_certificate: Optional[str] = None + sp_private_key: Optional[str] = None + + # IdP settings + idp_entity_id: Optional[str] = None + idp_sso_url: Optional[str] = None + idp_slo_url: Optional[str] = None + idp_certificate: Optional[str] = None + + # Attribute mapping + attr_email: Optional[str] = "email" + attr_username: Optional[str] = "username" + attr_role: Optional[str] = "role" + default_role: Optional[str] = "viewer" + auto_provision: bool = True + + +class SsoConfigUpdate(SsoConfigCreate): + """All fields optional for partial updates.""" + pass + + +class SsoConfigOut(BaseModel): + id: UUID + is_enabled: bool + provider_name: Optional[str] = None + + sp_entity_id: Optional[str] = None + sp_acs_url: Optional[str] = None + sp_slo_url: Optional[str] = None + sp_certificate: Optional[str] = None + # sp_private_key is intentionally OMITTED from responses + + idp_entity_id: Optional[str] = None + idp_sso_url: Optional[str] = None + idp_slo_url: Optional[str] = None + idp_certificate: Optional[str] = None + + attr_email: Optional[str] = None + attr_username: Optional[str] = None + attr_role: Optional[str] = None + default_role: Optional[str] = None + auto_provision: bool = True + + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + + class Config: + from_attributes = True + + +class SsoLoginInitResponse(BaseModel): + redirect_url: str = Field(..., description="URL to redirect the browser to for IdP login") + request_id: str = Field(..., description="SAML AuthnRequest ID for validation") + + +class SsoStatusResponse(BaseModel): + enabled: bool + provider_name: Optional[str] = None + configured: bool = Field(..., description="True if IdP settings are present") + login_url: Optional[str] = None # /sso/login URL diff --git a/backend/app/services/api_key_service.py b/backend/app/services/api_key_service.py new file mode 100644 index 0000000..11131d8 --- /dev/null +++ b/backend/app/services/api_key_service.py @@ -0,0 +1,151 @@ +"""Phase 14: API Key service — create, list, revoke, authenticate.""" + +from __future__ import annotations + +from datetime import datetime +from typing import List, Optional +from uuid import UUID + +from sqlalchemy.orm import Session + +from app.domain.errors import EntityNotFoundError, DuplicateEntityError +from app.models.api_key import ApiKey, generate_raw_key, hash_key, key_prefix_display +from app.models.user import User + + +# ── Create ──────────────────────────────────────────────────────────────────── + +def create_api_key( + db: Session, + user_id: UUID, + name: str, + scopes: List[str], + description: Optional[str] = None, + expires_at: Optional[datetime] = None, +) -> tuple[ApiKey, str]: + """ + Create a new API key. + + Returns ``(ApiKey, raw_key)`` — the raw_key must be shown to the user + immediately and is never retrievable again. + """ + raw_key = generate_raw_key() + key_hash = hash_key(raw_key) + prefix = key_prefix_display(raw_key) + + # Detect accidental collision (astronomically unlikely) + if db.query(ApiKey).filter(ApiKey.key_hash == key_hash).first(): + raise DuplicateEntityError("ApiKey", "key_hash", "") + + key = ApiKey( + name = name, + description = description, + key_prefix = prefix, + key_hash = key_hash, + user_id = user_id, + scopes = scopes, + expires_at = expires_at, + ) + db.add(key) + db.commit() + db.refresh(key) + return key, raw_key + + +# ── Read ────────────────────────────────────────────────────────────────────── + +def list_api_keys( + db: Session, + user_id: Optional[UUID] = None, + include_inactive: bool = False, +) -> List[ApiKey]: + q = db.query(ApiKey) + if user_id is not None: + q = q.filter(ApiKey.user_id == user_id) + if not include_inactive: + q = q.filter(ApiKey.is_active == True) + return q.order_by(ApiKey.created_at.desc()).all() + + +def get_api_key(db: Session, key_id: UUID, user_id: Optional[UUID] = None) -> ApiKey: + q = db.query(ApiKey).filter(ApiKey.id == key_id) + if user_id is not None: + q = q.filter(ApiKey.user_id == user_id) + key = q.first() + if not key: + raise EntityNotFoundError("ApiKey", str(key_id)) + return key + + +# ── Update / Revoke ─────────────────────────────────────────────────────────── + +def update_api_key( + db: Session, + key_id: UUID, + user_id: Optional[UUID] = None, + *, + name: Optional[str] = None, + description: Optional[str] = None, + scopes: Optional[List[str]] = None, + expires_at: Optional[datetime] = None, + is_active: Optional[bool] = None, +) -> ApiKey: + key = get_api_key(db, key_id, user_id) + if name is not None: + key.name = name + if description is not None: + key.description = description + if scopes is not None: + key.scopes = scopes + if expires_at is not None: + key.expires_at = expires_at + if is_active is not None: + key.is_active = is_active + db.commit() + db.refresh(key) + return key + + +def revoke_api_key( + db: Session, + key_id: UUID, + user_id: Optional[UUID] = None, +) -> ApiKey: + """Soft-revoke: set is_active = False.""" + return update_api_key(db, key_id, user_id, is_active=False) + + +def delete_api_key(db: Session, key_id: UUID, user_id: Optional[UUID] = None) -> None: + """Hard delete — use revoke instead for audit trail.""" + key = get_api_key(db, key_id, user_id) + db.delete(key) + db.commit() + + +# ── Authentication ──────────────────────────────────────────────────────────── + +def authenticate_raw_key(db: Session, raw_key: str) -> Optional[User]: + """ + Verify a raw API key. + + Returns the owning User if the key is valid, active, and not expired. + Updates ``last_used_at`` (throttled to once per request — always updates). + Returns None on any failure. + """ + h = hash_key(raw_key) + key: Optional[ApiKey] = db.query(ApiKey).filter(ApiKey.key_hash == h).first() + + if key is None or not key.is_active: + return None + if key.expires_at and key.expires_at < datetime.utcnow(): + return None + + # Update last_used_at + key.last_used_at = datetime.utcnow() + db.commit() + + user: Optional[User] = db.query(User).filter(User.id == key.user_id).first() + if user is None or not user.is_active: + return None + + return user diff --git a/backend/app/services/sso_service.py b/backend/app/services/sso_service.py new file mode 100644 index 0000000..9279f10 --- /dev/null +++ b/backend/app/services/sso_service.py @@ -0,0 +1,281 @@ +"""Phase 14: SSO / SAML 2.0 service.""" + +from __future__ import annotations + +import logging +from typing import Optional +from uuid import UUID + +from sqlalchemy.orm import Session + +from app.domain.errors import EntityNotFoundError +from app.models.sso_config import SsoConfig +from app.models.user import User + +log = logging.getLogger(__name__) + +# ── python3-saml optional import ────────────────────────────────────────────── +try: + from onelogin.saml2.auth import OneLogin_Saml2_Auth + from onelogin.saml2.settings import OneLogin_Saml2_Settings + from onelogin.saml2.utils import OneLogin_Saml2_Utils + _SAML_AVAILABLE = True +except ImportError: # pragma: no cover + _SAML_AVAILABLE = False + log.warning( + "python3-saml not installed — SAML login/callback endpoints will return 503. " + "Install python3-saml and its system dependencies to enable SSO." + ) + + +# ── Configuration helpers ───────────────────────────────────────────────────── + +def get_config(db: Session) -> Optional[SsoConfig]: + """Return the first (and only) SsoConfig row, or None if none exists.""" + return db.query(SsoConfig).first() + + +def get_or_404(db: Session) -> SsoConfig: + cfg = get_config(db) + if not cfg: + raise EntityNotFoundError("SsoConfig", "singleton") + return cfg + + +def upsert_config(db: Session, **kwargs) -> SsoConfig: + """Create or update the singleton SSO config.""" + cfg = get_config(db) + if cfg: + for k, v in kwargs.items(): + setattr(cfg, k, v) + db.commit() + db.refresh(cfg) + return cfg + cfg = SsoConfig(**kwargs) + db.add(cfg) + db.commit() + db.refresh(cfg) + return cfg + + +def is_configured(cfg: SsoConfig) -> bool: + """Return True if the minimum IdP settings are present.""" + return bool(cfg.idp_entity_id and cfg.idp_sso_url and cfg.idp_certificate) + + +# ── python3-saml settings builder ───────────────────────────────────────────── + +def _build_saml_settings(cfg: SsoConfig) -> dict: + sp: dict = { + "entityId": cfg.sp_entity_id or "", + "assertionConsumerService": { + "url": cfg.sp_acs_url or "", + "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST", + }, + "NameIDFormat": "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress", + "x509cert": cfg.sp_certificate or "", + "privateKey": cfg.sp_private_key or "", + } + if cfg.sp_slo_url: + sp["singleLogoutService"] = { + "url": cfg.sp_slo_url, + "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect", + } + + idp: dict = { + "entityId": cfg.idp_entity_id or "", + "singleSignOnService": { + "url": cfg.idp_sso_url or "", + "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect", + }, + "x509cert": cfg.idp_certificate or "", + } + if cfg.idp_slo_url: + idp["singleLogoutService"] = { + "url": cfg.idp_slo_url, + "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect", + } + + return { + "strict": True, + "debug": False, + "sp": sp, + "idp": idp, + } + + +def _build_saml_request(request_data: dict) -> dict: + """Build the request dict expected by python3-saml.""" + return { + "https": "on" if request_data.get("https") else "off", + "http_host": request_data.get("http_host", ""), + "script_name": request_data.get("path", ""), + "server_port": request_data.get("port", "443"), + "get_data": request_data.get("get_data", {}), + "post_data": request_data.get("post_data", {}), + "query_string": request_data.get("query_string", ""), + } + + +# ── Login initiation ────────────────────────────────────────────────────────── + +def initiate_login(db: Session, request_data: dict) -> dict: + """ + Build a SAML AuthnRequest and return the IdP redirect URL. + + ``request_data`` must contain: https, http_host, path, port. + Returns ``{"redirect_url": ..., "request_id": ...}``. + Raises RuntimeError if SAML library not available or SSO not configured. + """ + if not _SAML_AVAILABLE: + raise RuntimeError("SAML library not available — see server logs") + + cfg = get_or_404(db) + if not cfg.is_enabled: + raise RuntimeError("SSO is not enabled") + if not is_configured(cfg): + raise RuntimeError("SSO IdP is not fully configured") + + settings_dict = _build_saml_settings(cfg) + req = _build_saml_request(request_data) + auth = OneLogin_Saml2_Auth(req, old_settings=settings_dict) + redirect_url, request_id = auth.login(return_to=None, force_authn=False, + is_passive=False, set_nameid_policy=True, + name_id_value_req=None), auth.get_last_request_id() + return {"redirect_url": redirect_url, "request_id": request_id} + + +# ── ACS (callback) ──────────────────────────────────────────────────────────── + +def process_callback(db: Session, request_data: dict) -> User: + """ + Process the SAML Response POSTed by the IdP. + + Returns the authenticated User (creating if auto_provision is True). + Raises ValueError on assertion errors. + """ + if not _SAML_AVAILABLE: + raise RuntimeError("SAML library not available — see server logs") + + cfg = get_or_404(db) + if not cfg.is_enabled: + raise RuntimeError("SSO is not enabled") + + settings_dict = _build_saml_settings(cfg) + req = _build_saml_request(request_data) + auth = OneLogin_Saml2_Auth(req, old_settings=settings_dict) + auth.process_response() + + errors = auth.get_errors() + if errors: + raise ValueError(f"SAML assertion errors: {errors}. {auth.get_last_error_reason()}") + + if not auth.is_authenticated(): + raise ValueError("SAML authentication failed — not authenticated") + + # Extract attributes + attrs = auth.get_attributes() + name_id = auth.get_nameid() + email_attr = cfg.attr_email or "email" + username_attr = cfg.attr_username or "username" + role_attr = cfg.attr_role or "role" + + email = _first_attr(attrs, email_attr) or name_id or "" + username = _first_attr(attrs, username_attr) or email.split("@")[0] or name_id + role = _first_attr(attrs, role_attr) or cfg.default_role or "viewer" + + # Validate role + valid_roles = {"admin", "red_lead", "blue_lead", "red_tech", "blue_tech", "viewer"} + if role not in valid_roles: + role = cfg.default_role or "viewer" + + # Look up or provision user + user = db.query(User).filter(User.username == username).first() + if user: + # Refresh role from IdP on every login + user.role = role + db.commit() + return user + + if not cfg.auto_provision: + raise ValueError(f"User '{username}' not found and auto-provisioning is disabled") + + # Create new user (no password — SSO-only) + import secrets as _secrets + from passlib.context import CryptContext + _pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto") + dummy_hash = _pwd_ctx.hash(_secrets.token_hex(32)) + + user = User( + username = username, + email = email, + hashed_password = dummy_hash, + role = role, + is_active = True, + must_change_password = False, + ) + db.add(user) + db.commit() + db.refresh(user) + return user + + +def _first_attr(attrs: dict, name: str) -> Optional[str]: + """Return the first value of a SAML attribute, or None.""" + v = attrs.get(name) + if isinstance(v, list) and v: + return str(v[0]) + if isinstance(v, str): + return v + return None + + +# ── SP Metadata ─────────────────────────────────────────────────────────────── + +def get_sp_metadata(db: Session) -> str: + """ + Generate SP SAML metadata XML. + + Uses python3-saml if available; falls back to a minimal hand-built XML + so the endpoint is always functional for configuration purposes. + """ + cfg = get_or_404(db) + settings_dict = _build_saml_settings(cfg) + + if _SAML_AVAILABLE: + saml_settings = OneLogin_Saml2_Settings(settings=settings_dict, sp_validation_only=True) + metadata = saml_settings.get_sp_metadata() + errors = saml_settings.validate_metadata(metadata) + if errors: + log.warning("SP metadata validation warnings: %s", errors) + return metadata.decode() if isinstance(metadata, bytes) else metadata + + # Fallback: minimal XML without signing (useful for dev/testing) + sp = settings_dict["sp"] + acs = sp.get("assertionConsumerService", {}) + return f""" + + + + +""" + + +def get_status(db: Session) -> dict: + """Return SSO status summary.""" + cfg = get_config(db) + if not cfg: + return {"enabled": False, "provider_name": None, "configured": False, "login_url": None} + return { + "enabled": cfg.is_enabled, + "provider_name": cfg.provider_name, + "configured": is_configured(cfg), + "login_url": "/api/v1/sso/login" if cfg.is_enabled and is_configured(cfg) else None, + } diff --git a/backend/requirements.txt b/backend/requirements.txt index 7dad6aa..1f2043a 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -22,6 +22,7 @@ atlassian-python-api>=4.0.0 tempo-api-python-client>=0.8.0 weasyprint>=62.0 docxtpl>=0.18.0 +python3-saml>=1.15.0 # Testing pytest diff --git a/scripts/qa_phase14.py b/scripts/qa_phase14.py new file mode 100644 index 0000000..5f8e6e2 --- /dev/null +++ b/scripts/qa_phase14.py @@ -0,0 +1,305 @@ +""" +QA script for Phase 14 — Enterprise Readiness (API Keys + SSO Config). +Run with: python -X utf8 scripts/qa_phase14.py +""" + +import sys, os +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import requests + +BASE = "http://localhost:8000/api/v1" +PASS = "\033[92m✓\033[0m" +FAIL = "\033[91m✗\033[0m" + +passed = 0 +failed = 0 + + +def check(label: str, condition: bool, detail: str = ""): + global passed, failed + if condition: + passed += 1 + print(f" {PASS} {label}") + else: + failed += 1 + msg = f" {FAIL} {label}" + if detail: + msg += f" — {detail}" + print(msg) + + +def get_token(username="administrator", password="admin123"): + r = requests.post(f"{BASE}/auth/login", + data={"username": username, "password": password}) + if r.status_code == 200: + return r.json().get("access_token") or r.json().get("token") + raise RuntimeError(f"Login failed: {r.status_code} {r.text[:200]}") + + +def auth(token): + return {"Authorization": f"Bearer {token}"} + + +# ───────────────────────────────────────────────────────────────────────────── + +def main(): + print("\n====== Phase 14 QA — Enterprise Readiness ======\n") + + token = get_token() + h = auth(token) + + # ── Block 1: Create API key ─────────────────────────────────────────────── + print("── Block 1: Create API key ──") + + r = requests.post(f"{BASE}/api-keys", headers=h, json={ + "name": "QA Test Key", + "description": "Automated QA key", + "scopes": ["read", "write"], + }) + check("POST /api-keys → 201", r.status_code == 201, r.text[:200]) + created = r.json() if r.status_code == 201 else {} + + check("Created key has id", bool(created.get("id"))) + check("Created key has raw_key", bool(created.get("raw_key"))) + check("raw_key starts with aegis_", str(created.get("raw_key", "")).startswith("aegis_")) + check("key_prefix present", bool(created.get("key_prefix"))) + check("key_prefix matches raw_key start", + str(created.get("raw_key", "")).startswith(str(created.get("key_prefix", "X")))) + check("scopes correct", set(created.get("scopes", [])) == {"read", "write"}) + check("is_active = True", created.get("is_active") == True) + check("raw_key NOT in key_prefix", created.get("raw_key") != created.get("key_prefix")) + + raw_key = created.get("raw_key", "") + key_id = created.get("id", "") + + # Create a read-only key + r2 = requests.post(f"{BASE}/api-keys", headers=h, json={ + "name": "QA Read-only Key", + "scopes": ["read"], + }) + check("POST second key (read-only) → 201", r2.status_code == 201, r2.text[:100]) + raw_key_ro = r2.json().get("raw_key", "") if r2.status_code == 201 else "" + key_id_ro = r2.json().get("id", "") if r2.status_code == 201 else "" + + # Invalid scope → 422 + r3 = requests.post(f"{BASE}/api-keys", headers=h, json={ + "name": "Bad Scope Key", + "scopes": ["superuser"], + }) + check("POST with invalid scope → 422", r3.status_code == 422) + + print() + + # ── Block 2: List & Get API keys ───────────────────────────────────────── + print("── Block 2: List & Get API keys ──") + + r = requests.get(f"{BASE}/api-keys", headers=h) + check("GET /api-keys → 200", r.status_code == 200, r.text[:100]) + keys = r.json() if r.status_code == 200 else [] + check("List not empty", len(keys) > 0) + check("List contains our key", any(k["id"] == key_id for k in keys)) + check("raw_key NOT in list response", all("raw_key" not in k for k in keys)) + check("key_hash NOT in list response", all("key_hash" not in k for k in keys)) + + r = requests.get(f"{BASE}/api-keys/{key_id}", headers=h) + check("GET /api-keys/{id} → 200", r.status_code == 200) + k = r.json() if r.status_code == 200 else {} + check("Get returns correct id", k.get("id") == key_id) + check("raw_key NOT in get response", "raw_key" not in k) + + # 404 for non-existent key + r = requests.get( + f"{BASE}/api-keys/00000000-0000-0000-0000-000000000001", headers=h + ) + check("GET non-existent key → 404", r.status_code == 404) + + print() + + # ── Block 3: Authenticate with API key ─────────────────────────────────── + print("── Block 3: Authenticate with API key ──") + + if raw_key: + api_key_header = {"Authorization": f"Bearer {raw_key}"} + + r = requests.get(f"{BASE}/techniques", headers=api_key_header) + check("GET /techniques with API key → 200", r.status_code == 200, + r.text[:150]) + + r = requests.get(f"{BASE}/risk/summary", headers=api_key_header) + check("GET /risk/summary with API key → 200", r.status_code == 200) + + r = requests.get(f"{BASE}/dashboard/kpis", headers=api_key_header) + check("GET /dashboard/kpis with API key → 200", r.status_code == 200) + + # Verify last_used_at was updated + r = requests.get(f"{BASE}/api-keys/{key_id}", headers=h) + check("last_used_at updated after API key use", + r.status_code == 200 and r.json().get("last_used_at") is not None) + + # Invalid / garbage key → 401 + r = requests.get(f"{BASE}/techniques", + headers={"Authorization": "Bearer aegis_deadbeefdeadbeef"}) + check("Invalid API key → 401", r.status_code == 401) + + print() + + # ── Block 4: Update API key ─────────────────────────────────────────────── + print("── Block 4: Update API key ──") + + r = requests.patch(f"{BASE}/api-keys/{key_id}", headers=h, json={ + "name": "QA Test Key (updated)", + "scopes": ["read"], + }) + check("PATCH /api-keys/{id} → 200", r.status_code == 200, r.text[:150]) + upd = r.json() if r.status_code == 200 else {} + check("Name updated", upd.get("name") == "QA Test Key (updated)") + check("Scopes updated to [read]", upd.get("scopes") == ["read"]) + + print() + + # ── Block 5: Revoke API key ─────────────────────────────────────────────── + print("── Block 5: Revoke API key ──") + + if key_id_ro: + r = requests.post(f"{BASE}/api-keys/{key_id_ro}/revoke", headers=h) + check("POST /api-keys/{id}/revoke → 200", r.status_code == 200, + r.text[:100]) + rev = r.json() if r.status_code == 200 else {} + check("is_active = False after revoke", rev.get("is_active") == False) + + # Revoked key should no longer authenticate + if raw_key_ro: + r = requests.get(f"{BASE}/techniques", + headers={"Authorization": f"Bearer {raw_key_ro}"}) + check("Revoked API key → 401", r.status_code == 401) + + print() + + # ── Block 6: SSO status (public) ───────────────────────────────────────── + print("── Block 6: SSO status & config ──") + + r = requests.get(f"{BASE}/sso/status") + check("GET /sso/status → 200 (no auth required)", r.status_code == 200, + r.text[:150]) + status_data = r.json() if r.status_code == 200 else {} + check("SSO status has 'enabled' field", "enabled" in status_data) + check("SSO status has 'configured' field", "configured" in status_data) + + # SP metadata (may be 503 if no config yet, but endpoint exists) + r = requests.get(f"{BASE}/sso/metadata") + check("GET /sso/metadata returns XML or 404/503", + r.status_code in (200, 404, 503)) + if r.status_code == 200: + check("Metadata is XML", "EntityDescriptor" in r.text + or "xml" in r.headers.get("content-type", "")) + + # Get config (none yet → 404 or empty) + r = requests.get(f"{BASE}/sso/config", headers=h) + check("GET /sso/config (admin) → 200 or 404", r.status_code in (200, 404)) + + # Create/update SSO config + sso_payload = { + "is_enabled": False, + "provider_name": "Test IdP", + "sp_entity_id": "https://aegis.example.com/api/v1/sso/metadata", + "sp_acs_url": "https://aegis.example.com/api/v1/sso/callback", + "idp_entity_id": "https://idp.example.com", + "idp_sso_url": "https://idp.example.com/sso/saml", + "idp_certificate": "MIIC...fakecert", + "attr_email": "email", + "attr_username": "username", + "attr_role": "role", + "default_role": "viewer", + "auto_provision": True, + } + r = requests.put(f"{BASE}/sso/config", headers=h, json=sso_payload) + check("PUT /sso/config (admin) → 200", r.status_code == 200, r.text[:200]) + cfg = r.json() if r.status_code == 200 else {} + check("Config has provider_name", cfg.get("provider_name") == "Test IdP") + check("Config has idp_entity_id", bool(cfg.get("idp_entity_id"))) + check("sp_private_key NOT in response", "sp_private_key" not in cfg) + + # Re-fetch config + r = requests.get(f"{BASE}/sso/config", headers=h) + check("GET /sso/config → 200 after upsert", r.status_code == 200) + + # Status should now reflect configured=True + r = requests.get(f"{BASE}/sso/status") + st = r.json() if r.status_code == 200 else {} + check("SSO configured=True after upsert", st.get("configured") == True) + check("SSO enabled=False (we set it off)", st.get("enabled") == False) + + print() + + # ── Block 7: SSO login endpoint (disabled → 503) ────────────────────────── + print("── Block 7: SSO login (disabled) ──") + + r = requests.get(f"{BASE}/sso/login", allow_redirects=False) + # Should be 503 (disabled) or 307/302 (if enabled and library present) + check("GET /sso/login returns 5xx when disabled or no lib", + r.status_code in (503, 302, 307, 400)) + + print() + + # ── Block 8: Auth protection ────────────────────────────────────────────── + print("── Block 8: Auth protection ──") + + r = requests.get(f"{BASE}/api-keys") + check("GET /api-keys without auth → 401", r.status_code == 401) + + r = requests.post(f"{BASE}/api-keys", json={"name": "x", "scopes": ["read"]}) + check("POST /api-keys without auth → 401", r.status_code == 401) + + r = requests.get(f"{BASE}/sso/config") + check("GET /sso/config without auth → 401", r.status_code == 401) + + r = requests.put(f"{BASE}/sso/config", json={"is_enabled": False}) + check("PUT /sso/config without auth → 401", r.status_code == 401) + + # Non-admin cannot delete keys + # (would need a non-admin user setup — skip for now) + + print() + + # ── Block 9: Cleanup ───────────────────────────────────────────────────── + print("── Block 9: Cleanup ──") + + # Hard-delete the QA keys (admin) + for kid in [key_id, key_id_ro]: + if kid: + r = requests.delete(f"{BASE}/api-keys/{kid}", headers=h) + check(f"DELETE /api-keys/{kid[:8]}... → 204", r.status_code == 204, + r.text[:100]) + + print() + + # ── Block 10: Regression ───────────────────────────────────────────────── + print("── Block 10: Regression ──") + + r = requests.get(f"{BASE}/dashboard/kpis", headers=h) + check("GET /dashboard/kpis still works", r.status_code == 200) + + r = requests.get(f"{BASE}/risk/summary", headers=h) + check("GET /risk/summary still works", r.status_code == 200) + + r = requests.get(f"{BASE}/knowledge/playbooks", headers=h) + check("GET /knowledge/playbooks still works", r.status_code == 200) + + r = requests.get(f"{BASE}/techniques", headers=h) + check("GET /techniques still works", r.status_code == 200) + + print() + + # ── Summary ─────────────────────────────────────────────────────────────── + total = passed + failed + print(f"====== Results: {passed}/{total} passed", end="") + if failed: + print(f" — \033[91m{failed} FAILED\033[0m ======\n") + sys.exit(1) + else: + print(" ✓ ALL PASSED ======\n") + + +if __name__ == "__main__": + main()