Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
build_threat_actor_layer was adding ALL techniques to the layer — actor techniques with their real score and non-actor techniques with score=0/enabled=False. This caused every tactic column to appear in the matrix even when the actor has no techniques for that tactic. Now only actor techniques are included. The frontend already filters visible tactics to those with data, so empty tactic columns disappear automatically. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
547 lines
18 KiB
Python
547 lines
18 KiB
Python
"""Heatmap service — ATT&CK Navigator-compatible layer generation.
|
|
|
|
Builds multiple layer types (coverage, threat actor, detection rules,
|
|
campaign) as plain dictionaries ready for JSON serialisation.
|
|
|
|
This module is framework-agnostic: no FastAPI imports, no HTTPException,
|
|
no ``db.commit()``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Optional
|
|
|
|
from sqlalchemy import func, or_
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.domain.errors import BusinessRuleViolation, EntityNotFoundError
|
|
from app.models.campaign import Campaign, CampaignTest
|
|
from app.models.detection_rule import DetectionRule
|
|
from app.models.defensive_technique import DefensiveTechniqueMapping
|
|
from app.models.enums import TechniqueStatus, TestState
|
|
from app.models.technique import Technique
|
|
from app.models.test import Test
|
|
from app.models.test_detection_result import TestDetectionResult
|
|
from app.models.threat_actor import ThreatActor, ThreatActorTechnique
|
|
from app.utils import escape_like
|
|
|
|
# ── Constants ─────────────────────────────────────────────────────────
|
|
|
|
# Version triple and domain stamped into every generated layer.
ATTACK_VERSION = "15"
NAVIGATOR_VERSION = "5.0"
LAYER_VERSION = "4.5"
DOMAIN = "enterprise-attack"

# Heatmap score (0-100) assigned to each global technique status.
# Statuses missing from this table are scored 0 by the layer builders.
STATUS_SCORE_MAP: dict[TechniqueStatus, int] = {
    TechniqueStatus.validated: 100,
    TechniqueStatus.partial: 60,
    TechniqueStatus.in_progress: 30,
    TechniqueStatus.not_covered: 10,
    TechniqueStatus.not_evaluated: 0,
    TechniqueStatus.review_required: 10,
}

# Heatmap score (0-100) assigned to each test workflow state.
# Used by the campaign layer to pick the most advanced test per technique.
TEST_STATE_SCORE: dict[TestState, int] = {
    TestState.validated: 100,
    TestState.in_review: 70,
    TestState.blue_evaluating: 50,
    TestState.red_executing: 30,
    TestState.draft: 10,
    TestState.rejected: 5,
}
|
|
|
|
|
|
# ── Internal helpers ──────────────────────────────────────────────────
|
|
|
|
|
|
def _score_to_color(score: int) -> str:
|
|
"""Map a 0-100 score to a red-yellow-green colour hex."""
|
|
if score <= 0:
|
|
return "#d3d3d3"
|
|
if score <= 25:
|
|
return "#ff6666"
|
|
if score <= 50:
|
|
return "#ff9933"
|
|
if score <= 75:
|
|
return "#ffff66"
|
|
return "#66ff66"
|
|
|
|
|
|
def _build_layer_skeleton(
    name: str,
    description: str,
    gradient_colors: list[str] | None = None,
) -> dict:
    """Create an empty Navigator-style layer dict ready to receive techniques.

    Args:
        name: Layer title.
        description: Free-text layer description.
        gradient_colors: Colour ramp for the 0-100 gradient. ``None`` or an
            empty list selects the default red/yellow/green ramp.

    Returns:
        A new dict with version info, domain, platform filter, gradient and
        an empty ``"techniques"`` list.
    """
    if gradient_colors:
        ramp = gradient_colors
    else:
        ramp = ["#ff6666", "#ffff66", "#66ff66"]

    versions = {
        "attack": ATTACK_VERSION,
        "navigator": NAVIGATOR_VERSION,
        "layer": LAYER_VERSION,
    }
    gradient = {
        "colors": ramp,
        "minValue": 0,
        "maxValue": 100,
    }

    skeleton: dict = {}
    skeleton["name"] = name
    skeleton["versions"] = versions
    skeleton["domain"] = DOMAIN
    skeleton["description"] = description
    skeleton["filters"] = {"platforms": ["windows", "linux", "macos"]}
    skeleton["gradient"] = gradient
    skeleton["techniques"] = []
    return skeleton
|
|
|
|
|
|
def _apply_filters(
|
|
query,
|
|
model,
|
|
platforms: list[str] | None = None,
|
|
tactics: list[str] | None = None,
|
|
):
|
|
"""Apply common platform and tactic filters to a technique query."""
|
|
if platforms:
|
|
platform_filters = [
|
|
model.platforms.op("@>")(json.dumps([p])) for p in platforms
|
|
]
|
|
query = query.filter(or_(*platform_filters))
|
|
if tactics:
|
|
tactic_filters = [
|
|
model.tactic.ilike(f"%{escape_like(t)}%") for t in tactics
|
|
]
|
|
query = query.filter(or_(*tactic_filters))
|
|
return query
|
|
|
|
|
|
def _format_tactic(tactic_str: str | None) -> str:
|
|
"""Normalize tactic string to ATT&CK Navigator format (kebab-case)."""
|
|
if not tactic_str:
|
|
return ""
|
|
return tactic_str.split(",")[0].strip().lower()
|
|
|
|
|
|
def _parse_csv(value: str | None) -> list[str] | None:
|
|
"""Split a comma-separated string into a trimmed list, or ``None``."""
|
|
if not value:
|
|
return None
|
|
return [v.strip() for v in value.split(",") if v.strip()]
|
|
|
|
|
|
# ── Public API ────────────────────────────────────────────────────────
|
|
|
|
|
|
def build_coverage_layer(
    db: Session,
    *,
    platforms: str | None = None,
    tactics: str | None = None,
    min_score: int = 0,
) -> dict:
    """Build the coverage layer, scoring each technique by ``status_global``.

    Args:
        db: Session used for all queries.
        platforms: Optional comma-separated platform filter.
        tactics: Optional comma-separated tactic filter.
        min_score: Techniques scoring below this value are left out.

    Returns:
        A layer dict whose ``"techniques"`` list holds one entry per
        retained technique, including validated-test and detection-rule
        counts as metadata.
    """
    layer = _build_layer_skeleton("Aegis Coverage", "Coverage layer generated by Aegis")

    technique_query = _apply_filters(
        db.query(Technique),
        Technique,
        _parse_csv(platforms),
        _parse_csv(tactics),
    )
    techniques = technique_query.all()

    # Fetch both counters in bulk (one grouped query each) to avoid N+1.
    technique_pks = [tech.id for tech in techniques]
    technique_mitre_ids = [tech.mitre_id for tech in techniques]

    validated_tests: dict = {}
    if technique_pks:
        validated_tests = dict(
            db.query(Test.technique_id, func.count(Test.id))
            .filter(Test.technique_id.in_(technique_pks), Test.state == TestState.validated)
            .group_by(Test.technique_id)
            .all()
        )

    rules_per_technique: dict = {}
    if technique_mitre_ids:
        rules_per_technique = dict(
            db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id))
            .filter(DetectionRule.mitre_technique_id.in_(technique_mitre_ids))
            .group_by(DetectionRule.mitre_technique_id)
            .all()
        )

    entries = layer["techniques"]
    for tech in techniques:
        score = STATUS_SCORE_MAP.get(tech.status_global, 0)
        if score < min_score:
            continue

        test_total = validated_tests.get(tech.id, 0)
        rule_total = rules_per_technique.get(tech.mitre_id, 0)

        metadata = [
            {"name": "tests_count", "value": str(test_total)},
            {"name": "detection_rules", "value": str(rule_total)},
        ]
        if tech.last_review_date:
            metadata.append(
                {"name": "last_validated", "value": tech.last_review_date.strftime("%Y-%m-%d")}
            )

        comment = " - ".join(
            (
                f"Status: {tech.status_global.value}",
                f"{test_total} tests validated",
                f"{rule_total} detection rules",
            )
        )

        entries.append({
            "techniqueID": tech.mitre_id,
            "tactic": _format_tactic(tech.tactic),
            "color": _score_to_color(score),
            "score": score,
            "comment": comment,
            "enabled": True,
            "metadata": metadata,
        })

    return layer
|
|
|
|
|
|
def build_threat_actor_layer(
    db: Session,
    actor_id: str,
    *,
    platforms: str | None = None,
    tactics: str | None = None,
    min_score: int = 0,
) -> dict:
    """Threat actor layer -- techniques used by an actor with coverage colour.

    Only techniques linked to the actor are emitted, so tactics in which the
    actor has no technique contribute no entries to the layer.

    Args:
        db: Session used for all queries.
        actor_id: Identifier of the threat actor.
        platforms: Optional comma-separated platform filter.
        tactics: Optional comma-separated tactic filter.
        min_score: Techniques scoring below this value are left out.

    Returns:
        A layer dict; its ``"techniques"`` list is empty when the actor has
        no linked techniques or none survive the filters.

    Raises:
        EntityNotFoundError: If the actor does not exist.
    """
    actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first()
    if not actor:
        raise EntityNotFoundError("ThreatActor", actor_id)

    layer = _build_layer_skeleton(
        f"Threat Actor: {actor.name}",
        f"Techniques used by {actor.name} with coverage overlay",
        gradient_colors=["#808080", "#ff6666", "#66ff66"],
    )

    actor_technique_ids = {
        row.technique_id
        for row in db.query(ThreatActorTechnique.technique_id)
        .filter(ThreatActorTechnique.threat_actor_id == actor.id)
        .all()
    }
    if not actor_technique_ids:
        return layer

    # Restrict to the actor's techniques in SQL instead of loading every
    # technique and discarding the unrelated ones in Python.
    techniques = _apply_filters(
        db.query(Technique).filter(Technique.id.in_(actor_technique_ids)),
        Technique,
        _parse_csv(platforms),
        _parse_csv(tactics),
    ).all()
    if not techniques:
        return layer

    # Bulk-fetch counters for the techniques that will actually be rendered.
    matched_ids = [tech.id for tech in techniques]
    matched_mitre_ids = [tech.mitre_id for tech in techniques]

    test_counts = dict(
        db.query(Test.technique_id, func.count(Test.id))
        .filter(Test.technique_id.in_(matched_ids), Test.state == TestState.validated)
        .group_by(Test.technique_id)
        .all()
    )
    rule_counts = dict(
        db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id))
        .filter(DetectionRule.mitre_technique_id.in_(matched_mitre_ids))
        .group_by(DetectionRule.mitre_technique_id)
        .all()
    )

    for tech in techniques:
        score = STATUS_SCORE_MAP.get(tech.status_global, 0)
        if score < min_score:
            continue

        tc = test_counts.get(tech.id, 0)
        rc = rule_counts.get(tech.mitre_id, 0)
        metadata = [
            {"name": "tests_count", "value": str(tc)},
            {"name": "detection_rules", "value": str(rc)},
        ]
        if tech.last_review_date:
            metadata.append(
                {"name": "last_validated", "value": tech.last_review_date.strftime("%Y-%m-%d")}
            )

        layer["techniques"].append({
            "techniqueID": tech.mitre_id,
            "tactic": _format_tactic(tech.tactic),
            "color": _score_to_color(score),
            "score": score,
            "comment": f"Used by {actor.name} - Coverage: {tech.status_global.value}",
            "enabled": True,
            "metadata": metadata,
        })

    return layer
|
|
|
|
|
|
def build_detection_rules_layer(
    db: Session,
    *,
    platforms: str | None = None,
    tactics: str | None = None,
    min_score: int = 0,
) -> dict:
    """Detection rules layer -- score based on absolute rule count per technique.

    Scoring uses fixed thresholds so the colour reflects real coverage regardless
    of what other techniques have:
        0 rules  → gray   (score 0)
        1 rule   → red    (score 25)
        2 rules  → orange (score 50)
        3 rules  → yellow (score 75)
        4+ rules → green  (score 100)

    Args:
        db: Session used for all queries.
        platforms: Optional comma-separated platform filter.
        tactics: Optional comma-separated tactic filter.
        min_score: Techniques scoring below this value are left out.

    Returns:
        A layer dict with one entry per retained technique. Metadata holds
        ``total_rules`` (active rules) and ``evaluated_rules`` (active rules
        with at least one test result whose ``triggered`` is not NULL).
    """
    layer = _build_layer_skeleton(
        "Detection Rules Coverage",
        "Number of active detection rules per technique",
    )

    query = _apply_filters(
        db.query(Technique), Technique,
        _parse_csv(platforms), _parse_csv(tactics),
    )
    techniques = query.all()

    rule_counts = dict(
        db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id))
        .filter(DetectionRule.is_active == True)  # noqa: E712
        .group_by(DetectionRule.mitre_technique_id)
        .all()
    )

    # Count each rule once, however many test results reference it, and only
    # consider active rules so the figure never exceeds ``total_rules``.
    evaluated_counts = dict(
        db.query(
            DetectionRule.mitre_technique_id,
            func.count(DetectionRule.id.distinct()),
        )
        .join(TestDetectionResult, TestDetectionResult.detection_rule_id == DetectionRule.id)
        .filter(
            DetectionRule.is_active == True,  # noqa: E712
            TestDetectionResult.triggered.isnot(None),
        )
        .group_by(DetectionRule.mitre_technique_id)
        .all()
    )

    # 4 rules = full coverage (100). Each rule adds 25 points.
    RULES_FOR_FULL_COVERAGE = 4

    for tech in techniques:
        total_rules = rule_counts.get(tech.mitre_id, 0)
        evaluated_rules = evaluated_counts.get(tech.mitre_id, 0)

        score = min(int((total_rules / RULES_FOR_FULL_COVERAGE) * 100), 100)
        if score < min_score:
            continue

        rule_word = "rule" if total_rules == 1 else "rules"
        eval_note = f", {evaluated_rules} evaluated" if evaluated_rules > 0 else ""
        comment = f"{total_rules} active {rule_word}{eval_note}"

        layer["techniques"].append({
            "techniqueID": tech.mitre_id,
            "tactic": _format_tactic(tech.tactic),
            "color": _score_to_color(score),
            "score": score,
            "comment": comment,
            "enabled": True,
            "metadata": [
                {"name": "total_rules", "value": str(total_rules)},
                {"name": "evaluated_rules", "value": str(evaluated_rules)},
            ],
        })

    return layer
|
|
|
|
|
|
def build_campaign_layer(
    db: Session,
    campaign_id: str,
    *,
    platforms: str | None = None,
    tactics: str | None = None,
    min_score: int = 0,
) -> dict:
    """Campaign layer -- techniques in a campaign, coloured by test state.

    Each technique is scored by the most advanced state (per
    ``TEST_STATE_SCORE``) among the campaign tests that target it, and that
    same state is reported as ``best_state``.

    Args:
        db: Session used for all queries.
        campaign_id: Identifier of the campaign.
        platforms: Optional comma-separated platform filter (exact match
            against the technique's platform list).
        tactics: Optional comma-separated tactic filter (case-insensitive
            match against the technique's comma-separated tactics).
        min_score: Techniques scoring below this value are left out.

    Returns:
        A layer dict; its ``"techniques"`` list is empty when the campaign
        has no tests or none survive the filters.

    Raises:
        EntityNotFoundError: If the campaign does not exist.
    """
    campaign = db.query(Campaign).filter(Campaign.id == campaign_id).first()
    if not campaign:
        raise EntityNotFoundError("Campaign", campaign_id)

    layer = _build_layer_skeleton(
        f"Campaign: {campaign.name}",
        f"Progress of campaign '{campaign.name}'",
    )

    campaign_tests = (
        db.query(CampaignTest)
        .filter(CampaignTest.campaign_id == campaign.id)
        .all()
    )
    if not campaign_tests:
        return layer

    test_ids = [ct.test_id for ct in campaign_tests]
    tests = db.query(Test).filter(Test.id.in_(test_ids)).all()
    test_map = {t.id: t for t in tests}

    technique_ids = {t.technique_id for t in tests if t.technique_id}
    techniques = db.query(Technique).filter(Technique.id.in_(technique_ids)).all()
    tech_map = {t.id: t for t in techniques}

    def _state_score(test) -> int:
        # Unknown states rank lowest.
        return TEST_STATE_SCORE.get(test.state, 0)

    # Group campaign tests by technique (keyed by MITRE id).
    grouped: dict = {}
    for ct in campaign_tests:
        test = test_map.get(ct.test_id)
        if not test:
            continue
        tech = tech_map.get(test.technique_id)
        if not tech:
            continue
        entry = grouped.setdefault(tech.mitre_id, {"technique": tech, "tests": []})
        entry["tests"].append(test)

    platform_list = _parse_csv(platforms)
    # Lower-case the requested tactics: the technique side is lower-cased
    # below, and the other layers filter tactics case-insensitively.
    tactic_list = [t.lower() for t in (_parse_csv(tactics) or [])]

    for mitre_id, info in grouped.items():
        tech = info["technique"]
        group_tests = info["tests"]  # never empty: created on first append

        # Rank by score, not by state name: a plain max() over the state
        # strings would be lexicographic and could pick a less advanced state.
        best_test = max(group_tests, key=_state_score)
        score = _state_score(best_test)

        if platform_list:
            tech_platforms = tech.platforms or []
            if not any(p in tech_platforms for p in platform_list):
                continue
        if tactic_list:
            tech_tactics = [t.strip() for t in (tech.tactic or "").lower().split(",")]
            if not any(t in tech_tactics for t in tactic_list):
                continue
        if score < min_score:
            continue

        test_states = [t.state.value for t in group_tests]
        layer["techniques"].append({
            "techniqueID": mitre_id,
            "tactic": _format_tactic(tech.tactic),
            "color": _score_to_color(score),
            "score": score,
            "comment": f"Campaign tests: {', '.join(test_states)}",
            "enabled": True,
            "metadata": [
                {"name": "campaign_tests", "value": str(len(group_tests))},
                {"name": "best_state", "value": best_test.state.value},
            ],
        })

    return layer
|
|
|
|
|
|
# ── Layer registry (OCP-compliant dispatch) ──────────────────────────
|
|
#
|
|
# To add a new layer type:
|
|
# 1. Write a builder function: ``def build_X_layer(db, *, platforms, tactics, min_score) -> dict``
|
|
# 2. Call ``register_layer("x", build_X_layer)`` (or ``register_layer("x", fn, requires_id=True)``)
|
|
# 3. Optionally add a convenience endpoint in the router
|
|
#
|
|
# The ``/export-navigator?layer=x`` endpoint picks up new layers automatically.
|
|
|
|
|
|
class _LayerRegistry:
|
|
"""Extensible registry that maps layer type names to builder functions."""
|
|
|
|
__slots__ = ("_simple", "_with_id")
|
|
|
|
def __init__(self) -> None:
|
|
self._simple: dict[str, object] = {}
|
|
self._with_id: dict[str, object] = {}
|
|
|
|
def register(self, name: str, builder, *, requires_id: bool = False) -> None:
|
|
target = self._with_id if requires_id else self._simple
|
|
target[name] = builder
|
|
|
|
@property
|
|
def supported_types(self) -> set[str]:
|
|
return set(self._simple) | set(self._with_id)
|
|
|
|
def build(
|
|
self,
|
|
db: Session,
|
|
layer_type: str,
|
|
*,
|
|
layer_id: str | None = None,
|
|
platforms: str | None = None,
|
|
tactics: str | None = None,
|
|
min_score: int = 0,
|
|
) -> dict:
|
|
kwargs = dict(platforms=platforms, tactics=tactics, min_score=min_score)
|
|
|
|
if layer_type in self._simple:
|
|
return self._simple[layer_type](db, **kwargs)
|
|
|
|
if layer_type in self._with_id:
|
|
if not layer_id:
|
|
raise BusinessRuleViolation(
|
|
f"layer_id is required for '{layer_type}' layer"
|
|
)
|
|
return self._with_id[layer_type](db, layer_id, **kwargs)
|
|
|
|
raise BusinessRuleViolation(f"Unknown layer type: {layer_type}")
|
|
|
|
|
|
# Module-wide registry, pre-populated below with the built-in layer types.
LAYER_REGISTRY = _LayerRegistry()

# Layers built from the filters alone.
LAYER_REGISTRY.register("coverage", build_coverage_layer, requires_id=False)
LAYER_REGISTRY.register("detection-rules", build_detection_rules_layer, requires_id=False)

# Layers bound to a specific record; the caller must supply its id.
LAYER_REGISTRY.register("threat-actor", build_threat_actor_layer, requires_id=True)
LAYER_REGISTRY.register("campaign", build_campaign_layer, requires_id=True)

# Snapshot of the built-in types, captured once at this point; layer types
# registered afterwards are not added to it.
SUPPORTED_LAYER_TYPES = LAYER_REGISTRY.supported_types
|
|
|
|
|
|
def register_layer(name: str, builder, *, requires_id: bool = False) -> None:
    """Add a heatmap layer type to the module-wide registry.

    Intended to be called at import time. Pass ``requires_id=True`` when
    *builder* expects a ``layer_id`` as its second positional argument.
    """
    registry = LAYER_REGISTRY
    registry.register(name, builder, requires_id=requires_id)
|
|
|
|
|
|
def build_navigator_export(
    db: Session,
    layer_type: str,
    *,
    layer_id: str | None = None,
    platforms: str | None = None,
    tactics: str | None = None,
    min_score: int = 0,
) -> dict:
    """Build the heatmap layer dict registered under *layer_type*.

    All arguments are forwarded unchanged to the module-wide layer registry.

    Raises:
        BusinessRuleViolation: For an unknown layer type, or when an
            entity-bound layer is requested without ``layer_id``.
        EntityNotFoundError: When an entity-bound layer (threat-actor,
            campaign) references a non-existent record.
    """
    options = {
        "layer_id": layer_id,
        "platforms": platforms,
        "tactics": tactics,
        "min_score": min_score,
    }
    return LAYER_REGISTRY.build(db, layer_type, **options)
|