"""Compliance import service — imports NIST 800-53 to ATT&CK mappings. Downloads and parses the STIX bundle from the Center for Threat-Informed Defense's attack_to_nist_mapping repository to create ComplianceFramework, ComplianceControl, and ComplianceControlMapping records. """ # Import logging import logging # Import re import re # Import requests import requests # Import Session from sqlalchemy.orm from sqlalchemy.orm import Session # Import from app.models.compliance from app.models.compliance import ( ComplianceControl, ComplianceControlMapping, ComplianceFramework, ) # Import Technique from app.models.technique from app.models.technique import Technique # Assign logger = logging.getLogger(__name__) logger = logging.getLogger(__name__) # ── Module-level control definitions (avoids N806 / uppercase-in-function) ──── _NIST_SAMPLE_CONTROLS = [ { # Literal argument value "control_id": "AC-2", # Literal argument value "title": "Account Management", # Literal argument value "category": "Access Control", # Literal argument value "techniques": ["T1078", "T1136", "T1098", "T1087", "T1069"], }, { # Literal argument value "control_id": "AC-3", # Literal argument value "title": "Access Enforcement", # Literal argument value "category": "Access Control", # Literal argument value "techniques": ["T1078", "T1548", "T1134"], }, { # Literal argument value "control_id": "AC-4", # Literal argument value "title": "Information Flow Enforcement", # Literal argument value "category": "Access Control", # Literal argument value "techniques": ["T1048", "T1041", "T1572"], }, { # Literal argument value "control_id": "AC-6", # Literal argument value "title": "Least Privilege", # Literal argument value "category": "Access Control", # Literal argument value "techniques": ["T1078", "T1548", "T1134"], }, { # Literal argument value "control_id": "AU-2", # Literal argument value "title": "Event Logging", # Literal argument value "category": "Audit and Accountability", # Literal argument value "techniques": ["T1562", "T1070"], }, { # Literal argument value "control_id": "AU-6", # Literal argument value "title": "Audit Record Review", # Literal argument value "category": "Audit and Accountability", # Literal argument value "techniques": ["T1562", "T1070", "T1027"], }, { # Literal argument value "control_id": "CA-7", # Literal argument value "title": "Continuous Monitoring", # Literal argument value "category": "Assessment, Authorization, and Monitoring", # Literal argument value "techniques": ["T1059", "T1053"], }, { # Literal argument value "control_id": "CM-2", # Literal argument value "title": "Baseline Configuration", # Literal argument value "category": "Configuration Management", # Literal argument value "techniques": ["T1574", "T1546"], }, { # Literal argument value "control_id": "CM-6", # Literal argument value "title": "Configuration Settings", # Literal argument value "category": "Configuration Management", # Literal argument value "techniques": ["T1574", "T1546", "T1112"], }, { # Literal argument value "control_id": "CM-7", # Literal argument value "title": "Least Functionality", # Literal argument value "category": "Configuration Management", # Literal argument value "techniques": ["T1059", "T1218"], }, { # Literal argument value "control_id": "IA-2", # Literal argument value "title": "Identification and Authentication", # Literal argument value "category": "Identification and Authentication", # Literal argument value "techniques": ["T1078", "T1110"], }, { # Literal argument value "control_id": "IA-5", # Literal argument value "title": "Authenticator Management", # Literal argument value "category": "Identification and Authentication", # Literal argument value "techniques": ["T1078", "T1110", "T1003"], }, { # Literal argument value "control_id": "IR-4", # Literal argument value "title": "Incident Handling", # Literal argument value "category": "Incident Response", # Literal argument value "techniques": ["T1059", "T1547"], }, { # Literal argument value "control_id": "RA-5", # Literal argument value "title": "Vulnerability Monitoring and Scanning", # Literal argument value "category": "Risk Assessment", # Literal argument value "techniques": ["T1190", "T1203"], }, { # Literal argument value "control_id": "SC-7", # Literal argument value "title": "Boundary Protection", # Literal argument value "category": "System and Communications Protection", # Literal argument value "techniques": ["T1048", "T1041", "T1071"], }, { # Literal argument value "control_id": "SC-28", # Literal argument value "title": "Protection of Information at Rest", # Literal argument value "category": "System and Communications Protection", # Literal argument value "techniques": ["T1005", "T1114"], }, { # Literal argument value "control_id": "SI-3", # Literal argument value "title": "Malicious Code Protection", # Literal argument value "category": "System and Information Integrity", # Literal argument value "techniques": ["T1059", "T1204", "T1566"], }, { # Literal argument value "control_id": "SI-4", # Literal argument value "title": "System Monitoring", # Literal argument value "category": "System and Information Integrity", # Literal argument value "techniques": ["T1059", "T1053", "T1547"], }, { # Literal argument value "control_id": "SI-7", # Literal argument value "title": "Software, Firmware, and Information Integrity", # Literal argument value "category": "System and Information Integrity", # Literal argument value "techniques": ["T1195", "T1553"], }, { # Literal argument value "control_id": "PM-16", # Literal argument value "title": "Threat Awareness Program", # Literal argument value "category": "Program Management", # Literal argument value "techniques": ["T1566", "T1204"], }, ] # Assign _CIS_CONTROLS = [ _CIS_CONTROLS = [ { # Literal argument value "control_id": "CIS-1", # Literal argument value "title": "Inventory and Control of Enterprise Assets", # Literal argument value "category": "IG1 — Basic", # Literal argument value "techniques": ["T1595", "T1590", "T1018", "T1082"], }, { # Literal argument value "control_id": "CIS-2", # Literal argument value "title": "Inventory and Control of Software Assets", # Literal argument value "category": "IG1 — Basic", # Literal argument value "techniques": ["T1518", "T1072", "T1195"], }, { # Literal argument value "control_id": "CIS-3", # Literal argument value "title": "Data Protection", # Literal argument value "category": "IG1 — Basic", # Literal argument value "techniques": ["T1005", "T1114", "T1560", "T1048", "T1041"], }, { # Literal argument value "control_id": "CIS-4", # Literal argument value "title": "Secure Configuration of Enterprise Assets and Software", # Literal argument value "category": "IG1 — Basic", # Literal argument value "techniques": ["T1574", "T1546", "T1112", "T1543"], }, { # Literal argument value "control_id": "CIS-5", # Literal argument value "title": "Account Management", # Literal argument value "category": "IG1 — Basic", # Literal argument value "techniques": ["T1078", "T1136", "T1098", "T1087"], }, { # Literal argument value "control_id": "CIS-6", # Literal argument value "title": "Access Control Management", # Literal argument value "category": "IG1 — Basic", # Literal argument value "techniques": ["T1078", "T1548", "T1134", "T1021"], }, { # Literal argument value "control_id": "CIS-7", # Literal argument value "title": "Continuous Vulnerability Management", # Literal argument value "category": "IG2 — Foundational", # Literal argument value "techniques": ["T1190", "T1203", "T1068", "T1210"], }, { # Literal argument value "control_id": "CIS-8", # Literal argument value "title": "Audit Log Management", # Literal argument value "category": "IG2 — Foundational", # Literal argument value "techniques": ["T1562", "T1070", "T1059"], }, { # Literal argument value "control_id": "CIS-9", # Literal argument value "title": "Email and Web Browser Protections", # Literal argument value "category": "IG2 — Foundational", # Literal argument value "techniques": ["T1566", "T1204", "T1189", "T1598"], }, { # Literal argument value "control_id": "CIS-10", # Literal argument value "title": "Malware Defenses", # Literal argument value "category": "IG2 — Foundational", # Literal argument value "techniques": ["T1059", "T1204", "T1027", "T1140", "T1497"], }, { # Literal argument value "control_id": "CIS-11", # Literal argument value "title": "Data Recovery", # Literal argument value "category": "IG1 — Basic", # Literal argument value "techniques": ["T1486", "T1490", "T1561"], }, { # Literal argument value "control_id": "CIS-12", # Literal argument value "title": "Network Infrastructure Management", # Literal argument value "category": "IG2 — Foundational", # Literal argument value "techniques": ["T1557", "T1071", "T1572", "T1571"], }, { # Literal argument value "control_id": "CIS-13", # Literal argument value "title": "Network Monitoring and Defense", # Literal argument value "category": "IG2 — Foundational", # Literal argument value "techniques": ["T1071", "T1048", "T1041", "T1105", "T1572"], }, { # Literal argument value "control_id": "CIS-14", # Literal argument value "title": "Security Awareness and Skills Training", # Literal argument value "category": "IG1 — Basic", # Literal argument value "techniques": ["T1566", "T1204", "T1598"], }, { # Literal argument value "control_id": "CIS-15", # Literal argument value "title": "Service Provider Management", # Literal argument value "category": "IG2 — Foundational", # Literal argument value "techniques": ["T1199", "T1195"], }, { # Literal argument value "control_id": "CIS-16", # Literal argument value "title": "Application Software Security", # Literal argument value "category": "IG2 — Foundational", # Literal argument value "techniques": ["T1190", "T1059", "T1203"], }, { # Literal argument value "control_id": "CIS-17", # Literal argument value "title": "Incident Response Management", # Literal argument value "category": "IG2 — Foundational", # Literal argument value "techniques": ["T1059", "T1547", "T1053"], }, { # Literal argument value "control_id": "CIS-18", # Literal argument value "title": "Penetration Testing", # Literal argument value "category": "IG3 — Organizational", # Literal argument value "techniques": ["T1595", "T1046", "T1190", "T1059"], }, ] # URL for the NIST 800-53 Rev 5 to ATT&CK mapping # This is the JSON STIX bundle that contains the relationships NIST_MAPPING_URL = ( # Literal argument value "https://raw.githubusercontent.com/center-for-threat-informed-defense/" # Literal argument value "attack_to_nist_mapping/main/data/attack-to-nist-rev5.json" ) # Define function import_nist_800_53_mappings def import_nist_800_53_mappings(db: Session) -> dict: """Import NIST 800-53 Rev 5 mappings from MITRE CTI repository. Steps: 1. Create or get the NIST 800-53 Rev 5 framework 2. Download the STIX bundle JSON 3. Parse controls and relationship objects 4. Create ComplianceControl records 5. Create ComplianceControlMapping records Returns a summary dict with counts. """ # ── 1. Create or get framework ──────────────────────────────── framework = ( db.query(ComplianceFramework) # Chain .filter() call .filter(ComplianceFramework.name == "NIST 800-53 Rev 5") # Chain .first() call .first() ) # Check: not framework if not framework: # Assign framework = ComplianceFramework( framework = ComplianceFramework( # Keyword argument: name name="NIST 800-53 Rev 5", # Keyword argument: version version="5", # Keyword argument: description description=( # Literal argument value "National Institute of Standards and Technology " # Literal argument value "Special Publication 800-53 Revision 5 — " # Literal argument value "Security and Privacy Controls for Information Systems and Organizations" ), # Keyword argument: url url="https://csrc.nist.gov/publications/detail/sp/800-53/rev-5/final", # Keyword argument: is_active is_active=True, ) # Stage new record(s) for database insertion db.add(framework) # Flush changes to DB without committing the transaction db.flush() # Log info: "Created NIST 800-53 Rev 5 framework" logger.info("Created NIST 800-53 Rev 5 framework") # Fallback: handle remaining cases else: # Log info: "NIST 800-53 Rev 5 framework already exists" logger.info("NIST 800-53 Rev 5 framework already exists") # ── 2. Download STIX bundle ─────────────────────────────────── try: # Assign response = requests.get(NIST_MAPPING_URL, timeout=30) response = requests.get(NIST_MAPPING_URL, timeout=30) # Call response.raise_for_status() response.raise_for_status() # Assign stix_bundle = response.json() stix_bundle = response.json() # Handle requests.RequestException except requests.RequestException as e: # Log warning: f"Failed to download STIX bundle: {e}" logger.warning(f"Failed to download STIX bundle: {e}") # Fallback: create a sample set of well-known NIST controls return _import_sample_nist_mappings(db, framework) # ── 3. Parse STIX objects ───────────────────────────────────── objects = stix_bundle.get("objects", []) # Build lookup maps # STIX IDs -> control info control_map = {} # stix_id -> {control_id, title, category} # Assign technique_map = {} # stix_id -> mitre_technique_id technique_map = {} # stix_id -> mitre_technique_id # Assign relationships = [] # (source_ref, target_ref) for "mitigates" relationships relationships = [] # (source_ref, target_ref) for "mitigates" relationships # Iterate over objects for obj in objects: # Assign obj_type = obj.get("type", "") obj_type = obj.get("type", "") # Check: obj_type == "course-of-action" if obj_type == "course-of-action": # This is a NIST control name = obj.get("name", "") # Assign desc = obj.get("description", "") desc = obj.get("description", "") # Assign stix_id = obj.get("id", "") stix_id = obj.get("id", "") # Extract control ID from name (e.g., "AC-2 Account Management") match = re.match(r"^([A-Z]{2}-\d+(?:\.\d+)?)\s*(.*)", name) # Check: match if match: # Assign control_id = match.group(1) control_id = match.group(1) # Assign title = match.group(2) or name title = match.group(2) or name # Fallback: handle remaining cases else: # Assign control_id = name control_id = name # Assign title = name title = name # Extract category from control family category_match = re.match(r"^([A-Z]{2})", control_id) # Assign category = _get_nist_category(category_match.group(1)) if category_match else ... category = _get_nist_category(category_match.group(1)) if category_match else None # Assign control_map[stix_id] = { control_map[stix_id] = { # Literal argument value "control_id": control_id, # Literal argument value "title": title, # Literal argument value "description": desc[:500] if desc else None, # Literal argument value "category": category, } # Alternative: obj_type == "attack-pattern" elif obj_type == "attack-pattern": # This is an ATT&CK technique stix_id = obj.get("id", "") # Assign ext_refs = obj.get("external_references", []) ext_refs = obj.get("external_references", []) # Iterate over ext_refs for ref in ext_refs: # Check: ref.get("source_name") == "mitre-attack" if ref.get("source_name") == "mitre-attack": # Assign technique_map[stix_id] = ref.get("external_id", "") technique_map[stix_id] = ref.get("external_id", "") # Exit the loop early break # Alternative: obj_type == "relationship" elif obj_type == "relationship": # Assign rel_type = obj.get("relationship_type", "") rel_type = obj.get("relationship_type", "") # Check: rel_type == "mitigates" if rel_type == "mitigates": # Assign source_ref = obj.get("source_ref", "") source_ref = obj.get("source_ref", "") # Assign target_ref = obj.get("target_ref", "") target_ref = obj.get("target_ref", "") # Call relationships.append() relationships.append((source_ref, target_ref)) # ── 4. Create controls ──────────────────────────────────────── controls_created = 0 # Assign controls_existing = 0 controls_existing = 0 # Assign control_db_map = {} # control_id -> ComplianceControl control_db_map = {} # control_id -> ComplianceControl # Load existing controls for this framework existing_controls = { c.control_id: c for c in db.query(ComplianceControl) # Chain .filter() call .filter(ComplianceControl.framework_id == framework.id) # Chain .all() call .all() } # Iterate over control_map.items() for stix_id, info in control_map.items(): # Assign cid = info["control_id"] cid = info["control_id"] # Check: cid in existing_controls if cid in existing_controls: # Assign control_db_map[stix_id] = existing_controls[cid] control_db_map[stix_id] = existing_controls[cid] # Assign controls_existing = 1 controls_existing += 1 # Fallback: handle remaining cases else: # Assign ctrl = ComplianceControl( ctrl = ComplianceControl( # Keyword argument: framework_id framework_id=framework.id, # Keyword argument: control_id control_id=cid, # Keyword argument: title title=info["title"], # Keyword argument: description description=info["description"], # Keyword argument: category category=info["category"], ) # Stage new record(s) for database insertion db.add(ctrl) # Flush changes to DB without committing the transaction db.flush() # Assign control_db_map[stix_id] = ctrl control_db_map[stix_id] = ctrl # Assign controls_created = 1 controls_created += 1 # ── 5. Create mappings ──────────────────────────────────────── mappings_created = 0 # Assign mappings_skipped = 0 mappings_skipped = 0 # Build technique DB lookup (mitre_id -> Technique) all_techniques = {t.mitre_id: t for t in db.query(Technique).all()} # Load existing mappings existing_mappings = set() # Iterate over db.query(ComplianceControlMapping).all() for m in db.query(ComplianceControlMapping).all(): # Call existing_mappings.add() existing_mappings.add((str(m.compliance_control_id), str(m.technique_id))) # Iterate over relationships for source_ref, target_ref in relationships: # Assign control = control_db_map.get(source_ref) control = control_db_map.get(source_ref) # Assign mitre_id = technique_map.get(target_ref) mitre_id = technique_map.get(target_ref) # Check: not control or not mitre_id if not control or not mitre_id: # Assign mappings_skipped = 1 mappings_skipped += 1 # Skip to the next loop iteration continue # Assign technique = all_techniques.get(mitre_id) technique = all_techniques.get(mitre_id) # Check: not technique if not technique: # Assign mappings_skipped = 1 mappings_skipped += 1 # Skip to the next loop iteration continue # Assign key = (str(control.id), str(technique.id)) key = (str(control.id), str(technique.id)) # Check: key in existing_mappings if key in existing_mappings: # Assign mappings_skipped = 1 mappings_skipped += 1 # Skip to the next loop iteration continue # Assign mapping = ComplianceControlMapping( mapping = ComplianceControlMapping( # Keyword argument: compliance_control_id compliance_control_id=control.id, # Keyword argument: technique_id technique_id=technique.id, ) # Stage new record(s) for database insertion db.add(mapping) # Call existing_mappings.add() existing_mappings.add(key) # Assign mappings_created = 1 mappings_created += 1 # Commit all pending changes to the database db.commit() # Assign summary = { summary = { # Literal argument value "framework": framework.name, # Literal argument value "controls_created": controls_created, # Literal argument value "controls_existing": controls_existing, # Literal argument value "mappings_created": mappings_created, # Literal argument value "mappings_skipped": mappings_skipped, # Literal argument value "total_controls": controls_created + controls_existing, # Literal argument value "total_relationships_found": len(relationships), } # Log info: f"NIST 800-53 import complete: {summary}" logger.info(f"NIST 800-53 import complete: {summary}") # Return summary return summary # Define function _import_sample_nist_mappings def _import_sample_nist_mappings(db: Session, framework: ComplianceFramework) -> dict: """Import a curated sample of NIST 800-53 controls when the download fails. This ensures the feature works even without network access. """ # Build technique lookup all_techniques = {t.mitre_id: t for t in db.query(Technique).all()} # Assign existing_controls = { existing_controls = { c.control_id: c for c in db.query(ComplianceControl) # Chain .filter() call .filter(ComplianceControl.framework_id == framework.id) # Chain .all() call .all() } # Assign existing_mappings = set() existing_mappings = set() # Iterate over db.query(ComplianceControlMapping).all() for m in db.query(ComplianceControlMapping).all(): # Call existing_mappings.add() existing_mappings.add((str(m.compliance_control_id), str(m.technique_id))) # Assign controls_created = 0 controls_created = 0 # Assign mappings_created = 0 mappings_created = 0 # Iterate over _NIST_SAMPLE_CONTROLS for sample in _NIST_SAMPLE_CONTROLS: # Create or get control if sample["control_id"] in existing_controls: # Assign control = existing_controls[sample["control_id"]] control = existing_controls[sample["control_id"]] # Fallback: handle remaining cases else: # Assign control = ComplianceControl( control = ComplianceControl( # Keyword argument: framework_id framework_id=framework.id, # Keyword argument: control_id control_id=sample["control_id"], # Keyword argument: title title=sample["title"], # Keyword argument: category category=sample["category"], ) # Stage new record(s) for database insertion db.add(control) # Flush changes to DB without committing the transaction db.flush() # Assign existing_controls[sample["control_id"]] = control existing_controls[sample["control_id"]] = control # Assign controls_created = 1 controls_created += 1 # Create mappings for mitre_id in sample["techniques"]: # Assign technique = all_techniques.get(mitre_id) technique = all_techniques.get(mitre_id) # Check: not technique if not technique: # Try with subtechnique prefix for key, tech in all_techniques.items(): # Check: key.startswith(mitre_id) if key.startswith(mitre_id): # Assign technique = tech technique = tech # Exit the loop early break # Check: not technique if not technique: # Skip to the next loop iteration continue # Assign key = (str(control.id), str(technique.id)) key = (str(control.id), str(technique.id)) # Check: key in existing_mappings if key in existing_mappings: # Skip to the next loop iteration continue # Assign mapping = ComplianceControlMapping( mapping = ComplianceControlMapping( # Keyword argument: compliance_control_id compliance_control_id=control.id, # Keyword argument: technique_id technique_id=technique.id, ) # Stage new record(s) for database insertion db.add(mapping) # Call existing_mappings.add() existing_mappings.add(key) # Assign mappings_created = 1 mappings_created += 1 # Commit all pending changes to the database db.commit() # Return { return { # Literal argument value "framework": framework.name, # Literal argument value "controls_created": controls_created, # Literal argument value "controls_existing": len(existing_controls) - controls_created, # Literal argument value "mappings_created": mappings_created, # Literal argument value "mappings_skipped": 0, # Literal argument value "total_controls": len(existing_controls), # Literal argument value "source": "sample_data", } # Define function import_cis_controls_v8_mappings def import_cis_controls_v8_mappings(db: Session) -> dict: """Import CIS Controls v8 with ATT&CK technique mappings. Uses a curated set of CIS Controls mapped to MITRE ATT&CK techniques based on the CIS Controls Navigator and official documentation. Returns a summary dict with counts. """ # ── 1. Create or get framework ──────────────────────────────── framework = ( db.query(ComplianceFramework) # Chain .filter() call .filter(ComplianceFramework.name == "CIS Controls v8") # Chain .first() call .first() ) # Check: not framework if not framework: # Assign framework = ComplianceFramework( framework = ComplianceFramework( # Keyword argument: name name="CIS Controls v8", # Keyword argument: version version="8", # Keyword argument: description description=( # Literal argument value "Center for Internet Security Critical Security Controls Version 8 — " # Literal argument value "a prioritized set of 18 security safeguards " # Literal argument value "organized by Implementation Groups (IG1, IG2, IG3)." ), # Keyword argument: url url="https://www.cisecurity.org/controls/v8", # Keyword argument: is_active is_active=True, ) # Stage new record(s) for database insertion db.add(framework) # Flush changes to DB without committing the transaction db.flush() # Log info: "Created CIS Controls v8 framework" logger.info("Created CIS Controls v8 framework") # Fallback: handle remaining cases else: # Log info: "CIS Controls v8 framework already exists" logger.info("CIS Controls v8 framework already exists") # ── 2. Control definitions with ATT&CK mappings ─────────────── # (defined at module level as _CIS_CONTROLS) # Build technique lookup all_techniques = {t.mitre_id: t for t in db.query(Technique).all()} # Assign existing_controls = { existing_controls = { c.control_id: c for c in db.query(ComplianceControl) # Chain .filter() call .filter(ComplianceControl.framework_id == framework.id) # Chain .all() call .all() } # Assign existing_mappings = set() existing_mappings = set() # for m in ( for m in ( db.query(ComplianceControlMapping) # Chain .join() call .join(ComplianceControl) # Chain .filter() call .filter(ComplianceControl.framework_id == framework.id) # Chain .all() call .all() ): # Call existing_mappings.add() existing_mappings.add((str(m.compliance_control_id), str(m.technique_id))) # Assign controls_created = 0 controls_created = 0 # Assign mappings_created = 0 mappings_created = 0 # Iterate over _CIS_CONTROLS for item in _CIS_CONTROLS: # Check: item["control_id"] in existing_controls if item["control_id"] in existing_controls: # Assign control = existing_controls[item["control_id"]] control = existing_controls[item["control_id"]] # Fallback: handle remaining cases else: # Assign control = ComplianceControl( control = ComplianceControl( # Keyword argument: framework_id framework_id=framework.id, # Keyword argument: control_id control_id=item["control_id"], # Keyword argument: title title=item["title"], # Keyword argument: category category=item["category"], ) # Stage new record(s) for database insertion db.add(control) # Flush changes to DB without committing the transaction db.flush() # Assign existing_controls[item["control_id"]] = control existing_controls[item["control_id"]] = control # Assign controls_created = 1 controls_created += 1 # Iterate over item["techniques"] for mitre_id in item["techniques"]: # Assign technique = all_techniques.get(mitre_id) technique = all_techniques.get(mitre_id) # Check: not technique if not technique: # Skip to the next loop iteration continue # Assign key = (str(control.id), str(technique.id)) key = (str(control.id), str(technique.id)) # Check: key in existing_mappings if key in existing_mappings: # Skip to the next loop iteration continue # Assign mapping = ComplianceControlMapping( mapping = ComplianceControlMapping( # Keyword argument: compliance_control_id compliance_control_id=control.id, # Keyword argument: technique_id technique_id=technique.id, ) # Stage new record(s) for database insertion db.add(mapping) # Call existing_mappings.add() existing_mappings.add(key) # Assign mappings_created = 1 mappings_created += 1 # Commit all pending changes to the database db.commit() # Assign summary = { summary = { # Literal argument value "framework": framework.name, # Literal argument value "controls_created": controls_created, # Literal argument value "controls_existing": len(existing_controls) - controls_created, # Literal argument value "mappings_created": mappings_created, # Literal argument value "total_controls": len(existing_controls), } # Log info: f"CIS Controls v8 import complete: {summary}" logger.info(f"CIS Controls v8 import complete: {summary}") # Return summary return summary # Define function _get_nist_category def _get_nist_category(family_code: str) -> str: """Map NIST 800-53 family code to category name.""" # Assign categories = { categories = { # Literal argument value "AC": "Access Control", # Literal argument value "AT": "Awareness and Training", # Literal argument value "AU": "Audit and Accountability", # Literal argument value "CA": "Assessment, Authorization, and Monitoring", # Literal argument value "CM": "Configuration Management", # Literal argument value "CP": "Contingency Planning", # Literal argument value "IA": "Identification and Authentication", # Literal argument value "IR": "Incident Response", # Literal argument value "MA": "Maintenance", # Literal argument value "MP": "Media Protection", # Literal argument value "PE": "Physical and Environmental Protection", # Literal argument value "PL": "Planning", # Literal argument value "PM": "Program Management", # Literal argument value "PS": "Personnel Security", # Literal argument value "PT": "Personally Identifiable Information Processing and Transparency", # Literal argument value "RA": "Risk Assessment", # Literal argument value "SA": "System and Services Acquisition", # Literal argument value "SC": "System and Communications Protection", # Literal argument value "SI": "System and Information Integrity", # Literal argument value "SR": "Supply Chain Risk Management", } # Return categories.get(family_code, "Unknown") return categories.get(family_code, "Unknown")