"""Service for synchronizing MITRE ATT&CK techniques via TAXII 2.0. Connects to the official MITRE CTI TAXII server, fetches the Enterprise ATT&CK collection, and upserts attack-pattern objects into the local ``techniques`` table. Falls back to the MITRE CTI GitHub repository when the TAXII server is unreachable. """ import logging from datetime import datetime import requests as _requests from sqlalchemy.orm import Session from taxii2client.v20 import Server as TaxiiServer from app.models.technique import Technique from app.models.enums import TechniqueStatus from app.services.audit_service import log_action logger = logging.getLogger(__name__) TAXII_SERVER_URL = "https://cti-taxii.mitre.org/taxii/" MITRE_SOURCE_NAME = "mitre-attack" GITHUB_ENTERPRISE_URL = ( "https://raw.githubusercontent.com/mitre/cti/master/" "enterprise-attack/enterprise-attack.json" ) def _extract_mitre_id(external_references: list) -> str | None: """Return the MITRE ATT&CK ID (e.g. ``T1059.001``) from external_references.""" if not external_references: return None for ref in external_references: if ref.get("source_name") == MITRE_SOURCE_NAME: return ref.get("external_id") return None def _extract_tactics(kill_chain_phases: list) -> str | None: """Return a comma-separated string of tactic phase names.""" if not kill_chain_phases: return None tactics = [ phase.get("phase_name") for phase in kill_chain_phases if phase.get("kill_chain_name") == "mitre-attack" ] return ", ".join(tactics) if tactics else None def _extract_platforms(stix_object: dict) -> list: """Return the list of platforms from the STIX object.""" return stix_object.get("x_mitre_platforms", []) def _extract_version(stix_object: dict) -> str | None: """Return the MITRE ATT&CK version string.""" return stix_object.get("x_mitre_version") def _extract_last_modified(stix_object: dict) -> datetime | None: """Return the ``modified`` timestamp as a datetime, or None.""" modified = stix_object.get("modified") if modified is None: return None if isinstance(modified, datetime): return modified try: return datetime.fromisoformat(modified.replace("Z", "+00:00")) except (ValueError, AttributeError): return None def _fetch_attack_patterns_taxii() -> list[dict]: """Connect to the MITRE TAXII server and return all attack-pattern objects.""" logger.info("Connecting to MITRE TAXII server at %s", TAXII_SERVER_URL) server = TaxiiServer(TAXII_SERVER_URL) api_root = server.api_roots[0] collection = api_root.collections[0] # Enterprise ATT&CK logger.info( "Fetching objects from collection '%s' (id=%s)", collection.title, collection.id, ) bundle = collection.get_objects() objects = bundle.get("objects", []) attack_patterns = [ obj for obj in objects if obj.get("type") == "attack-pattern" ] logger.info("Retrieved %d attack-pattern objects via TAXII", len(attack_patterns)) return attack_patterns def _fetch_attack_patterns_github() -> list[dict]: """Fallback: fetch Enterprise ATT&CK bundle from the MITRE CTI GitHub repo.""" logger.info("Fetching Enterprise ATT&CK bundle from GitHub (%s)", GITHUB_ENTERPRISE_URL) resp = _requests.get(GITHUB_ENTERPRISE_URL, timeout=120) resp.raise_for_status() bundle = resp.json() objects = bundle.get("objects", []) attack_patterns = [ obj for obj in objects if obj.get("type") == "attack-pattern" ] logger.info("Retrieved %d attack-pattern objects via GitHub", len(attack_patterns)) return attack_patterns def _fetch_attack_patterns() -> list[dict]: """Return all attack-pattern objects, trying TAXII first then GitHub.""" try: return _fetch_attack_patterns_taxii() except Exception as exc: logger.warning( "TAXII server unavailable (%s), falling back to GitHub mirror", exc, ) return _fetch_attack_patterns_github() def sync_mitre(db: Session) -> dict: """Synchronize MITRE ATT&CK techniques into the local database. Parameters ---------- db : Session Active SQLAlchemy database session. Returns ------- dict Summary with keys ``created``, ``updated``, ``unchanged``, ``skipped``. """ attack_patterns = _fetch_attack_patterns() # Pre-load existing techniques keyed by mitre_id for fast lookup existing_techniques: dict[str, Technique] = { t.mitre_id: t for t in db.query(Technique).all() } created = 0 updated = 0 unchanged = 0 skipped = 0 for obj in attack_patterns: # ------------------------------------------------------------------ # Skip revoked / deprecated objects # ------------------------------------------------------------------ if obj.get("revoked", False) or obj.get("x_mitre_deprecated", False): skipped += 1 continue mitre_id = _extract_mitre_id(obj.get("external_references", [])) if not mitre_id: skipped += 1 continue name = obj.get("name", "") description = obj.get("description", "") tactic = _extract_tactics(obj.get("kill_chain_phases", [])) platforms = _extract_platforms(obj) version = _extract_version(obj) last_modified = _extract_last_modified(obj) is_subtechnique = "." in mitre_id parent_mitre_id = mitre_id.split(".")[0] if is_subtechnique else None existing = existing_techniques.get(mitre_id) if existing is None: # ---- Create new technique ---- technique = Technique( mitre_id=mitre_id, name=name, description=description, tactic=tactic, platforms=platforms, mitre_version=version, mitre_last_modified=last_modified, is_subtechnique=is_subtechnique, parent_mitre_id=parent_mitre_id, status_global=TechniqueStatus.not_evaluated, review_required=False, ) db.add(technique) existing_techniques[mitre_id] = technique created += 1 else: # ---- Update if name or description changed ---- changes = False if existing.name != name: existing.name = name changes = True if (existing.description or "") != (description or ""): existing.description = description changes = True # Always keep metadata up-to-date (does not trigger review) existing.tactic = tactic existing.platforms = platforms existing.mitre_version = version existing.mitre_last_modified = last_modified existing.is_subtechnique = is_subtechnique existing.parent_mitre_id = parent_mitre_id if changes: existing.review_required = True updated += 1 else: unchanged += 1 # Single commit for the whole batch db.commit() summary = { "created": created, "updated": updated, "unchanged": unchanged, "skipped": skipped, } logger.info( "MITRE sync complete — created=%d, updated=%d, unchanged=%d, skipped=%d", created, updated, unchanged, skipped, ) # Audit log (system action → user_id=None) log_action( db, user_id=None, action="mitre_sync", entity_type="technique", entity_id=None, details=summary, ) return summary