"""Threat Actor import service (MITRE CTI / STIX 2.0). Downloads the MITRE CTI repository, parses the STIX 2.0 bundle for ``intrusion-set`` objects (APT groups) and ``relationship`` objects linking them to ``attack-pattern`` (techniques), then creates :class:`ThreatActor` and :class:`ThreatActorTechnique` records. STIX 2.0 structure ------------------ The enterprise-attack bundle contains: - ``intrusion-set`` objects → our ThreatActor rows - ``attack-pattern`` objects → already in our Technique table - ``relationship`` objects (type=uses) → connects intrusion-set → attack-pattern Strategy -------- 1. Download ZIP of ``github.com/mitre/cti``. 2. Load ``enterprise-attack/enterprise-attack.json`` (single STIX bundle). 3. Build lookup maps for intrusion-sets and attack-patterns. 4. Parse relationships to connect actors → techniques. 5. Upsert into database. Idempotency ----------- Deduplication by ``mitre_id`` for ThreatActor and by the unique constraint ``(threat_actor_id, technique_id)`` for ThreatActorTechnique. """ import io import json import logging import shutil import tempfile import zipfile from datetime import datetime from pathlib import Path import requests as _requests from sqlalchemy.orm import Session from app.models.threat_actor import ThreatActor, ThreatActorTechnique from app.models.technique import Technique from app.models.data_source import DataSource from app.services.audit_service import log_action logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- MITRE_CTI_ZIP_URL = ( "https://github.com/mitre/cti" "/archive/refs/heads/master.zip" ) _DOWNLOAD_TIMEOUT = 300 _ZIP_ROOT_PREFIX = "cti-master" # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _download_zip(url: str = MITRE_CTI_ZIP_URL) -> bytes: """Download the MITRE CTI ZIP and return raw bytes.""" logger.info("Downloading MITRE CTI ZIP from %s …", url) resp = _requests.get(url, timeout=_DOWNLOAD_TIMEOUT, stream=True) resp.raise_for_status() content = resp.content logger.info("Downloaded %.1f MB", len(content) / (1024 * 1024)) return content def _extract_zip_and_load_bundle(zip_bytes: bytes, dest: str) -> dict: """Extract ZIP and load the enterprise-attack STIX bundle.""" with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf: zf.extractall(dest) bundle_path = ( Path(dest) / _ZIP_ROOT_PREFIX / "enterprise-attack" / "enterprise-attack.json" ) if not bundle_path.is_file(): raise FileNotFoundError( f"STIX bundle not found at {bundle_path}" ) logger.info("Loading STIX bundle from %s …", bundle_path) with open(bundle_path, "r", encoding="utf-8") as fh: bundle = json.load(fh) objects = bundle.get("objects", []) logger.info("Loaded %d STIX objects", len(objects)) return bundle def _extract_mitre_id(external_references: list) -> str | None: """Extract the MITRE ATT&CK ID from external_references.""" if not isinstance(external_references, list): return None for ref in external_references: if isinstance(ref, dict) and ref.get("source_name") == "mitre-attack": return ref.get("external_id") return None def _extract_mitre_url(external_references: list) -> str | None: """Extract the MITRE ATT&CK URL from external_references.""" if not isinstance(external_references, list): return None for ref in external_references: if isinstance(ref, dict) and ref.get("source_name") == "mitre-attack": return ref.get("url") return None def _parse_intrusion_sets(objects: list) -> list[dict]: """Parse STIX intrusion-set objects into ThreatActor dicts.""" actors = [] for obj in objects: if obj.get("type") != "intrusion-set": continue if obj.get("revoked"): continue ext_refs = obj.get("external_references", []) mitre_id = _extract_mitre_id(ext_refs) mitre_url = _extract_mitre_url(ext_refs) name = obj.get("name", "").strip() if not name: continue aliases = obj.get("aliases", []) if isinstance(aliases, list) and name in aliases: aliases = [a for a in aliases if a != name] description = obj.get("description", "") # Extract references (non-MITRE) references = [] for ref in ext_refs: if isinstance(ref, dict) and ref.get("source_name") != "mitre-attack": references.append({ "source": ref.get("source_name", ""), "url": ref.get("url", ""), "description": ref.get("description", ""), }) actors.append({ "stix_id": obj.get("id"), # e.g. "intrusion-set--abc123" "mitre_id": mitre_id, "name": name, "aliases": aliases if aliases else [], "description": description, "mitre_url": mitre_url, "references": references[:20], # cap to avoid bloat "first_seen": obj.get("first_seen"), "last_seen": obj.get("last_seen"), }) logger.info("Parsed %d intrusion-sets (threat actors)", len(actors)) return actors def _parse_relationships(objects: list) -> list[dict]: """Parse STIX relationship objects (type=uses) linking intrusion-sets to attack-patterns. """ relationships = [] for obj in objects: if obj.get("type") != "relationship": continue if obj.get("relationship_type") != "uses": continue if obj.get("revoked"): continue source_ref = obj.get("source_ref", "") target_ref = obj.get("target_ref", "") # We want intrusion-set → attack-pattern if not source_ref.startswith("intrusion-set--"): continue if not target_ref.startswith("attack-pattern--"): continue relationships.append({ "source_ref": source_ref, "target_ref": target_ref, "description": obj.get("description", ""), }) logger.info("Parsed %d uses-relationships (actor→technique)", len(relationships)) return relationships def _build_attack_pattern_map(objects: list) -> dict[str, str]: """Build a map from STIX attack-pattern ID → MITRE technique ID. e.g. {"attack-pattern--abc123": "T1059.001"} """ mapping = {} for obj in objects: if obj.get("type") != "attack-pattern": continue if obj.get("revoked"): continue stix_id = obj.get("id", "") mitre_id = _extract_mitre_id(obj.get("external_references", [])) if stix_id and mitre_id: mapping[stix_id] = mitre_id logger.info("Built attack-pattern map with %d entries", len(mapping)) return mapping # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def sync(db: Session) -> dict: """Download and import threat actors from MITRE CTI. Returns a summary dict. """ tmp_dir = tempfile.mkdtemp(prefix="aegis_mitre_cti_") try: zip_bytes = _download_zip() bundle = _extract_zip_and_load_bundle(zip_bytes, tmp_dir) finally: shutil.rmtree(tmp_dir, ignore_errors=True) logger.info("Cleaned up temp directory %s", tmp_dir) objects = bundle.get("objects", []) # Step 1: Parse data actor_dicts = _parse_intrusion_sets(objects) relationships = _parse_relationships(objects) attack_pattern_map = _build_attack_pattern_map(objects) # Step 2: Build STIX-ID → actor dict map stix_to_actor = {a["stix_id"]: a for a in actor_dicts} # Step 3: Load existing actors and techniques from DB existing_actors = { row.mitre_id: row for row in db.query(ThreatActor).all() if row.mitre_id } technique_by_mitre_id = { row.mitre_id: row for row in db.query(Technique).all() } # Step 4: Upsert threat actors actors_created = 0 actors_skipped = 0 stix_to_db_actor: dict[str, ThreatActor] = {} for actor_dict in actor_dicts: mitre_id = actor_dict["mitre_id"] stix_id = actor_dict["stix_id"] if mitre_id and mitre_id in existing_actors: # Update existing actor db_actor = existing_actors[mitre_id] db_actor.name = actor_dict["name"] db_actor.aliases = actor_dict["aliases"] db_actor.description = actor_dict["description"] db_actor.mitre_url = actor_dict["mitre_url"] db_actor.references = actor_dict["references"] db_actor.first_seen = actor_dict.get("first_seen") db_actor.last_seen = actor_dict.get("last_seen") stix_to_db_actor[stix_id] = db_actor actors_skipped += 1 else: # Create new actor db_actor = ThreatActor( mitre_id=mitre_id, name=actor_dict["name"], aliases=actor_dict["aliases"], description=actor_dict["description"], mitre_url=actor_dict["mitre_url"], references=actor_dict["references"], first_seen=actor_dict.get("first_seen"), last_seen=actor_dict.get("last_seen"), is_active=True, ) db.add(db_actor) db.flush() # get the ID if mitre_id: existing_actors[mitre_id] = db_actor stix_to_db_actor[stix_id] = db_actor actors_created += 1 db.flush() # Step 5: Upsert actor-technique relationships # Load existing relationships existing_rels: set[tuple] = set() for row in db.query(ThreatActorTechnique).all(): existing_rels.add((str(row.threat_actor_id), str(row.technique_id))) rels_created = 0 rels_skipped = 0 for rel in relationships: source_ref = rel["source_ref"] target_ref = rel["target_ref"] # Resolve actor db_actor = stix_to_db_actor.get(source_ref) if not db_actor: continue # Resolve technique mitre_technique_id = attack_pattern_map.get(target_ref) if not mitre_technique_id: continue db_technique = technique_by_mitre_id.get(mitre_technique_id) if not db_technique: continue rel_key = (str(db_actor.id), str(db_technique.id)) if rel_key in existing_rels: rels_skipped += 1 continue actor_technique = ThreatActorTechnique( threat_actor_id=db_actor.id, technique_id=db_technique.id, usage_description=rel["description"][:5000] if rel["description"] else None, ) db.add(actor_technique) existing_rels.add(rel_key) rels_created += 1 db.commit() summary = { "actors_created": actors_created, "actors_updated": actors_skipped, "relationships_created": rels_created, "relationships_skipped": rels_skipped, "total_actors_parsed": len(actor_dicts), "total_relationships_parsed": len(relationships), } # Update DataSource record ds = db.query(DataSource).filter(DataSource.name == "mitre_cti").first() if ds: ds.last_sync_at = datetime.utcnow() ds.last_sync_status = "success" ds.last_sync_stats = summary db.commit() logger.info("MITRE CTI threat actor import complete — %s", summary) log_action( db, user_id=None, action="import_threat_actors", entity_type="threat_actor", entity_id=None, details=summary, ) db.commit() return summary