feat(enterprise): Phase 14 — API Key Management + SSO/SAML 2.0
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled

- ApiKey model (SHA-256 hash, prefix, scopes, expiry) + Alembic migration (b040ent)
- SsoConfig model for SAML 2.0 IdP settings (attribute mapping, auto-provision)
- API key auth integrated into get_current_user (aegis_ prefix detection)
- Routers: /api/v1/api-keys (full CRUD + revoke) and /api/v1/sso (metadata, login, callback, config)
- python3-saml added to requirements; Dockerfile adds libxmlsec1-dev for SAML XML signing
- QA script: 52 assertions covering key lifecycle, API key auth, SSO config

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
kitos
2026-05-20 16:43:57 +02:00
parent ab591d30c4
commit d81fc04b8f
15 changed files with 1335 additions and 0 deletions

View File

@@ -7,6 +7,10 @@ RUN apt-get update && apt-get install -y \
gcc \
libpq-dev \
curl \
pkg-config \
libxml2-dev \
libxmlsec1-dev \
libxmlsec1-openssl \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements first for better caching

View File

@@ -0,0 +1,75 @@
"""Phase 14: Enterprise Readiness — api_keys and sso_configs tables.
Revision ID: b040ent
Revises: b039exec
Create Date: 2026-05-20
"""
from alembic import op
import sqlalchemy as sa
revision = "b040ent"
down_revision = "b039exec"
branch_labels = None
depends_on = None
def upgrade() -> None:
conn = op.get_bind()
# ── api_keys ──────────────────────────────────────────────────────────────
conn.execute(sa.text("""
CREATE TABLE IF NOT EXISTS api_keys (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(200) NOT NULL,
description TEXT,
key_prefix VARCHAR(13) NOT NULL,
key_hash VARCHAR(64) NOT NULL UNIQUE,
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
scopes JSONB NOT NULL DEFAULT '["read"]',
last_used_at TIMESTAMP WITHOUT TIME ZONE,
expires_at TIMESTAMP WITHOUT TIME ZONE,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT now()
)
"""))
conn.execute(sa.text(
"CREATE INDEX IF NOT EXISTS ix_api_keys_user_id ON api_keys (user_id)"
))
conn.execute(sa.text(
"CREATE INDEX IF NOT EXISTS ix_api_keys_key_hash ON api_keys (key_hash)"
))
conn.execute(sa.text(
"CREATE INDEX IF NOT EXISTS ix_api_keys_active ON api_keys (is_active)"
))
# ── sso_configs ───────────────────────────────────────────────────────────
conn.execute(sa.text("""
CREATE TABLE IF NOT EXISTS sso_configs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
is_enabled BOOLEAN NOT NULL DEFAULT FALSE,
provider_name VARCHAR(200),
sp_entity_id VARCHAR(500),
sp_acs_url VARCHAR(500),
sp_slo_url VARCHAR(500),
sp_certificate TEXT,
sp_private_key TEXT,
idp_entity_id VARCHAR(500),
idp_sso_url VARCHAR(500),
idp_slo_url VARCHAR(500),
idp_certificate TEXT,
attr_email VARCHAR(200) DEFAULT 'email',
attr_username VARCHAR(200) DEFAULT 'username',
attr_role VARCHAR(200) DEFAULT 'role',
default_role VARCHAR(50) DEFAULT 'viewer',
auto_provision BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT now(),
updated_at TIMESTAMP WITHOUT TIME ZONE DEFAULT now()
)
"""))
def downgrade() -> None:
conn = op.get_bind()
conn.execute(sa.text("DROP TABLE IF EXISTS api_keys CASCADE"))
conn.execute(sa.text("DROP TABLE IF EXISTS sso_configs CASCADE"))

View File

@@ -4,6 +4,7 @@ Authentication and RBAC dependencies for FastAPI.
Provides:
- ``get_current_user``: decodes JWT from HttpOnly cookie (preferred) or
Authorization header (fallback), fetches user from DB, raises 401 on failure.
Also accepts Aegis API keys (``aegis_…`` prefix) as Bearer tokens.
- ``require_role``: factory that returns a dependency enforcing a specific role
(admins always pass).
"""
@@ -19,6 +20,7 @@ from app import auth as auth_lib
from app.config import settings
from app.database import get_db
from app.models.user import User
from app.models.api_key import KEY_PREFIX
# ---------------------------------------------------------------------------
# OAuth2 scheme (reads Authorization header — used as fallback / Swagger UI)
@@ -68,6 +70,15 @@ async def get_current_user(
if token is None:
raise credentials_exception
# ── API Key path (Bearer token starts with "aegis_") ──────────────────
if token.startswith(KEY_PREFIX):
from app.services.api_key_service import authenticate_raw_key
user = authenticate_raw_key(db, token)
if user is None:
raise credentials_exception
return user
# ── JWT path ──────────────────────────────────────────────────────────
try:
payload = jwt.decode(
token,

View File

@@ -44,6 +44,8 @@ from app.routers import attack_paths as attack_paths_router
from app.routers import knowledge as knowledge_router
from app.routers import risk_intelligence as risk_router
from app.routers import executive_dashboard as dashboard_router
from app.routers import api_keys as api_keys_router
from app.routers import sso as sso_router
from app.domain.errors import DomainError
from app.middleware.error_handler import domain_exception_handler
from app.middleware.request_context import RequestContextMiddleware
@@ -147,6 +149,8 @@ app.include_router(attack_paths_router.router, prefix="/api/v1")
app.include_router(knowledge_router.router, prefix="/api/v1")
app.include_router(risk_router.router, prefix="/api/v1")
app.include_router(dashboard_router.router, prefix="/api/v1")
app.include_router(api_keys_router.router, prefix="/api/v1")
app.include_router(sso_router.router, prefix="/api/v1")
@app.get("/health", include_in_schema=False)

View File

@@ -41,6 +41,8 @@ from app.models.attack_path import (
from app.models.knowledge import Playbook, PlaybookVersion, LessonLearned
from app.models.risk_intelligence import TechniqueRiskProfile
from app.models.executive_dashboard import PostureSnapshot
from app.models.api_key import ApiKey
from app.models.sso_config import SsoConfig
__all__ = [
"User", "Technique", "Test", "TestTemplate", "Evidence",
@@ -65,4 +67,6 @@ __all__ = [
"Playbook", "PlaybookVersion", "LessonLearned",
"TechniqueRiskProfile",
"PostureSnapshot",
"ApiKey",
"SsoConfig",
]

View File

@@ -0,0 +1,81 @@
"""Phase 14: API Key model for programmatic access."""
import hashlib
import secrets
import uuid
from datetime import datetime
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Index, String, Text
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import relationship
from app.database import Base
# ── Key generation constants ──────────────────────────────────────────────────
KEY_PREFIX = "aegis_"
KEY_BYTES = 32 # 32 random bytes → 64 hex chars → 70-char key total
DISPLAY_LEN = 12 # chars stored as prefix for UI display
def generate_raw_key() -> str:
"""Generate a fresh raw API key (must be shown to user only once)."""
return KEY_PREFIX + secrets.token_hex(KEY_BYTES)
def hash_key(raw_key: str) -> str:
"""SHA-256 hash of a raw API key for secure storage."""
return hashlib.sha256(raw_key.encode()).hexdigest()
def key_prefix_display(raw_key: str) -> str:
"""First DISPLAY_LEN characters of the raw key (safe for UI)."""
return raw_key[:DISPLAY_LEN]
# ── Valid scopes ──────────────────────────────────────────────────────────────
VALID_SCOPES = {"read", "write", "admin"}
class ApiKey(Base):
"""
Scoped API key for programmatic / BI / SOAR access.
The full raw key is **never stored** — only a SHA-256 hash.
The first 12 characters (``key_prefix``) are retained for display.
"""
__tablename__ = "api_keys"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name = Column(String(200), nullable=False)
description = Column(Text, nullable=True)
# Display only — never use for auth
key_prefix = Column(String(DISPLAY_LEN + 1), nullable=False)
# Auth token — SHA-256 of the full raw key
key_hash = Column(String(64), nullable=False, unique=True)
# Owner
user_id = Column(
UUID(as_uuid=True),
ForeignKey("users.id", ondelete="CASCADE"),
nullable=False,
)
# Permissions
scopes = Column(JSONB, nullable=False, default=["read"]) # ["read","write","admin"]
# Lifecycle
last_used_at = Column(DateTime, nullable=True)
expires_at = Column(DateTime, nullable=True)
is_active = Column(Boolean, nullable=False, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
user = relationship("User", foreign_keys=[user_id])
__table_args__ = (
Index("ix_api_keys_user_id", "user_id"),
Index("ix_api_keys_key_hash", "key_hash"),
Index("ix_api_keys_active", "is_active"),
)

View File

@@ -0,0 +1,49 @@
"""Phase 14: SSO / SAML 2.0 configuration model."""
import uuid
from datetime import datetime
from sqlalchemy import Boolean, Column, DateTime, String, Text
from sqlalchemy.dialects.postgresql import JSONB, UUID
from app.database import Base
class SsoConfig(Base):
"""
SAML 2.0 Identity Provider configuration.
Exactly one row is expected (use upsert). The SP metadata endpoint
reads from this row to generate XML for IdP registration.
"""
__tablename__ = "sso_configs"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
is_enabled = Column(Boolean, nullable=False, default=False)
provider_name = Column(String(200), nullable=True) # e.g., "Okta", "Azure AD"
# ── Service Provider (Aegis) settings ────────────────────────────────────
sp_entity_id = Column(String(500), nullable=True) # e.g., https://aegis.co/api/v1/sso/metadata
sp_acs_url = Column(String(500), nullable=True) # Assertion Consumer Service URL
sp_slo_url = Column(String(500), nullable=True) # Single Logout URL (optional)
sp_certificate = Column(Text, nullable=True) # SP public cert for signed requests
sp_private_key = Column(Text, nullable=True) # SP private key (stored encrypted in future)
# ── Identity Provider settings ────────────────────────────────────────────
idp_entity_id = Column(String(500), nullable=True)
idp_sso_url = Column(String(500), nullable=True) # IdP redirect/POST binding URL
idp_slo_url = Column(String(500), nullable=True) # IdP SLO URL
idp_certificate = Column(Text, nullable=True) # IdP X.509 cert for response validation
# ── Attribute mapping ─────────────────────────────────────────────────────
# SAML attribute name → Aegis field
attr_email = Column(String(200), nullable=True, default="email")
attr_username = Column(String(200), nullable=True, default="username")
attr_role = Column(String(200), nullable=True, default="role")
default_role = Column(String(50), nullable=True, default="viewer")
auto_provision = Column(Boolean, nullable=False, default=True) # create user on first login
# ── Meta ─────────────────────────────────────────────────────────────────
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

View File

@@ -0,0 +1,104 @@
"""Phase 14: API Key management router."""
from typing import List, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user, require_any_role
from app.models.user import User
from app.schemas.api_key_schema import (
ApiKeyCreate, ApiKeyCreated, ApiKeyOut, ApiKeyUpdate,
)
import app.services.api_key_service as svc
router = APIRouter(prefix="/api-keys", tags=["API Keys"])
@router.post("", response_model=ApiKeyCreated, status_code=201)
def create_key(
body: ApiKeyCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
Create a scoped API key.
The ``raw_key`` field in the response is shown **exactly once** and
cannot be retrieved later. Store it securely.
"""
key, raw_key = svc.create_api_key(
db,
user_id = current_user.id,
name = body.name,
scopes = body.scopes,
description = body.description,
expires_at = body.expires_at,
)
out = ApiKeyOut.model_validate(key)
return ApiKeyCreated(**out.model_dump(), raw_key=raw_key)
@router.get("", response_model=List[ApiKeyOut])
def list_keys(
include_inactive: bool = Query(False),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""List API keys owned by the current user."""
# Admins can see all keys; others only see their own
user_id = None if current_user.role == "admin" else current_user.id
return svc.list_api_keys(db, user_id=user_id, include_inactive=include_inactive)
@router.get("/{key_id}", response_model=ApiKeyOut)
def get_key(
key_id: UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Get a single API key (owner or admin)."""
user_id = None if current_user.role == "admin" else current_user.id
return svc.get_api_key(db, key_id, user_id=user_id)
@router.patch("/{key_id}", response_model=ApiKeyOut)
def update_key(
key_id: UUID,
body: ApiKeyUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Update name, description, scopes, expiry, or active status."""
user_id = None if current_user.role == "admin" else current_user.id
return svc.update_api_key(
db, key_id, user_id,
name = body.name,
description = body.description,
scopes = body.scopes,
expires_at = body.expires_at,
is_active = body.is_active,
)
@router.post("/{key_id}/revoke", response_model=ApiKeyOut)
def revoke_key(
key_id: UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Revoke an API key (soft-delete — sets is_active=False)."""
user_id = None if current_user.role == "admin" else current_user.id
return svc.revoke_api_key(db, key_id, user_id=user_id)
@router.delete("/{key_id}", status_code=204)
def delete_key(
key_id: UUID,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("admin")),
):
"""Permanently delete an API key (admin only)."""
svc.delete_api_key(db, key_id)

117
backend/app/routers/sso.py Normal file
View File

@@ -0,0 +1,117 @@
"""Phase 14: SSO / SAML 2.0 router."""
from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
from fastapi.responses import RedirectResponse
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user, require_any_role
from app import auth as auth_lib
from app.schemas.sso_schema import (
SsoConfigCreate, SsoConfigOut, SsoLoginInitResponse, SsoStatusResponse,
)
import app.services.sso_service as svc
router = APIRouter(prefix="/sso", tags=["SSO"])
_COOKIE_NAME = "aegis_token"
_COOKIE_OPTS = {"httponly": True, "samesite": "lax", "secure": False}
# ── Public ────────────────────────────────────────────────────────────────────
@router.get("/status", response_model=SsoStatusResponse)
def sso_status(db: Session = Depends(get_db)):
"""Return whether SSO is enabled and configured (public — for login page)."""
return svc.get_status(db)
@router.get("/metadata", response_model=None)
def sp_metadata(db: Session = Depends(get_db)):
"""
Return the Service Provider SAML metadata XML.
Upload this XML to your IdP (Okta, Azure AD, etc.) to register Aegis.
"""
try:
xml = svc.get_sp_metadata(db)
except Exception as exc:
raise HTTPException(status_code=503, detail=str(exc))
return Response(content=xml, media_type="application/xml")
@router.get("/login")
def sso_login(request: Request, db: Session = Depends(get_db)):
"""
Initiate SAML login — redirects the browser to the IdP.
The IdP will POST the SAML Response to ``/sso/callback`` after authentication.
"""
request_data = {
"https": request.url.scheme == "https",
"http_host": request.url.hostname,
"path": request.url.path,
"port": str(request.url.port or (443 if request.url.scheme == "https" else 80)),
"get_data": dict(request.query_params),
"post_data": {},
"query_string": str(request.url.query),
}
try:
result = svc.initiate_login(db, request_data)
except RuntimeError as exc:
raise HTTPException(status_code=503, detail=str(exc))
return RedirectResponse(url=result["redirect_url"])
@router.post("/callback")
async def sso_callback(request: Request, db: Session = Depends(get_db)):
"""
SAML Assertion Consumer Service (ACS) endpoint.
The IdP POSTs the SAML Response here. On success, sets the aegis_token
cookie and redirects to the frontend.
"""
form = await request.form()
request_data = {
"https": request.url.scheme == "https",
"http_host": request.url.hostname,
"path": request.url.path,
"port": str(request.url.port or (443 if request.url.scheme == "https" else 80)),
"get_data": dict(request.query_params),
"post_data": dict(form),
"query_string": str(request.url.query),
}
try:
user = svc.process_callback(db, request_data)
except (ValueError, RuntimeError) as exc:
raise HTTPException(status_code=401, detail=str(exc))
access_token = auth_lib.create_access_token({"sub": user.username})
response = RedirectResponse(url="/", status_code=302)
response.set_cookie(_COOKIE_NAME, access_token, **_COOKIE_OPTS)
return response
# ── Admin configuration ────────────────────────────────────────────────────────
@router.get("/config", response_model=SsoConfigOut)
def get_sso_config(
db: Session = Depends(get_db),
_user=Depends(require_any_role("admin")),
):
"""Return the current SSO configuration (admin only)."""
cfg = svc.get_config(db)
if not cfg:
raise HTTPException(status_code=404, detail="SSO not configured yet")
return SsoConfigOut.model_validate(cfg)
@router.put("/config", response_model=SsoConfigOut)
def upsert_sso_config(
body: SsoConfigCreate,
db: Session = Depends(get_db),
_user=Depends(require_any_role("admin")),
):
"""Create or replace the SSO configuration (admin only)."""
cfg = svc.upsert_config(db, **body.model_dump(exclude_unset=False))
return SsoConfigOut.model_validate(cfg)

View File

@@ -0,0 +1,68 @@
"""Phase 14: API Key Pydantic schemas."""
from __future__ import annotations
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from pydantic import BaseModel, Field, field_validator
from app.models.api_key import VALID_SCOPES
class ApiKeyCreate(BaseModel):
name: str = Field(..., min_length=1, max_length=200)
description: Optional[str] = None
scopes: List[str] = Field(default=["read"])
expires_at: Optional[datetime] = None
@field_validator("scopes")
@classmethod
def validate_scopes(cls, v: list) -> list:
invalid = set(v) - VALID_SCOPES
if invalid:
raise ValueError(f"Invalid scopes: {invalid}. Valid: {VALID_SCOPES}")
if not v:
raise ValueError("At least one scope is required")
return v
class ApiKeyOut(BaseModel):
"""Safe representation — never exposes key_hash."""
id: UUID
name: str
description: Optional[str] = None
key_prefix: str
user_id: UUID
scopes: List[str]
last_used_at: Optional[datetime] = None
expires_at: Optional[datetime] = None
is_active: bool
created_at: Optional[datetime] = None
class Config:
from_attributes = True
class ApiKeyCreated(ApiKeyOut):
"""Returned only once at creation — includes the raw key."""
raw_key: str = Field(..., description="The full API key — shown only this once.")
class ApiKeyUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=1, max_length=200)
description: Optional[str] = None
scopes: Optional[List[str]] = None
expires_at: Optional[datetime] = None
is_active: Optional[bool] = None
@field_validator("scopes")
@classmethod
def validate_scopes(cls, v: Optional[list]) -> Optional[list]:
if v is None:
return v
invalid = set(v) - VALID_SCOPES
if invalid:
raise ValueError(f"Invalid scopes: {invalid}")
return v

View File

@@ -0,0 +1,80 @@
"""Phase 14: SSO / SAML 2.0 Pydantic schemas."""
from __future__ import annotations
from datetime import datetime
from typing import Optional
from uuid import UUID
from pydantic import BaseModel, Field
class SsoConfigCreate(BaseModel):
is_enabled: bool = False
provider_name: Optional[str] = None
# SP settings (auto-derived if not provided)
sp_entity_id: Optional[str] = None
sp_acs_url: Optional[str] = None
sp_slo_url: Optional[str] = None
sp_certificate: Optional[str] = None
sp_private_key: Optional[str] = None
# IdP settings
idp_entity_id: Optional[str] = None
idp_sso_url: Optional[str] = None
idp_slo_url: Optional[str] = None
idp_certificate: Optional[str] = None
# Attribute mapping
attr_email: Optional[str] = "email"
attr_username: Optional[str] = "username"
attr_role: Optional[str] = "role"
default_role: Optional[str] = "viewer"
auto_provision: bool = True
class SsoConfigUpdate(SsoConfigCreate):
"""All fields optional for partial updates."""
pass
class SsoConfigOut(BaseModel):
id: UUID
is_enabled: bool
provider_name: Optional[str] = None
sp_entity_id: Optional[str] = None
sp_acs_url: Optional[str] = None
sp_slo_url: Optional[str] = None
sp_certificate: Optional[str] = None
# sp_private_key is intentionally OMITTED from responses
idp_entity_id: Optional[str] = None
idp_sso_url: Optional[str] = None
idp_slo_url: Optional[str] = None
idp_certificate: Optional[str] = None
attr_email: Optional[str] = None
attr_username: Optional[str] = None
attr_role: Optional[str] = None
default_role: Optional[str] = None
auto_provision: bool = True
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class SsoLoginInitResponse(BaseModel):
redirect_url: str = Field(..., description="URL to redirect the browser to for IdP login")
request_id: str = Field(..., description="SAML AuthnRequest ID for validation")
class SsoStatusResponse(BaseModel):
enabled: bool
provider_name: Optional[str] = None
configured: bool = Field(..., description="True if IdP settings are present")
login_url: Optional[str] = None # /sso/login URL

View File

@@ -0,0 +1,151 @@
"""Phase 14: API Key service — create, list, revoke, authenticate."""
from __future__ import annotations
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from sqlalchemy.orm import Session
from app.domain.errors import EntityNotFoundError, DuplicateEntityError
from app.models.api_key import ApiKey, generate_raw_key, hash_key, key_prefix_display
from app.models.user import User
# ── Create ────────────────────────────────────────────────────────────────────
def create_api_key(
db: Session,
user_id: UUID,
name: str,
scopes: List[str],
description: Optional[str] = None,
expires_at: Optional[datetime] = None,
) -> tuple[ApiKey, str]:
"""
Create a new API key.
Returns ``(ApiKey, raw_key)`` — the raw_key must be shown to the user
immediately and is never retrievable again.
"""
raw_key = generate_raw_key()
key_hash = hash_key(raw_key)
prefix = key_prefix_display(raw_key)
# Detect accidental collision (astronomically unlikely)
if db.query(ApiKey).filter(ApiKey.key_hash == key_hash).first():
raise DuplicateEntityError("ApiKey", "key_hash", "<collision>")
key = ApiKey(
name = name,
description = description,
key_prefix = prefix,
key_hash = key_hash,
user_id = user_id,
scopes = scopes,
expires_at = expires_at,
)
db.add(key)
db.commit()
db.refresh(key)
return key, raw_key
# ── Read ──────────────────────────────────────────────────────────────────────
def list_api_keys(
db: Session,
user_id: Optional[UUID] = None,
include_inactive: bool = False,
) -> List[ApiKey]:
q = db.query(ApiKey)
if user_id is not None:
q = q.filter(ApiKey.user_id == user_id)
if not include_inactive:
q = q.filter(ApiKey.is_active == True)
return q.order_by(ApiKey.created_at.desc()).all()
def get_api_key(db: Session, key_id: UUID, user_id: Optional[UUID] = None) -> ApiKey:
q = db.query(ApiKey).filter(ApiKey.id == key_id)
if user_id is not None:
q = q.filter(ApiKey.user_id == user_id)
key = q.first()
if not key:
raise EntityNotFoundError("ApiKey", str(key_id))
return key
# ── Update / Revoke ───────────────────────────────────────────────────────────
def update_api_key(
db: Session,
key_id: UUID,
user_id: Optional[UUID] = None,
*,
name: Optional[str] = None,
description: Optional[str] = None,
scopes: Optional[List[str]] = None,
expires_at: Optional[datetime] = None,
is_active: Optional[bool] = None,
) -> ApiKey:
key = get_api_key(db, key_id, user_id)
if name is not None:
key.name = name
if description is not None:
key.description = description
if scopes is not None:
key.scopes = scopes
if expires_at is not None:
key.expires_at = expires_at
if is_active is not None:
key.is_active = is_active
db.commit()
db.refresh(key)
return key
def revoke_api_key(
db: Session,
key_id: UUID,
user_id: Optional[UUID] = None,
) -> ApiKey:
"""Soft-revoke: set is_active = False."""
return update_api_key(db, key_id, user_id, is_active=False)
def delete_api_key(db: Session, key_id: UUID, user_id: Optional[UUID] = None) -> None:
"""Hard delete — use revoke instead for audit trail."""
key = get_api_key(db, key_id, user_id)
db.delete(key)
db.commit()
# ── Authentication ────────────────────────────────────────────────────────────
def authenticate_raw_key(db: Session, raw_key: str) -> Optional[User]:
"""
Verify a raw API key.
Returns the owning User if the key is valid, active, and not expired.
Updates ``last_used_at`` (throttled to once per request — always updates).
Returns None on any failure.
"""
h = hash_key(raw_key)
key: Optional[ApiKey] = db.query(ApiKey).filter(ApiKey.key_hash == h).first()
if key is None or not key.is_active:
return None
if key.expires_at and key.expires_at < datetime.utcnow():
return None
# Update last_used_at
key.last_used_at = datetime.utcnow()
db.commit()
user: Optional[User] = db.query(User).filter(User.id == key.user_id).first()
if user is None or not user.is_active:
return None
return user

View File

@@ -0,0 +1,281 @@
"""Phase 14: SSO / SAML 2.0 service."""
from __future__ import annotations
import logging
from typing import Optional
from uuid import UUID
from sqlalchemy.orm import Session
from app.domain.errors import EntityNotFoundError
from app.models.sso_config import SsoConfig
from app.models.user import User
log = logging.getLogger(__name__)
# ── python3-saml optional import ──────────────────────────────────────────────
try:
from onelogin.saml2.auth import OneLogin_Saml2_Auth
from onelogin.saml2.settings import OneLogin_Saml2_Settings
from onelogin.saml2.utils import OneLogin_Saml2_Utils
_SAML_AVAILABLE = True
except ImportError: # pragma: no cover
_SAML_AVAILABLE = False
log.warning(
"python3-saml not installed — SAML login/callback endpoints will return 503. "
"Install python3-saml and its system dependencies to enable SSO."
)
# ── Configuration helpers ─────────────────────────────────────────────────────
def get_config(db: Session) -> Optional[SsoConfig]:
"""Return the first (and only) SsoConfig row, or None if none exists."""
return db.query(SsoConfig).first()
def get_or_404(db: Session) -> SsoConfig:
cfg = get_config(db)
if not cfg:
raise EntityNotFoundError("SsoConfig", "singleton")
return cfg
def upsert_config(db: Session, **kwargs) -> SsoConfig:
"""Create or update the singleton SSO config."""
cfg = get_config(db)
if cfg:
for k, v in kwargs.items():
setattr(cfg, k, v)
db.commit()
db.refresh(cfg)
return cfg
cfg = SsoConfig(**kwargs)
db.add(cfg)
db.commit()
db.refresh(cfg)
return cfg
def is_configured(cfg: SsoConfig) -> bool:
"""Return True if the minimum IdP settings are present."""
return bool(cfg.idp_entity_id and cfg.idp_sso_url and cfg.idp_certificate)
# ── python3-saml settings builder ─────────────────────────────────────────────
def _build_saml_settings(cfg: SsoConfig) -> dict:
sp: dict = {
"entityId": cfg.sp_entity_id or "",
"assertionConsumerService": {
"url": cfg.sp_acs_url or "",
"binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST",
},
"NameIDFormat": "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress",
"x509cert": cfg.sp_certificate or "",
"privateKey": cfg.sp_private_key or "",
}
if cfg.sp_slo_url:
sp["singleLogoutService"] = {
"url": cfg.sp_slo_url,
"binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect",
}
idp: dict = {
"entityId": cfg.idp_entity_id or "",
"singleSignOnService": {
"url": cfg.idp_sso_url or "",
"binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect",
},
"x509cert": cfg.idp_certificate or "",
}
if cfg.idp_slo_url:
idp["singleLogoutService"] = {
"url": cfg.idp_slo_url,
"binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect",
}
return {
"strict": True,
"debug": False,
"sp": sp,
"idp": idp,
}
def _build_saml_request(request_data: dict) -> dict:
"""Build the request dict expected by python3-saml."""
return {
"https": "on" if request_data.get("https") else "off",
"http_host": request_data.get("http_host", ""),
"script_name": request_data.get("path", ""),
"server_port": request_data.get("port", "443"),
"get_data": request_data.get("get_data", {}),
"post_data": request_data.get("post_data", {}),
"query_string": request_data.get("query_string", ""),
}
# ── Login initiation ──────────────────────────────────────────────────────────
def initiate_login(db: Session, request_data: dict) -> dict:
"""
Build a SAML AuthnRequest and return the IdP redirect URL.
``request_data`` must contain: https, http_host, path, port.
Returns ``{"redirect_url": ..., "request_id": ...}``.
Raises RuntimeError if SAML library not available or SSO not configured.
"""
if not _SAML_AVAILABLE:
raise RuntimeError("SAML library not available — see server logs")
cfg = get_or_404(db)
if not cfg.is_enabled:
raise RuntimeError("SSO is not enabled")
if not is_configured(cfg):
raise RuntimeError("SSO IdP is not fully configured")
settings_dict = _build_saml_settings(cfg)
req = _build_saml_request(request_data)
auth = OneLogin_Saml2_Auth(req, old_settings=settings_dict)
redirect_url, request_id = auth.login(return_to=None, force_authn=False,
is_passive=False, set_nameid_policy=True,
name_id_value_req=None), auth.get_last_request_id()
return {"redirect_url": redirect_url, "request_id": request_id}
# ── ACS (callback) ────────────────────────────────────────────────────────────
def process_callback(db: Session, request_data: dict) -> User:
"""
Process the SAML Response POSTed by the IdP.
Returns the authenticated User (creating if auto_provision is True).
Raises ValueError on assertion errors.
"""
if not _SAML_AVAILABLE:
raise RuntimeError("SAML library not available — see server logs")
cfg = get_or_404(db)
if not cfg.is_enabled:
raise RuntimeError("SSO is not enabled")
settings_dict = _build_saml_settings(cfg)
req = _build_saml_request(request_data)
auth = OneLogin_Saml2_Auth(req, old_settings=settings_dict)
auth.process_response()
errors = auth.get_errors()
if errors:
raise ValueError(f"SAML assertion errors: {errors}. {auth.get_last_error_reason()}")
if not auth.is_authenticated():
raise ValueError("SAML authentication failed — not authenticated")
# Extract attributes
attrs = auth.get_attributes()
name_id = auth.get_nameid()
email_attr = cfg.attr_email or "email"
username_attr = cfg.attr_username or "username"
role_attr = cfg.attr_role or "role"
email = _first_attr(attrs, email_attr) or name_id or ""
username = _first_attr(attrs, username_attr) or email.split("@")[0] or name_id
role = _first_attr(attrs, role_attr) or cfg.default_role or "viewer"
# Validate role
valid_roles = {"admin", "red_lead", "blue_lead", "red_tech", "blue_tech", "viewer"}
if role not in valid_roles:
role = cfg.default_role or "viewer"
# Look up or provision user
user = db.query(User).filter(User.username == username).first()
if user:
# Refresh role from IdP on every login
user.role = role
db.commit()
return user
if not cfg.auto_provision:
raise ValueError(f"User '{username}' not found and auto-provisioning is disabled")
# Create new user (no password — SSO-only)
import secrets as _secrets
from passlib.context import CryptContext
_pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto")
dummy_hash = _pwd_ctx.hash(_secrets.token_hex(32))
user = User(
username = username,
email = email,
hashed_password = dummy_hash,
role = role,
is_active = True,
must_change_password = False,
)
db.add(user)
db.commit()
db.refresh(user)
return user
def _first_attr(attrs: dict, name: str) -> Optional[str]:
"""Return the first value of a SAML attribute, or None."""
v = attrs.get(name)
if isinstance(v, list) and v:
return str(v[0])
if isinstance(v, str):
return v
return None
# ── SP Metadata ───────────────────────────────────────────────────────────────
def get_sp_metadata(db: Session) -> str:
"""
Generate SP SAML metadata XML.
Uses python3-saml if available; falls back to a minimal hand-built XML
so the endpoint is always functional for configuration purposes.
"""
cfg = get_or_404(db)
settings_dict = _build_saml_settings(cfg)
if _SAML_AVAILABLE:
saml_settings = OneLogin_Saml2_Settings(settings=settings_dict, sp_validation_only=True)
metadata = saml_settings.get_sp_metadata()
errors = saml_settings.validate_metadata(metadata)
if errors:
log.warning("SP metadata validation warnings: %s", errors)
return metadata.decode() if isinstance(metadata, bytes) else metadata
# Fallback: minimal XML without signing (useful for dev/testing)
sp = settings_dict["sp"]
acs = sp.get("assertionConsumerService", {})
return f"""<?xml version="1.0"?>
<md:EntityDescriptor xmlns:md="urn:oasis:names:tc:SAML:2.0:metadata"
entityID="{sp.get('entityId', '')}">
<md:SPSSODescriptor
AuthnRequestsSigned="false"
WantAssertionsSigned="true"
protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
<md:AssertionConsumerService
Binding="{acs.get('binding', '')}"
Location="{acs.get('url', '')}"
index="1"/>
</md:SPSSODescriptor>
</md:EntityDescriptor>"""
def get_status(db: Session) -> dict:
"""Return SSO status summary."""
cfg = get_config(db)
if not cfg:
return {"enabled": False, "provider_name": None, "configured": False, "login_url": None}
return {
"enabled": cfg.is_enabled,
"provider_name": cfg.provider_name,
"configured": is_configured(cfg),
"login_url": "/api/v1/sso/login" if cfg.is_enabled and is_configured(cfg) else None,
}

View File

@@ -22,6 +22,7 @@ atlassian-python-api>=4.0.0
tempo-api-python-client>=0.8.0
weasyprint>=62.0
docxtpl>=0.18.0
python3-saml>=1.15.0
# Testing
pytest

305
scripts/qa_phase14.py Normal file
View File

@@ -0,0 +1,305 @@
"""
QA script for Phase 14 — Enterprise Readiness (API Keys + SSO Config).
Run with: python -X utf8 scripts/qa_phase14.py
"""
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import requests
BASE = "http://localhost:8000/api/v1"
PASS = "\033[92m✓\033[0m"
FAIL = "\033[91m✗\033[0m"
passed = 0
failed = 0
def check(label: str, condition: bool, detail: str = ""):
global passed, failed
if condition:
passed += 1
print(f" {PASS} {label}")
else:
failed += 1
msg = f" {FAIL} {label}"
if detail:
msg += f"{detail}"
print(msg)
def get_token(username="administrator", password="admin123"):
r = requests.post(f"{BASE}/auth/login",
data={"username": username, "password": password})
if r.status_code == 200:
return r.json().get("access_token") or r.json().get("token")
raise RuntimeError(f"Login failed: {r.status_code} {r.text[:200]}")
def auth(token):
return {"Authorization": f"Bearer {token}"}
# ─────────────────────────────────────────────────────────────────────────────
def main():
print("\n====== Phase 14 QA — Enterprise Readiness ======\n")
token = get_token()
h = auth(token)
# ── Block 1: Create API key ───────────────────────────────────────────────
print("── Block 1: Create API key ──")
r = requests.post(f"{BASE}/api-keys", headers=h, json={
"name": "QA Test Key",
"description": "Automated QA key",
"scopes": ["read", "write"],
})
check("POST /api-keys → 201", r.status_code == 201, r.text[:200])
created = r.json() if r.status_code == 201 else {}
check("Created key has id", bool(created.get("id")))
check("Created key has raw_key", bool(created.get("raw_key")))
check("raw_key starts with aegis_", str(created.get("raw_key", "")).startswith("aegis_"))
check("key_prefix present", bool(created.get("key_prefix")))
check("key_prefix matches raw_key start",
str(created.get("raw_key", "")).startswith(str(created.get("key_prefix", "X"))))
check("scopes correct", set(created.get("scopes", [])) == {"read", "write"})
check("is_active = True", created.get("is_active") == True)
check("raw_key NOT in key_prefix", created.get("raw_key") != created.get("key_prefix"))
raw_key = created.get("raw_key", "")
key_id = created.get("id", "")
# Create a read-only key
r2 = requests.post(f"{BASE}/api-keys", headers=h, json={
"name": "QA Read-only Key",
"scopes": ["read"],
})
check("POST second key (read-only) → 201", r2.status_code == 201, r2.text[:100])
raw_key_ro = r2.json().get("raw_key", "") if r2.status_code == 201 else ""
key_id_ro = r2.json().get("id", "") if r2.status_code == 201 else ""
# Invalid scope → 422
r3 = requests.post(f"{BASE}/api-keys", headers=h, json={
"name": "Bad Scope Key",
"scopes": ["superuser"],
})
check("POST with invalid scope → 422", r3.status_code == 422)
print()
# ── Block 2: List & Get API keys ─────────────────────────────────────────
print("── Block 2: List & Get API keys ──")
r = requests.get(f"{BASE}/api-keys", headers=h)
check("GET /api-keys → 200", r.status_code == 200, r.text[:100])
keys = r.json() if r.status_code == 200 else []
check("List not empty", len(keys) > 0)
check("List contains our key", any(k["id"] == key_id for k in keys))
check("raw_key NOT in list response", all("raw_key" not in k for k in keys))
check("key_hash NOT in list response", all("key_hash" not in k for k in keys))
r = requests.get(f"{BASE}/api-keys/{key_id}", headers=h)
check("GET /api-keys/{id} → 200", r.status_code == 200)
k = r.json() if r.status_code == 200 else {}
check("Get returns correct id", k.get("id") == key_id)
check("raw_key NOT in get response", "raw_key" not in k)
# 404 for non-existent key
r = requests.get(
f"{BASE}/api-keys/00000000-0000-0000-0000-000000000001", headers=h
)
check("GET non-existent key → 404", r.status_code == 404)
print()
# ── Block 3: Authenticate with API key ───────────────────────────────────
print("── Block 3: Authenticate with API key ──")
if raw_key:
api_key_header = {"Authorization": f"Bearer {raw_key}"}
r = requests.get(f"{BASE}/techniques", headers=api_key_header)
check("GET /techniques with API key → 200", r.status_code == 200,
r.text[:150])
r = requests.get(f"{BASE}/risk/summary", headers=api_key_header)
check("GET /risk/summary with API key → 200", r.status_code == 200)
r = requests.get(f"{BASE}/dashboard/kpis", headers=api_key_header)
check("GET /dashboard/kpis with API key → 200", r.status_code == 200)
# Verify last_used_at was updated
r = requests.get(f"{BASE}/api-keys/{key_id}", headers=h)
check("last_used_at updated after API key use",
r.status_code == 200 and r.json().get("last_used_at") is not None)
# Invalid / garbage key → 401
r = requests.get(f"{BASE}/techniques",
headers={"Authorization": "Bearer aegis_deadbeefdeadbeef"})
check("Invalid API key → 401", r.status_code == 401)
print()
# ── Block 4: Update API key ───────────────────────────────────────────────
print("── Block 4: Update API key ──")
r = requests.patch(f"{BASE}/api-keys/{key_id}", headers=h, json={
"name": "QA Test Key (updated)",
"scopes": ["read"],
})
check("PATCH /api-keys/{id} → 200", r.status_code == 200, r.text[:150])
upd = r.json() if r.status_code == 200 else {}
check("Name updated", upd.get("name") == "QA Test Key (updated)")
check("Scopes updated to [read]", upd.get("scopes") == ["read"])
print()
# ── Block 5: Revoke API key ───────────────────────────────────────────────
print("── Block 5: Revoke API key ──")
if key_id_ro:
r = requests.post(f"{BASE}/api-keys/{key_id_ro}/revoke", headers=h)
check("POST /api-keys/{id}/revoke → 200", r.status_code == 200,
r.text[:100])
rev = r.json() if r.status_code == 200 else {}
check("is_active = False after revoke", rev.get("is_active") == False)
# Revoked key should no longer authenticate
if raw_key_ro:
r = requests.get(f"{BASE}/techniques",
headers={"Authorization": f"Bearer {raw_key_ro}"})
check("Revoked API key → 401", r.status_code == 401)
print()
# ── Block 6: SSO status (public) ─────────────────────────────────────────
print("── Block 6: SSO status & config ──")
r = requests.get(f"{BASE}/sso/status")
check("GET /sso/status → 200 (no auth required)", r.status_code == 200,
r.text[:150])
status_data = r.json() if r.status_code == 200 else {}
check("SSO status has 'enabled' field", "enabled" in status_data)
check("SSO status has 'configured' field", "configured" in status_data)
# SP metadata (may be 503 if no config yet, but endpoint exists)
r = requests.get(f"{BASE}/sso/metadata")
check("GET /sso/metadata returns XML or 404/503",
r.status_code in (200, 404, 503))
if r.status_code == 200:
check("Metadata is XML", "EntityDescriptor" in r.text
or "xml" in r.headers.get("content-type", ""))
# Get config (none yet → 404 or empty)
r = requests.get(f"{BASE}/sso/config", headers=h)
check("GET /sso/config (admin) → 200 or 404", r.status_code in (200, 404))
# Create/update SSO config
sso_payload = {
"is_enabled": False,
"provider_name": "Test IdP",
"sp_entity_id": "https://aegis.example.com/api/v1/sso/metadata",
"sp_acs_url": "https://aegis.example.com/api/v1/sso/callback",
"idp_entity_id": "https://idp.example.com",
"idp_sso_url": "https://idp.example.com/sso/saml",
"idp_certificate": "MIIC...fakecert",
"attr_email": "email",
"attr_username": "username",
"attr_role": "role",
"default_role": "viewer",
"auto_provision": True,
}
r = requests.put(f"{BASE}/sso/config", headers=h, json=sso_payload)
check("PUT /sso/config (admin) → 200", r.status_code == 200, r.text[:200])
cfg = r.json() if r.status_code == 200 else {}
check("Config has provider_name", cfg.get("provider_name") == "Test IdP")
check("Config has idp_entity_id", bool(cfg.get("idp_entity_id")))
check("sp_private_key NOT in response", "sp_private_key" not in cfg)
# Re-fetch config
r = requests.get(f"{BASE}/sso/config", headers=h)
check("GET /sso/config → 200 after upsert", r.status_code == 200)
# Status should now reflect configured=True
r = requests.get(f"{BASE}/sso/status")
st = r.json() if r.status_code == 200 else {}
check("SSO configured=True after upsert", st.get("configured") == True)
check("SSO enabled=False (we set it off)", st.get("enabled") == False)
print()
# ── Block 7: SSO login endpoint (disabled → 503) ──────────────────────────
print("── Block 7: SSO login (disabled) ──")
r = requests.get(f"{BASE}/sso/login", allow_redirects=False)
# Should be 503 (disabled) or 307/302 (if enabled and library present)
check("GET /sso/login returns 5xx when disabled or no lib",
r.status_code in (503, 302, 307, 400))
print()
# ── Block 8: Auth protection ──────────────────────────────────────────────
print("── Block 8: Auth protection ──")
r = requests.get(f"{BASE}/api-keys")
check("GET /api-keys without auth → 401", r.status_code == 401)
r = requests.post(f"{BASE}/api-keys", json={"name": "x", "scopes": ["read"]})
check("POST /api-keys without auth → 401", r.status_code == 401)
r = requests.get(f"{BASE}/sso/config")
check("GET /sso/config without auth → 401", r.status_code == 401)
r = requests.put(f"{BASE}/sso/config", json={"is_enabled": False})
check("PUT /sso/config without auth → 401", r.status_code == 401)
# Non-admin cannot delete keys
# (would need a non-admin user setup — skip for now)
print()
# ── Block 9: Cleanup ─────────────────────────────────────────────────────
print("── Block 9: Cleanup ──")
# Hard-delete the QA keys (admin)
for kid in [key_id, key_id_ro]:
if kid:
r = requests.delete(f"{BASE}/api-keys/{kid}", headers=h)
check(f"DELETE /api-keys/{kid[:8]}... → 204", r.status_code == 204,
r.text[:100])
print()
# ── Block 10: Regression ─────────────────────────────────────────────────
print("── Block 10: Regression ──")
r = requests.get(f"{BASE}/dashboard/kpis", headers=h)
check("GET /dashboard/kpis still works", r.status_code == 200)
r = requests.get(f"{BASE}/risk/summary", headers=h)
check("GET /risk/summary still works", r.status_code == 200)
r = requests.get(f"{BASE}/knowledge/playbooks", headers=h)
check("GET /knowledge/playbooks still works", r.status_code == 200)
r = requests.get(f"{BASE}/techniques", headers=h)
check("GET /techniques still works", r.status_code == 200)
print()
# ── Summary ───────────────────────────────────────────────────────────────
total = passed + failed
print(f"====== Results: {passed}/{total} passed", end="")
if failed:
print(f"\033[91m{failed} FAILED\033[0m ======\n")
sys.exit(1)
else:
print(" ✓ ALL PASSED ======\n")
if __name__ == "__main__":
main()