"""Compliance endpoints — framework status, reports, and gap analysis. Thin HTTP adapter: delegates all data logic to compliance_service. Provides compliance posture assessment by mapping MITRE ATT&CK technique coverage to compliance framework controls. """ from fastapi import APIRouter, Depends from fastapi.responses import StreamingResponse from sqlalchemy.orm import Session from app.database import get_db from app.dependencies.auth import get_current_user, require_role from app.models.user import User from app.services.compliance_service import ( list_frameworks, get_framework_status, build_framework_report_csv, get_framework_gaps, ) from app.services.compliance_import_service import ( import_nist_800_53_mappings, import_cis_controls_v8_mappings, import_dora_mappings, ) router = APIRouter(prefix="/compliance", tags=["compliance"]) # ── GET /compliance/frameworks ──────────────────────────────────────── @router.get("/frameworks") def list_frameworks_endpoint( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """List all available compliance frameworks.""" return list_frameworks(db) # ── GET /compliance/frameworks/{id}/status ──────────────────────────── @router.get("/frameworks/{framework_id}/status") def framework_status( framework_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Get compliance status for each control in a framework.""" return get_framework_status(db, framework_id) # ── GET /compliance/frameworks/{id}/report ──────────────────────────── @router.get("/frameworks/{framework_id}/report") def framework_report( framework_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Get the full compliance report (same as status but marked as report).""" return get_framework_status(db, framework_id) # ── GET /compliance/frameworks/{id}/report/csv ──────────────────────── @router.get("/frameworks/{framework_id}/report/csv") def framework_report_csv( framework_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Export compliance report as CSV.""" csv_bytes, filename = build_framework_report_csv(db, framework_id) return StreamingResponse( iter([csv_bytes]), media_type="text/csv", headers={ "Content-Disposition": f"attachment; filename={filename}", }, ) # ── GET /compliance/frameworks/{id}/gaps ────────────────────────────── @router.get("/frameworks/{framework_id}/gaps") def framework_gaps( framework_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Get controls with techniques that are not adequately covered.""" return get_framework_gaps(db, framework_id) # ── POST /compliance/import/... ──────────────────────────────────────── @router.post("/import/nist-800-53") def import_nist( db: Session = Depends(get_db), current_user: User = Depends(require_role("admin")), ): """Import NIST 800-53 Rev 5 mappings (admin only).""" result = import_nist_800_53_mappings(db) return result @router.post("/import/cis-controls-v8") def import_cis( db: Session = Depends(get_db), current_user: User = Depends(require_role("admin")), ): """Import CIS Controls v8 mappings (admin only).""" result = import_cis_controls_v8_mappings(db) return result @router.post("/import/dora") def import_dora( db: Session = Depends(get_db), current_user: User = Depends(require_role("admin")), ): """Import DORA (EU 2022/2554) compliance mappings (admin only).""" result = import_dora_mappings(db) return result