"""Sigma Rules import service. Downloads the SigmaHQ repository ZIP from GitHub, parses every YAML rule file under ``rules/``, extracts MITRE ATT&CK tags, and creates :class:`DetectionRule` records in the database. Strategy -------- 1. Download the full SigmaHQ repo as a ZIP archive. 2. Extract in a temporary directory. 3. Walk all ``.yml`` files under ``rules/``. 4. Parse each YAML file — extract title, description, logsource, detection tags, severity (``level``), and the raw YAML content. 5. Filter: only import rules that have at least one ``attack.tXXXX`` tag. 6. Create / skip ``DetectionRule`` rows keyed by ``(source, source_id)``. 7. Clean up the temporary directory. Idempotency ----------- Running the import twice does **not** create duplicates. Existing rules are identified by ``source = "sigma"`` + ``source_id`` (relative file path) and simply skipped. """ import io import logging import re import shutil import tempfile import zipfile from datetime import datetime from pathlib import Path import requests as _requests import yaml from sqlalchemy.orm import Session from app.models.detection_rule import DetectionRule from app.models.data_source import DataSource from app.services.audit_service import log_action logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- SIGMA_ZIP_URL = ( "https://github.com/SigmaHQ/sigma/archive/refs/heads/master.zip" ) _DOWNLOAD_TIMEOUT = 300 _ZIP_ROOT_PREFIX = "sigma-master" # Regex to extract MITRE ATT&CK technique IDs from Sigma tags # e.g. "attack.t1059.001" → "T1059.001" _ATTACK_TAG_RE = re.compile(r"attack\.(t\d{4}(?:\.\d{3})?)", re.IGNORECASE) # Sigma severity levels _SEVERITY_MAP = { "informational": "informational", "low": "low", "medium": "medium", "high": "high", "critical": "critical", } # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _download_zip(url: str = SIGMA_ZIP_URL) -> bytes: """Download the SigmaHQ ZIP and return raw bytes.""" logger.info("Downloading SigmaHQ ZIP from %s …", url) resp = _requests.get(url, timeout=_DOWNLOAD_TIMEOUT, stream=True) resp.raise_for_status() content = resp.content logger.info("Downloaded %.1f MB", len(content) / (1024 * 1024)) return content def _extract_zip(zip_bytes: bytes, dest: str) -> Path: """Extract *zip_bytes* into *dest* and return the path to rules/ dir.""" with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf: zf.extractall(dest) rules_dir = Path(dest) / _ZIP_ROOT_PREFIX / "rules" if not rules_dir.is_dir(): raise FileNotFoundError( f"Expected rules directory not found at {rules_dir}" ) return rules_dir def _extract_attack_tags(tags: list) -> list[str]: """Extract MITRE technique IDs from Sigma tag list. Example input: ["attack.defense_evasion", "attack.t1059.001", "cve.2021.44228"] Example output: ["T1059.001"] """ technique_ids = [] for tag in tags: m = _ATTACK_TAG_RE.match(str(tag).strip()) if m: technique_ids.append(m.group(1).upper()) return list(set(technique_ids)) def _parse_sigma_rules(rules_dir: Path) -> list[dict]: """Walk the rules directory and parse all Sigma YAML files. Returns a flat list of dicts, one per (rule, technique) combination. A single Sigma rule tagged with N techniques produces N entries. """ results: list[dict] = [] yaml_files = sorted(rules_dir.rglob("*.yml")) logger.info("Found %d YAML files to parse", len(yaml_files)) for yaml_path in yaml_files: relative_path = str(yaml_path.relative_to(rules_dir.parent)) try: with open(yaml_path, "r", encoding="utf-8") as fh: data = yaml.safe_load(fh) except Exception as exc: logger.debug("Failed to parse %s: %s", yaml_path, exc) continue if not isinstance(data, dict): continue title = data.get("title", "").strip() if not title: continue # Extract ATT&CK technique IDs from tags tags = data.get("tags", []) if not isinstance(tags, list): continue technique_ids = _extract_attack_tags(tags) if not technique_ids: continue # Skip rules without ATT&CK mapping description = data.get("description", "") level = str(data.get("level", "")).lower() severity = _SEVERITY_MAP.get(level) # Extract logsource logsource = data.get("logsource", {}) if not isinstance(logsource, dict): logsource = {} # Read full YAML content for storage try: with open(yaml_path, "r", encoding="utf-8") as fh: raw_content = fh.read() except Exception: raw_content = yaml.dump(data, default_flow_style=False) # False positive assessment falsepositives = data.get("falsepositives", []) if isinstance(falsepositives, list) and len(falsepositives) > 3: fp_rate = "high" elif isinstance(falsepositives, list) and len(falsepositives) > 1: fp_rate = "medium" else: fp_rate = "low" # Create one entry per technique for tech_id in technique_ids: source_url = ( f"https://github.com/SigmaHQ/sigma/blob/master/" f"{relative_path.replace(chr(92), '/')}" ) results.append({ "mitre_technique_id": tech_id, "title": title[:500], "description": str(description)[:2000] if description else None, "source_id": relative_path, "source_url": source_url, "rule_content": raw_content, "severity": severity, "log_sources": logsource if logsource else None, "false_positive_rate": fp_rate, "platforms": _platforms_from_logsource(logsource), }) logger.info("Parsed %d (rule, technique) pairs total", len(results)) return results def _platforms_from_logsource(logsource: dict) -> list[str]: """Infer platform list from Sigma logsource.""" platforms = [] product = str(logsource.get("product", "")).lower() service = str(logsource.get("service", "")).lower() if "windows" in product or "windows" in service: platforms.append("windows") if "linux" in product or "linux" in service: platforms.append("linux") if "macos" in product or "macos" in service: platforms.append("macos") # Sysmon → Windows if "sysmon" in service and "windows" not in platforms: platforms.append("windows") return platforms if platforms else None # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def sync(db: Session) -> dict: """Download and import Sigma detection rules. Parameters ---------- db : Session Active SQLAlchemy database session. Returns ------- dict Summary with ``created``, ``skipped_existing``, ``total_parsed``. """ tmp_dir = tempfile.mkdtemp(prefix="aegis_sigma_") try: zip_bytes = _download_zip() rules_dir = _extract_zip(zip_bytes, tmp_dir) parsed_rules = _parse_sigma_rules(rules_dir) finally: shutil.rmtree(tmp_dir, ignore_errors=True) logger.info("Cleaned up temp directory %s", tmp_dir) # Pre-load existing source_ids for dedup existing_ids: set[str] = { row[0] for row in db.query(DetectionRule.source_id) .filter(DetectionRule.source == "sigma") .filter(DetectionRule.source_id.isnot(None)) .all() } created = 0 skipped = 0 for item in parsed_rules: # Dedup key: source_id (relative path). A rule file may produce # multiple entries (one per technique), but we deduplicate by # source_id so re-runs are safe. For multi-technique rules we # only skip if the exact same source_id is already present. dedup_key = f"{item['source_id']}::{item['mitre_technique_id']}" if item["source_id"] in existing_ids: skipped += 1 continue rule = DetectionRule( mitre_technique_id=item["mitre_technique_id"], title=item["title"], description=item["description"], source="sigma", source_id=item["source_id"], source_url=item["source_url"], rule_content=item["rule_content"], rule_format="sigma_yaml", severity=item["severity"], platforms=item["platforms"], log_sources=item["log_sources"], false_positive_rate=item["false_positive_rate"], is_active=True, ) db.add(rule) existing_ids.add(item["source_id"]) created += 1 db.commit() summary = { "created": created, "skipped_existing": skipped, "total_parsed": len(parsed_rules), } # Update DataSource record ds = db.query(DataSource).filter(DataSource.name == "sigma").first() if ds: ds.last_sync_at = datetime.utcnow() ds.last_sync_status = "success" ds.last_sync_stats = summary db.commit() logger.info("Sigma import complete — %s", summary) log_action( db, user_id=None, action="import_sigma_rules", entity_type="detection_rule", entity_id=None, details=summary, ) return summary