"""Heatmap service — ATT&CK Navigator-compatible layer generation. Builds multiple layer types (coverage, threat actor, detection rules, campaign) as plain dictionaries ready for JSON serialisation. This module is framework-agnostic: no FastAPI imports, no HTTPException, no ``db.commit()``. """ from __future__ import annotations import json from typing import Optional from sqlalchemy import func, or_ from sqlalchemy.orm import Session from app.models.campaign import Campaign, CampaignTest from app.models.detection_rule import DetectionRule from app.models.defensive_technique import DefensiveTechniqueMapping from app.models.enums import TechniqueStatus, TestState from app.models.technique import Technique from app.models.test import Test from app.models.test_detection_result import TestDetectionResult from app.models.threat_actor import ThreatActor, ThreatActorTechnique from app.utils import escape_like # ── Constants ───────────────────────────────────────────────────────── ATTACK_VERSION = "15" NAVIGATOR_VERSION = "5.0" LAYER_VERSION = "4.5" DOMAIN = "enterprise-attack" STATUS_SCORE_MAP: dict[TechniqueStatus, int] = { TechniqueStatus.validated: 100, TechniqueStatus.partial: 60, TechniqueStatus.in_progress: 30, TechniqueStatus.not_covered: 10, TechniqueStatus.not_evaluated: 0, TechniqueStatus.review_required: 10, } TEST_STATE_SCORE: dict[TestState, int] = { TestState.validated: 100, TestState.in_review: 70, TestState.blue_evaluating: 50, TestState.red_executing: 30, TestState.draft: 10, TestState.rejected: 5, } # ── Internal helpers ────────────────────────────────────────────────── def _score_to_color(score: int) -> str: """Map a 0-100 score to a red-yellow-green colour hex.""" if score <= 0: return "#d3d3d3" if score <= 25: return "#ff6666" if score <= 50: return "#ff9933" if score <= 75: return "#ffff66" return "#66ff66" def _build_layer_skeleton( name: str, description: str, gradient_colors: list[str] | None = None, ) -> dict: """Return a base layer dict compatible with ATT&CK Navigator.""" return { "name": name, "versions": { "attack": ATTACK_VERSION, "navigator": NAVIGATOR_VERSION, "layer": LAYER_VERSION, }, "domain": DOMAIN, "description": description, "filters": {"platforms": ["windows", "linux", "macos"]}, "gradient": { "colors": gradient_colors or ["#ff6666", "#ffff66", "#66ff66"], "minValue": 0, "maxValue": 100, }, "techniques": [], } def _apply_filters( query, model, platforms: list[str] | None = None, tactics: list[str] | None = None, ): """Apply common platform and tactic filters to a technique query.""" if platforms: platform_filters = [ model.platforms.op("@>")(json.dumps([p])) for p in platforms ] query = query.filter(or_(*platform_filters)) if tactics: tactic_filters = [ model.tactic.ilike(f"%{escape_like(t)}%") for t in tactics ] query = query.filter(or_(*tactic_filters)) return query def _format_tactic(tactic_str: str | None) -> str: """Normalize tactic string to ATT&CK Navigator format (kebab-case).""" if not tactic_str: return "" return tactic_str.split(",")[0].strip().lower() def _parse_csv(value: str | None) -> list[str] | None: """Split a comma-separated string into a trimmed list, or ``None``.""" if not value: return None return [v.strip() for v in value.split(",") if v.strip()] # ── Public API ──────────────────────────────────────────────────────── def build_coverage_layer( db: Session, *, platforms: str | None = None, tactics: str | None = None, min_score: int = 0, ) -> dict: """Coverage layer -- score based on ``status_global`` of each technique.""" layer = _build_layer_skeleton("Aegis Coverage", "Coverage layer generated by Aegis") query = _apply_filters( db.query(Technique), Technique, _parse_csv(platforms), _parse_csv(tactics), ) techniques = query.all() # Bulk-fetch test counts and rule counts to avoid N+1 tech_ids = [t.id for t in techniques] mitre_ids = [t.mitre_id for t in techniques] test_counts = dict( db.query(Test.technique_id, func.count(Test.id)) .filter(Test.technique_id.in_(tech_ids), Test.state == TestState.validated) .group_by(Test.technique_id) .all() ) if tech_ids else {} rule_counts = dict( db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id)) .filter(DetectionRule.mitre_technique_id.in_(mitre_ids)) .group_by(DetectionRule.mitre_technique_id) .all() ) if mitre_ids else {} for tech in techniques: score = STATUS_SCORE_MAP.get(tech.status_global, 0) if score < min_score: continue tc = test_counts.get(tech.id, 0) rc = rule_counts.get(tech.mitre_id, 0) metadata = [ {"name": "tests_count", "value": str(tc)}, {"name": "detection_rules", "value": str(rc)}, ] if tech.last_review_date: metadata.append( {"name": "last_validated", "value": tech.last_review_date.strftime("%Y-%m-%d")} ) comment_parts = [ f"Status: {tech.status_global.value}", f"{tc} tests validated", f"{rc} detection rules", ] layer["techniques"].append({ "techniqueID": tech.mitre_id, "tactic": _format_tactic(tech.tactic), "color": _score_to_color(score), "score": score, "comment": " - ".join(comment_parts), "enabled": True, "metadata": metadata, }) return layer def build_threat_actor_layer( db: Session, actor_id: str, *, platforms: str | None = None, tactics: str | None = None, min_score: int = 0, ) -> dict | None: """Threat actor layer -- techniques used by an actor with coverage colour. Returns ``None`` if the actor does not exist. """ actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first() if not actor: return None layer = _build_layer_skeleton( f"Threat Actor: {actor.name}", f"Techniques used by {actor.name} with coverage overlay", gradient_colors=["#808080", "#ff6666", "#66ff66"], ) actor_technique_ids = { row.technique_id for row in db.query(ThreatActorTechnique.technique_id) .filter(ThreatActorTechnique.threat_actor_id == actor.id) .all() } if not actor_technique_ids: return layer query = _apply_filters( db.query(Technique), Technique, _parse_csv(platforms), _parse_csv(tactics), ) techniques = query.all() # Bulk-fetch metadata for actor techniques only test_counts = dict( db.query(Test.technique_id, func.count(Test.id)) .filter(Test.technique_id.in_(actor_technique_ids), Test.state == TestState.validated) .group_by(Test.technique_id) .all() ) actor_mitre_ids = [t.mitre_id for t in techniques if t.id in actor_technique_ids] rule_counts = dict( db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id)) .filter(DetectionRule.mitre_technique_id.in_(actor_mitre_ids)) .group_by(DetectionRule.mitre_technique_id) .all() ) if actor_mitre_ids else {} for tech in techniques: is_actor_technique = tech.id in actor_technique_ids score = STATUS_SCORE_MAP.get(tech.status_global, 0) if is_actor_technique else 0 if is_actor_technique and score < min_score: continue if is_actor_technique: tc = test_counts.get(tech.id, 0) rc = rule_counts.get(tech.mitre_id, 0) metadata = [ {"name": "tests_count", "value": str(tc)}, {"name": "detection_rules", "value": str(rc)}, ] if tech.last_review_date: metadata.append( {"name": "last_validated", "value": tech.last_review_date.strftime("%Y-%m-%d")} ) layer["techniques"].append({ "techniqueID": tech.mitre_id, "tactic": _format_tactic(tech.tactic), "color": _score_to_color(score), "score": score, "comment": f"Used by {actor.name} - Coverage: {tech.status_global.value}", "enabled": True, "metadata": metadata, }) else: layer["techniques"].append({ "techniqueID": tech.mitre_id, "tactic": _format_tactic(tech.tactic), "color": "", "score": 0, "comment": "", "enabled": False, "metadata": [], }) return layer def build_detection_rules_layer( db: Session, *, platforms: str | None = None, tactics: str | None = None, min_score: int = 0, ) -> dict: """Detection rules layer -- score based on rule availability and evaluation ratio.""" layer = _build_layer_skeleton( "Detection Rules Coverage", "Coverage of detection rules per technique", ) query = _apply_filters( db.query(Technique), Technique, _parse_csv(platforms), _parse_csv(tactics), ) techniques = query.all() rule_counts = dict( db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id)) .filter(DetectionRule.is_active == True) # noqa: E712 .group_by(DetectionRule.mitre_technique_id) .all() ) max_rules = max(rule_counts.values()) if rule_counts else 1 evaluated_counts = dict( db.query(DetectionRule.mitre_technique_id, func.count(TestDetectionResult.id)) .join(TestDetectionResult, TestDetectionResult.detection_rule_id == DetectionRule.id) .filter(TestDetectionResult.triggered.isnot(None)) .group_by(DetectionRule.mitre_technique_id) .all() ) for tech in techniques: total_rules = rule_counts.get(tech.mitre_id, 0) evaluated_rules = evaluated_counts.get(tech.mitre_id, 0) if total_rules > 0: availability_score = min((total_rules / max_rules) * 50, 50) evaluation_score = (evaluated_rules / total_rules) * 50 score = int(min(availability_score + evaluation_score, 100)) else: score = 0 if score < min_score: continue layer["techniques"].append({ "techniqueID": tech.mitre_id, "tactic": _format_tactic(tech.tactic), "color": _score_to_color(score), "score": score, "comment": f"{total_rules} rules available, {evaluated_rules} evaluated", "enabled": True, "metadata": [ {"name": "total_rules", "value": str(total_rules)}, {"name": "evaluated_rules", "value": str(evaluated_rules)}, ], }) return layer def build_campaign_layer( db: Session, campaign_id: str, *, platforms: str | None = None, tactics: str | None = None, min_score: int = 0, ) -> dict | None: """Campaign layer -- techniques in a campaign, coloured by test state. Returns ``None`` if the campaign does not exist. """ campaign = db.query(Campaign).filter(Campaign.id == campaign_id).first() if not campaign: return None layer = _build_layer_skeleton( f"Campaign: {campaign.name}", f"Progress of campaign '{campaign.name}'", ) campaign_tests = ( db.query(CampaignTest) .filter(CampaignTest.campaign_id == campaign.id) .all() ) if not campaign_tests: return layer test_ids = [ct.test_id for ct in campaign_tests] tests = db.query(Test).filter(Test.id.in_(test_ids)).all() test_map = {t.id: t for t in tests} technique_ids = {t.technique_id for t in tests if t.technique_id} techniques = db.query(Technique).filter(Technique.id.in_(technique_ids)).all() tech_map = {t.id: t for t in techniques} # Group tests by technique, keeping the best state score tech_scores: dict = {} for ct in campaign_tests: test = test_map.get(ct.test_id) if not test: continue tech = tech_map.get(test.technique_id) if not tech: continue state_score = TEST_STATE_SCORE.get(test.state, 0) if tech.mitre_id not in tech_scores: tech_scores[tech.mitre_id] = { "technique": tech, "max_score": state_score, "tests": [], } else: tech_scores[tech.mitre_id]["max_score"] = max( tech_scores[tech.mitre_id]["max_score"], state_score, ) tech_scores[tech.mitre_id]["tests"].append(test) platform_list = _parse_csv(platforms) tactic_list = _parse_csv(tactics) for mitre_id, info in tech_scores.items(): tech = info["technique"] score = info["max_score"] if platform_list: tech_platforms = tech.platforms or [] if not any(p in tech_platforms for p in platform_list): continue if tactic_list: tech_tactics = [t.strip() for t in (tech.tactic or "").lower().split(",")] if not any(t in tech_tactics for t in tactic_list): continue if score < min_score: continue test_states = [t.state.value for t in info["tests"]] layer["techniques"].append({ "techniqueID": mitre_id, "tactic": _format_tactic(tech.tactic), "color": _score_to_color(score), "score": score, "comment": f"Campaign tests: {', '.join(test_states)}", "enabled": True, "metadata": [ {"name": "campaign_tests", "value": str(len(info["tests"]))}, {"name": "best_state", "value": max(test_states) if test_states else "none"}, ], }) return layer