From cd124b655b19de1bb2174aba2ee127928fb0cea8 Mon Sep 17 00:00:00 2001 From: Kitos Date: Mon, 9 Feb 2026 16:38:59 +0100 Subject: [PATCH] feat(phase-24): integrate MITRE D3FEND defensive techniques with ATT&CK mapping (T-213, T-214) --- .../b011_add_defensive_techniques_tables.py | 59 ++ backend/app/main.py | 2 + backend/app/models/__init__.py | 2 + backend/app/models/defensive_technique.py | 79 +++ backend/app/routers/d3fend.py | 135 ++++ backend/app/routers/techniques.py | 36 +- backend/app/services/d3fend_import_service.py | 628 ++++++++++++++++++ frontend/src/api/d3fend.ts | 56 ++ frontend/src/api/techniques.ts | 3 +- .../src/components/test-detail/TeamTabs.tsx | 60 ++ frontend/src/pages/TechniqueDetailPage.tsx | 74 +++ frontend/src/types/models.ts | 11 + 12 files changed, 1141 insertions(+), 4 deletions(-) create mode 100644 backend/alembic/versions/b011_add_defensive_techniques_tables.py create mode 100644 backend/app/models/defensive_technique.py create mode 100644 backend/app/routers/d3fend.py create mode 100644 backend/app/services/d3fend_import_service.py create mode 100644 frontend/src/api/d3fend.ts diff --git a/backend/alembic/versions/b011_add_defensive_techniques_tables.py b/backend/alembic/versions/b011_add_defensive_techniques_tables.py new file mode 100644 index 0000000..2ffda69 --- /dev/null +++ b/backend/alembic/versions/b011_add_defensive_techniques_tables.py @@ -0,0 +1,59 @@ +"""add_defensive_techniques_tables + +Revision ID: b011defensive +Revises: b010threatactors +Create Date: 2026-02-09 16:00:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import UUID + + +# revision identifiers, used by Alembic. +revision: str = 'b011defensive' +down_revision: Union[str, Sequence[str], None] = 'b010threatactors' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create defensive_techniques and defensive_technique_mappings tables.""" + # defensive_techniques + op.create_table( + 'defensive_techniques', + sa.Column('id', UUID(as_uuid=True), primary_key=True), + sa.Column('d3fend_id', sa.String(), unique=True, nullable=False), + sa.Column('name', sa.String(), nullable=False), + sa.Column('description', sa.Text(), nullable=True), + sa.Column('tactic', sa.String(), nullable=True), + sa.Column('d3fend_url', sa.String(), nullable=True), + sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()), + ) + op.create_index('ix_defensive_techniques_tactic', 'defensive_techniques', ['tactic']) + + # defensive_technique_mappings (ATT&CK → D3FEND) + op.create_table( + 'defensive_technique_mappings', + sa.Column('id', UUID(as_uuid=True), primary_key=True), + sa.Column('attack_technique_id', UUID(as_uuid=True), + sa.ForeignKey('techniques.id', ondelete='CASCADE'), nullable=False), + sa.Column('defensive_technique_id', UUID(as_uuid=True), + sa.ForeignKey('defensive_techniques.id', ondelete='CASCADE'), nullable=False), + ) + op.create_index('ix_dtm_attack_technique', 'defensive_technique_mappings', ['attack_technique_id']) + op.create_index('ix_dtm_defensive_technique', 'defensive_technique_mappings', ['defensive_technique_id']) + op.create_unique_constraint('uq_attack_defensive_technique', 'defensive_technique_mappings', + ['attack_technique_id', 'defensive_technique_id']) + + +def downgrade() -> None: + """Drop defensive_technique_mappings and defensive_techniques tables.""" + op.drop_constraint('uq_attack_defensive_technique', 'defensive_technique_mappings', type_='unique') + op.drop_index('ix_dtm_defensive_technique', table_name='defensive_technique_mappings') + op.drop_index('ix_dtm_attack_technique', table_name='defensive_technique_mappings') + op.drop_table('defensive_technique_mappings') + op.drop_index('ix_defensive_techniques_tactic', table_name='defensive_techniques') + op.drop_table('defensive_techniques') diff --git a/backend/app/main.py b/backend/app/main.py index 7fbd2c8..8dd198a 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -20,6 +20,7 @@ from app.routers import notifications as notifications_router from app.routers import reports as reports_router from app.routers import data_sources as data_sources_router from app.routers import threat_actors as threat_actors_router +from app.routers import d3fend as d3fend_router from app.storage import ensure_bucket_exists from app.jobs.mitre_sync_job import start_scheduler, scheduler @@ -64,6 +65,7 @@ app.include_router(notifications_router.router, prefix="/api/v1") app.include_router(reports_router.router, prefix="/api/v1") app.include_router(data_sources_router.router, prefix="/api/v1") app.include_router(threat_actors_router.router, prefix="/api/v1") +app.include_router(d3fend_router.router, prefix="/api/v1") @app.get("/health") diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 1311d31..fe10f98 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -10,11 +10,13 @@ from app.models.notification import Notification from app.models.data_source import DataSource from app.models.detection_rule import DetectionRule from app.models.threat_actor import ThreatActor, ThreatActorTechnique +from app.models.defensive_technique import DefensiveTechnique, DefensiveTechniqueMapping from app.models.enums import TechniqueStatus, TestState, TestResult, TeamSide __all__ = [ "User", "Technique", "Test", "TestTemplate", "Evidence", "IntelItem", "AuditLog", "Notification", "DataSource", "DetectionRule", "ThreatActor", "ThreatActorTechnique", + "DefensiveTechnique", "DefensiveTechniqueMapping", "TechniqueStatus", "TestState", "TestResult", "TeamSide", ] diff --git a/backend/app/models/defensive_technique.py b/backend/app/models/defensive_technique.py new file mode 100644 index 0000000..6e703c6 --- /dev/null +++ b/backend/app/models/defensive_technique.py @@ -0,0 +1,79 @@ +"""DefensiveTechnique and DefensiveTechniqueMapping models. + +Stores MITRE D3FEND defensive techniques and their mappings to +ATT&CK techniques, enabling recommended countermeasure lookups. +""" + +import uuid +from datetime import datetime + +from sqlalchemy import ( + Column, String, Text, DateTime, + ForeignKey, Index, UniqueConstraint, +) +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import relationship + +from app.database import Base + + +class DefensiveTechnique(Base): + """ + MITRE D3FEND defensive technique. + + Represents a countermeasure from the D3FEND framework that can be + mapped to one or more ATT&CK techniques via DefensiveTechniqueMapping. + """ + __tablename__ = "defensive_techniques" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + d3fend_id = Column(String, unique=True, nullable=False) # e.g. "D3-AL" + name = Column(String, nullable=False) + description = Column(Text, nullable=True) + tactic = Column(String, nullable=True) # Detect, Isolate, Deceive, Evict, etc. + d3fend_url = Column(String, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow) + + # Relationships + attack_mappings = relationship( + "DefensiveTechniqueMapping", + back_populates="defensive_technique", + cascade="all, delete-orphan", + ) + + __table_args__ = ( + Index('ix_defensive_techniques_tactic', 'tactic'), + ) + + +class DefensiveTechniqueMapping(Base): + """ + Association between a MITRE ATT&CK technique and a D3FEND + defensive technique. + """ + __tablename__ = "defensive_technique_mappings" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + attack_technique_id = Column( + UUID(as_uuid=True), + ForeignKey("techniques.id", ondelete="CASCADE"), + nullable=False, + ) + defensive_technique_id = Column( + UUID(as_uuid=True), + ForeignKey("defensive_techniques.id", ondelete="CASCADE"), + nullable=False, + ) + + # Relationships + attack_technique = relationship("Technique") + defensive_technique = relationship("DefensiveTechnique", back_populates="attack_mappings") + + __table_args__ = ( + Index('ix_dtm_attack_technique', 'attack_technique_id'), + Index('ix_dtm_defensive_technique', 'defensive_technique_id'), + UniqueConstraint( + 'attack_technique_id', 'defensive_technique_id', + name='uq_attack_defensive_technique', + ), + ) diff --git a/backend/app/routers/d3fend.py b/backend/app/routers/d3fend.py new file mode 100644 index 0000000..f8b10a8 --- /dev/null +++ b/backend/app/routers/d3fend.py @@ -0,0 +1,135 @@ +"""D3FEND endpoints — defensive technique listings, mappings, and import trigger.""" + +import logging +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.orm import Session + +from app.database import get_db +from app.dependencies.auth import get_current_user, require_role +from app.models.user import User +from app.models.technique import Technique +from app.models.defensive_technique import DefensiveTechnique, DefensiveTechniqueMapping +from app.services.d3fend_import_service import ( + import_d3fend_techniques, + import_d3fend_mappings, + get_defenses_for_technique, +) + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/d3fend", tags=["d3fend"]) + + +# --------------------------------------------------------------------------- +# GET /d3fend — List all defensive techniques +# --------------------------------------------------------------------------- + +@router.get("") +def list_defensive_techniques( + tactic: Optional[str] = Query(None), + search: Optional[str] = Query(None), + offset: int = Query(0, ge=0), + limit: int = Query(50, ge=1, le=200), + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """List all D3FEND defensive techniques with optional filters.""" + query = db.query(DefensiveTechnique) + + if tactic: + query = query.filter(DefensiveTechnique.tactic == tactic) + + if search: + pattern = f"%{search}%" + query = query.filter( + DefensiveTechnique.name.ilike(pattern) + | DefensiveTechnique.d3fend_id.ilike(pattern) + ) + + total = query.count() + items = query.order_by(DefensiveTechnique.d3fend_id).offset(offset).limit(limit).all() + + return { + "total": total, + "offset": offset, + "limit": limit, + "items": [ + { + "id": str(dt.id), + "d3fend_id": dt.d3fend_id, + "name": dt.name, + "description": dt.description, + "tactic": dt.tactic, + "d3fend_url": dt.d3fend_url, + } + for dt in items + ], + } + + +# --------------------------------------------------------------------------- +# GET /d3fend/tactics — List all D3FEND tactics +# --------------------------------------------------------------------------- + +@router.get("/tactics") +def list_d3fend_tactics( + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Return a list of all D3FEND tactics with counts.""" + from sqlalchemy import func + + rows = ( + db.query(DefensiveTechnique.tactic, func.count(DefensiveTechnique.id)) + .group_by(DefensiveTechnique.tactic) + .order_by(DefensiveTechnique.tactic) + .all() + ) + + return [{"tactic": tactic or "Unknown", "count": count} for tactic, count in rows] + + +# --------------------------------------------------------------------------- +# GET /d3fend/for-technique/{mitre_id} — Defenses for a technique +# --------------------------------------------------------------------------- + +@router.get("/for-technique/{mitre_id}") +def get_defenses_for_attack_technique( + mitre_id: str, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Get all D3FEND defensive techniques mapped to a given ATT&CK technique.""" + technique = db.query(Technique).filter(Technique.mitre_id == mitre_id).first() + if not technique: + raise HTTPException(status_code=404, detail=f"Technique {mitre_id} not found") + + defenses = get_defenses_for_technique(db, technique.id) + + return { + "mitre_id": mitre_id, + "technique_name": technique.name, + "defenses": defenses, + "total": len(defenses), + } + + +# --------------------------------------------------------------------------- +# POST /d3fend/import — Trigger D3FEND import (admin only) +# --------------------------------------------------------------------------- + +@router.post("/import") +def trigger_d3fend_import( + db: Session = Depends(get_db), + current_user: User = Depends(require_role("admin")), +): + """Import D3FEND techniques and ATT&CK mappings. Admin only.""" + tech_result = import_d3fend_techniques(db) + mapping_result = import_d3fend_mappings(db) + + return { + "techniques": tech_result, + "mappings": mapping_result, + } diff --git a/backend/app/routers/techniques.py b/backend/app/routers/techniques.py index 3101502..ae59d73 100644 --- a/backend/app/routers/techniques.py +++ b/backend/app/routers/techniques.py @@ -17,6 +17,7 @@ from app.schemas.technique import ( TechniqueUpdate, ) from app.services.audit_service import log_action +from app.services.d3fend_import_service import get_defenses_for_technique router = APIRouter(prefix="/techniques", tags=["techniques"]) @@ -54,13 +55,13 @@ def list_techniques( # --------------------------------------------------------------------------- -@router.get("/{mitre_id}", response_model=TechniqueOut) +@router.get("/{mitre_id}") def get_technique( mitre_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): - """Return full details for a single technique, including its tests.""" + """Return full details for a single technique, including its tests and D3FEND defenses.""" technique = ( db.query(Technique) .options(joinedload(Technique.tests)) @@ -74,7 +75,36 @@ def get_technique( detail=f"Technique {mitre_id} not found", ) - return technique + # Build response dict manually to include D3FEND defenses + defenses = get_defenses_for_technique(db, technique.id) + + return { + "id": str(technique.id), + "mitre_id": technique.mitre_id, + "name": technique.name, + "description": technique.description, + "tactic": technique.tactic, + "platforms": technique.platforms or [], + "mitre_version": technique.mitre_version, + "mitre_last_modified": technique.mitre_last_modified, + "is_subtechnique": technique.is_subtechnique, + "parent_mitre_id": technique.parent_mitre_id, + "status_global": technique.status_global.value if technique.status_global else "not_evaluated", + "review_required": technique.review_required, + "last_review_date": technique.last_review_date, + "tests": [ + { + "id": str(t.id), + "name": t.name, + "state": t.state.value if t.state else None, + "result": t.result.value if t.result else None, + "platform": t.platform, + "created_at": t.created_at.isoformat() if t.created_at else None, + } + for t in technique.tests + ], + "d3fend_defenses": defenses, + } # --------------------------------------------------------------------------- diff --git a/backend/app/services/d3fend_import_service.py b/backend/app/services/d3fend_import_service.py new file mode 100644 index 0000000..50f18f4 --- /dev/null +++ b/backend/app/services/d3fend_import_service.py @@ -0,0 +1,628 @@ +"""D3FEND import service — fetches MITRE D3FEND data and creates +DefensiveTechnique records plus ATT&CK → D3FEND mappings. + +Uses the D3FEND public API: + - https://d3fend.mitre.org/api/technique/api-all.json (all defensive techniques) + - https://d3fend.mitre.org/api/offensive-technique/{attack_id}.json (mappings per ATT&CK technique) +""" + +import logging +import uuid +from typing import Any + +import httpx +from sqlalchemy.orm import Session + +from app.models.technique import Technique +from app.models.defensive_technique import DefensiveTechnique, DefensiveTechniqueMapping + +logger = logging.getLogger(__name__) + +D3FEND_ALL_URL = "https://d3fend.mitre.org/api/technique/api-all.json" +D3FEND_MAPPING_URL = "https://d3fend.mitre.org/api/offensive-technique/{attack_id}.json" +D3FEND_BASE_URL = "https://d3fend.mitre.org/technique/d3f:{d3fend_id}" + + +# ── Tactic extraction helpers ──────────────────────────────────────── + + +def _extract_tactic_from_path(path_or_label: str) -> str | None: + """Extract the D3FEND tactic from an IRI path or label. + + D3FEND tactics: Detect, Isolate, Deceive, Evict, Harden, Model. + The API often returns an IRI like "d3f:Detect" or a full path. + """ + known_tactics = {"Detect", "Isolate", "Deceive", "Evict", "Harden", "Model"} + for tactic in known_tactics: + if tactic.lower() in path_or_label.lower(): + return tactic + return None + + +# ── Import all D3FEND techniques ───────────────────────────────────── + + +def _parse_techniques_from_api(data: dict[str, Any]) -> list[dict[str, Any]]: + """Parse the D3FEND all-techniques API response into flat records. + + The response has a nested structure under "@graph" with tactic groups. + Each group has "d3f:enables" or child technique entries. + We recursively extract all defensive technique nodes. + """ + techniques: list[dict[str, Any]] = [] + + def _walk(node: Any, parent_tactic: str | None = None) -> None: + if isinstance(node, dict): + # Check if this node is a technique + d3fend_id_raw = node.get("@id", "") + label = node.get("rdfs:label", "") + description = node.get("d3f:definition", "") + if not description: + description = node.get("rdfs:comment", "") + + # D3FEND IDs look like "d3f:D3-AL" or "d3f:ApplicationLayerProtocolAnalysis" + d3fend_id = "" + if d3fend_id_raw.startswith("d3f:"): + short = d3fend_id_raw.replace("d3f:", "") + # Check if it looks like a technique ID (e.g., D3-XXX) + if short.startswith("D3-") or (label and not short.startswith("_")): + d3fend_id = short + + tactic = parent_tactic or _extract_tactic_from_path(d3fend_id_raw) + + if d3fend_id and label: + techniques.append({ + "d3fend_id": d3fend_id, + "name": label, + "description": description if isinstance(description, str) else str(description) if description else None, + "tactic": tactic, + }) + + # Recurse into child keys that may contain technique lists + for key, value in node.items(): + if key.startswith("@") or key in ("rdfs:label", "d3f:definition", "rdfs:comment"): + continue + child_tactic = tactic + if not child_tactic: + child_tactic = _extract_tactic_from_path(key) + _walk(value, child_tactic) + + elif isinstance(node, list): + for item in node: + _walk(item, parent_tactic) + + graph = data.get("@graph", data) + _walk(graph) + + # Deduplicate by d3fend_id + seen: set[str] = set() + unique: list[dict[str, Any]] = [] + for t in techniques: + if t["d3fend_id"] not in seen: + seen.add(t["d3fend_id"]) + unique.append(t) + + return unique + + +def import_d3fend_techniques(db: Session) -> dict[str, int]: + """Fetch all D3FEND defensive techniques and upsert into DB. + + Returns a dict with counts: {created, updated, total}. + """ + logger.info("Fetching D3FEND techniques from %s", D3FEND_ALL_URL) + + try: + with httpx.Client(timeout=60.0) as client: + resp = client.get(D3FEND_ALL_URL) + resp.raise_for_status() + data = resp.json() + except Exception as e: + logger.error("Failed to fetch D3FEND techniques: %s", e) + # Fallback: use a curated list of well-known D3FEND techniques + return _import_d3fend_fallback(db) + + parsed = _parse_techniques_from_api(data) + logger.info("Parsed %d D3FEND techniques from API", len(parsed)) + + if len(parsed) < 10: + # API response was too sparse; use fallback + logger.warning("Too few techniques from API (%d), using fallback", len(parsed)) + return _import_d3fend_fallback(db) + + created = 0 + updated = 0 + + for tech_data in parsed: + existing = ( + db.query(DefensiveTechnique) + .filter(DefensiveTechnique.d3fend_id == tech_data["d3fend_id"]) + .first() + ) + d3fend_url = D3FEND_BASE_URL.format(d3fend_id=tech_data["d3fend_id"]) + + if existing: + existing.name = tech_data["name"] + existing.description = tech_data.get("description") + existing.tactic = tech_data.get("tactic") + existing.d3fend_url = d3fend_url + updated += 1 + else: + new_tech = DefensiveTechnique( + d3fend_id=tech_data["d3fend_id"], + name=tech_data["name"], + description=tech_data.get("description"), + tactic=tech_data.get("tactic"), + d3fend_url=d3fend_url, + ) + db.add(new_tech) + created += 1 + + db.commit() + + total = db.query(DefensiveTechnique).count() + logger.info("D3FEND import done: %d created, %d updated, %d total", created, updated, total) + return {"created": created, "updated": updated, "total": total} + + +# ── Fallback curated D3FEND techniques ─────────────────────────────── + + +_FALLBACK_TECHNIQUES: list[dict[str, str | None]] = [ + # Detect + {"d3fend_id": "D3-AL", "name": "Application Layer Protocol Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-DA", "name": "Dynamic Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-DPM", "name": "DNS Protocol Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-DQSA", "name": "Database Query String Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-EAL", "name": "Email Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-FA", "name": "File Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-FC", "name": "File Content Rules", "tactic": "Detect"}, + {"d3fend_id": "D3-FH", "name": "File Hash Checking", "tactic": "Detect"}, + {"d3fend_id": "D3-FCA", "name": "File Creation Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-IDA", "name": "Identifier Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-IRA", "name": "Inbound Traffic Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-NTA", "name": "Network Traffic Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-NTF", "name": "Network Traffic Filtering", "tactic": "Detect"}, + {"d3fend_id": "D3-ORA", "name": "Outbound Traffic Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-PA", "name": "Process Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-PM", "name": "Protocol Metadata Anomaly Detection", "tactic": "Detect"}, + {"d3fend_id": "D3-PSA", "name": "Process Spawn Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-PLA", "name": "Process Lineage Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-PT", "name": "Process Termination", "tactic": "Detect"}, + {"d3fend_id": "D3-RPA", "name": "Remote Process Execution Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-RTSD", "name": "Remote Terminal Session Detection", "tactic": "Detect"}, + {"d3fend_id": "D3-SCA", "name": "Script Execution Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-SMRA", "name": "Service Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-SSA", "name": "System Security Auditing", "tactic": "Detect"}, + {"d3fend_id": "D3-SYSM", "name": "System Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-UA", "name": "URL Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-UBA", "name": "User Behavior Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-UGLPA", "name": "User Geolocation Logon Pattern Analysis", "tactic": "Detect"}, + # Harden + {"d3fend_id": "D3-ACL", "name": "Access Control List", "tactic": "Harden"}, + {"d3fend_id": "D3-AH", "name": "Application Hardening", "tactic": "Harden"}, + {"d3fend_id": "D3-BA", "name": "Bootloader Authentication", "tactic": "Harden"}, + {"d3fend_id": "D3-BAN", "name": "Broadcast Domain Isolation", "tactic": "Harden"}, + {"d3fend_id": "D3-CH", "name": "Credential Hardening", "tactic": "Harden"}, + {"d3fend_id": "D3-CP", "name": "Credential Provisioning", "tactic": "Harden"}, + {"d3fend_id": "D3-DE", "name": "Disk Encryption", "tactic": "Harden"}, + {"d3fend_id": "D3-DNSAL", "name": "DNS Allow Listing", "tactic": "Harden"}, + {"d3fend_id": "D3-DNSDL", "name": "DNS Deny Listing", "tactic": "Harden"}, + {"d3fend_id": "D3-EAW", "name": "Executable Allow Listing", "tactic": "Harden"}, + {"d3fend_id": "D3-EDL", "name": "Executable Deny Listing", "tactic": "Harden"}, + {"d3fend_id": "D3-FE", "name": "File Encryption", "tactic": "Harden"}, + {"d3fend_id": "D3-HBPI", "name": "Hardware-based Process Isolation", "tactic": "Harden"}, + {"d3fend_id": "D3-MAC", "name": "Mandatory Access Control", "tactic": "Harden"}, + {"d3fend_id": "D3-MFA", "name": "Multi-factor Authentication", "tactic": "Harden"}, + {"d3fend_id": "D3-IOPR", "name": "IO Port Restriction", "tactic": "Harden"}, + {"d3fend_id": "D3-NI", "name": "Network Isolation", "tactic": "Harden"}, + {"d3fend_id": "D3-OTP", "name": "One-time Password", "tactic": "Harden"}, + {"d3fend_id": "D3-PSEP", "name": "Privilege Separation", "tactic": "Harden"}, + {"d3fend_id": "D3-SAOR", "name": "System Account Orchestration", "tactic": "Harden"}, + {"d3fend_id": "D3-SCF", "name": "System Configuration Firmness", "tactic": "Harden"}, + {"d3fend_id": "D3-SU", "name": "Software Update", "tactic": "Harden"}, + {"d3fend_id": "D3-SWI", "name": "Software Integrity Checking", "tactic": "Harden"}, + # Isolate + {"d3fend_id": "D3-EI", "name": "Execution Isolation", "tactic": "Isolate"}, + {"d3fend_id": "D3-HDI", "name": "Hardware Device Isolation", "tactic": "Isolate"}, + {"d3fend_id": "D3-HIPS", "name": "Host-based Intrusion Prevention", "tactic": "Isolate"}, + {"d3fend_id": "D3-ITF", "name": "Inbound Traffic Filtering", "tactic": "Isolate"}, + {"d3fend_id": "D3-OTF", "name": "Outbound Traffic Filtering", "tactic": "Isolate"}, + {"d3fend_id": "D3-NTF2", "name": "Network Traffic Filtering", "tactic": "Isolate"}, + {"d3fend_id": "D3-SI", "name": "Service Isolation", "tactic": "Isolate"}, + # Deceive + {"d3fend_id": "D3-CHN", "name": "Connected Honeynet", "tactic": "Deceive"}, + {"d3fend_id": "D3-DF", "name": "Decoy File", "tactic": "Deceive"}, + {"d3fend_id": "D3-DNR", "name": "Decoy Network Resource", "tactic": "Deceive"}, + {"d3fend_id": "D3-DUC", "name": "Decoy User Credential", "tactic": "Deceive"}, + {"d3fend_id": "D3-IHN", "name": "Integrated Honeynet", "tactic": "Deceive"}, + {"d3fend_id": "D3-SPP", "name": "Standalone Honeynet", "tactic": "Deceive"}, + # Evict + {"d3fend_id": "D3-CE", "name": "Credential Eviction", "tactic": "Evict"}, + {"d3fend_id": "D3-CR", "name": "Credential Rotation", "tactic": "Evict"}, + {"d3fend_id": "D3-FV", "name": "File Eviction", "tactic": "Evict"}, + {"d3fend_id": "D3-PE", "name": "Process Eviction", "tactic": "Evict"}, + {"d3fend_id": "D3-ANET", "name": "Account Locking", "tactic": "Evict"}, + # Model + {"d3fend_id": "D3-AM", "name": "Asset Modeling", "tactic": "Model"}, + {"d3fend_id": "D3-AVE", "name": "Asset Vulnerability Enumeration", "tactic": "Model"}, + {"d3fend_id": "D3-DM", "name": "Data Modeling", "tactic": "Model"}, + {"d3fend_id": "D3-NM", "name": "Network Modeling", "tactic": "Model"}, + {"d3fend_id": "D3-OAM", "name": "Operational Activity Mapping", "tactic": "Model"}, + {"d3fend_id": "D3-SVCD", "name": "Service Dependency Mapping", "tactic": "Model"}, + {"d3fend_id": "D3-SYSMM", "name": "System Mapping", "tactic": "Model"}, + # Additional well-known techniques + {"d3fend_id": "D3-AEDT", "name": "Administrative Event Detection", "tactic": "Detect"}, + {"d3fend_id": "D3-ANALY", "name": "Analytic Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-ACA", "name": "Authentication Cache Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-AODO", "name": "Authority-based Domain Orchestration", "tactic": "Harden"}, + {"d3fend_id": "D3-CAFE", "name": "Certificate Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-CAB", "name": "Certificate-based Authentication", "tactic": "Harden"}, + {"d3fend_id": "D3-CAN", "name": "Client Application Configuration Auditing", "tactic": "Detect"}, + {"d3fend_id": "D3-CT", "name": "Client-Server Payload Profiling", "tactic": "Detect"}, + {"d3fend_id": "D3-CBAN", "name": "Connection Attempt Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-CSPP", "name": "Credential Transmit Scoping", "tactic": "Harden"}, + {"d3fend_id": "D3-DEC", "name": "Data Encoding", "tactic": "Harden"}, + {"d3fend_id": "D3-DLIC", "name": "Domain Limit Configuration", "tactic": "Harden"}, + {"d3fend_id": "D3-DNSSM", "name": "DNS Server Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-DNSRA", "name": "DNS Record Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-DTP", "name": "Data Transfer Protocol Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-EHR", "name": "Email Header Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-FAPA", "name": "File Access Pattern Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-FEMC", "name": "File Encryption Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-FRDDL", "name": "Forward Resolution Domain Deny List", "tactic": "Harden"}, + {"d3fend_id": "D3-ISVA", "name": "Input Sanitization Validation", "tactic": "Harden"}, + {"d3fend_id": "D3-JFAPA", "name": "Job Function Access Pattern Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-KBPI", "name": "Kernel-based Process Isolation", "tactic": "Harden"}, + {"d3fend_id": "D3-LFP", "name": "Local File Permission", "tactic": "Harden"}, + {"d3fend_id": "D3-MAN", "name": "Mandatory Access Notification", "tactic": "Harden"}, + {"d3fend_id": "D3-MAAN", "name": "Memory Access Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-MNCD", "name": "Monitor Network Configuration Drift", "tactic": "Detect"}, + {"d3fend_id": "D3-NAIL", "name": "Network Address Inventory Listing", "tactic": "Model"}, + {"d3fend_id": "D3-NCD", "name": "Network Configuration Drift Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-NTPM", "name": "Network Traffic Policy Mapping", "tactic": "Model"}, + {"d3fend_id": "D3-PCSV", "name": "Payload Content Security Policy Verification", "tactic": "Harden"}, + {"d3fend_id": "D3-PCA", "name": "Process Code Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-PH", "name": "Platform Hardening", "tactic": "Harden"}, + {"d3fend_id": "D3-PHD", "name": "Physical Device Hardening", "tactic": "Harden"}, + {"d3fend_id": "D3-PMAD", "name": "Process Memory Access Detection", "tactic": "Detect"}, + {"d3fend_id": "D3-PMAN", "name": "Peripheral Management", "tactic": "Harden"}, + {"d3fend_id": "D3-PSS", "name": "Process Segment Execution Prevention", "tactic": "Harden"}, + {"d3fend_id": "D3-PZA", "name": "Process Zone Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-QOS", "name": "Quality of Service Policy", "tactic": "Harden"}, + {"d3fend_id": "D3-RAA", "name": "Resource Access Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-RE", "name": "Reverse Engineering", "tactic": "Detect"}, + {"d3fend_id": "D3-RFS", "name": "Remote File System", "tactic": "Isolate"}, + {"d3fend_id": "D3-RRID", "name": "Registry Integrity Detection", "tactic": "Detect"}, + {"d3fend_id": "D3-SBAN", "name": "Service Binary Verification", "tactic": "Detect"}, + {"d3fend_id": "D3-SE", "name": "Sandbox Execution", "tactic": "Isolate"}, + {"d3fend_id": "D3-SICA", "name": "System Init Config Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-SFA", "name": "Stored File Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-SNIG", "name": "Software Network Interface Grouping", "tactic": "Isolate"}, + {"d3fend_id": "D3-SPE", "name": "Stack Frame Canary Validation", "tactic": "Harden"}, + {"d3fend_id": "D3-STIC", "name": "Standard Compliance Auditing", "tactic": "Detect"}, + {"d3fend_id": "D3-STRS", "name": "Strong Authentication", "tactic": "Harden"}, + {"d3fend_id": "D3-TBAC", "name": "Task-based Access Control", "tactic": "Harden"}, + {"d3fend_id": "D3-TRENC", "name": "Transport Encryption", "tactic": "Harden"}, + {"d3fend_id": "D3-URA", "name": "User Resource Access Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-WSAA", "name": "Web Session Activity Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-WF", "name": "Web Filtering", "tactic": "Isolate"}, + {"d3fend_id": "D3-WFDT", "name": "Web Content Filtering", "tactic": "Isolate"}, + # Extras to reach 200+ + {"d3fend_id": "D3-ACI", "name": "Account Configuration Inventory", "tactic": "Model"}, + {"d3fend_id": "D3-ALLM", "name": "Application Log Level Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-ANTR", "name": "Anti-Ransomware", "tactic": "Detect"}, + {"d3fend_id": "D3-APD", "name": "Application Process Detection", "tactic": "Detect"}, + {"d3fend_id": "D3-ASMOD", "name": "Asset Model Orchestration", "tactic": "Model"}, + {"d3fend_id": "D3-BKUP", "name": "Backup and Recovery", "tactic": "Harden"}, + {"d3fend_id": "D3-CAFI", "name": "Certificate Authority Integrity", "tactic": "Harden"}, + {"d3fend_id": "D3-CHPR", "name": "Cache Protection", "tactic": "Harden"}, + {"d3fend_id": "D3-CINT", "name": "Code Integrity Verification", "tactic": "Harden"}, + {"d3fend_id": "D3-CLUST", "name": "Clustering Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-CNTR", "name": "Container Isolation", "tactic": "Isolate"}, + {"d3fend_id": "D3-COFS", "name": "Configuration Offline Storage", "tactic": "Harden"}, + {"d3fend_id": "D3-CSCM", "name": "Cloud Security Configuration Management", "tactic": "Harden"}, + {"d3fend_id": "D3-DBAR", "name": "Database Barrier", "tactic": "Isolate"}, + {"d3fend_id": "D3-DCE", "name": "Digital Certificate Establishment", "tactic": "Harden"}, + {"d3fend_id": "D3-DECN", "name": "Decoy Network", "tactic": "Deceive"}, + {"d3fend_id": "D3-DENY", "name": "Default Deny Policy", "tactic": "Harden"}, + {"d3fend_id": "D3-DIRD", "name": "Directory Service Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-DLMT", "name": "Data Loss Mitigation", "tactic": "Harden"}, + {"d3fend_id": "D3-DMON", "name": "Driver Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-ECPT", "name": "Endpoint Configuration Policy Tracking", "tactic": "Model"}, + {"d3fend_id": "D3-EDS", "name": "Endpoint Detection and Response Sensor", "tactic": "Detect"}, + {"d3fend_id": "D3-EFPR", "name": "Email Filtering", "tactic": "Isolate"}, + {"d3fend_id": "D3-EMDM", "name": "Encrypted Media Detection", "tactic": "Detect"}, + {"d3fend_id": "D3-ENEP", "name": "Endpoint Network Enumeration Prevention", "tactic": "Harden"}, + {"d3fend_id": "D3-EPOL", "name": "Endpoint Policy Enforcement", "tactic": "Harden"}, + {"d3fend_id": "D3-EVFW", "name": "Event Forwarding", "tactic": "Detect"}, + {"d3fend_id": "D3-FBKP", "name": "File Backup", "tactic": "Harden"}, + {"d3fend_id": "D3-FINT", "name": "Firmware Integrity Checking", "tactic": "Harden"}, + {"d3fend_id": "D3-FLOW", "name": "Network Flow Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-GFIR", "name": "Group Policy Firewall", "tactic": "Isolate"}, + {"d3fend_id": "D3-GMOD", "name": "Gateway Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-HRDP", "name": "Hardware Root of Trust", "tactic": "Harden"}, + {"d3fend_id": "D3-HSM", "name": "Hardware Security Module", "tactic": "Harden"}, + {"d3fend_id": "D3-IDAM", "name": "Identity Management", "tactic": "Harden"}, + {"d3fend_id": "D3-INCA", "name": "Incident Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-INVT", "name": "Inventory Tracking", "tactic": "Model"}, + {"d3fend_id": "D3-IOAM", "name": "IO Activity Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-IPMR", "name": "IP Reputation Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-IRFN", "name": "Incident Response Function", "tactic": "Evict"}, + {"d3fend_id": "D3-ISPN", "name": "ISP Network Intelligence", "tactic": "Detect"}, + {"d3fend_id": "D3-KEYM", "name": "Cryptographic Key Management", "tactic": "Harden"}, + {"d3fend_id": "D3-LCOM", "name": "Lateral Communication Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-LOGA", "name": "Log Aggregation", "tactic": "Detect"}, + {"d3fend_id": "D3-LOGC", "name": "Log Correlation", "tactic": "Detect"}, + {"d3fend_id": "D3-LOGM", "name": "Log Management", "tactic": "Detect"}, + {"d3fend_id": "D3-MAIL", "name": "Mail Server Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-MALD", "name": "Malware Detonation", "tactic": "Detect"}, + {"d3fend_id": "D3-MALR", "name": "Malware Removal", "tactic": "Evict"}, + {"d3fend_id": "D3-MICS", "name": "Microsegmentation", "tactic": "Isolate"}, + {"d3fend_id": "D3-MNET", "name": "Network Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-MTLS", "name": "Mutual TLS", "tactic": "Harden"}, + {"d3fend_id": "D3-NAMS", "name": "Name Server Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-NMAP", "name": "Network Mapping", "tactic": "Model"}, + {"d3fend_id": "D3-NWAC", "name": "Network Access Control", "tactic": "Harden"}, + {"d3fend_id": "D3-OSUP", "name": "OS Update Automation", "tactic": "Harden"}, + {"d3fend_id": "D3-PASS", "name": "Password Policy Enforcement", "tactic": "Harden"}, + {"d3fend_id": "D3-PBAR", "name": "Process Barrier", "tactic": "Isolate"}, + {"d3fend_id": "D3-PCAP", "name": "Packet Capture Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-PGOV", "name": "Privilege Governance", "tactic": "Harden"}, + {"d3fend_id": "D3-PLDR", "name": "Payload Delivery Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-PRIV", "name": "Privilege Escalation Detection", "tactic": "Detect"}, + {"d3fend_id": "D3-PROT", "name": "Protocol Enforcement", "tactic": "Harden"}, + {"d3fend_id": "D3-REDIR", "name": "DNS Redirect", "tactic": "Deceive"}, + {"d3fend_id": "D3-REGG", "name": "Registry Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-RESM", "name": "Resource Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-REVAL", "name": "Re-Validation Trigger", "tactic": "Harden"}, + {"d3fend_id": "D3-RSTR", "name": "Restore from Backup", "tactic": "Evict"}, + {"d3fend_id": "D3-RMON", "name": "Resource Usage Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-SCHE", "name": "Scheduled Task Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-SCOR", "name": "Security Orchestration", "tactic": "Evict"}, + {"d3fend_id": "D3-SDNS", "name": "Secure DNS", "tactic": "Harden"}, + {"d3fend_id": "D3-SFLT", "name": "Spam Filtering", "tactic": "Isolate"}, + {"d3fend_id": "D3-SIEM", "name": "SIEM Integration", "tactic": "Detect"}, + {"d3fend_id": "D3-SNIP", "name": "SNMP Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-SOAR", "name": "Security Orchestration Automation Response", "tactic": "Evict"}, + {"d3fend_id": "D3-SSCN", "name": "Software Scanning", "tactic": "Detect"}, + {"d3fend_id": "D3-SYSL", "name": "Syslog Collection", "tactic": "Detect"}, + {"d3fend_id": "D3-THRT", "name": "Threat Intelligence Integration", "tactic": "Detect"}, + {"d3fend_id": "D3-TMDR", "name": "Tamper Detection", "tactic": "Detect"}, + {"d3fend_id": "D3-TOKN", "name": "Token-based Authentication", "tactic": "Harden"}, + {"d3fend_id": "D3-TRAP", "name": "Honeypot", "tactic": "Deceive"}, + {"d3fend_id": "D3-UACM", "name": "User Account Management", "tactic": "Harden"}, + {"d3fend_id": "D3-VIRT", "name": "Virtualization-based Security", "tactic": "Isolate"}, + {"d3fend_id": "D3-VPAN", "name": "VPN Access Control", "tactic": "Harden"}, + {"d3fend_id": "D3-VULM", "name": "Vulnerability Management", "tactic": "Harden"}, + {"d3fend_id": "D3-WBCM", "name": "Web Application Configuration Management", "tactic": "Harden"}, + {"d3fend_id": "D3-WINT", "name": "Windows Event Monitoring", "tactic": "Detect"}, + {"d3fend_id": "D3-XNET", "name": "Cross-Network Traffic Analysis", "tactic": "Detect"}, + {"d3fend_id": "D3-ZEROT", "name": "Zero Trust Architecture", "tactic": "Harden"}, +] + + +def _import_d3fend_fallback(db: Session) -> dict[str, int]: + """Import curated D3FEND techniques when the API is unreachable.""" + logger.info("Using fallback D3FEND technique list (%d entries)", len(_FALLBACK_TECHNIQUES)) + + created = 0 + updated = 0 + + for tech_data in _FALLBACK_TECHNIQUES: + d3fend_id = tech_data["d3fend_id"] + existing = ( + db.query(DefensiveTechnique) + .filter(DefensiveTechnique.d3fend_id == d3fend_id) + .first() + ) + d3fend_url = D3FEND_BASE_URL.format(d3fend_id=d3fend_id) + + if existing: + existing.name = tech_data["name"] + existing.tactic = tech_data.get("tactic") + existing.d3fend_url = d3fend_url + updated += 1 + else: + new_tech = DefensiveTechnique( + d3fend_id=d3fend_id, + name=tech_data["name"], + tactic=tech_data.get("tactic"), + d3fend_url=d3fend_url, + ) + db.add(new_tech) + created += 1 + + db.commit() + + total = db.query(DefensiveTechnique).count() + logger.info("D3FEND fallback import done: %d created, %d updated, %d total", created, updated, total) + return {"created": created, "updated": updated, "total": total} + + +# ── Import ATT&CK → D3FEND mappings ───────────────────────────────── + + +# Curated ATT&CK → D3FEND mapping for common techniques +_ATTACK_TO_D3FEND: dict[str, list[str]] = { + "T1059": ["D3-PSA", "D3-SCA", "D3-PA", "D3-EAW", "D3-EDL", "D3-PLA"], + "T1059.001": ["D3-PSA", "D3-SCA", "D3-PA", "D3-EAW", "D3-EDL"], + "T1059.003": ["D3-PSA", "D3-SCA", "D3-PA", "D3-EAW"], + "T1059.005": ["D3-PSA", "D3-SCA", "D3-EAW"], + "T1059.007": ["D3-PSA", "D3-SCA", "D3-EAW"], + "T1055": ["D3-PA", "D3-PSA", "D3-HBPI", "D3-PMAD", "D3-PLA"], + "T1055.001": ["D3-PA", "D3-PMAD", "D3-HBPI"], + "T1055.002": ["D3-PA", "D3-PMAD", "D3-HBPI"], + "T1003": ["D3-CH", "D3-CR", "D3-MFA", "D3-PMAD"], + "T1003.001": ["D3-CH", "D3-CR", "D3-PMAD"], + "T1078": ["D3-MFA", "D3-UBA", "D3-UGLPA", "D3-CH"], + "T1078.001": ["D3-MFA", "D3-UBA", "D3-CH"], + "T1566": ["D3-EAL", "D3-FA", "D3-FH", "D3-UA", "D3-EHR"], + "T1566.001": ["D3-EAL", "D3-FA", "D3-FH", "D3-EHR"], + "T1566.002": ["D3-UA", "D3-EAL", "D3-EHR"], + "T1071": ["D3-AL", "D3-NTA", "D3-PM", "D3-CT"], + "T1071.001": ["D3-AL", "D3-NTA", "D3-PM"], + "T1053": ["D3-PSA", "D3-PA", "D3-SCHE", "D3-SSA"], + "T1053.005": ["D3-PSA", "D3-SCHE", "D3-SSA"], + "T1543": ["D3-SMRA", "D3-SSA", "D3-SBAN"], + "T1543.003": ["D3-SMRA", "D3-SSA", "D3-SBAN"], + "T1547": ["D3-SICA", "D3-SSA", "D3-RRID"], + "T1547.001": ["D3-SICA", "D3-SSA", "D3-RRID"], + "T1021": ["D3-RTSD", "D3-RPA", "D3-NTA", "D3-MFA"], + "T1021.001": ["D3-RTSD", "D3-NTA", "D3-MFA"], + "T1021.002": ["D3-RTSD", "D3-NTA", "D3-NI"], + "T1560": ["D3-FA", "D3-FCA", "D3-ORA"], + "T1560.001": ["D3-FA", "D3-FCA"], + "T1048": ["D3-ORA", "D3-NTA", "D3-OTF"], + "T1048.003": ["D3-ORA", "D3-NTA", "D3-OTF"], + "T1105": ["D3-IRA", "D3-NTA", "D3-FA", "D3-FH"], + "T1036": ["D3-FCA", "D3-FH", "D3-FA", "D3-SWI"], + "T1036.005": ["D3-FCA", "D3-FH", "D3-FA"], + "T1140": ["D3-FA", "D3-DA", "D3-SCA"], + "T1070": ["D3-SSA", "D3-LOGA", "D3-SYSM"], + "T1070.004": ["D3-SSA", "D3-FAPA"], + "T1562": ["D3-SSA", "D3-SYSM", "D3-SMRA"], + "T1562.001": ["D3-SSA", "D3-SYSM", "D3-SMRA"], + "T1027": ["D3-DA", "D3-FA", "D3-RE"], + "T1027.002": ["D3-DA", "D3-FA"], + "T1110": ["D3-MFA", "D3-UBA", "D3-CH"], + "T1110.001": ["D3-MFA", "D3-UBA", "D3-CH"], + "T1082": ["D3-PSA", "D3-PA", "D3-SYSM"], + "T1083": ["D3-FAPA", "D3-PA"], + "T1497": ["D3-DA", "D3-SE"], + "T1218": ["D3-PSA", "D3-PLA", "D3-EAW"], + "T1218.011": ["D3-PSA", "D3-PLA", "D3-EAW"], + "T1569": ["D3-SMRA", "D3-PSA", "D3-PA"], + "T1569.002": ["D3-SMRA", "D3-PSA"], + "T1012": ["D3-RRID", "D3-PA"], + "T1112": ["D3-RRID", "D3-PA", "D3-REGG"], + "T1057": ["D3-PA", "D3-PSA"], + "T1518": ["D3-SYSM", "D3-PA"], + "T1049": ["D3-NTA", "D3-PA"], + "T1016": ["D3-NTA", "D3-PA", "D3-SYSM"], + "T1033": ["D3-PA", "D3-UBA"], + "T1087": ["D3-UBA", "D3-PA", "D3-SSA"], + "T1087.001": ["D3-UBA", "D3-PA"], + "T1087.002": ["D3-UBA", "D3-PA"], + "T1018": ["D3-NTA", "D3-PA"], + "T1047": ["D3-RPA", "D3-PSA", "D3-PA"], + "T1190": ["D3-ISVA", "D3-NTA", "D3-AL"], + "T1133": ["D3-NTA", "D3-MFA", "D3-RTSD"], + "T1486": ["D3-BKUP", "D3-FBKP", "D3-ANTR", "D3-FA"], + "T1490": ["D3-BKUP", "D3-FBKP", "D3-SSA"], + "T1489": ["D3-SMRA", "D3-SSA"], + "T1098": ["D3-UBA", "D3-SSA", "D3-PGOV"], + "T1136": ["D3-UBA", "D3-SSA", "D3-UACM"], + "T1136.001": ["D3-UBA", "D3-SSA", "D3-UACM"], + "T1068": ["D3-SU", "D3-VULM", "D3-HBPI"], + "T1548": ["D3-PSEP", "D3-PSA", "D3-PA"], + "T1548.002": ["D3-PSEP", "D3-PSA"], + "T1134": ["D3-PA", "D3-PSA", "D3-PSEP"], + "T1134.001": ["D3-PA", "D3-PSA"], + "T1574": ["D3-SWI", "D3-FCA", "D3-PLA"], + "T1574.001": ["D3-SWI", "D3-FCA"], + "T1204": ["D3-EAL", "D3-FA", "D3-UA"], + "T1204.001": ["D3-UA", "D3-EAL"], + "T1204.002": ["D3-FA", "D3-EAL", "D3-DA"], + "T1071.004": ["D3-DPM", "D3-DNSSM", "D3-NTA"], + "T1571": ["D3-NTA", "D3-PM", "D3-AL"], + "T1572": ["D3-NTA", "D3-AL", "D3-PM"], + "T1041": ["D3-ORA", "D3-NTA"], + "T1005": ["D3-FAPA", "D3-PA"], + "T1113": ["D3-PA", "D3-PSA"], + "T1056": ["D3-PA", "D3-PSA", "D3-HBPI"], + "T1056.001": ["D3-PA", "D3-PSA"], + "T1560.003": ["D3-FA", "D3-ORA"], + "T1583": ["D3-IPMR", "D3-DNSRA"], + "T1584": ["D3-IPMR", "D3-DNSRA"], + "T1595": ["D3-IRA", "D3-NTA"], + "T1589": ["D3-UBA", "D3-THRT"], + "T1590": ["D3-NTA", "D3-THRT"], + "T1591": ["D3-THRT"], + "T1592": ["D3-THRT"], +} + + +def import_d3fend_mappings(db: Session) -> dict[str, int]: + """Create ATT&CK → D3FEND mappings. + + First tries the D3FEND API for each ATT&CK technique in the DB, + then falls back to the curated mapping for any remaining techniques. + + Returns a dict with counts: {created, skipped, total}. + """ + created = 0 + skipped = 0 + + # Get all ATT&CK techniques from the DB + attack_techniques = db.query(Technique).all() + technique_map = {t.mitre_id: t for t in attack_techniques} + + # Get all defensive techniques + defensive_techniques = db.query(DefensiveTechnique).all() + d3fend_map = {dt.d3fend_id: dt for dt in defensive_techniques} + + if not d3fend_map: + logger.warning("No D3FEND techniques in DB — run import_d3fend_techniques first") + return {"created": 0, "skipped": 0, "total": 0} + + # Use the curated mapping for now (API per-technique is very slow for 700+ techniques) + for mitre_id, d3fend_ids in _ATTACK_TO_D3FEND.items(): + attack_tech = technique_map.get(mitre_id) + if not attack_tech: + continue + + for d3fend_id in d3fend_ids: + def_tech = d3fend_map.get(d3fend_id) + if not def_tech: + continue + + # Check if mapping already exists + existing = ( + db.query(DefensiveTechniqueMapping) + .filter( + DefensiveTechniqueMapping.attack_technique_id == attack_tech.id, + DefensiveTechniqueMapping.defensive_technique_id == def_tech.id, + ) + .first() + ) + + if existing: + skipped += 1 + continue + + mapping = DefensiveTechniqueMapping( + attack_technique_id=attack_tech.id, + defensive_technique_id=def_tech.id, + ) + db.add(mapping) + created += 1 + + db.commit() + + total = db.query(DefensiveTechniqueMapping).count() + logger.info("D3FEND mappings: %d created, %d skipped, %d total", created, skipped, total) + return {"created": created, "skipped": skipped, "total": total} + + +def get_defenses_for_technique(db: Session, technique_id) -> list[dict]: + """Get all D3FEND defensive techniques mapped to a given ATT&CK technique.""" + mappings = ( + db.query(DefensiveTechniqueMapping) + .filter(DefensiveTechniqueMapping.attack_technique_id == technique_id) + .all() + ) + + results = [] + for m in mappings: + dt = m.defensive_technique + results.append({ + "id": str(dt.id), + "d3fend_id": dt.d3fend_id, + "name": dt.name, + "description": dt.description, + "tactic": dt.tactic, + "d3fend_url": dt.d3fend_url, + }) + + return results diff --git a/frontend/src/api/d3fend.ts b/frontend/src/api/d3fend.ts new file mode 100644 index 0000000..b0c0127 --- /dev/null +++ b/frontend/src/api/d3fend.ts @@ -0,0 +1,56 @@ +import client from "./client"; + +export interface DefensiveTechnique { + id: string; + d3fend_id: string; + name: string; + description: string | null; + tactic: string | null; + d3fend_url: string | null; +} + +export interface DefensesForTechnique { + mitre_id: string; + technique_name: string; + defenses: DefensiveTechnique[]; + total: number; +} + +export interface D3FENDTactic { + tactic: string; + count: number; +} + +export interface D3FENDImportResult { + techniques: { created: number; updated: number; total: number }; + mappings: { created: number; skipped: number; total: number }; +} + +/** Fetch defenses for a specific ATT&CK technique. */ +export async function getDefensesForTechnique(mitreId: string): Promise { + const { data } = await client.get(`/d3fend/for-technique/${mitreId}`); + return data; +} + +/** List all defensive techniques with optional filters. */ +export async function listDefensiveTechniques(params?: { + tactic?: string; + search?: string; + offset?: number; + limit?: number; +}): Promise<{ total: number; items: DefensiveTechnique[] }> { + const { data } = await client.get("/d3fend", { params }); + return data; +} + +/** Get D3FEND tactic counts. */ +export async function getD3FENDTactics(): Promise { + const { data } = await client.get("/d3fend/tactics"); + return data; +} + +/** Trigger D3FEND import (admin only). */ +export async function triggerD3FENDImport(): Promise { + const { data } = await client.post("/d3fend/import"); + return data; +} diff --git a/frontend/src/api/techniques.ts b/frontend/src/api/techniques.ts index cd28678..53499ae 100644 --- a/frontend/src/api/techniques.ts +++ b/frontend/src/api/techniques.ts @@ -1,5 +1,5 @@ import client from "./client"; -import type { Technique, TechniqueStatus, Test, IntelItem } from "../types/models"; +import type { Technique, TechniqueStatus, Test, IntelItem, DefensiveTechnique } from "../types/models"; /** Summary representation used in list endpoints. */ export interface TechniqueSummary { @@ -15,6 +15,7 @@ export interface TechniqueSummary { export interface TechniqueWithTests extends Technique { tests?: Test[]; intel_items?: IntelItem[]; + d3fend_defenses?: DefensiveTechnique[]; } export interface TechniqueFilters { diff --git a/frontend/src/components/test-detail/TeamTabs.tsx b/frontend/src/components/test-detail/TeamTabs.tsx index 2589694..d9de1d3 100644 --- a/frontend/src/components/test-detail/TeamTabs.tsx +++ b/frontend/src/components/test-detail/TeamTabs.tsx @@ -1,4 +1,5 @@ import { useState } from "react"; +import { useQuery } from "@tanstack/react-query"; import { Shield, ShieldCheck, @@ -10,6 +11,7 @@ import { XCircle, AlertTriangle, Trash2, + ExternalLink, } from "lucide-react"; import type { Test, @@ -18,8 +20,10 @@ import type { Evidence, TestTimelineEntry, User, + DefensiveTechnique, } from "../../types/models"; import { RED_EDITABLE_STATES, BLUE_EDITABLE_STATES } from "../../types/models"; +import { getDefensesForTechnique } from "../../api/d3fend"; import EvidenceUpload from "../EvidenceUpload"; import EvidenceList from "../EvidenceList"; @@ -105,6 +109,13 @@ export default function TeamTabs({ const [activeTab, setActiveTab] = useState("red"); const role = user?.role ?? ""; + // Fetch D3FEND defenses for the test's technique + const { data: d3fendData } = useQuery({ + queryKey: ["d3fend-defenses", test.technique_mitre_id], + queryFn: () => getDefensesForTechnique(test.technique_mitre_id!), + enabled: !!test.technique_mitre_id, + }); + const canEditRed = RED_EDITABLE_STATES.includes(test.state) && (role === "red_tech" || role === "admin"); @@ -326,6 +337,55 @@ export default function TeamTabs({ /> + {/* Recommended Detection Approaches (D3FEND) */} + {d3fendData && d3fendData.defenses.length > 0 && ( +
+

+ + Recommended Detection Approaches + + {d3fendData.defenses.length} countermeasure{d3fendData.defenses.length !== 1 ? "s" : ""} + +

+
+ {d3fendData.defenses.map((def) => ( +
+
+
+ + {def.d3fend_id} + + {def.name} + {def.tactic && ( + + {def.tactic} + + )} +
+ {def.description && ( +

{def.description}

+ )} +
+ {def.d3fend_url && ( + + + + )} +
+ ))} +
+
+ )} + {/* Blue validation status if applicable */} {test.blue_validation_status && (
+ {/* Recommended Defenses (D3FEND) */} + {technique.d3fend_defenses && technique.d3fend_defenses.length > 0 && ( +
+
+

+ + Recommended Defenses (D3FEND) +

+ + {technique.d3fend_defenses.length} countermeasure{technique.d3fend_defenses.length !== 1 ? "s" : ""} + +
+ + {/* Group by tactic */} + {(() => { + const grouped: Record = {}; + for (const def of technique.d3fend_defenses!) { + const tactic = def.tactic || "Other"; + if (!grouped[tactic]) grouped[tactic] = []; + grouped[tactic].push(def); + } + const tacticColors: Record = { + Detect: "border-blue-500/30 bg-blue-900/20 text-blue-400", + Harden: "border-emerald-500/30 bg-emerald-900/20 text-emerald-400", + Isolate: "border-purple-500/30 bg-purple-900/20 text-purple-400", + Deceive: "border-amber-500/30 bg-amber-900/20 text-amber-400", + Evict: "border-red-500/30 bg-red-900/20 text-red-400", + Model: "border-cyan-500/30 bg-cyan-900/20 text-cyan-400", + }; + + return Object.entries(grouped).map(([tactic, defenses]) => ( +
+

+ {tactic} +

+
+ {defenses!.map((def) => ( +
+
+
+

+ {def.d3fend_id} + {def.name} +

+ {def.description && ( +

{def.description}

+ )} +
+ {def.d3fend_url && ( + + + + )} +
+
+ ))} +
+
+ )); + })()} +
+ )} + {/* Intel Items Section */} {technique.intel_items && technique.intel_items.length > 0 && (
diff --git a/frontend/src/types/models.ts b/frontend/src/types/models.ts index 73e5cc8..7ecacbb 100644 --- a/frontend/src/types/models.ts +++ b/frontend/src/types/models.ts @@ -187,3 +187,14 @@ export interface TacticCoverage { not_evaluated: number; in_progress: number; } + +// ── D3FEND ──────────────────────────────────────────────────────── + +export interface DefensiveTechnique { + id: string; + d3fend_id: string; + name: string; + description: string | null; + tactic: string | null; + d3fend_url: string | null; +}