Files
Aegis/backend/app/services/d3fend_import_service.py
Kitos a3f83c316a fix: D3FEND ontology-based import, template management UX, and branding
- Rewrite D3FEND import to use tactic-level APIs for reliable technique
  fetching with proper ontology IRIs, descriptions, and tactic assignments
- Fix D3FEND technique URLs to use canonical IRI (no more 404s)
- All 255 D3FEND techniques now have descriptions from the official API
- Change Deactivate button color to red in template management table
- Add custom Aegis logo and favicon replacing default Vite assets
- Remove unused old API parsing code and clean up fallback list
2026-02-10 15:53:24 +01:00

456 lines
29 KiB
Python

"""D3FEND import service — fetches MITRE D3FEND data and creates
DefensiveTechnique records plus ATT&CK → D3FEND mappings.
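Uses the D3FEND public API:
- https://d3fend.mitre.org/api/tactic/d3f:{tactic}.json (defensive techniques, fetched once per tactic)
- https://d3fend.mitre.org/api/offensive-technique/{attack_id}.json (mappings per ATT&CK technique;
  not called by the code in this module at present — mappings come from the curated table below)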
"""
import logging
import uuid
from typing import Any
import httpx
from sqlalchemy.orm import Session
from app.models.technique import Technique
from app.models.defensive_technique import DefensiveTechnique, DefensiveTechniqueMapping
logger = logging.getLogger(__name__)
# Per-tactic technique listing endpoint; {tactic} is filled with one of D3FEND_TACTICS.
D3FEND_TACTIC_URL = "https://d3fend.mitre.org/api/tactic/d3f:{tactic}.json"
# Per-ATT&CK-technique mapping endpoint. Not referenced by the code visible in this module.
D3FEND_MAPPING_URL = "https://d3fend.mitre.org/api/offensive-technique/{attack_id}.json"
# Template for the technique page URL stored in DefensiveTechnique.d3fend_url;
# {iri} is the ontology IRI without its "d3f:" prefix.
D3FEND_BASE_URL = "https://d3fend.mitre.org/technique/d3f:{iri}"
# Tactic names queried when fetching techniques. Order matters: a technique seen
# under several tactics is recorded with the first tactic in this list that returns it.
D3FEND_TACTICS = ["Detect", "Harden", "Isolate", "Deceive", "Evict", "Model"]
# ── Import all D3FEND techniques ─────────────────────────────────────
def _to_str(v: Any) -> str:
"""Coerce an RDF value (str, dict with @value, or list) to a plain string."""
if isinstance(v, dict):
return v.get("@value", str(v))
if isinstance(v, list):
return "; ".join(_to_str(x) for x in v)
return str(v) if v else ""
def _tactic_graph_nodes(data: Any) -> list[dict[str, Any]]:
    """Return the dict nodes of ``data["techniques"]["@graph"]``, or ``[]`` if the payload is malformed."""
    techniques = data.get("techniques") if isinstance(data, dict) else None
    graph = techniques.get("@graph") if isinstance(techniques, dict) else None
    if not isinstance(graph, list):
        return []
    return [node for node in graph if isinstance(node, dict)]


def _fetch_techniques_from_tactic_apis() -> list[dict[str, Any]]:
    """Fetch all defensive techniques via the D3FEND tactic APIs.

    Requests ``/api/tactic/d3f:{tactic}.json`` for every entry in
    ``D3FEND_TACTICS`` and collects the nodes of each response's
    ``techniques.@graph`` that carry both a D3FEND id and a label.

    A tactic whose request fails, or whose payload does not have the
    expected shape, is skipped so that one bad response does not discard the
    techniques already collected from the other tactics.

    Returns:
        One dict per distinct D3FEND id with the keys ``d3fend_id``, ``iri``
        (the node ``@id`` without its ``d3f:`` prefix), ``name``,
        ``description`` (truncated to 500 characters, ``None`` when absent)
        and ``tactic`` (the first tactic under which the id was seen).
    """
    all_techniques: list[dict[str, Any]] = []
    seen: set[str] = set()
    with httpx.Client(timeout=60.0) as client:
        for tactic in D3FEND_TACTICS:
            url = D3FEND_TACTIC_URL.format(tactic=tactic)
            try:
                resp = client.get(url)
                resp.raise_for_status()
                data = resp.json()
            except Exception as e:
                # Deliberately broad: this is a best-effort fetch and the
                # caller falls back to a curated list when too little arrives.
                logger.warning("Failed to fetch D3FEND tactic %s: %s", tactic, e)
                continue

            graph = _tactic_graph_nodes(data)
            for node in graph:
                nid = _to_str(node.get("@id", ""))
                d3id = _to_str(node.get("d3f:d3fend-id", ""))
                label = _to_str(node.get("rdfs:label", ""))
                # Prefer the ontology definition; fall back to the RDFS comment.
                defn = _to_str(node.get("d3f:definition", ""))
                if not defn:
                    defn = _to_str(node.get("rdfs:comment", ""))
                iri = nid.removeprefix("d3f:")
                if d3id and label and d3id not in seen:
                    seen.add(d3id)
                    all_techniques.append({
                        "d3fend_id": d3id,
                        "iri": iri,
                        "name": label,
                        "description": defn[:500] if defn else None,
                        "tactic": tactic,
                    })
            logger.info("D3FEND tactic %s: %d techniques", tactic, len(graph))
    return all_techniques
def _upsert_techniques(db: Session, techniques: list[dict[str, Any]]) -> dict[str, int]:
    """Insert or update ``DefensiveTechnique`` rows from a list of technique dicts.

    Args:
        db: SQLAlchemy session; committed once after all rows are staged.
        techniques: Dicts with the required keys ``d3fend_id`` and ``name``
            and the optional keys ``iri``, ``description`` and ``tactic``.
            When ``iri`` is missing or empty, the name with spaces removed is
            used to build the technique URL.

    Returns:
        ``{"created": ..., "updated": ..., "total": ...}`` where ``total`` is
        the number of ``DefensiveTechnique`` rows after the commit. Every
        matched row counts as updated, whether or not its values changed.
    """
    # Load the existing rows once instead of issuing one SELECT per technique.
    # setdefault keeps the first row should the table hold duplicate ids.
    existing_by_id: dict[str, DefensiveTechnique] = {}
    for row in db.query(DefensiveTechnique).all():
        existing_by_id.setdefault(row.d3fend_id, row)

    created = 0
    updated = 0
    for tech_data in techniques:
        d3fend_id = tech_data["d3fend_id"]
        name = tech_data["name"]
        iri = tech_data.get("iri") or name.replace(" ", "")
        d3fend_url = D3FEND_BASE_URL.format(iri=iri)

        existing = existing_by_id.get(d3fend_id)
        if existing is not None:
            existing.name = name
            existing.description = tech_data.get("description")
            existing.tactic = tech_data.get("tactic")
            existing.d3fend_url = d3fend_url
            updated += 1
        else:
            new_tech = DefensiveTechnique(
                d3fend_id=d3fend_id,
                name=name,
                description=tech_data.get("description"),
                tactic=tech_data.get("tactic"),
                d3fend_url=d3fend_url,
            )
            db.add(new_tech)
            # Register the pending row so a repeated id later in the input
            # updates it instead of inserting a duplicate.
            existing_by_id[d3fend_id] = new_tech
            created += 1

    db.commit()
    total = db.query(DefensiveTechnique).count()
    return {"created": created, "updated": updated, "total": total}
def import_d3fend_techniques(db: Session) -> dict[str, int]:
    """Fetch the D3FEND defensive techniques and upsert them into the DB.

    The techniques come from the tactic-level APIs. If that fetch raises, or
    yields fewer than 50 techniques, the curated fallback list is imported
    instead.

    Args:
        db: SQLAlchemy session handed on to the upsert helpers.

    Returns:
        A dict with the counts ``created``, ``updated`` and ``total``.
    """
    logger.info("Fetching D3FEND techniques from tactic APIs")
    try:
        fetched = _fetch_techniques_from_tactic_apis()
    except Exception as e:
        logger.error("Failed to fetch D3FEND techniques from tactic APIs: %s", e)
        fetched = []

    fetched_count = len(fetched)
    if fetched_count < 50:
        # Too little data to trust the API result: use the curated list of
        # well-known D3FEND techniques instead.
        logger.warning("Tactic APIs returned too few techniques (%d), using fallback", fetched_count)
        return _import_d3fend_fallback(db)

    logger.info("Fetched %d D3FEND techniques from tactic APIs", fetched_count)
    stats = _upsert_techniques(db, fetched)
    logger.info(
        "D3FEND import done: %d created, %d updated, %d total",
        stats["created"],
        stats["updated"],
        stats["total"],
    )
    return stats
# ── Fallback curated D3FEND techniques ───────────────────────────────
# Curated subset of D3FEND techniques, imported by _import_d3fend_fallback when
# the tactic APIs yield too few results. Every entry carries the keys that
# _upsert_techniques reads: d3fend_id, iri, name, tactic and description.
#
# The section banners below are only a rough grouping: a few entries sit under
# a banner that differs from their "tactic" value (e.g. D3-NTF, D3-PT and D3-EF
# under "Detect"). The "tactic" field of each entry is what gets stored.
_FALLBACK_TECHNIQUES: list[dict[str, str | None]] = [
    # ── Detect ────────────────────────────────────────────────────────
    {"d3fend_id": "D3-DA", "iri": "DynamicAnalysis", "name": "Dynamic Analysis", "tactic": "Detect", "description": "Executing or opening a file in a synthetic sandbox environment to determine if the file is a malicious program."},
    {"d3fend_id": "D3-DQSA", "iri": "DatabaseQueryStringAnalysis", "name": "Database Query String Analysis", "tactic": "Detect", "description": "Analyzing database queries to detect SQL Injection."},
    {"d3fend_id": "D3-FA", "iri": "FileAnalysis", "name": "File Analysis", "tactic": "Detect", "description": "Analytic process to determine a file's status: virus, trojan, benign, malicious, trusted, unauthorized, etc."},
    {"d3fend_id": "D3-FCA", "iri": "FileCreationAnalysis", "name": "File Creation Analysis", "tactic": "Detect", "description": "Analyzing the properties of file create system call invocations."},
    {"d3fend_id": "D3-ID", "iri": "IdentifierAnalysis", "name": "Identifier Analysis", "tactic": "Detect", "description": "Analyzing identifier artifacts such as IP address, domain names, or URLs."},
    {"d3fend_id": "D3-NTA", "iri": "NetworkTrafficAnalysis", "name": "Network Traffic Analysis", "tactic": "Detect", "description": "Analyzing intercepted or summarized computer network traffic to detect unauthorized activity."},
    {"d3fend_id": "D3-NTF", "iri": "NetworkTrafficFiltering", "name": "Network Traffic Filtering", "tactic": "Isolate", "description": "Restricting network traffic originating from any location."},
    {"d3fend_id": "D3-PA", "iri": "ProcessAnalysis", "name": "Process Analysis", "tactic": "Detect", "description": "Observing a running application process and analyzing it to watch for certain behaviors or conditions indicating adversary activity."},
    {"d3fend_id": "D3-PMAD", "iri": "ProtocolMetadataAnomalyDetection", "name": "Protocol Metadata Anomaly Detection", "tactic": "Detect", "description": "Collecting network communication protocol metadata and identifying statistical outliers."},
    {"d3fend_id": "D3-PSA", "iri": "ProcessSpawnAnalysis", "name": "Process Spawn Analysis", "tactic": "Detect", "description": "Analyzing spawn arguments or attributes of a process to detect unauthorized processes."},
    {"d3fend_id": "D3-PLA", "iri": "ProcessLineageAnalysis", "name": "Process Lineage Analysis", "tactic": "Detect", "description": "Identification of suspicious processes by examining the ancestry and siblings of a process."},
    {"d3fend_id": "D3-PT", "iri": "ProcessTermination", "name": "Process Termination", "tactic": "Evict", "description": "Terminating a running application process on a computer system."},
    {"d3fend_id": "D3-RTSD", "iri": "RemoteTerminalSessionDetection", "name": "Remote Terminal Session Detection", "tactic": "Detect", "description": "Detection of an unauthorized remote live terminal console session."},
    {"d3fend_id": "D3-SCA", "iri": "SystemCallAnalysis", "name": "System Call Analysis", "tactic": "Detect", "description": "Analyzing system calls to determine whether a process is exhibiting unauthorized behavior."},
    {"d3fend_id": "D3-SEA", "iri": "ScriptExecutionAnalysis", "name": "Script Execution Analysis", "tactic": "Detect", "description": "Analyzing the execution of a script to detect unauthorized user activity."},
    {"d3fend_id": "D3-FH", "iri": "FileHashing", "name": "File Hashing", "tactic": "Detect", "description": "Employing file hash comparisons to detect known malware."},
    {"d3fend_id": "D3-FIM", "iri": "FileIntegrityMonitoring", "name": "File Integrity Monitoring", "tactic": "Detect", "description": "Detecting any suspicious changes to files in a computer system."},
    {"d3fend_id": "D3-UA", "iri": "URLAnalysis", "name": "URL Analysis", "tactic": "Detect", "description": "Determining if a URL is benign or malicious by analyzing the URL or its components."},
    {"d3fend_id": "D3-UBA", "iri": "UserBehaviorAnalysis", "name": "User Behavior Analysis", "tactic": "Detect", "description": "Detecting insider threats, targeted attacks, and financial fraud through patterns of human behavior."},
    {"d3fend_id": "D3-UGLPA", "iri": "UserGeolocationLogonPatternAnalysis", "name": "User Geolocation Logon Pattern Analysis", "tactic": "Detect", "description": "Monitoring geolocation data of user logon attempts to identify anomalies."},
    {"d3fend_id": "D3-FAPA", "iri": "FileAccessPatternAnalysis", "name": "File Access Pattern Analysis", "tactic": "Detect", "description": "Analyzing the files accessed by a process to identify unauthorized activity."},
    {"d3fend_id": "D3-FCOA", "iri": "FileContentAnalysis", "name": "File Content Analysis", "tactic": "Detect", "description": "Employing a pattern matching algorithm to statically analyze the content of files."},
    {"d3fend_id": "D3-MA", "iri": "MessageAnalysis", "name": "Message Analysis", "tactic": "Detect", "description": "Analyzing email or instant message content to detect unauthorized activity."},
    {"d3fend_id": "D3-DNSTA", "iri": "DNSTrafficAnalysis", "name": "DNS Traffic Analysis", "tactic": "Detect", "description": "Analysis of domain name metadata to determine whether the domain is likely to resolve to an undesirable host."},
    {"d3fend_id": "D3-PM", "iri": "PlatformMonitoring", "name": "Platform Monitoring", "tactic": "Detect", "description": "Monitoring platform components such as operating systems software, hardware devices, or firmware."},
    {"d3fend_id": "D3-SJA", "iri": "ScheduledJobAnalysis", "name": "Scheduled Job Analysis", "tactic": "Detect", "description": "Analysis of scheduled jobs to detect unauthorized use of job scheduling."},
    {"d3fend_id": "D3-EF", "iri": "EmailFiltering", "name": "Email Filtering", "tactic": "Isolate", "description": "Filtering incoming email traffic based on specific criteria."},
    # ── Harden ────────────────────────────────────────────────────────
    {"d3fend_id": "D3-AH", "iri": "ApplicationHardening", "name": "Application Hardening", "tactic": "Harden", "description": "Making an executable application more resilient to a class of exploits."},
    {"d3fend_id": "D3-CH", "iri": "CredentialHardening", "name": "Credential Hardening", "tactic": "Harden", "description": "Modifying system or network properties to protect credentials."},
    {"d3fend_id": "D3-CRO", "iri": "CredentialRotation", "name": "Credential Rotation", "tactic": "Harden", "description": "Regularly changing authentication credentials to minimize risk of unauthorized access."},
    {"d3fend_id": "D3-DENCR", "iri": "DiskEncryption", "name": "Disk Encryption", "tactic": "Harden", "description": "Encrypting a hard disk partition to prevent cleartext access to a file system."},
    {"d3fend_id": "D3-FE", "iri": "FileEncryption", "name": "File Encryption", "tactic": "Harden", "description": "Encrypting a file using a cryptographic key."},
    {"d3fend_id": "D3-MFA", "iri": "Multi-factorAuthentication", "name": "Multi-factor Authentication", "tactic": "Harden", "description": "Requiring proof of two or more pieces of evidence in order to authenticate a user."},
    {"d3fend_id": "D3-PH", "iri": "PlatformHardening", "name": "Platform Hardening", "tactic": "Harden", "description": "Hardening components of a platform to make them more difficult to exploit."},
    {"d3fend_id": "D3-PSEP", "iri": "ProcessSegmentExecutionPrevention", "name": "Process Segment Execution Prevention", "tactic": "Harden", "description": "Preventing execution of any address in a memory region other than the code segment."},
    {"d3fend_id": "D3-SU", "iri": "SoftwareUpdate", "name": "Software Update", "tactic": "Harden", "description": "Replacing old software on a computer system component."},
    {"d3fend_id": "D3-SAOR", "iri": "SegmentAddressOffsetRandomization", "name": "Segment Address Offset Randomization", "tactic": "Harden", "description": "Randomizing the base address of memory segments during process initialization."},
    {"d3fend_id": "D3-SPP", "iri": "StrongPasswordPolicy", "name": "Strong Password Policy", "tactic": "Harden", "description": "Modifying system configuration to increase password strength."},
    {"d3fend_id": "D3-MH", "iri": "MessageHardening", "name": "Message Hardening", "tactic": "Harden", "description": "Measures to ensure the confidentiality and integrity of messages."},
    {"d3fend_id": "D3-SCH", "iri": "SourceCodeHardening", "name": "Source Code Hardening", "tactic": "Harden", "description": "Hardening source code to make it more difficult to exploit."},
    # ── Isolate ───────────────────────────────────────────────────────
    {"d3fend_id": "D3-EI", "iri": "ExecutionIsolation", "name": "Execution Isolation", "tactic": "Isolate", "description": "Preventing application processes from accessing non-essential system resources."},
    {"d3fend_id": "D3-HBPI", "iri": "Hardware-basedProcessIsolation", "name": "Hardware-based Process Isolation", "tactic": "Isolate", "description": "Preventing one process from writing to the memory space of another through hardware-based address management."},
    {"d3fend_id": "D3-KBPI", "iri": "Kernel-basedProcessIsolation", "name": "Kernel-based Process Isolation", "tactic": "Isolate", "description": "Using kernel-level capabilities to isolate processes."},
    {"d3fend_id": "D3-ITF", "iri": "InboundTrafficFiltering", "name": "Inbound Traffic Filtering", "tactic": "Isolate", "description": "Restricting network traffic originating from untrusted networks."},
    {"d3fend_id": "D3-OTF", "iri": "OutboundTrafficFiltering", "name": "Outbound Traffic Filtering", "tactic": "Isolate", "description": "Restricting network traffic destined towards untrusted networks."},
    {"d3fend_id": "D3-NI", "iri": "NetworkIsolation", "name": "Network Isolation", "tactic": "Isolate", "description": "Preventing network hosts from accessing non-essential system network resources."},
    {"d3fend_id": "D3-EAL", "iri": "ExecutableAllowlisting", "name": "Executable Allowlisting", "tactic": "Isolate", "description": "Using a digital signature to authenticate a file before opening."},
    {"d3fend_id": "D3-EDL", "iri": "ExecutableDenylisting", "name": "Executable Denylisting", "tactic": "Isolate", "description": "Blocking the execution of files on a host in accordance with defined application policy rules."},
    {"d3fend_id": "D3-IOPR", "iri": "IOPortRestriction", "name": "IO Port Restriction", "tactic": "Isolate", "description": "Limiting access to computer input/output ports to restrict unauthorized devices."},
    {"d3fend_id": "D3-DNSAL", "iri": "DNSAllowlisting", "name": "DNS Allowlisting", "tactic": "Isolate", "description": "Permitting only approved domains and their subdomains to be resolved."},
    {"d3fend_id": "D3-DNSDL", "iri": "DNSDenylisting", "name": "DNS Denylisting", "tactic": "Isolate", "description": "Blocking DNS Network Traffic based on criteria such as IP address, domain name, or DNS query type."},
    # ── Deceive ───────────────────────────────────────────────────────
    {"d3fend_id": "D3-CHN", "iri": "ConnectedHoneynet", "name": "Connected Honeynet", "tactic": "Deceive", "description": "A decoy service connected to the enterprise network simulating functionality to attract attackers."},
    {"d3fend_id": "D3-DF", "iri": "DecoyFile", "name": "Decoy File", "tactic": "Deceive", "description": "A file created for the purposes of deceiving an adversary."},
    {"d3fend_id": "D3-DNR", "iri": "DecoyNetworkResource", "name": "Decoy Network Resource", "tactic": "Deceive", "description": "Deploying a network resource for the purposes of deceiving an adversary."},
    {"d3fend_id": "D3-DUC", "iri": "DecoyUserCredential", "name": "Decoy User Credential", "tactic": "Deceive", "description": "A Credential created for the purpose of deceiving an adversary."},
    {"d3fend_id": "D3-IHN", "iri": "IntegratedHoneynet", "name": "Integrated Honeynet", "tactic": "Deceive", "description": "Decoys in a production environment to entice interaction from attackers."},
    {"d3fend_id": "D3-SHN", "iri": "StandaloneHoneynet", "name": "Standalone Honeynet", "tactic": "Deceive", "description": "An environment to attract attackers, not connected to any production systems."},
    # ── Evict ─────────────────────────────────────────────────────────
    {"d3fend_id": "D3-AL", "iri": "AccountLocking", "name": "Account Locking", "tactic": "Evict", "description": "Temporarily disabling user accounts on a system or domain."},
    {"d3fend_id": "D3-CE", "iri": "CredentialEviction", "name": "Credential Eviction", "tactic": "Evict", "description": "Disabling or removing compromised credentials from a computer network."},
    {"d3fend_id": "D3-CR", "iri": "CredentialRevocation", "name": "Credential Revocation", "tactic": "Evict", "description": "Deleting credentials permanently to prevent them from being used to authenticate."},
    {"d3fend_id": "D3-FEV", "iri": "FileEviction", "name": "File Eviction", "tactic": "Evict", "description": "Deleting files from system storage."},
    {"d3fend_id": "D3-PE", "iri": "ProcessEviction", "name": "Process Eviction", "tactic": "Evict", "description": "Terminating or removing running processes."},
    {"d3fend_id": "D3-ER", "iri": "EmailRemoval", "name": "Email Removal", "tactic": "Evict", "description": "Deleting email files from system storage."},
    # ── Model ─────────────────────────────────────────────────────────
    {"d3fend_id": "D3-AI", "iri": "AssetInventory", "name": "Asset Inventory", "tactic": "Model", "description": "Identifying and recording the organization's assets and their vulnerabilities."},
    {"d3fend_id": "D3-AVE", "iri": "AssetVulnerabilityEnumeration", "name": "Asset Vulnerability Enumeration", "tactic": "Model", "description": "Enriching inventory items with knowledge identifying their vulnerabilities."},
    {"d3fend_id": "D3-NM", "iri": "NetworkMapping", "name": "Network Mapping", "tactic": "Model", "description": "Identifying and modeling the network layers and their physical location."},
    {"d3fend_id": "D3-OAM", "iri": "OperationalActivityMapping", "name": "Operational Activity Mapping", "tactic": "Model", "description": "Identifying activities and establishing dependencies on digital systems."},
    {"d3fend_id": "D3-SVCDM", "iri": "ServiceDependencyMapping", "name": "Service Dependency Mapping", "tactic": "Model", "description": "Determining the services on which each given service relies."},
    {"d3fend_id": "D3-SYSM", "iri": "SystemMapping", "name": "System Mapping", "tactic": "Model", "description": "Identifying how systems are configured, decomposed into components, and dependent on one another."},
]
def _import_d3fend_fallback(db: Session) -> dict[str, int]:
    """Upsert the built-in curated technique list in place of API data.

    Args:
        db: SQLAlchemy session handed on to ``_upsert_techniques``.

    Returns:
        The ``created`` / ``updated`` / ``total`` counts reported by
        ``_upsert_techniques``.
    """
    entry_count = len(_FALLBACK_TECHNIQUES)
    logger.info("Using fallback D3FEND technique list (%d entries)", entry_count)
    # The curated list is typed with ``str | None`` values, which is narrower
    # than the ``Any`` values the helper declares; hence the ignore.
    counts = _upsert_techniques(db, _FALLBACK_TECHNIQUES)  # type: ignore[arg-type]
    return counts
# ── Import ATT&CK → D3FEND mappings ─────────────────────────────────
# Curated ATT&CK → D3FEND mapping for common techniques
# Keys are ATT&CK technique ids (looked up against Technique.mitre_id); values
# are D3FEND ids (looked up against DefensiveTechnique.d3fend_id).
# import_d3fend_mappings silently skips any id on either side that has no row
# in the database. Several D3FEND ids used here (e.g. D3-EAW, D3-EHR) are not
# in the curated fallback list, so they only produce mappings when the
# API-based technique import has created them.
# NOTE(review): D3-AL is "Account Locking" in the fallback list, yet it is
# mapped here to network-oriented techniques (T1071, T1190, T1571, T1572) —
# confirm this is the intended id.
_ATTACK_TO_D3FEND: dict[str, list[str]] = {
    "T1059": ["D3-PSA", "D3-SCA", "D3-PA", "D3-EAW", "D3-EDL", "D3-PLA"],
    "T1059.001": ["D3-PSA", "D3-SCA", "D3-PA", "D3-EAW", "D3-EDL"],
    "T1059.003": ["D3-PSA", "D3-SCA", "D3-PA", "D3-EAW"],
    "T1059.005": ["D3-PSA", "D3-SCA", "D3-EAW"],
    "T1059.007": ["D3-PSA", "D3-SCA", "D3-EAW"],
    "T1055": ["D3-PA", "D3-PSA", "D3-HBPI", "D3-PMAD", "D3-PLA"],
    "T1055.001": ["D3-PA", "D3-PMAD", "D3-HBPI"],
    "T1055.002": ["D3-PA", "D3-PMAD", "D3-HBPI"],
    "T1003": ["D3-CH", "D3-CR", "D3-MFA", "D3-PMAD"],
    "T1003.001": ["D3-CH", "D3-CR", "D3-PMAD"],
    "T1078": ["D3-MFA", "D3-UBA", "D3-UGLPA", "D3-CH"],
    "T1078.001": ["D3-MFA", "D3-UBA", "D3-CH"],
    "T1566": ["D3-EAL", "D3-FA", "D3-FH", "D3-UA", "D3-EHR"],
    "T1566.001": ["D3-EAL", "D3-FA", "D3-FH", "D3-EHR"],
    "T1566.002": ["D3-UA", "D3-EAL", "D3-EHR"],
    "T1071": ["D3-AL", "D3-NTA", "D3-PM", "D3-CT"],
    "T1071.001": ["D3-AL", "D3-NTA", "D3-PM"],
    "T1053": ["D3-PSA", "D3-PA", "D3-SCHE", "D3-SSA"],
    "T1053.005": ["D3-PSA", "D3-SCHE", "D3-SSA"],
    "T1543": ["D3-SMRA", "D3-SSA", "D3-SBAN"],
    "T1543.003": ["D3-SMRA", "D3-SSA", "D3-SBAN"],
    "T1547": ["D3-SICA", "D3-SSA", "D3-RRID"],
    "T1547.001": ["D3-SICA", "D3-SSA", "D3-RRID"],
    "T1021": ["D3-RTSD", "D3-RPA", "D3-NTA", "D3-MFA"],
    "T1021.001": ["D3-RTSD", "D3-NTA", "D3-MFA"],
    "T1021.002": ["D3-RTSD", "D3-NTA", "D3-NI"],
    "T1560": ["D3-FA", "D3-FCA", "D3-ORA"],
    "T1560.001": ["D3-FA", "D3-FCA"],
    "T1048": ["D3-ORA", "D3-NTA", "D3-OTF"],
    "T1048.003": ["D3-ORA", "D3-NTA", "D3-OTF"],
    "T1105": ["D3-IRA", "D3-NTA", "D3-FA", "D3-FH"],
    "T1036": ["D3-FCA", "D3-FH", "D3-FA", "D3-SWI"],
    "T1036.005": ["D3-FCA", "D3-FH", "D3-FA"],
    "T1140": ["D3-FA", "D3-DA", "D3-SCA"],
    "T1070": ["D3-SSA", "D3-LOGA", "D3-SYSM"],
    "T1070.004": ["D3-SSA", "D3-FAPA"],
    "T1562": ["D3-SSA", "D3-SYSM", "D3-SMRA"],
    "T1562.001": ["D3-SSA", "D3-SYSM", "D3-SMRA"],
    "T1027": ["D3-DA", "D3-FA", "D3-RE"],
    "T1027.002": ["D3-DA", "D3-FA"],
    "T1110": ["D3-MFA", "D3-UBA", "D3-CH"],
    "T1110.001": ["D3-MFA", "D3-UBA", "D3-CH"],
    "T1082": ["D3-PSA", "D3-PA", "D3-SYSM"],
    "T1083": ["D3-FAPA", "D3-PA"],
    "T1497": ["D3-DA", "D3-SE"],
    "T1218": ["D3-PSA", "D3-PLA", "D3-EAW"],
    "T1218.011": ["D3-PSA", "D3-PLA", "D3-EAW"],
    "T1569": ["D3-SMRA", "D3-PSA", "D3-PA"],
    "T1569.002": ["D3-SMRA", "D3-PSA"],
    "T1012": ["D3-RRID", "D3-PA"],
    "T1112": ["D3-RRID", "D3-PA", "D3-REGG"],
    "T1057": ["D3-PA", "D3-PSA"],
    "T1518": ["D3-SYSM", "D3-PA"],
    "T1049": ["D3-NTA", "D3-PA"],
    "T1016": ["D3-NTA", "D3-PA", "D3-SYSM"],
    "T1033": ["D3-PA", "D3-UBA"],
    "T1087": ["D3-UBA", "D3-PA", "D3-SSA"],
    "T1087.001": ["D3-UBA", "D3-PA"],
    "T1087.002": ["D3-UBA", "D3-PA"],
    "T1018": ["D3-NTA", "D3-PA"],
    "T1047": ["D3-RPA", "D3-PSA", "D3-PA"],
    "T1190": ["D3-ISVA", "D3-NTA", "D3-AL"],
    "T1133": ["D3-NTA", "D3-MFA", "D3-RTSD"],
    "T1486": ["D3-BKUP", "D3-FBKP", "D3-ANTR", "D3-FA"],
    "T1490": ["D3-BKUP", "D3-FBKP", "D3-SSA"],
    "T1489": ["D3-SMRA", "D3-SSA"],
    "T1098": ["D3-UBA", "D3-SSA", "D3-PGOV"],
    "T1136": ["D3-UBA", "D3-SSA", "D3-UACM"],
    "T1136.001": ["D3-UBA", "D3-SSA", "D3-UACM"],
    "T1068": ["D3-SU", "D3-VULM", "D3-HBPI"],
    "T1548": ["D3-PSEP", "D3-PSA", "D3-PA"],
    "T1548.002": ["D3-PSEP", "D3-PSA"],
    "T1134": ["D3-PA", "D3-PSA", "D3-PSEP"],
    "T1134.001": ["D3-PA", "D3-PSA"],
    "T1574": ["D3-SWI", "D3-FCA", "D3-PLA"],
    "T1574.001": ["D3-SWI", "D3-FCA"],
    "T1204": ["D3-EAL", "D3-FA", "D3-UA"],
    "T1204.001": ["D3-UA", "D3-EAL"],
    "T1204.002": ["D3-FA", "D3-EAL", "D3-DA"],
    "T1071.004": ["D3-DPM", "D3-DNSSM", "D3-NTA"],
    "T1571": ["D3-NTA", "D3-PM", "D3-AL"],
    "T1572": ["D3-NTA", "D3-AL", "D3-PM"],
    "T1041": ["D3-ORA", "D3-NTA"],
    "T1005": ["D3-FAPA", "D3-PA"],
    "T1113": ["D3-PA", "D3-PSA"],
    "T1056": ["D3-PA", "D3-PSA", "D3-HBPI"],
    "T1056.001": ["D3-PA", "D3-PSA"],
    "T1560.003": ["D3-FA", "D3-ORA"],
    "T1583": ["D3-IPMR", "D3-DNSRA"],
    "T1584": ["D3-IPMR", "D3-DNSRA"],
    "T1595": ["D3-IRA", "D3-NTA"],
    "T1589": ["D3-UBA", "D3-THRT"],
    "T1590": ["D3-NTA", "D3-THRT"],
    "T1591": ["D3-THRT"],
    "T1592": ["D3-THRT"],
}
def import_d3fend_mappings(db: Session) -> dict[str, int]:
    """Create ATT&CK → D3FEND mappings from the curated ``_ATTACK_TO_D3FEND`` table.

    Only the curated table is used; the per-technique D3FEND API is not
    queried here because it is very slow for 700+ techniques. Curated entries
    whose ATT&CK id or D3FEND id has no row in the database are ignored.

    Args:
        db: SQLAlchemy session; committed once after all new mappings are staged.

    Returns:
        ``{"created": ..., "skipped": ..., "total": ...}`` where ``skipped``
        counts pairs that already had a mapping and ``total`` is the number
        of ``DefensiveTechniqueMapping`` rows after the commit. All three are
        ``0`` when no D3FEND techniques have been imported yet.
    """
    # ATT&CK techniques keyed by their MITRE id.
    technique_map = {t.mitre_id: t for t in db.query(Technique).all()}
    # Defensive techniques keyed by their D3FEND id.
    d3fend_map = {dt.d3fend_id: dt for dt in db.query(DefensiveTechnique).all()}
    if not d3fend_map:
        logger.warning("No D3FEND techniques in DB — run import_d3fend_techniques first")
        return {"created": 0, "skipped": 0, "total": 0}

    # Load the existing (attack, defence) id pairs once instead of running one
    # existence query per curated pair.
    existing_pairs = {
        (attack_id, defensive_id)
        for attack_id, defensive_id in db.query(
            DefensiveTechniqueMapping.attack_technique_id,
            DefensiveTechniqueMapping.defensive_technique_id,
        ).all()
    }

    created = 0
    skipped = 0
    for mitre_id, d3fend_ids in _ATTACK_TO_D3FEND.items():
        attack_tech = technique_map.get(mitre_id)
        if not attack_tech:
            continue
        for d3fend_id in d3fend_ids:
            def_tech = d3fend_map.get(d3fend_id)
            if not def_tech:
                continue
            pair = (attack_tech.id, def_tech.id)
            if pair in existing_pairs:
                skipped += 1
                continue
            db.add(
                DefensiveTechniqueMapping(
                    attack_technique_id=attack_tech.id,
                    defensive_technique_id=def_tech.id,
                )
            )
            # Remember the pending pair so a repeated curated entry is
            # counted as skipped rather than inserted twice.
            existing_pairs.add(pair)
            created += 1

    db.commit()
    total = db.query(DefensiveTechniqueMapping).count()
    logger.info("D3FEND mappings: %d created, %d skipped, %d total", created, skipped, total)
    return {"created": created, "skipped": skipped, "total": total}
def sync(db: Session) -> dict:
    """Run the full D3FEND sync: techniques first, then ATT&CK mappings.

    Called by the Data Sources router when the user clicks Sync for D3FEND.
    If a ``DataSource`` row named ``"d3fend"`` exists, its ``last_sync_at``,
    ``last_sync_status`` and ``last_sync_stats`` fields are updated and
    committed.

    Args:
        db: SQLAlchemy session used for both imports and the status update.

    Returns:
        A flat summary dict suitable for ``last_sync_stats``.
    """
    from app.models.data_source import DataSource
    from datetime import datetime

    technique_counts = import_d3fend_techniques(db)
    mapping_counts = import_d3fend_mappings(db)

    technique_part = {
        "techniques_created": technique_counts.get("created", 0),
        "techniques_updated": technique_counts.get("updated", 0),
        "techniques_total": technique_counts.get("total", 0),
    }
    mapping_part = {
        "mappings_created": mapping_counts.get("created", 0),
        "mappings_skipped": mapping_counts.get("skipped", 0),
        "mappings_total": mapping_counts.get("total", 0),
    }
    summary = {**technique_part, **mapping_part}

    # Record the outcome on the DataSource row, when one exists.
    source_row = db.query(DataSource).filter(DataSource.name == "d3fend").first()
    if source_row:
        # NOTE(review): datetime.utcnow() yields a naive datetime and is
        # deprecated since Python 3.12 — confirm what the column expects
        # before switching to a timezone-aware value.
        source_row.last_sync_at = datetime.utcnow()
        source_row.last_sync_status = "success"
        source_row.last_sync_stats = summary
        db.commit()

    logger.info("D3FEND sync complete — %s", summary)
    return summary
def get_defenses_for_technique(db: Session, technique_id) -> list[dict]:
    """List the D3FEND defensive techniques mapped to one ATT&CK technique.

    Args:
        db: SQLAlchemy session used for the lookup.
        technique_id: Value compared against
            ``DefensiveTechniqueMapping.attack_technique_id``.

    Returns:
        One dict per mapping with the keys ``id`` (stringified),
        ``d3fend_id``, ``name``, ``description``, ``tactic`` and
        ``d3fend_url``; an empty list when there are no mappings.
    """
    mapping_query = db.query(DefensiveTechniqueMapping).filter(
        DefensiveTechniqueMapping.attack_technique_id == technique_id
    )
    # Resolve each mapping to its related defensive technique.
    defenses = (mapping.defensive_technique for mapping in mapping_query.all())
    return [
        {
            "id": str(defense.id),
            "d3fend_id": defense.d3fend_id,
            "name": defense.name,
            "description": defense.description,
            "tactic": defense.tactic,
            "d3fend_url": defense.d3fend_url,
        }
        for defense in defenses
    ]