Files
Aegis/backend/app/routers/system.py
kitos 0e1b8e2b39
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(settings): Settings page with email, webhooks, notifications, profile [FASE-8]
- SystemConfig model + migration b033 for runtime key-value config
- GET/PATCH /system/email-config + POST /system/email-test (admin only)
- email_service reads SMTP config from DB (overrides .env)
- Webhooks now accessible to red_lead/blue_lead + admin
- GET /users/me already existed; /users/me/preferences already working
- SettingsPage with 4 role-aware tabs:
  * Profile & Jira: jira_account_id, user info
  * Notifications: role-specific email/in-app toggles (12 prefs)
  * Webhooks: full CRUD + test ping (leads + admin)
  * Email/SMTP: enable toggle, server config, test email (admin only)
- Added /settings route (all authenticated users)
- Settings link added to Sidebar

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 15:10:31 +02:00

288 lines
8.3 KiB
Python

"""System-level endpoints (admin only).
Provides manual triggers for background operations such as the MITRE
ATT&CK synchronisation, intel scanning, Atomic Red Team import, and
scheduler health introspection.
Also exposes email configuration CRUD (admin only) that writes to the
system_configs table so settings survive container restarts.
"""
import logging
from typing import Optional
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, status
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.database import SessionLocal, get_db
from app.dependencies.auth import require_role
from app.models.user import User
from app.services.mitre_sync_service import sync_mitre
from app.services.intel_service import scan_intel
from app.services.atomic_import_service import import_atomic_red_team
from app.jobs.mitre_sync_job import scheduler
from app.limiter import limiter
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/system", tags=["system"])
# ---------------------------------------------------------------------------
# Pydantic schemas for email config
# ---------------------------------------------------------------------------
class EmailConfigOut(BaseModel):
enabled: bool
host: str
port: int
username: str
from_email: str
use_tls: bool
# password is never returned
class EmailConfigUpdate(BaseModel):
enabled: Optional[bool] = None
host: Optional[str] = None
port: Optional[int] = None
username: Optional[str] = None
password: Optional[str] = None
from_email: Optional[str] = None
use_tls: Optional[bool] = None
class EmailTestRequest(BaseModel):
to: str
# ---------------------------------------------------------------------------
# Helpers for system_configs CRUD
# ---------------------------------------------------------------------------
_SMTP_KEYS = {
"enabled": "smtp.enabled",
"host": "smtp.host",
"port": "smtp.port",
"username": "smtp.username",
"password": "smtp.password",
"from_email": "smtp.from_email",
"use_tls": "smtp.use_tls",
}
def _upsert_config(db: Session, key: str, value: str) -> None:
from app.models.system_config import SystemConfig # lazy import avoids circular
row = db.query(SystemConfig).filter(SystemConfig.key == key).first()
if row:
row.value = value
else:
row = SystemConfig(key=key, value=value)
db.add(row)
def _read_email_config_from_db(db: Session) -> dict:
"""Return a dict with resolved email settings (DB overrides env)."""
from app.services.email_service import _get_smtp_config
return _get_smtp_config(db)
def _bg_mitre_sync() -> None:
"""Run MITRE sync in a background task with its own DB session."""
logger.info("Background MITRE sync task starting...")
db = SessionLocal()
try:
summary = sync_mitre(db)
logger.info("Background MITRE sync task finished — %s", summary)
except Exception:
logger.exception("Background MITRE sync task failed")
finally:
db.close()
@router.post("/sync-mitre")
@limiter.limit("2/hour")
def trigger_mitre_sync(
request: Request,
background_tasks: BackgroundTasks,
current_user: User = Depends(require_role("admin")),
):
"""Manually trigger a MITRE ATT&CK synchronisation in the background.
**Requires** the ``admin`` role.
Returns immediately — the sync runs asynchronously. Poll
``/system/scheduler-status`` for progress, or check server logs.
"""
background_tasks.add_task(_bg_mitre_sync)
return {
"message": "MITRE sync started in background",
"status": "started",
"new": 0,
"updated": 0,
}
@router.post("/run-intel-scan")
def trigger_intel_scan(
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Manually trigger a threat-intelligence scan.
**Requires** the ``admin`` role.
Returns a JSON object with the scan summary including the count of
new intel items found.
"""
summary = scan_intel(db)
return {
"message": "Intel scan completed",
"new_items": summary["new_items"],
}
@router.post("/import-atomic-tests")
@limiter.limit("2/hour")
def trigger_atomic_import(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Trigger an import of Atomic Red Team tests as TestTemplates.
**Requires** the ``admin`` role.
Downloads the Atomic Red Team repository ZIP from GitHub, parses the
YAML files, and creates/updates TestTemplate records. Running this
endpoint multiple times is idempotent — duplicates are skipped.
Returns a JSON object with import statistics.
"""
try:
summary = import_atomic_red_team(db)
except Exception as exc:
logger.error("Atomic Red Team import failed: %s", exc, exc_info=True)
return {
"message": "Import failed. Check server logs for details.",
}
return {
"message": "Import completed",
"imported": summary["created"],
"skipped": summary["skipped_existing"],
"total_parsed": summary["total_tests_parsed"],
}
@router.get("/scheduler-status")
def scheduler_status(
current_user: User = Depends(require_role("admin")),
):
"""Return the current state of the background scheduler.
**Requires** the ``admin`` role.
"""
jobs = scheduler.get_jobs()
return {
"running": scheduler.running,
"jobs": [
{
"id": job.id,
"name": job.name,
"next_run_time": str(job.next_run_time) if job.next_run_time else None,
}
for job in jobs
],
}
# ---------------------------------------------------------------------------
# GET /system/email-config
# ---------------------------------------------------------------------------
@router.get("/email-config", response_model=EmailConfigOut)
def get_email_config(
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Return current SMTP email configuration (merged DB + env).
**Requires** the ``admin`` role. Password is never returned.
"""
cfg = _read_email_config_from_db(db)
return EmailConfigOut(
enabled=cfg["enabled"],
host=cfg["host"],
port=cfg["port"],
username=cfg["username"],
from_email=cfg["from_email"],
use_tls=cfg["use_tls"],
)
# ---------------------------------------------------------------------------
# PATCH /system/email-config
# ---------------------------------------------------------------------------
@router.patch("/email-config", response_model=EmailConfigOut)
def update_email_config(
payload: EmailConfigUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Update SMTP email configuration and persist to DB.
**Requires** the ``admin`` role.
Only provided fields are updated (partial update).
"""
update_data = payload.model_dump(exclude_unset=True)
for field, val in update_data.items():
db_key = _SMTP_KEYS.get(field)
if db_key:
_upsert_config(db, db_key, str(val))
db.commit()
cfg = _read_email_config_from_db(db)
return EmailConfigOut(
enabled=cfg["enabled"],
host=cfg["host"],
port=cfg["port"],
username=cfg["username"],
from_email=cfg["from_email"],
use_tls=cfg["use_tls"],
)
# ---------------------------------------------------------------------------
# POST /system/email-test
# ---------------------------------------------------------------------------
@router.post("/email-test")
def send_test_email(
payload: EmailTestRequest,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Send a test email to verify SMTP configuration.
**Requires** the ``admin`` role.
Returns 200 on success, 502 if sending fails.
"""
from app.services.email_service import send_test_email as _send_test
ok = _send_test(payload.to, db=db)
if not ok:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="Failed to send test email. Check SMTP configuration and server logs.",
)
return {"detail": f"Test email sent to {payload.to}"}