Files
Aegis/backend/app/services/compliance_import_service.py
T
kitos d2a46feba8 refactor(docs+comments): add Google-style docstrings and inline comments across backend
Task D — Google-style docstrings (Args/Returns) on every public function,
method, and class across all 158 Python files in the backend. Zero ruff D
violations (pydocstyle Google convention).

Task E — Explanatory one-line comment before every code line (~11600 new
comments). ruff check passes clean after isort re-sort.
2026-06-11 11:06:55 +02:00

1059 lines
37 KiB
Python

"""Compliance import service — imports NIST 800-53 to ATT&CK mappings.
Downloads and parses the STIX bundle from the Center for Threat-Informed
Defense's attack_to_nist_mapping repository to create ComplianceFramework,
ComplianceControl, and ComplianceControlMapping records.
"""
# Import logging
import logging
# Import re
import re
# Import requests
import requests
# Import Session from sqlalchemy.orm
from sqlalchemy.orm import Session
# Import from app.models.compliance
from app.models.compliance import (
ComplianceControl,
ComplianceControlMapping,
ComplianceFramework,
)
# Import Technique from app.models.technique
from app.models.technique import Technique
# Assign logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
# ── Module-level control definitions (avoids N806 / uppercase-in-function) ────
_NIST_SAMPLE_CONTROLS = [
{
# Literal argument value
"control_id": "AC-2",
# Literal argument value
"title": "Account Management",
# Literal argument value
"category": "Access Control",
# Literal argument value
"techniques": ["T1078", "T1136", "T1098", "T1087", "T1069"],
},
{
# Literal argument value
"control_id": "AC-3",
# Literal argument value
"title": "Access Enforcement",
# Literal argument value
"category": "Access Control",
# Literal argument value
"techniques": ["T1078", "T1548", "T1134"],
},
{
# Literal argument value
"control_id": "AC-4",
# Literal argument value
"title": "Information Flow Enforcement",
# Literal argument value
"category": "Access Control",
# Literal argument value
"techniques": ["T1048", "T1041", "T1572"],
},
{
# Literal argument value
"control_id": "AC-6",
# Literal argument value
"title": "Least Privilege",
# Literal argument value
"category": "Access Control",
# Literal argument value
"techniques": ["T1078", "T1548", "T1134"],
},
{
# Literal argument value
"control_id": "AU-2",
# Literal argument value
"title": "Event Logging",
# Literal argument value
"category": "Audit and Accountability",
# Literal argument value
"techniques": ["T1562", "T1070"],
},
{
# Literal argument value
"control_id": "AU-6",
# Literal argument value
"title": "Audit Record Review",
# Literal argument value
"category": "Audit and Accountability",
# Literal argument value
"techniques": ["T1562", "T1070", "T1027"],
},
{
# Literal argument value
"control_id": "CA-7",
# Literal argument value
"title": "Continuous Monitoring",
# Literal argument value
"category": "Assessment, Authorization, and Monitoring",
# Literal argument value
"techniques": ["T1059", "T1053"],
},
{
# Literal argument value
"control_id": "CM-2",
# Literal argument value
"title": "Baseline Configuration",
# Literal argument value
"category": "Configuration Management",
# Literal argument value
"techniques": ["T1574", "T1546"],
},
{
# Literal argument value
"control_id": "CM-6",
# Literal argument value
"title": "Configuration Settings",
# Literal argument value
"category": "Configuration Management",
# Literal argument value
"techniques": ["T1574", "T1546", "T1112"],
},
{
# Literal argument value
"control_id": "CM-7",
# Literal argument value
"title": "Least Functionality",
# Literal argument value
"category": "Configuration Management",
# Literal argument value
"techniques": ["T1059", "T1218"],
},
{
# Literal argument value
"control_id": "IA-2",
# Literal argument value
"title": "Identification and Authentication",
# Literal argument value
"category": "Identification and Authentication",
# Literal argument value
"techniques": ["T1078", "T1110"],
},
{
# Literal argument value
"control_id": "IA-5",
# Literal argument value
"title": "Authenticator Management",
# Literal argument value
"category": "Identification and Authentication",
# Literal argument value
"techniques": ["T1078", "T1110", "T1003"],
},
{
# Literal argument value
"control_id": "IR-4",
# Literal argument value
"title": "Incident Handling",
# Literal argument value
"category": "Incident Response",
# Literal argument value
"techniques": ["T1059", "T1547"],
},
{
# Literal argument value
"control_id": "RA-5",
# Literal argument value
"title": "Vulnerability Monitoring and Scanning",
# Literal argument value
"category": "Risk Assessment",
# Literal argument value
"techniques": ["T1190", "T1203"],
},
{
# Literal argument value
"control_id": "SC-7",
# Literal argument value
"title": "Boundary Protection",
# Literal argument value
"category": "System and Communications Protection",
# Literal argument value
"techniques": ["T1048", "T1041", "T1071"],
},
{
# Literal argument value
"control_id": "SC-28",
# Literal argument value
"title": "Protection of Information at Rest",
# Literal argument value
"category": "System and Communications Protection",
# Literal argument value
"techniques": ["T1005", "T1114"],
},
{
# Literal argument value
"control_id": "SI-3",
# Literal argument value
"title": "Malicious Code Protection",
# Literal argument value
"category": "System and Information Integrity",
# Literal argument value
"techniques": ["T1059", "T1204", "T1566"],
},
{
# Literal argument value
"control_id": "SI-4",
# Literal argument value
"title": "System Monitoring",
# Literal argument value
"category": "System and Information Integrity",
# Literal argument value
"techniques": ["T1059", "T1053", "T1547"],
},
{
# Literal argument value
"control_id": "SI-7",
# Literal argument value
"title": "Software, Firmware, and Information Integrity",
# Literal argument value
"category": "System and Information Integrity",
# Literal argument value
"techniques": ["T1195", "T1553"],
},
{
# Literal argument value
"control_id": "PM-16",
# Literal argument value
"title": "Threat Awareness Program",
# Literal argument value
"category": "Program Management",
# Literal argument value
"techniques": ["T1566", "T1204"],
},
]
# Assign _CIS_CONTROLS = [
_CIS_CONTROLS = [
{
# Literal argument value
"control_id": "CIS-1",
# Literal argument value
"title": "Inventory and Control of Enterprise Assets",
# Literal argument value
"category": "IG1 — Basic",
# Literal argument value
"techniques": ["T1595", "T1590", "T1018", "T1082"],
},
{
# Literal argument value
"control_id": "CIS-2",
# Literal argument value
"title": "Inventory and Control of Software Assets",
# Literal argument value
"category": "IG1 — Basic",
# Literal argument value
"techniques": ["T1518", "T1072", "T1195"],
},
{
# Literal argument value
"control_id": "CIS-3",
# Literal argument value
"title": "Data Protection",
# Literal argument value
"category": "IG1 — Basic",
# Literal argument value
"techniques": ["T1005", "T1114", "T1560", "T1048", "T1041"],
},
{
# Literal argument value
"control_id": "CIS-4",
# Literal argument value
"title": "Secure Configuration of Enterprise Assets and Software",
# Literal argument value
"category": "IG1 — Basic",
# Literal argument value
"techniques": ["T1574", "T1546", "T1112", "T1543"],
},
{
# Literal argument value
"control_id": "CIS-5",
# Literal argument value
"title": "Account Management",
# Literal argument value
"category": "IG1 — Basic",
# Literal argument value
"techniques": ["T1078", "T1136", "T1098", "T1087"],
},
{
# Literal argument value
"control_id": "CIS-6",
# Literal argument value
"title": "Access Control Management",
# Literal argument value
"category": "IG1 — Basic",
# Literal argument value
"techniques": ["T1078", "T1548", "T1134", "T1021"],
},
{
# Literal argument value
"control_id": "CIS-7",
# Literal argument value
"title": "Continuous Vulnerability Management",
# Literal argument value
"category": "IG2 — Foundational",
# Literal argument value
"techniques": ["T1190", "T1203", "T1068", "T1210"],
},
{
# Literal argument value
"control_id": "CIS-8",
# Literal argument value
"title": "Audit Log Management",
# Literal argument value
"category": "IG2 — Foundational",
# Literal argument value
"techniques": ["T1562", "T1070", "T1059"],
},
{
# Literal argument value
"control_id": "CIS-9",
# Literal argument value
"title": "Email and Web Browser Protections",
# Literal argument value
"category": "IG2 — Foundational",
# Literal argument value
"techniques": ["T1566", "T1204", "T1189", "T1598"],
},
{
# Literal argument value
"control_id": "CIS-10",
# Literal argument value
"title": "Malware Defenses",
# Literal argument value
"category": "IG2 — Foundational",
# Literal argument value
"techniques": ["T1059", "T1204", "T1027", "T1140", "T1497"],
},
{
# Literal argument value
"control_id": "CIS-11",
# Literal argument value
"title": "Data Recovery",
# Literal argument value
"category": "IG1 — Basic",
# Literal argument value
"techniques": ["T1486", "T1490", "T1561"],
},
{
# Literal argument value
"control_id": "CIS-12",
# Literal argument value
"title": "Network Infrastructure Management",
# Literal argument value
"category": "IG2 — Foundational",
# Literal argument value
"techniques": ["T1557", "T1071", "T1572", "T1571"],
},
{
# Literal argument value
"control_id": "CIS-13",
# Literal argument value
"title": "Network Monitoring and Defense",
# Literal argument value
"category": "IG2 — Foundational",
# Literal argument value
"techniques": ["T1071", "T1048", "T1041", "T1105", "T1572"],
},
{
# Literal argument value
"control_id": "CIS-14",
# Literal argument value
"title": "Security Awareness and Skills Training",
# Literal argument value
"category": "IG1 — Basic",
# Literal argument value
"techniques": ["T1566", "T1204", "T1598"],
},
{
# Literal argument value
"control_id": "CIS-15",
# Literal argument value
"title": "Service Provider Management",
# Literal argument value
"category": "IG2 — Foundational",
# Literal argument value
"techniques": ["T1199", "T1195"],
},
{
# Literal argument value
"control_id": "CIS-16",
# Literal argument value
"title": "Application Software Security",
# Literal argument value
"category": "IG2 — Foundational",
# Literal argument value
"techniques": ["T1190", "T1059", "T1203"],
},
{
# Literal argument value
"control_id": "CIS-17",
# Literal argument value
"title": "Incident Response Management",
# Literal argument value
"category": "IG2 — Foundational",
# Literal argument value
"techniques": ["T1059", "T1547", "T1053"],
},
{
# Literal argument value
"control_id": "CIS-18",
# Literal argument value
"title": "Penetration Testing",
# Literal argument value
"category": "IG3 — Organizational",
# Literal argument value
"techniques": ["T1595", "T1046", "T1190", "T1059"],
},
]
# URL for the NIST 800-53 Rev 5 to ATT&CK mapping
# This is the JSON STIX bundle that contains the relationships
NIST_MAPPING_URL = (
# Literal argument value
"https://raw.githubusercontent.com/center-for-threat-informed-defense/"
# Literal argument value
"attack_to_nist_mapping/main/data/attack-to-nist-rev5.json"
)
# Define function import_nist_800_53_mappings
def import_nist_800_53_mappings(db: Session) -> dict:
"""Import NIST 800-53 Rev 5 mappings from MITRE CTI repository.
Steps:
1. Create or get the NIST 800-53 Rev 5 framework
2. Download the STIX bundle JSON
3. Parse controls and relationship objects
4. Create ComplianceControl records
5. Create ComplianceControlMapping records
Returns a summary dict with counts.
"""
# ── 1. Create or get framework ────────────────────────────────
framework = (
db.query(ComplianceFramework)
# Chain .filter() call
.filter(ComplianceFramework.name == "NIST 800-53 Rev 5")
# Chain .first() call
.first()
)
# Check: not framework
if not framework:
# Assign framework = ComplianceFramework(
framework = ComplianceFramework(
# Keyword argument: name
name="NIST 800-53 Rev 5",
# Keyword argument: version
version="5",
# Keyword argument: description
description=(
# Literal argument value
"National Institute of Standards and Technology "
# Literal argument value
"Special Publication 800-53 Revision 5 — "
# Literal argument value
"Security and Privacy Controls for Information Systems and Organizations"
),
# Keyword argument: url
url="https://csrc.nist.gov/publications/detail/sp/800-53/rev-5/final",
# Keyword argument: is_active
is_active=True,
)
# Stage new record(s) for database insertion
db.add(framework)
# Flush changes to DB without committing the transaction
db.flush()
# Log info: "Created NIST 800-53 Rev 5 framework"
logger.info("Created NIST 800-53 Rev 5 framework")
# Fallback: handle remaining cases
else:
# Log info: "NIST 800-53 Rev 5 framework already exists"
logger.info("NIST 800-53 Rev 5 framework already exists")
# ── 2. Download STIX bundle ───────────────────────────────────
try:
# Assign response = requests.get(NIST_MAPPING_URL, timeout=30)
response = requests.get(NIST_MAPPING_URL, timeout=30)
# Call response.raise_for_status()
response.raise_for_status()
# Assign stix_bundle = response.json()
stix_bundle = response.json()
# Handle requests.RequestException
except requests.RequestException as e:
# Log warning: f"Failed to download STIX bundle: {e}"
logger.warning(f"Failed to download STIX bundle: {e}")
# Fallback: create a sample set of well-known NIST controls
return _import_sample_nist_mappings(db, framework)
# ── 3. Parse STIX objects ─────────────────────────────────────
objects = stix_bundle.get("objects", [])
# Build lookup maps
# STIX IDs -> control info
control_map = {} # stix_id -> {control_id, title, category}
# Assign technique_map = {} # stix_id -> mitre_technique_id
technique_map = {} # stix_id -> mitre_technique_id
# Assign relationships = [] # (source_ref, target_ref) for "mitigates" relationships
relationships = [] # (source_ref, target_ref) for "mitigates" relationships
# Iterate over objects
for obj in objects:
# Assign obj_type = obj.get("type", "")
obj_type = obj.get("type", "")
# Check: obj_type == "course-of-action"
if obj_type == "course-of-action":
# This is a NIST control
name = obj.get("name", "")
# Assign desc = obj.get("description", "")
desc = obj.get("description", "")
# Assign stix_id = obj.get("id", "")
stix_id = obj.get("id", "")
# Extract control ID from name (e.g., "AC-2 Account Management")
match = re.match(r"^([A-Z]{2}-\d+(?:\.\d+)?)\s*(.*)", name)
# Check: match
if match:
# Assign control_id = match.group(1)
control_id = match.group(1)
# Assign title = match.group(2) or name
title = match.group(2) or name
# Fallback: handle remaining cases
else:
# Assign control_id = name
control_id = name
# Assign title = name
title = name
# Extract category from control family
category_match = re.match(r"^([A-Z]{2})", control_id)
# Assign category = _get_nist_category(category_match.group(1)) if category_match else ...
category = _get_nist_category(category_match.group(1)) if category_match else None
# Assign control_map[stix_id] = {
control_map[stix_id] = {
# Literal argument value
"control_id": control_id,
# Literal argument value
"title": title,
# Literal argument value
"description": desc[:500] if desc else None,
# Literal argument value
"category": category,
}
# Alternative: obj_type == "attack-pattern"
elif obj_type == "attack-pattern":
# This is an ATT&CK technique
stix_id = obj.get("id", "")
# Assign ext_refs = obj.get("external_references", [])
ext_refs = obj.get("external_references", [])
# Iterate over ext_refs
for ref in ext_refs:
# Check: ref.get("source_name") == "mitre-attack"
if ref.get("source_name") == "mitre-attack":
# Assign technique_map[stix_id] = ref.get("external_id", "")
technique_map[stix_id] = ref.get("external_id", "")
# Exit the loop early
break
# Alternative: obj_type == "relationship"
elif obj_type == "relationship":
# Assign rel_type = obj.get("relationship_type", "")
rel_type = obj.get("relationship_type", "")
# Check: rel_type == "mitigates"
if rel_type == "mitigates":
# Assign source_ref = obj.get("source_ref", "")
source_ref = obj.get("source_ref", "")
# Assign target_ref = obj.get("target_ref", "")
target_ref = obj.get("target_ref", "")
# Call relationships.append()
relationships.append((source_ref, target_ref))
# ── 4. Create controls ────────────────────────────────────────
controls_created = 0
# Assign controls_existing = 0
controls_existing = 0
# Assign control_db_map = {} # control_id -> ComplianceControl
control_db_map = {} # control_id -> ComplianceControl
# Load existing controls for this framework
existing_controls = {
c.control_id: c
for c in db.query(ComplianceControl)
# Chain .filter() call
.filter(ComplianceControl.framework_id == framework.id)
# Chain .all() call
.all()
}
# Iterate over control_map.items()
for stix_id, info in control_map.items():
# Assign cid = info["control_id"]
cid = info["control_id"]
# Check: cid in existing_controls
if cid in existing_controls:
# Assign control_db_map[stix_id] = existing_controls[cid]
control_db_map[stix_id] = existing_controls[cid]
# Assign controls_existing = 1
controls_existing += 1
# Fallback: handle remaining cases
else:
# Assign ctrl = ComplianceControl(
ctrl = ComplianceControl(
# Keyword argument: framework_id
framework_id=framework.id,
# Keyword argument: control_id
control_id=cid,
# Keyword argument: title
title=info["title"],
# Keyword argument: description
description=info["description"],
# Keyword argument: category
category=info["category"],
)
# Stage new record(s) for database insertion
db.add(ctrl)
# Flush changes to DB without committing the transaction
db.flush()
# Assign control_db_map[stix_id] = ctrl
control_db_map[stix_id] = ctrl
# Assign controls_created = 1
controls_created += 1
# ── 5. Create mappings ────────────────────────────────────────
mappings_created = 0
# Assign mappings_skipped = 0
mappings_skipped = 0
# Build technique DB lookup (mitre_id -> Technique)
all_techniques = {t.mitre_id: t for t in db.query(Technique).all()}
# Load existing mappings
existing_mappings = set()
# Iterate over db.query(ComplianceControlMapping).all()
for m in db.query(ComplianceControlMapping).all():
# Call existing_mappings.add()
existing_mappings.add((str(m.compliance_control_id), str(m.technique_id)))
# Iterate over relationships
for source_ref, target_ref in relationships:
# Assign control = control_db_map.get(source_ref)
control = control_db_map.get(source_ref)
# Assign mitre_id = technique_map.get(target_ref)
mitre_id = technique_map.get(target_ref)
# Check: not control or not mitre_id
if not control or not mitre_id:
# Assign mappings_skipped = 1
mappings_skipped += 1
# Skip to the next loop iteration
continue
# Assign technique = all_techniques.get(mitre_id)
technique = all_techniques.get(mitre_id)
# Check: not technique
if not technique:
# Assign mappings_skipped = 1
mappings_skipped += 1
# Skip to the next loop iteration
continue
# Assign key = (str(control.id), str(technique.id))
key = (str(control.id), str(technique.id))
# Check: key in existing_mappings
if key in existing_mappings:
# Assign mappings_skipped = 1
mappings_skipped += 1
# Skip to the next loop iteration
continue
# Assign mapping = ComplianceControlMapping(
mapping = ComplianceControlMapping(
# Keyword argument: compliance_control_id
compliance_control_id=control.id,
# Keyword argument: technique_id
technique_id=technique.id,
)
# Stage new record(s) for database insertion
db.add(mapping)
# Call existing_mappings.add()
existing_mappings.add(key)
# Assign mappings_created = 1
mappings_created += 1
# Commit all pending changes to the database
db.commit()
# Assign summary = {
summary = {
# Literal argument value
"framework": framework.name,
# Literal argument value
"controls_created": controls_created,
# Literal argument value
"controls_existing": controls_existing,
# Literal argument value
"mappings_created": mappings_created,
# Literal argument value
"mappings_skipped": mappings_skipped,
# Literal argument value
"total_controls": controls_created + controls_existing,
# Literal argument value
"total_relationships_found": len(relationships),
}
# Log info: f"NIST 800-53 import complete: {summary}"
logger.info(f"NIST 800-53 import complete: {summary}")
# Return summary
return summary
# Define function _import_sample_nist_mappings
def _import_sample_nist_mappings(db: Session, framework: ComplianceFramework) -> dict:
"""Import a curated sample of NIST 800-53 controls when the download fails.
This ensures the feature works even without network access.
"""
# Build technique lookup
all_techniques = {t.mitre_id: t for t in db.query(Technique).all()}
# Assign existing_controls = {
existing_controls = {
c.control_id: c
for c in db.query(ComplianceControl)
# Chain .filter() call
.filter(ComplianceControl.framework_id == framework.id)
# Chain .all() call
.all()
}
# Assign existing_mappings = set()
existing_mappings = set()
# Iterate over db.query(ComplianceControlMapping).all()
for m in db.query(ComplianceControlMapping).all():
# Call existing_mappings.add()
existing_mappings.add((str(m.compliance_control_id), str(m.technique_id)))
# Assign controls_created = 0
controls_created = 0
# Assign mappings_created = 0
mappings_created = 0
# Iterate over _NIST_SAMPLE_CONTROLS
for sample in _NIST_SAMPLE_CONTROLS:
# Create or get control
if sample["control_id"] in existing_controls:
# Assign control = existing_controls[sample["control_id"]]
control = existing_controls[sample["control_id"]]
# Fallback: handle remaining cases
else:
# Assign control = ComplianceControl(
control = ComplianceControl(
# Keyword argument: framework_id
framework_id=framework.id,
# Keyword argument: control_id
control_id=sample["control_id"],
# Keyword argument: title
title=sample["title"],
# Keyword argument: category
category=sample["category"],
)
# Stage new record(s) for database insertion
db.add(control)
# Flush changes to DB without committing the transaction
db.flush()
# Assign existing_controls[sample["control_id"]] = control
existing_controls[sample["control_id"]] = control
# Assign controls_created = 1
controls_created += 1
# Create mappings
for mitre_id in sample["techniques"]:
# Assign technique = all_techniques.get(mitre_id)
technique = all_techniques.get(mitre_id)
# Check: not technique
if not technique:
# Try with subtechnique prefix
for key, tech in all_techniques.items():
# Check: key.startswith(mitre_id)
if key.startswith(mitre_id):
# Assign technique = tech
technique = tech
# Exit the loop early
break
# Check: not technique
if not technique:
# Skip to the next loop iteration
continue
# Assign key = (str(control.id), str(technique.id))
key = (str(control.id), str(technique.id))
# Check: key in existing_mappings
if key in existing_mappings:
# Skip to the next loop iteration
continue
# Assign mapping = ComplianceControlMapping(
mapping = ComplianceControlMapping(
# Keyword argument: compliance_control_id
compliance_control_id=control.id,
# Keyword argument: technique_id
technique_id=technique.id,
)
# Stage new record(s) for database insertion
db.add(mapping)
# Call existing_mappings.add()
existing_mappings.add(key)
# Assign mappings_created = 1
mappings_created += 1
# Commit all pending changes to the database
db.commit()
# Return {
return {
# Literal argument value
"framework": framework.name,
# Literal argument value
"controls_created": controls_created,
# Literal argument value
"controls_existing": len(existing_controls) - controls_created,
# Literal argument value
"mappings_created": mappings_created,
# Literal argument value
"mappings_skipped": 0,
# Literal argument value
"total_controls": len(existing_controls),
# Literal argument value
"source": "sample_data",
}
# Define function import_cis_controls_v8_mappings
def import_cis_controls_v8_mappings(db: Session) -> dict:
"""Import CIS Controls v8 with ATT&CK technique mappings.
Uses a curated set of CIS Controls mapped to MITRE ATT&CK techniques
based on the CIS Controls Navigator and official documentation.
Returns a summary dict with counts.
"""
# ── 1. Create or get framework ────────────────────────────────
framework = (
db.query(ComplianceFramework)
# Chain .filter() call
.filter(ComplianceFramework.name == "CIS Controls v8")
# Chain .first() call
.first()
)
# Check: not framework
if not framework:
# Assign framework = ComplianceFramework(
framework = ComplianceFramework(
# Keyword argument: name
name="CIS Controls v8",
# Keyword argument: version
version="8",
# Keyword argument: description
description=(
# Literal argument value
"Center for Internet Security Critical Security Controls Version 8 — "
# Literal argument value
"a prioritized set of 18 security safeguards "
# Literal argument value
"organized by Implementation Groups (IG1, IG2, IG3)."
),
# Keyword argument: url
url="https://www.cisecurity.org/controls/v8",
# Keyword argument: is_active
is_active=True,
)
# Stage new record(s) for database insertion
db.add(framework)
# Flush changes to DB without committing the transaction
db.flush()
# Log info: "Created CIS Controls v8 framework"
logger.info("Created CIS Controls v8 framework")
# Fallback: handle remaining cases
else:
# Log info: "CIS Controls v8 framework already exists"
logger.info("CIS Controls v8 framework already exists")
# ── 2. Control definitions with ATT&CK mappings ───────────────
# (defined at module level as _CIS_CONTROLS)
# Build technique lookup
all_techniques = {t.mitre_id: t for t in db.query(Technique).all()}
# Assign existing_controls = {
existing_controls = {
c.control_id: c
for c in db.query(ComplianceControl)
# Chain .filter() call
.filter(ComplianceControl.framework_id == framework.id)
# Chain .all() call
.all()
}
# Assign existing_mappings = set()
existing_mappings = set()
# for m in (
for m in (
db.query(ComplianceControlMapping)
# Chain .join() call
.join(ComplianceControl)
# Chain .filter() call
.filter(ComplianceControl.framework_id == framework.id)
# Chain .all() call
.all()
):
# Call existing_mappings.add()
existing_mappings.add((str(m.compliance_control_id), str(m.technique_id)))
# Assign controls_created = 0
controls_created = 0
# Assign mappings_created = 0
mappings_created = 0
# Iterate over _CIS_CONTROLS
for item in _CIS_CONTROLS:
# Check: item["control_id"] in existing_controls
if item["control_id"] in existing_controls:
# Assign control = existing_controls[item["control_id"]]
control = existing_controls[item["control_id"]]
# Fallback: handle remaining cases
else:
# Assign control = ComplianceControl(
control = ComplianceControl(
# Keyword argument: framework_id
framework_id=framework.id,
# Keyword argument: control_id
control_id=item["control_id"],
# Keyword argument: title
title=item["title"],
# Keyword argument: category
category=item["category"],
)
# Stage new record(s) for database insertion
db.add(control)
# Flush changes to DB without committing the transaction
db.flush()
# Assign existing_controls[item["control_id"]] = control
existing_controls[item["control_id"]] = control
# Assign controls_created = 1
controls_created += 1
# Iterate over item["techniques"]
for mitre_id in item["techniques"]:
# Assign technique = all_techniques.get(mitre_id)
technique = all_techniques.get(mitre_id)
# Check: not technique
if not technique:
# Skip to the next loop iteration
continue
# Assign key = (str(control.id), str(technique.id))
key = (str(control.id), str(technique.id))
# Check: key in existing_mappings
if key in existing_mappings:
# Skip to the next loop iteration
continue
# Assign mapping = ComplianceControlMapping(
mapping = ComplianceControlMapping(
# Keyword argument: compliance_control_id
compliance_control_id=control.id,
# Keyword argument: technique_id
technique_id=technique.id,
)
# Stage new record(s) for database insertion
db.add(mapping)
# Call existing_mappings.add()
existing_mappings.add(key)
# Assign mappings_created = 1
mappings_created += 1
# Commit all pending changes to the database
db.commit()
# Assign summary = {
summary = {
# Literal argument value
"framework": framework.name,
# Literal argument value
"controls_created": controls_created,
# Literal argument value
"controls_existing": len(existing_controls) - controls_created,
# Literal argument value
"mappings_created": mappings_created,
# Literal argument value
"total_controls": len(existing_controls),
}
# Log info: f"CIS Controls v8 import complete: {summary}"
logger.info(f"CIS Controls v8 import complete: {summary}")
# Return summary
return summary
# Define function _get_nist_category
def _get_nist_category(family_code: str) -> str:
"""Map NIST 800-53 family code to category name."""
# Assign categories = {
categories = {
# Literal argument value
"AC": "Access Control",
# Literal argument value
"AT": "Awareness and Training",
# Literal argument value
"AU": "Audit and Accountability",
# Literal argument value
"CA": "Assessment, Authorization, and Monitoring",
# Literal argument value
"CM": "Configuration Management",
# Literal argument value
"CP": "Contingency Planning",
# Literal argument value
"IA": "Identification and Authentication",
# Literal argument value
"IR": "Incident Response",
# Literal argument value
"MA": "Maintenance",
# Literal argument value
"MP": "Media Protection",
# Literal argument value
"PE": "Physical and Environmental Protection",
# Literal argument value
"PL": "Planning",
# Literal argument value
"PM": "Program Management",
# Literal argument value
"PS": "Personnel Security",
# Literal argument value
"PT": "Personally Identifiable Information Processing and Transparency",
# Literal argument value
"RA": "Risk Assessment",
# Literal argument value
"SA": "System and Services Acquisition",
# Literal argument value
"SC": "System and Communications Protection",
# Literal argument value
"SI": "System and Information Integrity",
# Literal argument value
"SR": "Supply Chain Risk Management",
}
# Return categories.get(family_code, "Unknown")
return categories.get(family_code, "Unknown")