b60e5562c0
1. fix(search 500): func.cast(col, func.text()) is invalid SQLAlchemy — replaced with cast(col, Text) for both aliases and target_sectors JSONB columns. Generating correct CAST(col AS TEXT) SQL. 2. feat(motivation): extract primary_motivation and sophistication from STIX intrusion-set objects during MITRE sync. Added _normalize_motivation() to map STIX vocabulary → simplified frontend values (espionage / financial / destruction / hacktivism). Both create and update paths now set these fields. Run MITRE sync to backfill existing actors.
421 lines
14 KiB
Python
421 lines
14 KiB
Python
"""Threat Actor import service (MITRE CTI / STIX 2.0).
|
|
|
|
Downloads the MITRE CTI repository, parses the STIX 2.0 bundle for
``intrusion-set`` objects (APT groups) and ``relationship`` objects
linking them to ``attack-pattern`` (techniques), then creates
:class:`ThreatActor` and :class:`ThreatActorTechnique` records.

STIX 2.0 structure
------------------
The enterprise-attack bundle contains:
- ``intrusion-set`` objects → our ThreatActor rows
- ``attack-pattern`` objects → already in our Technique table
- ``relationship`` objects (type=uses) → connects intrusion-set → attack-pattern

Strategy
--------
1. Download ZIP of ``github.com/mitre/cti``.
2. Load ``enterprise-attack/enterprise-attack.json`` (single STIX bundle).
3. Build lookup maps for intrusion-sets and attack-patterns.
4. Parse relationships to connect actors → techniques.
5. Upsert into database.

Idempotency
-----------
Deduplication by ``mitre_id`` for ThreatActor and by the unique
constraint ``(threat_actor_id, technique_id)`` for ThreatActorTechnique.
|
|
"""
|
|
|
|
import io
|
|
import json
|
|
import logging
|
|
import shutil
|
|
import tempfile
|
|
import zipfile
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import requests as _requests
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.threat_actor import ThreatActor, ThreatActorTechnique
|
|
from app.models.technique import Technique
|
|
from app.models.data_source import DataSource
|
|
from app.services.audit_service import log_action
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Constants
|
|
# ---------------------------------------------------------------------------
|
|
|
|
MITRE_CTI_ZIP_URL = (
|
|
"https://github.com/mitre/cti"
|
|
"/archive/refs/heads/master.zip"
|
|
)
|
|
|
|
_DOWNLOAD_TIMEOUT = 300
|
|
_ZIP_ROOT_PREFIX = "cti-master"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _download_zip(url: str = MITRE_CTI_ZIP_URL) -> bytes:
    """Download the MITRE CTI ZIP and return its raw bytes.

    Args:
        url: Location of the archive; defaults to ``MITRE_CTI_ZIP_URL``.

    Returns:
        The complete response body.

    Raises:
        requests.HTTPError: If the server answers with an error status
            (raised by ``raise_for_status``).
    """
    logger.info("Downloading MITRE CTI ZIP from %s …", url)
    # ``stream=True`` defers the body download so an error status is detected
    # before any payload is fetched.  A streamed response only releases its
    # connection once the body is fully read or the response is closed, so the
    # ``with`` block guarantees the release even when raise_for_status() fails.
    with _requests.get(url, timeout=_DOWNLOAD_TIMEOUT, stream=True) as resp:
        resp.raise_for_status()
        content = resp.content
    logger.info("Downloaded %.1f MB", len(content) / (1024 * 1024))
    return content
|
|
|
|
|
|
def _extract_zip_and_load_bundle(zip_bytes: bytes, dest: str) -> dict:
    """Extract the enterprise-attack STIX bundle from the ZIP and load it.

    Only the single bundle file is written to ``dest``; the rest of the
    repository archive is never unpacked, since nothing else is read here.

    Args:
        zip_bytes: Raw bytes of the downloaded archive.
        dest: Directory the bundle file is extracted into.

    Returns:
        The parsed JSON bundle.

    Raises:
        FileNotFoundError: If the archive does not contain the bundle.
        zipfile.BadZipFile: If ``zip_bytes`` is not a valid ZIP archive.
    """
    # Member names inside a ZIP always use forward slashes.
    member = f"{_ZIP_ROOT_PREFIX}/enterprise-attack/enterprise-attack.json"
    bundle_path = (
        Path(dest) / _ZIP_ROOT_PREFIX
        / "enterprise-attack" / "enterprise-attack.json"
    )

    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        try:
            zf.extract(member, dest)
        except KeyError:
            # ZipFile raises KeyError for an unknown member; keep the
            # exception type and message callers already see.
            raise FileNotFoundError(
                f"STIX bundle not found at {bundle_path}"
            ) from None

    if not bundle_path.is_file():
        raise FileNotFoundError(
            f"STIX bundle not found at {bundle_path}"
        )

    logger.info("Loading STIX bundle from %s …", bundle_path)
    with open(bundle_path, "r", encoding="utf-8") as fh:
        bundle = json.load(fh)

    objects = bundle.get("objects", [])
    logger.info("Loaded %d STIX objects", len(objects))
    return bundle
|
|
|
|
|
|
def _extract_mitre_id(external_references: list) -> str | None:
|
|
"""Extract the MITRE ATT&CK ID from external_references."""
|
|
if not isinstance(external_references, list):
|
|
return None
|
|
for ref in external_references:
|
|
if isinstance(ref, dict) and ref.get("source_name") == "mitre-attack":
|
|
return ref.get("external_id")
|
|
return None
|
|
|
|
|
|
def _extract_mitre_url(external_references: list) -> str | None:
|
|
"""Extract the MITRE ATT&CK URL from external_references."""
|
|
if not isinstance(external_references, list):
|
|
return None
|
|
for ref in external_references:
|
|
if isinstance(ref, dict) and ref.get("source_name") == "mitre-attack":
|
|
return ref.get("url")
|
|
return None
|
|
|
|
|
|
# Map STIX primary_motivation vocabulary → simplified frontend values
|
|
_MOTIVATION_MAP: dict[str, str] = {
|
|
# Espionage / nation-state
|
|
"espionage": "espionage",
|
|
"national-security": "espionage",
|
|
"political": "espionage",
|
|
# Financial
|
|
"financial": "financial",
|
|
"financial-gain": "financial",
|
|
"personal-gain": "financial",
|
|
"organizational-gain": "financial",
|
|
# Destruction / disruption
|
|
"destruction": "destruction",
|
|
"disruption": "destruction",
|
|
"coercion": "destruction",
|
|
"dominance": "destruction",
|
|
# Hacktivism / ideology
|
|
"ideology": "hacktivism",
|
|
"hacktivism": "hacktivism",
|
|
"notoriety": "hacktivism",
|
|
"personal-satisfaction": "hacktivism",
|
|
"revenge": "hacktivism",
|
|
}
|
|
|
|
|
|
def _normalize_motivation(raw: str | None) -> str | None:
|
|
"""Normalize a STIX primary_motivation value to the Aegis vocabulary."""
|
|
if not raw:
|
|
return None
|
|
return _MOTIVATION_MAP.get(raw.lower().strip())
|
|
|
|
|
|
def _parse_intrusion_sets(objects: list) -> list[dict]:
    """Parse STIX intrusion-set objects into ThreatActor dicts.

    Revoked objects and objects without a non-blank string name are skipped.
    Fields that arrive with an unexpected JSON type (``null`` or a non-list
    for ``external_references`` / ``aliases``, a non-string ``name``) are
    treated as absent rather than raising.

    Args:
        objects: The ``objects`` list of a STIX bundle.

    Returns:
        One dict per accepted intrusion-set with the keys ``stix_id``,
        ``mitre_id``, ``name``, ``aliases``, ``description``, ``mitre_url``,
        ``references``, ``first_seen``, ``last_seen``, ``motivation`` and
        ``sophistication``.
    """
    actors = []
    for obj in objects:
        if obj.get("type") != "intrusion-set":
            continue
        if obj.get("revoked"):
            continue

        # The ID/URL helpers already tolerate a non-list value; normalise it
        # once so the reference loop below tolerates it too.
        ext_refs = obj.get("external_references")
        if not isinstance(ext_refs, list):
            ext_refs = []
        mitre_id = _extract_mitre_id(ext_refs)
        mitre_url = _extract_mitre_url(ext_refs)

        raw_name = obj.get("name")
        name = raw_name.strip() if isinstance(raw_name, str) else ""
        if not name:
            continue

        # The actor's own name is not kept as one of its aliases.
        raw_aliases = obj.get("aliases")
        if isinstance(raw_aliases, list):
            aliases = [alias for alias in raw_aliases if alias != name]
        else:
            aliases = []

        description = obj.get("description", "")

        # Extract primary_motivation and sophistication from STIX object
        raw_motivation = obj.get("primary_motivation")
        motivation = _normalize_motivation(raw_motivation)
        # NOTE(review): the STIX specification appears to define
        # ``sophistication`` on threat-actor objects rather than on
        # intrusion-set, so this may always be None — confirm against the feed.
        sophistication = obj.get("sophistication")  # e.g. "advanced", "expert"

        # Extract references (non-MITRE)
        references = [
            {
                "source": ref.get("source_name", ""),
                "url": ref.get("url", ""),
                "description": ref.get("description", ""),
            }
            for ref in ext_refs
            if isinstance(ref, dict) and ref.get("source_name") != "mitre-attack"
        ]

        actors.append({
            "stix_id": obj.get("id"),  # e.g. "intrusion-set--abc123"
            "mitre_id": mitre_id,
            "name": name,
            "aliases": aliases,
            "description": description,
            "mitre_url": mitre_url,
            "references": references[:20],  # cap to avoid bloat
            # Passed through exactly as found in the bundle (no parsing here).
            "first_seen": obj.get("first_seen"),
            "last_seen": obj.get("last_seen"),
            "motivation": motivation,
            "sophistication": sophistication,
        })

    logger.info("Parsed %d intrusion-sets (threat actors)", len(actors))
    return actors
|
|
|
|
|
|
def _parse_relationships(objects: list) -> list[dict]:
    """Collect the ``uses`` relationships that go from an intrusion-set
    to an attack-pattern.

    Revoked relationships are ignored.  Each result carries ``source_ref``,
    ``target_ref`` and ``description`` (empty string when absent).
    """

    def _links_actor_to_technique(candidate: dict) -> bool:
        # Checks run in this order and stop at the first failure.
        return (
            candidate.get("type") == "relationship"
            and candidate.get("relationship_type") == "uses"
            and not candidate.get("revoked")
            # We want intrusion-set → attack-pattern
            and candidate.get("source_ref", "").startswith("intrusion-set--")
            and candidate.get("target_ref", "").startswith("attack-pattern--")
        )

    links = [
        {
            "source_ref": item.get("source_ref", ""),
            "target_ref": item.get("target_ref", ""),
            "description": item.get("description", ""),
        }
        for item in objects
        if _links_actor_to_technique(item)
    ]

    logger.info("Parsed %d uses-relationships (actor→technique)", len(links))
    return links
|
|
|
|
|
|
def _build_attack_pattern_map(objects: list) -> dict[str, str]:
    """Map each STIX attack-pattern ID to its MITRE technique ID.

    Revoked attack-patterns are left out, as are entries where either the
    STIX ID or the MITRE ID is missing.

    e.g. {"attack-pattern--abc123": "T1059.001"}
    """
    live_patterns = (
        item
        for item in objects
        if item.get("type") == "attack-pattern" and not item.get("revoked")
    )

    technique_ids = {}
    for pattern in live_patterns:
        stix_key = pattern.get("id", "")
        attack_id = _extract_mitre_id(pattern.get("external_references", []))
        if not (stix_key and attack_id):
            continue
        technique_ids[stix_key] = attack_id

    logger.info("Built attack-pattern map with %d entries", len(technique_ids))
    return technique_ids
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _upsert_threat_actors(
    db: Session, actor_dicts: list[dict]
) -> "tuple[dict[str, ThreatActor], int, int]":
    """Create or update one ThreatActor row per parsed actor dict.

    Returns ``(stix_id → row, created_count, updated_count)``.
    """
    # Only rows that carry a mitre_id can be matched on later runs.
    existing_actors = {
        row.mitre_id: row
        for row in db.query(ThreatActor).all()
        if row.mitre_id
    }

    stix_to_db_actor: "dict[str, ThreatActor]" = {}
    created = 0
    updated = 0

    for actor_dict in actor_dicts:
        mitre_id = actor_dict["mitre_id"]
        db_actor = existing_actors.get(mitre_id) if mitre_id else None

        if db_actor is not None:
            # Update existing actor
            db_actor.name = actor_dict["name"]
            db_actor.aliases = actor_dict["aliases"]
            db_actor.description = actor_dict["description"]
            db_actor.mitre_url = actor_dict["mitre_url"]
            db_actor.references = actor_dict["references"]
            db_actor.first_seen = actor_dict.get("first_seen")
            db_actor.last_seen = actor_dict.get("last_seen")
            # Enrichment fields are only overwritten when the bundle supplies
            # a value, so previously stored values survive.
            if actor_dict.get("motivation"):
                db_actor.motivation = actor_dict["motivation"]
            if actor_dict.get("sophistication"):
                db_actor.sophistication = actor_dict["sophistication"]
            updated += 1
        else:
            # Create new actor
            db_actor = ThreatActor(
                mitre_id=mitre_id,
                name=actor_dict["name"],
                aliases=actor_dict["aliases"],
                description=actor_dict["description"],
                mitre_url=actor_dict["mitre_url"],
                references=actor_dict["references"],
                first_seen=actor_dict.get("first_seen"),
                last_seen=actor_dict.get("last_seen"),
                motivation=actor_dict.get("motivation"),
                sophistication=actor_dict.get("sophistication"),
                is_active=True,
            )
            db.add(db_actor)
            if mitre_id:
                # A second parsed entry with the same mitre_id updates this
                # row instead of inserting a duplicate.
                existing_actors[mitre_id] = db_actor
            created += 1

        stix_to_db_actor[actor_dict["stix_id"]] = db_actor

    # One flush for the whole batch; new rows need their IDs before the
    # relationship step reads ``db_actor.id``.
    db.flush()
    return stix_to_db_actor, created, updated


def _upsert_actor_techniques(
    db: Session,
    relationships: list[dict],
    stix_to_db_actor: dict,
    attack_pattern_map: dict[str, str],
) -> tuple[int, int]:
    """Insert missing ThreatActorTechnique rows for the parsed relationships.

    Relationships whose actor or technique cannot be resolved are ignored and
    not counted.  Returns ``(created_count, skipped_existing_count)``.
    """
    technique_by_mitre_id = {
        row.mitre_id: row
        for row in db.query(Technique).all()
    }

    # Pairs already stored, compared as strings.
    existing_rels: set[tuple] = {
        (str(row.threat_actor_id), str(row.technique_id))
        for row in db.query(ThreatActorTechnique).all()
    }

    created = 0
    skipped = 0

    for rel in relationships:
        # Resolve actor
        db_actor = stix_to_db_actor.get(rel["source_ref"])
        if db_actor is None:
            continue

        # Resolve technique
        mitre_technique_id = attack_pattern_map.get(rel["target_ref"])
        if not mitre_technique_id:
            continue
        db_technique = technique_by_mitre_id.get(mitre_technique_id)
        if db_technique is None:
            continue

        rel_key = (str(db_actor.id), str(db_technique.id))
        if rel_key in existing_rels:
            skipped += 1
            continue

        db.add(ThreatActorTechnique(
            threat_actor_id=db_actor.id,
            technique_id=db_technique.id,
            usage_description=rel["description"][:5000] if rel["description"] else None,
        ))
        # Also guards against the same pair appearing twice in the bundle.
        existing_rels.add(rel_key)
        created += 1

    return created, skipped


def _record_sync_result(db: Session, summary: dict) -> None:
    """Store the summary on the ``mitre_cti`` DataSource row (if one exists)
    and write an audit entry."""
    from datetime import timezone

    ds = db.query(DataSource).filter(DataSource.name == "mitre_cti").first()
    if ds:
        # Same naive-UTC value as the deprecated ``datetime.utcnow()``.
        ds.last_sync_at = datetime.now(timezone.utc).replace(tzinfo=None)
        ds.last_sync_status = "success"
        ds.last_sync_stats = summary
        db.commit()

    logger.info("MITRE CTI threat actor import complete — %s", summary)

    log_action(
        db,
        user_id=None,
        action="import_threat_actors",
        entity_type="threat_actor",
        entity_id=None,
        details=summary,
    )
    db.commit()


def sync(db: Session) -> dict:
    """Download and import threat actors from MITRE CTI.

    Downloads the archive, parses intrusion-sets, ``uses`` relationships and
    attack-patterns, upserts ThreatActor and ThreatActorTechnique rows,
    commits, then records the outcome on the DataSource row and in the audit
    log.

    Args:
        db: Session used for all reads and writes; it is committed on success
            and rolled back if the database phase raises.

    Returns:
        A summary dict with the keys ``actors_created``, ``actors_updated``,
        ``relationships_created``, ``relationships_skipped``,
        ``total_actors_parsed`` and ``total_relationships_parsed``.

    Raises:
        Any exception from the download, parsing or database steps is
        propagated to the caller.
    """
    tmp_dir = tempfile.mkdtemp(prefix="aegis_mitre_cti_")
    try:
        zip_bytes = _download_zip()
        bundle = _extract_zip_and_load_bundle(zip_bytes, tmp_dir)
    finally:
        # The bundle is fully in memory by now; the files are not needed.
        shutil.rmtree(tmp_dir, ignore_errors=True)
        logger.info("Cleaned up temp directory %s", tmp_dir)

    objects = bundle.get("objects", [])

    # Step 1: Parse data
    actor_dicts = _parse_intrusion_sets(objects)
    relationships = _parse_relationships(objects)
    attack_pattern_map = _build_attack_pattern_map(objects)

    try:
        # Step 2: Upsert threat actors
        stix_to_db_actor, actors_created, actors_updated = _upsert_threat_actors(
            db, actor_dicts
        )

        # Step 3: Upsert actor-technique relationships
        rels_created, rels_skipped = _upsert_actor_techniques(
            db, relationships, stix_to_db_actor, attack_pattern_map
        )

        db.commit()

        summary = {
            "actors_created": actors_created,
            "actors_updated": actors_updated,
            "relationships_created": rels_created,
            "relationships_skipped": rels_skipped,
            "total_actors_parsed": len(actor_dicts),
            "total_relationships_parsed": len(relationships),
        }

        # Step 4: Update DataSource record and audit log
        _record_sync_result(db, summary)
    except Exception:
        # Leave the caller's session usable instead of stuck in a failed or
        # half-flushed transaction, then let the error surface unchanged.
        db.rollback()
        logger.exception("MITRE CTI threat actor import failed; session rolled back")
        raise

    return summary
|