Files
Aegis/backend/app/main.py
Kitos 64d64080e0 fix: resolve 20 security vulnerabilities from comprehensive audit
Critical (1-3):
- Replace hardcoded admin credentials with secure auto-generation (seed.py)
- Enforce SECRET_KEY configuration, fail in production if missing (config.py)
- Add Zip Slip and Zip Bomb protection to all ZIP import services

High/Medium (4-9):
- Add 50MB file size limit and extension whitelist to evidence uploads
- Configure CORS origins via environment variable instead of hardcoded
- Migrate JWT storage from localStorage to HttpOnly cookies (frontend+backend)
- Add rate limiting (5/min) on login endpoint via slowapi
- Replace generic dict payloads with Pydantic schemas (mass assignment)

Medium (10-17):
- Check is_active on login to prevent disabled users from authenticating
- Sanitize exception messages in API responses (system, data_sources)
- Escape LIKE wildcards in all ilike search filters across 8 routers
- Run Docker container as non-root user (appuser)
- Make MINIO_SECURE configurable via environment variable
- Add password complexity policy (12+ chars, upper/lower/digit/special)
- Implement JWT token revocation via in-memory blacklist + reduce TTL to 15min
- Replace xml.etree with defusedxml to prevent Billion Laughs attacks

Low (18-20):
- Add security headers to Nginx (CSP, X-Frame-Options, HSTS-ready, etc.)
- Disable Swagger UI/ReDoc/OpenAPI in production
- Restrict /health endpoint to internal networks via Nginx ACL

Also: rewrite install.sh as an interactive wizard for guided deployment, and
fix the test-from-template validation error (technique_id UUID vs MITRE ID)
2026-02-11 08:56:26 +01:00

160 lines
6.8 KiB
Python

import logging
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
from sqlalchemy.exc import SQLAlchemyError
from app.routers import auth as auth_router
from app.routers import techniques as techniques_router
from app.routers import tests as tests_router
from app.routers import evidence as evidence_router
from app.routers import test_templates as test_templates_router
from app.routers import system as system_router
from app.routers import metrics as metrics_router
from app.routers import users as users_router
from app.routers import audit as audit_router
from app.routers import notifications as notifications_router
from app.routers import reports as reports_router
from app.routers import data_sources as data_sources_router
from app.routers import threat_actors as threat_actors_router
from app.routers import d3fend as d3fend_router
from app.routers import detection_rules as detection_rules_router
from app.routers import campaigns as campaigns_router
from app.routers import heatmap as heatmap_router
from app.routers import scores as scores_router
from app.routers import operational_metrics as operational_metrics_router
from app.routers import compliance as compliance_router
from app.routers import snapshots as snapshots_router
from app.storage import ensure_bucket_exists
from app.jobs.mitre_sync_job import start_scheduler, scheduler
# ── Environment detection ─────────────────────────────────────────────────
# Production mode is on only when AEGIS_ENV equals "production"
# (case-insensitive); an unset or different value means non-production.
_IS_PRODUCTION = os.environ.get("AEGIS_ENV", "").lower() == "production"
# ── Logging ───────────────────────────────────────────────────────────────
# Root logger configuration, applied when this module is imported.
# The logger name and the message are separated by ": " so the two fields
# do not run together in the emitted record.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(name)s: %(message)s",
)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run application startup and shutdown logic.

    Startup calls ``ensure_bucket_exists()`` and ``start_scheduler()`` before
    handing control to the application; shutdown stops the background
    scheduler.

    Args:
        app: The FastAPI application this lifespan is attached to (not used
            inside the body).
    """
    ensure_bucket_exists()
    start_scheduler()
    # The ``try`` starts only after the scheduler has been started, so a
    # failed startup never triggers a shutdown of a scheduler that is not
    # running.
    try:
        yield
    finally:
        # Graceful shutdown of the background scheduler.  Placed in
        # ``finally`` so it also runs when the context is left through an
        # exception or cancellation raised at the ``yield``.
        scheduler.shutdown(wait=False)
# ── In production, disable Swagger UI and ReDoc to hide API surface ──────
# Documentation endpoints and the paths they are served from outside
# production.  In production every entry is replaced by None.
_DOC_ROUTES = {
    "docs_url": "/docs",
    "redoc_url": "/redoc",
    "openapi_url": "/openapi.json",
}
if _IS_PRODUCTION:
    _DOC_ROUTES = dict.fromkeys(_DOC_ROUTES)
app = FastAPI(
    title="Attack Coverage Platform",
    lifespan=lifespan,
    **_DOC_ROUTES,
)
# ── Rate Limiter ──────────────────────────────────────────────────────────
# A single slowapi limiter, keyed by ``get_remote_address`` and published on
# ``app.state`` as well as under the module-level name ``limiter``.
# NOTE(review): if the app is served behind a reverse proxy, confirm the key
# resolves to the real client address rather than the proxy's address.
limiter = app.state.limiter = Limiter(key_func=get_remote_address)
# Delegate RateLimitExceeded to the handler shipped with slowapi.
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# ── CORS ──────────────────────────────────────────────────────────────────
from app.config import settings as _settings

# ``CORS_ORIGINS`` is read as a comma-separated string: each entry is
# trimmed of surrounding whitespace and empty entries are discarded.
_cors_origins: list[str] = [
    origin
    for origin in map(str.strip, _settings.CORS_ORIGINS.split(","))
    if origin
]
# Only the configured origins, the listed methods and the listed headers
# are allowed; credentials are enabled.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=_cors_origins,
    allow_headers=["Authorization", "Content-Type"],
    allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"],
)
# ── Routers ──────────────────────────────────────────────────────────────
# Every router module is mounted under the same versioned prefix.  The
# tuple order is the registration order.
_API_PREFIX = "/api/v1"
for _router_module in (
    auth_router,
    techniques_router,
    tests_router,
    evidence_router,
    test_templates_router,
    system_router,
    metrics_router,
    users_router,
    audit_router,
    notifications_router,
    reports_router,
    data_sources_router,
    threat_actors_router,
    d3fend_router,
    detection_rules_router,
    campaigns_router,
    heatmap_router,
    scores_router,
    operational_metrics_router,
    compliance_router,
    snapshots_router,
):
    app.include_router(_router_module.router, prefix=_API_PREFIX)
@app.get("/health", include_in_schema=False)
def health():
    """Liveness probe that answers with a fixed body and no service metadata.

    The route is kept out of the generated schema.  Restricting access to
    internal networks is handled at the Nginx level
    (see ``frontend/nginx.conf``), not in this function.
    """
    payload = {"status": "ok"}
    return payload
# ── Exception Handlers ────────────────────────────────────────────────────
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Handle request validation errors with a consistent JSON format.

    Args:
        request: The incoming request (not used inside the body).
        exc: The validation error raised for the request.

    Returns:
        A ``JSONResponse`` with status 400 carrying ``detail``, ``code`` and
        the JSON-encoded list of individual validation ``errors``.
    """
    # Imported locally so the handler does not depend on a file-level import.
    from fastapi.encoders import jsonable_encoder

    return JSONResponse(
        status_code=status.HTTP_400_BAD_REQUEST,
        content={
            "detail": "Validation error",
            "code": "VALIDATION_ERROR",
            # The error entries may contain values that are not
            # JSON-serializable (for example an exception instance under
            # ``ctx``); encoding them first keeps response rendering from
            # failing and turning a client error into a 500.
            "errors": jsonable_encoder(exc.errors()),
        },
    )
@app.exception_handler(SQLAlchemyError)
async def sqlalchemy_exception_handler(request: Request, exc: SQLAlchemyError):
    """Handle database errors without exposing their details to the client.

    Args:
        request: The incoming request (not used inside the body).
        exc: The SQLAlchemy error that reached the application boundary.

    Returns:
        A ``JSONResponse`` with status 500 and a generic ``detail`` / ``code``
        pair; the exception text is only written to the log.
    """
    # ``exc_info`` attaches the traceback to the record so the failure can be
    # diagnosed from the logs; the rendered message text is unchanged.
    logging.error("Database error: %s", exc, exc_info=exc)
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={
            "detail": "Database error occurred",
            "code": "DATABASE_ERROR",
        },
    )
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    """Handle all otherwise unhandled exceptions.

    Args:
        request: The incoming request (not used inside the body).
        exc: The exception that no more specific handler dealt with.

    Returns:
        A ``JSONResponse`` with status 500 and a generic ``detail`` / ``code``
        pair; the exception text is only written to the log.
    """
    # This is the last-resort handler, so the traceback is logged via
    # ``exc_info``; without it the origin of the failure would be lost.
    logging.error("Unhandled exception: %s", exc, exc_info=exc)
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={
            "detail": "An internal server error occurred",
            "code": "INTERNAL_ERROR",
        },
    )