Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
Implements the Digital Operational Resilience Act as a compliance framework
using the same pattern as CIS Controls v8 (hardcoded curated mappings,
no official STIX bundle exists for DORA).
22 controls across 5 chapters mapped to MITRE ATT&CK techniques:
Ch. II — ICT Risk Management (Art. 5–15): governance, identification,
protection, detection, response, backup, threat intel
Ch. III — Incident Management (Art. 17–19): classification, reporting
Ch. IV — Resilience Testing (Art. 24–27): general testing + TLPT
(Art. 26 explicitly based on TIBER-EU/ATT&CK threat-led testing)
Ch. V — Third-Party Risk (Art. 28, 30, 42): supply chain, trusted relationships
Ch. VI — Information Sharing (Art. 45)
Technique mappings derived from ENISA DORA guidelines and TIBER-EU framework.
Import is triggered via POST /api/v1/compliance/import/dora (admin only).
Frontend: new 'DORA' button in the Compliance page import section.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
756 lines
30 KiB
Python
756 lines
30 KiB
Python
"""Compliance import service — imports NIST 800-53 to ATT&CK mappings.
|
||
|
||
Downloads and parses the STIX bundle from the Center for Threat-Informed
|
||
Defense's attack_to_nist_mapping repository to create ComplianceFramework,
|
||
ComplianceControl, and ComplianceControlMapping records.
|
||
"""
|
||
|
||
import logging
|
||
import json
|
||
import re
|
||
from typing import Optional
|
||
|
||
import requests
|
||
from sqlalchemy.orm import Session
|
||
|
||
from app.models.compliance import (
|
||
ComplianceFramework,
|
||
ComplianceControl,
|
||
ComplianceControlMapping,
|
||
)
|
||
from app.models.technique import Technique
|
||
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Location of the NIST 800-53 Rev 5 -> ATT&CK mapping: a JSON STIX bundle
# that carries the relationship objects.
NIST_MAPPING_URL = "/".join(
    (
        "https://raw.githubusercontent.com",
        "center-for-threat-informed-defense",
        "attack_to_nist_mapping",
        "main",
        "data",
        "attack-to-nist-rev5.json",
    )
)
|
||
|
||
|
||
def _parse_nist_stix_objects(objects: list) -> tuple:
    """Sort STIX bundle objects into the lookups needed by the NIST import.

    Args:
        objects: The ``objects`` list of the STIX bundle (a list of dicts).
            Entries that are not dicts are ignored.

    Returns:
        A ``(control_map, technique_map, relationships)`` tuple where

        * ``control_map`` maps a ``course-of-action`` STIX id to a dict with
          the keys ``control_id``, ``title``, ``description`` and ``category``;
        * ``technique_map`` maps an ``attack-pattern`` STIX id to the
          ``external_id`` of its ``mitre-attack`` external reference;
        * ``relationships`` lists ``(source_ref, target_ref)`` for every
          relationship whose type is ``mitigates``.
    """
    # Compiled once instead of on every loop iteration.
    control_name_re = re.compile(r"^([A-Z]{2}-\d+(?:\.\d+)?)\s*(.*)")
    family_re = re.compile(r"^([A-Z]{2})")

    control_map = {}  # stix_id -> {control_id, title, description, category}
    technique_map = {}  # stix_id -> mitre_technique_id
    relationships = []  # (source_ref, target_ref) for "mitigates" relationships

    for obj in objects:
        if not isinstance(obj, dict):
            continue
        obj_type = obj.get("type", "")

        if obj_type == "course-of-action":
            # This is a NIST control
            name = obj.get("name", "")
            desc = obj.get("description", "")

            # Extract control ID from name (e.g., "AC-2 Account Management").
            # NOTE(review): if the bundle carries the control id only in
            # external_references, every control falls into the else branch
            # and uses its full name as the id — confirm against real data.
            match = control_name_re.match(name)
            if match:
                control_id = match.group(1)
                title = match.group(2) or name
            else:
                control_id = name
                title = name

            # Extract category from the two-letter control family prefix
            family = family_re.match(control_id)
            category = _get_nist_category(family.group(1)) if family else None

            control_map[obj.get("id", "")] = {
                "control_id": control_id,
                "title": title,
                "description": desc[:500] if desc else None,
                "category": category,
            }

        elif obj_type == "attack-pattern":
            # This is an ATT&CK technique; keep the first mitre-attack reference
            for ref in obj.get("external_references", []):
                if ref.get("source_name") == "mitre-attack":
                    technique_map[obj.get("id", "")] = ref.get("external_id", "")
                    break

        elif obj_type == "relationship":
            if obj.get("relationship_type", "") == "mitigates":
                relationships.append(
                    (obj.get("source_ref", ""), obj.get("target_ref", ""))
                )

    return control_map, technique_map, relationships


def import_nist_800_53_mappings(db: Session) -> dict:
    """Import NIST 800-53 Rev 5 mappings from the downloaded STIX bundle.

    Steps:
    1. Create or get the NIST 800-53 Rev 5 framework
    2. Download the STIX bundle JSON
    3. Parse controls and relationship objects
    4. Create ComplianceControl records
    5. Create ComplianceControlMapping records

    If the download fails, or the response body is not a JSON object, the
    curated sample set from ``_import_sample_nist_mappings`` is imported
    instead and its summary is returned.

    Args:
        db: SQLAlchemy session; the function commits on success.

    Returns:
        A summary dict with counts (``controls_created``,
        ``controls_existing``, ``mappings_created``, ``mappings_skipped``,
        ``total_controls``, ``total_relationships_found``) plus the
        ``framework`` name.
    """
    # ── 1. Create or get framework ────────────────────────────────
    framework = (
        db.query(ComplianceFramework)
        .filter(ComplianceFramework.name == "NIST 800-53 Rev 5")
        .first()
    )

    if not framework:
        framework = ComplianceFramework(
            name="NIST 800-53 Rev 5",
            version="5",
            description="National Institute of Standards and Technology Special Publication 800-53 Revision 5 — Security and Privacy Controls for Information Systems and Organizations",
            url="https://csrc.nist.gov/publications/detail/sp/800-53/rev-5/final",
            is_active=True,
        )
        db.add(framework)
        db.flush()
        logger.info("Created NIST 800-53 Rev 5 framework")
    else:
        logger.info("NIST 800-53 Rev 5 framework already exists")

    # ── 2. Download STIX bundle ───────────────────────────────────
    try:
        response = requests.get(NIST_MAPPING_URL, timeout=30)
        response.raise_for_status()
        stix_bundle = response.json()
        if not isinstance(stix_bundle, dict):
            raise ValueError("STIX bundle is not a JSON object")
    except (requests.RequestException, ValueError) as e:
        # ValueError covers an undecodable body on requests versions whose
        # JSON error is not a RequestException subclass.
        logger.warning("Failed to download STIX bundle: %s", e)
        # Fallback: create a sample set of well-known NIST controls
        return _import_sample_nist_mappings(db, framework)

    # ── 3. Parse STIX objects ─────────────────────────────────────
    control_map, technique_map, relationships = _parse_nist_stix_objects(
        stix_bundle.get("objects", [])
    )

    # ── 4. Create controls ────────────────────────────────────────
    controls_created = 0
    controls_existing = 0
    control_db_map = {}  # stix_id -> ComplianceControl

    # Load existing controls for this framework
    existing_controls = {
        c.control_id: c
        for c in db.query(ComplianceControl)
        .filter(ComplianceControl.framework_id == framework.id)
        .all()
    }

    for stix_id, info in control_map.items():
        cid = info["control_id"]
        ctrl = existing_controls.get(cid)
        if ctrl is not None:
            controls_existing += 1
        else:
            ctrl = ComplianceControl(
                framework_id=framework.id,
                control_id=cid,
                title=info["title"],
                description=info["description"],
                category=info["category"],
            )
            db.add(ctrl)
            # Register the new control so a second STIX object that resolves
            # to the same control id reuses it instead of inserting a duplicate.
            existing_controls[cid] = ctrl
            controls_created += 1
        control_db_map[stix_id] = ctrl

    # One flush for the whole batch; the new controls need their ids before
    # mappings can reference them.
    db.flush()

    # ── 5. Create mappings ────────────────────────────────────────
    mappings_created = 0
    mappings_skipped = 0

    # Build technique DB lookup (mitre_id -> Technique)
    all_techniques = {t.mitre_id: t for t in db.query(Technique).all()}

    # Load existing mappings for this framework only (same scoping as the
    # CIS and DORA importers); other frameworks' rows can never match.
    existing_mappings = {
        (str(m.compliance_control_id), str(m.technique_id))
        for m in db.query(ComplianceControlMapping)
        .join(ComplianceControl)
        .filter(ComplianceControl.framework_id == framework.id)
        .all()
    }

    for source_ref, target_ref in relationships:
        control = control_db_map.get(source_ref)
        mitre_id = technique_map.get(target_ref)

        # Skip relationships whose endpoints are not a known control/technique
        if not control or not mitre_id:
            mappings_skipped += 1
            continue

        technique = all_techniques.get(mitre_id)
        if not technique:
            mappings_skipped += 1
            continue

        key = (str(control.id), str(technique.id))
        if key in existing_mappings:
            mappings_skipped += 1
            continue

        db.add(
            ComplianceControlMapping(
                compliance_control_id=control.id,
                technique_id=technique.id,
            )
        )
        existing_mappings.add(key)
        mappings_created += 1

    db.commit()

    summary = {
        "framework": framework.name,
        "controls_created": controls_created,
        "controls_existing": controls_existing,
        "mappings_created": mappings_created,
        "mappings_skipped": mappings_skipped,
        "total_controls": controls_created + controls_existing,
        "total_relationships_found": len(relationships),
    }
    logger.info("NIST 800-53 import complete: %s", summary)
    return summary
|
||
|
||
|
||
def _import_sample_nist_mappings(db: Session, framework: ComplianceFramework) -> dict:
    """Import a curated sample of NIST 800-53 controls when the download fails.

    This ensures the feature works even without network access.

    Args:
        db: SQLAlchemy session; the function commits on success.
        framework: The NIST 800-53 framework row the controls are attached to.

    Returns:
        A summary dict with the ``framework`` name, ``controls_created``,
        ``controls_existing``, ``mappings_created``, ``mappings_skipped``
        (techniques that could not be resolved plus mappings that already
        existed), ``total_controls`` and ``"source": "sample_data"``.
    """
    SAMPLE_CONTROLS = [
        {"control_id": "AC-2", "title": "Account Management", "category": "Access Control",
         "techniques": ["T1078", "T1136", "T1098", "T1087", "T1069"]},
        {"control_id": "AC-3", "title": "Access Enforcement", "category": "Access Control",
         "techniques": ["T1078", "T1548", "T1134"]},
        {"control_id": "AC-4", "title": "Information Flow Enforcement", "category": "Access Control",
         "techniques": ["T1048", "T1041", "T1572"]},
        {"control_id": "AC-6", "title": "Least Privilege", "category": "Access Control",
         "techniques": ["T1078", "T1548", "T1134"]},
        {"control_id": "AU-2", "title": "Event Logging", "category": "Audit and Accountability",
         "techniques": ["T1562", "T1070"]},
        {"control_id": "AU-6", "title": "Audit Record Review", "category": "Audit and Accountability",
         "techniques": ["T1562", "T1070", "T1027"]},
        {"control_id": "CA-7", "title": "Continuous Monitoring", "category": "Assessment, Authorization, and Monitoring",
         "techniques": ["T1059", "T1053"]},
        {"control_id": "CM-2", "title": "Baseline Configuration", "category": "Configuration Management",
         "techniques": ["T1574", "T1546"]},
        {"control_id": "CM-6", "title": "Configuration Settings", "category": "Configuration Management",
         "techniques": ["T1574", "T1546", "T1112"]},
        {"control_id": "CM-7", "title": "Least Functionality", "category": "Configuration Management",
         "techniques": ["T1059", "T1218"]},
        {"control_id": "IA-2", "title": "Identification and Authentication", "category": "Identification and Authentication",
         "techniques": ["T1078", "T1110"]},
        {"control_id": "IA-5", "title": "Authenticator Management", "category": "Identification and Authentication",
         "techniques": ["T1078", "T1110", "T1003"]},
        {"control_id": "IR-4", "title": "Incident Handling", "category": "Incident Response",
         "techniques": ["T1059", "T1547"]},
        {"control_id": "RA-5", "title": "Vulnerability Monitoring and Scanning", "category": "Risk Assessment",
         "techniques": ["T1190", "T1203"]},
        {"control_id": "SC-7", "title": "Boundary Protection", "category": "System and Communications Protection",
         "techniques": ["T1048", "T1041", "T1071"]},
        {"control_id": "SC-28", "title": "Protection of Information at Rest", "category": "System and Communications Protection",
         "techniques": ["T1005", "T1114"]},
        {"control_id": "SI-3", "title": "Malicious Code Protection", "category": "System and Information Integrity",
         "techniques": ["T1059", "T1204", "T1566"]},
        {"control_id": "SI-4", "title": "System Monitoring", "category": "System and Information Integrity",
         "techniques": ["T1059", "T1053", "T1547"]},
        {"control_id": "SI-7", "title": "Software, Firmware, and Information Integrity", "category": "System and Information Integrity",
         "techniques": ["T1195", "T1553"]},
        {"control_id": "PM-16", "title": "Threat Awareness Program", "category": "Program Management",
         "techniques": ["T1566", "T1204"]},
    ]

    # Build technique lookup (mitre_id -> Technique)
    all_techniques = {t.mitre_id: t for t in db.query(Technique).all()}

    existing_controls = {
        c.control_id: c
        for c in db.query(ComplianceControl)
        .filter(ComplianceControl.framework_id == framework.id)
        .all()
    }

    # Only this framework's mappings can collide with what is created below,
    # so scope the query the same way the CIS and DORA importers do.
    existing_mappings = {
        (str(m.compliance_control_id), str(m.technique_id))
        for m in db.query(ComplianceControlMapping)
        .join(ComplianceControl)
        .filter(ComplianceControl.framework_id == framework.id)
        .all()
    }

    controls_created = 0
    mappings_created = 0
    mappings_skipped = 0

    for sample in SAMPLE_CONTROLS:
        # Create or get control
        control = existing_controls.get(sample["control_id"])
        if control is None:
            control = ComplianceControl(
                framework_id=framework.id,
                control_id=sample["control_id"],
                title=sample["title"],
                category=sample["category"],
            )
            db.add(control)
            # Flush so control.id is available for the mappings below
            db.flush()
            existing_controls[sample["control_id"]] = control
            controls_created += 1

        # Create mappings
        for mitre_id in sample["techniques"]:
            technique = all_techniques.get(mitre_id)
            if not technique:
                # Parent technique is absent: fall back to one of its
                # sub-techniques ("T1078" -> "T1078.001"). Requiring the "."
                # avoids matching unrelated ids, and taking the smallest id
                # keeps the choice independent of query ordering.
                sub_prefix = mitre_id + "."
                sub_id = min(
                    (
                        tid
                        for tid in all_techniques
                        if isinstance(tid, str) and tid.startswith(sub_prefix)
                    ),
                    default=None,
                )
                if sub_id is not None:
                    technique = all_techniques[sub_id]
            if not technique:
                mappings_skipped += 1
                continue

            pair = (str(control.id), str(technique.id))
            if pair in existing_mappings:
                mappings_skipped += 1
                continue

            db.add(
                ComplianceControlMapping(
                    compliance_control_id=control.id,
                    technique_id=technique.id,
                )
            )
            existing_mappings.add(pair)
            mappings_created += 1

    db.commit()

    return {
        "framework": framework.name,
        "controls_created": controls_created,
        "controls_existing": len(existing_controls) - controls_created,
        "mappings_created": mappings_created,
        "mappings_skipped": mappings_skipped,
        "total_controls": len(existing_controls),
        "source": "sample_data",
    }
|
||
|
||
|
||
def import_cis_controls_v8_mappings(db: Session) -> dict:
    """Load the curated CIS Controls v8 catalogue and its ATT&CK mappings.

    The framework row is created when missing; each control in the built-in
    table is created when missing, and a mapping is added for every listed
    technique that exists in the Technique table and is not mapped yet.
    Techniques that are not found are ignored. The session is committed
    before returning.

    Returns a summary dict with counts.
    """
    # ── 1. Create or get framework ────────────────────────────────
    framework = (
        db.query(ComplianceFramework)
        .filter(ComplianceFramework.name == "CIS Controls v8")
        .first()
    )
    if framework:
        logger.info("CIS Controls v8 framework already exists")
    else:
        framework = ComplianceFramework(
            name="CIS Controls v8",
            version="8",
            description="Center for Internet Security Critical Security Controls Version 8 — "
            "a prioritized set of 18 security safeguards organized by Implementation Groups (IG1, IG2, IG3).",
            url="https://www.cisecurity.org/controls/v8",
            is_active=True,
        )
        db.add(framework)
        db.flush()
        logger.info("Created CIS Controls v8 framework")

    # ── 2. Control definitions with ATT&CK mappings ───────────────
    # Rows are (control_id, title, category, technique ids).
    ig1 = "IG1 — Basic"
    ig2 = "IG2 — Foundational"
    ig3 = "IG3 — Organizational"
    catalogue = (
        ("CIS-1", "Inventory and Control of Enterprise Assets", ig1,
         ("T1595", "T1590", "T1018", "T1082")),
        ("CIS-2", "Inventory and Control of Software Assets", ig1,
         ("T1518", "T1072", "T1195")),
        ("CIS-3", "Data Protection", ig1,
         ("T1005", "T1114", "T1560", "T1048", "T1041")),
        ("CIS-4", "Secure Configuration of Enterprise Assets and Software", ig1,
         ("T1574", "T1546", "T1112", "T1543")),
        ("CIS-5", "Account Management", ig1,
         ("T1078", "T1136", "T1098", "T1087")),
        ("CIS-6", "Access Control Management", ig1,
         ("T1078", "T1548", "T1134", "T1021")),
        ("CIS-7", "Continuous Vulnerability Management", ig2,
         ("T1190", "T1203", "T1068", "T1210")),
        ("CIS-8", "Audit Log Management", ig2,
         ("T1562", "T1070", "T1059")),
        ("CIS-9", "Email and Web Browser Protections", ig2,
         ("T1566", "T1204", "T1189", "T1598")),
        ("CIS-10", "Malware Defenses", ig2,
         ("T1059", "T1204", "T1027", "T1140", "T1497")),
        ("CIS-11", "Data Recovery", ig1,
         ("T1486", "T1490", "T1561")),
        ("CIS-12", "Network Infrastructure Management", ig2,
         ("T1557", "T1071", "T1572", "T1571")),
        ("CIS-13", "Network Monitoring and Defense", ig2,
         ("T1071", "T1048", "T1041", "T1105", "T1572")),
        ("CIS-14", "Security Awareness and Skills Training", ig1,
         ("T1566", "T1204", "T1598")),
        ("CIS-15", "Service Provider Management", ig2,
         ("T1199", "T1195")),
        ("CIS-16", "Application Software Security", ig2,
         ("T1190", "T1059", "T1203")),
        ("CIS-17", "Incident Response Management", ig2,
         ("T1059", "T1547", "T1053")),
        ("CIS-18", "Penetration Testing", ig3,
         ("T1595", "T1046", "T1190", "T1059")),
    )

    # ── 3. Snapshot current DB state ──────────────────────────────
    techniques_by_mitre_id = {}
    for tech in db.query(Technique).all():
        techniques_by_mitre_id[tech.mitre_id] = tech

    controls_by_id = {}
    for row in (
        db.query(ComplianceControl)
        .filter(ComplianceControl.framework_id == framework.id)
        .all()
    ):
        controls_by_id[row.control_id] = row

    # (control id, technique id) pairs already mapped within this framework
    known_pairs = {
        (str(link.compliance_control_id), str(link.technique_id))
        for link in db.query(ComplianceControlMapping)
        .join(ComplianceControl)
        .filter(ComplianceControl.framework_id == framework.id)
        .all()
    }

    # ── 4. Create missing controls and mappings ───────────────────
    controls_created = 0
    mappings_created = 0

    for control_id, title, category, technique_ids in catalogue:
        if control_id not in controls_by_id:
            new_control = ComplianceControl(
                framework_id=framework.id,
                control_id=control_id,
                title=title,
                category=category,
            )
            db.add(new_control)
            # Flush so the new control's id can be used by its mappings
            db.flush()
            controls_by_id[control_id] = new_control
            controls_created += 1
        control = controls_by_id[control_id]

        for mitre_id in technique_ids:
            technique = techniques_by_mitre_id.get(mitre_id)
            if not technique:
                continue
            pair = (str(control.id), str(technique.id))
            if pair not in known_pairs:
                db.add(
                    ComplianceControlMapping(
                        compliance_control_id=control.id,
                        technique_id=technique.id,
                    )
                )
                known_pairs.add(pair)
                mappings_created += 1

    db.commit()

    total_controls = len(controls_by_id)
    summary = {
        "framework": framework.name,
        "controls_created": controls_created,
        "controls_existing": total_controls - controls_created,
        "mappings_created": mappings_created,
        "total_controls": total_controls,
    }
    logger.info(f"CIS Controls v8 import complete: {summary}")
    return summary
|
||
|
||
|
||
def import_dora_mappings(db: Session) -> dict:
    """Load the curated DORA controls and their ATT&CK technique mappings.

    DORA is the Digital Operational Resilience Act (EU 2022/2554). The
    framework row is created when missing; each article in the built-in
    table becomes a control when missing, and a mapping is added for every
    listed technique that exists in the Technique table and is not mapped
    yet. Techniques that are not found are ignored. The session is committed
    before returning.

    Returns a summary dict with counts.
    """
    # ── 1. Create or get framework ────────────────────────────────
    framework = (
        db.query(ComplianceFramework)
        .filter(ComplianceFramework.name == "DORA")
        .first()
    )
    if framework:
        logger.info("DORA framework already exists")
    else:
        framework = ComplianceFramework(
            name="DORA",
            version="2022/2554",
            description=(
                "Digital Operational Resilience Act (Regulation EU 2022/2554) — "
                "EU regulation establishing ICT risk management, incident reporting, "
                "digital operational resilience testing, and ICT third-party risk "
                "management requirements for financial entities."
            ),
            url="https://eur-lex.europa.eu/legal-content/EN/TXT/?uri=CELEX:32022R2554",
            is_active=True,
        )
        db.add(framework)
        db.flush()
        logger.info("Created DORA framework")

    # ── 2. Control definitions with ATT&CK mappings ───────────────
    # Rows are (control_id, title, category, technique ids); one row per
    # DORA article, grouped by chapter.
    # NOTE(review): the titles on Art.17–19 look like they may follow the
    # article numbering of an earlier draft — confirm against the final text
    # of Regulation (EU) 2022/2554 before relying on them.
    ch2 = "Chapter II — ICT Risk Management"
    ch3 = "Chapter III — Incident Management"
    ch4 = "Chapter IV — Resilience Testing"
    ch5 = "Chapter V — Third-Party Risk"
    ch6 = "Chapter VI — Information Sharing"
    catalogue = (
        # ─── Chapter II — ICT Risk Management ────────────────────────────
        ("DORA-Art.5", "Governance and Organisation", ch2,
         ("T1078", "T1136", "T1098", "T1087")),
        ("DORA-Art.6", "ICT Risk Management Framework", ch2,
         ("T1595", "T1590", "T1589", "T1046", "T1018", "T1082")),
        ("DORA-Art.7", "ICT Systems, Protocols and Tools", ch2,
         ("T1574", "T1543", "T1112", "T1546", "T1195", "T1133")),
        ("DORA-Art.8", "Identification", ch2,
         ("T1018", "T1082", "T1083", "T1087", "T1590", "T1592")),
        ("DORA-Art.9", "Protection and Prevention", ch2,
         ("T1078", "T1548", "T1134", "T1190", "T1574", "T1543", "T1021")),
        ("DORA-Art.10", "Detection", ch2,
         ("T1562", "T1070", "T1059", "T1053", "T1547", "T1037")),
        ("DORA-Art.11", "Response and Recovery", ch2,
         ("T1486", "T1490", "T1561", "T1485", "T1048", "T1041")),
        ("DORA-Art.12", "Backup Policies and Recovery Methods", ch2,
         ("T1486", "T1490", "T1561", "T1485")),
        ("DORA-Art.13", "Learning and Evolving", ch2,
         ("T1566", "T1589", "T1590", "T1595", "T1598")),
        ("DORA-Art.14", "Communication", ch2,
         ("T1114", "T1566", "T1102", "T1071")),
        ("DORA-Art.15", "Further Harmonisation of ICT Risk Management Tools", ch2,
         ("T1078", "T1190", "T1133", "T1021", "T1199")),
        # ─── Chapter III — ICT-related Incident Management ────────────────
        ("DORA-Art.17", "ICT-related Incidents Classification", ch3,
         ("T1499", "T1498", "T1486", "T1041", "T1048", "T1565")),
        ("DORA-Art.18", "Major ICT-Related Incidents Reporting", ch3,
         ("T1486", "T1041", "T1048", "T1499", "T1498")),
        ("DORA-Art.19", "Harmonisation of Reporting Content and Formats", ch3,
         ("T1566", "T1190", "T1203", "T1059")),
        # ─── Chapter IV — Digital Operational Resilience Testing ──────────
        ("DORA-Art.24", "General Digital Operational Resilience Testing", ch4,
         ("T1059", "T1190", "T1046", "T1595", "T1078")),
        ("DORA-Art.25", "Testing of ICT Tools and Systems", ch4,
         ("T1059", "T1190", "T1046", "T1595", "T1078", "T1068", "T1210")),
        ("DORA-Art.26", "Advanced Testing — Threat-Led Penetration Testing (TLPT)", ch4,
         ("T1566", "T1204", "T1055", "T1059", "T1021", "T1078",
          "T1190", "T1046", "T1548", "T1134", "T1027")),
        ("DORA-Art.27", "Requirements for Testers Carrying Out TLPT", ch4,
         ("T1595", "T1046", "T1190", "T1059", "T1078")),
        # ─── Chapter V — ICT Third-Party Risk Management ──────────────────
        ("DORA-Art.28", "General Principles of ICT Third-Party Risk Management", ch5,
         ("T1199", "T1195", "T1078", "T1133")),
        ("DORA-Art.30", "Key Contractual Provisions for ICT Services", ch5,
         ("T1199", "T1195", "T1078")),
        ("DORA-Art.42", "Oversight of Critical ICT Third-Party Providers", ch5,
         ("T1199", "T1195", "T1133", "T1078", "T1190")),
        # ─── Chapter VI — Information Sharing ────────────────────────────
        ("DORA-Art.45", "Arrangements for Information Sharing on Cyber Threats", ch6,
         ("T1566", "T1589", "T1590", "T1595", "T1598")),
    )

    # ── 3. Snapshot current DB state ──────────────────────────────
    techniques_by_mitre_id = {}
    for tech in db.query(Technique).all():
        techniques_by_mitre_id[tech.mitre_id] = tech

    controls_by_id = {}
    for row in (
        db.query(ComplianceControl)
        .filter(ComplianceControl.framework_id == framework.id)
        .all()
    ):
        controls_by_id[row.control_id] = row

    # (control id, technique id) pairs already mapped within this framework
    known_pairs = {
        (str(link.compliance_control_id), str(link.technique_id))
        for link in db.query(ComplianceControlMapping)
        .join(ComplianceControl)
        .filter(ComplianceControl.framework_id == framework.id)
        .all()
    }

    # ── 4. Create missing controls and mappings ───────────────────
    controls_created = 0
    mappings_created = 0

    for control_id, title, category, technique_ids in catalogue:
        if control_id not in controls_by_id:
            new_control = ComplianceControl(
                framework_id=framework.id,
                control_id=control_id,
                title=title,
                category=category,
            )
            db.add(new_control)
            # Flush so the new control's id can be used by its mappings
            db.flush()
            controls_by_id[control_id] = new_control
            controls_created += 1
        control = controls_by_id[control_id]

        for mitre_id in technique_ids:
            technique = techniques_by_mitre_id.get(mitre_id)
            if not technique:
                continue
            pair = (str(control.id), str(technique.id))
            if pair not in known_pairs:
                db.add(
                    ComplianceControlMapping(
                        compliance_control_id=control.id,
                        technique_id=technique.id,
                    )
                )
                known_pairs.add(pair)
                mappings_created += 1

    db.commit()

    total_controls = len(controls_by_id)
    summary = {
        "framework": framework.name,
        "controls_created": controls_created,
        "controls_existing": total_controls - controls_created,
        "mappings_created": mappings_created,
        "total_controls": total_controls,
    }
    logger.info(f"DORA import complete: {summary}")
    return summary
|
||
|
||
|
||
def _get_nist_category(family_code: str) -> str:
    """Return the NIST 800-53 control-family name for a two-letter code.

    Codes that are not in the table yield ``"Unknown"``.
    """
    family_names = dict(
        AC="Access Control",
        AT="Awareness and Training",
        AU="Audit and Accountability",
        CA="Assessment, Authorization, and Monitoring",
        CM="Configuration Management",
        CP="Contingency Planning",
        IA="Identification and Authentication",
        IR="Incident Response",
        MA="Maintenance",
        MP="Media Protection",
        PE="Physical and Environmental Protection",
        PL="Planning",
        PM="Program Management",
        PS="Personnel Security",
        PT="Personally Identifiable Information Processing and Transparency",
        RA="Risk Assessment",
        SA="System and Services Acquisition",
        SC="System and Communications Protection",
        SI="System and Information Integrity",
        SR="Supply Chain Risk Management",
    )
    try:
        return family_names[family_code]
    except KeyError:
        return "Unknown"
|