"""LOLBAS and GTFOBins import service.

Downloads the LOLBAS (Windows) and GTFOBins (Linux) repositories, parses
their YAML / Markdown files, and creates :class:`TestTemplate` records
mapped to MITRE ATT&CK techniques.

LOLBAS
------
- ZIP from ``LOLBAS-Project/LOLBAS``
- YAML files in ``yml/OSBinaries/``, ``yml/OSLibraries/``, ``yml/OSScripts/``
- Each YAML contains: Name, Description, Commands (list with MitreID)

GTFOBins
--------
- ZIP from ``GTFOBins/GTFOBins.github.io``
- Markdown files in ``_gtfobins/``
- Each Markdown has YAML front-matter with function names
- Functions mapped to MITRE via a static dictionary

Idempotency
-----------
Deduplication keys:
- LOLBAS: ``source + Name + MitreID`` → stored in ``atomic_test_id``
- GTFOBins: ``source + binary_name + function`` → stored in ``atomic_test_id``
"""

import io
import logging
import re
import shutil
import tempfile
import zipfile
from collections.abc import Callable
from datetime import datetime, timezone
from pathlib import Path

import requests as _requests
import yaml
from sqlalchemy.orm import Session

from app.models.data_source import DataSource
from app.models.technique import Technique
from app.services.audit_service import log_action

# NOTE(review): ``TestTemplate`` is referenced by ``_upsert_templates`` below
# but no import for it is visible in this module — confirm where the model
# lives and import it here, otherwise the upsert raises ``NameError``.

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

LOLBAS_ZIP_URL = (
    "https://github.com/LOLBAS-Project/LOLBAS"
    "/archive/refs/heads/master.zip"
)

GTFOBINS_ZIP_URL = (
    "https://github.com/GTFOBins/GTFOBins.github.io"
    "/archive/refs/heads/master.zip"
)

# Timeout (seconds) passed to ``requests.get`` for the archive downloads.
_DOWNLOAD_TIMEOUT = 300

# GTFOBins function → MITRE technique mapping
_GTFOBINS_FUNCTION_MAP: dict[str, str] = {
    "shell": "T1059",
    "command": "T1059",
    "reverse-shell": "T1059",
    "non-interactive-reverse-shell": "T1059",
    "bind-shell": "T1059",
    "non-interactive-bind-shell": "T1059",
    "file-upload": "T1105",
    "file-download": "T1105",
    "upload": "T1105",
    "download": "T1105",
    "file-write": "T1105",
    "file-read": "T1005",
    "library-load": "T1129",
    "sudo": "T1548.003",
    "suid": "T1548.001",
    "capabilities": "T1548",
    "limited-suid": "T1548.001",
}

# Matches a leading YAML front-matter block that opens with ``---`` and is
# closed by either ``---`` or the YAML document-end marker ``...``.
_FRONT_MATTER_RE = re.compile(r"^---\s*\n(.*?)\n(?:---|\.\.\.)", re.DOTALL)


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

def _as_text(value: object) -> str:
    """Return *value* as a string; any falsy value (``None``, ``""``, …) becomes ``""``.

    YAML fields may be missing, null, or non-string scalars; coercing them
    here keeps ``.strip()`` and slicing in the parsers from raising.
    """
    return str(value) if value else ""


def _download_zip(url: str) -> bytes:
    """Download a ZIP from *url* and return raw bytes.

    Raises whatever ``requests`` raises for connection problems, and
    ``requests.HTTPError`` (via ``raise_for_status``) for 4xx/5xx replies.
    """
    logger.info("Downloading ZIP from %s …", url)
    # The context manager closes the response even when raise_for_status()
    # fails before the streamed body has been consumed.
    with _requests.get(url, timeout=_DOWNLOAD_TIMEOUT, stream=True) as resp:
        resp.raise_for_status()
        content = resp.content
    logger.info("Downloaded %.1f MB", len(content) / (1024 * 1024))
    return content


def _extract_zip(zip_bytes: bytes, dest: str) -> Path:
    """Extract *zip_bytes* into *dest* and return the root directory."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        zf.extractall(dest)
    return Path(dest)


# ---------------------------------------------------------------------------
# LOLBAS import
# ---------------------------------------------------------------------------

def _lolbas_template(
    binary_name: str, description: str, cmd_entry: dict
) -> dict | None:
    """Build one template dict from a LOLBAS ``Commands`` entry.

    Returns ``None`` when the entry has no ``MitreID`` or the ID does not
    start with ``T`` after normalisation.
    """
    raw_mitre_id = cmd_entry.get("MitreID")
    if not raw_mitre_id:
        return None

    # Normalise the MITRE ID
    mitre_id = str(raw_mitre_id).strip().upper()
    if not mitre_id.startswith("T"):
        return None

    command = _as_text(cmd_entry.get("Command"))
    usecase = _as_text(cmd_entry.get("Usecase"))
    cmd_description = _as_text(cmd_entry.get("Description"))

    procedure: list[str] = []
    if cmd_description:
        procedure.append(f"Description: {cmd_description}")
    if usecase:
        procedure.append(f"Use case: {usecase}")
    if command:
        procedure.append(f"Command: {command}")

    if description:
        full_description: str | None = (
            f"{description}\n\n{cmd_description}".strip()[:2000]
        )
    elif cmd_description:
        full_description = cmd_description[:2000]
    else:
        full_description = None

    return {
        "mitre_technique_id": mitre_id,
        "name": (
            f"LOLBAS: {binary_name} — "
            f"{usecase or cmd_description or mitre_id}"
        )[:500],
        "description": full_description,
        "source": "lolbas",
        "platform": "windows",
        "tool_suggested": binary_name,
        "attack_procedure": "\n".join(procedure)[:4000] if procedure else None,
        # Dedup key
        "atomic_test_id": f"lolbas:{binary_name}:{mitre_id}",
        # NOTE(review): the URL always uses the "Binaries" section, even for
        # entries read from OSLibraries / OSScripts — confirm it resolves.
        "source_url": (
            f"https://lolbas-project.github.io/lolbas/Binaries/{binary_name}/"
        ),
    }


def _parse_lolbas(root_dir: Path) -> list[dict]:
    """Parse LOLBAS YAML files and return template dicts.

    Looks under ``<root_dir>/LOLBAS-master/yml/`` in ``OSBinaries``,
    ``OSLibraries`` and ``OSScripts``. Files that cannot be read or parsed,
    documents that are not mappings, and entries without a usable ``Name`` or
    ``MitreID`` are skipped rather than aborting the import.
    """
    results: list[dict] = []
    yml_root = root_dir / "LOLBAS-master" / "yml"

    yaml_files: list[Path] = []
    for section in ("OSBinaries", "OSLibraries", "OSScripts"):
        section_dir = yml_root / section
        if section_dir.is_dir():
            yaml_files.extend(sorted(section_dir.rglob("*.yml")))

    logger.info("LOLBAS: Found %d YAML files", len(yaml_files))

    for yaml_path in yaml_files:
        # Best-effort: one unreadable or malformed file must not stop the run.
        try:
            with open(yaml_path, "r", encoding="utf-8") as fh:
                data = yaml.safe_load(fh)
        except Exception as exc:
            logger.debug("Failed to parse %s: %s", yaml_path, exc)
            continue

        if not isinstance(data, dict):
            continue

        binary_name = _as_text(data.get("Name")).strip()
        if not binary_name:
            continue

        description = _as_text(data.get("Description"))
        commands = data.get("Commands", [])
        if not isinstance(commands, list):
            continue

        for cmd_entry in commands:
            if not isinstance(cmd_entry, dict):
                continue
            template = _lolbas_template(binary_name, description, cmd_entry)
            if template is not None:
                results.append(template)

    logger.info("LOLBAS: Parsed %d templates", len(results))
    return results


# ---------------------------------------------------------------------------
# GTFOBins import
# ---------------------------------------------------------------------------

def _gtfobins_examples(func_data: object) -> list[str]:
    """Collect code examples from one GTFOBins function entry.

    Accepts a list whose items are either mappings with a ``code`` key or
    plain strings; anything else yields no examples.
    """
    examples: list[str] = []
    if not isinstance(func_data, list):
        return examples
    for entry in func_data:
        if isinstance(entry, dict):
            code = entry.get("code", "")
            if code:
                examples.append(str(code))
        elif isinstance(entry, str):
            examples.append(entry)
    return examples


def _parse_gtfobins(root_dir: Path) -> list[dict]:
    """Parse GTFOBins markdown files and return template dicts.

    Reads every ``.md`` or extension-less file directly inside
    ``<root_dir>/GTFOBins.github.io-master/_gtfobins``. Only functions listed
    in ``_GTFOBINS_FUNCTION_MAP`` produce templates. Returns an empty list
    (after logging a warning) when the directory is missing.
    """
    results: list[dict] = []
    gtfobins_root = root_dir / "GTFOBins.github.io-master" / "_gtfobins"

    if not gtfobins_root.is_dir():
        logger.warning("GTFOBins directory not found at %s", gtfobins_root)
        return results

    md_files = sorted(
        f for f in gtfobins_root.iterdir()
        if f.is_file() and f.suffix in (".md", "")
    )
    logger.info("GTFOBins: Found %d files", len(md_files))

    for md_path in md_files:
        binary_name = md_path.stem  # e.g. "awk"

        # Best-effort: skip files that cannot be read or decoded.
        try:
            with open(md_path, "r", encoding="utf-8") as fh:
                content = fh.read()
        except Exception as exc:
            logger.debug("Failed to read %s: %s", md_path, exc)
            continue

        front_matter = _extract_front_matter(content)
        if not front_matter:
            continue

        functions = front_matter.get("functions", {})
        if not isinstance(functions, dict):
            continue

        for raw_func_name, func_data in functions.items():
            # YAML keys are not guaranteed to be strings.
            func_name = str(raw_func_name)

            # Map function to MITRE technique
            mitre_id = _GTFOBINS_FUNCTION_MAP.get(func_name.lower())
            if not mitre_id:
                continue

            examples = _gtfobins_examples(func_data)
            procedure = "\n\n".join(examples) if examples else None

            results.append({
                "mitre_technique_id": mitre_id,
                "name": f"GTFOBins: {binary_name} — {func_name}"[:500],
                "description": (
                    f"Abuse {binary_name} binary for {func_name} on Linux/Unix."
                )[:2000],
                "source": "gtfobins",
                "platform": "linux",
                "tool_suggested": binary_name,
                "attack_procedure": procedure[:4000] if procedure else None,
                "atomic_test_id": f"gtfobins:{binary_name}:{func_name}",
                "source_url": (
                    f"https://gtfobins.github.io/gtfobins/{binary_name}/"
                ),
            })

    logger.info("GTFOBins: Parsed %d templates", len(results))
    return results


def _extract_front_matter(content: str) -> dict | None:
    """Extract YAML front-matter from a markdown/GTFOBins file.

    Supports both ``---/---`` (standard front-matter) and ``---/...``
    (YAML document-end marker used by GTFOBins).

    Returns the parsed mapping, or ``None`` when there is no front-matter,
    the YAML cannot be parsed, or the document is not a mapping.
    """
    match = _FRONT_MATTER_RE.match(content)
    if not match:
        return None
    try:
        parsed = yaml.safe_load(match.group(1))
    except Exception as exc:
        # Best-effort: a malformed header only skips this one file.
        logger.debug("Failed to parse front-matter: %s", exc)
        return None
    # A list or scalar document would break callers that use ``.get``.
    return parsed if isinstance(parsed, dict) else None


# ---------------------------------------------------------------------------
# Upsert logic
# ---------------------------------------------------------------------------

def _upsert_templates(db: Session, items: list[dict], source_name: str) -> dict:
    """Insert templates, skipping existing ones by atomic_test_id.

    Existing IDs are loaded once for *source_name*; IDs added during this
    call are tracked too, so duplicates inside *items* are also skipped.
    Techniques that received at least one new template are flagged with
    ``review_required = True``. Commits the session before returning.

    Returns a dict with ``created``, ``skipped_existing`` and
    ``total_parsed`` counts.
    """
    existing_ids: set[str] = {
        row[0]
        for row in db.query(TestTemplate.atomic_test_id)
        .filter(TestTemplate.source == source_name)
        .filter(TestTemplate.atomic_test_id.isnot(None))
        .all()
    }

    created = 0
    skipped = 0
    new_technique_ids: set[str] = set()

    for item in items:
        if item["atomic_test_id"] in existing_ids:
            skipped += 1
            continue

        template = TestTemplate(
            mitre_technique_id=item["mitre_technique_id"],
            name=item["name"],
            description=item["description"],
            source=item["source"],
            source_url=item.get("source_url"),
            attack_procedure=item.get("attack_procedure"),
            platform=item["platform"],
            tool_suggested=item.get("tool_suggested"),
            atomic_test_id=item["atomic_test_id"],
            is_active=True,
        )
        db.add(template)
        existing_ids.add(item["atomic_test_id"])
        new_technique_ids.add(item["mitre_technique_id"])
        created += 1

    if new_technique_ids:
        db.query(Technique).filter(
            Technique.mitre_id.in_(new_technique_ids)
        ).update({"review_required": True}, synchronize_session=False)

    db.commit()
    return {
        "created": created,
        "skipped_existing": skipped,
        "total_parsed": len(items),
    }


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def _run_import(
    db: Session,
    *,
    zip_url: str,
    parser: Callable[[Path], list[dict]],
    source_name: str,
    label: str,
) -> dict:
    """Download, parse and upsert one source, then record the outcome.

    The archive is extracted into a temporary directory that is removed as
    soon as parsing finishes (or fails). After the upsert, the ``DataSource``
    row named *source_name* (if one exists) gets its ``last_sync_*`` fields
    updated, and an ``import_<source_name>`` audit entry is written.
    Download, extraction and database errors propagate to the caller.
    """
    tmp_dir = tempfile.mkdtemp(prefix=f"aegis_{source_name}_")
    try:
        zip_bytes = _download_zip(zip_url)
        root_dir = _extract_zip(zip_bytes, tmp_dir)
        parsed = parser(root_dir)
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)

    summary = _upsert_templates(db, parsed, source_name)

    # Update DataSource record
    ds = db.query(DataSource).filter(DataSource.name == source_name).first()
    if ds:
        # Naive UTC timestamp — the same value ``datetime.utcnow()`` produced,
        # without relying on that deprecated call.
        ds.last_sync_at = datetime.now(timezone.utc).replace(tzinfo=None)
        ds.last_sync_status = "success"
        ds.last_sync_stats = summary
        db.commit()

    logger.info("%s import complete — %s", label, summary)

    log_action(
        db,
        user_id=None,
        action=f"import_{source_name}",
        entity_type="test_template",
        entity_id=None,
        details=summary,
    )
    db.commit()
    return summary


def sync(db: Session) -> dict:
    """Import LOLBAS templates.

    Returns a summary dict with ``created``, ``skipped_existing``,
    ``total_parsed``.
    """
    return _run_import(
        db,
        zip_url=LOLBAS_ZIP_URL,
        parser=_parse_lolbas,
        source_name="lolbas",
        label="LOLBAS",
    )


def sync_gtfobins(db: Session) -> dict:
    """Import GTFOBins templates.

    Returns a summary dict with ``created``, ``skipped_existing``,
    ``total_parsed``.
    """
    return _run_import(
        db,
        zip_url=GTFOBINS_ZIP_URL,
        parser=_parse_gtfobins,
        source_name="gtfobins",
        label="GTFOBins",
    )