"""Heatmap service — ATT&CK Navigator-compatible layer generation. Builds multiple layer types (coverage, threat actor, detection rules, campaign) as plain dictionaries ready for JSON serialisation. This module is framework-agnostic: no FastAPI imports, no HTTPException, no ``db.commit()``. """ # Enable future language features for compatibility from __future__ import annotations # Import json import json # Import Callable from collections.abc from collections.abc import Callable # Import func, or_ from sqlalchemy from sqlalchemy import func, or_ # Import Query, Session from sqlalchemy.orm from sqlalchemy.orm import Query, Session # Import BusinessRuleViolation, EntityNotFoundError from app.domain.errors from app.domain.errors import BusinessRuleViolation, EntityNotFoundError # Import Campaign, CampaignTest from app.models.campaign from app.models.campaign import Campaign, CampaignTest # Import DetectionRule from app.models.detection_rule from app.models.detection_rule import DetectionRule # Import TechniqueStatus, TestState from app.models.enums from app.models.enums import TechniqueStatus, TestState # Import Technique from app.models.technique from app.models.technique import Technique # Import Test from app.models.test from app.models.test import Test # Import TestDetectionResult from app.models.test_detection_result from app.models.test_detection_result import TestDetectionResult # Import ThreatActor, ThreatActorTechnique from app.models.threat_actor from app.models.threat_actor import ThreatActor, ThreatActorTechnique # Import escape_like from app.utils from app.utils import escape_like # ── Constants ───────────────────────────────────────────────────────── ATTACK_VERSION = "15" # Assign NAVIGATOR_VERSION = "5.0" NAVIGATOR_VERSION = "5.0" # Assign LAYER_VERSION = "4.5" LAYER_VERSION = "4.5" # Assign DOMAIN = "enterprise-attack" DOMAIN = "enterprise-attack" # Assign STATUS_SCORE_MAP = { STATUS_SCORE_MAP: dict[TechniqueStatus, int] = { TechniqueStatus.validated: 100, TechniqueStatus.partial: 60, TechniqueStatus.in_progress: 30, TechniqueStatus.not_covered: 10, TechniqueStatus.not_evaluated: 0, TechniqueStatus.review_required: 10, } # Assign TEST_STATE_SCORE = { TEST_STATE_SCORE: dict[TestState, int] = { TestState.validated: 100, TestState.in_review: 70, TestState.blue_evaluating: 50, TestState.red_executing: 30, TestState.draft: 10, TestState.rejected: 5, } # ── Internal helpers ────────────────────────────────────────────────── def _score_to_color(score: int) -> str: """Map a 0-100 score to a red-yellow-green colour hex. Args: score (int): Coverage score between 0 and 100 inclusive. Returns: str: Hex colour string representing the score tier. """ # Check: score <= 0 if score <= 0: # Return "#d3d3d3" return "#d3d3d3" # Check: score <= 25 if score <= 25: # Return "#ff6666" return "#ff6666" # Check: score <= 50 if score <= 50: # Return "#ff9933" return "#ff9933" # Check: score <= 75 if score <= 75: # Return "#ffff66" return "#ffff66" # Return "#66ff66" return "#66ff66" # Define function _build_layer_skeleton def _build_layer_skeleton( # Entry: name name: str, # Entry: description description: str, # Entry: gradient_colors gradient_colors: list[str] | None = None, ) -> dict: """Return a base layer dict compatible with ATT&CK Navigator. Args: name (str): Human-readable name for the layer. description (str): Description text embedded in the layer metadata. gradient_colors (list[str] | None): Optional list of hex colour stops for the gradient; defaults to red-yellow-green if omitted. Returns: dict: Skeleton layer dictionary with versions, domain, and empty techniques list. """ # Return { return { # Literal argument value "name": name, # Literal argument value "versions": { # Literal argument value "attack": ATTACK_VERSION, # Literal argument value "navigator": NAVIGATOR_VERSION, # Literal argument value "layer": LAYER_VERSION, }, # Literal argument value "domain": DOMAIN, # Literal argument value "description": description, # Literal argument value "filters": {"platforms": ["windows", "linux", "macos"]}, # Literal argument value "gradient": { # Literal argument value "colors": gradient_colors or ["#ff6666", "#ffff66", "#66ff66"], # Literal argument value "minValue": 0, # Literal argument value "maxValue": 100, }, # Literal argument value "techniques": [], } # Define function _apply_filters def _apply_filters( # Entry: query query: Query, # type: ignore[type-arg] # Entry: model model: type, # Entry: platforms platforms: list[str] | None = None, # Entry: tactics tactics: list[str] | None = None, ) -> Query: # type: ignore[type-arg] """Apply common platform and tactic filters to a technique query. Args: query (Query): Base SQLAlchemy query targeting a technique-like model. model (type): The SQLAlchemy model class that owns ``platforms`` and ``tactic`` columns. platforms (list[str] | None): Optional list of platform names to filter by (OR-joined). tactics (list[str] | None): Optional list of tactic strings to filter by (OR-joined, case-insensitive substring match). Returns: Query: The query with platform and tactic filters applied. """ # Check: platforms if platforms: # Assign platform_filters = [ platform_filters = [ model.platforms.op("@>")(json.dumps([p])) for p in platforms ] # Assign query = query.filter(or_(*platform_filters)) query = query.filter(or_(*platform_filters)) # Check: tactics if tactics: # Assign tactic_filters = [ tactic_filters = [ model.tactic.ilike(f"%{escape_like(t)}%") for t in tactics ] # Assign query = query.filter(or_(*tactic_filters)) query = query.filter(or_(*tactic_filters)) # Return query return query # Define function _format_tactic def _format_tactic(tactic_str: str | None) -> str: """Normalize tactic string to ATT&CK Navigator format (kebab-case). Args: tactic_str (str | None): Raw tactic string, possibly comma-separated or mixed-case. Returns: str: First tactic value lowercased and trimmed, or empty string if the input is falsy. """ # Check: not tactic_str if not tactic_str: # Return "" return "" # Return tactic_str.split(",")[0].strip().lower() return tactic_str.split(",")[0].strip().lower() # Define function _parse_csv def _parse_csv(value: str | None) -> list[str] | None: """Split a comma-separated string into a trimmed list, or ``None``. Args: value (str | None): Comma-separated string to split, or ``None``. Returns: list[str] | None: Non-empty trimmed tokens, or ``None`` if the input is falsy or produces no tokens. """ # Check: not value if not value: # Return None return None # Return [v.strip() for v in value.split(",") if v.strip()] return [v.strip() for v in value.split(",") if v.strip()] # ── Public API ──────────────────────────────────────────────────────── def build_coverage_layer( # Entry: db db: Session, *, # Entry: platforms platforms: str | None = None, # Entry: tactics tactics: str | None = None, # Entry: min_score min_score: int = 0, ) -> dict: """Coverage layer -- score based on ``status_global`` of each technique. Args: db (Session): Active SQLAlchemy database session. platforms (str | None): Optional comma-separated platform names to filter techniques. tactics (str | None): Optional comma-separated tactic names to filter techniques. min_score (int): Minimum score threshold; techniques below this are omitted from the layer. Returns: dict: ATT&CK Navigator-compatible layer dictionary. """ # Assign layer = _build_layer_skeleton("Aegis Coverage", "Coverage layer generated b... layer = _build_layer_skeleton("Aegis Coverage", "Coverage layer generated by Aegis") # Assign query = _apply_filters( query = _apply_filters( db.query(Technique), Technique, _parse_csv(platforms), _parse_csv(tactics), ) # Assign techniques = query.all() techniques = query.all() # Bulk-fetch test counts and rule counts to avoid N+1 tech_ids = [t.id for t in techniques] # Assign mitre_ids = [t.mitre_id for t in techniques] mitre_ids = [t.mitre_id for t in techniques] # Assign test_counts = dict( test_counts = dict( db.query(Test.technique_id, func.count(Test.id)) # Chain .filter() call .filter(Test.technique_id.in_(tech_ids), Test.state == TestState.validated) # Chain .group_by() call .group_by(Test.technique_id) # Chain .all() call .all() ) if tech_ids else {} # Assign rule_counts = dict( rule_counts = dict( db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id)) # Chain .filter() call .filter(DetectionRule.mitre_technique_id.in_(mitre_ids)) # Chain .group_by() call .group_by(DetectionRule.mitre_technique_id) # Chain .all() call .all() ) if mitre_ids else {} # Iterate over techniques for tech in techniques: # Assign score = STATUS_SCORE_MAP.get(tech.status_global, 0) score = STATUS_SCORE_MAP.get(tech.status_global, 0) # Check: score < min_score if score < min_score: # Skip to the next loop iteration continue # Assign tc = test_counts.get(tech.id, 0) tc = test_counts.get(tech.id, 0) # Assign rc = rule_counts.get(tech.mitre_id, 0) rc = rule_counts.get(tech.mitre_id, 0) # Assign metadata = [ metadata = [ {"name": "tests_count", "value": str(tc)}, {"name": "detection_rules", "value": str(rc)}, ] # Check: tech.last_review_date if tech.last_review_date: # Call metadata.append() metadata.append( {"name": "last_validated", "value": tech.last_review_date.strftime("%Y-%m-%d")} ) # Assign comment_parts = [ comment_parts = [ f"Status: {tech.status_global.value}", f"{tc} tests validated", f"{rc} detection rules", ] # layer["techniques"].append({ layer["techniques"].append({ # Literal argument value "techniqueID": tech.mitre_id, # Literal argument value "tactic": _format_tactic(tech.tactic), # Literal argument value "color": _score_to_color(score), # Literal argument value "score": score, # Literal argument value "comment": " - ".join(comment_parts), # Literal argument value "enabled": True, # Literal argument value "metadata": metadata, }) # Return layer return layer # Define function build_threat_actor_layer def build_threat_actor_layer( # Entry: db db: Session, # Entry: actor_id actor_id: str, *, # Entry: platforms platforms: str | None = None, # Entry: tactics tactics: str | None = None, # Entry: min_score min_score: int = 0, ) -> dict: """Threat actor layer -- techniques used by an actor with coverage colour. Raises :class:`EntityNotFoundError` if the actor does not exist. Args: db (Session): Active SQLAlchemy database session. actor_id (str): UUID string identifying the threat actor. platforms (str | None): Optional comma-separated platform names to filter techniques. tactics (str | None): Optional comma-separated tactic names to filter techniques. min_score (int): Minimum score threshold for actor techniques. Returns: dict: ATT&CK Navigator-compatible layer dictionary coloured by coverage status for the specified actor. """ # Assign actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first() actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first() # Check: not actor if not actor: # Raise EntityNotFoundError raise EntityNotFoundError("ThreatActor", actor_id) # Assign layer = _build_layer_skeleton( layer = _build_layer_skeleton( f"Threat Actor: {actor.name}", f"Techniques used by {actor.name} with coverage overlay", # Keyword argument: gradient_colors gradient_colors=["#808080", "#ff6666", "#66ff66"], ) # Assign actor_technique_ids = { actor_technique_ids = { row.technique_id for row in db.query(ThreatActorTechnique.technique_id) # Chain .filter() call .filter(ThreatActorTechnique.threat_actor_id == actor.id) # Chain .all() call .all() } # Check: not actor_technique_ids if not actor_technique_ids: # Return layer return layer # Assign query = _apply_filters( query = _apply_filters( db.query(Technique), Technique, _parse_csv(platforms), _parse_csv(tactics), ) # Assign techniques = query.all() techniques = query.all() # Bulk-fetch metadata for actor techniques only test_counts = dict( db.query(Test.technique_id, func.count(Test.id)) # Chain .filter() call .filter(Test.technique_id.in_(actor_technique_ids), Test.state == TestState.validated) # Chain .group_by() call .group_by(Test.technique_id) # Chain .all() call .all() ) # Assign actor_mitre_ids = [t.mitre_id for t in techniques if t.id in actor_technique_ids] actor_mitre_ids = [t.mitre_id for t in techniques if t.id in actor_technique_ids] # Assign rule_counts = dict( rule_counts = dict( db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id)) # Chain .filter() call .filter(DetectionRule.mitre_technique_id.in_(actor_mitre_ids)) # Chain .group_by() call .group_by(DetectionRule.mitre_technique_id) # Chain .all() call .all() ) if actor_mitre_ids else {} # Iterate over techniques for tech in techniques: # Assign is_actor_technique = tech.id in actor_technique_ids is_actor_technique = tech.id in actor_technique_ids # Assign score = STATUS_SCORE_MAP.get(tech.status_global, 0) if is_actor_technique e... score = STATUS_SCORE_MAP.get(tech.status_global, 0) if is_actor_technique else 0 # Check: is_actor_technique and score < min_score if is_actor_technique and score < min_score: # Skip to the next loop iteration continue # Only include techniques actually used by this actor — skip the rest # so that tactics with no actor techniques are hidden in the matrix. if not is_actor_technique: continue tc = test_counts.get(tech.id, 0) rc = rule_counts.get(tech.mitre_id, 0) metadata = [ {"name": "tests_count", "value": str(tc)}, {"name": "detection_rules", "value": str(rc)}, ] if tech.last_review_date: metadata.append( {"name": "last_validated", "value": tech.last_review_date.strftime("%Y-%m-%d")} ) layer["techniques"].append({ "techniqueID": tech.mitre_id, "tactic": _format_tactic(tech.tactic), "color": _score_to_color(score), "score": score, "comment": f"Used by {actor.name} - Coverage: {tech.status_global.value}", "enabled": True, "metadata": metadata, }) # Return layer return layer # Define function build_detection_rules_layer def build_detection_rules_layer( # Entry: db db: Session, *, # Entry: platforms platforms: str | None = None, # Entry: tactics tactics: str | None = None, # Entry: min_score min_score: int = 0, ) -> dict: """Detection rules layer -- score based on absolute rule count per technique. Scoring uses fixed thresholds so the colour reflects real coverage regardless of what other techniques have: 0 rules → gray (score 0) 1 rule → red (score 25) 2 rules → orange (score 50) 3 rules → yellow (score 75) 4+ rules → green (score 100) """ layer = _build_layer_skeleton( # Literal argument value "Detection Rules Coverage", "Number of active detection rules per technique", ) # Assign query = _apply_filters( query = _apply_filters( db.query(Technique), Technique, _parse_csv(platforms), _parse_csv(tactics), ) # Assign techniques = query.all() techniques = query.all() # Assign rule_counts = dict( rule_counts = dict( db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id)) # Chain .filter() call .filter(DetectionRule.is_active == True) # noqa: E712 # Chain .group_by() call .group_by(DetectionRule.mitre_technique_id) # Chain .all() call .all() ) # Assign evaluated_counts = dict( evaluated_counts = dict( db.query(DetectionRule.mitre_technique_id, func.count(TestDetectionResult.id)) # Chain .join() call .join(TestDetectionResult, TestDetectionResult.detection_rule_id == DetectionRule.id) # Chain .filter() call .filter(TestDetectionResult.triggered.isnot(None)) # Chain .group_by() call .group_by(DetectionRule.mitre_technique_id) # Chain .all() call .all() ) # 4 rules = full coverage (100). Each rule adds 25 points. rules_for_full_coverage = 4 for tech in techniques: # Assign total_rules = rule_counts.get(tech.mitre_id, 0) total_rules = rule_counts.get(tech.mitre_id, 0) # Assign evaluated_rules = evaluated_counts.get(tech.mitre_id, 0) evaluated_rules = evaluated_counts.get(tech.mitre_id, 0) score = min(int((total_rules / rules_for_full_coverage) * 100), 100) # Check: score < min_score if score < min_score: # Skip to the next loop iteration continue rule_word = "rule" if total_rules == 1 else "rules" eval_note = f", {evaluated_rules} evaluated" if evaluated_rules > 0 else "" comment = f"{total_rules} active {rule_word}{eval_note}" layer["techniques"].append({ # Literal argument value "techniqueID": tech.mitre_id, # Literal argument value "tactic": _format_tactic(tech.tactic), # Literal argument value "color": _score_to_color(score), # Literal argument value "score": score, "comment": comment, "enabled": True, # Literal argument value "metadata": [ {"name": "total_rules", "value": str(total_rules)}, {"name": "evaluated_rules", "value": str(evaluated_rules)}, ], }) # Return layer return layer # Define function build_campaign_layer def build_campaign_layer( # Entry: db db: Session, # Entry: campaign_id campaign_id: str, *, # Entry: platforms platforms: str | None = None, # Entry: tactics tactics: str | None = None, # Entry: min_score min_score: int = 0, ) -> dict: """Campaign layer -- techniques in a campaign, coloured by test state. Raises :class:`EntityNotFoundError` if the campaign does not exist. Args: db (Session): Active SQLAlchemy database session. campaign_id (str): UUID string identifying the campaign. platforms (str | None): Optional comma-separated platform names to filter techniques. tactics (str | None): Optional comma-separated tactic names to filter techniques. min_score (int): Minimum score threshold for techniques in the layer. Returns: dict: ATT&CK Navigator-compatible layer dictionary where each technique colour reflects the best test state within the campaign. """ # Assign campaign = db.query(Campaign).filter(Campaign.id == campaign_id).first() campaign = db.query(Campaign).filter(Campaign.id == campaign_id).first() # Check: not campaign if not campaign: # Raise EntityNotFoundError raise EntityNotFoundError("Campaign", campaign_id) # Assign layer = _build_layer_skeleton( layer = _build_layer_skeleton( f"Campaign: {campaign.name}", f"Progress of campaign '{campaign.name}'", ) # Assign campaign_tests = ( campaign_tests = ( db.query(CampaignTest) # Chain .filter() call .filter(CampaignTest.campaign_id == campaign.id) # Chain .all() call .all() ) # Check: not campaign_tests if not campaign_tests: # Return layer return layer # Assign test_ids = [ct.test_id for ct in campaign_tests] test_ids = [ct.test_id for ct in campaign_tests] # Assign tests = db.query(Test).filter(Test.id.in_(test_ids)).all() tests = db.query(Test).filter(Test.id.in_(test_ids)).all() # Assign test_map = {t.id: t for t in tests} test_map = {t.id: t for t in tests} # Assign technique_ids = {t.technique_id for t in tests if t.technique_id} technique_ids = {t.technique_id for t in tests if t.technique_id} # Assign techniques = db.query(Technique).filter(Technique.id.in_(technique_ids)).all() techniques = db.query(Technique).filter(Technique.id.in_(technique_ids)).all() # Assign tech_map = {t.id: t for t in techniques} tech_map = {t.id: t for t in techniques} # Group tests by technique, keeping the best state score tech_scores: dict = {} # Iterate over campaign_tests for ct in campaign_tests: # Assign test = test_map.get(ct.test_id) test = test_map.get(ct.test_id) # Check: not test if not test: # Skip to the next loop iteration continue # Assign tech = tech_map.get(test.technique_id) tech = tech_map.get(test.technique_id) # Check: not tech if not tech: # Skip to the next loop iteration continue # Assign state_score = TEST_STATE_SCORE.get(test.state, 0) state_score = TEST_STATE_SCORE.get(test.state, 0) # Check: tech.mitre_id not in tech_scores if tech.mitre_id not in tech_scores: # Assign tech_scores[tech.mitre_id] = { tech_scores[tech.mitre_id] = { # Literal argument value "technique": tech, # Literal argument value "max_score": state_score, # Literal argument value "tests": [], } # Fallback: handle remaining cases else: # Assign tech_scores[tech.mitre_id]["max_score"] = max( tech_scores[tech.mitre_id]["max_score"] = max( tech_scores[tech.mitre_id]["max_score"], state_score, ) # tech_scores[tech.mitre_id]["tests"].append(test) tech_scores[tech.mitre_id]["tests"].append(test) # Assign platform_list = _parse_csv(platforms) platform_list = _parse_csv(platforms) # Assign tactic_list = _parse_csv(tactics) tactic_list = _parse_csv(tactics) # Iterate over tech_scores.items() for mitre_id, info in tech_scores.items(): # Assign tech = info["technique"] tech = info["technique"] # Assign score = info["max_score"] score = info["max_score"] # Check: platform_list if platform_list: # Assign tech_platforms = tech.platforms or [] tech_platforms = tech.platforms or [] # Check: not any(p in tech_platforms for p in platform_list) if not any(p in tech_platforms for p in platform_list): # Skip to the next loop iteration continue # Check: tactic_list if tactic_list: # Assign tech_tactics = [t.strip() for t in (tech.tactic or "").lower().split(",")] tech_tactics = [t.strip() for t in (tech.tactic or "").lower().split(",")] # Check: not any(t in tech_tactics for t in tactic_list) if not any(t in tech_tactics for t in tactic_list): # Skip to the next loop iteration continue # Check: score < min_score if score < min_score: # Skip to the next loop iteration continue # Assign test_states = [t.state.value for t in info["tests"]] test_states = [t.state.value for t in info["tests"]] # layer["techniques"].append({ layer["techniques"].append({ # Literal argument value "techniqueID": mitre_id, # Literal argument value "tactic": _format_tactic(tech.tactic), # Literal argument value "color": _score_to_color(score), # Literal argument value "score": score, # Literal argument value "comment": f"Campaign tests: {', '.join(test_states)}", # Literal argument value "enabled": True, # Literal argument value "metadata": [ {"name": "campaign_tests", "value": str(len(info["tests"]))}, {"name": "best_state", "value": max(test_states) if test_states else "none"}, ], }) # Return layer return layer # ── Layer registry (OCP-compliant dispatch) ────────────────────────── # # To add a new layer type: # 1. Write a builder function: ``def build_X_layer(db, *, platforms, tactics, min_score) -> dict`` # 2. Call ``register_layer("x", build_X_layer)`` (or ``register_layer("x", fn, requires_id=True)``) # 3. Optionally add a convenience endpoint in the router # # The ``/export-navigator?layer=x`` endpoint picks up new layers automatically. class _LayerRegistry: """Extensible registry that maps layer type names to builder functions.""" # Assign __slots__ = ("_simple", "_with_id") __slots__ = ("_simple", "_with_id") # Define function __init__ def __init__(self) -> None: # Assign self._simple = {} self._simple: dict[str, object] = {} # Assign self._with_id = {} self._with_id: dict[str, object] = {} # Define function register def register(self, name: str, builder: Callable[..., dict], *, requires_id: bool = False) -> None: """Register a builder function under *name*. Args: name (str): Unique layer type identifier. builder (Callable[..., dict]): Layer builder function. requires_id (bool): Whether the builder needs a positional ``layer_id`` argument. """ # Assign target = self._with_id if requires_id else self._simple target = self._with_id if requires_id else self._simple # Assign target[name] = builder target[name] = builder # Apply the @property decorator @property # Define function supported_types def supported_types(self) -> set[str]: """Return the set of all registered layer type names. Returns: set[str]: Union of simple and entity-bound layer type names. """ # Return set(self._simple) | set(self._with_id) return set(self._simple) | set(self._with_id) # Define function build def build( self, # Entry: db db: Session, # Entry: layer_type layer_type: str, *, # Entry: layer_id layer_id: str | None = None, # Entry: platforms platforms: str | None = None, # Entry: tactics tactics: str | None = None, # Entry: min_score min_score: int = 0, ) -> dict: """Dispatch to the registered builder for *layer_type*. Args: db (Session): Active SQLAlchemy database session. layer_type (str): Registered layer type name. layer_id (str | None): Entity UUID for entity-bound layer types. platforms (str | None): Optional comma-separated platform filter. tactics (str | None): Optional comma-separated tactic filter. min_score (int): Minimum score threshold. Returns: dict: ATT&CK Navigator-compatible layer dictionary. """ # Assign kwargs = dict(platforms=platforms, tactics=tactics, min_score=min_score) kwargs = dict(platforms=platforms, tactics=tactics, min_score=min_score) # Check: layer_type in self._simple if layer_type in self._simple: # Return self._simple[layer_type](db, **kwargs) return self._simple[layer_type](db, **kwargs) # Check: layer_type in self._with_id if layer_type in self._with_id: # Check: not layer_id if not layer_id: # Raise BusinessRuleViolation raise BusinessRuleViolation( f"layer_id is required for '{layer_type}' layer" ) # Return self._with_id[layer_type](db, layer_id, **kwargs) return self._with_id[layer_type](db, layer_id, **kwargs) # Raise BusinessRuleViolation raise BusinessRuleViolation(f"Unknown layer type: {layer_type}") # Assign LAYER_REGISTRY = _LayerRegistry() LAYER_REGISTRY = _LayerRegistry() # Call LAYER_REGISTRY.register() LAYER_REGISTRY.register("coverage", build_coverage_layer) # Call LAYER_REGISTRY.register() LAYER_REGISTRY.register("detection-rules", build_detection_rules_layer) # Call LAYER_REGISTRY.register() LAYER_REGISTRY.register("threat-actor", build_threat_actor_layer, requires_id=True) # Call LAYER_REGISTRY.register() LAYER_REGISTRY.register("campaign", build_campaign_layer, requires_id=True) # Assign SUPPORTED_LAYER_TYPES = LAYER_REGISTRY.supported_types # snapshot of built-in types SUPPORTED_LAYER_TYPES = LAYER_REGISTRY.supported_types # snapshot of built-in types # Define function register_layer def register_layer(name: str, builder: Callable[..., dict], *, requires_id: bool = False) -> None: """Register a new heatmap layer type at import time. Args: name (str): Unique identifier for the layer type used in API requests. builder (Callable[..., dict]): Function that builds the layer dict; must accept ``(db, *, platforms, tactics, min_score)`` and optionally a positional ``layer_id`` when ``requires_id`` is ``True``. requires_id (bool): Set to ``True`` when the builder needs a ``layer_id`` argument (e.g. threat-actor, campaign layers). """ # Call LAYER_REGISTRY.register() LAYER_REGISTRY.register(name, builder, requires_id=requires_id) # Define function build_navigator_export def build_navigator_export( # Entry: db db: Session, # Entry: layer_type layer_type: str, *, # Entry: layer_id layer_id: str | None = None, # Entry: platforms platforms: str | None = None, # Entry: tactics tactics: str | None = None, # Entry: min_score min_score: int = 0, ) -> dict: """Build a heatmap layer dict by type name. Raises :class:`BusinessRuleViolation` for unknown layer types or missing ``layer_id``. Raises :class:`EntityNotFoundError` when an entity-bound layer (threat-actor, campaign) references a non-existent record. Args: db (Session): Active SQLAlchemy database session. layer_type (str): Registered layer type name (e.g. ``"coverage"``, ``"threat-actor"``). layer_id (str | None): Entity UUID required for entity-bound layer types such as ``"threat-actor"`` and ``"campaign"``. platforms (str | None): Optional comma-separated platform filter. tactics (str | None): Optional comma-separated tactic filter. min_score (int): Minimum score; techniques below this are excluded. Returns: dict: ATT&CK Navigator-compatible layer dictionary. """ # Return LAYER_REGISTRY.build( return LAYER_REGISTRY.build( db, layer_type, # Keyword argument: layer_id layer_id=layer_id, platforms=platforms, tactics=tactics, min_score=min_score, )