Files
Aegis/backend/app/services/heatmap_service.py
kitos 2371318e9e
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
fix(heatmap): detection rules layer uses absolute rule count, not relative max
Before: score = (rules/max_rules)*50 + (evaluated/rules)*50
  -> everything red because relative to the 1 technique with most rules

After: score = min(rules/4 * 100, 100)  — absolute thresholds
  0 rules  = gray  (not covered)
  1 rule   = red   (25 — minimal)
  2 rules  = orange (50 — some)
  3 rules  = yellow (75 — good)
  4+ rules = green  (100 — well covered)

Also update HeatmapLegend labels to show actual rule counts instead of
meaningless percentage ranges.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-28 16:11:29 +02:00

553 lines
18 KiB
Python

"""Heatmap service — ATT&CK Navigator-compatible layer generation.
Builds multiple layer types (coverage, threat actor, detection rules,
campaign) as plain dictionaries ready for JSON serialisation.
This module is framework-agnostic: no FastAPI imports, no HTTPException,
no ``db.commit()``.
"""
from __future__ import annotations
import json
from typing import Optional
from sqlalchemy import func, or_
from sqlalchemy.orm import Session
from app.domain.errors import BusinessRuleViolation, EntityNotFoundError
from app.models.campaign import Campaign, CampaignTest
from app.models.detection_rule import DetectionRule
from app.models.defensive_technique import DefensiveTechniqueMapping
from app.models.enums import TechniqueStatus, TestState
from app.models.technique import Technique
from app.models.test import Test
from app.models.test_detection_result import TestDetectionResult
from app.models.threat_actor import ThreatActor, ThreatActorTechnique
from app.utils import escape_like
# ── Constants ─────────────────────────────────────────────────────────
ATTACK_VERSION = "15"
NAVIGATOR_VERSION = "5.0"
LAYER_VERSION = "4.5"
DOMAIN = "enterprise-attack"
STATUS_SCORE_MAP: dict[TechniqueStatus, int] = {
TechniqueStatus.validated: 100,
TechniqueStatus.partial: 60,
TechniqueStatus.in_progress: 30,
TechniqueStatus.not_covered: 10,
TechniqueStatus.not_evaluated: 0,
TechniqueStatus.review_required: 10,
}
TEST_STATE_SCORE: dict[TestState, int] = {
TestState.validated: 100,
TestState.in_review: 70,
TestState.blue_evaluating: 50,
TestState.red_executing: 30,
TestState.draft: 10,
TestState.rejected: 5,
}
# ── Internal helpers ──────────────────────────────────────────────────
def _score_to_color(score: int) -> str:
"""Map a 0-100 score to a red-yellow-green colour hex."""
if score <= 0:
return "#d3d3d3"
if score <= 25:
return "#ff6666"
if score <= 50:
return "#ff9933"
if score <= 75:
return "#ffff66"
return "#66ff66"
def _build_layer_skeleton(
name: str,
description: str,
gradient_colors: list[str] | None = None,
) -> dict:
"""Return a base layer dict compatible with ATT&CK Navigator."""
return {
"name": name,
"versions": {
"attack": ATTACK_VERSION,
"navigator": NAVIGATOR_VERSION,
"layer": LAYER_VERSION,
},
"domain": DOMAIN,
"description": description,
"filters": {"platforms": ["windows", "linux", "macos"]},
"gradient": {
"colors": gradient_colors or ["#ff6666", "#ffff66", "#66ff66"],
"minValue": 0,
"maxValue": 100,
},
"techniques": [],
}
def _apply_filters(
query,
model,
platforms: list[str] | None = None,
tactics: list[str] | None = None,
):
"""Apply common platform and tactic filters to a technique query."""
if platforms:
platform_filters = [
model.platforms.op("@>")(json.dumps([p])) for p in platforms
]
query = query.filter(or_(*platform_filters))
if tactics:
tactic_filters = [
model.tactic.ilike(f"%{escape_like(t)}%") for t in tactics
]
query = query.filter(or_(*tactic_filters))
return query
def _format_tactic(tactic_str: str | None) -> str:
"""Normalize tactic string to ATT&CK Navigator format (kebab-case)."""
if not tactic_str:
return ""
return tactic_str.split(",")[0].strip().lower()
def _parse_csv(value: str | None) -> list[str] | None:
"""Split a comma-separated string into a trimmed list, or ``None``."""
if not value:
return None
return [v.strip() for v in value.split(",") if v.strip()]
# ── Public API ────────────────────────────────────────────────────────
def build_coverage_layer(
db: Session,
*,
platforms: str | None = None,
tactics: str | None = None,
min_score: int = 0,
) -> dict:
"""Coverage layer -- score based on ``status_global`` of each technique."""
layer = _build_layer_skeleton("Aegis Coverage", "Coverage layer generated by Aegis")
query = _apply_filters(
db.query(Technique), Technique,
_parse_csv(platforms), _parse_csv(tactics),
)
techniques = query.all()
# Bulk-fetch test counts and rule counts to avoid N+1
tech_ids = [t.id for t in techniques]
mitre_ids = [t.mitre_id for t in techniques]
test_counts = dict(
db.query(Test.technique_id, func.count(Test.id))
.filter(Test.technique_id.in_(tech_ids), Test.state == TestState.validated)
.group_by(Test.technique_id)
.all()
) if tech_ids else {}
rule_counts = dict(
db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id))
.filter(DetectionRule.mitre_technique_id.in_(mitre_ids))
.group_by(DetectionRule.mitre_technique_id)
.all()
) if mitre_ids else {}
for tech in techniques:
score = STATUS_SCORE_MAP.get(tech.status_global, 0)
if score < min_score:
continue
tc = test_counts.get(tech.id, 0)
rc = rule_counts.get(tech.mitre_id, 0)
metadata = [
{"name": "tests_count", "value": str(tc)},
{"name": "detection_rules", "value": str(rc)},
]
if tech.last_review_date:
metadata.append(
{"name": "last_validated", "value": tech.last_review_date.strftime("%Y-%m-%d")}
)
comment_parts = [
f"Status: {tech.status_global.value}",
f"{tc} tests validated",
f"{rc} detection rules",
]
layer["techniques"].append({
"techniqueID": tech.mitre_id,
"tactic": _format_tactic(tech.tactic),
"color": _score_to_color(score),
"score": score,
"comment": " - ".join(comment_parts),
"enabled": True,
"metadata": metadata,
})
return layer
def build_threat_actor_layer(
db: Session,
actor_id: str,
*,
platforms: str | None = None,
tactics: str | None = None,
min_score: int = 0,
) -> dict:
"""Threat actor layer -- techniques used by an actor with coverage colour.
Raises :class:`EntityNotFoundError` if the actor does not exist.
"""
actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first()
if not actor:
raise EntityNotFoundError("ThreatActor", actor_id)
layer = _build_layer_skeleton(
f"Threat Actor: {actor.name}",
f"Techniques used by {actor.name} with coverage overlay",
gradient_colors=["#808080", "#ff6666", "#66ff66"],
)
actor_technique_ids = {
row.technique_id
for row in db.query(ThreatActorTechnique.technique_id)
.filter(ThreatActorTechnique.threat_actor_id == actor.id)
.all()
}
if not actor_technique_ids:
return layer
query = _apply_filters(
db.query(Technique), Technique,
_parse_csv(platforms), _parse_csv(tactics),
)
techniques = query.all()
# Bulk-fetch metadata for actor techniques only
test_counts = dict(
db.query(Test.technique_id, func.count(Test.id))
.filter(Test.technique_id.in_(actor_technique_ids), Test.state == TestState.validated)
.group_by(Test.technique_id)
.all()
)
actor_mitre_ids = [t.mitre_id for t in techniques if t.id in actor_technique_ids]
rule_counts = dict(
db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id))
.filter(DetectionRule.mitre_technique_id.in_(actor_mitre_ids))
.group_by(DetectionRule.mitre_technique_id)
.all()
) if actor_mitre_ids else {}
for tech in techniques:
is_actor_technique = tech.id in actor_technique_ids
score = STATUS_SCORE_MAP.get(tech.status_global, 0) if is_actor_technique else 0
if is_actor_technique and score < min_score:
continue
if is_actor_technique:
tc = test_counts.get(tech.id, 0)
rc = rule_counts.get(tech.mitre_id, 0)
metadata = [
{"name": "tests_count", "value": str(tc)},
{"name": "detection_rules", "value": str(rc)},
]
if tech.last_review_date:
metadata.append(
{"name": "last_validated", "value": tech.last_review_date.strftime("%Y-%m-%d")}
)
layer["techniques"].append({
"techniqueID": tech.mitre_id,
"tactic": _format_tactic(tech.tactic),
"color": _score_to_color(score),
"score": score,
"comment": f"Used by {actor.name} - Coverage: {tech.status_global.value}",
"enabled": True,
"metadata": metadata,
})
else:
layer["techniques"].append({
"techniqueID": tech.mitre_id,
"tactic": _format_tactic(tech.tactic),
"color": "",
"score": 0,
"comment": "",
"enabled": False,
"metadata": [],
})
return layer
def build_detection_rules_layer(
db: Session,
*,
platforms: str | None = None,
tactics: str | None = None,
min_score: int = 0,
) -> dict:
"""Detection rules layer -- score based on absolute rule count per technique.
Scoring uses fixed thresholds so the colour reflects real coverage regardless
of what other techniques have:
0 rules → gray (score 0)
1 rule → red (score 25)
2 rules → orange (score 50)
3 rules → yellow (score 75)
4+ rules → green (score 100)
"""
layer = _build_layer_skeleton(
"Detection Rules Coverage",
"Number of active detection rules per technique",
)
query = _apply_filters(
db.query(Technique), Technique,
_parse_csv(platforms), _parse_csv(tactics),
)
techniques = query.all()
rule_counts = dict(
db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id))
.filter(DetectionRule.is_active == True) # noqa: E712
.group_by(DetectionRule.mitre_technique_id)
.all()
)
evaluated_counts = dict(
db.query(DetectionRule.mitre_technique_id, func.count(TestDetectionResult.id))
.join(TestDetectionResult, TestDetectionResult.detection_rule_id == DetectionRule.id)
.filter(TestDetectionResult.triggered.isnot(None))
.group_by(DetectionRule.mitre_technique_id)
.all()
)
# 4 rules = full coverage (100). Each rule adds 25 points.
RULES_FOR_FULL_COVERAGE = 4
for tech in techniques:
total_rules = rule_counts.get(tech.mitre_id, 0)
evaluated_rules = evaluated_counts.get(tech.mitre_id, 0)
score = min(int((total_rules / RULES_FOR_FULL_COVERAGE) * 100), 100)
if score < min_score:
continue
rule_word = "rule" if total_rules == 1 else "rules"
eval_note = f", {evaluated_rules} evaluated" if evaluated_rules > 0 else ""
comment = f"{total_rules} active {rule_word}{eval_note}"
layer["techniques"].append({
"techniqueID": tech.mitre_id,
"tactic": _format_tactic(tech.tactic),
"color": _score_to_color(score),
"score": score,
"comment": comment,
"enabled": True,
"metadata": [
{"name": "total_rules", "value": str(total_rules)},
{"name": "evaluated_rules", "value": str(evaluated_rules)},
],
})
return layer
def build_campaign_layer(
db: Session,
campaign_id: str,
*,
platforms: str | None = None,
tactics: str | None = None,
min_score: int = 0,
) -> dict:
"""Campaign layer -- techniques in a campaign, coloured by test state.
Raises :class:`EntityNotFoundError` if the campaign does not exist.
"""
campaign = db.query(Campaign).filter(Campaign.id == campaign_id).first()
if not campaign:
raise EntityNotFoundError("Campaign", campaign_id)
layer = _build_layer_skeleton(
f"Campaign: {campaign.name}",
f"Progress of campaign '{campaign.name}'",
)
campaign_tests = (
db.query(CampaignTest)
.filter(CampaignTest.campaign_id == campaign.id)
.all()
)
if not campaign_tests:
return layer
test_ids = [ct.test_id for ct in campaign_tests]
tests = db.query(Test).filter(Test.id.in_(test_ids)).all()
test_map = {t.id: t for t in tests}
technique_ids = {t.technique_id for t in tests if t.technique_id}
techniques = db.query(Technique).filter(Technique.id.in_(technique_ids)).all()
tech_map = {t.id: t for t in techniques}
# Group tests by technique, keeping the best state score
tech_scores: dict = {}
for ct in campaign_tests:
test = test_map.get(ct.test_id)
if not test:
continue
tech = tech_map.get(test.technique_id)
if not tech:
continue
state_score = TEST_STATE_SCORE.get(test.state, 0)
if tech.mitre_id not in tech_scores:
tech_scores[tech.mitre_id] = {
"technique": tech,
"max_score": state_score,
"tests": [],
}
else:
tech_scores[tech.mitre_id]["max_score"] = max(
tech_scores[tech.mitre_id]["max_score"], state_score,
)
tech_scores[tech.mitre_id]["tests"].append(test)
platform_list = _parse_csv(platforms)
tactic_list = _parse_csv(tactics)
for mitre_id, info in tech_scores.items():
tech = info["technique"]
score = info["max_score"]
if platform_list:
tech_platforms = tech.platforms or []
if not any(p in tech_platforms for p in platform_list):
continue
if tactic_list:
tech_tactics = [t.strip() for t in (tech.tactic or "").lower().split(",")]
if not any(t in tech_tactics for t in tactic_list):
continue
if score < min_score:
continue
test_states = [t.state.value for t in info["tests"]]
layer["techniques"].append({
"techniqueID": mitre_id,
"tactic": _format_tactic(tech.tactic),
"color": _score_to_color(score),
"score": score,
"comment": f"Campaign tests: {', '.join(test_states)}",
"enabled": True,
"metadata": [
{"name": "campaign_tests", "value": str(len(info["tests"]))},
{"name": "best_state", "value": max(test_states) if test_states else "none"},
],
})
return layer
# ── Layer registry (OCP-compliant dispatch) ──────────────────────────
#
# To add a new layer type:
# 1. Write a builder function: ``def build_X_layer(db, *, platforms, tactics, min_score) -> dict``
# 2. Call ``register_layer("x", build_X_layer)`` (or ``register_layer("x", fn, requires_id=True)``)
# 3. Optionally add a convenience endpoint in the router
#
# The ``/export-navigator?layer=x`` endpoint picks up new layers automatically.
class _LayerRegistry:
"""Extensible registry that maps layer type names to builder functions."""
__slots__ = ("_simple", "_with_id")
def __init__(self) -> None:
self._simple: dict[str, object] = {}
self._with_id: dict[str, object] = {}
def register(self, name: str, builder, *, requires_id: bool = False) -> None:
target = self._with_id if requires_id else self._simple
target[name] = builder
@property
def supported_types(self) -> set[str]:
return set(self._simple) | set(self._with_id)
def build(
self,
db: Session,
layer_type: str,
*,
layer_id: str | None = None,
platforms: str | None = None,
tactics: str | None = None,
min_score: int = 0,
) -> dict:
kwargs = dict(platforms=platforms, tactics=tactics, min_score=min_score)
if layer_type in self._simple:
return self._simple[layer_type](db, **kwargs)
if layer_type in self._with_id:
if not layer_id:
raise BusinessRuleViolation(
f"layer_id is required for '{layer_type}' layer"
)
return self._with_id[layer_type](db, layer_id, **kwargs)
raise BusinessRuleViolation(f"Unknown layer type: {layer_type}")
LAYER_REGISTRY = _LayerRegistry()
LAYER_REGISTRY.register("coverage", build_coverage_layer)
LAYER_REGISTRY.register("detection-rules", build_detection_rules_layer)
LAYER_REGISTRY.register("threat-actor", build_threat_actor_layer, requires_id=True)
LAYER_REGISTRY.register("campaign", build_campaign_layer, requires_id=True)
SUPPORTED_LAYER_TYPES = LAYER_REGISTRY.supported_types # snapshot of built-in types
def register_layer(name: str, builder, *, requires_id: bool = False) -> None:
"""Public API to register a new heatmap layer type at import time."""
LAYER_REGISTRY.register(name, builder, requires_id=requires_id)
def build_navigator_export(
db: Session,
layer_type: str,
*,
layer_id: str | None = None,
platforms: str | None = None,
tactics: str | None = None,
min_score: int = 0,
) -> dict:
"""Build a heatmap layer dict by type name.
Raises :class:`BusinessRuleViolation` for unknown layer types or
missing ``layer_id``. Raises :class:`EntityNotFoundError` when
an entity-bound layer (threat-actor, campaign) references a
non-existent record.
"""
return LAYER_REGISTRY.build(
db, layer_type,
layer_id=layer_id, platforms=platforms, tactics=tactics, min_score=min_score,
)