feat(admin): export/import configuration bundle for migration
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled

Backend: GET/POST /api/v1/admin/export-config and /import-config
  Export includes (sensitive values redacted):
  - system_configs (email/jira settings)
  - webhook_configs (secrets redacted)
  - sso_configs (private key redacted)
  - scoring_config (weights)
  - test_templates (source=custom only)
  - users (no passwords/tokens, must_change_password=True on import)
  Import is idempotent — upsert by natural keys, safe to run multiple times.

Frontend: ExportImportSection in SystemPage (admin only)
  - 'Export Configuration' → downloads aegis-config-YYYY-MM-DD.json
  - 'Import Configuration' → file picker, sends JSON, shows summary
  - Visual checklist of what is/isn't included in the export

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
kitos
2026-06-02 15:49:51 +02:00
parent 922fb251da
commit eee0560aeb
3 changed files with 487 additions and 1 deletions

View File

@@ -40,6 +40,7 @@ from app.routers import osint as osint_router
from app.routers import webhooks as webhooks_router
from app.routers import detection_lifecycle as detection_lifecycle_router
from app.routers import intel as intel_router
from app.routers import admin_config as admin_config_router
from app.routers import ownership as ownership_router
from app.routers import attack_paths as attack_paths_router
from app.routers import knowledge as knowledge_router
@@ -165,6 +166,7 @@ app.include_router(scores_router.router, prefix="/api/v1")
app.include_router(operational_metrics_router.router, prefix="/api/v1")
app.include_router(compliance_router.router, prefix="/api/v1")
app.include_router(intel_router.router, prefix="/api/v1")
app.include_router(admin_config_router.router, prefix="/api/v1")
app.include_router(snapshots_router.router, prefix="/api/v1")
app.include_router(jira_router.router, prefix="/api/v1")
app.include_router(worklogs_router.router, prefix="/api/v1")

View File

@@ -0,0 +1,340 @@
"""Admin configuration export/import — single-file migration bundle.
GET /admin/export-config — download JSON bundle (admin only)
POST /admin/import-config — upload JSON bundle and restore (admin only)
What is exported (and what is NOT):
✓ system_configs — email / jira settings (passwords REDACTED)
✓ webhook_configs — notification webhooks (secrets REDACTED)
✓ sso_configs — SAML/SSO config (private keys REDACTED)
✓ scoring_config — technique scoring weights
✓ test_templates — CUSTOM templates only (source='custom')
✓ users — username / email / role (no passwords / tokens)
✗ atomic/sigma/elastic templates, techniques, tests, campaigns, reports
"""
import uuid
from datetime import datetime
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from app.auth import hash_password
from app.database import get_db
from app.dependencies.auth import get_current_user, require_role
from app.models.scoring_config import ScoringConfig
from app.models.sso_config import SsoConfig
from app.models.system_config import SystemConfig
from app.models.test_template import TestTemplate
from app.models.user import User
from app.models.webhook_config import WebhookConfig
router = APIRouter(prefix="/admin", tags=["admin"])
# Keys whose values contain secrets and must be redacted in the export
_REDACTED_KEYS = {
"smtp.password",
"jira.api_token",
"jira.password",
"tempo.api_token",
}
_EXPORT_VERSION = "1.0"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _redact(key: str, value: Any) -> Any:
if key in _REDACTED_KEYS:
return "[REDACTED]"
return value
# ---------------------------------------------------------------------------
# GET /admin/export-config
# ---------------------------------------------------------------------------
@router.get("/export-config")
def export_config(
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Export all platform configuration as a downloadable JSON bundle."""
# ── 1. system_configs ────────────────────────────────────────────
system_configs = [
{
"key": r.key,
"value": _redact(r.key, r.value),
"description": r.description,
}
for r in db.query(SystemConfig).order_by(SystemConfig.key).all()
]
# ── 2. webhook_configs ───────────────────────────────────────────
webhooks = [
{
"name": w.name,
"url": w.url,
"secret": "[REDACTED]" if w.secret else None,
"events": w.events or [],
"is_active": w.is_active,
}
for w in db.query(WebhookConfig).order_by(WebhookConfig.name).all()
]
# ── 3. SSO config (single row) ───────────────────────────────────
sso_row = db.query(SsoConfig).first()
sso = None
if sso_row:
sso = {
"is_enabled": sso_row.is_enabled,
"provider_name": sso_row.provider_name,
"sp_entity_id": sso_row.sp_entity_id,
"sp_acs_url": sso_row.sp_acs_url,
"sp_slo_url": sso_row.sp_slo_url,
"sp_certificate": sso_row.sp_certificate,
"sp_private_key": "[REDACTED]", # never export private keys
"idp_entity_id": sso_row.idp_entity_id,
"idp_sso_url": getattr(sso_row, "idp_sso_url", None),
"idp_slo_url": getattr(sso_row, "idp_slo_url", None),
"idp_certificate": getattr(sso_row, "idp_certificate", None),
"attr_email": getattr(sso_row, "attr_email", None),
"attr_username": getattr(sso_row, "attr_username", None),
"attr_role": getattr(sso_row, "attr_role", None),
"default_role": getattr(sso_row, "default_role", None),
"auto_provision": getattr(sso_row, "auto_provision", False),
}
# ── 4. Scoring config (single row) ──────────────────────────────
sc = db.query(ScoringConfig).first()
scoring = None
if sc:
scoring = {
"weight_tests": sc.weight_tests,
"weight_detection_rules": sc.weight_detection_rules,
"weight_d3fend": sc.weight_d3fend,
"weight_recency": sc.weight_recency,
"weight_severity": sc.weight_severity,
}
# ── 5. Custom test templates only ───────────────────────────────
templates = [
{
"mitre_technique_id": t.mitre_technique_id,
"name": t.name,
"description": t.description,
"source": t.source,
"source_url": t.source_url,
"attack_procedure": t.attack_procedure,
"expected_detection": t.expected_detection,
"platform": t.platform,
"tool_suggested": t.tool_suggested,
"severity": t.severity,
"suggested_remediation": t.suggested_remediation,
"is_active": t.is_active,
}
for t in db.query(TestTemplate).filter(TestTemplate.source == "custom").all()
]
# ── 6. Users (sanitized — no passwords/tokens) ───────────────────
users = [
{
"username": u.username,
"email": u.email if hasattr(u, "email") else None,
"role": u.role,
"is_active": u.is_active,
"must_change_password": True, # force password reset on new instance
}
for u in db.query(User).order_by(User.username).all()
]
bundle = {
"_meta": {
"version": _EXPORT_VERSION,
"exported_at": datetime.utcnow().isoformat() + "Z",
"exported_by": current_user.username,
"note": (
"Sensitive values (passwords, API tokens, private keys) are REDACTED. "
"Re-enter them manually after import. "
"User passwords are NOT exported — users must reset passwords on first login."
),
},
"system_configs": system_configs,
"webhooks": webhooks,
"sso": sso,
"scoring": scoring,
"custom_templates": templates,
"users": users,
}
filename = f"aegis-config-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}.json"
return JSONResponse(
content=bundle,
headers={
"Content-Disposition": f'attachment; filename="{filename}"',
"X-Export-Version": _EXPORT_VERSION,
},
)
# ---------------------------------------------------------------------------
# POST /admin/import-config
# ---------------------------------------------------------------------------
@router.post("/import-config")
async def import_config(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Restore platform configuration from a previously exported JSON bundle.
Idempotent: safe to run multiple times. Existing records are updated,
missing ones are created. REDACTED values are skipped (left as-is).
User passwords are set to a random temp value with must_change_password=True.
"""
try:
bundle = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON body")
meta = bundle.get("_meta", {})
version = meta.get("version", "unknown")
summary: dict[str, int] = {
"system_configs": 0,
"webhooks": 0,
"custom_templates": 0,
"users_created": 0,
"users_updated": 0,
}
# ── 1. system_configs ────────────────────────────────────────────
for item in bundle.get("system_configs", []):
key = item.get("key")
value = item.get("value")
if not key or value == "[REDACTED]":
continue
row = db.query(SystemConfig).filter(SystemConfig.key == key).first()
if row:
row.value = value
row.description = item.get("description") or row.description
else:
db.add(SystemConfig(key=key, value=value, description=item.get("description")))
summary["system_configs"] += 1
# ── 2. webhooks ──────────────────────────────────────────────────
for item in bundle.get("webhooks", []):
name = item.get("name")
url = item.get("url")
if not name or not url:
continue
existing = db.query(WebhookConfig).filter(WebhookConfig.name == name).first()
if existing:
existing.url = url
existing.events = item.get("events", [])
existing.is_active = item.get("is_active", True)
existing.failure_count = 0
else:
db.add(WebhookConfig(
name=name,
url=url,
secret=None, # never restore secrets
events=item.get("events", []),
is_active=item.get("is_active", True),
created_by=current_user.id,
failure_count=0,
))
summary["webhooks"] += 1
# ── 3. SSO config ────────────────────────────────────────────────
sso_data = bundle.get("sso")
if sso_data:
sso_row = db.query(SsoConfig).first()
if sso_row:
for field, val in sso_data.items():
if val == "[REDACTED]":
continue
if hasattr(sso_row, field):
setattr(sso_row, field, val)
else:
clean = {k: v for k, v in sso_data.items() if v != "[REDACTED]"}
clean.pop("sp_private_key", None)
db.add(SsoConfig(**clean))
# ── 4. Scoring config ────────────────────────────────────────────
scoring_data = bundle.get("scoring")
if scoring_data:
sc = db.query(ScoringConfig).first()
if sc:
for field, val in scoring_data.items():
if hasattr(sc, field) and val is not None:
setattr(sc, field, val)
else:
db.add(ScoringConfig(**scoring_data))
# ── 5. Custom templates ──────────────────────────────────────────
for item in bundle.get("custom_templates", []):
name = item.get("name")
mitre_id = item.get("mitre_technique_id")
if not name or not mitre_id:
continue
existing = (
db.query(TestTemplate)
.filter(TestTemplate.name == name, TestTemplate.source == "custom")
.first()
)
if existing:
for field, val in item.items():
if hasattr(existing, field):
setattr(existing, field, val)
else:
db.add(TestTemplate(**{k: v for k, v in item.items()
if k not in ("id", "created_at")}))
summary["custom_templates"] += 1
# ── 6. Users ─────────────────────────────────────────────────────
import secrets as _secrets
for item in bundle.get("users", []):
username = item.get("username")
if not username:
continue
existing = db.query(User).filter(User.username == username).first()
if existing:
existing.role = item.get("role", existing.role)
existing.is_active = item.get("is_active", existing.is_active)
summary["users_updated"] += 1
else:
# Create with random temp password — user must reset on login
temp_pw = _secrets.token_urlsafe(16) + "Aa1!"
new_user = User(
username=username,
hashed_password=hash_password(temp_pw),
role=item.get("role", "viewer"),
is_active=item.get("is_active", True),
must_change_password=True,
)
if item.get("email") and hasattr(User, "email"):
new_user.email = item["email"]
db.add(new_user)
summary["users_created"] += 1
db.commit()
return {
"status": "ok",
"imported_from_version": version,
"summary": summary,
"warnings": [
"REDACTED values were skipped — re-enter passwords/tokens manually.",
"All imported users have must_change_password=True.",
"SSO private key was not restored — re-upload it manually.",
],
}