"""System-level endpoints (admin only). Provides manual triggers for background operations such as the MITRE ATT&CK synchronisation, intel scanning, Atomic Red Team import, and scheduler health introspection. """ import logging from fastapi import APIRouter, BackgroundTasks, Depends, Request from sqlalchemy.orm import Session from app.database import SessionLocal, get_db from app.dependencies.auth import require_role from app.models.user import User from app.services.mitre_sync_service import sync_mitre from app.services.intel_service import scan_intel from app.services.atomic_import_service import import_atomic_red_team from app.jobs.mitre_sync_job import scheduler from app.limiter import limiter logger = logging.getLogger(__name__) router = APIRouter(prefix="/system", tags=["system"]) def _bg_mitre_sync() -> None: """Run MITRE sync in a background task with its own DB session.""" logger.info("Background MITRE sync task starting...") db = SessionLocal() try: summary = sync_mitre(db) logger.info("Background MITRE sync task finished — %s", summary) except Exception: logger.exception("Background MITRE sync task failed") finally: db.close() @router.post("/sync-mitre") @limiter.limit("2/hour") def trigger_mitre_sync( request: Request, background_tasks: BackgroundTasks, current_user: User = Depends(require_role("admin")), ): """Manually trigger a MITRE ATT&CK synchronisation in the background. **Requires** the ``admin`` role. Returns immediately — the sync runs asynchronously. Poll ``/system/scheduler-status`` for progress, or check server logs. """ background_tasks.add_task(_bg_mitre_sync) return { "message": "MITRE sync started in background", "status": "started", "new": 0, "updated": 0, } @router.post("/run-intel-scan") def trigger_intel_scan( db: Session = Depends(get_db), current_user: User = Depends(require_role("admin")), ): """Manually trigger a threat-intelligence scan. **Requires** the ``admin`` role. Returns a JSON object with the scan summary including the count of new intel items found. """ summary = scan_intel(db) return { "message": "Intel scan completed", "new_items": summary["new_items"], } @router.post("/import-atomic-tests") @limiter.limit("2/hour") def trigger_atomic_import( request: Request, db: Session = Depends(get_db), current_user: User = Depends(require_role("admin")), ): """Trigger an import of Atomic Red Team tests as TestTemplates. **Requires** the ``admin`` role. Downloads the Atomic Red Team repository ZIP from GitHub, parses the YAML files, and creates/updates TestTemplate records. Running this endpoint multiple times is idempotent — duplicates are skipped. Returns a JSON object with import statistics. """ try: summary = import_atomic_red_team(db) except Exception as exc: logger.error("Atomic Red Team import failed: %s", exc, exc_info=True) return { "message": "Import failed. Check server logs for details.", } return { "message": "Import completed", "imported": summary["created"], "skipped": summary["skipped_existing"], "total_parsed": summary["total_tests_parsed"], } @router.get("/scheduler-status") def scheduler_status( current_user: User = Depends(require_role("admin")), ): """Return the current state of the background scheduler. **Requires** the ``admin`` role. """ jobs = scheduler.get_jobs() return { "running": scheduler.running, "jobs": [ { "id": job.id, "name": job.name, "next_run_time": str(job.next_run_time) if job.next_run_time else None, } for job in jobs ], }