"""LOLBAS and GTFOBins import service.

Downloads the LOLBAS (Windows) and GTFOBins (Linux) repositories, parses
their YAML / Markdown files, and creates :class:`TestTemplate` records mapped
to MITRE ATT&CK techniques.

LOLBAS
------
- ZIP from ``LOLBAS-Project/LOLBAS``
- YAML files in ``yml/OSBinaries/``, ``yml/OSLibraries/``, ``yml/OSScripts/``
- Each YAML contains: Name, Description, Commands (list with MitreID)

GTFOBins
--------
- ZIP from ``GTFOBins/GTFOBins.github.io``
- Markdown files in ``_gtfobins/``
- Each Markdown has YAML front-matter with function names
- Functions mapped to MITRE via a static dictionary

Idempotency
-----------
Deduplication keys:

- LOLBAS: ``source + Name + MitreID`` → stored in ``atomic_test_id``
- GTFOBins: ``source + binary_name + function`` → stored in ``atomic_test_id``
"""

import io
import logging
import re
import shutil
import tempfile
import zipfile
from collections.abc import Callable
from datetime import datetime
from pathlib import Path

import requests as _requests
import yaml
from sqlalchemy.orm import Session

from app.models.test_template import TestTemplate
from app.models.data_source import DataSource
from app.services.audit_service import log_action

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

LOLBAS_ZIP_URL = (
    "https://github.com/LOLBAS-Project/LOLBAS"
    "/archive/refs/heads/master.zip"
)

GTFOBINS_ZIP_URL = (
    "https://github.com/GTFOBins/GTFOBins.github.io"
    "/archive/refs/heads/master.zip"
)

# Passed as the ``timeout`` argument of the download request (seconds).
_DOWNLOAD_TIMEOUT = 300

# GTFOBins function → MITRE technique mapping
_GTFOBINS_FUNCTION_MAP: dict[str, str] = {
    "shell": "T1059",
    "command": "T1059",
    "reverse-shell": "T1059",
    "non-interactive-reverse-shell": "T1059",
    "bind-shell": "T1059",
    "non-interactive-bind-shell": "T1059",
    "file-upload": "T1105",
    "file-download": "T1105",
    "upload": "T1105",
    "download": "T1105",
    "file-write": "T1105",
    "file-read": "T1005",
    "library-load": "T1129",
    "sudo": "T1548.003",
    "suid": "T1548.001",
    "capabilities": "T1548",
    "limited-suid": "T1548.001",
}

# Front-matter opens with ``---`` and closes with either ``---`` or ``...``.
_FRONT_MATTER_RE = re.compile(r"^---\s*\n(.*?)\n(?:---|\.\.\.)", re.DOTALL)


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

def _as_text(value: object) -> str:
    """Coerce a parsed YAML scalar to ``str``.

    ``None`` (an empty YAML value) becomes ``""``; strings are returned
    unchanged; anything else goes through ``str()``. This keeps one
    malformed upstream file from aborting the whole import with an
    ``AttributeError`` / ``TypeError``.
    """
    if value is None:
        return ""
    return value if isinstance(value, str) else str(value)


def _download_zip(url: str) -> bytes:
    """Download a ZIP from *url* and return its raw bytes.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    logger.info("Downloading ZIP from %s …", url)
    # ``stream=True`` defers the body download until after the status check;
    # the ``with`` block guarantees the connection is released even when
    # ``raise_for_status`` raises before the body has been consumed.
    with _requests.get(url, timeout=_DOWNLOAD_TIMEOUT, stream=True) as resp:
        resp.raise_for_status()
        content = resp.content
    logger.info("Downloaded %.1f MB", len(content) / (1024 * 1024))
    return content


def _extract_zip(zip_bytes: bytes, dest: str) -> Path:
    """Extract *zip_bytes* into *dest* and return *dest* as a ``Path``."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        zf.extractall(dest)
    return Path(dest)


# ---------------------------------------------------------------------------
# LOLBAS import
# ---------------------------------------------------------------------------

def _parse_lolbas(root_dir: Path) -> list[dict]:
    """Parse LOLBAS YAML files under *root_dir* and return template dicts.

    One dict is produced per ``Commands`` entry that carries a ``MitreID``
    starting with ``T``. Files that cannot be read or parsed, and entries
    with an unexpected shape, are skipped rather than raising.
    """
    results: list[dict] = []

    lolbas_root = root_dir / "LOLBAS-master"
    yaml_dirs = [
        lolbas_root / "yml" / "OSBinaries",
        lolbas_root / "yml" / "OSLibraries",
        lolbas_root / "yml" / "OSScripts",
    ]

    yaml_files: list[Path] = []
    for d in yaml_dirs:
        if d.is_dir():
            yaml_files.extend(sorted(d.rglob("*.yml")))

    logger.info("LOLBAS: Found %d YAML files", len(yaml_files))

    for yaml_path in yaml_files:
        try:
            with open(yaml_path, "r", encoding="utf-8") as fh:
                data = yaml.safe_load(fh)
        except Exception as exc:
            # Best effort: one unreadable file must not stop the import.
            logger.debug("Failed to parse %s: %s", yaml_path, exc)
            continue

        if not isinstance(data, dict):
            continue

        # ``Name:`` may be empty (None) or a non-string scalar in a
        # malformed file; coerce before stripping.
        binary_name = _as_text(data.get("Name")).strip()
        if not binary_name:
            continue

        description = _as_text(data.get("Description"))
        commands = data.get("Commands", [])
        if not isinstance(commands, list):
            continue

        for cmd_entry in commands:
            if not isinstance(cmd_entry, dict):
                continue

            mitre_id = cmd_entry.get("MitreID")
            if not mitre_id:
                continue

            # Normalise the MITRE ID
            mitre_id = str(mitre_id).strip().upper()
            if not mitre_id.startswith("T"):
                continue

            command = _as_text(cmd_entry.get("Command"))
            usecase = _as_text(cmd_entry.get("Usecase"))
            cmd_description = _as_text(cmd_entry.get("Description"))

            # Dedup key
            dedup_key = f"lolbas:{binary_name}:{mitre_id}"

            procedure = []
            if cmd_description:
                procedure.append(f"Description: {cmd_description}")
            if usecase:
                procedure.append(f"Use case: {usecase}")
            if command:
                procedure.append(f"Command: {command}")

            # File-level description first, then the command-level one.
            if description:
                full_description = (
                    f"{description}\n\n{cmd_description}".strip()[:2000]
                )
            elif cmd_description:
                full_description = cmd_description[:2000]
            else:
                full_description = None

            results.append({
                "mitre_technique_id": mitre_id,
                "name": f"LOLBAS: {binary_name} — {usecase or cmd_description or mitre_id}"[:500],
                "description": full_description,
                "source": "lolbas",
                "platform": "windows",
                "tool_suggested": binary_name,
                "attack_procedure": "\n".join(procedure)[:4000] if procedure else None,
                "atomic_test_id": dedup_key,
                "source_url": f"https://lolbas-project.github.io/lolbas/Binaries/{binary_name}/",
            })

    logger.info("LOLBAS: Parsed %d templates", len(results))
    return results


# ---------------------------------------------------------------------------
# GTFOBins import
# ---------------------------------------------------------------------------

def _parse_gtfobins(root_dir: Path) -> list[dict]:
    """Parse GTFOBins markdown files under *root_dir* and return template dicts.

    One dict is produced per front-matter function that has an entry in
    ``_GTFOBINS_FUNCTION_MAP``; unmapped functions are ignored. Returns an
    empty list when the ``_gtfobins`` directory is missing.
    """
    results: list[dict] = []

    gtfobins_root = root_dir / "GTFOBins.github.io-master" / "_gtfobins"
    if not gtfobins_root.is_dir():
        logger.warning("GTFOBins directory not found at %s", gtfobins_root)
        return results

    md_files = sorted(
        f for f in gtfobins_root.iterdir()
        if f.is_file() and f.suffix in (".md", "")
    )

    logger.info("GTFOBins: Found %d files", len(md_files))

    for md_path in md_files:
        binary_name = md_path.stem  # e.g. "awk"

        try:
            with open(md_path, "r", encoding="utf-8") as fh:
                content = fh.read()
        except Exception as exc:
            # Best effort: one unreadable file must not stop the import.
            logger.debug("Failed to read %s: %s", md_path, exc)
            continue

        # Extract YAML front-matter
        front_matter = _extract_front_matter(content)
        if not front_matter:
            continue

        functions = front_matter.get("functions", {})
        if not isinstance(functions, dict):
            continue

        for func_name, func_data in functions.items():
            # Map function to MITRE technique. YAML keys are not guaranteed
            # to be strings, so coerce before lower-casing.
            mitre_id = _GTFOBINS_FUNCTION_MAP.get(str(func_name).lower())
            if not mitre_id:
                continue

            # Extract code examples from function data
            examples = []
            if isinstance(func_data, list):
                for entry in func_data:
                    if isinstance(entry, dict):
                        code = entry.get("code", "")
                        if code:
                            examples.append(str(code))
                    elif isinstance(entry, str):
                        examples.append(entry)

            procedure = "\n\n".join(examples) if examples else None
            dedup_key = f"gtfobins:{binary_name}:{func_name}"

            results.append({
                "mitre_technique_id": mitre_id,
                "name": f"GTFOBins: {binary_name} — {func_name}"[:500],
                "description": f"Abuse {binary_name} binary for {func_name} on Linux/Unix."[:2000],
                "source": "gtfobins",
                "platform": "linux",
                "tool_suggested": binary_name,
                "attack_procedure": procedure[:4000] if procedure else None,
                "atomic_test_id": dedup_key,
                "source_url": f"https://gtfobins.github.io/gtfobins/{binary_name}/",
            })

    logger.info("GTFOBins: Parsed %d templates", len(results))
    return results


def _extract_front_matter(content: str) -> dict | None:
    """Extract YAML front-matter from a markdown/GTFOBins file.

    Supports both ``---/---`` (standard front-matter) and ``---/...``
    (YAML document-end marker used by GTFOBins).

    Returns:
        The parsed mapping, or ``None`` when there is no front-matter, the
        YAML cannot be parsed, or the parsed document is not a mapping.
    """
    match = _FRONT_MATTER_RE.match(content)
    if not match:
        return None
    try:
        parsed = yaml.safe_load(match.group(1))
    except Exception:
        # Deliberately broad: a malformed file is skipped, not fatal.
        return None
    # ``safe_load`` can yield a list, scalar or None; callers rely on the
    # annotated ``dict | None`` contract and call ``.get`` on the result.
    return parsed if isinstance(parsed, dict) else None


# ---------------------------------------------------------------------------
# Upsert logic
# ---------------------------------------------------------------------------

def _upsert_templates(db: Session, items: list[dict], source_name: str) -> dict:
    """Insert templates, skipping existing ones by ``atomic_test_id``.

    Existing keys are loaded once for *source_name*; keys added during this
    call are tracked too, so duplicates inside *items* are inserted only
    once. A single commit is issued after all items have been processed.

    Returns:
        ``{"created": int, "skipped_existing": int, "total_parsed": int}``.
    """
    existing_ids: set[str] = {
        row[0]
        for row in db.query(TestTemplate.atomic_test_id)
        .filter(TestTemplate.source == source_name)
        .filter(TestTemplate.atomic_test_id.isnot(None))
        .all()
    }

    created = 0
    skipped = 0

    for item in items:
        if item["atomic_test_id"] in existing_ids:
            skipped += 1
            continue

        template = TestTemplate(
            mitre_technique_id=item["mitre_technique_id"],
            name=item["name"],
            description=item["description"],
            source=item["source"],
            source_url=item.get("source_url"),
            attack_procedure=item.get("attack_procedure"),
            platform=item["platform"],
            tool_suggested=item.get("tool_suggested"),
            atomic_test_id=item["atomic_test_id"],
            is_active=True,
        )
        db.add(template)
        existing_ids.add(item["atomic_test_id"])
        created += 1

    db.commit()
    return {"created": created, "skipped_existing": skipped, "total_parsed": len(items)}


def _run_import(
    db: Session,
    *,
    zip_url: str,
    tmp_prefix: str,
    parser: Callable[[Path], list[dict]],
    source_name: str,
) -> dict:
    """Download, parse and upsert one source; update its DataSource row.

    The archive is extracted into a fresh temporary directory which is
    removed again (errors ignored) whether or not download/parsing succeeds.
    When a ``DataSource`` named *source_name* exists, its ``last_sync_*``
    fields are updated and committed.

    Returns:
        The summary dict produced by :func:`_upsert_templates`.
    """
    tmp_dir = tempfile.mkdtemp(prefix=tmp_prefix)
    try:
        zip_bytes = _download_zip(zip_url)
        root_dir = _extract_zip(zip_bytes, tmp_dir)
        parsed = parser(root_dir)
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)

    summary = _upsert_templates(db, parsed, source_name)

    # Update DataSource record
    ds = db.query(DataSource).filter(DataSource.name == source_name).first()
    if ds:
        # NOTE(review): naive UTC timestamp, kept as in the original code;
        # confirm the column's timezone handling before switching to an
        # aware datetime.
        ds.last_sync_at = datetime.utcnow()
        ds.last_sync_status = "success"
        ds.last_sync_stats = summary
        db.commit()

    return summary


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def sync(db: Session) -> dict:
    """Import LOLBAS templates.

    Returns a summary dict with ``created``, ``skipped_existing``,
    ``total_parsed``.
    """
    summary = _run_import(
        db,
        zip_url=LOLBAS_ZIP_URL,
        tmp_prefix="aegis_lolbas_",
        parser=_parse_lolbas,
        source_name="lolbas",
    )

    logger.info("LOLBAS import complete — %s", summary)
    log_action(
        db,
        user_id=None,
        action="import_lolbas",
        entity_type="test_template",
        entity_id=None,
        details=summary,
    )
    return summary


def sync_gtfobins(db: Session) -> dict:
    """Import GTFOBins templates.

    Returns a summary dict with ``created``, ``skipped_existing``,
    ``total_parsed``.
    """
    summary = _run_import(
        db,
        zip_url=GTFOBINS_ZIP_URL,
        tmp_prefix="aegis_gtfobins_",
        parser=_parse_gtfobins,
        source_name="gtfobins",
    )

    logger.info("GTFOBins import complete — %s", summary)
    log_action(
        db,
        user_id=None,
        action="import_gtfobins",
        entity_type="test_template",
        entity_id=None,
        details=summary,
    )
    return summary