From f4c8cbf7683b7d4a10b6118a703e29e7ac41e6de Mon Sep 17 00:00:00 2001 From: Kitos Date: Mon, 9 Feb 2026 16:19:44 +0100 Subject: [PATCH] feat(phase-22): add import services for Sigma, LOLBAS, GTFOBins, CALDERA, Elastic and data sources panel (T-203 to T-207) --- backend/app/main.py | 2 + backend/app/routers/data_sources.py | 292 ++++++++++++++ .../app/services/caldera_import_service.py | 274 +++++++++++++ .../app/services/elastic_import_service.py | 321 +++++++++++++++ backend/app/services/lolbas_import_service.py | 375 ++++++++++++++++++ backend/app/services/sigma_import_service.py | 308 ++++++++++++++ backend/requirements.txt | 2 + frontend/src/App.tsx | 9 + frontend/src/api/data-sources.ts | 79 ++++ frontend/src/components/Sidebar.tsx | 2 + frontend/src/pages/DataSourcesPage.tsx | 375 ++++++++++++++++++ 11 files changed, 2039 insertions(+) create mode 100644 backend/app/routers/data_sources.py create mode 100644 backend/app/services/caldera_import_service.py create mode 100644 backend/app/services/elastic_import_service.py create mode 100644 backend/app/services/lolbas_import_service.py create mode 100644 backend/app/services/sigma_import_service.py create mode 100644 frontend/src/api/data-sources.ts create mode 100644 frontend/src/pages/DataSourcesPage.tsx diff --git a/backend/app/main.py b/backend/app/main.py index a31337b..1e79f56 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -18,6 +18,7 @@ from app.routers import users as users_router from app.routers import audit as audit_router from app.routers import notifications as notifications_router from app.routers import reports as reports_router +from app.routers import data_sources as data_sources_router from app.storage import ensure_bucket_exists from app.jobs.mitre_sync_job import start_scheduler, scheduler @@ -60,6 +61,7 @@ app.include_router(users_router.router, prefix="/api/v1") app.include_router(audit_router.router, prefix="/api/v1") app.include_router(notifications_router.router, prefix="/api/v1") app.include_router(reports_router.router, prefix="/api/v1") +app.include_router(data_sources_router.router, prefix="/api/v1") @app.get("/health") diff --git a/backend/app/routers/data_sources.py b/backend/app/routers/data_sources.py new file mode 100644 index 0000000..7e843b9 --- /dev/null +++ b/backend/app/routers/data_sources.py @@ -0,0 +1,292 @@ +"""Data sources management endpoints (admin only). + +Provides a centralized panel for managing all external data sources +(Atomic Red Team, Sigma, LOLBAS, GTFOBins, CALDERA, Elastic, etc.) +including sync triggers, enable/disable toggles, and statistics. +""" + +import logging +from datetime import datetime + +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy.orm import Session + +from app.database import get_db +from app.dependencies.auth import require_role +from app.models.user import User +from app.models.data_source import DataSource +from app.services.audit_service import log_action + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/data-sources", tags=["data-sources"]) + + +# --------------------------------------------------------------------------- +# Sync dispatcher — maps source name → import function +# --------------------------------------------------------------------------- + +def _get_sync_handler(source_name: str): + """Lazily import and return the sync function for *source_name*. + + We import lazily to avoid circular imports and to only load the + modules that are actually needed. + """ + handlers = { + "atomic_red_team": ("app.services.atomic_import_service", "import_atomic_red_team"), + "sigma": ("app.services.sigma_import_service", "sync"), + "lolbas": ("app.services.lolbas_import_service", "sync"), + "gtfobins": ("app.services.lolbas_import_service", "sync_gtfobins"), + "caldera": ("app.services.caldera_import_service", "sync"), + "elastic_rules": ("app.services.elastic_import_service", "sync"), + # d3fend and mitre_cti added in later phases + } + + if source_name not in handlers: + return None + + module_path, func_name = handlers[source_name] + import importlib + mod = importlib.import_module(module_path) + return getattr(mod, func_name) + + +# --------------------------------------------------------------------------- +# Endpoints +# --------------------------------------------------------------------------- + + +@router.get("") +def list_data_sources( + db: Session = Depends(get_db), + current_user: User = Depends(require_role("admin")), +): + """List all registered data sources. + + **Requires** the ``admin`` role. + """ + sources = db.query(DataSource).order_by(DataSource.name).all() + return [ + { + "id": str(s.id), + "name": s.name, + "display_name": s.display_name, + "type": s.type, + "url": s.url, + "description": s.description, + "is_enabled": s.is_enabled, + "last_sync_at": s.last_sync_at.isoformat() if s.last_sync_at else None, + "last_sync_status": s.last_sync_status, + "last_sync_stats": s.last_sync_stats, + "sync_frequency": s.sync_frequency, + "config": s.config, + "created_at": s.created_at.isoformat() if s.created_at else None, + } + for s in sources + ] + + +@router.patch("/{source_id}") +def update_data_source( + source_id: str, + body: dict, + db: Session = Depends(get_db), + current_user: User = Depends(require_role("admin")), +): + """Update a data source (enable/disable, change config). + + **Requires** the ``admin`` role. + + Body fields (all optional): + - ``is_enabled`` (bool) + - ``sync_frequency`` (str) + - ``config`` (dict) + """ + ds = db.query(DataSource).filter(DataSource.id == source_id).first() + if not ds: + raise HTTPException(status_code=404, detail="Data source not found") + + if "is_enabled" in body: + ds.is_enabled = bool(body["is_enabled"]) + if "sync_frequency" in body: + ds.sync_frequency = body["sync_frequency"] + if "config" in body: + ds.config = body["config"] + + db.commit() + + log_action( + db, + user_id=current_user.id, + action="update_data_source", + entity_type="data_source", + entity_id=str(ds.id), + details={"updates": body}, + ) + + return {"message": "Data source updated", "id": str(ds.id)} + + +@router.post("/{source_id}/sync") +def sync_data_source( + source_id: str, + db: Session = Depends(get_db), + current_user: User = Depends(require_role("admin")), +): + """Trigger sync/import for a specific data source. + + **Requires** the ``admin`` role. + """ + ds = db.query(DataSource).filter(DataSource.id == source_id).first() + if not ds: + raise HTTPException(status_code=404, detail="Data source not found") + + handler = _get_sync_handler(ds.name) + if handler is None: + raise HTTPException( + status_code=400, + detail=f"No sync handler available for '{ds.name}'", + ) + + # Mark as in_progress + ds.last_sync_status = "in_progress" + db.commit() + + try: + summary = handler(db) + except Exception as exc: + logger.error("Sync failed for %s: %s", ds.name, exc) + ds.last_sync_status = "error" + ds.last_sync_at = datetime.utcnow() + ds.last_sync_stats = {"error": str(exc)} + db.commit() + raise HTTPException( + status_code=500, + detail=f"Sync failed: {str(exc)}", + ) + + # Update DS record (the handler may already have done this, + # but we ensure it here as well) + ds.last_sync_at = datetime.utcnow() + ds.last_sync_status = "success" + ds.last_sync_stats = summary + db.commit() + + return { + "message": f"Sync complete for {ds.display_name}", + "source": ds.name, + "stats": summary, + } + + +@router.post("/sync-all") +def sync_all_data_sources( + db: Session = Depends(get_db), + current_user: User = Depends(require_role("admin")), +): + """Trigger sync for all enabled data sources (sequentially). + + **Requires** the ``admin`` role. + """ + enabled_sources = ( + db.query(DataSource) + .filter(DataSource.is_enabled == True) + .order_by(DataSource.name) + .all() + ) + + results = [] + for ds in enabled_sources: + handler = _get_sync_handler(ds.name) + if handler is None: + results.append({ + "source": ds.name, + "status": "skipped", + "detail": "No sync handler available", + }) + continue + + ds.last_sync_status = "in_progress" + db.commit() + + try: + summary = handler(db) + ds.last_sync_at = datetime.utcnow() + ds.last_sync_status = "success" + ds.last_sync_stats = summary + db.commit() + results.append({ + "source": ds.name, + "status": "success", + "stats": summary, + }) + except Exception as exc: + logger.error("Sync failed for %s: %s", ds.name, exc) + ds.last_sync_status = "error" + ds.last_sync_at = datetime.utcnow() + ds.last_sync_stats = {"error": str(exc)} + db.commit() + results.append({ + "source": ds.name, + "status": "error", + "detail": str(exc), + }) + + log_action( + db, + user_id=current_user.id, + action="sync_all_data_sources", + entity_type="data_source", + entity_id=None, + details={"results": results}, + ) + + return {"message": "Sync all complete", "results": results} + + +@router.get("/{source_id}/stats") +def get_data_source_stats( + source_id: str, + db: Session = Depends(get_db), + current_user: User = Depends(require_role("admin")), +): + """Get detailed statistics for a specific data source. + + **Requires** the ``admin`` role. + """ + ds = db.query(DataSource).filter(DataSource.id == source_id).first() + if not ds: + raise HTTPException(status_code=404, detail="Data source not found") + + # Count items from this source + from app.models.test_template import TestTemplate + from app.models.detection_rule import DetectionRule + + template_count = 0 + rule_count = 0 + + if ds.type == "attack_procedure": + template_count = ( + db.query(TestTemplate) + .filter(TestTemplate.source == ds.name) + .count() + ) + elif ds.type == "detection_rule": + rule_count = ( + db.query(DetectionRule) + .filter(DetectionRule.source == ds.name) + .count() + ) + + return { + "id": str(ds.id), + "name": ds.name, + "display_name": ds.display_name, + "type": ds.type, + "is_enabled": ds.is_enabled, + "last_sync_at": ds.last_sync_at.isoformat() if ds.last_sync_at else None, + "last_sync_status": ds.last_sync_status, + "last_sync_stats": ds.last_sync_stats, + "total_templates": template_count, + "total_rules": rule_count, + } diff --git a/backend/app/services/caldera_import_service.py b/backend/app/services/caldera_import_service.py new file mode 100644 index 0000000..9c9bfd3 --- /dev/null +++ b/backend/app/services/caldera_import_service.py @@ -0,0 +1,274 @@ +"""MITRE CALDERA abilities import service. + +Downloads the CALDERA repository ZIP from GitHub, parses the ability YAML +files under ``data/abilities/{tactic}/``, and creates :class:`TestTemplate` +records in the database. + +Strategy +-------- +1. Download the CALDERA repo as a ZIP. +2. Extract into a temporary directory. +3. Walk ``data/abilities/{tactic}/*.yml`` files. +4. For each ability: extract name, description, technique ID, platforms, + and executor commands. +5. Create TestTemplate rows keyed by the ability's ``id`` field. +6. Clean up. + +Idempotency +----------- +Running the import twice does **not** create duplicates. Existing +templates are identified by ``source = "caldera"`` + ``atomic_test_id`` +(the CALDERA ability ``id``). +""" + +import io +import logging +import shutil +import tempfile +import zipfile +from datetime import datetime +from pathlib import Path + +import requests as _requests +import yaml +from sqlalchemy.orm import Session + +from app.models.test_template import TestTemplate +from app.models.data_source import DataSource +from app.services.audit_service import log_action + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +CALDERA_ZIP_URL = ( + "https://github.com/mitre/caldera" + "/archive/refs/heads/master.zip" +) + +_DOWNLOAD_TIMEOUT = 300 +_ZIP_ROOT_PREFIX = "caldera-master" + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _download_zip(url: str = CALDERA_ZIP_URL) -> bytes: + """Download the CALDERA ZIP and return raw bytes.""" + logger.info("Downloading CALDERA ZIP from %s …", url) + resp = _requests.get(url, timeout=_DOWNLOAD_TIMEOUT, stream=True) + resp.raise_for_status() + content = resp.content + logger.info("Downloaded %.1f MB", len(content) / (1024 * 1024)) + return content + + +def _extract_zip(zip_bytes: bytes, dest: str) -> Path: + """Extract *zip_bytes* into *dest* and return abilities dir.""" + with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf: + zf.extractall(dest) + abilities_dir = Path(dest) / _ZIP_ROOT_PREFIX / "data" / "abilities" + if not abilities_dir.is_dir(): + raise FileNotFoundError( + f"Expected abilities directory not found at {abilities_dir}" + ) + return abilities_dir + + +def _extract_commands(platforms_dict: dict) -> str: + """Extract executor commands from CALDERA platforms dict. + + The structure is typically:: + + platforms: + windows: + psh: + command: "whoami" + linux: + sh: + command: "id" + + Returns a formatted string with all commands. + """ + lines = [] + if not isinstance(platforms_dict, dict): + return "" + + for os_name, executors in platforms_dict.items(): + if not isinstance(executors, dict): + continue + for executor_name, executor_data in executors.items(): + if isinstance(executor_data, dict): + cmd = executor_data.get("command", "") + if cmd: + lines.append(f"[{os_name}/{executor_name}]\n{cmd}") + elif isinstance(executor_data, str): + lines.append(f"[{os_name}/{executor_name}]\n{executor_data}") + + return "\n\n".join(lines) + + +def _extract_platforms(platforms_dict: dict) -> str: + """Extract platform names from CALDERA platforms dict.""" + if not isinstance(platforms_dict, dict): + return "" + platform_names = [] + for os_name in platforms_dict: + normalized = str(os_name).lower().strip() + if normalized in ("windows", "linux", "darwin", "macos"): + if normalized == "darwin": + normalized = "macos" + if normalized not in platform_names: + platform_names.append(normalized) + return ", ".join(platform_names) + + +def _parse_abilities(abilities_dir: Path) -> list[dict]: + """Walk abilities directories and parse all YAML files. + + Returns a flat list of dicts, each representing one ability. + """ + results: list[dict] = [] + yaml_files = sorted(abilities_dir.rglob("*.yml")) + logger.info("Found %d ability YAML files", len(yaml_files)) + + for yaml_path in yaml_files: + try: + with open(yaml_path, "r", encoding="utf-8") as fh: + data_list = list(yaml.safe_load_all(fh)) + except Exception as exc: + logger.debug("Failed to parse %s: %s", yaml_path, exc) + continue + + for data in data_list: + if not isinstance(data, dict): + continue + + ability_id = data.get("id", "") + if not ability_id: + continue + + name = data.get("name", "").strip() + description = data.get("description", "").strip() + tactic = data.get("tactic", "").strip() + + # Extract technique info + technique = data.get("technique", {}) + if isinstance(technique, dict): + attack_id = technique.get("attack_id", "") + else: + attack_id = "" + + if not attack_id: + continue + + # Normalise technique ID + attack_id = str(attack_id).strip().upper() + if not attack_id.startswith("T"): + continue + + # Extract platforms and commands + platforms_dict = data.get("platforms", {}) + commands = _extract_commands(platforms_dict) + platform_str = _extract_platforms(platforms_dict) + + # Determine executor type + executors = set() + if isinstance(platforms_dict, dict): + for os_executors in platforms_dict.values(): + if isinstance(os_executors, dict): + executors.update(os_executors.keys()) + executor_str = ", ".join(sorted(executors)) if executors else None + + results.append({ + "mitre_technique_id": attack_id, + "name": f"CALDERA: {name}"[:500] if name else f"CALDERA ability {ability_id}"[:500], + "description": f"{description}\n\nTactic: {tactic}".strip()[:2000] if description else None, + "source": "caldera", + "platform": platform_str, + "tool_suggested": executor_str, + "attack_procedure": commands[:4000] if commands else None, + "atomic_test_id": f"caldera:{ability_id}", + "source_url": f"https://github.com/mitre/caldera/tree/master/data/abilities/{tactic}", + }) + + logger.info("Parsed %d CALDERA abilities total", len(results)) + return results + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +def sync(db: Session) -> dict: + """Download and import CALDERA abilities as TestTemplates. + + Returns a summary dict with ``created``, ``skipped_existing``, ``total_parsed``. + """ + tmp_dir = tempfile.mkdtemp(prefix="aegis_caldera_") + try: + zip_bytes = _download_zip() + abilities_dir = _extract_zip(zip_bytes, tmp_dir) + parsed = _parse_abilities(abilities_dir) + finally: + shutil.rmtree(tmp_dir, ignore_errors=True) + logger.info("Cleaned up temp directory %s", tmp_dir) + + # Pre-load existing for dedup + existing_ids: set[str] = { + row[0] + for row in db.query(TestTemplate.atomic_test_id) + .filter(TestTemplate.source == "caldera") + .filter(TestTemplate.atomic_test_id.isnot(None)) + .all() + } + + created = 0 + skipped = 0 + + for item in parsed: + if item["atomic_test_id"] in existing_ids: + skipped += 1 + continue + + template = TestTemplate( + mitre_technique_id=item["mitre_technique_id"], + name=item["name"], + description=item["description"], + source=item["source"], + source_url=item["source_url"], + attack_procedure=item["attack_procedure"], + platform=item["platform"], + tool_suggested=item["tool_suggested"], + atomic_test_id=item["atomic_test_id"], + is_active=True, + ) + db.add(template) + existing_ids.add(item["atomic_test_id"]) + created += 1 + + db.commit() + + summary = { + "created": created, + "skipped_existing": skipped, + "total_parsed": len(parsed), + } + + # Update DataSource record + ds = db.query(DataSource).filter(DataSource.name == "caldera").first() + if ds: + ds.last_sync_at = datetime.utcnow() + ds.last_sync_status = "success" + ds.last_sync_stats = summary + db.commit() + + logger.info("CALDERA import complete — %s", summary) + log_action(db, user_id=None, action="import_caldera", + entity_type="test_template", entity_id=None, details=summary) + return summary diff --git a/backend/app/services/elastic_import_service.py b/backend/app/services/elastic_import_service.py new file mode 100644 index 0000000..98084c3 --- /dev/null +++ b/backend/app/services/elastic_import_service.py @@ -0,0 +1,321 @@ +"""Elastic Detection Rules import service. + +Downloads the Elastic detection-rules repository ZIP from GitHub, parses +every ``.toml`` rule file under ``rules/``, extracts MITRE ATT&CK +mappings, and creates :class:`DetectionRule` records in the database. + +Strategy +-------- +1. Download the full repo as a ZIP archive. +2. Extract into a temporary directory. +3. Walk all ``.toml`` files under ``rules/``. +4. Parse each TOML file — extract rule name, description, query (KQL), + severity, and MITRE ATT&CK threat mappings. +5. Create / skip ``DetectionRule`` rows keyed by ``(source, source_id)``. +6. Clean up. + +Idempotency +----------- +Running the import twice does **not** create duplicates. Existing +rules are identified by ``source = "elastic"`` + ``source_id`` (the +TOML filename). +""" + +import io +import logging +import shutil +import tempfile +import zipfile +from datetime import datetime +from pathlib import Path + +import requests as _requests +from sqlalchemy.orm import Session + +from app.models.detection_rule import DetectionRule +from app.models.data_source import DataSource +from app.services.audit_service import log_action + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +ELASTIC_ZIP_URL = ( + "https://github.com/elastic/detection-rules" + "/archive/refs/heads/main.zip" +) + +_DOWNLOAD_TIMEOUT = 300 +_ZIP_ROOT_PREFIX = "detection-rules-main" + +# Severity normalisation +_SEVERITY_MAP = { + "informational": "informational", + "low": "low", + "medium": "medium", + "high": "high", + "critical": "critical", +} + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _download_zip(url: str = ELASTIC_ZIP_URL) -> bytes: + """Download the Elastic Detection Rules ZIP and return raw bytes.""" + logger.info("Downloading Elastic Detection Rules ZIP from %s …", url) + resp = _requests.get(url, timeout=_DOWNLOAD_TIMEOUT, stream=True) + resp.raise_for_status() + content = resp.content + logger.info("Downloaded %.1f MB", len(content) / (1024 * 1024)) + return content + + +def _extract_zip(zip_bytes: bytes, dest: str) -> Path: + """Extract *zip_bytes* into *dest* and return rules/ dir.""" + with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf: + zf.extractall(dest) + rules_dir = Path(dest) / _ZIP_ROOT_PREFIX / "rules" + if not rules_dir.is_dir(): + raise FileNotFoundError( + f"Expected rules directory not found at {rules_dir}" + ) + return rules_dir + + +def _parse_toml_safe(path: Path) -> dict | None: + """Parse a TOML file. Uses the ``toml`` library.""" + try: + import toml + with open(path, "r", encoding="utf-8") as fh: + return toml.load(fh) + except Exception as exc: + logger.debug("Failed to parse %s: %s", path, exc) + return None + + +def _extract_mitre_techniques(threat_list: list) -> list[str]: + """Extract MITRE technique IDs from Elastic's ``rule.threat`` array. + + Each entry looks like:: + + [[rule.threat]] + framework = "MITRE ATT&CK" + [rule.threat.tactic] + name = "Credential Access" + id = "TA0006" + [[rule.threat.technique]] + name = "OS Credential Dumping" + id = "T1003" + [[rule.threat.technique.subtechnique]] + name = "LSASS Memory" + id = "T1003.001" + """ + technique_ids = [] + + if not isinstance(threat_list, list): + return technique_ids + + for threat_entry in threat_list: + if not isinstance(threat_entry, dict): + continue + + # Skip non-MITRE frameworks + framework = threat_entry.get("framework", "") + if "MITRE" not in str(framework).upper(): + continue + + techniques = threat_entry.get("technique", []) + if not isinstance(techniques, list): + continue + + for tech in techniques: + if not isinstance(tech, dict): + continue + tech_id = tech.get("id", "") + if tech_id and str(tech_id).upper().startswith("T"): + technique_ids.append(str(tech_id).upper()) + + # Check subtechniques + subtechniques = tech.get("subtechnique", []) + if isinstance(subtechniques, list): + for subtech in subtechniques: + if isinstance(subtech, dict): + sub_id = subtech.get("id", "") + if sub_id and str(sub_id).upper().startswith("T"): + technique_ids.append(str(sub_id).upper()) + + return list(set(technique_ids)) + + +def _parse_elastic_rules(rules_dir: Path) -> list[dict]: + """Walk the rules directory and parse all TOML files. + + Returns a flat list of dicts, one per (rule, technique) combination. + """ + results: list[dict] = [] + toml_files = sorted(rules_dir.rglob("*.toml")) + logger.info("Found %d TOML files to parse", len(toml_files)) + + for toml_path in toml_files: + data = _parse_toml_safe(toml_path) + if not data: + continue + + rule = data.get("rule", {}) + if not isinstance(rule, dict): + continue + + name = rule.get("name", "").strip() + if not name: + continue + + # Extract MITRE technique IDs + threat_list = rule.get("threat", []) + technique_ids = _extract_mitre_techniques(threat_list) + if not technique_ids: + continue + + description = rule.get("description", "") + query = rule.get("query", "") + severity = _SEVERITY_MAP.get(str(rule.get("severity", "")).lower()) + rule_type = rule.get("type", "query") # query, eql, threshold, etc. + + # Determine rule format based on type + if rule_type == "eql": + rule_format = "eql" + elif rule_type == "esql": + rule_format = "esql" + else: + rule_format = "kql" + + # Use filename as source_id + source_id = toml_path.name + + # Read raw content + try: + with open(toml_path, "r", encoding="utf-8") as fh: + raw_content = fh.read() + except Exception: + raw_content = query or str(data) + + # Build source URL + relative = str(toml_path.relative_to(rules_dir.parent)).replace("\\", "/") + source_url = ( + f"https://github.com/elastic/detection-rules/blob/main/{relative}" + ) + + # One entry per technique + for tech_id in technique_ids: + results.append({ + "mitre_technique_id": tech_id, + "title": name[:500], + "description": str(description)[:2000] if description else None, + "source_id": source_id, + "source_url": source_url, + "rule_content": query[:50000] if query else raw_content[:50000], + "rule_format": rule_format, + "severity": severity, + "platforms": _infer_platforms(rules_dir, toml_path), + }) + + logger.info("Parsed %d (rule, technique) pairs total", len(results)) + return results + + +def _infer_platforms(rules_dir: Path, toml_path: Path) -> list[str] | None: + """Infer platforms from the rule's directory structure. + + Elastic organizes rules by OS: rules/windows/, rules/linux/, etc. + """ + relative = toml_path.relative_to(rules_dir) + parts = [p.lower() for p in relative.parts] + + platforms = [] + if "windows" in parts: + platforms.append("windows") + if "linux" in parts: + platforms.append("linux") + if "macos" in parts: + platforms.append("macos") + + return platforms if platforms else None + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +def sync(db: Session) -> dict: + """Download and import Elastic detection rules. + + Returns a summary dict with ``created``, ``skipped_existing``, ``total_parsed``. + """ + tmp_dir = tempfile.mkdtemp(prefix="aegis_elastic_") + try: + zip_bytes = _download_zip() + rules_dir = _extract_zip(zip_bytes, tmp_dir) + parsed_rules = _parse_elastic_rules(rules_dir) + finally: + shutil.rmtree(tmp_dir, ignore_errors=True) + logger.info("Cleaned up temp directory %s", tmp_dir) + + # Pre-load existing source_ids for dedup + existing_ids: set[str] = { + row[0] + for row in db.query(DetectionRule.source_id) + .filter(DetectionRule.source == "elastic") + .filter(DetectionRule.source_id.isnot(None)) + .all() + } + + created = 0 + skipped = 0 + + for item in parsed_rules: + if item["source_id"] in existing_ids: + skipped += 1 + continue + + rule = DetectionRule( + mitre_technique_id=item["mitre_technique_id"], + title=item["title"], + description=item["description"], + source="elastic", + source_id=item["source_id"], + source_url=item["source_url"], + rule_content=item["rule_content"], + rule_format=item["rule_format"], + severity=item["severity"], + platforms=item["platforms"], + is_active=True, + ) + db.add(rule) + existing_ids.add(item["source_id"]) + created += 1 + + db.commit() + + summary = { + "created": created, + "skipped_existing": skipped, + "total_parsed": len(parsed_rules), + } + + # Update DataSource record + ds = db.query(DataSource).filter(DataSource.name == "elastic_rules").first() + if ds: + ds.last_sync_at = datetime.utcnow() + ds.last_sync_status = "success" + ds.last_sync_stats = summary + db.commit() + + logger.info("Elastic import complete — %s", summary) + log_action(db, user_id=None, action="import_elastic_rules", + entity_type="detection_rule", entity_id=None, details=summary) + return summary diff --git a/backend/app/services/lolbas_import_service.py b/backend/app/services/lolbas_import_service.py new file mode 100644 index 0000000..fc31e1d --- /dev/null +++ b/backend/app/services/lolbas_import_service.py @@ -0,0 +1,375 @@ +"""LOLBAS and GTFOBins import service. + +Downloads the LOLBAS (Windows) and GTFOBins (Linux) repositories, +parses their YAML / Markdown files, and creates :class:`TestTemplate` +records mapped to MITRE ATT&CK techniques. + +LOLBAS +------ +- ZIP from ``LOLBAS-Project/LOLBAS`` +- YAML files in ``yml/OSBinaries/``, ``yml/OSLibraries/``, ``yml/OSScripts/`` +- Each YAML contains: Name, Description, Commands (list with MitreID) + +GTFOBins +-------- +- ZIP from ``GTFOBins/GTFOBins.github.io`` +- Markdown files in ``_gtfobins/`` +- Each Markdown has YAML front-matter with function names +- Functions mapped to MITRE via a static dictionary + +Idempotency +----------- +Deduplication keys: +- LOLBAS: ``source + Name + MitreID`` → stored in ``atomic_test_id`` +- GTFOBins: ``source + binary_name + function`` → stored in ``atomic_test_id`` +""" + +import io +import logging +import re +import shutil +import tempfile +import zipfile +from datetime import datetime +from pathlib import Path + +import requests as _requests +import yaml +from sqlalchemy.orm import Session + +from app.models.test_template import TestTemplate +from app.models.data_source import DataSource +from app.services.audit_service import log_action + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +LOLBAS_ZIP_URL = ( + "https://github.com/LOLBAS-Project/LOLBAS" + "/archive/refs/heads/master.zip" +) +GTFOBINS_ZIP_URL = ( + "https://github.com/GTFOBins/GTFOBins.github.io" + "/archive/refs/heads/master.zip" +) + +_DOWNLOAD_TIMEOUT = 300 + +# GTFOBins function → MITRE technique mapping +_GTFOBINS_FUNCTION_MAP: dict[str, str] = { + "shell": "T1059", + "command": "T1059", + "reverse-shell": "T1059", + "non-interactive-reverse-shell": "T1059", + "bind-shell": "T1059", + "non-interactive-bind-shell": "T1059", + "file-upload": "T1105", + "file-download": "T1105", + "file-write": "T1105", + "file-read": "T1005", + "library-load": "T1129", + "sudo": "T1548.003", + "suid": "T1548.001", + "capabilities": "T1548", + "limited-suid": "T1548.001", +} + + +# --------------------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------------------- + + +def _download_zip(url: str) -> bytes: + """Download a ZIP from *url* and return raw bytes.""" + logger.info("Downloading ZIP from %s …", url) + resp = _requests.get(url, timeout=_DOWNLOAD_TIMEOUT, stream=True) + resp.raise_for_status() + content = resp.content + logger.info("Downloaded %.1f MB", len(content) / (1024 * 1024)) + return content + + +def _extract_zip(zip_bytes: bytes, dest: str) -> Path: + """Extract *zip_bytes* into *dest* and return the root directory.""" + with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf: + zf.extractall(dest) + return Path(dest) + + +# --------------------------------------------------------------------------- +# LOLBAS import +# --------------------------------------------------------------------------- + + +def _parse_lolbas(root_dir: Path) -> list[dict]: + """Parse LOLBAS YAML files and return template dicts.""" + results: list[dict] = [] + + lolbas_root = root_dir / "LOLBAS-master" + yaml_dirs = [ + lolbas_root / "yml" / "OSBinaries", + lolbas_root / "yml" / "OSLibraries", + lolbas_root / "yml" / "OSScripts", + ] + + yaml_files = [] + for d in yaml_dirs: + if d.is_dir(): + yaml_files.extend(sorted(d.rglob("*.yml"))) + + logger.info("LOLBAS: Found %d YAML files", len(yaml_files)) + + for yaml_path in yaml_files: + try: + with open(yaml_path, "r", encoding="utf-8") as fh: + data = yaml.safe_load(fh) + except Exception as exc: + logger.debug("Failed to parse %s: %s", yaml_path, exc) + continue + + if not isinstance(data, dict): + continue + + binary_name = data.get("Name", "").strip() + if not binary_name: + continue + + description = data.get("Description", "") + commands = data.get("Commands", []) + if not isinstance(commands, list): + continue + + for cmd_entry in commands: + if not isinstance(cmd_entry, dict): + continue + + mitre_id = cmd_entry.get("MitreID") + if not mitre_id: + continue + + # Normalise the MITRE ID + mitre_id = str(mitre_id).strip().upper() + if not mitre_id.startswith("T"): + continue + + command = cmd_entry.get("Command", "") + usecase = cmd_entry.get("Usecase", "") + cmd_description = cmd_entry.get("Description", "") + + # Dedup key + dedup_key = f"lolbas:{binary_name}:{mitre_id}" + + procedure = [] + if cmd_description: + procedure.append(f"Description: {cmd_description}") + if usecase: + procedure.append(f"Use case: {usecase}") + if command: + procedure.append(f"Command: {command}") + + results.append({ + "mitre_technique_id": mitre_id, + "name": f"LOLBAS: {binary_name} — {usecase or cmd_description or mitre_id}"[:500], + "description": f"{description}\n\n{cmd_description}".strip()[:2000] if description else cmd_description[:2000] if cmd_description else None, + "source": "lolbas", + "platform": "windows", + "tool_suggested": binary_name, + "attack_procedure": "\n".join(procedure)[:4000] if procedure else None, + "atomic_test_id": dedup_key, + "source_url": f"https://lolbas-project.github.io/lolbas/Binaries/{binary_name}/", + }) + + logger.info("LOLBAS: Parsed %d templates", len(results)) + return results + + +# --------------------------------------------------------------------------- +# GTFOBins import +# --------------------------------------------------------------------------- + + +def _parse_gtfobins(root_dir: Path) -> list[dict]: + """Parse GTFOBins markdown files and return template dicts.""" + results: list[dict] = [] + + gtfobins_root = root_dir / "GTFOBins.github.io-master" / "_gtfobins" + if not gtfobins_root.is_dir(): + logger.warning("GTFOBins directory not found at %s", gtfobins_root) + return results + + md_files = sorted(gtfobins_root.glob("*.md")) + logger.info("GTFOBins: Found %d markdown files", len(md_files)) + + for md_path in md_files: + binary_name = md_path.stem # e.g. "awk" + try: + with open(md_path, "r", encoding="utf-8") as fh: + content = fh.read() + except Exception as exc: + logger.debug("Failed to read %s: %s", md_path, exc) + continue + + # Extract YAML front-matter + front_matter = _extract_front_matter(content) + if not front_matter: + continue + + functions = front_matter.get("functions", {}) + if not isinstance(functions, dict): + continue + + for func_name, func_data in functions.items(): + # Map function to MITRE technique + mitre_id = _GTFOBINS_FUNCTION_MAP.get(func_name.lower()) + if not mitre_id: + continue + + # Extract code examples from function data + examples = [] + if isinstance(func_data, list): + for entry in func_data: + if isinstance(entry, dict): + code = entry.get("code", "") + if code: + examples.append(str(code)) + elif isinstance(entry, str): + examples.append(entry) + + procedure = "\n\n".join(examples) if examples else None + dedup_key = f"gtfobins:{binary_name}:{func_name}" + + results.append({ + "mitre_technique_id": mitre_id, + "name": f"GTFOBins: {binary_name} — {func_name}"[:500], + "description": f"Abuse {binary_name} binary for {func_name} on Linux/Unix."[:2000], + "source": "gtfobins", + "platform": "linux", + "tool_suggested": binary_name, + "attack_procedure": procedure[:4000] if procedure else None, + "atomic_test_id": dedup_key, + "source_url": f"https://gtfobins.github.io/gtfobins/{binary_name}/", + }) + + logger.info("GTFOBins: Parsed %d templates", len(results)) + return results + + +def _extract_front_matter(content: str) -> dict | None: + """Extract YAML front-matter from a markdown file.""" + match = re.match(r"^---\s*\n(.*?)\n---", content, re.DOTALL) + if not match: + return None + try: + return yaml.safe_load(match.group(1)) + except Exception: + return None + + +# --------------------------------------------------------------------------- +# Upsert logic +# --------------------------------------------------------------------------- + + +def _upsert_templates(db: Session, items: list[dict], source_name: str) -> dict: + """Insert templates, skipping existing ones by atomic_test_id.""" + existing_ids: set[str] = { + row[0] + for row in db.query(TestTemplate.atomic_test_id) + .filter(TestTemplate.source == source_name) + .filter(TestTemplate.atomic_test_id.isnot(None)) + .all() + } + + created = 0 + skipped = 0 + + for item in items: + if item["atomic_test_id"] in existing_ids: + skipped += 1 + continue + + template = TestTemplate( + mitre_technique_id=item["mitre_technique_id"], + name=item["name"], + description=item["description"], + source=item["source"], + source_url=item.get("source_url"), + attack_procedure=item.get("attack_procedure"), + platform=item["platform"], + tool_suggested=item.get("tool_suggested"), + atomic_test_id=item["atomic_test_id"], + is_active=True, + ) + db.add(template) + existing_ids.add(item["atomic_test_id"]) + created += 1 + + db.commit() + return {"created": created, "skipped_existing": skipped, "total_parsed": len(items)} + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +def sync(db: Session) -> dict: + """Import LOLBAS templates. + + Returns a summary dict with ``created``, ``skipped_existing``, ``total_parsed``. + """ + tmp_dir = tempfile.mkdtemp(prefix="aegis_lolbas_") + try: + zip_bytes = _download_zip(LOLBAS_ZIP_URL) + root_dir = _extract_zip(zip_bytes, tmp_dir) + parsed = _parse_lolbas(root_dir) + finally: + shutil.rmtree(tmp_dir, ignore_errors=True) + + summary = _upsert_templates(db, parsed, "lolbas") + + # Update DataSource record + ds = db.query(DataSource).filter(DataSource.name == "lolbas").first() + if ds: + ds.last_sync_at = datetime.utcnow() + ds.last_sync_status = "success" + ds.last_sync_stats = summary + db.commit() + + logger.info("LOLBAS import complete — %s", summary) + log_action(db, user_id=None, action="import_lolbas", + entity_type="test_template", entity_id=None, details=summary) + return summary + + +def sync_gtfobins(db: Session) -> dict: + """Import GTFOBins templates. + + Returns a summary dict with ``created``, ``skipped_existing``, ``total_parsed``. + """ + tmp_dir = tempfile.mkdtemp(prefix="aegis_gtfobins_") + try: + zip_bytes = _download_zip(GTFOBINS_ZIP_URL) + root_dir = _extract_zip(zip_bytes, tmp_dir) + parsed = _parse_gtfobins(root_dir) + finally: + shutil.rmtree(tmp_dir, ignore_errors=True) + + summary = _upsert_templates(db, parsed, "gtfobins") + + # Update DataSource record + ds = db.query(DataSource).filter(DataSource.name == "gtfobins").first() + if ds: + ds.last_sync_at = datetime.utcnow() + ds.last_sync_status = "success" + ds.last_sync_stats = summary + db.commit() + + logger.info("GTFOBins import complete — %s", summary) + log_action(db, user_id=None, action="import_gtfobins", + entity_type="test_template", entity_id=None, details=summary) + return summary diff --git a/backend/app/services/sigma_import_service.py b/backend/app/services/sigma_import_service.py new file mode 100644 index 0000000..adb68f2 --- /dev/null +++ b/backend/app/services/sigma_import_service.py @@ -0,0 +1,308 @@ +"""Sigma Rules import service. + +Downloads the SigmaHQ repository ZIP from GitHub, parses every YAML rule +file under ``rules/``, extracts MITRE ATT&CK tags, and creates +:class:`DetectionRule` records in the database. + +Strategy +-------- +1. Download the full SigmaHQ repo as a ZIP archive. +2. Extract in a temporary directory. +3. Walk all ``.yml`` files under ``rules/``. +4. Parse each YAML file — extract title, description, logsource, + detection tags, severity (``level``), and the raw YAML content. +5. Filter: only import rules that have at least one ``attack.tXXXX`` tag. +6. Create / skip ``DetectionRule`` rows keyed by ``(source, source_id)``. +7. Clean up the temporary directory. + +Idempotency +----------- +Running the import twice does **not** create duplicates. Existing +rules are identified by ``source = "sigma"`` + ``source_id`` (relative +file path) and simply skipped. +""" + +import io +import logging +import re +import shutil +import tempfile +import zipfile +from datetime import datetime +from pathlib import Path + +import requests as _requests +import yaml +from sqlalchemy.orm import Session + +from app.models.detection_rule import DetectionRule +from app.models.data_source import DataSource +from app.services.audit_service import log_action + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +SIGMA_ZIP_URL = ( + "https://github.com/SigmaHQ/sigma/archive/refs/heads/main.zip" +) + +_DOWNLOAD_TIMEOUT = 300 +_ZIP_ROOT_PREFIX = "sigma-main" + +# Regex to extract MITRE ATT&CK technique IDs from Sigma tags +# e.g. "attack.t1059.001" → "T1059.001" +_ATTACK_TAG_RE = re.compile(r"attack\.(t\d{4}(?:\.\d{3})?)", re.IGNORECASE) + +# Sigma severity levels +_SEVERITY_MAP = { + "informational": "informational", + "low": "low", + "medium": "medium", + "high": "high", + "critical": "critical", +} + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _download_zip(url: str = SIGMA_ZIP_URL) -> bytes: + """Download the SigmaHQ ZIP and return raw bytes.""" + logger.info("Downloading SigmaHQ ZIP from %s …", url) + resp = _requests.get(url, timeout=_DOWNLOAD_TIMEOUT, stream=True) + resp.raise_for_status() + content = resp.content + logger.info("Downloaded %.1f MB", len(content) / (1024 * 1024)) + return content + + +def _extract_zip(zip_bytes: bytes, dest: str) -> Path: + """Extract *zip_bytes* into *dest* and return the path to rules/ dir.""" + with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf: + zf.extractall(dest) + rules_dir = Path(dest) / _ZIP_ROOT_PREFIX / "rules" + if not rules_dir.is_dir(): + raise FileNotFoundError( + f"Expected rules directory not found at {rules_dir}" + ) + return rules_dir + + +def _extract_attack_tags(tags: list) -> list[str]: + """Extract MITRE technique IDs from Sigma tag list. + + Example input: ["attack.defense_evasion", "attack.t1059.001", "cve.2021.44228"] + Example output: ["T1059.001"] + """ + technique_ids = [] + for tag in tags: + m = _ATTACK_TAG_RE.match(str(tag).strip()) + if m: + technique_ids.append(m.group(1).upper()) + return list(set(technique_ids)) + + +def _parse_sigma_rules(rules_dir: Path) -> list[dict]: + """Walk the rules directory and parse all Sigma YAML files. + + Returns a flat list of dicts, one per (rule, technique) combination. + A single Sigma rule tagged with N techniques produces N entries. + """ + results: list[dict] = [] + yaml_files = sorted(rules_dir.rglob("*.yml")) + logger.info("Found %d YAML files to parse", len(yaml_files)) + + for yaml_path in yaml_files: + relative_path = str(yaml_path.relative_to(rules_dir.parent)) + try: + with open(yaml_path, "r", encoding="utf-8") as fh: + data = yaml.safe_load(fh) + except Exception as exc: + logger.debug("Failed to parse %s: %s", yaml_path, exc) + continue + + if not isinstance(data, dict): + continue + + title = data.get("title", "").strip() + if not title: + continue + + # Extract ATT&CK technique IDs from tags + tags = data.get("tags", []) + if not isinstance(tags, list): + continue + + technique_ids = _extract_attack_tags(tags) + if not technique_ids: + continue # Skip rules without ATT&CK mapping + + description = data.get("description", "") + level = str(data.get("level", "")).lower() + severity = _SEVERITY_MAP.get(level) + + # Extract logsource + logsource = data.get("logsource", {}) + if not isinstance(logsource, dict): + logsource = {} + + # Read full YAML content for storage + try: + with open(yaml_path, "r", encoding="utf-8") as fh: + raw_content = fh.read() + except Exception: + raw_content = yaml.dump(data, default_flow_style=False) + + # False positive assessment + falsepositives = data.get("falsepositives", []) + if isinstance(falsepositives, list) and len(falsepositives) > 3: + fp_rate = "high" + elif isinstance(falsepositives, list) and len(falsepositives) > 1: + fp_rate = "medium" + else: + fp_rate = "low" + + # Create one entry per technique + for tech_id in technique_ids: + source_url = ( + f"https://github.com/SigmaHQ/sigma/blob/main/" + f"{relative_path.replace(chr(92), '/')}" + ) + results.append({ + "mitre_technique_id": tech_id, + "title": title[:500], + "description": str(description)[:2000] if description else None, + "source_id": relative_path, + "source_url": source_url, + "rule_content": raw_content, + "severity": severity, + "log_sources": logsource if logsource else None, + "false_positive_rate": fp_rate, + "platforms": _platforms_from_logsource(logsource), + }) + + logger.info("Parsed %d (rule, technique) pairs total", len(results)) + return results + + +def _platforms_from_logsource(logsource: dict) -> list[str]: + """Infer platform list from Sigma logsource.""" + platforms = [] + product = str(logsource.get("product", "")).lower() + service = str(logsource.get("service", "")).lower() + + if "windows" in product or "windows" in service: + platforms.append("windows") + if "linux" in product or "linux" in service: + platforms.append("linux") + if "macos" in product or "macos" in service: + platforms.append("macos") + + # Sysmon → Windows + if "sysmon" in service and "windows" not in platforms: + platforms.append("windows") + + return platforms if platforms else None + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +def sync(db: Session) -> dict: + """Download and import Sigma detection rules. + + Parameters + ---------- + db : Session + Active SQLAlchemy database session. + + Returns + ------- + dict + Summary with ``created``, ``skipped_existing``, ``total_parsed``. + """ + tmp_dir = tempfile.mkdtemp(prefix="aegis_sigma_") + try: + zip_bytes = _download_zip() + rules_dir = _extract_zip(zip_bytes, tmp_dir) + parsed_rules = _parse_sigma_rules(rules_dir) + finally: + shutil.rmtree(tmp_dir, ignore_errors=True) + logger.info("Cleaned up temp directory %s", tmp_dir) + + # Pre-load existing source_ids for dedup + existing_ids: set[str] = { + row[0] + for row in db.query(DetectionRule.source_id) + .filter(DetectionRule.source == "sigma") + .filter(DetectionRule.source_id.isnot(None)) + .all() + } + + created = 0 + skipped = 0 + + for item in parsed_rules: + # Dedup key: source_id (relative path). A rule file may produce + # multiple entries (one per technique), but we deduplicate by + # source_id so re-runs are safe. For multi-technique rules we + # only skip if the exact same source_id is already present. + dedup_key = f"{item['source_id']}::{item['mitre_technique_id']}" + if item["source_id"] in existing_ids: + skipped += 1 + continue + + rule = DetectionRule( + mitre_technique_id=item["mitre_technique_id"], + title=item["title"], + description=item["description"], + source="sigma", + source_id=item["source_id"], + source_url=item["source_url"], + rule_content=item["rule_content"], + rule_format="sigma_yaml", + severity=item["severity"], + platforms=item["platforms"], + log_sources=item["log_sources"], + false_positive_rate=item["false_positive_rate"], + is_active=True, + ) + db.add(rule) + existing_ids.add(item["source_id"]) + created += 1 + + db.commit() + + summary = { + "created": created, + "skipped_existing": skipped, + "total_parsed": len(parsed_rules), + } + + # Update DataSource record + ds = db.query(DataSource).filter(DataSource.name == "sigma").first() + if ds: + ds.last_sync_at = datetime.utcnow() + ds.last_sync_status = "success" + ds.last_sync_stats = summary + db.commit() + + logger.info("Sigma import complete — %s", summary) + + log_action( + db, + user_id=None, + action="import_sigma_rules", + entity_type="detection_rule", + entity_id=None, + details=summary, + ) + + return summary diff --git a/backend/requirements.txt b/backend/requirements.txt index 334f18f..0a5552e 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -10,6 +10,8 @@ boto3 apscheduler requests pyyaml +pySigma +toml taxii2-client python-multipart pydantic-settings diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index b0dafa9..4d216cb 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -11,6 +11,7 @@ import ReportsPage from "./pages/ReportsPage"; import SystemPage from "./pages/SystemPage"; import UsersPage from "./pages/UsersPage"; import AuditLogPage from "./pages/AuditLogPage"; +import DataSourcesPage from "./pages/DataSourcesPage"; import Layout from "./components/Layout"; import ProtectedRoute from "./components/ProtectedRoute"; @@ -61,6 +62,14 @@ export default function App() { } /> + + + + } + /> {/* Catch-all → dashboard */} diff --git a/frontend/src/api/data-sources.ts b/frontend/src/api/data-sources.ts new file mode 100644 index 0000000..42d5543 --- /dev/null +++ b/frontend/src/api/data-sources.ts @@ -0,0 +1,79 @@ +import client from "./client"; + +export interface DataSource { + id: string; + name: string; + display_name: string; + type: string; + url: string | null; + description: string | null; + is_enabled: boolean; + last_sync_at: string | null; + last_sync_status: string | null; + last_sync_stats: Record | null; + sync_frequency: string | null; + config: Record | null; + created_at: string | null; +} + +export interface SyncResult { + message: string; + source: string; + stats: Record; +} + +export interface SyncAllResult { + message: string; + results: Array<{ + source: string; + status: string; + stats?: Record; + detail?: string; + }>; +} + +export interface DataSourceStats { + id: string; + name: string; + display_name: string; + type: string; + is_enabled: boolean; + last_sync_at: string | null; + last_sync_status: string | null; + last_sync_stats: Record | null; + total_templates: number; + total_rules: number; +} + +/** List all data sources. */ +export async function getDataSources(): Promise { + const { data } = await client.get("/data-sources"); + return data; +} + +/** Update a data source (enable/disable, config). */ +export async function updateDataSource( + id: string, + body: Partial<{ is_enabled: boolean; sync_frequency: string; config: Record }> +): Promise<{ message: string; id: string }> { + const { data } = await client.patch(`/data-sources/${id}`, body); + return data; +} + +/** Trigger sync for a specific data source. */ +export async function syncDataSource(id: string): Promise { + const { data } = await client.post(`/data-sources/${id}/sync`); + return data; +} + +/** Trigger sync for all enabled data sources. */ +export async function syncAllDataSources(): Promise { + const { data } = await client.post("/data-sources/sync-all"); + return data; +} + +/** Get stats for a specific data source. */ +export async function getDataSourceStats(id: string): Promise { + const { data } = await client.get(`/data-sources/${id}/stats`); + return data; +} diff --git a/frontend/src/components/Sidebar.tsx b/frontend/src/components/Sidebar.tsx index ec07528..302d47a 100644 --- a/frontend/src/components/Sidebar.tsx +++ b/frontend/src/components/Sidebar.tsx @@ -12,6 +12,7 @@ import { ChevronDown, ListChecks, ClipboardList, + Database, } from "lucide-react"; import { useAuth } from "../context/AuthContext"; @@ -41,6 +42,7 @@ const mainLinks: NavItem[] = [ const adminLinks: NavItem[] = [ { to: "/users", label: "Users", icon: Users }, { to: "/audit", label: "Audit Log", icon: FileText }, + { to: "/data-sources", label: "Data Sources", icon: Database }, { to: "/system", label: "System", icon: Settings }, ]; diff --git a/frontend/src/pages/DataSourcesPage.tsx b/frontend/src/pages/DataSourcesPage.tsx new file mode 100644 index 0000000..265e316 --- /dev/null +++ b/frontend/src/pages/DataSourcesPage.tsx @@ -0,0 +1,375 @@ +import { useState } from "react"; +import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query"; +import { + Loader2, + RefreshCw, + Database, + CheckCircle, + XCircle, + AlertCircle, + Clock, + ToggleLeft, + ToggleRight, + Play, + ExternalLink, + Shield, + Search, + Swords, + Bug, +} from "lucide-react"; +import { + getDataSources, + updateDataSource, + syncDataSource, + syncAllDataSources, + type DataSource, + type SyncAllResult, +} from "../api/data-sources"; + +/** Map source type to visual props. */ +function typeProps(type: string) { + switch (type) { + case "attack_procedure": + return { label: "Attack Procedure", color: "text-red-400 bg-red-900/50 border-red-500/30", icon: Swords }; + case "detection_rule": + return { label: "Detection Rule", color: "text-blue-400 bg-blue-900/50 border-blue-500/30", icon: Shield }; + case "threat_intel": + return { label: "Threat Intel", color: "text-purple-400 bg-purple-900/50 border-purple-500/30", icon: Search }; + case "defensive_technique": + return { label: "Defensive", color: "text-green-400 bg-green-900/50 border-green-500/30", icon: Shield }; + default: + return { label: type, color: "text-gray-400 bg-gray-800/50 border-gray-600/30", icon: Bug }; + } +} + +function statusBadge(status: string | null) { + if (!status) return null; + switch (status) { + case "success": + return ( + + Success + + ); + case "error": + return ( + + Error + + ); + case "in_progress": + return ( + + In Progress + + ); + default: + return ( + + {status} + + ); + } +} + +export default function DataSourcesPage() { + const queryClient = useQueryClient(); + const [syncingId, setSyncingId] = useState(null); + const [syncAllResult, setSyncAllResult] = useState(null); + + // ── Queries ───────────────────────────────────────────────────── + const { + data: sources, + isLoading, + error, + } = useQuery({ + queryKey: ["data-sources"], + queryFn: getDataSources, + }); + + // ── Toggle enable/disable ─────────────────────────────────────── + const toggleMutation = useMutation({ + mutationFn: ({ id, enabled }: { id: string; enabled: boolean }) => + updateDataSource(id, { is_enabled: enabled }), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ["data-sources"] }); + }, + }); + + // ── Sync individual source ────────────────────────────────────── + const syncMutation = useMutation({ + mutationFn: (id: string) => syncDataSource(id), + onSuccess: () => { + setSyncingId(null); + queryClient.invalidateQueries({ queryKey: ["data-sources"] }); + }, + onError: () => { + setSyncingId(null); + queryClient.invalidateQueries({ queryKey: ["data-sources"] }); + }, + }); + + // ── Sync all ──────────────────────────────────────────────────── + const syncAllMutation = useMutation({ + mutationFn: syncAllDataSources, + onSuccess: (data) => { + setSyncAllResult(data); + queryClient.invalidateQueries({ queryKey: ["data-sources"] }); + }, + }); + + const handleSync = (id: string) => { + setSyncingId(id); + syncMutation.mutate(id); + }; + + const formatDate = (dateStr: string | null) => { + if (!dateStr) return "Never"; + const date = new Date(dateStr); + return date.toLocaleString("en-US", { dateStyle: "medium", timeStyle: "short" }); + }; + + const formatStats = (stats: Record | null) => { + if (!stats) return null; + return Object.entries(stats) + .filter(([k]) => k !== "error") + .map(([k, v]) => `${k.replace(/_/g, " ")}: ${v}`) + .join(" | "); + }; + + return ( +
+ {/* Header */} +
+
+

Data Sources

+

+ Manage external data sources for test templates and detection rules +

+
+ +
+ + {/* Sync All Result */} + {syncAllResult && ( +
+
+

Sync All Results

+ +
+
+ {syncAllResult.results.map((r, i) => ( +
+ {r.status === "success" ? ( + + ) : r.status === "error" ? ( + + ) : ( + + )} + {r.source} + {r.stats && ( + + {formatStats(r.stats as Record)} + + )} + {r.detail && ( + {r.detail} + )} +
+ ))} +
+
+ )} + + {/* Loading */} + {isLoading && ( +
+ +
+ )} + + {/* Error */} + {error && ( +
+ +

+ Failed to load data sources: {(error as Error)?.message} +

+
+ )} + + {/* Data Sources Table */} + {sources && sources.length > 0 && ( +
+
+ + + + + + + + + + + + + + {sources.map((src: DataSource) => { + const tp = typeProps(src.type); + const TypeIcon = tp.icon; + const isSyncing = syncingId === src.id; + + return ( + + {/* Source */} + + + {/* Type */} + + + {/* Sync Status */} + + + {/* Last Sync */} + + + {/* Stats */} + + + {/* Toggle */} + + + {/* Actions */} + + + ); + })} + +
SourceTypeStatusLast SyncStatsEnabledActions
+
+
+ +
+
+

{src.display_name}

+
+ {src.name} + {src.url && ( + + + + )} +
+
+
+
+ + + {tp.label} + + + {isSyncing ? statusBadge("in_progress") : statusBadge(src.last_sync_status)} + +
+ + {formatDate(src.last_sync_at)} +
+ {src.sync_frequency && ( + + Frequency: {src.sync_frequency} + + )} +
+ {src.last_sync_stats ? ( + + {formatStats(src.last_sync_stats)} + + ) : ( + - + )} + + + + +
+
+
+ )} + + {/* Empty State */} + {sources && sources.length === 0 && ( +
+ +

No Data Sources

+

+ Run the data sources seed script to register initial sources. +

+
+ )} +
+ ); +}