Files
Aegis/backend/app/services/heatmap_service.py
kitos 15eda30b75
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
fix(heatmap): hide empty tactics in threat-actor layer
build_threat_actor_layer was adding ALL techniques to the layer —
actor techniques with their real score and non-actor techniques with
score=0/enabled=False. This caused every tactic column to appear in
the matrix even when the actor has no techniques for that tactic.

Now only actor techniques are included. The frontend already filters
visible tactics to those with data, so empty tactic columns disappear
automatically.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-04 17:23:28 +02:00

547 lines
18 KiB
Python

"""Heatmap service — ATT&CK Navigator-compatible layer generation.
Builds multiple layer types (coverage, threat actor, detection rules,
campaign) as plain dictionaries ready for JSON serialisation.
This module is framework-agnostic: no FastAPI imports, no HTTPException,
no ``db.commit()``.
"""
from __future__ import annotations

import json
from collections.abc import Callable
from typing import Optional

from sqlalchemy import func, or_
from sqlalchemy.orm import Session

from app.domain.errors import BusinessRuleViolation, EntityNotFoundError
from app.models.campaign import Campaign, CampaignTest
from app.models.detection_rule import DetectionRule
from app.models.defensive_technique import DefensiveTechniqueMapping
from app.models.enums import TechniqueStatus, TestState
from app.models.technique import Technique
from app.models.test import Test
from app.models.test_detection_result import TestDetectionResult
from app.models.threat_actor import ThreatActor, ThreatActorTechnique
from app.utils import escape_like
# ── Constants ─────────────────────────────────────────────────────────
# Values written into the "versions" and "domain" entries of every layer
# skeleton produced by ``_build_layer_skeleton``.
ATTACK_VERSION = "15"
NAVIGATOR_VERSION = "5.0"
LAYER_VERSION = "4.5"
DOMAIN = "enterprise-attack"
# Score (0-100) assigned to a technique from its ``status_global``.
# Used by the coverage and threat-actor layers; a status missing from this
# table is scored 0 at the lookup sites (``.get(..., 0)``).
# Note that ``not_covered`` and ``review_required`` share the same score.
STATUS_SCORE_MAP: dict[TechniqueStatus, int] = {
    TechniqueStatus.validated: 100,
    TechniqueStatus.partial: 60,
    TechniqueStatus.in_progress: 30,
    TechniqueStatus.not_covered: 10,
    TechniqueStatus.not_evaluated: 0,
    TechniqueStatus.review_required: 10,
}
# Score (0-100) assigned to a test from its ``state``.
# Used by the campaign layer, which keeps the highest score per technique.
TEST_STATE_SCORE: dict[TestState, int] = {
    TestState.validated: 100,
    TestState.in_review: 70,
    TestState.blue_evaluating: 50,
    TestState.red_executing: 30,
    TestState.draft: 10,
    TestState.rejected: 5,
}
# ── Internal helpers ──────────────────────────────────────────────────
def _score_to_color(score: int) -> str:
"""Map a 0-100 score to a red-yellow-green colour hex."""
if score <= 0:
return "#d3d3d3"
if score <= 25:
return "#ff6666"
if score <= 50:
return "#ff9933"
if score <= 75:
return "#ffff66"
return "#66ff66"
def _build_layer_skeleton(
    name: str,
    description: str,
    gradient_colors: list[str] | None = None,
) -> dict:
    """Create an empty layer dict in the ATT&CK Navigator layout.

    Args:
        name: Value stored under the layer's ``"name"`` key.
        description: Value stored under the layer's ``"description"`` key.
        gradient_colors: Colours for the gradient. A falsy value (``None``
            or an empty list) selects the default red/yellow/green ramp.

    Returns:
        A new dict whose ``"techniques"`` list is empty; callers append
        technique entries to it.
    """
    version_info = {
        "attack": ATTACK_VERSION,
        "navigator": NAVIGATOR_VERSION,
        "layer": LAYER_VERSION,
    }
    if gradient_colors:
        palette = gradient_colors
    else:
        palette = ["#ff6666", "#ffff66", "#66ff66"]
    gradient = {"colors": palette, "minValue": 0, "maxValue": 100}
    platform_filter = {"platforms": ["windows", "linux", "macos"]}
    return {
        "name": name,
        "versions": version_info,
        "domain": DOMAIN,
        "description": description,
        "filters": platform_filter,
        "gradient": gradient,
        "techniques": [],
    }
def _apply_filters(
query,
model,
platforms: list[str] | None = None,
tactics: list[str] | None = None,
):
"""Apply common platform and tactic filters to a technique query."""
if platforms:
platform_filters = [
model.platforms.op("@>")(json.dumps([p])) for p in platforms
]
query = query.filter(or_(*platform_filters))
if tactics:
tactic_filters = [
model.tactic.ilike(f"%{escape_like(t)}%") for t in tactics
]
query = query.filter(or_(*tactic_filters))
return query
def _format_tactic(tactic_str: str | None) -> str:
"""Normalize tactic string to ATT&CK Navigator format (kebab-case)."""
if not tactic_str:
return ""
return tactic_str.split(",")[0].strip().lower()
def _parse_csv(value: str | None) -> list[str] | None:
"""Split a comma-separated string into a trimmed list, or ``None``."""
if not value:
return None
return [v.strip() for v in value.split(",") if v.strip()]
# ── Public API ────────────────────────────────────────────────────────
def build_coverage_layer(
    db: Session,
    *,
    platforms: str | None = None,
    tactics: str | None = None,
    min_score: int = 0,
) -> dict:
    """Build the coverage layer, scored from each technique's ``status_global``.

    Args:
        db: Session used for all queries.
        platforms: Optional comma-separated platform filter.
        tactics: Optional comma-separated tactic filter.
        min_score: Techniques scoring below this value are left out.

    Returns:
        A layer dict with one entry per remaining technique, carrying the
        validated-test count, the detection-rule count and, when set, the
        last review date as metadata.
    """
    layer = _build_layer_skeleton("Aegis Coverage", "Coverage layer generated by Aegis")

    techniques = _apply_filters(
        db.query(Technique),
        Technique,
        _parse_csv(platforms),
        _parse_csv(tactics),
    ).all()

    # Two grouped queries up front instead of one query per technique.
    technique_pks = [tech.id for tech in techniques]
    technique_mitre_ids = [tech.mitre_id for tech in techniques]

    validated_tests_by_pk: dict = {}
    if technique_pks:
        validated_tests_by_pk = dict(
            db.query(Test.technique_id, func.count(Test.id))
            .filter(Test.technique_id.in_(technique_pks), Test.state == TestState.validated)
            .group_by(Test.technique_id)
            .all()
        )

    rules_by_mitre_id: dict = {}
    if technique_mitre_ids:
        rules_by_mitre_id = dict(
            db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id))
            .filter(DetectionRule.mitre_technique_id.in_(technique_mitre_ids))
            .group_by(DetectionRule.mitre_technique_id)
            .all()
        )

    entries = layer["techniques"]
    for tech in techniques:
        # A status missing from the score table counts as 0.
        score = STATUS_SCORE_MAP.get(tech.status_global, 0)
        if score < min_score:
            continue

        test_total = validated_tests_by_pk.get(tech.id, 0)
        rule_total = rules_by_mitre_id.get(tech.mitre_id, 0)

        metadata = [
            {"name": "tests_count", "value": str(test_total)},
            {"name": "detection_rules", "value": str(rule_total)},
        ]
        if tech.last_review_date:
            review_day = tech.last_review_date.strftime("%Y-%m-%d")
            metadata.append({"name": "last_validated", "value": review_day})

        comment = (
            f"Status: {tech.status_global.value}"
            f" - {test_total} tests validated"
            f" - {rule_total} detection rules"
        )
        entries.append({
            "techniqueID": tech.mitre_id,
            "tactic": _format_tactic(tech.tactic),
            "color": _score_to_color(score),
            "score": score,
            "comment": comment,
            "enabled": True,
            "metadata": metadata,
        })
    return layer
def build_threat_actor_layer(
    db: Session,
    actor_id: str,
    *,
    platforms: str | None = None,
    tactics: str | None = None,
    min_score: int = 0,
) -> dict:
    """Threat actor layer -- techniques used by an actor with coverage colour.

    Only techniques linked to the actor are included, so tactics in which
    the actor has no technique contribute no entries to the layer.

    Args:
        db: Session used for all queries.
        actor_id: Identifier of the threat actor.
        platforms: Optional comma-separated platform filter.
        tactics: Optional comma-separated tactic filter.
        min_score: Techniques scoring below this value are left out.

    Returns:
        A layer dict; its ``"techniques"`` list is empty when the actor has
        no linked techniques or none pass the filters.

    Raises:
        EntityNotFoundError: If the actor does not exist.
    """
    actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first()
    if not actor:
        raise EntityNotFoundError("ThreatActor", actor_id)

    layer = _build_layer_skeleton(
        f"Threat Actor: {actor.name}",
        f"Techniques used by {actor.name} with coverage overlay",
        gradient_colors=["#808080", "#ff6666", "#66ff66"],
    )

    actor_technique_ids = {
        row.technique_id
        for row in db.query(ThreatActorTechnique.technique_id)
        .filter(ThreatActorTechnique.threat_actor_id == actor.id)
        .all()
    }
    if not actor_technique_ids:
        return layer

    # Restrict the technique query to the actor's techniques in SQL instead
    # of loading every technique and discarding the others in Python.
    query = _apply_filters(
        db.query(Technique).filter(Technique.id.in_(list(actor_technique_ids))),
        Technique,
        _parse_csv(platforms),
        _parse_csv(tactics),
    )
    techniques = query.all()
    if not techniques:
        return layer

    # Bulk-fetch metadata for the selected techniques to avoid N+1 queries.
    tech_ids = [t.id for t in techniques]
    mitre_ids = [t.mitre_id for t in techniques]
    test_counts = dict(
        db.query(Test.technique_id, func.count(Test.id))
        .filter(Test.technique_id.in_(tech_ids), Test.state == TestState.validated)
        .group_by(Test.technique_id)
        .all()
    )
    rule_counts = dict(
        db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id))
        .filter(DetectionRule.mitre_technique_id.in_(mitre_ids))
        .group_by(DetectionRule.mitre_technique_id)
        .all()
    )

    for tech in techniques:
        # A status missing from the score table counts as 0.
        score = STATUS_SCORE_MAP.get(tech.status_global, 0)
        if score < min_score:
            continue

        tc = test_counts.get(tech.id, 0)
        rc = rule_counts.get(tech.mitre_id, 0)
        metadata = [
            {"name": "tests_count", "value": str(tc)},
            {"name": "detection_rules", "value": str(rc)},
        ]
        if tech.last_review_date:
            metadata.append(
                {"name": "last_validated", "value": tech.last_review_date.strftime("%Y-%m-%d")}
            )

        layer["techniques"].append({
            "techniqueID": tech.mitre_id,
            "tactic": _format_tactic(tech.tactic),
            "color": _score_to_color(score),
            "score": score,
            "comment": f"Used by {actor.name} - Coverage: {tech.status_global.value}",
            "enabled": True,
            "metadata": metadata,
        })
    return layer
def build_detection_rules_layer(
    db: Session,
    *,
    platforms: str | None = None,
    tactics: str | None = None,
    min_score: int = 0,
) -> dict:
    """Detection rules layer -- score based on absolute rule count per technique.

    Scoring uses fixed thresholds so the colour reflects real coverage
    regardless of what other techniques have:

        0 rules  → gray   (score 0)
        1 rule   → red    (score 25)
        2 rules  → orange (score 50)
        3 rules  → yellow (score 75)
        4+ rules → green  (score 100)

    Each entry also reports ``evaluated_rules``: the number of distinct
    detection rules of the technique that have at least one test detection
    result whose ``triggered`` value is set. A rule evaluated in several
    tests is counted once.

    Args:
        db: Session used for all queries.
        platforms: Optional comma-separated platform filter.
        tactics: Optional comma-separated tactic filter.
        min_score: Techniques scoring below this value are left out.

    Returns:
        A layer dict with one entry per remaining technique.
    """
    layer = _build_layer_skeleton(
        "Detection Rules Coverage",
        "Number of active detection rules per technique",
    )

    query = _apply_filters(
        db.query(Technique), Technique,
        _parse_csv(platforms), _parse_csv(tactics),
    )
    techniques = query.all()

    # Active rules per MITRE technique id.
    rule_counts = dict(
        db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id))
        .filter(DetectionRule.is_active == True)  # noqa: E712
        .group_by(DetectionRule.mitre_technique_id)
        .all()
    )

    # Count rules, not result rows: the join yields one row per
    # (rule, test result) pair, so the rule id must be de-duplicated.
    evaluated_counts = dict(
        db.query(
            DetectionRule.mitre_technique_id,
            func.count(DetectionRule.id.distinct()),
        )
        .join(TestDetectionResult, TestDetectionResult.detection_rule_id == DetectionRule.id)
        .filter(TestDetectionResult.triggered.isnot(None))
        .group_by(DetectionRule.mitre_technique_id)
        .all()
    )

    # 4 rules = full coverage (100). Each rule adds 25 points.
    RULES_FOR_FULL_COVERAGE = 4

    for tech in techniques:
        total_rules = rule_counts.get(tech.mitre_id, 0)
        evaluated_rules = evaluated_counts.get(tech.mitre_id, 0)

        score = min(int((total_rules / RULES_FOR_FULL_COVERAGE) * 100), 100)
        if score < min_score:
            continue

        rule_word = "rule" if total_rules == 1 else "rules"
        eval_note = f", {evaluated_rules} evaluated" if evaluated_rules > 0 else ""
        comment = f"{total_rules} active {rule_word}{eval_note}"

        layer["techniques"].append({
            "techniqueID": tech.mitre_id,
            "tactic": _format_tactic(tech.tactic),
            "color": _score_to_color(score),
            "score": score,
            "comment": comment,
            "enabled": True,
            "metadata": [
                {"name": "total_rules", "value": str(total_rules)},
                {"name": "evaluated_rules", "value": str(evaluated_rules)},
            ],
        })
    return layer
def build_campaign_layer(
    db: Session,
    campaign_id: str,
    *,
    platforms: str | None = None,
    tactics: str | None = None,
    min_score: int = 0,
) -> dict:
    """Campaign layer -- techniques in a campaign, coloured by test state.

    Each technique is scored by the best-scoring state among its campaign
    tests according to ``TEST_STATE_SCORE``. The same test supplies the
    ``best_state`` metadata value.

    Args:
        db: Session used for all queries.
        campaign_id: Identifier of the campaign.
        platforms: Optional comma-separated platform filter; a technique is
            kept when its ``platforms`` contain any listed name.
        tactics: Optional comma-separated tactic filter; a technique is kept
            when any of its comma-separated tactics equals a listed name,
            compared case-insensitively.
        min_score: Techniques scoring below this value are left out.

    Returns:
        A layer dict; its ``"techniques"`` list is empty when the campaign
        has no tests.

    Raises:
        EntityNotFoundError: If the campaign does not exist.
    """
    campaign = db.query(Campaign).filter(Campaign.id == campaign_id).first()
    if not campaign:
        raise EntityNotFoundError("Campaign", campaign_id)

    layer = _build_layer_skeleton(
        f"Campaign: {campaign.name}",
        f"Progress of campaign '{campaign.name}'",
    )

    campaign_tests = (
        db.query(CampaignTest)
        .filter(CampaignTest.campaign_id == campaign.id)
        .all()
    )
    if not campaign_tests:
        return layer

    test_ids = [ct.test_id for ct in campaign_tests]
    tests = db.query(Test).filter(Test.id.in_(test_ids)).all()
    test_map = {t.id: t for t in tests}

    technique_ids = {t.technique_id for t in tests if t.technique_id}
    # Skip the query entirely rather than issuing an empty IN ().
    techniques = (
        db.query(Technique).filter(Technique.id.in_(list(technique_ids))).all()
        if technique_ids
        else []
    )
    tech_map = {t.id: t for t in techniques}

    def _state_score(test) -> int:
        """Score of a test's state; states missing from the table count as 0."""
        return TEST_STATE_SCORE.get(test.state, 0)

    # Group campaign tests by MITRE technique id. The first technique seen
    # for an id is kept, and every group holds at least one test.
    grouped: dict[str, dict] = {}
    for ct in campaign_tests:
        test = test_map.get(ct.test_id)
        if not test:
            continue
        tech = tech_map.get(test.technique_id)
        if not tech:
            continue
        group = grouped.setdefault(tech.mitre_id, {"technique": tech, "tests": []})
        group["tests"].append(test)

    platform_list = _parse_csv(platforms)
    # Stored tactics are compared lower-cased, so lower-case the request too.
    tactic_list = [t.lower() for t in (_parse_csv(tactics) or [])]

    for mitre_id, info in grouped.items():
        tech = info["technique"]
        tech_tests = info["tests"]

        if platform_list:
            tech_platforms = tech.platforms or []
            if not any(p in tech_platforms for p in platform_list):
                continue

        if tactic_list:
            tech_tactics = [t.strip() for t in (tech.tactic or "").lower().split(",")]
            if not any(t in tech_tactics for t in tactic_list):
                continue

        # Rank by score, not by state name: a plain max() over the state
        # strings is alphabetical and would rank "rejected" above "in_review".
        best_test = max(tech_tests, key=_state_score)
        score = _state_score(best_test)
        if score < min_score:
            continue

        test_states = [t.state.value for t in tech_tests]
        layer["techniques"].append({
            "techniqueID": mitre_id,
            "tactic": _format_tactic(tech.tactic),
            "color": _score_to_color(score),
            "score": score,
            "comment": f"Campaign tests: {', '.join(test_states)}",
            "enabled": True,
            "metadata": [
                {"name": "campaign_tests", "value": str(len(tech_tests))},
                {"name": "best_state", "value": best_test.state.value},
            ],
        })
    return layer
# ── Layer registry (OCP-compliant dispatch) ──────────────────────────
#
# To add a new layer type:
# 1. Write a builder function: ``def build_X_layer(db, *, platforms, tactics, min_score) -> dict``
# 2. Call ``register_layer("x", build_X_layer)`` (or ``register_layer("x", fn, requires_id=True)``)
# 3. Optionally add a convenience endpoint in the router
#
# The ``/export-navigator?layer=x`` endpoint picks up new layers automatically.
class _LayerRegistry:
"""Extensible registry that maps layer type names to builder functions."""
__slots__ = ("_simple", "_with_id")
def __init__(self) -> None:
self._simple: dict[str, object] = {}
self._with_id: dict[str, object] = {}
def register(self, name: str, builder, *, requires_id: bool = False) -> None:
target = self._with_id if requires_id else self._simple
target[name] = builder
@property
def supported_types(self) -> set[str]:
return set(self._simple) | set(self._with_id)
def build(
self,
db: Session,
layer_type: str,
*,
layer_id: str | None = None,
platforms: str | None = None,
tactics: str | None = None,
min_score: int = 0,
) -> dict:
kwargs = dict(platforms=platforms, tactics=tactics, min_score=min_score)
if layer_type in self._simple:
return self._simple[layer_type](db, **kwargs)
if layer_type in self._with_id:
if not layer_id:
raise BusinessRuleViolation(
f"layer_id is required for '{layer_type}' layer"
)
return self._with_id[layer_type](db, layer_id, **kwargs)
raise BusinessRuleViolation(f"Unknown layer type: {layer_type}")
# Module-level registry used by ``register_layer`` and ``build_navigator_export``.
LAYER_REGISTRY = _LayerRegistry()
# Built-in layers that need no entity id.
LAYER_REGISTRY.register("coverage", build_coverage_layer)
LAYER_REGISTRY.register("detection-rules", build_detection_rules_layer)
# Built-in layers bound to one entity; ``layer_id`` must be supplied when building.
LAYER_REGISTRY.register("threat-actor", build_threat_actor_layer, requires_id=True)
LAYER_REGISTRY.register("campaign", build_campaign_layer, requires_id=True)
# ``supported_types`` returns a new set on every access, so this value is fixed
# here and does not pick up layers registered later.
SUPPORTED_LAYER_TYPES = LAYER_REGISTRY.supported_types # snapshot of built-in types
def register_layer(name: str, builder, *, requires_id: bool = False) -> None:
    """Add a heatmap layer type to the module-level registry.

    Intended to be called at import time.

    Args:
        name: Layer type name to register.
        builder: Callable that builds the layer dict.
        requires_id: ``True`` when the builder takes an entity id.
    """
    LAYER_REGISTRY.register(
        name,
        builder,
        requires_id=requires_id,
    )
def build_navigator_export(
    db: Session,
    layer_type: str,
    *,
    layer_id: str | None = None,
    platforms: str | None = None,
    tactics: str | None = None,
    min_score: int = 0,
) -> dict:
    """Build the heatmap layer dict registered under *layer_type*.

    All arguments are forwarded unchanged to the module-level registry.

    Raises:
        BusinessRuleViolation: For an unknown layer type, or when
            ``layer_id`` is missing for a layer type that requires it.
        EntityNotFoundError: When an entity-bound layer (threat-actor,
            campaign) references a non-existent record.
    """
    filter_options = {
        "platforms": platforms,
        "tactics": tactics,
        "min_score": min_score,
    }
    return LAYER_REGISTRY.build(db, layer_type, layer_id=layer_id, **filter_options)