"""Heatmap endpoints — ATT&CK Navigator-compatible layer generation. Thin router that delegates entirely to :mod:`app.services.heatmap_service`. No business logic lives here — only request validation and response formatting. """ import io import json from typing import Optional from fastapi import APIRouter, Depends, Query from fastapi.responses import StreamingResponse from sqlalchemy.orm import Session from app.database import get_db from app.dependencies.auth import get_current_user from app.models.user import User from app.services import heatmap_service router = APIRouter(prefix="/heatmap", tags=["heatmap"]) @router.get("/coverage") def heatmap_coverage( platforms: Optional[str] = Query(None, description="Comma-separated platforms"), tactics: Optional[str] = Query(None, description="Comma-separated tactics"), min_score: int = Query(0, ge=0, le=100), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Coverage layer — score based on status_global of each technique.""" return heatmap_service.build_coverage_layer( db, platforms=platforms, tactics=tactics, min_score=min_score, ) @router.get("/threat-actor/{actor_id}") def heatmap_threat_actor( actor_id: str, platforms: Optional[str] = Query(None), tactics: Optional[str] = Query(None), min_score: int = Query(0, ge=0, le=100), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Threat actor layer — techniques used by an actor with coverage color.""" return heatmap_service.build_threat_actor_layer( db, actor_id, platforms=platforms, tactics=tactics, min_score=min_score, ) @router.get("/detection-rules") def heatmap_detection_rules( platforms: Optional[str] = Query(None), tactics: Optional[str] = Query(None), min_score: int = Query(0, ge=0, le=100), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Detection rules layer — score based on ratio of rules available vs total.""" return heatmap_service.build_detection_rules_layer( db, platforms=platforms, tactics=tactics, min_score=min_score, ) @router.get("/campaign/{campaign_id}") def heatmap_campaign( campaign_id: str, platforms: Optional[str] = Query(None), tactics: Optional[str] = Query(None), min_score: int = Query(0, ge=0, le=100), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Campaign layer — only techniques in the campaign, colored by test state.""" return heatmap_service.build_campaign_layer( db, campaign_id, platforms=platforms, tactics=tactics, min_score=min_score, ) @router.get("/export-navigator") def export_navigator( layer: str = Query(..., description="Layer type: coverage, threat-actor, detection-rules, campaign"), layer_id: Optional[str] = Query(None, description="Actor ID or Campaign ID (if applicable)"), platforms: Optional[str] = Query(None), tactics: Optional[str] = Query(None), min_score: int = Query(0, ge=0, le=100), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Export a heatmap layer as a downloadable JSON file for ATT&CK Navigator.""" data = heatmap_service.build_navigator_export( db, layer, layer_id=layer_id, platforms=platforms, tactics=tactics, min_score=min_score, ) json_content = json.dumps(data, indent=2, default=str) buffer = io.BytesIO(json_content.encode("utf-8")) return StreamingResponse( buffer, media_type="application/json", headers={"Content-Disposition": f"attachment; filename=aegis_{layer}_layer.json"}, )