"""Heatmap endpoints — ATT&CK Navigator-compatible layer generation. Thin router that delegates to :mod:`app.services.heatmap_service`. """ import io import json from typing import Optional from fastapi import APIRouter, Depends, HTTPException, Query from fastapi.responses import StreamingResponse from sqlalchemy.orm import Session from app.database import get_db from app.dependencies.auth import get_current_user from app.models.user import User from app.services import heatmap_service router = APIRouter(prefix="/heatmap", tags=["heatmap"]) # ── GET /heatmap/coverage ───────────────────────────────────────────── @router.get("/coverage") def heatmap_coverage( platforms: Optional[str] = Query(None, description="Comma-separated platforms"), tactics: Optional[str] = Query(None, description="Comma-separated tactics"), min_score: int = Query(0, ge=0, le=100), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Coverage layer — score based on status_global of each technique.""" return heatmap_service.build_coverage_layer( db, platforms=platforms, tactics=tactics, min_score=min_score, ) # ── GET /heatmap/threat-actor/{actor_id} ────────────────────────────── @router.get("/threat-actor/{actor_id}") def heatmap_threat_actor( actor_id: str, platforms: Optional[str] = Query(None), tactics: Optional[str] = Query(None), min_score: int = Query(0, ge=0, le=100), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Threat actor layer — techniques used by an actor with coverage color.""" layer = heatmap_service.build_threat_actor_layer( db, actor_id, platforms=platforms, tactics=tactics, min_score=min_score, ) if layer is None: raise HTTPException(status_code=404, detail="Threat actor not found") return layer # ── GET /heatmap/detection-rules ────────────────────────────────────── @router.get("/detection-rules") def heatmap_detection_rules( platforms: Optional[str] = Query(None), tactics: Optional[str] = Query(None), min_score: int = Query(0, ge=0, le=100), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Detection rules layer — score based on ratio of rules available vs total.""" return heatmap_service.build_detection_rules_layer( db, platforms=platforms, tactics=tactics, min_score=min_score, ) # ── GET /heatmap/campaign/{campaign_id} ─────────────────────────────── @router.get("/campaign/{campaign_id}") def heatmap_campaign( campaign_id: str, platforms: Optional[str] = Query(None), tactics: Optional[str] = Query(None), min_score: int = Query(0, ge=0, le=100), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Campaign layer — only techniques in the campaign, colored by test state.""" layer = heatmap_service.build_campaign_layer( db, campaign_id, platforms=platforms, tactics=tactics, min_score=min_score, ) if layer is None: raise HTTPException(status_code=404, detail="Campaign not found") return layer # ── GET /heatmap/export-navigator ───────────────────────────────────── _LAYER_BUILDERS = { "coverage": lambda db, **kw: heatmap_service.build_coverage_layer(db, **kw), "detection-rules": lambda db, **kw: heatmap_service.build_detection_rules_layer(db, **kw), } _LAYER_BUILDERS_WITH_ID = { "threat-actor": lambda db, lid, **kw: heatmap_service.build_threat_actor_layer(db, lid, **kw), "campaign": lambda db, lid, **kw: heatmap_service.build_campaign_layer(db, lid, **kw), } @router.get("/export-navigator") def export_navigator( layer: str = Query(..., description="Layer type: coverage, threat-actor, detection-rules, campaign"), layer_id: Optional[str] = Query(None, description="Actor ID or Campaign ID (if applicable)"), platforms: Optional[str] = Query(None), tactics: Optional[str] = Query(None), min_score: int = Query(0, ge=0, le=100), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Export a heatmap layer as a downloadable JSON file for ATT&CK Navigator.""" kwargs = dict(platforms=platforms, tactics=tactics, min_score=min_score) if layer in _LAYER_BUILDERS: data = _LAYER_BUILDERS[layer](db, **kwargs) elif layer in _LAYER_BUILDERS_WITH_ID: if not layer_id: raise HTTPException(status_code=400, detail=f"layer_id required for {layer} layer") data = _LAYER_BUILDERS_WITH_ID[layer](db, layer_id, **kwargs) if data is None: raise HTTPException(status_code=404, detail=f"{layer} not found") else: raise HTTPException(status_code=400, detail=f"Unknown layer type: {layer}") json_content = json.dumps(data, indent=2, default=str) buffer = io.BytesIO(json_content.encode("utf-8")) return StreamingResponse( buffer, media_type="application/json", headers={"Content-Disposition": f"attachment; filename=aegis_{layer}_layer.json"}, )