Files
Aegis/backend/app/routers/admin_config.py
T
kitos 6d3617938e
Aegis CI / lint-and-test (push) Has been cancelled
Snyk Security Scan / Python vulnerabilities (backend) (push) Has been cancelled
Snyk Security Scan / npm vulnerabilities (frontend) (push) Has been cancelled
Snyk Security Scan / Docker image vulnerabilities (backend) (push) Has been cancelled
fix(security): resolve Snyk/bandit code analysis findings
- config.py: move REPORT_OUTPUT_DIR from /tmp (world-writable) to /app/reports
  to prevent CWE-377 symlink attack vector (B108, only real security issue)
- main.py: log startup seed failures instead of silently swallowing them (B110)
- Add # nosec annotations to intentional try/except patterns that are by design:
  Jira integration errors, email failures, DetachedInstanceError, storage errors,
  and Jira session timeout (all B110/B112 false positives)
- Add # nosec B105 to false positives where bandit misidentifies config key
  names and masking strings as hardcoded passwords
- Add .bandit config to skip B311 in seed_demo.py (random used for fake
  demo data generation, not cryptographic purposes)
2026-06-12 12:59:11 +02:00

340 lines
13 KiB
Python

"""Admin configuration export/import — single-file migration bundle.
GET /admin/export-config — download JSON bundle (admin only)
POST /admin/import-config — upload JSON bundle and restore (admin only)
What is exported (and what is NOT):
✓ system_configs — email / jira settings (passwords REDACTED)
✓ webhook_configs — notification webhooks (secrets REDACTED)
✓ sso_configs — SAML/SSO config (private keys REDACTED)
✓ scoring_config — technique scoring weights
✓ test_templates — CUSTOM templates only (source='custom')
✓ users — username / email / role (no passwords / tokens)
✗ atomic/sigma/elastic templates, techniques, tests, campaigns, reports
"""
from datetime import datetime
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from app.auth import hash_password
from app.database import get_db
from app.dependencies.auth import require_role
from app.models.scoring_config import ScoringConfig
from app.models.sso_config import SsoConfig
from app.models.system_config import SystemConfig
from app.models.test_template import TestTemplate
from app.models.user import User
from app.models.webhook_config import WebhookConfig
# Every route declared in this module is served under the /admin prefix.
router = APIRouter(tags=["admin"], prefix="/admin")

# Format version stamped into the bundle metadata and the X-Export-Version header.
_EXPORT_VERSION = "1.0"

# system_configs keys that hold credentials; the export replaces their values
# with a placeholder instead of the stored secret.
_REDACTED_KEYS = {
    "jira.api_token",
    "jira.password",
    "smtp.password",
    "tempo.api_token",
}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _redact(key: str, value: Any) -> Any:
    """Mask the value of a secret-bearing config key.

    Returns the literal ``"[REDACTED]"`` placeholder when *key* is a member of
    ``_REDACTED_KEYS``; any other key gets its *value* back unchanged.
    """
    return "[REDACTED]" if key in _REDACTED_KEYS else value
# ---------------------------------------------------------------------------
# GET /admin/export-config
# ---------------------------------------------------------------------------
@router.get("/export-config")
def export_config(
    db: Session = Depends(get_db),
    current_user: User = Depends(require_role("admin")),
):
    """Export all platform configuration as a downloadable JSON bundle.

    The bundle contains system configs, webhooks, the SSO row, the scoring
    row, custom test templates and users. Secret material is never written
    out: system-config values whose key is in ``_REDACTED_KEYS``, webhook
    secrets and the SSO private key are replaced by ``"[REDACTED]"``, and the
    user entries carry no password or token fields at all.

    Args:
        db: SQLAlchemy session provided by the ``get_db`` dependency.
        current_user: User resolved by the ``require_role("admin")``
            dependency; the username is recorded in the bundle metadata.

    Returns:
        A ``JSONResponse`` carrying the bundle, with a
        ``Content-Disposition: attachment`` header (timestamped filename) and
        an ``X-Export-Version`` header.
    """
    # Function-scope import: the module header only imports ``datetime``.
    from datetime import timezone

    # ── 1. system_configs ────────────────────────────────────────────
    system_configs = [
        {
            "key": r.key,
            "value": _redact(r.key, r.value),
            "description": r.description,
        }
        for r in db.query(SystemConfig).order_by(SystemConfig.key).all()
    ]

    # ── 2. webhook_configs ───────────────────────────────────────────
    webhooks = [
        {
            "name": w.name,
            "url": w.url,
            # Keep "a secret exists" visible without leaking its value.
            "secret": "[REDACTED]" if w.secret else None,
            "events": w.events or [],
            "is_active": w.is_active,
        }
        for w in db.query(WebhookConfig).order_by(WebhookConfig.name).all()
    ]

    # ── 3. SSO config (single row) ───────────────────────────────────
    sso_row = db.query(SsoConfig).first()
    sso = None
    if sso_row:
        sso = {
            "is_enabled": sso_row.is_enabled,
            "provider_name": sso_row.provider_name,
            "sp_entity_id": sso_row.sp_entity_id,
            "sp_acs_url": sso_row.sp_acs_url,
            "sp_slo_url": sso_row.sp_slo_url,
            "sp_certificate": sso_row.sp_certificate,
            "sp_private_key": "[REDACTED]",  # never export private keys
            "idp_entity_id": sso_row.idp_entity_id,
            # The remaining attributes are read defensively with a fallback
            # value in case the model does not define them.
            "idp_sso_url": getattr(sso_row, "idp_sso_url", None),
            "idp_slo_url": getattr(sso_row, "idp_slo_url", None),
            "idp_certificate": getattr(sso_row, "idp_certificate", None),
            "attr_email": getattr(sso_row, "attr_email", None),
            "attr_username": getattr(sso_row, "attr_username", None),
            "attr_role": getattr(sso_row, "attr_role", None),
            "default_role": getattr(sso_row, "default_role", None),
            "auto_provision": getattr(sso_row, "auto_provision", False),
        }

    # ── 4. Scoring config (single row) ──────────────────────────────
    sc = db.query(ScoringConfig).first()
    scoring = None
    if sc:
        scoring = {
            "weight_tests": sc.weight_tests,
            "weight_detection_rules": sc.weight_detection_rules,
            "weight_d3fend": sc.weight_d3fend,
            "weight_recency": sc.weight_recency,
            "weight_severity": sc.weight_severity,
        }

    # ── 5. Custom test templates only ───────────────────────────────
    templates = [
        {
            "mitre_technique_id": t.mitre_technique_id,
            "name": t.name,
            "description": t.description,
            "source": t.source,
            "source_url": t.source_url,
            "attack_procedure": t.attack_procedure,
            "expected_detection": t.expected_detection,
            "platform": t.platform,
            "tool_suggested": t.tool_suggested,
            "severity": t.severity,
            "suggested_remediation": t.suggested_remediation,
            "is_active": t.is_active,
        }
        for t in db.query(TestTemplate).filter(TestTemplate.source == "custom").all()
    ]

    # ── 6. Users (sanitized — no passwords/tokens) ───────────────────
    users = [
        {
            "username": u.username,
            "email": getattr(u, "email", None),
            "role": u.role,
            "is_active": u.is_active,
            "must_change_password": True,  # force password reset on new instance  # nosec B105
        }
        for u in db.query(User).order_by(User.username).all()
    ]

    # Take a single UTC timestamp so the metadata and the filename always
    # agree. datetime.utcnow() is deprecated; the tzinfo is dropped again so
    # the serialized form stays "<naive ISO timestamp>Z" as before.
    exported_at = datetime.now(timezone.utc).replace(tzinfo=None)

    bundle = {
        "_meta": {
            "version": _EXPORT_VERSION,
            "exported_at": exported_at.isoformat() + "Z",
            "exported_by": current_user.username,
            "note": (
                "Sensitive values (passwords, API tokens, private keys) are REDACTED. "
                "Re-enter them manually after import. "
                "User passwords are NOT exported — users must reset passwords on first login."
            ),
        },
        "system_configs": system_configs,
        "webhooks": webhooks,
        "sso": sso,
        "scoring": scoring,
        "custom_templates": templates,
        "users": users,
    }

    filename = f"aegis-config-{exported_at.strftime('%Y%m%d-%H%M%S')}.json"
    return JSONResponse(
        content=bundle,
        headers={
            "Content-Disposition": f'attachment; filename="{filename}"',
            "X-Export-Version": _EXPORT_VERSION,
        },
    )
# ---------------------------------------------------------------------------
# POST /admin/import-config
# ---------------------------------------------------------------------------
@router.post("/import-config")
async def import_config(
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_role("admin")),
):
    """Restore platform configuration from a previously exported JSON bundle.

    Existing records are updated and missing ones are created, so the same
    bundle can be imported repeatedly. ``"[REDACTED]"`` values are skipped
    (the stored value is left as-is), webhook secrets and the SSO private key
    are never restored, and newly created users receive a random throw-away
    password with ``must_change_password=True``.

    Only the fields that the export writes are applied to SSO, scoring and
    template rows; any other key in the bundle is ignored. Malformed sections
    or entries (wrong JSON type) are skipped rather than failing the import.

    Args:
        request: Incoming request whose body is the JSON bundle.
        db: SQLAlchemy session provided by the ``get_db`` dependency.
        current_user: User resolved by the ``require_role("admin")``
            dependency; recorded as ``created_by`` on new webhooks.

    Returns:
        A dict with ``status``, ``imported_from_version``, per-section
        ``summary`` counters and a list of ``warnings``.

    Raises:
        HTTPException: 400 when the body is not valid JSON or is not a JSON
            object.
    """
    try:
        bundle = await request.json()
    except Exception as exc:
        raise HTTPException(status_code=400, detail="Invalid JSON body") from exc
    # Valid JSON that is not an object (list, string, number, null) would
    # otherwise blow up on the first ``.get`` call.
    if not isinstance(bundle, dict):
        raise HTTPException(status_code=400, detail="Config bundle must be a JSON object")

    meta = bundle.get("_meta")
    version = meta.get("version", "unknown") if isinstance(meta, dict) else "unknown"

    summary: dict[str, int] = {
        "system_configs": 0,
        "webhooks": 0,
        "custom_templates": 0,
        "users_created": 0,
        "users_updated": 0,
    }

    # NOTE(review): the ORM calls below are synchronous although this handler
    # is ``async`` (needed for ``await request.json()``) — confirm that is
    # acceptable for the expected bundle sizes.
    try:
        summary["system_configs"] = _import_system_configs(
            db, _bundle_entries(bundle.get("system_configs"))
        )
        summary["webhooks"] = _import_webhooks(
            db, _bundle_entries(bundle.get("webhooks")), current_user.id
        )
        _import_sso(db, bundle.get("sso"))
        _import_scoring(db, bundle.get("scoring"))
        summary["custom_templates"] = _import_custom_templates(
            db, _bundle_entries(bundle.get("custom_templates"))
        )
        created, updated = _import_users(db, _bundle_entries(bundle.get("users")))
        summary["users_created"] = created
        summary["users_updated"] = updated
        db.commit()
    except Exception:
        # Discard the partially applied import; the error itself still propagates.
        db.rollback()
        raise

    return {
        "status": "ok",
        "imported_from_version": version,
        "summary": summary,
        "warnings": [
            "REDACTED values were skipped — re-enter passwords/tokens manually.",
            "All imported users have must_change_password=True.",
            "SSO private key was not restored — re-upload it manually.",
        ],
    }


def _bundle_entries(section: Any) -> list:
    """Return the dict entries of a bundle list section, dropping anything malformed."""
    if not isinstance(section, list):
        return []
    return [entry for entry in section if isinstance(entry, dict)]


def _allowed_values(model: Any, data: dict, allowed: tuple) -> dict:
    """Keep only the allow-listed keys of *data* that exist as attributes on *model*."""
    return {
        field: data[field]
        for field in allowed
        if field in data and hasattr(model, field)
    }


def _import_system_configs(db: Session, items: list) -> int:
    """Upsert system_configs entries by key; return how many were applied."""
    applied = 0
    for item in items:
        key = item.get("key")
        value = item.get("value")
        # Redacted secrets carry no information: keep whatever is stored.
        if not key or value == "[REDACTED]":
            continue
        row = db.query(SystemConfig).filter(SystemConfig.key == key).first()
        if row:
            row.value = value
            row.description = item.get("description") or row.description
        else:
            db.add(SystemConfig(key=key, value=value, description=item.get("description")))
        applied += 1
    return applied


def _import_webhooks(db: Session, items: list, created_by: Any) -> int:
    """Upsert webhooks by name (secrets are never restored); return how many were applied."""
    applied = 0
    for item in items:
        name = item.get("name")
        url = item.get("url")
        if not name or not url:
            continue
        existing = db.query(WebhookConfig).filter(WebhookConfig.name == name).first()
        if existing:
            # The stored secret is intentionally left untouched.
            existing.url = url
            existing.events = item.get("events", [])
            existing.is_active = item.get("is_active", True)
            existing.failure_count = 0
        else:
            db.add(WebhookConfig(
                name=name,
                url=url,
                secret=None,  # never restore secrets
                events=item.get("events", []),
                is_active=item.get("is_active", True),
                created_by=created_by,
                failure_count=0,
            ))
        applied += 1
    return applied


def _import_sso(db: Session, sso_data: Any) -> None:
    """Update or create the single SSO row from the exported fields, never the private key."""
    if not isinstance(sso_data, dict) or not sso_data:
        return
    # Exactly the fields the export writes, minus ``sp_private_key``: applying
    # arbitrary bundle keys would allow overwriting e.g. the primary key.
    exported_fields = (
        "is_enabled",
        "provider_name",
        "sp_entity_id",
        "sp_acs_url",
        "sp_slo_url",
        "sp_certificate",
        "idp_entity_id",
        "idp_sso_url",
        "idp_slo_url",
        "idp_certificate",
        "attr_email",
        "attr_username",
        "attr_role",
        "default_role",
        "auto_provision",
    )
    values = {
        field: val
        for field, val in _allowed_values(SsoConfig, sso_data, exported_fields).items()
        if val != "[REDACTED]"
    }
    sso_row = db.query(SsoConfig).first()
    if sso_row:
        for field, val in values.items():
            setattr(sso_row, field, val)
    else:
        db.add(SsoConfig(**values))


def _import_scoring(db: Session, scoring_data: Any) -> None:
    """Update or create the single scoring row from the exported weight fields."""
    if not isinstance(scoring_data, dict) or not scoring_data:
        return
    weight_fields = (
        "weight_tests",
        "weight_detection_rules",
        "weight_d3fend",
        "weight_recency",
        "weight_severity",
    )
    # ``None`` means "not provided": keep the stored value (or the model's
    # own default when a new row is created).
    weights = {
        field: val
        for field, val in _allowed_values(ScoringConfig, scoring_data, weight_fields).items()
        if val is not None
    }
    sc = db.query(ScoringConfig).first()
    if sc:
        for field, val in weights.items():
            setattr(sc, field, val)
    else:
        db.add(ScoringConfig(**weights))


def _import_custom_templates(db: Session, items: list) -> int:
    """Upsert custom test templates by name; return how many were applied."""
    # ``source`` is deliberately absent: imported templates are always
    # 'custom', which is also what the lookup below matches on.
    template_fields = (
        "mitre_technique_id",
        "name",
        "description",
        "source_url",
        "attack_procedure",
        "expected_detection",
        "platform",
        "tool_suggested",
        "severity",
        "suggested_remediation",
        "is_active",
    )
    applied = 0
    for item in items:
        name = item.get("name")
        mitre_id = item.get("mitre_technique_id")
        if not name or not mitre_id:
            continue
        values = _allowed_values(TestTemplate, item, template_fields)
        existing = (
            db.query(TestTemplate)
            .filter(TestTemplate.name == name, TestTemplate.source == "custom")
            .first()
        )
        if existing:
            for field, val in values.items():
                setattr(existing, field, val)
        else:
            db.add(TestTemplate(source="custom", **values))
        applied += 1
    return applied


def _import_users(db: Session, items: list) -> tuple:
    """Update role/active flag of known users and create missing ones; return (created, updated)."""
    import secrets

    created = 0
    updated = 0
    for item in items:
        username = item.get("username")
        if not username:
            continue
        existing = db.query(User).filter(User.username == username).first()
        if existing:
            existing.role = item.get("role", existing.role)
            existing.is_active = item.get("is_active", existing.is_active)
            updated += 1
            continue
        # Random temp password that is never disclosed — the account only
        # becomes usable after a password reset.
        temp_pw = secrets.token_urlsafe(16) + "Aa1!"
        new_user = User(
            username=username,
            hashed_password=hash_password(temp_pw),
            role=item.get("role", "viewer"),
            is_active=item.get("is_active", True),
            must_change_password=True,
        )
        if item.get("email") and hasattr(User, "email"):
            new_user.email = item["email"]
        db.add(new_user)
        created += 1
    return created, updated