Files
Aegis/backend/app/services/compliance_import_service.py
kitos 0b82d96bcc
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(compliance): add ISO/IEC 27001:2022 and ISO/IEC 42001:2023 frameworks
ISO 27001:2022: 37 Annex A controls across 4 themes (Organizational,
People, Physical, Technological) mapped to MITRE ATT&CK techniques.

ISO 42001:2023: 25 Annex A controls for AI Management Systems mapped to
relevant ATT&CK techniques covering AI supply chain, data pipeline
integrity, model serving security, and third-party AI risk.

Backend: import functions, _import_curated_framework() shared helper,
and POST /compliance/import/iso-27001 + iso-42001 endpoints.
Frontend: API client functions + import buttons in CompliancePage.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-03 15:50:54 +02:00

1318 lines
51 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Compliance import service — imports NIST 800-53 to ATT&CK mappings.
Downloads and parses the STIX bundle from the Center for Threat-Informed
Defense's attack_to_nist_mapping repository to create ComplianceFramework,
ComplianceControl, and ComplianceControlMapping records.
"""
import logging
import json
import re
from typing import Optional
import requests
from sqlalchemy.orm import Session
from app.models.compliance import (
ComplianceFramework,
ComplianceControl,
ComplianceControlMapping,
)
from app.models.technique import Technique
logger = logging.getLogger(__name__)
# URL for the NIST 800-53 Rev 5 to ATT&CK mapping
# This is the JSON STIX bundle that contains the relationships
NIST_MAPPING_URL = (
"https://raw.githubusercontent.com/center-for-threat-informed-defense/"
"attack_to_nist_mapping/main/data/attack-to-nist-rev5.json"
)
def import_nist_800_53_mappings(db: Session) -> dict:
"""Import NIST 800-53 Rev 5 mappings from MITRE CTI repository.
Steps:
1. Create or get the NIST 800-53 Rev 5 framework
2. Download the STIX bundle JSON
3. Parse controls and relationship objects
4. Create ComplianceControl records
5. Create ComplianceControlMapping records
Returns a summary dict with counts.
"""
# ── 1. Create or get framework ────────────────────────────────
framework = (
db.query(ComplianceFramework)
.filter(ComplianceFramework.name == "NIST 800-53 Rev 5")
.first()
)
if not framework:
framework = ComplianceFramework(
name="NIST 800-53 Rev 5",
version="5",
description="National Institute of Standards and Technology Special Publication 800-53 Revision 5 — Security and Privacy Controls for Information Systems and Organizations",
url="https://csrc.nist.gov/publications/detail/sp/800-53/rev-5/final",
is_active=True,
)
db.add(framework)
db.flush()
logger.info("Created NIST 800-53 Rev 5 framework")
else:
logger.info("NIST 800-53 Rev 5 framework already exists")
# ── 2. Download STIX bundle ───────────────────────────────────
try:
response = requests.get(NIST_MAPPING_URL, timeout=30)
response.raise_for_status()
stix_bundle = response.json()
except requests.RequestException as e:
logger.warning(f"Failed to download STIX bundle: {e}")
# Fallback: create a sample set of well-known NIST controls
return _import_sample_nist_mappings(db, framework)
# ── 3. Parse STIX objects ─────────────────────────────────────
objects = stix_bundle.get("objects", [])
# Build lookup maps
# STIX IDs -> control info
control_map = {} # stix_id -> {control_id, title, category}
technique_map = {} # stix_id -> mitre_technique_id
relationships = [] # (source_ref, target_ref) for "mitigates" relationships
for obj in objects:
obj_type = obj.get("type", "")
if obj_type == "course-of-action":
# This is a NIST control
name = obj.get("name", "")
desc = obj.get("description", "")
stix_id = obj.get("id", "")
# Extract control ID from name (e.g., "AC-2 Account Management")
match = re.match(r"^([A-Z]{2}-\d+(?:\.\d+)?)\s*(.*)", name)
if match:
control_id = match.group(1)
title = match.group(2) or name
else:
control_id = name
title = name
# Extract category from control family
category_match = re.match(r"^([A-Z]{2})", control_id)
category = _get_nist_category(category_match.group(1)) if category_match else None
control_map[stix_id] = {
"control_id": control_id,
"title": title,
"description": desc[:500] if desc else None,
"category": category,
}
elif obj_type == "attack-pattern":
# This is an ATT&CK technique
stix_id = obj.get("id", "")
ext_refs = obj.get("external_references", [])
for ref in ext_refs:
if ref.get("source_name") == "mitre-attack":
technique_map[stix_id] = ref.get("external_id", "")
break
elif obj_type == "relationship":
rel_type = obj.get("relationship_type", "")
if rel_type == "mitigates":
source_ref = obj.get("source_ref", "")
target_ref = obj.get("target_ref", "")
relationships.append((source_ref, target_ref))
# ── 4. Create controls ────────────────────────────────────────
controls_created = 0
controls_existing = 0
control_db_map = {} # control_id -> ComplianceControl
# Load existing controls for this framework
existing_controls = {
c.control_id: c
for c in db.query(ComplianceControl)
.filter(ComplianceControl.framework_id == framework.id)
.all()
}
for stix_id, info in control_map.items():
cid = info["control_id"]
if cid in existing_controls:
control_db_map[stix_id] = existing_controls[cid]
controls_existing += 1
else:
ctrl = ComplianceControl(
framework_id=framework.id,
control_id=cid,
title=info["title"],
description=info["description"],
category=info["category"],
)
db.add(ctrl)
db.flush()
control_db_map[stix_id] = ctrl
controls_created += 1
# ── 5. Create mappings ────────────────────────────────────────
mappings_created = 0
mappings_skipped = 0
# Build technique DB lookup (mitre_id -> Technique)
all_techniques = {t.mitre_id: t for t in db.query(Technique).all()}
# Load existing mappings
existing_mappings = set()
for m in db.query(ComplianceControlMapping).all():
existing_mappings.add((str(m.compliance_control_id), str(m.technique_id)))
for source_ref, target_ref in relationships:
control = control_db_map.get(source_ref)
mitre_id = technique_map.get(target_ref)
if not control or not mitre_id:
mappings_skipped += 1
continue
technique = all_techniques.get(mitre_id)
if not technique:
mappings_skipped += 1
continue
key = (str(control.id), str(technique.id))
if key in existing_mappings:
mappings_skipped += 1
continue
mapping = ComplianceControlMapping(
compliance_control_id=control.id,
technique_id=technique.id,
)
db.add(mapping)
existing_mappings.add(key)
mappings_created += 1
db.commit()
summary = {
"framework": framework.name,
"controls_created": controls_created,
"controls_existing": controls_existing,
"mappings_created": mappings_created,
"mappings_skipped": mappings_skipped,
"total_controls": controls_created + controls_existing,
"total_relationships_found": len(relationships),
}
logger.info(f"NIST 800-53 import complete: {summary}")
return summary
def _import_sample_nist_mappings(db: Session, framework: ComplianceFramework) -> dict:
"""Import a curated sample of NIST 800-53 controls when the download fails.
This ensures the feature works even without network access.
"""
SAMPLE_CONTROLS = [
{"control_id": "AC-2", "title": "Account Management", "category": "Access Control",
"techniques": ["T1078", "T1136", "T1098", "T1087", "T1069"]},
{"control_id": "AC-3", "title": "Access Enforcement", "category": "Access Control",
"techniques": ["T1078", "T1548", "T1134"]},
{"control_id": "AC-4", "title": "Information Flow Enforcement", "category": "Access Control",
"techniques": ["T1048", "T1041", "T1572"]},
{"control_id": "AC-6", "title": "Least Privilege", "category": "Access Control",
"techniques": ["T1078", "T1548", "T1134"]},
{"control_id": "AU-2", "title": "Event Logging", "category": "Audit and Accountability",
"techniques": ["T1562", "T1070"]},
{"control_id": "AU-6", "title": "Audit Record Review", "category": "Audit and Accountability",
"techniques": ["T1562", "T1070", "T1027"]},
{"control_id": "CA-7", "title": "Continuous Monitoring", "category": "Assessment, Authorization, and Monitoring",
"techniques": ["T1059", "T1053"]},
{"control_id": "CM-2", "title": "Baseline Configuration", "category": "Configuration Management",
"techniques": ["T1574", "T1546"]},
{"control_id": "CM-6", "title": "Configuration Settings", "category": "Configuration Management",
"techniques": ["T1574", "T1546", "T1112"]},
{"control_id": "CM-7", "title": "Least Functionality", "category": "Configuration Management",
"techniques": ["T1059", "T1218"]},
{"control_id": "IA-2", "title": "Identification and Authentication", "category": "Identification and Authentication",
"techniques": ["T1078", "T1110"]},
{"control_id": "IA-5", "title": "Authenticator Management", "category": "Identification and Authentication",
"techniques": ["T1078", "T1110", "T1003"]},
{"control_id": "IR-4", "title": "Incident Handling", "category": "Incident Response",
"techniques": ["T1059", "T1547"]},
{"control_id": "RA-5", "title": "Vulnerability Monitoring and Scanning", "category": "Risk Assessment",
"techniques": ["T1190", "T1203"]},
{"control_id": "SC-7", "title": "Boundary Protection", "category": "System and Communications Protection",
"techniques": ["T1048", "T1041", "T1071"]},
{"control_id": "SC-28", "title": "Protection of Information at Rest", "category": "System and Communications Protection",
"techniques": ["T1005", "T1114"]},
{"control_id": "SI-3", "title": "Malicious Code Protection", "category": "System and Information Integrity",
"techniques": ["T1059", "T1204", "T1566"]},
{"control_id": "SI-4", "title": "System Monitoring", "category": "System and Information Integrity",
"techniques": ["T1059", "T1053", "T1547"]},
{"control_id": "SI-7", "title": "Software, Firmware, and Information Integrity", "category": "System and Information Integrity",
"techniques": ["T1195", "T1553"]},
{"control_id": "PM-16", "title": "Threat Awareness Program", "category": "Program Management",
"techniques": ["T1566", "T1204"]},
]
# Build technique lookup
all_techniques = {t.mitre_id: t for t in db.query(Technique).all()}
existing_controls = {
c.control_id: c
for c in db.query(ComplianceControl)
.filter(ComplianceControl.framework_id == framework.id)
.all()
}
existing_mappings = set()
for m in db.query(ComplianceControlMapping).all():
existing_mappings.add((str(m.compliance_control_id), str(m.technique_id)))
controls_created = 0
mappings_created = 0
for sample in SAMPLE_CONTROLS:
# Create or get control
if sample["control_id"] in existing_controls:
control = existing_controls[sample["control_id"]]
else:
control = ComplianceControl(
framework_id=framework.id,
control_id=sample["control_id"],
title=sample["title"],
category=sample["category"],
)
db.add(control)
db.flush()
existing_controls[sample["control_id"]] = control
controls_created += 1
# Create mappings
for mitre_id in sample["techniques"]:
technique = all_techniques.get(mitre_id)
if not technique:
# Try with subtechnique prefix
for key, tech in all_techniques.items():
if key.startswith(mitre_id):
technique = tech
break
if not technique:
continue
key = (str(control.id), str(technique.id))
if key in existing_mappings:
continue
mapping = ComplianceControlMapping(
compliance_control_id=control.id,
technique_id=technique.id,
)
db.add(mapping)
existing_mappings.add(key)
mappings_created += 1
db.commit()
return {
"framework": framework.name,
"controls_created": controls_created,
"controls_existing": len(existing_controls) - controls_created,
"mappings_created": mappings_created,
"mappings_skipped": 0,
"total_controls": len(existing_controls),
"source": "sample_data",
}
def import_cis_controls_v8_mappings(db: Session) -> dict:
"""Import CIS Controls v8 with ATT&CK technique mappings.
Uses a curated set of CIS Controls mapped to MITRE ATT&CK techniques
based on the CIS Controls Navigator and official documentation.
Returns a summary dict with counts.
"""
# ── 1. Create or get framework ────────────────────────────────
framework = (
db.query(ComplianceFramework)
.filter(ComplianceFramework.name == "CIS Controls v8")
.first()
)
if not framework:
framework = ComplianceFramework(
name="CIS Controls v8",
version="8",
description="Center for Internet Security Critical Security Controls Version 8 — "
"a prioritized set of 18 security safeguards organized by Implementation Groups (IG1, IG2, IG3).",
url="https://www.cisecurity.org/controls/v8",
is_active=True,
)
db.add(framework)
db.flush()
logger.info("Created CIS Controls v8 framework")
else:
logger.info("CIS Controls v8 framework already exists")
# ── 2. Control definitions with ATT&CK mappings ───────────────
CIS_CONTROLS = [
{"control_id": "CIS-1", "title": "Inventory and Control of Enterprise Assets",
"category": "IG1 — Basic",
"techniques": ["T1595", "T1590", "T1018", "T1082"]},
{"control_id": "CIS-2", "title": "Inventory and Control of Software Assets",
"category": "IG1 — Basic",
"techniques": ["T1518", "T1072", "T1195"]},
{"control_id": "CIS-3", "title": "Data Protection",
"category": "IG1 — Basic",
"techniques": ["T1005", "T1114", "T1560", "T1048", "T1041"]},
{"control_id": "CIS-4", "title": "Secure Configuration of Enterprise Assets and Software",
"category": "IG1 — Basic",
"techniques": ["T1574", "T1546", "T1112", "T1543"]},
{"control_id": "CIS-5", "title": "Account Management",
"category": "IG1 — Basic",
"techniques": ["T1078", "T1136", "T1098", "T1087"]},
{"control_id": "CIS-6", "title": "Access Control Management",
"category": "IG1 — Basic",
"techniques": ["T1078", "T1548", "T1134", "T1021"]},
{"control_id": "CIS-7", "title": "Continuous Vulnerability Management",
"category": "IG2 — Foundational",
"techniques": ["T1190", "T1203", "T1068", "T1210"]},
{"control_id": "CIS-8", "title": "Audit Log Management",
"category": "IG2 — Foundational",
"techniques": ["T1562", "T1070", "T1059"]},
{"control_id": "CIS-9", "title": "Email and Web Browser Protections",
"category": "IG2 — Foundational",
"techniques": ["T1566", "T1204", "T1189", "T1598"]},
{"control_id": "CIS-10", "title": "Malware Defenses",
"category": "IG2 — Foundational",
"techniques": ["T1059", "T1204", "T1027", "T1140", "T1497"]},
{"control_id": "CIS-11", "title": "Data Recovery",
"category": "IG1 — Basic",
"techniques": ["T1486", "T1490", "T1561"]},
{"control_id": "CIS-12", "title": "Network Infrastructure Management",
"category": "IG2 — Foundational",
"techniques": ["T1557", "T1071", "T1572", "T1571"]},
{"control_id": "CIS-13", "title": "Network Monitoring and Defense",
"category": "IG2 — Foundational",
"techniques": ["T1071", "T1048", "T1041", "T1105", "T1572"]},
{"control_id": "CIS-14", "title": "Security Awareness and Skills Training",
"category": "IG1 — Basic",
"techniques": ["T1566", "T1204", "T1598"]},
{"control_id": "CIS-15", "title": "Service Provider Management",
"category": "IG2 — Foundational",
"techniques": ["T1199", "T1195"]},
{"control_id": "CIS-16", "title": "Application Software Security",
"category": "IG2 — Foundational",
"techniques": ["T1190", "T1059", "T1203"]},
{"control_id": "CIS-17", "title": "Incident Response Management",
"category": "IG2 — Foundational",
"techniques": ["T1059", "T1547", "T1053"]},
{"control_id": "CIS-18", "title": "Penetration Testing",
"category": "IG3 — Organizational",
"techniques": ["T1595", "T1046", "T1190", "T1059"]},
]
# Build technique lookup
all_techniques = {t.mitre_id: t for t in db.query(Technique).all()}
existing_controls = {
c.control_id: c
for c in db.query(ComplianceControl)
.filter(ComplianceControl.framework_id == framework.id)
.all()
}
existing_mappings = set()
for m in (
db.query(ComplianceControlMapping)
.join(ComplianceControl)
.filter(ComplianceControl.framework_id == framework.id)
.all()
):
existing_mappings.add((str(m.compliance_control_id), str(m.technique_id)))
controls_created = 0
mappings_created = 0
for item in CIS_CONTROLS:
if item["control_id"] in existing_controls:
control = existing_controls[item["control_id"]]
else:
control = ComplianceControl(
framework_id=framework.id,
control_id=item["control_id"],
title=item["title"],
category=item["category"],
)
db.add(control)
db.flush()
existing_controls[item["control_id"]] = control
controls_created += 1
for mitre_id in item["techniques"]:
technique = all_techniques.get(mitre_id)
if not technique:
continue
key = (str(control.id), str(technique.id))
if key in existing_mappings:
continue
mapping = ComplianceControlMapping(
compliance_control_id=control.id,
technique_id=technique.id,
)
db.add(mapping)
existing_mappings.add(key)
mappings_created += 1
db.commit()
summary = {
"framework": framework.name,
"controls_created": controls_created,
"controls_existing": len(existing_controls) - controls_created,
"mappings_created": mappings_created,
"total_controls": len(existing_controls),
}
logger.info(f"CIS Controls v8 import complete: {summary}")
return summary
def import_dora_mappings(db: Session) -> dict:
"""Import DORA (Digital Operational Resilience Act) with ATT&CK technique mappings.
DORA (EU 2022/2554) applies to financial entities and ICT third-party providers.
Controls map the key cybersecurity articles (Chapters IIVI) to MITRE ATT&CK
techniques based on ENISA guidance and TIBER-EU threat-led testing framework.
Returns a summary dict with counts.
"""
# ── 1. Create or get framework ────────────────────────────────
framework = (
db.query(ComplianceFramework)
.filter(ComplianceFramework.name == "DORA")
.first()
)
if not framework:
framework = ComplianceFramework(
name="DORA",
version="2022/2554",
description=(
"Digital Operational Resilience Act (Regulation EU 2022/2554) — "
"EU regulation establishing ICT risk management, incident reporting, "
"digital operational resilience testing, and ICT third-party risk "
"management requirements for financial entities."
),
url="https://eur-lex.europa.eu/legal-content/EN/TXT/?uri=CELEX:32022R2554",
is_active=True,
)
db.add(framework)
db.flush()
logger.info("Created DORA framework")
else:
logger.info("DORA framework already exists")
# ── 2. Control definitions with ATT&CK mappings ───────────────
# Based on ENISA DORA guidelines and TIBER-EU threat intelligence framework.
# Each control maps to a DORA article and the ATT&CK techniques it addresses.
DORA_CONTROLS = [
# ─── Chapter II — ICT Risk Management ────────────────────────────
{
"control_id": "DORA-Art.5",
"title": "Governance and Organisation",
"category": "Chapter II — ICT Risk Management",
"techniques": ["T1078", "T1136", "T1098", "T1087"],
},
{
"control_id": "DORA-Art.6",
"title": "ICT Risk Management Framework",
"category": "Chapter II — ICT Risk Management",
"techniques": ["T1595", "T1590", "T1589", "T1046", "T1018", "T1082"],
},
{
"control_id": "DORA-Art.7",
"title": "ICT Systems, Protocols and Tools",
"category": "Chapter II — ICT Risk Management",
"techniques": ["T1574", "T1543", "T1112", "T1546", "T1195", "T1133"],
},
{
"control_id": "DORA-Art.8",
"title": "Identification",
"category": "Chapter II — ICT Risk Management",
"techniques": ["T1018", "T1082", "T1083", "T1087", "T1590", "T1592"],
},
{
"control_id": "DORA-Art.9",
"title": "Protection and Prevention",
"category": "Chapter II — ICT Risk Management",
"techniques": ["T1078", "T1548", "T1134", "T1190", "T1574", "T1543", "T1021"],
},
{
"control_id": "DORA-Art.10",
"title": "Detection",
"category": "Chapter II — ICT Risk Management",
"techniques": ["T1562", "T1070", "T1059", "T1053", "T1547", "T1037"],
},
{
"control_id": "DORA-Art.11",
"title": "Response and Recovery",
"category": "Chapter II — ICT Risk Management",
"techniques": ["T1486", "T1490", "T1561", "T1485", "T1048", "T1041"],
},
{
"control_id": "DORA-Art.12",
"title": "Backup Policies and Recovery Methods",
"category": "Chapter II — ICT Risk Management",
"techniques": ["T1486", "T1490", "T1561", "T1485"],
},
{
"control_id": "DORA-Art.13",
"title": "Learning and Evolving",
"category": "Chapter II — ICT Risk Management",
"techniques": ["T1566", "T1589", "T1590", "T1595", "T1598"],
},
{
"control_id": "DORA-Art.14",
"title": "Communication",
"category": "Chapter II — ICT Risk Management",
"techniques": ["T1114", "T1566", "T1102", "T1071"],
},
{
"control_id": "DORA-Art.15",
"title": "Further Harmonisation of ICT Risk Management Tools",
"category": "Chapter II — ICT Risk Management",
"techniques": ["T1078", "T1190", "T1133", "T1021", "T1199"],
},
# ─── Chapter III — ICT-related Incident Management ────────────────
{
"control_id": "DORA-Art.17",
"title": "ICT-related Incidents Classification",
"category": "Chapter III — Incident Management",
"techniques": ["T1499", "T1498", "T1486", "T1041", "T1048", "T1565"],
},
{
"control_id": "DORA-Art.18",
"title": "Major ICT-Related Incidents Reporting",
"category": "Chapter III — Incident Management",
"techniques": ["T1486", "T1041", "T1048", "T1499", "T1498"],
},
{
"control_id": "DORA-Art.19",
"title": "Harmonisation of Reporting Content and Formats",
"category": "Chapter III — Incident Management",
"techniques": ["T1566", "T1190", "T1203", "T1059"],
},
# ─── Chapter IV — Digital Operational Resilience Testing ──────────
{
"control_id": "DORA-Art.24",
"title": "General Digital Operational Resilience Testing",
"category": "Chapter IV — Resilience Testing",
"techniques": ["T1059", "T1190", "T1046", "T1595", "T1078"],
},
{
"control_id": "DORA-Art.25",
"title": "Testing of ICT Tools and Systems",
"category": "Chapter IV — Resilience Testing",
"techniques": ["T1059", "T1190", "T1046", "T1595", "T1078", "T1068", "T1210"],
},
{
"control_id": "DORA-Art.26",
"title": "Advanced Testing — Threat-Led Penetration Testing (TLPT)",
"category": "Chapter IV — Resilience Testing",
"techniques": [
"T1566", "T1204", "T1055", "T1059", "T1021", "T1078",
"T1190", "T1046", "T1548", "T1134", "T1027",
],
},
{
"control_id": "DORA-Art.27",
"title": "Requirements for Testers Carrying Out TLPT",
"category": "Chapter IV — Resilience Testing",
"techniques": ["T1595", "T1046", "T1190", "T1059", "T1078"],
},
# ─── Chapter V — ICT Third-Party Risk Management ──────────────────
{
"control_id": "DORA-Art.28",
"title": "General Principles of ICT Third-Party Risk Management",
"category": "Chapter V — Third-Party Risk",
"techniques": ["T1199", "T1195", "T1078", "T1133"],
},
{
"control_id": "DORA-Art.30",
"title": "Key Contractual Provisions for ICT Services",
"category": "Chapter V — Third-Party Risk",
"techniques": ["T1199", "T1195", "T1078"],
},
{
"control_id": "DORA-Art.42",
"title": "Oversight of Critical ICT Third-Party Providers",
"category": "Chapter V — Third-Party Risk",
"techniques": ["T1199", "T1195", "T1133", "T1078", "T1190"],
},
# ─── Chapter VI — Information Sharing ────────────────────────────
{
"control_id": "DORA-Art.45",
"title": "Arrangements for Information Sharing on Cyber Threats",
"category": "Chapter VI — Information Sharing",
"techniques": ["T1566", "T1589", "T1590", "T1595", "T1598"],
},
]
# Build technique lookup
all_techniques = {t.mitre_id: t for t in db.query(Technique).all()}
existing_controls = {
c.control_id: c
for c in db.query(ComplianceControl)
.filter(ComplianceControl.framework_id == framework.id)
.all()
}
existing_mappings = set()
for m in (
db.query(ComplianceControlMapping)
.join(ComplianceControl)
.filter(ComplianceControl.framework_id == framework.id)
.all()
):
existing_mappings.add((str(m.compliance_control_id), str(m.technique_id)))
controls_created = 0
mappings_created = 0
for item in DORA_CONTROLS:
if item["control_id"] in existing_controls:
control = existing_controls[item["control_id"]]
else:
control = ComplianceControl(
framework_id=framework.id,
control_id=item["control_id"],
title=item["title"],
category=item["category"],
)
db.add(control)
db.flush()
existing_controls[item["control_id"]] = control
controls_created += 1
for mitre_id in item["techniques"]:
technique = all_techniques.get(mitre_id)
if not technique:
continue
key = (str(control.id), str(technique.id))
if key in existing_mappings:
continue
mapping = ComplianceControlMapping(
compliance_control_id=control.id,
technique_id=technique.id,
)
db.add(mapping)
existing_mappings.add(key)
mappings_created += 1
db.commit()
summary = {
"framework": framework.name,
"controls_created": controls_created,
"controls_existing": len(existing_controls) - controls_created,
"mappings_created": mappings_created,
"total_controls": len(existing_controls),
}
logger.info(f"DORA import complete: {summary}")
return summary
def import_iso_27001_mappings(db: Session) -> dict:
"""Import ISO/IEC 27001:2022 Annex A controls with ATT&CK technique mappings.
ISO/IEC 27001:2022 has 93 controls in Annex A organised into 4 themes:
- 5. Organizational controls (37)
- 6. People controls (8)
- 7. Physical controls (14)
- 8. Technological controls (34)
Mappings follow MITRE ATT&CK Enterprise v14 and published ISO/IEC 27002:2022
guidance on threat mitigations.
Returns a summary dict with counts.
"""
framework = (
db.query(ComplianceFramework)
.filter(ComplianceFramework.name == "ISO/IEC 27001:2022")
.first()
)
if not framework:
framework = ComplianceFramework(
name="ISO/IEC 27001:2022",
version="2022",
description=(
"ISO/IEC 27001:2022 — International standard for Information Security "
"Management Systems (ISMS). Annex A contains 93 controls across 4 themes: "
"Organizational, People, Physical, and Technological."
),
url="https://www.iso.org/standard/27001",
is_active=True,
)
db.add(framework)
db.flush()
logger.info("Created ISO/IEC 27001:2022 framework")
else:
logger.info("ISO/IEC 27001:2022 framework already exists")
ISO_27001_CONTROLS = [
# ── 5. Organizational Controls ──────────────────────────────────────
{
"control_id": "5.2",
"title": "Information Security Roles and Responsibilities",
"category": "5 — Organizational Controls",
"techniques": ["T1078", "T1087", "T1069"],
},
{
"control_id": "5.7",
"title": "Threat Intelligence",
"category": "5 — Organizational Controls",
"techniques": ["T1566", "T1589", "T1590", "T1595", "T1598"],
},
{
"control_id": "5.9",
"title": "Inventory of Information and Other Assets",
"category": "5 — Organizational Controls",
"techniques": ["T1082", "T1083", "T1018", "T1592"],
},
{
"control_id": "5.14",
"title": "Information Transfer",
"category": "5 — Organizational Controls",
"techniques": ["T1048", "T1041", "T1567", "T1071"],
},
{
"control_id": "5.16",
"title": "Identity Management",
"category": "5 — Organizational Controls",
"techniques": ["T1078", "T1136", "T1098", "T1087"],
},
{
"control_id": "5.17",
"title": "Authentication Information",
"category": "5 — Organizational Controls",
"techniques": ["T1110", "T1003", "T1078", "T1552"],
},
{
"control_id": "5.20",
"title": "Addressing Information Security in Supplier Agreements",
"category": "5 — Organizational Controls",
"techniques": ["T1199", "T1195"],
},
{
"control_id": "5.23",
"title": "Information Security for Use of Cloud Services",
"category": "5 — Organizational Controls",
"techniques": ["T1530", "T1537", "T1078", "T1190"],
},
{
"control_id": "5.24",
"title": "Information Security Incident Management Planning",
"category": "5 — Organizational Controls",
"techniques": ["T1059", "T1547", "T1486"],
},
{
"control_id": "5.26",
"title": "Response to Information Security Incidents",
"category": "5 — Organizational Controls",
"techniques": ["T1059", "T1547", "T1070", "T1562"],
},
{
"control_id": "5.28",
"title": "Collection of Evidence",
"category": "5 — Organizational Controls",
"techniques": ["T1562", "T1070"],
},
{
"control_id": "5.29",
"title": "Information Security During Disruption",
"category": "5 — Organizational Controls",
"techniques": ["T1486", "T1490", "T1561"],
},
{
"control_id": "5.30",
"title": "ICT Readiness for Business Continuity",
"category": "5 — Organizational Controls",
"techniques": ["T1486", "T1490", "T1499", "T1498"],
},
# ── 6. People Controls ───────────────────────────────────────────────
{
"control_id": "6.1",
"title": "Screening",
"category": "6 — People Controls",
"techniques": ["T1078", "T1134"],
},
{
"control_id": "6.3",
"title": "Information Security Awareness, Education and Training",
"category": "6 — People Controls",
"techniques": ["T1566", "T1204", "T1598"],
},
{
"control_id": "6.4",
"title": "Disciplinary Process",
"category": "6 — People Controls",
"techniques": ["T1078", "T1098"],
},
# ── 7. Physical Controls ─────────────────────────────────────────────
{
"control_id": "7.1",
"title": "Physical Security Perimeters",
"category": "7 — Physical Controls",
"techniques": ["T1200"],
},
{
"control_id": "7.4",
"title": "Physical Security Monitoring",
"category": "7 — Physical Controls",
"techniques": ["T1200", "T1556"],
},
# ── 8. Technological Controls ────────────────────────────────────────
{
"control_id": "8.2",
"title": "Privileged Access Rights",
"category": "8 — Technological Controls",
"techniques": ["T1078", "T1548", "T1134"],
},
{
"control_id": "8.3",
"title": "Information Access Restriction",
"category": "8 — Technological Controls",
"techniques": ["T1078", "T1021", "T1548", "T1550"],
},
{
"control_id": "8.5",
"title": "Secure Authentication",
"category": "8 — Technological Controls",
"techniques": ["T1078", "T1110", "T1003", "T1558"],
},
{
"control_id": "8.7",
"title": "Protection Against Malware",
"category": "8 — Technological Controls",
"techniques": ["T1059", "T1204", "T1027", "T1566", "T1140"],
},
{
"control_id": "8.8",
"title": "Management of Technical Vulnerabilities",
"category": "8 — Technological Controls",
"techniques": ["T1190", "T1203", "T1068", "T1210"],
},
{
"control_id": "8.9",
"title": "Configuration Management",
"category": "8 — Technological Controls",
"techniques": ["T1574", "T1546", "T1112", "T1543"],
},
{
"control_id": "8.12",
"title": "Data Leakage Prevention",
"category": "8 — Technological Controls",
"techniques": ["T1048", "T1041", "T1567", "T1071"],
},
{
"control_id": "8.13",
"title": "Information Backup",
"category": "8 — Technological Controls",
"techniques": ["T1486", "T1490", "T1561"],
},
{
"control_id": "8.15",
"title": "Logging",
"category": "8 — Technological Controls",
"techniques": ["T1562", "T1070"],
},
{
"control_id": "8.16",
"title": "Monitoring Activities",
"category": "8 — Technological Controls",
"techniques": ["T1059", "T1053", "T1547", "T1562"],
},
{
"control_id": "8.18",
"title": "Use of Privileged Utility Programs",
"category": "8 — Technological Controls",
"techniques": ["T1059", "T1548", "T1134", "T1569"],
},
{
"control_id": "8.19",
"title": "Installation of Software on Operational Systems",
"category": "8 — Technological Controls",
"techniques": ["T1195", "T1072", "T1546"],
},
{
"control_id": "8.20",
"title": "Networks Security",
"category": "8 — Technological Controls",
"techniques": ["T1571", "T1572", "T1090", "T1021"],
},
{
"control_id": "8.22",
"title": "Segregation of Networks",
"category": "8 — Technological Controls",
"techniques": ["T1021", "T1550", "T1558"],
},
{
"control_id": "8.23",
"title": "Web Filtering",
"category": "8 — Technological Controls",
"techniques": ["T1566", "T1204", "T1189"],
},
{
"control_id": "8.24",
"title": "Use of Cryptography",
"category": "8 — Technological Controls",
"techniques": ["T1573", "T1022", "T1027"],
},
{
"control_id": "8.26",
"title": "Application Security Requirements",
"category": "8 — Technological Controls",
"techniques": ["T1190", "T1059", "T1203"],
},
{
"control_id": "8.28",
"title": "Secure Coding",
"category": "8 — Technological Controls",
"techniques": ["T1059", "T1190", "T1203"],
},
{
"control_id": "8.32",
"title": "Change Management",
"category": "8 — Technological Controls",
"techniques": ["T1574", "T1546", "T1112"],
},
{
"control_id": "8.34",
"title": "Protection of Information Systems During Audit Testing",
"category": "8 — Technological Controls",
"techniques": ["T1562", "T1059"],
},
]
return _import_curated_framework(db, framework, ISO_27001_CONTROLS)
def import_iso_42001_mappings(db: Session) -> dict:
"""Import ISO/IEC 42001:2023 Annex A controls with ATT&CK technique mappings.
ISO/IEC 42001:2023 is the international standard for Artificial Intelligence
Management Systems (AIMS). Annex A contains controls specific to the governance,
operation, and security of AI systems.
ATT&CK mappings focus on threats to AI system infrastructure:
- Supply chain attacks on ML frameworks and model artefacts
- Data pipeline compromise (training data poisoning / exfiltration)
- AI model serving exploitation
- Adversarial access to AI APIs and datasets
Returns a summary dict with counts.
"""
framework = (
db.query(ComplianceFramework)
.filter(ComplianceFramework.name == "ISO/IEC 42001:2023")
.first()
)
if not framework:
framework = ComplianceFramework(
name="ISO/IEC 42001:2023",
version="2023",
description=(
"ISO/IEC 42001:2023 — International standard for Artificial Intelligence "
"Management Systems (AIMS). Establishes requirements and guidance for "
"organisations developing or using AI systems responsibly, covering governance, "
"risk, transparency, and security of AI pipelines and models."
),
url="https://www.iso.org/standard/81230.html",
is_active=True,
)
db.add(framework)
db.flush()
logger.info("Created ISO/IEC 42001:2023 framework")
else:
logger.info("ISO/IEC 42001:2023 framework already exists")
ISO_42001_CONTROLS = [
# ── A.2 Organization's Policies Related to AI ────────────────────────
{
"control_id": "A.2.2",
"title": "Process to Determine AI Impacts on Individuals",
"category": "A.2 — AI Policy",
"techniques": ["T1082", "T1592", "T1590"],
},
{
"control_id": "A.2.6",
"title": "Responsible Development and Use of AI",
"category": "A.2 — AI Policy",
"techniques": ["T1195", "T1059"],
},
# ── A.3 Internal Organization ─────────────────────────────────────────
{
"control_id": "A.3.2",
"title": "Roles and Responsibilities for AI Systems",
"category": "A.3 — Internal Organization",
"techniques": ["T1078", "T1087", "T1069"],
},
{
"control_id": "A.3.3",
"title": "Reporting on AI Performance",
"category": "A.3 — Internal Organization",
"techniques": ["T1562", "T1070"],
},
# ── A.4 Resources for AI Systems ─────────────────────────────────────
{
"control_id": "A.4.1",
"title": "Resource Management for AI Systems",
"category": "A.4 — AI Resources",
"techniques": ["T1499", "T1498"],
},
{
"control_id": "A.4.2",
"title": "AI System Supply Chain Management",
"category": "A.4 — AI Resources",
"techniques": ["T1195", "T1199", "T1072"],
},
# ── A.5 Assessing Impacts of AI Systems ──────────────────────────────
{
"control_id": "A.5.2",
"title": "AI System Impact Assessment",
"category": "A.5 — AI Impact Assessment",
"techniques": ["T1082", "T1592", "T1589"],
},
{
"control_id": "A.5.4",
"title": "AI Risk Treatment",
"category": "A.5 — AI Impact Assessment",
"techniques": ["T1190", "T1068", "T1203"],
},
# ── A.6 AI System Life Cycle ──────────────────────────────────────────
{
"control_id": "A.6.1",
"title": "AI System Life Cycle Management",
"category": "A.6 — AI Life Cycle",
"techniques": ["T1195", "T1574", "T1543"],
},
{
"control_id": "A.6.2",
"title": "AI Objectives and Requirements",
"category": "A.6 — AI Life Cycle",
"techniques": ["T1190", "T1059"],
},
{
"control_id": "A.6.3",
"title": "AI System Design and Implementation",
"category": "A.6 — AI Life Cycle",
"techniques": ["T1195", "T1059", "T1190", "T1027"],
},
{
"control_id": "A.6.4",
"title": "AI System Verification and Validation",
"category": "A.6 — AI Life Cycle",
"techniques": ["T1565", "T1195"],
},
{
"control_id": "A.6.5",
"title": "AI System Documentation",
"category": "A.6 — AI Life Cycle",
"techniques": ["T1083", "T1005"],
},
{
"control_id": "A.6.6",
"title": "AI System Monitoring",
"category": "A.6 — AI Life Cycle",
"techniques": ["T1562", "T1070", "T1059"],
},
# ── A.7 Data for AI Systems ───────────────────────────────────────────
{
"control_id": "A.7.2",
"title": "Data Acquisition",
"category": "A.7 — AI Data",
"techniques": ["T1005", "T1074", "T1114"],
},
{
"control_id": "A.7.3",
"title": "Data Preparation",
"category": "A.7 — AI Data",
"techniques": ["T1565", "T1059"],
},
{
"control_id": "A.7.4",
"title": "Data Quality",
"category": "A.7 — AI Data",
"techniques": ["T1565", "T1485"],
},
{
"control_id": "A.7.5",
"title": "Data Provenance",
"category": "A.7 — AI Data",
"techniques": ["T1195", "T1565"],
},
{
"control_id": "A.7.6",
"title": "Data Privacy",
"category": "A.7 — AI Data",
"techniques": ["T1005", "T1114", "T1048", "T1041"],
},
# ── A.8 Information About Use of AI Systems ───────────────────────────
{
"control_id": "A.8.1",
"title": "Transparency and Explainability of AI Systems",
"category": "A.8 — AI Information",
"techniques": ["T1082", "T1592", "T1590"],
},
{
"control_id": "A.8.2",
"title": "Security of AI Systems",
"category": "A.8 — AI Information",
"techniques": ["T1190", "T1059", "T1203", "T1078", "T1110"],
},
# ── A.9 Use of AI Systems by Affected Parties ─────────────────────────
{
"control_id": "A.9.1",
"title": "Intended Use of AI Systems",
"category": "A.9 — AI Use",
"techniques": ["T1566", "T1204", "T1598"],
},
{
"control_id": "A.9.3",
"title": "Human Oversight of AI Systems",
"category": "A.9 — AI Use",
"techniques": ["T1078", "T1134", "T1562"],
},
# ── A.10 Third-Party and Customer Relationships ───────────────────────
{
"control_id": "A.10.1",
"title": "Third-Party AI System Governance",
"category": "A.10 — Third-Party Relationships",
"techniques": ["T1199", "T1195", "T1078", "T1133"],
},
{
"control_id": "A.10.2",
"title": "Customer Relationships for AI Systems",
"category": "A.10 — Third-Party Relationships",
"techniques": ["T1566", "T1598", "T1078"],
},
]
return _import_curated_framework(db, framework, ISO_42001_CONTROLS)
def _import_curated_framework(
db: Session,
framework: ComplianceFramework,
controls: list[dict],
) -> dict:
"""Shared helper to import a curated list of controls and technique mappings.
``controls`` is a list of dicts with keys:
- control_id (str)
- title (str)
- category (str)
- techniques (list[str] — MITRE ATT&CK IDs)
Returns a summary dict.
"""
all_techniques = {t.mitre_id: t for t in db.query(Technique).all()}
existing_controls = {
c.control_id: c
for c in db.query(ComplianceControl)
.filter(ComplianceControl.framework_id == framework.id)
.all()
}
existing_mappings: set[tuple[str, str]] = set()
for m in (
db.query(ComplianceControlMapping)
.join(ComplianceControl)
.filter(ComplianceControl.framework_id == framework.id)
.all()
):
existing_mappings.add((str(m.compliance_control_id), str(m.technique_id)))
controls_created = 0
mappings_created = 0
for item in controls:
cid = item["control_id"]
if cid in existing_controls:
control = existing_controls[cid]
else:
control = ComplianceControl(
framework_id=framework.id,
control_id=cid,
title=item["title"],
category=item.get("category"),
description=item.get("description"),
)
db.add(control)
db.flush()
existing_controls[cid] = control
controls_created += 1
for mitre_id in item.get("techniques", []):
technique = all_techniques.get(mitre_id)
if not technique:
continue
key = (str(control.id), str(technique.id))
if key in existing_mappings:
continue
db.add(ComplianceControlMapping(
compliance_control_id=control.id,
technique_id=technique.id,
))
existing_mappings.add(key)
mappings_created += 1
db.commit()
summary = {
"framework": framework.name,
"controls_created": controls_created,
"controls_existing": len(existing_controls) - controls_created,
"mappings_created": mappings_created,
"mappings_skipped": 0,
"total_controls": len(existing_controls),
}
logger.info(f"{framework.name} import complete: {summary}")
return summary
def _get_nist_category(family_code: str) -> str:
"""Map NIST 800-53 family code to category name."""
categories = {
"AC": "Access Control",
"AT": "Awareness and Training",
"AU": "Audit and Accountability",
"CA": "Assessment, Authorization, and Monitoring",
"CM": "Configuration Management",
"CP": "Contingency Planning",
"IA": "Identification and Authentication",
"IR": "Incident Response",
"MA": "Maintenance",
"MP": "Media Protection",
"PE": "Physical and Environmental Protection",
"PL": "Planning",
"PM": "Program Management",
"PS": "Personnel Security",
"PT": "Personally Identifiable Information Processing and Transparency",
"RA": "Risk Assessment",
"SA": "System and Services Acquisition",
"SC": "System and Communications Protection",
"SI": "System and Information Integrity",
"SR": "Supply Chain Risk Management",
}
return categories.get(family_code, "Unknown")