"""Compliance import service — imports NIST 800-53 to ATT&CK mappings. Downloads and parses the STIX bundle from the Center for Threat-Informed Defense's attack_to_nist_mapping repository to create ComplianceFramework, ComplianceControl, and ComplianceControlMapping records. """ import logging import json import re from typing import Optional import requests from sqlalchemy.orm import Session from app.models.compliance import ( ComplianceFramework, ComplianceControl, ComplianceControlMapping, ) from app.models.technique import Technique logger = logging.getLogger(__name__) # URL for the NIST 800-53 Rev 5 to ATT&CK mapping # This is the JSON STIX bundle that contains the relationships NIST_MAPPING_URL = ( "https://raw.githubusercontent.com/center-for-threat-informed-defense/" "attack_to_nist_mapping/main/data/attack-to-nist-rev5.json" ) def import_nist_800_53_mappings(db: Session) -> dict: """Import NIST 800-53 Rev 5 mappings from MITRE CTI repository. Steps: 1. Create or get the NIST 800-53 Rev 5 framework 2. Download the STIX bundle JSON 3. Parse controls and relationship objects 4. Create ComplianceControl records 5. Create ComplianceControlMapping records Returns a summary dict with counts. """ # ── 1. Create or get framework ──────────────────────────────── framework = ( db.query(ComplianceFramework) .filter(ComplianceFramework.name == "NIST 800-53 Rev 5") .first() ) if not framework: framework = ComplianceFramework( name="NIST 800-53 Rev 5", version="5", description="National Institute of Standards and Technology Special Publication 800-53 Revision 5 — Security and Privacy Controls for Information Systems and Organizations", url="https://csrc.nist.gov/publications/detail/sp/800-53/rev-5/final", is_active=True, ) db.add(framework) db.flush() logger.info("Created NIST 800-53 Rev 5 framework") else: logger.info("NIST 800-53 Rev 5 framework already exists") # ── 2. Download STIX bundle ─────────────────────────────────── try: response = requests.get(NIST_MAPPING_URL, timeout=30) response.raise_for_status() stix_bundle = response.json() except requests.RequestException as e: logger.warning(f"Failed to download STIX bundle: {e}") # Fallback: create a sample set of well-known NIST controls return _import_sample_nist_mappings(db, framework) # ── 3. Parse STIX objects ───────────────────────────────────── objects = stix_bundle.get("objects", []) # Build lookup maps # STIX IDs -> control info control_map = {} # stix_id -> {control_id, title, category} technique_map = {} # stix_id -> mitre_technique_id relationships = [] # (source_ref, target_ref) for "mitigates" relationships for obj in objects: obj_type = obj.get("type", "") if obj_type == "course-of-action": # This is a NIST control name = obj.get("name", "") desc = obj.get("description", "") stix_id = obj.get("id", "") # Extract control ID from name (e.g., "AC-2 Account Management") match = re.match(r"^([A-Z]{2}-\d+(?:\.\d+)?)\s*(.*)", name) if match: control_id = match.group(1) title = match.group(2) or name else: control_id = name title = name # Extract category from control family category_match = re.match(r"^([A-Z]{2})", control_id) category = _get_nist_category(category_match.group(1)) if category_match else None control_map[stix_id] = { "control_id": control_id, "title": title, "description": desc[:500] if desc else None, "category": category, } elif obj_type == "attack-pattern": # This is an ATT&CK technique stix_id = obj.get("id", "") ext_refs = obj.get("external_references", []) for ref in ext_refs: if ref.get("source_name") == "mitre-attack": technique_map[stix_id] = ref.get("external_id", "") break elif obj_type == "relationship": rel_type = obj.get("relationship_type", "") if rel_type == "mitigates": source_ref = obj.get("source_ref", "") target_ref = obj.get("target_ref", "") relationships.append((source_ref, target_ref)) # ── 4. Create controls ──────────────────────────────────────── controls_created = 0 controls_existing = 0 control_db_map = {} # control_id -> ComplianceControl # Load existing controls for this framework existing_controls = { c.control_id: c for c in db.query(ComplianceControl) .filter(ComplianceControl.framework_id == framework.id) .all() } for stix_id, info in control_map.items(): cid = info["control_id"] if cid in existing_controls: control_db_map[stix_id] = existing_controls[cid] controls_existing += 1 else: ctrl = ComplianceControl( framework_id=framework.id, control_id=cid, title=info["title"], description=info["description"], category=info["category"], ) db.add(ctrl) db.flush() control_db_map[stix_id] = ctrl controls_created += 1 # ── 5. Create mappings ──────────────────────────────────────── mappings_created = 0 mappings_skipped = 0 # Build technique DB lookup (mitre_id -> Technique) all_techniques = {t.mitre_id: t for t in db.query(Technique).all()} # Load existing mappings existing_mappings = set() for m in db.query(ComplianceControlMapping).all(): existing_mappings.add((str(m.compliance_control_id), str(m.technique_id))) for source_ref, target_ref in relationships: control = control_db_map.get(source_ref) mitre_id = technique_map.get(target_ref) if not control or not mitre_id: mappings_skipped += 1 continue technique = all_techniques.get(mitre_id) if not technique: mappings_skipped += 1 continue key = (str(control.id), str(technique.id)) if key in existing_mappings: mappings_skipped += 1 continue mapping = ComplianceControlMapping( compliance_control_id=control.id, technique_id=technique.id, ) db.add(mapping) existing_mappings.add(key) mappings_created += 1 db.commit() summary = { "framework": framework.name, "controls_created": controls_created, "controls_existing": controls_existing, "mappings_created": mappings_created, "mappings_skipped": mappings_skipped, "total_controls": controls_created + controls_existing, "total_relationships_found": len(relationships), } logger.info(f"NIST 800-53 import complete: {summary}") return summary def _import_sample_nist_mappings(db: Session, framework: ComplianceFramework) -> dict: """Import a curated sample of NIST 800-53 controls when the download fails. This ensures the feature works even without network access. """ SAMPLE_CONTROLS = [ {"control_id": "AC-2", "title": "Account Management", "category": "Access Control", "techniques": ["T1078", "T1136", "T1098", "T1087", "T1069"]}, {"control_id": "AC-3", "title": "Access Enforcement", "category": "Access Control", "techniques": ["T1078", "T1548", "T1134"]}, {"control_id": "AC-4", "title": "Information Flow Enforcement", "category": "Access Control", "techniques": ["T1048", "T1041", "T1572"]}, {"control_id": "AC-6", "title": "Least Privilege", "category": "Access Control", "techniques": ["T1078", "T1548", "T1134"]}, {"control_id": "AU-2", "title": "Event Logging", "category": "Audit and Accountability", "techniques": ["T1562", "T1070"]}, {"control_id": "AU-6", "title": "Audit Record Review", "category": "Audit and Accountability", "techniques": ["T1562", "T1070", "T1027"]}, {"control_id": "CA-7", "title": "Continuous Monitoring", "category": "Assessment, Authorization, and Monitoring", "techniques": ["T1059", "T1053"]}, {"control_id": "CM-2", "title": "Baseline Configuration", "category": "Configuration Management", "techniques": ["T1574", "T1546"]}, {"control_id": "CM-6", "title": "Configuration Settings", "category": "Configuration Management", "techniques": ["T1574", "T1546", "T1112"]}, {"control_id": "CM-7", "title": "Least Functionality", "category": "Configuration Management", "techniques": ["T1059", "T1218"]}, {"control_id": "IA-2", "title": "Identification and Authentication", "category": "Identification and Authentication", "techniques": ["T1078", "T1110"]}, {"control_id": "IA-5", "title": "Authenticator Management", "category": "Identification and Authentication", "techniques": ["T1078", "T1110", "T1003"]}, {"control_id": "IR-4", "title": "Incident Handling", "category": "Incident Response", "techniques": ["T1059", "T1547"]}, {"control_id": "RA-5", "title": "Vulnerability Monitoring and Scanning", "category": "Risk Assessment", "techniques": ["T1190", "T1203"]}, {"control_id": "SC-7", "title": "Boundary Protection", "category": "System and Communications Protection", "techniques": ["T1048", "T1041", "T1071"]}, {"control_id": "SC-28", "title": "Protection of Information at Rest", "category": "System and Communications Protection", "techniques": ["T1005", "T1114"]}, {"control_id": "SI-3", "title": "Malicious Code Protection", "category": "System and Information Integrity", "techniques": ["T1059", "T1204", "T1566"]}, {"control_id": "SI-4", "title": "System Monitoring", "category": "System and Information Integrity", "techniques": ["T1059", "T1053", "T1547"]}, {"control_id": "SI-7", "title": "Software, Firmware, and Information Integrity", "category": "System and Information Integrity", "techniques": ["T1195", "T1553"]}, {"control_id": "PM-16", "title": "Threat Awareness Program", "category": "Program Management", "techniques": ["T1566", "T1204"]}, ] # Build technique lookup all_techniques = {t.mitre_id: t for t in db.query(Technique).all()} existing_controls = { c.control_id: c for c in db.query(ComplianceControl) .filter(ComplianceControl.framework_id == framework.id) .all() } existing_mappings = set() for m in db.query(ComplianceControlMapping).all(): existing_mappings.add((str(m.compliance_control_id), str(m.technique_id))) controls_created = 0 mappings_created = 0 for sample in SAMPLE_CONTROLS: # Create or get control if sample["control_id"] in existing_controls: control = existing_controls[sample["control_id"]] else: control = ComplianceControl( framework_id=framework.id, control_id=sample["control_id"], title=sample["title"], category=sample["category"], ) db.add(control) db.flush() existing_controls[sample["control_id"]] = control controls_created += 1 # Create mappings for mitre_id in sample["techniques"]: technique = all_techniques.get(mitre_id) if not technique: # Try with subtechnique prefix for key, tech in all_techniques.items(): if key.startswith(mitre_id): technique = tech break if not technique: continue key = (str(control.id), str(technique.id)) if key in existing_mappings: continue mapping = ComplianceControlMapping( compliance_control_id=control.id, technique_id=technique.id, ) db.add(mapping) existing_mappings.add(key) mappings_created += 1 db.commit() return { "framework": framework.name, "controls_created": controls_created, "controls_existing": len(existing_controls) - controls_created, "mappings_created": mappings_created, "mappings_skipped": 0, "total_controls": len(existing_controls), "source": "sample_data", } def import_cis_controls_v8_mappings(db: Session) -> dict: """Import CIS Controls v8 with ATT&CK technique mappings. Uses a curated set of CIS Controls mapped to MITRE ATT&CK techniques based on the CIS Controls Navigator and official documentation. Returns a summary dict with counts. """ # ── 1. Create or get framework ──────────────────────────────── framework = ( db.query(ComplianceFramework) .filter(ComplianceFramework.name == "CIS Controls v8") .first() ) if not framework: framework = ComplianceFramework( name="CIS Controls v8", version="8", description="Center for Internet Security Critical Security Controls Version 8 — " "a prioritized set of 18 security safeguards organized by Implementation Groups (IG1, IG2, IG3).", url="https://www.cisecurity.org/controls/v8", is_active=True, ) db.add(framework) db.flush() logger.info("Created CIS Controls v8 framework") else: logger.info("CIS Controls v8 framework already exists") # ── 2. Control definitions with ATT&CK mappings ─────────────── CIS_CONTROLS = [ {"control_id": "CIS-1", "title": "Inventory and Control of Enterprise Assets", "category": "IG1 — Basic", "techniques": ["T1595", "T1590", "T1018", "T1082"]}, {"control_id": "CIS-2", "title": "Inventory and Control of Software Assets", "category": "IG1 — Basic", "techniques": ["T1518", "T1072", "T1195"]}, {"control_id": "CIS-3", "title": "Data Protection", "category": "IG1 — Basic", "techniques": ["T1005", "T1114", "T1560", "T1048", "T1041"]}, {"control_id": "CIS-4", "title": "Secure Configuration of Enterprise Assets and Software", "category": "IG1 — Basic", "techniques": ["T1574", "T1546", "T1112", "T1543"]}, {"control_id": "CIS-5", "title": "Account Management", "category": "IG1 — Basic", "techniques": ["T1078", "T1136", "T1098", "T1087"]}, {"control_id": "CIS-6", "title": "Access Control Management", "category": "IG1 — Basic", "techniques": ["T1078", "T1548", "T1134", "T1021"]}, {"control_id": "CIS-7", "title": "Continuous Vulnerability Management", "category": "IG2 — Foundational", "techniques": ["T1190", "T1203", "T1068", "T1210"]}, {"control_id": "CIS-8", "title": "Audit Log Management", "category": "IG2 — Foundational", "techniques": ["T1562", "T1070", "T1059"]}, {"control_id": "CIS-9", "title": "Email and Web Browser Protections", "category": "IG2 — Foundational", "techniques": ["T1566", "T1204", "T1189", "T1598"]}, {"control_id": "CIS-10", "title": "Malware Defenses", "category": "IG2 — Foundational", "techniques": ["T1059", "T1204", "T1027", "T1140", "T1497"]}, {"control_id": "CIS-11", "title": "Data Recovery", "category": "IG1 — Basic", "techniques": ["T1486", "T1490", "T1561"]}, {"control_id": "CIS-12", "title": "Network Infrastructure Management", "category": "IG2 — Foundational", "techniques": ["T1557", "T1071", "T1572", "T1571"]}, {"control_id": "CIS-13", "title": "Network Monitoring and Defense", "category": "IG2 — Foundational", "techniques": ["T1071", "T1048", "T1041", "T1105", "T1572"]}, {"control_id": "CIS-14", "title": "Security Awareness and Skills Training", "category": "IG1 — Basic", "techniques": ["T1566", "T1204", "T1598"]}, {"control_id": "CIS-15", "title": "Service Provider Management", "category": "IG2 — Foundational", "techniques": ["T1199", "T1195"]}, {"control_id": "CIS-16", "title": "Application Software Security", "category": "IG2 — Foundational", "techniques": ["T1190", "T1059", "T1203"]}, {"control_id": "CIS-17", "title": "Incident Response Management", "category": "IG2 — Foundational", "techniques": ["T1059", "T1547", "T1053"]}, {"control_id": "CIS-18", "title": "Penetration Testing", "category": "IG3 — Organizational", "techniques": ["T1595", "T1046", "T1190", "T1059"]}, ] # Build technique lookup all_techniques = {t.mitre_id: t for t in db.query(Technique).all()} existing_controls = { c.control_id: c for c in db.query(ComplianceControl) .filter(ComplianceControl.framework_id == framework.id) .all() } existing_mappings = set() for m in ( db.query(ComplianceControlMapping) .join(ComplianceControl) .filter(ComplianceControl.framework_id == framework.id) .all() ): existing_mappings.add((str(m.compliance_control_id), str(m.technique_id))) controls_created = 0 mappings_created = 0 for item in CIS_CONTROLS: if item["control_id"] in existing_controls: control = existing_controls[item["control_id"]] else: control = ComplianceControl( framework_id=framework.id, control_id=item["control_id"], title=item["title"], category=item["category"], ) db.add(control) db.flush() existing_controls[item["control_id"]] = control controls_created += 1 for mitre_id in item["techniques"]: technique = all_techniques.get(mitre_id) if not technique: continue key = (str(control.id), str(technique.id)) if key in existing_mappings: continue mapping = ComplianceControlMapping( compliance_control_id=control.id, technique_id=technique.id, ) db.add(mapping) existing_mappings.add(key) mappings_created += 1 db.commit() summary = { "framework": framework.name, "controls_created": controls_created, "controls_existing": len(existing_controls) - controls_created, "mappings_created": mappings_created, "total_controls": len(existing_controls), } logger.info(f"CIS Controls v8 import complete: {summary}") return summary def _get_nist_category(family_code: str) -> str: """Map NIST 800-53 family code to category name.""" categories = { "AC": "Access Control", "AT": "Awareness and Training", "AU": "Audit and Accountability", "CA": "Assessment, Authorization, and Monitoring", "CM": "Configuration Management", "CP": "Contingency Planning", "IA": "Identification and Authentication", "IR": "Incident Response", "MA": "Maintenance", "MP": "Media Protection", "PE": "Physical and Environmental Protection", "PL": "Planning", "PM": "Program Management", "PS": "Personnel Security", "PT": "Personally Identifiable Information Processing and Transparency", "RA": "Risk Assessment", "SA": "System and Services Acquisition", "SC": "System and Communications Protection", "SI": "System and Information Integrity", "SR": "Supply Chain Risk Management", } return categories.get(family_code, "Unknown")