diff --git a/backend/alembic/versions/b014_add_compliance_tables.py b/backend/alembic/versions/b014_add_compliance_tables.py new file mode 100644 index 0000000..e1b2af5 --- /dev/null +++ b/backend/alembic/versions/b014_add_compliance_tables.py @@ -0,0 +1,92 @@ +"""add_compliance_tables + +Revision ID: b014compliance +Revises: b013campaigns +Create Date: 2026-02-09 20:00:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = "b014compliance" +down_revision: Union[str, None] = "b013campaigns" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ── compliance_frameworks ───────────────────────────────────── + op.create_table( + "compliance_frameworks", + sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True), + sa.Column("name", sa.String, unique=True, nullable=False), + sa.Column("version", sa.String, nullable=True), + sa.Column("description", sa.Text, nullable=True), + sa.Column("url", sa.String, nullable=True), + sa.Column("is_active", sa.Boolean, server_default="true"), + sa.Column("created_at", sa.DateTime, server_default=sa.func.now()), + ) + + # ── compliance_controls ─────────────────────────────────────── + op.create_table( + "compliance_controls", + sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True), + sa.Column( + "framework_id", + postgresql.UUID(as_uuid=True), + sa.ForeignKey("compliance_frameworks.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("control_id", sa.String, nullable=False), + sa.Column("title", sa.String, nullable=False), + sa.Column("description", sa.Text, nullable=True), + sa.Column("category", sa.String, nullable=True), + ) + op.create_index( + "ix_compliance_controls_framework", + "compliance_controls", + ["framework_id"], + ) + + # ── compliance_control_mappings ─────────────────────────────── + op.create_table( + "compliance_control_mappings", + sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True), + sa.Column( + "compliance_control_id", + postgresql.UUID(as_uuid=True), + sa.ForeignKey("compliance_controls.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column( + "technique_id", + postgresql.UUID(as_uuid=True), + sa.ForeignKey("techniques.id", ondelete="CASCADE"), + nullable=False, + ), + ) + op.create_index( + "ix_compliance_mappings_control", + "compliance_control_mappings", + ["compliance_control_id"], + ) + op.create_index( + "ix_compliance_mappings_technique", + "compliance_control_mappings", + ["technique_id"], + ) + op.create_unique_constraint( + "uq_control_technique", + "compliance_control_mappings", + ["compliance_control_id", "technique_id"], + ) + + +def downgrade() -> None: + op.drop_table("compliance_control_mappings") + op.drop_table("compliance_controls") + op.drop_table("compliance_frameworks") diff --git a/backend/app/main.py b/backend/app/main.py index 81a4b85..4d9ae29 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -26,6 +26,7 @@ from app.routers import campaigns as campaigns_router from app.routers import heatmap as heatmap_router from app.routers import scores as scores_router from app.routers import operational_metrics as operational_metrics_router +from app.routers import compliance as compliance_router from app.storage import ensure_bucket_exists from app.jobs.mitre_sync_job import start_scheduler, scheduler @@ -76,6 +77,7 @@ app.include_router(campaigns_router.router, prefix="/api/v1") app.include_router(heatmap_router.router, prefix="/api/v1") app.include_router(scores_router.router, prefix="/api/v1") app.include_router(operational_metrics_router.router, prefix="/api/v1") +app.include_router(compliance_router.router, prefix="/api/v1") @app.get("/health") diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index a547756..c61e495 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -14,6 +14,7 @@ from app.models.defensive_technique import DefensiveTechnique, DefensiveTechniqu from app.models.test_template_detection_rule import TestTemplateDetectionRule from app.models.test_detection_result import TestDetectionResult from app.models.campaign import Campaign, CampaignTest +from app.models.compliance import ComplianceFramework, ComplianceControl, ComplianceControlMapping from app.models.enums import TechniqueStatus, TestState, TestResult, TeamSide __all__ = [ @@ -23,5 +24,6 @@ __all__ = [ "DefensiveTechnique", "DefensiveTechniqueMapping", "TestTemplateDetectionRule", "TestDetectionResult", "Campaign", "CampaignTest", + "ComplianceFramework", "ComplianceControl", "ComplianceControlMapping", "TechniqueStatus", "TestState", "TestResult", "TeamSide", ] diff --git a/backend/app/models/compliance.py b/backend/app/models/compliance.py new file mode 100644 index 0000000..2cd0486 --- /dev/null +++ b/backend/app/models/compliance.py @@ -0,0 +1,97 @@ +"""Compliance models — frameworks, controls, and technique mappings. + +Maps compliance frameworks (NIST 800-53, DORA, NIS2, ISO 27001) to +MITRE ATT&CK techniques, enabling compliance gap analysis. +""" + +import uuid +from datetime import datetime + +from sqlalchemy import ( + Column, String, Text, Boolean, DateTime, + ForeignKey, Index, UniqueConstraint, +) +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import relationship + +from app.database import Base + + +class ComplianceFramework(Base): + """A compliance framework (e.g. NIST 800-53, ISO 27001).""" + __tablename__ = "compliance_frameworks" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + name = Column(String, unique=True, nullable=False) + version = Column(String, nullable=True) + description = Column(Text, nullable=True) + url = Column(String, nullable=True) + is_active = Column(Boolean, default=True) + created_at = Column(DateTime, default=datetime.utcnow) + + # Relationships + controls = relationship( + "ComplianceControl", + back_populates="framework", + cascade="all, delete-orphan", + ) + + +class ComplianceControl(Base): + """A control within a compliance framework (e.g. AC-2, PR.AC-1).""" + __tablename__ = "compliance_controls" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + framework_id = Column( + UUID(as_uuid=True), + ForeignKey("compliance_frameworks.id", ondelete="CASCADE"), + nullable=False, + ) + control_id = Column(String, nullable=False) # e.g. "AC-2" + title = Column(String, nullable=False) + description = Column(Text, nullable=True) + category = Column(String, nullable=True) + + # Relationships + framework = relationship("ComplianceFramework", back_populates="controls") + technique_mappings = relationship( + "ComplianceControlMapping", + back_populates="compliance_control", + cascade="all, delete-orphan", + ) + + __table_args__ = ( + Index('ix_compliance_controls_framework', 'framework_id'), + ) + + +class ComplianceControlMapping(Base): + """Maps a compliance control to a MITRE ATT&CK technique.""" + __tablename__ = "compliance_control_mappings" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + compliance_control_id = Column( + UUID(as_uuid=True), + ForeignKey("compliance_controls.id", ondelete="CASCADE"), + nullable=False, + ) + technique_id = Column( + UUID(as_uuid=True), + ForeignKey("techniques.id", ondelete="CASCADE"), + nullable=False, + ) + + # Relationships + compliance_control = relationship( + "ComplianceControl", back_populates="technique_mappings" + ) + technique = relationship("Technique") + + __table_args__ = ( + Index('ix_compliance_mappings_control', 'compliance_control_id'), + Index('ix_compliance_mappings_technique', 'technique_id'), + UniqueConstraint( + 'compliance_control_id', 'technique_id', + name='uq_control_technique', + ), + ) diff --git a/backend/app/routers/compliance.py b/backend/app/routers/compliance.py new file mode 100644 index 0000000..b8bfd5d --- /dev/null +++ b/backend/app/routers/compliance.py @@ -0,0 +1,380 @@ +"""Compliance endpoints — framework status, reports, and gap analysis. + +Provides compliance posture assessment by mapping MITRE ATT&CK technique +coverage to compliance framework controls. +""" + +import csv +import io +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi.responses import StreamingResponse +from sqlalchemy.orm import Session, joinedload + +from app.database import get_db +from app.dependencies.auth import get_current_user, require_role +from app.models.user import User +from app.models.compliance import ( + ComplianceFramework, + ComplianceControl, + ComplianceControlMapping, +) +from app.models.technique import Technique +from app.models.test_template import TestTemplate +from app.models.threat_actor import ThreatActorTechnique +from app.services.scoring_service import calculate_technique_score +from app.services.compliance_import_service import import_nist_800_53_mappings + +router = APIRouter(prefix="/compliance", tags=["compliance"]) + + +# ── Helpers ─────────────────────────────────────────────────────────── + + +def _classify_control(technique_scores: list[float]) -> str: + """Classify a control status based on its technique scores.""" + if not technique_scores: + return "not_evaluated" + + all_above_70 = all(s >= 70 for s in technique_scores) + any_above_30 = any(s >= 30 for s in technique_scores) + all_below_30 = all(s < 30 for s in technique_scores) + all_zero = all(s == 0 for s in technique_scores) + + if all_zero: + return "not_evaluated" + if all_above_70: + return "covered" + if all_below_30: + return "not_covered" + if any_above_30: + return "partially_covered" + return "not_covered" + + +def _get_control_status(control: ComplianceControl, db: Session) -> dict: + """Compute the status and score for a single control.""" + mappings = ( + db.query(ComplianceControlMapping) + .filter(ComplianceControlMapping.compliance_control_id == control.id) + .all() + ) + + if not mappings: + return { + "control_id": control.control_id, + "title": control.title, + "category": control.category, + "status": "not_evaluated", + "score": 0, + "techniques_count": 0, + "techniques_covered": 0, + "techniques": [], + } + + technique_ids = [m.technique_id for m in mappings] + techniques = ( + db.query(Technique) + .filter(Technique.id.in_(technique_ids)) + .all() + ) + + tech_details = [] + scores = [] + covered_count = 0 + + for tech in techniques: + result = calculate_technique_score(tech, db) + score = result["total_score"] + scores.append(score) + if score >= 50: + covered_count += 1 + + tech_details.append({ + "mitre_id": tech.mitre_id, + "name": tech.name, + "score": score, + "status": tech.status_global.value if tech.status_global else "not_evaluated", + }) + + # Sort techniques by score ascending (worst first for priority) + tech_details.sort(key=lambda t: t["score"]) + + avg_score = round(sum(scores) / len(scores), 1) if scores else 0 + status = _classify_control(scores) + + return { + "control_id": control.control_id, + "title": control.title, + "category": control.category, + "status": status, + "score": avg_score, + "techniques_count": len(techniques), + "techniques_covered": covered_count, + "techniques": tech_details, + } + + +# ── GET /compliance/frameworks ──────────────────────────────────────── + + +@router.get("/frameworks") +def list_frameworks( + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """List all available compliance frameworks.""" + frameworks = ( + db.query(ComplianceFramework) + .filter(ComplianceFramework.is_active == True) + .all() + ) + + result = [] + for fw in frameworks: + control_count = ( + db.query(ComplianceControl) + .filter(ComplianceControl.framework_id == fw.id) + .count() + ) + result.append({ + "id": str(fw.id), + "name": fw.name, + "version": fw.version, + "description": fw.description, + "url": fw.url, + "is_active": fw.is_active, + "controls_count": control_count, + }) + + return result + + +# ── GET /compliance/frameworks/{id}/status ──────────────────────────── + + +@router.get("/frameworks/{framework_id}/status") +def framework_status( + framework_id: str, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Get compliance status for each control in a framework.""" + framework = ( + db.query(ComplianceFramework) + .filter(ComplianceFramework.id == framework_id) + .first() + ) + if not framework: + raise HTTPException(status_code=404, detail="Framework not found") + + controls = ( + db.query(ComplianceControl) + .filter(ComplianceControl.framework_id == framework.id) + .order_by(ComplianceControl.control_id) + .all() + ) + + control_statuses = [] + summary = { + "total_controls": len(controls), + "covered": 0, + "partially_covered": 0, + "not_covered": 0, + "not_evaluated": 0, + } + + for control in controls: + status_data = _get_control_status(control, db) + control_statuses.append(status_data) + + status = status_data["status"] + if status in summary: + summary[status] += 1 + + # Compliance percentage: (covered + partially_covered*0.5) / total * 100 + total = summary["total_controls"] + if total > 0: + compliance_pct = round( + (summary["covered"] + summary["partially_covered"] * 0.5) / total * 100, + 1, + ) + else: + compliance_pct = 0 + + summary["compliance_percentage"] = compliance_pct + + return { + "framework": {"id": str(framework.id), "name": framework.name}, + "summary": summary, + "controls": control_statuses, + } + + +# ── GET /compliance/frameworks/{id}/report ──────────────────────────── + + +@router.get("/frameworks/{framework_id}/report") +def framework_report( + framework_id: str, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Get the full compliance report (same as status but marked as report).""" + return framework_status(framework_id, db=db, current_user=current_user) + + +# ── GET /compliance/frameworks/{id}/report/csv ──────────────────────── + + +@router.get("/frameworks/{framework_id}/report/csv") +def framework_report_csv( + framework_id: str, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Export compliance report as CSV.""" + framework = ( + db.query(ComplianceFramework) + .filter(ComplianceFramework.id == framework_id) + .first() + ) + if not framework: + raise HTTPException(status_code=404, detail="Framework not found") + + controls = ( + db.query(ComplianceControl) + .filter(ComplianceControl.framework_id == framework.id) + .order_by(ComplianceControl.control_id) + .all() + ) + + output = io.StringIO() + writer = csv.writer(output) + writer.writerow([ + "control_id", + "title", + "category", + "status", + "score", + "techniques_total", + "techniques_covered", + "technique_ids", + ]) + + for control in controls: + status_data = _get_control_status(control, db) + technique_ids = ",".join(t["mitre_id"] for t in status_data["techniques"]) + writer.writerow([ + status_data["control_id"], + status_data["title"], + status_data["category"] or "", + status_data["status"], + status_data["score"], + status_data["techniques_count"], + status_data["techniques_covered"], + technique_ids, + ]) + + output.seek(0) + filename = f"compliance_{framework.name.replace(' ', '_')}.csv" + + return StreamingResponse( + io.BytesIO(output.getvalue().encode("utf-8")), + media_type="text/csv", + headers={ + "Content-Disposition": f"attachment; filename={filename}", + }, + ) + + +# ── GET /compliance/frameworks/{id}/gaps ────────────────────────────── + + +@router.get("/frameworks/{framework_id}/gaps") +def framework_gaps( + framework_id: str, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Get controls with techniques that are not adequately covered.""" + framework = ( + db.query(ComplianceFramework) + .filter(ComplianceFramework.id == framework_id) + .first() + ) + if not framework: + raise HTTPException(status_code=404, detail="Framework not found") + + controls = ( + db.query(ComplianceControl) + .filter(ComplianceControl.framework_id == framework.id) + .order_by(ComplianceControl.control_id) + .all() + ) + + gaps = [] + for control in controls: + status_data = _get_control_status(control, db) + + if status_data["status"] in ("not_covered", "partially_covered"): + # Find uncovered techniques + uncovered_techniques = [] + for tech_info in status_data["techniques"]: + if tech_info["score"] < 70: + # Count available templates + template_count = ( + db.query(TestTemplate) + .filter(TestTemplate.mitre_technique_id == tech_info["mitre_id"]) + .count() + ) + + # Count threat actors using this technique + technique = ( + db.query(Technique) + .filter(Technique.mitre_id == tech_info["mitre_id"]) + .first() + ) + actor_count = 0 + if technique: + actor_count = ( + db.query(ThreatActorTechnique) + .filter(ThreatActorTechnique.technique_id == technique.id) + .count() + ) + + uncovered_techniques.append({ + **tech_info, + "templates_available": template_count, + "threat_actors_using": actor_count, + }) + + if uncovered_techniques: + gaps.append({ + "control_id": status_data["control_id"], + "title": status_data["title"], + "category": status_data["category"], + "status": status_data["status"], + "score": status_data["score"], + "uncovered_techniques": uncovered_techniques, + }) + + return { + "framework": {"id": str(framework.id), "name": framework.name}, + "total_gaps": len(gaps), + "gaps": gaps, + } + + +# ── POST /compliance/import/nist-800-53 ────────────────────────────── + + +@router.post("/import/nist-800-53") +def import_nist( + db: Session = Depends(get_db), + current_user: User = Depends(require_role("admin")), +): + """Import NIST 800-53 Rev 5 mappings (admin only).""" + result = import_nist_800_53_mappings(db) + return result diff --git a/backend/app/services/compliance_import_service.py b/backend/app/services/compliance_import_service.py new file mode 100644 index 0000000..ce8f763 --- /dev/null +++ b/backend/app/services/compliance_import_service.py @@ -0,0 +1,356 @@ +"""Compliance import service — imports NIST 800-53 to ATT&CK mappings. + +Downloads and parses the STIX bundle from the Center for Threat-Informed +Defense's attack_to_nist_mapping repository to create ComplianceFramework, +ComplianceControl, and ComplianceControlMapping records. +""" + +import logging +import json +import re +from typing import Optional + +import requests +from sqlalchemy.orm import Session + +from app.models.compliance import ( + ComplianceFramework, + ComplianceControl, + ComplianceControlMapping, +) +from app.models.technique import Technique + +logger = logging.getLogger(__name__) + +# URL for the NIST 800-53 Rev 5 to ATT&CK mapping +# This is the JSON STIX bundle that contains the relationships +NIST_MAPPING_URL = ( + "https://raw.githubusercontent.com/center-for-threat-informed-defense/" + "attack_to_nist_mapping/main/data/attack-to-nist-rev5.json" +) + + +def import_nist_800_53_mappings(db: Session) -> dict: + """Import NIST 800-53 Rev 5 mappings from MITRE CTI repository. + + Steps: + 1. Create or get the NIST 800-53 Rev 5 framework + 2. Download the STIX bundle JSON + 3. Parse controls and relationship objects + 4. Create ComplianceControl records + 5. Create ComplianceControlMapping records + + Returns a summary dict with counts. + """ + # ── 1. Create or get framework ──────────────────────────────── + framework = ( + db.query(ComplianceFramework) + .filter(ComplianceFramework.name == "NIST 800-53 Rev 5") + .first() + ) + + if not framework: + framework = ComplianceFramework( + name="NIST 800-53 Rev 5", + version="5", + description="National Institute of Standards and Technology Special Publication 800-53 Revision 5 — Security and Privacy Controls for Information Systems and Organizations", + url="https://csrc.nist.gov/publications/detail/sp/800-53/rev-5/final", + is_active=True, + ) + db.add(framework) + db.flush() + logger.info("Created NIST 800-53 Rev 5 framework") + else: + logger.info("NIST 800-53 Rev 5 framework already exists") + + # ── 2. Download STIX bundle ─────────────────────────────────── + try: + response = requests.get(NIST_MAPPING_URL, timeout=30) + response.raise_for_status() + stix_bundle = response.json() + except requests.RequestException as e: + logger.warning(f"Failed to download STIX bundle: {e}") + # Fallback: create a sample set of well-known NIST controls + return _import_sample_nist_mappings(db, framework) + + # ── 3. Parse STIX objects ───────────────────────────────────── + objects = stix_bundle.get("objects", []) + + # Build lookup maps + # STIX IDs -> control info + control_map = {} # stix_id -> {control_id, title, category} + technique_map = {} # stix_id -> mitre_technique_id + relationships = [] # (source_ref, target_ref) for "mitigates" relationships + + for obj in objects: + obj_type = obj.get("type", "") + + if obj_type == "course-of-action": + # This is a NIST control + name = obj.get("name", "") + desc = obj.get("description", "") + stix_id = obj.get("id", "") + + # Extract control ID from name (e.g., "AC-2 Account Management") + match = re.match(r"^([A-Z]{2}-\d+(?:\.\d+)?)\s*(.*)", name) + if match: + control_id = match.group(1) + title = match.group(2) or name + else: + control_id = name + title = name + + # Extract category from control family + category_match = re.match(r"^([A-Z]{2})", control_id) + category = _get_nist_category(category_match.group(1)) if category_match else None + + control_map[stix_id] = { + "control_id": control_id, + "title": title, + "description": desc[:500] if desc else None, + "category": category, + } + + elif obj_type == "attack-pattern": + # This is an ATT&CK technique + stix_id = obj.get("id", "") + ext_refs = obj.get("external_references", []) + for ref in ext_refs: + if ref.get("source_name") == "mitre-attack": + technique_map[stix_id] = ref.get("external_id", "") + break + + elif obj_type == "relationship": + rel_type = obj.get("relationship_type", "") + if rel_type == "mitigates": + source_ref = obj.get("source_ref", "") + target_ref = obj.get("target_ref", "") + relationships.append((source_ref, target_ref)) + + # ── 4. Create controls ──────────────────────────────────────── + controls_created = 0 + controls_existing = 0 + control_db_map = {} # control_id -> ComplianceControl + + # Load existing controls for this framework + existing_controls = { + c.control_id: c + for c in db.query(ComplianceControl) + .filter(ComplianceControl.framework_id == framework.id) + .all() + } + + for stix_id, info in control_map.items(): + cid = info["control_id"] + if cid in existing_controls: + control_db_map[stix_id] = existing_controls[cid] + controls_existing += 1 + else: + ctrl = ComplianceControl( + framework_id=framework.id, + control_id=cid, + title=info["title"], + description=info["description"], + category=info["category"], + ) + db.add(ctrl) + db.flush() + control_db_map[stix_id] = ctrl + controls_created += 1 + + # ── 5. Create mappings ──────────────────────────────────────── + mappings_created = 0 + mappings_skipped = 0 + + # Build technique DB lookup (mitre_id -> Technique) + all_techniques = {t.mitre_id: t for t in db.query(Technique).all()} + + # Load existing mappings + existing_mappings = set() + for m in db.query(ComplianceControlMapping).all(): + existing_mappings.add((str(m.compliance_control_id), str(m.technique_id))) + + for source_ref, target_ref in relationships: + control = control_db_map.get(source_ref) + mitre_id = technique_map.get(target_ref) + + if not control or not mitre_id: + mappings_skipped += 1 + continue + + technique = all_techniques.get(mitre_id) + if not technique: + mappings_skipped += 1 + continue + + key = (str(control.id), str(technique.id)) + if key in existing_mappings: + mappings_skipped += 1 + continue + + mapping = ComplianceControlMapping( + compliance_control_id=control.id, + technique_id=technique.id, + ) + db.add(mapping) + existing_mappings.add(key) + mappings_created += 1 + + db.commit() + + summary = { + "framework": framework.name, + "controls_created": controls_created, + "controls_existing": controls_existing, + "mappings_created": mappings_created, + "mappings_skipped": mappings_skipped, + "total_controls": controls_created + controls_existing, + "total_relationships_found": len(relationships), + } + logger.info(f"NIST 800-53 import complete: {summary}") + return summary + + +def _import_sample_nist_mappings(db: Session, framework: ComplianceFramework) -> dict: + """Import a curated sample of NIST 800-53 controls when the download fails. + + This ensures the feature works even without network access. + """ + SAMPLE_CONTROLS = [ + {"control_id": "AC-2", "title": "Account Management", "category": "Access Control", + "techniques": ["T1078", "T1136", "T1098", "T1087", "T1069"]}, + {"control_id": "AC-3", "title": "Access Enforcement", "category": "Access Control", + "techniques": ["T1078", "T1548", "T1134"]}, + {"control_id": "AC-4", "title": "Information Flow Enforcement", "category": "Access Control", + "techniques": ["T1048", "T1041", "T1572"]}, + {"control_id": "AC-6", "title": "Least Privilege", "category": "Access Control", + "techniques": ["T1078", "T1548", "T1134"]}, + {"control_id": "AU-2", "title": "Event Logging", "category": "Audit and Accountability", + "techniques": ["T1562", "T1070"]}, + {"control_id": "AU-6", "title": "Audit Record Review", "category": "Audit and Accountability", + "techniques": ["T1562", "T1070", "T1027"]}, + {"control_id": "CA-7", "title": "Continuous Monitoring", "category": "Assessment, Authorization, and Monitoring", + "techniques": ["T1059", "T1053"]}, + {"control_id": "CM-2", "title": "Baseline Configuration", "category": "Configuration Management", + "techniques": ["T1574", "T1546"]}, + {"control_id": "CM-6", "title": "Configuration Settings", "category": "Configuration Management", + "techniques": ["T1574", "T1546", "T1112"]}, + {"control_id": "CM-7", "title": "Least Functionality", "category": "Configuration Management", + "techniques": ["T1059", "T1218"]}, + {"control_id": "IA-2", "title": "Identification and Authentication", "category": "Identification and Authentication", + "techniques": ["T1078", "T1110"]}, + {"control_id": "IA-5", "title": "Authenticator Management", "category": "Identification and Authentication", + "techniques": ["T1078", "T1110", "T1003"]}, + {"control_id": "IR-4", "title": "Incident Handling", "category": "Incident Response", + "techniques": ["T1059", "T1547"]}, + {"control_id": "RA-5", "title": "Vulnerability Monitoring and Scanning", "category": "Risk Assessment", + "techniques": ["T1190", "T1203"]}, + {"control_id": "SC-7", "title": "Boundary Protection", "category": "System and Communications Protection", + "techniques": ["T1048", "T1041", "T1071"]}, + {"control_id": "SC-28", "title": "Protection of Information at Rest", "category": "System and Communications Protection", + "techniques": ["T1005", "T1114"]}, + {"control_id": "SI-3", "title": "Malicious Code Protection", "category": "System and Information Integrity", + "techniques": ["T1059", "T1204", "T1566"]}, + {"control_id": "SI-4", "title": "System Monitoring", "category": "System and Information Integrity", + "techniques": ["T1059", "T1053", "T1547"]}, + {"control_id": "SI-7", "title": "Software, Firmware, and Information Integrity", "category": "System and Information Integrity", + "techniques": ["T1195", "T1553"]}, + {"control_id": "PM-16", "title": "Threat Awareness Program", "category": "Program Management", + "techniques": ["T1566", "T1204"]}, + ] + + # Build technique lookup + all_techniques = {t.mitre_id: t for t in db.query(Technique).all()} + + existing_controls = { + c.control_id: c + for c in db.query(ComplianceControl) + .filter(ComplianceControl.framework_id == framework.id) + .all() + } + + existing_mappings = set() + for m in db.query(ComplianceControlMapping).all(): + existing_mappings.add((str(m.compliance_control_id), str(m.technique_id))) + + controls_created = 0 + mappings_created = 0 + + for sample in SAMPLE_CONTROLS: + # Create or get control + if sample["control_id"] in existing_controls: + control = existing_controls[sample["control_id"]] + else: + control = ComplianceControl( + framework_id=framework.id, + control_id=sample["control_id"], + title=sample["title"], + category=sample["category"], + ) + db.add(control) + db.flush() + existing_controls[sample["control_id"]] = control + controls_created += 1 + + # Create mappings + for mitre_id in sample["techniques"]: + technique = all_techniques.get(mitre_id) + if not technique: + # Try with subtechnique prefix + for key, tech in all_techniques.items(): + if key.startswith(mitre_id): + technique = tech + break + if not technique: + continue + + key = (str(control.id), str(technique.id)) + if key in existing_mappings: + continue + + mapping = ComplianceControlMapping( + compliance_control_id=control.id, + technique_id=technique.id, + ) + db.add(mapping) + existing_mappings.add(key) + mappings_created += 1 + + db.commit() + + return { + "framework": framework.name, + "controls_created": controls_created, + "controls_existing": len(existing_controls) - controls_created, + "mappings_created": mappings_created, + "mappings_skipped": 0, + "total_controls": len(existing_controls), + "source": "sample_data", + } + + +def _get_nist_category(family_code: str) -> str: + """Map NIST 800-53 family code to category name.""" + categories = { + "AC": "Access Control", + "AT": "Awareness and Training", + "AU": "Audit and Accountability", + "CA": "Assessment, Authorization, and Monitoring", + "CM": "Configuration Management", + "CP": "Contingency Planning", + "IA": "Identification and Authentication", + "IR": "Incident Response", + "MA": "Maintenance", + "MP": "Media Protection", + "PE": "Physical and Environmental Protection", + "PL": "Planning", + "PM": "Program Management", + "PS": "Personnel Security", + "PT": "Personally Identifiable Information Processing and Transparency", + "RA": "Risk Assessment", + "SA": "System and Services Acquisition", + "SC": "System and Communications Protection", + "SI": "System and Information Integrity", + "SR": "Supply Chain Risk Management", + } + return categories.get(family_code, "Unknown") diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 94ff34a..7debaaf 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -4,6 +4,7 @@ import DashboardPage from "./pages/DashboardPage"; import TechniquesPage from "./pages/TechniquesPage"; import MatrixPage from "./pages/MatrixPage"; import ExecutiveDashboardPage from "./pages/ExecutiveDashboardPage"; +import CompliancePage from "./pages/CompliancePage"; import TechniqueDetailPage from "./pages/TechniqueDetailPage"; import TestsPage from "./pages/TestsPage"; import TestCreatePage from "./pages/TestCreatePage"; @@ -57,6 +58,7 @@ export default function App() { } /> } /> } /> + } /> { + const { data } = await client.get("/compliance/frameworks"); + return data; +} + +/** Get compliance status for a framework. */ +export async function getFrameworkStatus( + frameworkId: string, +): Promise { + const { data } = await client.get( + `/compliance/frameworks/${frameworkId}/status`, + ); + return data; +} + +/** Get compliance gaps for a framework. */ +export async function getFrameworkGaps( + frameworkId: string, +): Promise { + const { data } = await client.get( + `/compliance/frameworks/${frameworkId}/gaps`, + ); + return data; +} + +/** Download CSV report for a framework. */ +export async function downloadComplianceCSV(frameworkId: string): Promise { + const { data } = await client.get(`/compliance/frameworks/${frameworkId}/report/csv`, { + responseType: "blob", + }); + const blob = new Blob([data], { type: "text/csv" }); + const url = URL.createObjectURL(blob); + const a = document.createElement("a"); + a.href = url; + a.download = "compliance_report.csv"; + document.body.appendChild(a); + a.click(); + document.body.removeChild(a); + URL.revokeObjectURL(url); +} + +/** Import NIST 800-53 mappings (admin). */ +export async function importNistMappings(): Promise> { + const { data } = await client.post("/compliance/import/nist-800-53"); + return data; +} diff --git a/frontend/src/components/Sidebar.tsx b/frontend/src/components/Sidebar.tsx index c69e7e4..84b8707 100644 --- a/frontend/src/components/Sidebar.tsx +++ b/frontend/src/components/Sidebar.tsx @@ -17,6 +17,7 @@ import { Zap, Grid3X3, Gauge, + ShieldCheck, } from "lucide-react"; import { useAuth } from "../context/AuthContext"; @@ -45,6 +46,7 @@ const mainLinks: NavItem[] = [ { to: "/reports", label: "Reports", icon: BarChart3 }, { to: "/threat-actors", label: "Threat Actors", icon: Crosshair }, { to: "/campaigns", label: "Campaigns", icon: Zap }, + { to: "/compliance", label: "Compliance", icon: ShieldCheck }, ]; const adminLinks: NavItem[] = [ diff --git a/frontend/src/components/compliance/ComplianceGauge.tsx b/frontend/src/components/compliance/ComplianceGauge.tsx new file mode 100644 index 0000000..fdd469b --- /dev/null +++ b/frontend/src/components/compliance/ComplianceGauge.tsx @@ -0,0 +1,62 @@ +interface ComplianceGaugeProps { + percentage: number; + size?: "sm" | "md" | "lg"; +} + +export default function ComplianceGauge({ + percentage, + size = "md", +}: ComplianceGaugeProps) { + const getColor = (p: number) => { + if (p < 30) return "#ef4444"; + if (p < 50) return "#f97316"; + if (p < 70) return "#eab308"; + return "#22c55e"; + }; + + const color = getColor(percentage); + + const sizes = { + sm: { outer: 64, radius: 26, stroke: 5, text: "text-lg", label: "text-[8px]" }, + md: { outer: 96, radius: 40, stroke: 6, text: "text-2xl", label: "text-[10px]" }, + lg: { outer: 128, radius: 54, stroke: 8, text: "text-3xl", label: "text-xs" }, + }; + + const s = sizes[size]; + const circumference = 2 * Math.PI * s.radius; + const strokeDasharray = `${(percentage / 100) * circumference} ${circumference}`; + const viewBox = `0 0 ${s.outer + 4} ${s.outer + 4}`; + const center = (s.outer + 4) / 2; + + return ( +
+ + + + +
+ + {Math.round(percentage)} + + % +
+
+ ); +} diff --git a/frontend/src/components/compliance/ControlsTable.tsx b/frontend/src/components/compliance/ControlsTable.tsx new file mode 100644 index 0000000..616c467 --- /dev/null +++ b/frontend/src/components/compliance/ControlsTable.tsx @@ -0,0 +1,216 @@ +import { useState } from "react"; +import { useNavigate } from "react-router-dom"; +import { ChevronDown, ChevronRight, Search, Filter } from "lucide-react"; +import type { ComplianceControlStatus } from "../../api/compliance"; + +interface ControlsTableProps { + controls: ComplianceControlStatus[]; +} + +const STATUS_COLORS: Record = { + covered: { bg: "bg-green-500/10", text: "text-green-400", dot: "bg-green-500" }, + partially_covered: { bg: "bg-yellow-500/10", text: "text-yellow-400", dot: "bg-yellow-500" }, + not_covered: { bg: "bg-red-500/10", text: "text-red-400", dot: "bg-red-500" }, + not_evaluated: { bg: "bg-gray-500/10", text: "text-gray-400", dot: "bg-gray-500" }, +}; + +const STATUS_LABELS: Record = { + covered: "Covered", + partially_covered: "Partial", + not_covered: "Not Covered", + not_evaluated: "Not Evaluated", +}; + +export default function ControlsTable({ controls }: ControlsTableProps) { + const navigate = useNavigate(); + const [expandedId, setExpandedId] = useState(null); + const [statusFilter, setStatusFilter] = useState("all"); + const [categoryFilter, setCategoryFilter] = useState("all"); + const [search, setSearch] = useState(""); + + // Extract unique categories + const categories = [...new Set(controls.map((c) => c.category).filter(Boolean))] as string[]; + + // Apply filters + const filteredControls = controls.filter((c) => { + if (statusFilter !== "all" && c.status !== statusFilter) return false; + if (categoryFilter !== "all" && c.category !== categoryFilter) return false; + if (search) { + const q = search.toLowerCase(); + return ( + c.control_id.toLowerCase().includes(q) || + c.title.toLowerCase().includes(q) + ); + } + return true; + }); + + const toggleExpand = (controlId: string) => { + setExpandedId(expandedId === controlId ? null : controlId); + }; + + return ( +
+ {/* Filters row */} +
+
+ +
+ + + + + +
+ + setSearch(e.target.value)} + placeholder="Search by ID or title..." + className="w-full rounded-lg border border-gray-700 bg-gray-800 py-1.5 pl-8 pr-3 text-xs text-gray-200 placeholder-gray-500 focus:border-cyan-500 focus:outline-none" + /> +
+ + + {filteredControls.length} of {controls.length} controls + +
+ + {/* Table */} +
+ + + + + + + + + + + + + {filteredControls.map((control) => { + const isExpanded = expandedId === control.control_id; + const statusStyle = STATUS_COLORS[control.status] || STATUS_COLORS.not_evaluated; + + return ( + + toggleExpand(control.control_id)} + > + + + + + + + + + + {/* Expanded row: technique details */} + {isExpanded && control.techniques.length > 0 && ( + + + + )} + + ); + })} + +
+ ControlTitleCategoryStatusScoreTechniques
+ {isExpanded ? ( + + ) : ( + + )} + + {control.control_id} + + {control.title} + + {control.category} + + + + {STATUS_LABELS[control.status]} + + + {control.score.toFixed(1)} + + {control.techniques_covered}/{control.techniques_count} +
+
+

+ Mapped Techniques +

+ {control.techniques.map((tech) => { + const techStatusColor = + tech.score >= 70 + ? "text-green-400" + : tech.score >= 30 + ? "text-yellow-400" + : tech.score > 0 + ? "text-red-400" + : "text-gray-500"; + + return ( +
{ + e.stopPropagation(); + navigate(`/techniques/${tech.mitre_id}`); + }} + > +
+ + {tech.mitre_id} + + + {tech.name} + +
+
+ + {tech.status.replace(/_/g, " ")} + + + {tech.score.toFixed(1)} + +
+
+ ); + })} +
+
+
+
+ ); +} diff --git a/frontend/src/pages/CompliancePage.tsx b/frontend/src/pages/CompliancePage.tsx new file mode 100644 index 0000000..d33b7c5 --- /dev/null +++ b/frontend/src/pages/CompliancePage.tsx @@ -0,0 +1,189 @@ +import { useState } from "react"; +import { useQuery } from "@tanstack/react-query"; +import { Loader2, AlertCircle, Download, FileText } from "lucide-react"; +import { + getComplianceFrameworks, + getFrameworkStatus, + downloadComplianceCSV, + type ComplianceFrameworkSummary, +} from "../api/compliance"; +import ComplianceGauge from "../components/compliance/ComplianceGauge"; +import ControlsTable from "../components/compliance/ControlsTable"; + +export default function CompliancePage() { + const [selectedFrameworkId, setSelectedFrameworkId] = useState(null); + + // Fetch available frameworks + const { + data: frameworks, + isLoading: loadingFrameworks, + } = useQuery({ + queryKey: ["compliance-frameworks"], + queryFn: getComplianceFrameworks, + }); + + // Auto-select first framework + const activeFrameworkId = selectedFrameworkId || frameworks?.[0]?.id || null; + + // Fetch framework status + const { + data: frameworkStatus, + isLoading: loadingStatus, + } = useQuery({ + queryKey: ["compliance-status", activeFrameworkId], + queryFn: () => getFrameworkStatus(activeFrameworkId!), + enabled: !!activeFrameworkId, + }); + + const isLoading = loadingFrameworks || loadingStatus; + const summary = frameworkStatus?.summary; + const controls = frameworkStatus?.controls || []; + + const handleExportCSV = async () => { + if (activeFrameworkId) { + await downloadComplianceCSV(activeFrameworkId); + } + }; + + const handleExportJSON = async () => { + if (!frameworkStatus) return; + const json = JSON.stringify(frameworkStatus, null, 2); + const blob = new Blob([json], { type: "application/json" }); + const url = URL.createObjectURL(blob); + const a = document.createElement("a"); + a.href = url; + a.download = `compliance_${frameworkStatus.framework.name.replace(/\s+/g, "_")}.json`; + document.body.appendChild(a); + a.click(); + document.body.removeChild(a); + URL.revokeObjectURL(url); + }; + + if (isLoading && !frameworkStatus) { + return ( +
+ +
+ ); + } + + return ( +
+ {/* Header */} +
+
+

Compliance

+

+ Map ATT&CK coverage to compliance framework controls +

+
+ +
+ {/* Framework selector */} + + + {/* Export buttons */} + + +
+
+ + {/* Summary cards */} + {summary && ( +
+ {/* Gauge */} +
+ +

Overall Compliance

+
+ + {/* Covered */} +
+

Covered

+

{summary.covered}

+
+
0 ? (summary.covered / summary.total_controls) * 100 : 0}%` }} + /> +
+
+ + {/* Partial */} +
+

Partial

+

+ {summary.partially_covered} +

+
+
0 ? (summary.partially_covered / summary.total_controls) * 100 : 0}%` }} + /> +
+
+ + {/* Not Covered */} +
+

Not Covered

+

{summary.not_covered}

+
+
0 ? (summary.not_covered / summary.total_controls) * 100 : 0}%` }} + /> +
+
+ + {/* Not Evaluated */} +
+

Not Evaluated

+

{summary.not_evaluated}

+
+
0 ? (summary.not_evaluated / summary.total_controls) * 100 : 0}%` }} + /> +
+
+
+ )} + + {/* Controls table */} + {controls.length > 0 ? ( + + ) : ( + !isLoading && ( +
+ +

+ No compliance data available. Import a compliance framework from the System page. +

+
+ ) + )} +
+ ); +}