Files
Aegis/backend/app/services/compliance_import_service.py
kitos 7d856bef43
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(compliance): add DORA (EU 2022/2554) framework with ATT&CK mappings
Implements the Digital Operational Resilience Act as a compliance framework
using the same pattern as CIS Controls v8 (hardcoded curated mappings,
no official STIX bundle exists for DORA).

22 controls across 5 chapters mapped to MITRE ATT&CK techniques:
  Ch. II  — ICT Risk Management (Art. 5–15): governance, identification,
            protection, detection, response, backup, threat intel
  Ch. III — Incident Management (Art. 17–19): classification, reporting
  Ch. IV  — Resilience Testing (Art. 24–27): general testing + TLPT
            (Art. 26 explicitly based on TIBER-EU/ATT&CK threat-led testing)
  Ch. V   — Third-Party Risk (Art. 28, 30, 42): supply chain, trusted rels.
  Ch. VI  — Information Sharing (Art. 45)

Technique mappings derived from ENISA DORA guidelines and TIBER-EU framework.
Import is triggered via POST /api/v1/compliance/import/dora (admin only).
Frontend: new 'DORA' button in the Compliance page import section.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 13:52:51 +02:00

756 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Compliance import service — imports NIST 800-53 to ATT&CK mappings.
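Downloads and parses the STIX bundle from the Center for Threat-Informed
Defense's attack_to_nist_mapping repository to create ComplianceFramework,
ComplianceControl, and ComplianceControlMapping records. If the download
fails, a small built-in sample of NIST controls is imported instead.
This module also provides importers for CIS Controls v8 and DORA
(EU 2022/2554), which use curated control lists hardcoded below.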
"""
import logging
import json
import re
from typing import Optional
import requests
from sqlalchemy.orm import Session
from app.models.compliance import (
ComplianceFramework,
ComplianceControl,
ComplianceControlMapping,
)
from app.models.technique import Technique
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# URL for the NIST 800-53 Rev 5 to ATT&CK mapping.
# This is the JSON STIX bundle that contains the relationships; it is the
# address fetched by import_nist_800_53_mappings().
NIST_MAPPING_URL = (
"https://raw.githubusercontent.com/center-for-threat-informed-defense/"
"attack_to_nist_mapping/main/data/attack-to-nist-rev5.json"
)
def import_nist_800_53_mappings(db: Session) -> dict:
    """Import NIST 800-53 Rev 5 controls and their ATT&CK technique mappings.

    Steps:
        1. Create or get the NIST 800-53 Rev 5 framework
        2. Download the STIX bundle JSON from ``NIST_MAPPING_URL``
        3. Parse controls, techniques and "mitigates" relationships
        4. Create ComplianceControl records
        5. Create ComplianceControlMapping records

    If the bundle cannot be downloaded, is not valid JSON, or is not a JSON
    object, the import falls back to ``_import_sample_nist_mappings`` and
    returns that function's summary instead.

    Args:
        db: SQLAlchemy session. It is flushed while rows are created and
            committed once at the end.

    Returns:
        Summary dict with the keys ``framework``, ``controls_created``,
        ``controls_existing``, ``mappings_created``, ``mappings_skipped``,
        ``total_controls`` and ``total_relationships_found``.
    """
    # ── 1. Create or get framework ────────────────────────────────
    framework = (
        db.query(ComplianceFramework)
        .filter(ComplianceFramework.name == "NIST 800-53 Rev 5")
        .first()
    )
    if not framework:
        framework = ComplianceFramework(
            name="NIST 800-53 Rev 5",
            version="5",
            description="National Institute of Standards and Technology Special Publication 800-53 Revision 5 — Security and Privacy Controls for Information Systems and Organizations",
            url="https://csrc.nist.gov/publications/detail/sp/800-53/rev-5/final",
            is_active=True,
        )
        db.add(framework)
        db.flush()
        logger.info("Created NIST 800-53 Rev 5 framework")
    else:
        logger.info("NIST 800-53 Rev 5 framework already exists")

    # ── 2. Download STIX bundle ───────────────────────────────────
    try:
        response = requests.get(NIST_MAPPING_URL, timeout=30)
        response.raise_for_status()
        stix_bundle = response.json()
        if not isinstance(stix_bundle, dict):
            raise ValueError("STIX bundle is not a JSON object")
    except (requests.RequestException, ValueError) as e:
        # ValueError covers malformed JSON on requests versions whose decode
        # error does not derive from RequestException.
        logger.warning("Failed to download STIX bundle: %s", e)
        # Fallback: create a sample set of well-known NIST controls
        return _import_sample_nist_mappings(db, framework)

    # ── 3. Parse STIX objects ─────────────────────────────────────
    control_map, technique_map, relationships = _parse_nist_stix_objects(
        stix_bundle.get("objects") or []
    )

    # ── 4. Create controls ────────────────────────────────────────
    controls_created = 0
    controls_existing = 0
    control_db_map = {}  # stix_id -> ComplianceControl

    # control_id -> ComplianceControl; seeded from the database and extended
    # as rows are created, so two STIX objects that resolve to the same
    # control ID share one row instead of producing duplicates.
    controls_by_id = {
        c.control_id: c
        for c in db.query(ComplianceControl)
        .filter(ComplianceControl.framework_id == framework.id)
        .all()
    }
    counted_ids = set()  # control IDs already reflected in the counters
    for stix_id, info in control_map.items():
        cid = info["control_id"]
        ctrl = controls_by_id.get(cid)
        if ctrl is None:
            ctrl = ComplianceControl(
                framework_id=framework.id,
                control_id=cid,
                title=info["title"],
                description=info["description"],
                category=info["category"],
            )
            db.add(ctrl)
            controls_by_id[cid] = ctrl
            counted_ids.add(cid)
            controls_created += 1
        elif cid not in counted_ids:
            counted_ids.add(cid)
            controls_existing += 1
        control_db_map[stix_id] = ctrl
    # One flush for all new controls; their primary keys are needed below.
    db.flush()

    # ── 5. Create mappings ────────────────────────────────────────
    mappings_created = 0
    mappings_skipped = 0

    # Build technique DB lookup (mitre_id -> Technique)
    all_techniques = {t.mitre_id: t for t in db.query(Technique).all()}

    # Existing (control, technique) pairs for this framework only.
    existing_mappings = {
        (str(m.compliance_control_id), str(m.technique_id))
        for m in db.query(ComplianceControlMapping)
        .join(ComplianceControl)
        .filter(ComplianceControl.framework_id == framework.id)
        .all()
    }

    for source_ref, target_ref in relationships:
        control = control_db_map.get(source_ref)
        mitre_id = technique_map.get(target_ref)
        if not control or not mitre_id:
            # Relationship points at an object that is not in the bundle.
            mappings_skipped += 1
            continue
        technique = all_techniques.get(mitre_id)
        if not technique:
            # Technique is not present in the local technique table.
            mappings_skipped += 1
            continue
        key = (str(control.id), str(technique.id))
        if key in existing_mappings:
            mappings_skipped += 1
            continue
        db.add(
            ComplianceControlMapping(
                compliance_control_id=control.id,
                technique_id=technique.id,
            )
        )
        existing_mappings.add(key)
        mappings_created += 1

    db.commit()

    summary = {
        "framework": framework.name,
        "controls_created": controls_created,
        "controls_existing": controls_existing,
        "mappings_created": mappings_created,
        "mappings_skipped": mappings_skipped,
        "total_controls": controls_created + controls_existing,
        "total_relationships_found": len(relationships),
    }
    logger.info("NIST 800-53 import complete: %s", summary)
    return summary


def _parse_nist_stix_objects(objects) -> tuple:
    """Split STIX bundle objects into control, technique and relationship data.

    Args:
        objects: Iterable of STIX objects (dicts); non-dict entries are ignored.

    Returns:
        A ``(control_map, technique_map, relationships)`` tuple where
        ``control_map`` maps a course-of-action STIX ID to a dict with
        ``control_id``, ``title``, ``description`` and ``category``;
        ``technique_map`` maps an attack-pattern STIX ID to its
        ``mitre-attack`` external ID; and ``relationships`` is a list of
        ``(source_ref, target_ref)`` pairs for "mitigates" relationships.
    """
    # e.g. "AC-2 Account Management" -> ("AC-2", "Account Management")
    control_name_re = re.compile(r"^([A-Z]{2}-\d+(?:\.\d+)?)\s*(.*)")
    family_re = re.compile(r"^([A-Z]{2})")

    control_map = {}
    technique_map = {}
    relationships = []

    for obj in objects:
        if not isinstance(obj, dict):
            continue
        obj_type = obj.get("type", "")

        if obj_type == "course-of-action":
            # This is a NIST control. ``or ""`` also covers explicit nulls.
            name = obj.get("name") or ""
            desc = obj.get("description") or ""
            stix_id = obj.get("id", "")

            name_match = control_name_re.match(name)
            if name_match:
                control_id = name_match.group(1)
                title = name_match.group(2) or name
            else:
                # No recognisable control ID: use the raw name for both.
                control_id = name
                title = name

            # Category comes from the two-letter control family prefix.
            family_match = family_re.match(control_id)
            category = (
                _get_nist_category(family_match.group(1)) if family_match else None
            )

            control_map[stix_id] = {
                "control_id": control_id,
                "title": title,
                # Descriptions are stored truncated to 500 characters.
                "description": desc[:500] if desc else None,
                "category": category,
            }

        elif obj_type == "attack-pattern":
            # This is an ATT&CK technique; take its first mitre-attack reference.
            stix_id = obj.get("id", "")
            for ref in obj.get("external_references") or []:
                if isinstance(ref, dict) and ref.get("source_name") == "mitre-attack":
                    technique_map[stix_id] = ref.get("external_id", "")
                    break

        elif obj_type == "relationship":
            if obj.get("relationship_type", "") == "mitigates":
                relationships.append(
                    (obj.get("source_ref", ""), obj.get("target_ref", ""))
                )

    return control_map, technique_map, relationships
def _import_sample_nist_mappings(db: Session, framework: ComplianceFramework) -> dict:
    """Import a curated sample of NIST 800-53 controls when the download fails.

    This ensures the feature works even without network access.

    Args:
        db: SQLAlchemy session. It is flushed after each new control and
            committed once at the end.
        framework: The framework row the sample controls are attached to.

    Returns:
        Summary dict with the keys ``framework``, ``controls_created``,
        ``controls_existing``, ``mappings_created``, ``mappings_skipped``,
        ``total_controls`` and ``source`` (always ``"sample_data"``).
    """
    SAMPLE_CONTROLS = [
        {"control_id": "AC-2", "title": "Account Management", "category": "Access Control",
         "techniques": ["T1078", "T1136", "T1098", "T1087", "T1069"]},
        {"control_id": "AC-3", "title": "Access Enforcement", "category": "Access Control",
         "techniques": ["T1078", "T1548", "T1134"]},
        {"control_id": "AC-4", "title": "Information Flow Enforcement", "category": "Access Control",
         "techniques": ["T1048", "T1041", "T1572"]},
        {"control_id": "AC-6", "title": "Least Privilege", "category": "Access Control",
         "techniques": ["T1078", "T1548", "T1134"]},
        {"control_id": "AU-2", "title": "Event Logging", "category": "Audit and Accountability",
         "techniques": ["T1562", "T1070"]},
        {"control_id": "AU-6", "title": "Audit Record Review", "category": "Audit and Accountability",
         "techniques": ["T1562", "T1070", "T1027"]},
        {"control_id": "CA-7", "title": "Continuous Monitoring", "category": "Assessment, Authorization, and Monitoring",
         "techniques": ["T1059", "T1053"]},
        {"control_id": "CM-2", "title": "Baseline Configuration", "category": "Configuration Management",
         "techniques": ["T1574", "T1546"]},
        {"control_id": "CM-6", "title": "Configuration Settings", "category": "Configuration Management",
         "techniques": ["T1574", "T1546", "T1112"]},
        {"control_id": "CM-7", "title": "Least Functionality", "category": "Configuration Management",
         "techniques": ["T1059", "T1218"]},
        {"control_id": "IA-2", "title": "Identification and Authentication", "category": "Identification and Authentication",
         "techniques": ["T1078", "T1110"]},
        {"control_id": "IA-5", "title": "Authenticator Management", "category": "Identification and Authentication",
         "techniques": ["T1078", "T1110", "T1003"]},
        {"control_id": "IR-4", "title": "Incident Handling", "category": "Incident Response",
         "techniques": ["T1059", "T1547"]},
        {"control_id": "RA-5", "title": "Vulnerability Monitoring and Scanning", "category": "Risk Assessment",
         "techniques": ["T1190", "T1203"]},
        {"control_id": "SC-7", "title": "Boundary Protection", "category": "System and Communications Protection",
         "techniques": ["T1048", "T1041", "T1071"]},
        {"control_id": "SC-28", "title": "Protection of Information at Rest", "category": "System and Communications Protection",
         "techniques": ["T1005", "T1114"]},
        {"control_id": "SI-3", "title": "Malicious Code Protection", "category": "System and Information Integrity",
         "techniques": ["T1059", "T1204", "T1566"]},
        {"control_id": "SI-4", "title": "System Monitoring", "category": "System and Information Integrity",
         "techniques": ["T1059", "T1053", "T1547"]},
        {"control_id": "SI-7", "title": "Software, Firmware, and Information Integrity", "category": "System and Information Integrity",
         "techniques": ["T1195", "T1553"]},
        {"control_id": "PM-16", "title": "Threat Awareness Program", "category": "Program Management",
         "techniques": ["T1566", "T1204"]},
    ]

    # Build technique lookup (mitre_id -> Technique)
    all_techniques = {t.mitre_id: t for t in db.query(Technique).all()}

    existing_controls = {
        c.control_id: c
        for c in db.query(ComplianceControl)
        .filter(ComplianceControl.framework_id == framework.id)
        .all()
    }

    # Existing (control, technique) pairs for this framework only.
    existing_mappings = {
        (str(m.compliance_control_id), str(m.technique_id))
        for m in db.query(ComplianceControlMapping)
        .join(ComplianceControl)
        .filter(ComplianceControl.framework_id == framework.id)
        .all()
    }

    controls_created = 0
    mappings_created = 0
    mappings_skipped = 0

    for sample in SAMPLE_CONTROLS:
        # Create or get control
        control = existing_controls.get(sample["control_id"])
        if control is None:
            control = ComplianceControl(
                framework_id=framework.id,
                control_id=sample["control_id"],
                title=sample["title"],
                category=sample["category"],
            )
            db.add(control)
            # Flush now: the control's primary key is needed for the mappings.
            db.flush()
            existing_controls[sample["control_id"]] = control
            controls_created += 1

        # Create mappings
        for mitre_id in sample["techniques"]:
            technique = all_techniques.get(mitre_id)
            if not technique:
                # Parent technique missing: fall back to one of its
                # sub-techniques ("T1078" -> "T1078.001"). The "." separator
                # prevents matching unrelated IDs and min() makes the choice
                # independent of database row order.
                sub_ids = [
                    tid
                    for tid in all_techniques
                    if tid and tid.startswith(f"{mitre_id}.")
                ]
                if sub_ids:
                    technique = all_techniques[min(sub_ids)]
            if not technique:
                mappings_skipped += 1
                continue

            pair = (str(control.id), str(technique.id))
            if pair in existing_mappings:
                mappings_skipped += 1
                continue

            db.add(
                ComplianceControlMapping(
                    compliance_control_id=control.id,
                    technique_id=technique.id,
                )
            )
            existing_mappings.add(pair)
            mappings_created += 1

    db.commit()

    return {
        "framework": framework.name,
        "controls_created": controls_created,
        "controls_existing": len(existing_controls) - controls_created,
        "mappings_created": mappings_created,
        "mappings_skipped": mappings_skipped,
        "total_controls": len(existing_controls),
        "source": "sample_data",
    }
def import_cis_controls_v8_mappings(db: Session) -> dict:
    """Import CIS Controls v8 with ATT&CK technique mappings.

    Uses a curated set of CIS Controls mapped to MITRE ATT&CK techniques
    based on the CIS Controls Navigator and official documentation.
    Re-running the import reuses existing controls and mappings.

    Args:
        db: SQLAlchemy session. It is flushed after each new row that needs a
            primary key and committed once at the end.

    Returns:
        Summary dict with the keys ``framework``, ``controls_created``,
        ``controls_existing``, ``mappings_created``, ``mappings_skipped``
        and ``total_controls``.
    """
    # ── 1. Create or get framework ────────────────────────────────
    framework = (
        db.query(ComplianceFramework)
        .filter(ComplianceFramework.name == "CIS Controls v8")
        .first()
    )
    if not framework:
        framework = ComplianceFramework(
            name="CIS Controls v8",
            version="8",
            description="Center for Internet Security Critical Security Controls Version 8 — "
            "a prioritized set of 18 security safeguards organized by Implementation Groups (IG1, IG2, IG3).",
            url="https://www.cisecurity.org/controls/v8",
            is_active=True,
        )
        db.add(framework)
        db.flush()
        logger.info("Created CIS Controls v8 framework")
    else:
        logger.info("CIS Controls v8 framework already exists")

    # ── 2. Control definitions with ATT&CK mappings ───────────────
    CIS_CONTROLS = [
        {"control_id": "CIS-1", "title": "Inventory and Control of Enterprise Assets",
         "category": "IG1 — Basic",
         "techniques": ["T1595", "T1590", "T1018", "T1082"]},
        {"control_id": "CIS-2", "title": "Inventory and Control of Software Assets",
         "category": "IG1 — Basic",
         "techniques": ["T1518", "T1072", "T1195"]},
        {"control_id": "CIS-3", "title": "Data Protection",
         "category": "IG1 — Basic",
         "techniques": ["T1005", "T1114", "T1560", "T1048", "T1041"]},
        {"control_id": "CIS-4", "title": "Secure Configuration of Enterprise Assets and Software",
         "category": "IG1 — Basic",
         "techniques": ["T1574", "T1546", "T1112", "T1543"]},
        {"control_id": "CIS-5", "title": "Account Management",
         "category": "IG1 — Basic",
         "techniques": ["T1078", "T1136", "T1098", "T1087"]},
        {"control_id": "CIS-6", "title": "Access Control Management",
         "category": "IG1 — Basic",
         "techniques": ["T1078", "T1548", "T1134", "T1021"]},
        {"control_id": "CIS-7", "title": "Continuous Vulnerability Management",
         "category": "IG2 — Foundational",
         "techniques": ["T1190", "T1203", "T1068", "T1210"]},
        {"control_id": "CIS-8", "title": "Audit Log Management",
         "category": "IG2 — Foundational",
         "techniques": ["T1562", "T1070", "T1059"]},
        {"control_id": "CIS-9", "title": "Email and Web Browser Protections",
         "category": "IG2 — Foundational",
         "techniques": ["T1566", "T1204", "T1189", "T1598"]},
        {"control_id": "CIS-10", "title": "Malware Defenses",
         "category": "IG2 — Foundational",
         "techniques": ["T1059", "T1204", "T1027", "T1140", "T1497"]},
        {"control_id": "CIS-11", "title": "Data Recovery",
         "category": "IG1 — Basic",
         "techniques": ["T1486", "T1490", "T1561"]},
        {"control_id": "CIS-12", "title": "Network Infrastructure Management",
         "category": "IG2 — Foundational",
         "techniques": ["T1557", "T1071", "T1572", "T1571"]},
        {"control_id": "CIS-13", "title": "Network Monitoring and Defense",
         "category": "IG2 — Foundational",
         "techniques": ["T1071", "T1048", "T1041", "T1105", "T1572"]},
        {"control_id": "CIS-14", "title": "Security Awareness and Skills Training",
         "category": "IG1 — Basic",
         "techniques": ["T1566", "T1204", "T1598"]},
        {"control_id": "CIS-15", "title": "Service Provider Management",
         "category": "IG2 — Foundational",
         "techniques": ["T1199", "T1195"]},
        {"control_id": "CIS-16", "title": "Application Software Security",
         "category": "IG2 — Foundational",
         "techniques": ["T1190", "T1059", "T1203"]},
        {"control_id": "CIS-17", "title": "Incident Response Management",
         "category": "IG2 — Foundational",
         "techniques": ["T1059", "T1547", "T1053"]},
        {"control_id": "CIS-18", "title": "Penetration Testing",
         "category": "IG3 — Organizational",
         "techniques": ["T1595", "T1046", "T1190", "T1059"]},
    ]

    # Build technique lookup (mitre_id -> Technique)
    all_techniques = {t.mitre_id: t for t in db.query(Technique).all()}

    existing_controls = {
        c.control_id: c
        for c in db.query(ComplianceControl)
        .filter(ComplianceControl.framework_id == framework.id)
        .all()
    }

    # Existing (control, technique) pairs for this framework only.
    existing_mappings = {
        (str(m.compliance_control_id), str(m.technique_id))
        for m in db.query(ComplianceControlMapping)
        .join(ComplianceControl)
        .filter(ComplianceControl.framework_id == framework.id)
        .all()
    }

    controls_created = 0
    mappings_created = 0
    mappings_skipped = 0
    unknown_technique_ids = set()  # curated IDs absent from the technique table

    for item in CIS_CONTROLS:
        control = existing_controls.get(item["control_id"])
        if control is None:
            control = ComplianceControl(
                framework_id=framework.id,
                control_id=item["control_id"],
                title=item["title"],
                category=item["category"],
            )
            db.add(control)
            # Flush now: the control's primary key is needed for the mappings.
            db.flush()
            existing_controls[item["control_id"]] = control
            controls_created += 1

        for mitre_id in item["techniques"]:
            technique = all_techniques.get(mitre_id)
            if not technique:
                unknown_technique_ids.add(mitre_id)
                mappings_skipped += 1
                continue
            key = (str(control.id), str(technique.id))
            if key in existing_mappings:
                mappings_skipped += 1
                continue
            db.add(
                ComplianceControlMapping(
                    compliance_control_id=control.id,
                    technique_id=technique.id,
                )
            )
            existing_mappings.add(key)
            mappings_created += 1

    db.commit()

    if unknown_technique_ids:
        # Surface this instead of silently producing fewer mappings.
        logger.warning(
            "CIS Controls v8 import skipped mappings for %d technique ID(s) "
            "not found in the database: %s",
            len(unknown_technique_ids),
            ", ".join(sorted(unknown_technique_ids)),
        )

    summary = {
        "framework": framework.name,
        "controls_created": controls_created,
        "controls_existing": len(existing_controls) - controls_created,
        "mappings_created": mappings_created,
        "mappings_skipped": mappings_skipped,
        "total_controls": len(existing_controls),
    }
    logger.info("CIS Controls v8 import complete: %s", summary)
    return summary
def import_dora_mappings(db: Session) -> dict:
    """Import DORA (Digital Operational Resilience Act) with ATT&CK technique mappings.

    DORA (EU 2022/2554) applies to financial entities and ICT third-party providers.
    Controls map the key cybersecurity articles (Chapters II–VI) to MITRE ATT&CK
    techniques based on ENISA guidance and TIBER-EU threat-led testing framework.
    Re-running the import reuses existing controls and mappings.

    Args:
        db: SQLAlchemy session. It is flushed after each new row that needs a
            primary key and committed once at the end.

    Returns:
        Summary dict with the keys ``framework``, ``controls_created``,
        ``controls_existing``, ``mappings_created``, ``mappings_skipped``
        and ``total_controls``.
    """
    # ── 1. Create or get framework ────────────────────────────────
    framework = (
        db.query(ComplianceFramework)
        .filter(ComplianceFramework.name == "DORA")
        .first()
    )
    if not framework:
        framework = ComplianceFramework(
            name="DORA",
            version="2022/2554",
            description=(
                "Digital Operational Resilience Act (Regulation EU 2022/2554) — "
                "EU regulation establishing ICT risk management, incident reporting, "
                "digital operational resilience testing, and ICT third-party risk "
                "management requirements for financial entities."
            ),
            url="https://eur-lex.europa.eu/legal-content/EN/TXT/?uri=CELEX:32022R2554",
            is_active=True,
        )
        db.add(framework)
        db.flush()
        logger.info("Created DORA framework")
    else:
        logger.info("DORA framework already exists")

    # ── 2. Control definitions with ATT&CK mappings ───────────────
    # Based on ENISA DORA guidelines and TIBER-EU threat intelligence framework.
    # Each control maps to a DORA article and the ATT&CK techniques it addresses.
    DORA_CONTROLS = [
        # ─── Chapter II — ICT Risk Management ────────────────────────────
        {
            "control_id": "DORA-Art.5",
            "title": "Governance and Organisation",
            "category": "Chapter II — ICT Risk Management",
            "techniques": ["T1078", "T1136", "T1098", "T1087"],
        },
        {
            "control_id": "DORA-Art.6",
            "title": "ICT Risk Management Framework",
            "category": "Chapter II — ICT Risk Management",
            "techniques": ["T1595", "T1590", "T1589", "T1046", "T1018", "T1082"],
        },
        {
            "control_id": "DORA-Art.7",
            "title": "ICT Systems, Protocols and Tools",
            "category": "Chapter II — ICT Risk Management",
            "techniques": ["T1574", "T1543", "T1112", "T1546", "T1195", "T1133"],
        },
        {
            "control_id": "DORA-Art.8",
            "title": "Identification",
            "category": "Chapter II — ICT Risk Management",
            "techniques": ["T1018", "T1082", "T1083", "T1087", "T1590", "T1592"],
        },
        {
            "control_id": "DORA-Art.9",
            "title": "Protection and Prevention",
            "category": "Chapter II — ICT Risk Management",
            "techniques": ["T1078", "T1548", "T1134", "T1190", "T1574", "T1543", "T1021"],
        },
        {
            "control_id": "DORA-Art.10",
            "title": "Detection",
            "category": "Chapter II — ICT Risk Management",
            "techniques": ["T1562", "T1070", "T1059", "T1053", "T1547", "T1037"],
        },
        {
            "control_id": "DORA-Art.11",
            "title": "Response and Recovery",
            "category": "Chapter II — ICT Risk Management",
            "techniques": ["T1486", "T1490", "T1561", "T1485", "T1048", "T1041"],
        },
        {
            "control_id": "DORA-Art.12",
            "title": "Backup Policies and Recovery Methods",
            "category": "Chapter II — ICT Risk Management",
            "techniques": ["T1486", "T1490", "T1561", "T1485"],
        },
        {
            "control_id": "DORA-Art.13",
            "title": "Learning and Evolving",
            "category": "Chapter II — ICT Risk Management",
            "techniques": ["T1566", "T1589", "T1590", "T1595", "T1598"],
        },
        {
            "control_id": "DORA-Art.14",
            "title": "Communication",
            "category": "Chapter II — ICT Risk Management",
            "techniques": ["T1114", "T1566", "T1102", "T1071"],
        },
        {
            "control_id": "DORA-Art.15",
            "title": "Further Harmonisation of ICT Risk Management Tools",
            "category": "Chapter II — ICT Risk Management",
            "techniques": ["T1078", "T1190", "T1133", "T1021", "T1199"],
        },
        # ─── Chapter III — ICT-related Incident Management ────────────────
        {
            "control_id": "DORA-Art.17",
            "title": "ICT-related Incidents Classification",
            "category": "Chapter III — Incident Management",
            "techniques": ["T1499", "T1498", "T1486", "T1041", "T1048", "T1565"],
        },
        {
            "control_id": "DORA-Art.18",
            "title": "Major ICT-Related Incidents Reporting",
            "category": "Chapter III — Incident Management",
            "techniques": ["T1486", "T1041", "T1048", "T1499", "T1498"],
        },
        {
            "control_id": "DORA-Art.19",
            "title": "Harmonisation of Reporting Content and Formats",
            "category": "Chapter III — Incident Management",
            "techniques": ["T1566", "T1190", "T1203", "T1059"],
        },
        # ─── Chapter IV — Digital Operational Resilience Testing ──────────
        {
            "control_id": "DORA-Art.24",
            "title": "General Digital Operational Resilience Testing",
            "category": "Chapter IV — Resilience Testing",
            "techniques": ["T1059", "T1190", "T1046", "T1595", "T1078"],
        },
        {
            "control_id": "DORA-Art.25",
            "title": "Testing of ICT Tools and Systems",
            "category": "Chapter IV — Resilience Testing",
            "techniques": ["T1059", "T1190", "T1046", "T1595", "T1078", "T1068", "T1210"],
        },
        {
            "control_id": "DORA-Art.26",
            "title": "Advanced Testing — Threat-Led Penetration Testing (TLPT)",
            "category": "Chapter IV — Resilience Testing",
            "techniques": [
                "T1566", "T1204", "T1055", "T1059", "T1021", "T1078",
                "T1190", "T1046", "T1548", "T1134", "T1027",
            ],
        },
        {
            "control_id": "DORA-Art.27",
            "title": "Requirements for Testers Carrying Out TLPT",
            "category": "Chapter IV — Resilience Testing",
            "techniques": ["T1595", "T1046", "T1190", "T1059", "T1078"],
        },
        # ─── Chapter V — ICT Third-Party Risk Management ──────────────────
        {
            "control_id": "DORA-Art.28",
            "title": "General Principles of ICT Third-Party Risk Management",
            "category": "Chapter V — Third-Party Risk",
            "techniques": ["T1199", "T1195", "T1078", "T1133"],
        },
        {
            "control_id": "DORA-Art.30",
            "title": "Key Contractual Provisions for ICT Services",
            "category": "Chapter V — Third-Party Risk",
            "techniques": ["T1199", "T1195", "T1078"],
        },
        {
            "control_id": "DORA-Art.42",
            "title": "Oversight of Critical ICT Third-Party Providers",
            "category": "Chapter V — Third-Party Risk",
            "techniques": ["T1199", "T1195", "T1133", "T1078", "T1190"],
        },
        # ─── Chapter VI — Information Sharing ────────────────────────────
        {
            "control_id": "DORA-Art.45",
            "title": "Arrangements for Information Sharing on Cyber Threats",
            "category": "Chapter VI — Information Sharing",
            "techniques": ["T1566", "T1589", "T1590", "T1595", "T1598"],
        },
    ]

    # Build technique lookup (mitre_id -> Technique)
    all_techniques = {t.mitre_id: t for t in db.query(Technique).all()}

    existing_controls = {
        c.control_id: c
        for c in db.query(ComplianceControl)
        .filter(ComplianceControl.framework_id == framework.id)
        .all()
    }

    # Existing (control, technique) pairs for this framework only.
    existing_mappings = {
        (str(m.compliance_control_id), str(m.technique_id))
        for m in db.query(ComplianceControlMapping)
        .join(ComplianceControl)
        .filter(ComplianceControl.framework_id == framework.id)
        .all()
    }

    controls_created = 0
    mappings_created = 0
    mappings_skipped = 0
    unknown_technique_ids = set()  # curated IDs absent from the technique table

    for item in DORA_CONTROLS:
        control = existing_controls.get(item["control_id"])
        if control is None:
            control = ComplianceControl(
                framework_id=framework.id,
                control_id=item["control_id"],
                title=item["title"],
                category=item["category"],
            )
            db.add(control)
            # Flush now: the control's primary key is needed for the mappings.
            db.flush()
            existing_controls[item["control_id"]] = control
            controls_created += 1

        for mitre_id in item["techniques"]:
            technique = all_techniques.get(mitre_id)
            if not technique:
                unknown_technique_ids.add(mitre_id)
                mappings_skipped += 1
                continue
            key = (str(control.id), str(technique.id))
            if key in existing_mappings:
                mappings_skipped += 1
                continue
            db.add(
                ComplianceControlMapping(
                    compliance_control_id=control.id,
                    technique_id=technique.id,
                )
            )
            existing_mappings.add(key)
            mappings_created += 1

    db.commit()

    if unknown_technique_ids:
        # Surface this instead of silently producing fewer mappings.
        logger.warning(
            "DORA import skipped mappings for %d technique ID(s) "
            "not found in the database: %s",
            len(unknown_technique_ids),
            ", ".join(sorted(unknown_technique_ids)),
        )

    summary = {
        "framework": framework.name,
        "controls_created": controls_created,
        "controls_existing": len(existing_controls) - controls_created,
        "mappings_created": mappings_created,
        "mappings_skipped": mappings_skipped,
        "total_controls": len(existing_controls),
    }
    logger.info("DORA import complete: %s", summary)
    return summary
def _get_nist_category(family_code: str) -> str:
"""Map NIST 800-53 family code to category name."""
categories = {
"AC": "Access Control",
"AT": "Awareness and Training",
"AU": "Audit and Accountability",
"CA": "Assessment, Authorization, and Monitoring",
"CM": "Configuration Management",
"CP": "Contingency Planning",
"IA": "Identification and Authentication",
"IR": "Incident Response",
"MA": "Maintenance",
"MP": "Media Protection",
"PE": "Physical and Environmental Protection",
"PL": "Planning",
"PM": "Program Management",
"PS": "Personnel Security",
"PT": "Personally Identifiable Information Processing and Transparency",
"RA": "Risk Assessment",
"SA": "System and Services Acquisition",
"SC": "System and Communications Protection",
"SI": "System and Information Integrity",
"SR": "Supply Chain Risk Management",
}
return categories.get(family_code, "Unknown")