feat(phase-22): add import services for Sigma, LOLBAS, GTFOBins, CALDERA, Elastic and data sources panel (T-203 to T-207)

This commit is contained in:
2026-02-09 16:19:44 +01:00
parent 022c4f2886
commit f4c8cbf768
11 changed files with 2039 additions and 0 deletions

View File

@@ -0,0 +1,375 @@
"""LOLBAS and GTFOBins import service.
Downloads the LOLBAS (Windows) and GTFOBins (Linux) repositories,
parses their YAML / Markdown files, and creates :class:`TestTemplate`
records mapped to MITRE ATT&CK techniques.
LOLBAS
------
- ZIP from ``LOLBAS-Project/LOLBAS``
- YAML files in ``yml/OSBinaries/``, ``yml/OSLibraries/``, ``yml/OSScripts/``
- Each YAML contains: Name, Description, Commands (list with MitreID)
GTFOBins
--------
- ZIP from ``GTFOBins/GTFOBins.github.io``
- Markdown files in ``_gtfobins/``
- Each Markdown has YAML front-matter with function names
- Functions mapped to MITRE via a static dictionary
Idempotency
-----------
Deduplication keys:
- LOLBAS: ``source + Name + MitreID`` → stored in ``atomic_test_id``
- GTFOBins: ``source + binary_name + function`` → stored in ``atomic_test_id``
"""
import io
import logging
import re
import shutil
import tempfile
import zipfile
from datetime import datetime
from pathlib import Path
import requests as _requests
import yaml
from sqlalchemy.orm import Session
from app.models.test_template import TestTemplate
from app.models.data_source import DataSource
from app.services.audit_service import log_action
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
LOLBAS_ZIP_URL = (
"https://github.com/LOLBAS-Project/LOLBAS"
"/archive/refs/heads/master.zip"
)
GTFOBINS_ZIP_URL = (
"https://github.com/GTFOBins/GTFOBins.github.io"
"/archive/refs/heads/master.zip"
)
_DOWNLOAD_TIMEOUT = 300
# GTFOBins function → MITRE technique mapping
_GTFOBINS_FUNCTION_MAP: dict[str, str] = {
"shell": "T1059",
"command": "T1059",
"reverse-shell": "T1059",
"non-interactive-reverse-shell": "T1059",
"bind-shell": "T1059",
"non-interactive-bind-shell": "T1059",
"file-upload": "T1105",
"file-download": "T1105",
"file-write": "T1105",
"file-read": "T1005",
"library-load": "T1129",
"sudo": "T1548.003",
"suid": "T1548.001",
"capabilities": "T1548",
"limited-suid": "T1548.001",
}
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def _download_zip(url: str) -> bytes:
"""Download a ZIP from *url* and return raw bytes."""
logger.info("Downloading ZIP from %s", url)
resp = _requests.get(url, timeout=_DOWNLOAD_TIMEOUT, stream=True)
resp.raise_for_status()
content = resp.content
logger.info("Downloaded %.1f MB", len(content) / (1024 * 1024))
return content
def _extract_zip(zip_bytes: bytes, dest: str) -> Path:
"""Extract *zip_bytes* into *dest* and return the root directory."""
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
zf.extractall(dest)
return Path(dest)
# ---------------------------------------------------------------------------
# LOLBAS import
# ---------------------------------------------------------------------------
def _parse_lolbas(root_dir: Path) -> list[dict]:
"""Parse LOLBAS YAML files and return template dicts."""
results: list[dict] = []
lolbas_root = root_dir / "LOLBAS-master"
yaml_dirs = [
lolbas_root / "yml" / "OSBinaries",
lolbas_root / "yml" / "OSLibraries",
lolbas_root / "yml" / "OSScripts",
]
yaml_files = []
for d in yaml_dirs:
if d.is_dir():
yaml_files.extend(sorted(d.rglob("*.yml")))
logger.info("LOLBAS: Found %d YAML files", len(yaml_files))
for yaml_path in yaml_files:
try:
with open(yaml_path, "r", encoding="utf-8") as fh:
data = yaml.safe_load(fh)
except Exception as exc:
logger.debug("Failed to parse %s: %s", yaml_path, exc)
continue
if not isinstance(data, dict):
continue
binary_name = data.get("Name", "").strip()
if not binary_name:
continue
description = data.get("Description", "")
commands = data.get("Commands", [])
if not isinstance(commands, list):
continue
for cmd_entry in commands:
if not isinstance(cmd_entry, dict):
continue
mitre_id = cmd_entry.get("MitreID")
if not mitre_id:
continue
# Normalise the MITRE ID
mitre_id = str(mitre_id).strip().upper()
if not mitre_id.startswith("T"):
continue
command = cmd_entry.get("Command", "")
usecase = cmd_entry.get("Usecase", "")
cmd_description = cmd_entry.get("Description", "")
# Dedup key
dedup_key = f"lolbas:{binary_name}:{mitre_id}"
procedure = []
if cmd_description:
procedure.append(f"Description: {cmd_description}")
if usecase:
procedure.append(f"Use case: {usecase}")
if command:
procedure.append(f"Command: {command}")
results.append({
"mitre_technique_id": mitre_id,
"name": f"LOLBAS: {binary_name}{usecase or cmd_description or mitre_id}"[:500],
"description": f"{description}\n\n{cmd_description}".strip()[:2000] if description else cmd_description[:2000] if cmd_description else None,
"source": "lolbas",
"platform": "windows",
"tool_suggested": binary_name,
"attack_procedure": "\n".join(procedure)[:4000] if procedure else None,
"atomic_test_id": dedup_key,
"source_url": f"https://lolbas-project.github.io/lolbas/Binaries/{binary_name}/",
})
logger.info("LOLBAS: Parsed %d templates", len(results))
return results
# ---------------------------------------------------------------------------
# GTFOBins import
# ---------------------------------------------------------------------------
def _parse_gtfobins(root_dir: Path) -> list[dict]:
"""Parse GTFOBins markdown files and return template dicts."""
results: list[dict] = []
gtfobins_root = root_dir / "GTFOBins.github.io-master" / "_gtfobins"
if not gtfobins_root.is_dir():
logger.warning("GTFOBins directory not found at %s", gtfobins_root)
return results
md_files = sorted(gtfobins_root.glob("*.md"))
logger.info("GTFOBins: Found %d markdown files", len(md_files))
for md_path in md_files:
binary_name = md_path.stem # e.g. "awk"
try:
with open(md_path, "r", encoding="utf-8") as fh:
content = fh.read()
except Exception as exc:
logger.debug("Failed to read %s: %s", md_path, exc)
continue
# Extract YAML front-matter
front_matter = _extract_front_matter(content)
if not front_matter:
continue
functions = front_matter.get("functions", {})
if not isinstance(functions, dict):
continue
for func_name, func_data in functions.items():
# Map function to MITRE technique
mitre_id = _GTFOBINS_FUNCTION_MAP.get(func_name.lower())
if not mitre_id:
continue
# Extract code examples from function data
examples = []
if isinstance(func_data, list):
for entry in func_data:
if isinstance(entry, dict):
code = entry.get("code", "")
if code:
examples.append(str(code))
elif isinstance(entry, str):
examples.append(entry)
procedure = "\n\n".join(examples) if examples else None
dedup_key = f"gtfobins:{binary_name}:{func_name}"
results.append({
"mitre_technique_id": mitre_id,
"name": f"GTFOBins: {binary_name}{func_name}"[:500],
"description": f"Abuse {binary_name} binary for {func_name} on Linux/Unix."[:2000],
"source": "gtfobins",
"platform": "linux",
"tool_suggested": binary_name,
"attack_procedure": procedure[:4000] if procedure else None,
"atomic_test_id": dedup_key,
"source_url": f"https://gtfobins.github.io/gtfobins/{binary_name}/",
})
logger.info("GTFOBins: Parsed %d templates", len(results))
return results
def _extract_front_matter(content: str) -> dict | None:
"""Extract YAML front-matter from a markdown file."""
match = re.match(r"^---\s*\n(.*?)\n---", content, re.DOTALL)
if not match:
return None
try:
return yaml.safe_load(match.group(1))
except Exception:
return None
# ---------------------------------------------------------------------------
# Upsert logic
# ---------------------------------------------------------------------------
def _upsert_templates(db: Session, items: list[dict], source_name: str) -> dict:
"""Insert templates, skipping existing ones by atomic_test_id."""
existing_ids: set[str] = {
row[0]
for row in db.query(TestTemplate.atomic_test_id)
.filter(TestTemplate.source == source_name)
.filter(TestTemplate.atomic_test_id.isnot(None))
.all()
}
created = 0
skipped = 0
for item in items:
if item["atomic_test_id"] in existing_ids:
skipped += 1
continue
template = TestTemplate(
mitre_technique_id=item["mitre_technique_id"],
name=item["name"],
description=item["description"],
source=item["source"],
source_url=item.get("source_url"),
attack_procedure=item.get("attack_procedure"),
platform=item["platform"],
tool_suggested=item.get("tool_suggested"),
atomic_test_id=item["atomic_test_id"],
is_active=True,
)
db.add(template)
existing_ids.add(item["atomic_test_id"])
created += 1
db.commit()
return {"created": created, "skipped_existing": skipped, "total_parsed": len(items)}
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def sync(db: Session) -> dict:
"""Import LOLBAS templates.
Returns a summary dict with ``created``, ``skipped_existing``, ``total_parsed``.
"""
tmp_dir = tempfile.mkdtemp(prefix="aegis_lolbas_")
try:
zip_bytes = _download_zip(LOLBAS_ZIP_URL)
root_dir = _extract_zip(zip_bytes, tmp_dir)
parsed = _parse_lolbas(root_dir)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
summary = _upsert_templates(db, parsed, "lolbas")
# Update DataSource record
ds = db.query(DataSource).filter(DataSource.name == "lolbas").first()
if ds:
ds.last_sync_at = datetime.utcnow()
ds.last_sync_status = "success"
ds.last_sync_stats = summary
db.commit()
logger.info("LOLBAS import complete — %s", summary)
log_action(db, user_id=None, action="import_lolbas",
entity_type="test_template", entity_id=None, details=summary)
return summary
def sync_gtfobins(db: Session) -> dict:
"""Import GTFOBins templates.
Returns a summary dict with ``created``, ``skipped_existing``, ``total_parsed``.
"""
tmp_dir = tempfile.mkdtemp(prefix="aegis_gtfobins_")
try:
zip_bytes = _download_zip(GTFOBINS_ZIP_URL)
root_dir = _extract_zip(zip_bytes, tmp_dir)
parsed = _parse_gtfobins(root_dir)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
summary = _upsert_templates(db, parsed, "gtfobins")
# Update DataSource record
ds = db.query(DataSource).filter(DataSource.name == "gtfobins").first()
if ds:
ds.last_sync_at = datetime.utcnow()
ds.last_sync_status = "success"
ds.last_sync_stats = summary
db.commit()
logger.info("GTFOBins import complete — %s", summary)
log_action(db, user_id=None, action="import_gtfobins",
entity_type="test_template", entity_id=None, details=summary)
return summary