"""Admin configuration export/import — single-file migration bundle. GET /admin/export-config — download JSON bundle (admin only) POST /admin/import-config — upload JSON bundle and restore (admin only) What is exported (and what is NOT): ✓ system_configs — email / jira settings (passwords REDACTED) ✓ webhook_configs — notification webhooks (secrets REDACTED) ✓ sso_configs — SAML/SSO config (private keys REDACTED) ✓ scoring_config — technique scoring weights ✓ test_templates — CUSTOM templates only (source='custom') ✓ users — username / email / role (no passwords / tokens) ✗ atomic/sigma/elastic templates, techniques, tests, campaigns, reports """ import uuid from datetime import datetime from typing import Any from fastapi import APIRouter, Depends, HTTPException, Request from fastapi.responses import JSONResponse from sqlalchemy.orm import Session from app.auth import hash_password from app.database import get_db from app.dependencies.auth import get_current_user, require_role from app.models.scoring_config import ScoringConfig from app.models.sso_config import SsoConfig from app.models.system_config import SystemConfig from app.models.test_template import TestTemplate from app.models.user import User from app.models.webhook_config import WebhookConfig router = APIRouter(prefix="/admin", tags=["admin"]) # Keys whose values contain secrets and must be redacted in the export _REDACTED_KEYS = { "smtp.password", "jira.api_token", "jira.password", "tempo.api_token", } _EXPORT_VERSION = "1.0" # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _redact(key: str, value: Any) -> Any: if key in _REDACTED_KEYS: return "[REDACTED]" return value # --------------------------------------------------------------------------- # GET /admin/export-config # --------------------------------------------------------------------------- @router.get("/export-config") def export_config( db: Session = Depends(get_db), current_user: User = Depends(require_role("admin")), ): """Export all platform configuration as a downloadable JSON bundle.""" # ── 1. system_configs ──────────────────────────────────────────── system_configs = [ { "key": r.key, "value": _redact(r.key, r.value), "description": r.description, } for r in db.query(SystemConfig).order_by(SystemConfig.key).all() ] # ── 2. webhook_configs ─────────────────────────────────────────── webhooks = [ { "name": w.name, "url": w.url, "secret": "[REDACTED]" if w.secret else None, "events": w.events or [], "is_active": w.is_active, } for w in db.query(WebhookConfig).order_by(WebhookConfig.name).all() ] # ── 3. SSO config (single row) ─────────────────────────────────── sso_row = db.query(SsoConfig).first() sso = None if sso_row: sso = { "is_enabled": sso_row.is_enabled, "provider_name": sso_row.provider_name, "sp_entity_id": sso_row.sp_entity_id, "sp_acs_url": sso_row.sp_acs_url, "sp_slo_url": sso_row.sp_slo_url, "sp_certificate": sso_row.sp_certificate, "sp_private_key": "[REDACTED]", # never export private keys "idp_entity_id": sso_row.idp_entity_id, "idp_sso_url": getattr(sso_row, "idp_sso_url", None), "idp_slo_url": getattr(sso_row, "idp_slo_url", None), "idp_certificate": getattr(sso_row, "idp_certificate", None), "attr_email": getattr(sso_row, "attr_email", None), "attr_username": getattr(sso_row, "attr_username", None), "attr_role": getattr(sso_row, "attr_role", None), "default_role": getattr(sso_row, "default_role", None), "auto_provision": getattr(sso_row, "auto_provision", False), } # ── 4. Scoring config (single row) ────────────────────────────── sc = db.query(ScoringConfig).first() scoring = None if sc: scoring = { "weight_tests": sc.weight_tests, "weight_detection_rules": sc.weight_detection_rules, "weight_d3fend": sc.weight_d3fend, "weight_recency": sc.weight_recency, "weight_severity": sc.weight_severity, } # ── 5. Custom test templates only ─────────────────────────────── templates = [ { "mitre_technique_id": t.mitre_technique_id, "name": t.name, "description": t.description, "source": t.source, "source_url": t.source_url, "attack_procedure": t.attack_procedure, "expected_detection": t.expected_detection, "platform": t.platform, "tool_suggested": t.tool_suggested, "severity": t.severity, "suggested_remediation": t.suggested_remediation, "is_active": t.is_active, } for t in db.query(TestTemplate).filter(TestTemplate.source == "custom").all() ] # ── 6. Users (sanitized — no passwords/tokens) ─────────────────── users = [ { "username": u.username, "email": u.email if hasattr(u, "email") else None, "role": u.role, "is_active": u.is_active, "must_change_password": True, # force password reset on new instance } for u in db.query(User).order_by(User.username).all() ] bundle = { "_meta": { "version": _EXPORT_VERSION, "exported_at": datetime.utcnow().isoformat() + "Z", "exported_by": current_user.username, "note": ( "Sensitive values (passwords, API tokens, private keys) are REDACTED. " "Re-enter them manually after import. " "User passwords are NOT exported — users must reset passwords on first login." ), }, "system_configs": system_configs, "webhooks": webhooks, "sso": sso, "scoring": scoring, "custom_templates": templates, "users": users, } filename = f"aegis-config-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}.json" return JSONResponse( content=bundle, headers={ "Content-Disposition": f'attachment; filename="{filename}"', "X-Export-Version": _EXPORT_VERSION, }, ) # --------------------------------------------------------------------------- # POST /admin/import-config # --------------------------------------------------------------------------- @router.post("/import-config") async def import_config( request: Request, db: Session = Depends(get_db), current_user: User = Depends(require_role("admin")), ): """Restore platform configuration from a previously exported JSON bundle. Idempotent: safe to run multiple times. Existing records are updated, missing ones are created. REDACTED values are skipped (left as-is). User passwords are set to a random temp value with must_change_password=True. """ try: bundle = await request.json() except Exception: raise HTTPException(status_code=400, detail="Invalid JSON body") meta = bundle.get("_meta", {}) version = meta.get("version", "unknown") summary: dict[str, int] = { "system_configs": 0, "webhooks": 0, "custom_templates": 0, "users_created": 0, "users_updated": 0, } # ── 1. system_configs ──────────────────────────────────────────── for item in bundle.get("system_configs", []): key = item.get("key") value = item.get("value") if not key or value == "[REDACTED]": continue row = db.query(SystemConfig).filter(SystemConfig.key == key).first() if row: row.value = value row.description = item.get("description") or row.description else: db.add(SystemConfig(key=key, value=value, description=item.get("description"))) summary["system_configs"] += 1 # ── 2. webhooks ────────────────────────────────────────────────── for item in bundle.get("webhooks", []): name = item.get("name") url = item.get("url") if not name or not url: continue existing = db.query(WebhookConfig).filter(WebhookConfig.name == name).first() if existing: existing.url = url existing.events = item.get("events", []) existing.is_active = item.get("is_active", True) existing.failure_count = 0 else: db.add(WebhookConfig( name=name, url=url, secret=None, # never restore secrets events=item.get("events", []), is_active=item.get("is_active", True), created_by=current_user.id, failure_count=0, )) summary["webhooks"] += 1 # ── 3. SSO config ──────────────────────────────────────────────── sso_data = bundle.get("sso") if sso_data: sso_row = db.query(SsoConfig).first() if sso_row: for field, val in sso_data.items(): if val == "[REDACTED]": continue if hasattr(sso_row, field): setattr(sso_row, field, val) else: clean = {k: v for k, v in sso_data.items() if v != "[REDACTED]"} clean.pop("sp_private_key", None) db.add(SsoConfig(**clean)) # ── 4. Scoring config ──────────────────────────────────────────── scoring_data = bundle.get("scoring") if scoring_data: sc = db.query(ScoringConfig).first() if sc: for field, val in scoring_data.items(): if hasattr(sc, field) and val is not None: setattr(sc, field, val) else: db.add(ScoringConfig(**scoring_data)) # ── 5. Custom templates ────────────────────────────────────────── for item in bundle.get("custom_templates", []): name = item.get("name") mitre_id = item.get("mitre_technique_id") if not name or not mitre_id: continue existing = ( db.query(TestTemplate) .filter(TestTemplate.name == name, TestTemplate.source == "custom") .first() ) if existing: for field, val in item.items(): if hasattr(existing, field): setattr(existing, field, val) else: db.add(TestTemplate(**{k: v for k, v in item.items() if k not in ("id", "created_at")})) summary["custom_templates"] += 1 # ── 6. Users ───────────────────────────────────────────────────── import secrets as _secrets for item in bundle.get("users", []): username = item.get("username") if not username: continue existing = db.query(User).filter(User.username == username).first() if existing: existing.role = item.get("role", existing.role) existing.is_active = item.get("is_active", existing.is_active) summary["users_updated"] += 1 else: # Create with random temp password — user must reset on login temp_pw = _secrets.token_urlsafe(16) + "Aa1!" new_user = User( username=username, hashed_password=hash_password(temp_pw), role=item.get("role", "viewer"), is_active=item.get("is_active", True), must_change_password=True, ) if item.get("email") and hasattr(User, "email"): new_user.email = item["email"] db.add(new_user) summary["users_created"] += 1 db.commit() return { "status": "ok", "imported_from_version": version, "summary": summary, "warnings": [ "REDACTED values were skipped — re-enter passwords/tokens manually.", "All imported users have must_change_password=True.", "SSO private key was not restored — re-upload it manually.", ], }