"""Phase 14: SSO / SAML 2.0 service.""" from __future__ import annotations import logging from typing import Optional from uuid import UUID from sqlalchemy.orm import Session from app.domain.errors import EntityNotFoundError from app.models.sso_config import SsoConfig from app.models.user import User log = logging.getLogger(__name__) # ── python3-saml optional import ────────────────────────────────────────────── try: from onelogin.saml2.auth import OneLogin_Saml2_Auth from onelogin.saml2.settings import OneLogin_Saml2_Settings from onelogin.saml2.utils import OneLogin_Saml2_Utils _SAML_AVAILABLE = True except ImportError: # pragma: no cover _SAML_AVAILABLE = False log.warning( "python3-saml not installed — SAML login/callback endpoints will return 503. " "Install python3-saml and its system dependencies to enable SSO." ) # ── Configuration helpers ───────────────────────────────────────────────────── def get_config(db: Session) -> Optional[SsoConfig]: """Return the first (and only) SsoConfig row, or None if none exists.""" return db.query(SsoConfig).first() def get_or_404(db: Session) -> SsoConfig: cfg = get_config(db) if not cfg: raise EntityNotFoundError("SsoConfig", "singleton") return cfg def upsert_config(db: Session, **kwargs) -> SsoConfig: """Create or update the singleton SSO config.""" cfg = get_config(db) if cfg: for k, v in kwargs.items(): setattr(cfg, k, v) db.commit() db.refresh(cfg) return cfg cfg = SsoConfig(**kwargs) db.add(cfg) db.commit() db.refresh(cfg) return cfg def is_configured(cfg: SsoConfig) -> bool: """Return True if the minimum IdP settings are present.""" return bool(cfg.idp_entity_id and cfg.idp_sso_url and cfg.idp_certificate) # ── python3-saml settings builder ───────────────────────────────────────────── def _build_saml_settings(cfg: SsoConfig) -> dict: sp: dict = { "entityId": cfg.sp_entity_id or "", "assertionConsumerService": { "url": cfg.sp_acs_url or "", "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST", }, "NameIDFormat": "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress", "x509cert": cfg.sp_certificate or "", "privateKey": cfg.sp_private_key or "", } if cfg.sp_slo_url: sp["singleLogoutService"] = { "url": cfg.sp_slo_url, "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect", } idp: dict = { "entityId": cfg.idp_entity_id or "", "singleSignOnService": { "url": cfg.idp_sso_url or "", "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect", }, "x509cert": cfg.idp_certificate or "", } if cfg.idp_slo_url: idp["singleLogoutService"] = { "url": cfg.idp_slo_url, "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect", } return { "strict": True, "debug": False, "sp": sp, "idp": idp, } def _build_saml_request(request_data: dict) -> dict: """Build the request dict expected by python3-saml.""" return { "https": "on" if request_data.get("https") else "off", "http_host": request_data.get("http_host", ""), "script_name": request_data.get("path", ""), "server_port": request_data.get("port", "443"), "get_data": request_data.get("get_data", {}), "post_data": request_data.get("post_data", {}), "query_string": request_data.get("query_string", ""), } # ── Login initiation ────────────────────────────────────────────────────────── def initiate_login(db: Session, request_data: dict) -> dict: """ Build a SAML AuthnRequest and return the IdP redirect URL. ``request_data`` must contain: https, http_host, path, port. Returns ``{"redirect_url": ..., "request_id": ...}``. Raises RuntimeError if SAML library not available or SSO not configured. """ if not _SAML_AVAILABLE: raise RuntimeError("SAML library not available — see server logs") cfg = get_or_404(db) if not cfg.is_enabled: raise RuntimeError("SSO is not enabled") if not is_configured(cfg): raise RuntimeError("SSO IdP is not fully configured") settings_dict = _build_saml_settings(cfg) req = _build_saml_request(request_data) auth = OneLogin_Saml2_Auth(req, old_settings=settings_dict) redirect_url, request_id = auth.login(return_to=None, force_authn=False, is_passive=False, set_nameid_policy=True, name_id_value_req=None), auth.get_last_request_id() return {"redirect_url": redirect_url, "request_id": request_id} # ── ACS (callback) ──────────────────────────────────────────────────────────── def process_callback(db: Session, request_data: dict) -> User: """ Process the SAML Response POSTed by the IdP. Returns the authenticated User (creating if auto_provision is True). Raises ValueError on assertion errors. """ if not _SAML_AVAILABLE: raise RuntimeError("SAML library not available — see server logs") cfg = get_or_404(db) if not cfg.is_enabled: raise RuntimeError("SSO is not enabled") settings_dict = _build_saml_settings(cfg) req = _build_saml_request(request_data) auth = OneLogin_Saml2_Auth(req, old_settings=settings_dict) auth.process_response() errors = auth.get_errors() if errors: raise ValueError(f"SAML assertion errors: {errors}. {auth.get_last_error_reason()}") if not auth.is_authenticated(): raise ValueError("SAML authentication failed — not authenticated") # Extract attributes attrs = auth.get_attributes() name_id = auth.get_nameid() email_attr = cfg.attr_email or "email" username_attr = cfg.attr_username or "username" role_attr = cfg.attr_role or "role" email = _first_attr(attrs, email_attr) or name_id or "" username = _first_attr(attrs, username_attr) or email.split("@")[0] or name_id role = _first_attr(attrs, role_attr) or cfg.default_role or "viewer" # Validate role valid_roles = {"admin", "red_lead", "blue_lead", "red_tech", "blue_tech", "viewer"} if role not in valid_roles: role = cfg.default_role or "viewer" # Look up or provision user user = db.query(User).filter(User.username == username).first() if user: # Refresh role from IdP on every login user.role = role db.commit() return user if not cfg.auto_provision: raise ValueError(f"User '{username}' not found and auto-provisioning is disabled") # Create new user (no password — SSO-only) import secrets as _secrets from passlib.context import CryptContext _pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto") dummy_hash = _pwd_ctx.hash(_secrets.token_hex(32)) user = User( username = username, email = email, hashed_password = dummy_hash, role = role, is_active = True, must_change_password = False, ) db.add(user) db.commit() db.refresh(user) return user def _first_attr(attrs: dict, name: str) -> Optional[str]: """Return the first value of a SAML attribute, or None.""" v = attrs.get(name) if isinstance(v, list) and v: return str(v[0]) if isinstance(v, str): return v return None # ── SP Metadata ─────────────────────────────────────────────────────────────── def get_sp_metadata(db: Session) -> str: """ Generate SP SAML metadata XML. Uses python3-saml if available; falls back to a minimal hand-built XML so the endpoint is always functional for configuration purposes. """ cfg = get_or_404(db) settings_dict = _build_saml_settings(cfg) if _SAML_AVAILABLE: saml_settings = OneLogin_Saml2_Settings(settings=settings_dict, sp_validation_only=True) metadata = saml_settings.get_sp_metadata() errors = saml_settings.validate_metadata(metadata) if errors: log.warning("SP metadata validation warnings: %s", errors) return metadata.decode() if isinstance(metadata, bytes) else metadata # Fallback: minimal XML without signing (useful for dev/testing) sp = settings_dict["sp"] acs = sp.get("assertionConsumerService", {}) return f""" """ def get_status(db: Session) -> dict: """Return SSO status summary.""" cfg = get_config(db) if not cfg: return {"enabled": False, "provider_name": None, "configured": False, "login_url": None} return { "enabled": cfg.is_enabled, "provider_name": cfg.provider_name, "configured": is_configured(cfg), "login_url": "/api/v1/sso/login" if cfg.is_enabled and is_configured(cfg) else None, }