"""MITRE CALDERA abilities import service.

Downloads the MITRE CALDERA *Stockpile* plugin repository (which ships
CALDERA's stock abilities) as a ZIP from GitHub, parses the ability YAML
files under ``data/abilities/{tactic}/``, and creates :class:`TestTemplate`
records in the database.

Strategy
--------
1. Download the Stockpile repository as a ZIP.
2. Extract it into a temporary directory.
3. Walk the ``data/abilities/{tactic}/*.yml`` files.
4. For each ability, extract name, description, tactic, technique ID,
   platforms, and executor commands.
5. Create TestTemplate rows keyed by ``caldera:{id}``, where ``id`` is the
   ability's ``id`` field.
6. Remove the temporary directory.

Idempotency
-----------
Running the import twice does **not** create duplicates. Existing templates
are identified by ``source = "caldera"`` plus ``atomic_test_id``
(``"caldera:"`` followed by the CALDERA ability ``id``).
"""

import io
import logging
import shutil
import tempfile
import zipfile
from datetime import datetime, timezone
from pathlib import Path

import requests as _requests
import yaml
from sqlalchemy.orm import Session

from app.models.test_template import TestTemplate
from app.models.data_source import DataSource
from app.models.technique import Technique
from app.services.audit_service import log_action

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

CALDERA_ZIP_URL = (
    "https://github.com/mitre/stockpile"
    "/archive/refs/heads/master.zip"
)
_DOWNLOAD_TIMEOUT = 300  # seconds, passed to requests as its ``timeout``
# Top-level directory GitHub uses inside the archive for the master branch.
_ZIP_ROOT_PREFIX = "stockpile-master"
# Browsable location of the ability files inside the repository.
_ABILITIES_BLOB_URL = (
    "https://github.com/mitre/stockpile/blob/master/data/abilities"
)
# Accepted platform names, mapped to the names stored on TestTemplate.
_PLATFORM_ALIASES = {
    "windows": "windows",
    "linux": "linux",
    "darwin": "macos",
    "macos": "macos",
}

# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _download_zip(url: str = CALDERA_ZIP_URL) -> bytes:
    """Download the Stockpile ZIP from *url* and return its raw bytes.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        requests.RequestException: on network failure or timeout.
    """
    logger.info("Downloading CALDERA ZIP from %s …", url)
    # The whole archive is held in memory anyway, so streaming buys nothing.
    # The ``with`` block guarantees the connection is released.
    with _requests.get(url, timeout=_DOWNLOAD_TIMEOUT) as resp:
        resp.raise_for_status()
        content = resp.content
    logger.info("Downloaded %.1f MB", len(content) / (1024 * 1024))
    return content


def _extract_zip(zip_bytes: bytes, dest: str) -> Path:
    """Extract *zip_bytes* into *dest* and return the abilities directory.

    Raises:
        zipfile.BadZipFile: if *zip_bytes* is not a valid ZIP archive.
        FileNotFoundError: if the archive lacks ``data/abilities`` under
            the expected root folder.
    """
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        zf.extractall(dest)
    abilities_dir = Path(dest) / _ZIP_ROOT_PREFIX / "data" / "abilities"
    if not abilities_dir.is_dir():
        raise FileNotFoundError(
            f"Expected abilities directory not found at {abilities_dir}"
        )
    return abilities_dir


def _split_key(key: object) -> list[str]:
    """Split a platform/executor key into its individual names.

    A single YAML key may name several platforms or executors joined by
    commas (e.g. ``"psh,pwsh"``). Blank parts are dropped, and non-string
    keys are converted with ``str``.
    """
    return [part.strip() for part in str(key).split(",") if part.strip()]


def _as_text(value: object) -> str:
    """Return *value* as a stripped string; ``None`` becomes ``""``.

    YAML may yield ``None`` (empty key) or non-string scalars where a
    string is expected, and calling ``.strip()`` on those would crash.
    """
    return "" if value is None else str(value).strip()


def _extract_commands(platforms_dict: dict) -> str:
    """Extract executor commands from a CALDERA ``platforms`` mapping.

    The structure is typically::

        platforms:
          windows:
            psh:
              command: "whoami"
          linux:
            sh:
              command: "id"

    Returns one ``[platform/executor]`` section per non-empty command,
    separated by blank lines. Returns ``""`` when *platforms_dict* is not
    a dict or holds no commands.
    """
    if not isinstance(platforms_dict, dict):
        return ""
    sections: list[str] = []
    for os_name, executors in platforms_dict.items():
        if not isinstance(executors, dict):
            continue
        for executor_name, executor_data in executors.items():
            if isinstance(executor_data, dict):
                cmd = executor_data.get("command", "")
            elif isinstance(executor_data, str):
                # Shorthand form: the executor maps directly to a command.
                cmd = executor_data
            else:
                continue
            if cmd:
                sections.append(f"[{os_name}/{executor_name}]\n{cmd}")
    return "\n\n".join(sections)


def _extract_platforms(platforms_dict: dict) -> str:
    """Return the supported platform names of a ``platforms`` mapping.

    Names are lower-cased, ``darwin`` is reported as ``macos``, and
    comma-joined keys such as ``"darwin,linux"`` are split. Unknown
    platforms are ignored, and each name appears once, in first-seen order.
    """
    if not isinstance(platforms_dict, dict):
        return ""
    platform_names: list[str] = []
    for key in platforms_dict:
        for raw_name in _split_key(key):
            normalized = _PLATFORM_ALIASES.get(raw_name.lower())
            if normalized and normalized not in platform_names:
                platform_names.append(normalized)
    return ", ".join(platform_names)


def _extract_executors(platforms_dict: dict) -> str | None:
    """Return the sorted, comma-separated executor names, or ``None``."""
    executors: set[str] = set()
    if isinstance(platforms_dict, dict):
        for os_executors in platforms_dict.values():
            if isinstance(os_executors, dict):
                for key in os_executors:
                    executors.update(_split_key(key))
    return ", ".join(sorted(executors)) or None


def _load_ability_documents(yaml_path: Path) -> list[dict]:
    """Load every ability mapping from one YAML file.

    Stockpile files may hold a YAML list of abilities
    (``- id: ...``) or single-document dicts. Both are flattened into
    individual dicts. Unreadable or malformed files are logged and yield
    ``[]``, so one bad file does not stop the import.
    """
    try:
        with open(yaml_path, "r", encoding="utf-8") as fh:
            documents = list(yaml.safe_load_all(fh))
    except (OSError, ValueError, yaml.YAMLError) as exc:
        # ValueError covers UnicodeDecodeError.
        logger.warning("Failed to parse %s: %s", yaml_path, exc)
        return []

    abilities: list[dict] = []
    for doc in documents:
        if isinstance(doc, dict):
            abilities.append(doc)
        elif isinstance(doc, list):
            abilities.extend(d for d in doc if isinstance(d, dict))
    return abilities


def _build_record(data: dict, rel_path: str) -> dict | None:
    """Convert one ability mapping into TestTemplate field values.

    Args:
        data: the parsed ability mapping.
        rel_path: POSIX path of the source YAML file, relative to the
            ``data/abilities`` directory; used to build ``source_url``.

    Returns:
        A dict of TestTemplate keyword arguments, or ``None`` when the
        ability has no ``id`` or no ATT&CK technique ID starting with ``T``.
    """
    # The id is deliberately used as-is: it forms the dedup key, which must
    # stay identical to what earlier imports stored.
    ability_id = data.get("id", "")
    if not ability_id:
        return None

    technique = data.get("technique", {})
    attack_id = (
        technique.get("attack_id", "") if isinstance(technique, dict) else ""
    )
    if not attack_id:
        return None
    attack_id = str(attack_id).strip().upper()
    if not attack_id.startswith("T"):
        return None

    name = _as_text(data.get("name"))
    description = _as_text(data.get("description"))
    tactic = _as_text(data.get("tactic"))

    if description:
        # Append the tactic only when there is one, to avoid a dangling label.
        full_description = (
            f"{description}\n\nTactic: {tactic}" if tactic else description
        )
        description_value = full_description[:2000]
    else:
        description_value = None

    platforms_dict = data.get("platforms", {})
    commands = _extract_commands(platforms_dict)

    return {
        "mitre_technique_id": attack_id,
        "name": (
            f"CALDERA: {name}" if name else f"CALDERA ability {ability_id}"
        )[:500],
        "description": description_value,
        "source": "caldera",
        "platform": _extract_platforms(platforms_dict),
        "tool_suggested": _extract_executors(platforms_dict),
        "attack_procedure": commands[:4000] if commands else None,
        "atomic_test_id": f"caldera:{ability_id}",
        # Link to the actual file. The YAML ``tactic`` value may be missing
        # or differ from the directory the file lives in.
        "source_url": f"{_ABILITIES_BLOB_URL}/{rel_path}",
    }


def _parse_abilities(abilities_dir: Path) -> list[dict]:
    """Walk *abilities_dir* recursively and parse all ``*.yml`` files.

    Returns a flat list of dicts, one per importable ability, in sorted
    file order.
    """
    results: list[dict] = []
    yaml_files = sorted(abilities_dir.rglob("*.yml"))
    logger.info("Found %d ability YAML files", len(yaml_files))

    for yaml_path in yaml_files:
        rel_path = yaml_path.relative_to(abilities_dir).as_posix()
        for data in _load_ability_documents(yaml_path):
            record = _build_record(data, rel_path)
            if record is not None:
                results.append(record)

    logger.info("Parsed %d CALDERA abilities total", len(results))
    return results


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------


def sync(db: Session) -> dict:
    """Download and import CALDERA abilities as TestTemplates.

    New templates are added, and every Technique that received a new
    template is flagged with ``review_required = True``. The ``caldera``
    DataSource row, if present, is stamped with the sync time and stats,
    and an ``import_caldera`` audit entry is written.

    Returns:
        A summary dict with ``created``, ``skipped_existing`` and
        ``total_parsed``.

    Raises:
        Whatever the download or extraction step raises (network errors,
        ``zipfile.BadZipFile``, ``FileNotFoundError``). The temporary
        directory is removed in every case.
    """
    tmp_dir = tempfile.mkdtemp(prefix="aegis_caldera_")
    try:
        zip_bytes = _download_zip()
        abilities_dir = _extract_zip(zip_bytes, tmp_dir)
        parsed = _parse_abilities(abilities_dir)
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)
        logger.info("Cleaned up temp directory %s", tmp_dir)

    # Pre-load existing keys once, so dedup is a set lookup per ability.
    existing_ids: set[str] = {
        row[0]
        for row in db.query(TestTemplate.atomic_test_id)
        .filter(TestTemplate.source == "caldera")
        .filter(TestTemplate.atomic_test_id.isnot(None))
        .all()
    }

    created = 0
    skipped = 0
    new_technique_ids: set[str] = set()

    for item in parsed:
        if item["atomic_test_id"] in existing_ids:
            skipped += 1
            continue

        template = TestTemplate(
            mitre_technique_id=item["mitre_technique_id"],
            name=item["name"],
            description=item["description"],
            source=item["source"],
            source_url=item["source_url"],
            attack_procedure=item["attack_procedure"],
            platform=item["platform"],
            tool_suggested=item["tool_suggested"],
            atomic_test_id=item["atomic_test_id"],
            is_active=True,
        )
        db.add(template)
        # Also guards against the same ability id appearing twice in one run.
        existing_ids.add(item["atomic_test_id"])
        new_technique_ids.add(item["mitre_technique_id"])
        created += 1

    if new_technique_ids:
        db.query(Technique).filter(
            Technique.mitre_id.in_(sorted(new_technique_ids))
        ).update({"review_required": True}, synchronize_session=False)

    db.commit()

    summary = {
        "created": created,
        "skipped_existing": skipped,
        "total_parsed": len(parsed),
    }

    # Update DataSource record
    ds = db.query(DataSource).filter(DataSource.name == "caldera").first()
    if ds:
        # Naive UTC timestamp, same value as the deprecated datetime.utcnow().
        ds.last_sync_at = datetime.now(timezone.utc).replace(tzinfo=None)
        ds.last_sync_status = "success"
        ds.last_sync_stats = summary
        db.commit()

    logger.info("CALDERA import complete — %s", summary)

    log_action(
        db,
        user_id=None,
        action="import_caldera",
        entity_type="test_template",
        entity_id=None,
        details=summary,
    )
    db.commit()

    return summary