refactor(heatmap): extract business logic to dedicated service
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled

- Create heatmap_service.py with all layer-building logic (coverage, threat-actor, detection-rules, campaign)

- Service is framework-agnostic: no FastAPI imports, no HTTPException, no db.commit()

- Fix N+1 in coverage and threat-actor layers: bulk-fetch test_counts and rule_counts with GROUP BY

- Router reduced from 528 to 140 lines: validates request, calls service, returns response
This commit is contained in:
2026-02-18 13:14:41 +01:00
parent bfce1a8a0e
commit 6147abc87a
2 changed files with 492 additions and 425 deletions

View File

@@ -1,157 +1,23 @@
"""Heatmap endpoints — ATT&CK Navigator-compatible layer generation. """Heatmap endpoints — ATT&CK Navigator-compatible layer generation.
Provides multiple layer types (coverage, threat actor, detection rules, Thin router that delegates to :mod:`app.services.heatmap_service`.
campaign) and an export endpoint that produces a JSON file importable
by the official MITRE ATT&CK Navigator.
""" """
from typing import Optional, List
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from sqlalchemy import func
from sqlalchemy.orm import Session
import io import io
import json import json
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from app.database import get_db from app.database import get_db
from app.dependencies.auth import get_current_user from app.dependencies.auth import get_current_user
from app.models.user import User from app.models.user import User
from app.models.technique import Technique from app.services import heatmap_service
from app.models.test import Test
from app.models.threat_actor import ThreatActor, ThreatActorTechnique
from app.models.detection_rule import DetectionRule
from app.models.campaign import Campaign, CampaignTest
from app.models.defensive_technique import DefensiveTechniqueMapping
from app.models.enums import TechniqueStatus, TestState
router = APIRouter(prefix="/heatmap", tags=["heatmap"]) router = APIRouter(prefix="/heatmap", tags=["heatmap"])
# ── Constants ─────────────────────────────────────────────────────────
ATTACK_VERSION = "15"
NAVIGATOR_VERSION = "5.0"
LAYER_VERSION = "4.5"
DOMAIN = "enterprise-attack"
# Score mapping for technique status_global
STATUS_SCORE_MAP = {
TechniqueStatus.validated: 100,
TechniqueStatus.partial: 60,
TechniqueStatus.in_progress: 30,
TechniqueStatus.not_covered: 10,
TechniqueStatus.not_evaluated: 0,
TechniqueStatus.review_required: 10,
}
# ── Helpers ───────────────────────────────────────────────────────────
def _score_to_color(score: int) -> str:
"""Map a 0-100 score to a red → yellow → green color hex."""
if score <= 0:
return "#d3d3d3" # gray for not evaluated
if score <= 25:
return "#ff6666" # red
if score <= 50:
return "#ff9933" # orange
if score <= 75:
return "#ffff66" # yellow
return "#66ff66" # green
def _build_layer_skeleton(
name: str,
description: str,
gradient_colors: List[str] | None = None,
) -> dict:
"""Return a base layer dict compatible with ATT&CK Navigator."""
return {
"name": name,
"versions": {
"attack": ATTACK_VERSION,
"navigator": NAVIGATOR_VERSION,
"layer": LAYER_VERSION,
},
"domain": DOMAIN,
"description": description,
"filters": {"platforms": ["windows", "linux", "macos"]},
"gradient": {
"colors": gradient_colors or ["#ff6666", "#ffff66", "#66ff66"],
"minValue": 0,
"maxValue": 100,
},
"techniques": [],
}
def _apply_filters(
query,
model,
platforms: Optional[List[str]] = None,
tactics: Optional[List[str]] = None,
):
"""Apply common platform and tactic filters to a technique query."""
if platforms:
from sqlalchemy import or_, cast, String
from sqlalchemy.dialects.postgresql import JSONB
# Filter techniques that have any of the specified platforms
platform_filters = []
for platform in platforms:
platform_filters.append(
model.platforms.op("@>")(json.dumps([platform]))
)
if platform_filters:
query = query.filter(or_(*platform_filters))
if tactics:
from sqlalchemy import or_
from app.utils import escape_like
tactic_filters = []
for tactic in tactics:
tactic_filters.append(model.tactic.ilike(f"%{escape_like(tactic)}%"))
query = query.filter(or_(*tactic_filters))
return query
def _format_tactic(tactic_str: str | None) -> str:
"""Normalize tactic string to ATT&CK Navigator format (kebab-case)."""
if not tactic_str:
return ""
# Take first tactic if comma-separated
first = tactic_str.split(",")[0].strip().lower()
return first
def _get_technique_metadata(technique, db: Session) -> list:
"""Build metadata array for a technique."""
# Count validated tests
test_count = (
db.query(func.count(Test.id))
.filter(Test.technique_id == technique.id, Test.state == TestState.validated)
.scalar()
) or 0
# Count detection rules
rule_count = (
db.query(func.count(DetectionRule.id))
.filter(DetectionRule.mitre_technique_id == technique.mitre_id)
.scalar()
) or 0
metadata = [
{"name": "tests_count", "value": str(test_count)},
{"name": "detection_rules", "value": str(rule_count)},
]
if technique.last_review_date:
metadata.append(
{"name": "last_validated", "value": technique.last_review_date.strftime("%Y-%m-%d")}
)
return metadata
# ── GET /heatmap/coverage ───────────────────────────────────────────── # ── GET /heatmap/coverage ─────────────────────────────────────────────
@@ -165,43 +31,9 @@ def heatmap_coverage(
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
): ):
"""Coverage layer — score based on status_global of each technique.""" """Coverage layer — score based on status_global of each technique."""
layer = _build_layer_skeleton("Aegis Coverage", "Coverage layer generated by Aegis") return heatmap_service.build_coverage_layer(
db, platforms=platforms, tactics=tactics, min_score=min_score,
query = db.query(Technique) )
platform_list = [p.strip() for p in platforms.split(",")] if platforms else None
tactic_list = [t.strip() for t in tactics.split(",")] if tactics else None
query = _apply_filters(query, Technique, platform_list, tactic_list)
techniques = query.all()
for tech in techniques:
score = STATUS_SCORE_MAP.get(tech.status_global, 0)
if score < min_score:
continue
comment_parts = [f"Status: {tech.status_global.value}"]
metadata = _get_technique_metadata(tech, db)
# Enrich comment with test/rule info
tests_info = next((m for m in metadata if m["name"] == "tests_count"), None)
rules_info = next((m for m in metadata if m["name"] == "detection_rules"), None)
if tests_info:
comment_parts.append(f"{tests_info['value']} tests validated")
if rules_info:
comment_parts.append(f"{rules_info['value']} detection rules")
layer["techniques"].append({
"techniqueID": tech.mitre_id,
"tactic": _format_tactic(tech.tactic),
"color": _score_to_color(score),
"score": score,
"comment": " - ".join(comment_parts),
"enabled": True,
"metadata": metadata,
})
return layer
# ── GET /heatmap/threat-actor/{actor_id} ────────────────────────────── # ── GET /heatmap/threat-actor/{actor_id} ──────────────────────────────
@@ -217,62 +49,11 @@ def heatmap_threat_actor(
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
): ):
"""Threat actor layer — techniques used by an actor with coverage color.""" """Threat actor layer — techniques used by an actor with coverage color."""
actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first() layer = heatmap_service.build_threat_actor_layer(
if not actor: db, actor_id, platforms=platforms, tactics=tactics, min_score=min_score,
)
if layer is None:
raise HTTPException(status_code=404, detail="Threat actor not found") raise HTTPException(status_code=404, detail="Threat actor not found")
layer = _build_layer_skeleton(
f"Threat Actor: {actor.name}",
f"Techniques used by {actor.name} with coverage overlay",
gradient_colors=["#808080", "#ff6666", "#66ff66"],
)
# Get actor's technique IDs
actor_technique_rows = (
db.query(ThreatActorTechnique)
.filter(ThreatActorTechnique.threat_actor_id == actor.id)
.all()
)
actor_technique_ids = {row.technique_id for row in actor_technique_rows}
if not actor_technique_ids:
return layer
query = db.query(Technique)
platform_list = [p.strip() for p in platforms.split(",")] if platforms else None
tactic_list = [t.strip() for t in tactics.split(",")] if tactics else None
query = _apply_filters(query, Technique, platform_list, tactic_list)
techniques = query.all()
for tech in techniques:
is_actor_technique = tech.id in actor_technique_ids
score = STATUS_SCORE_MAP.get(tech.status_global, 0) if is_actor_technique else 0
if is_actor_technique and score < min_score:
continue
if is_actor_technique:
metadata = _get_technique_metadata(tech, db)
layer["techniques"].append({
"techniqueID": tech.mitre_id,
"tactic": _format_tactic(tech.tactic),
"color": _score_to_color(score),
"score": score,
"comment": f"Used by {actor.name} - Coverage: {tech.status_global.value}",
"enabled": True,
"metadata": metadata,
})
else:
layer["techniques"].append({
"techniqueID": tech.mitre_id,
"tactic": _format_tactic(tech.tactic),
"color": "",
"score": 0,
"comment": "",
"enabled": False,
"metadata": [],
})
return layer return layer
@@ -288,76 +69,10 @@ def heatmap_detection_rules(
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
): ):
"""Detection rules layer — score based on ratio of rules available vs total.""" """Detection rules layer — score based on ratio of rules available vs total."""
layer = _build_layer_skeleton( return heatmap_service.build_detection_rules_layer(
"Detection Rules Coverage", db, platforms=platforms, tactics=tactics, min_score=min_score,
"Coverage of detection rules per technique",
) )
query = db.query(Technique)
platform_list = [p.strip() for p in platforms.split(",")] if platforms else None
tactic_list = [t.strip() for t in tactics.split(",")] if tactics else None
query = _apply_filters(query, Technique, platform_list, tactic_list)
techniques = query.all()
# Get rule counts per technique_mitre_id in one query
rule_counts = dict(
db.query(
DetectionRule.mitre_technique_id,
func.count(DetectionRule.id),
)
.filter(DetectionRule.is_active == True)
.group_by(DetectionRule.mitre_technique_id)
.all()
)
# Find the max rule count for normalization
max_rules = max(rule_counts.values()) if rule_counts else 1
from app.models.test_detection_result import TestDetectionResult
# Get evaluated rule counts per technique
evaluated_counts_raw = (
db.query(
DetectionRule.mitre_technique_id,
func.count(TestDetectionResult.id),
)
.join(TestDetectionResult, TestDetectionResult.detection_rule_id == DetectionRule.id)
.filter(TestDetectionResult.triggered.isnot(None))
.group_by(DetectionRule.mitre_technique_id)
.all()
)
evaluated_counts = dict(evaluated_counts_raw)
for tech in techniques:
total_rules = rule_counts.get(tech.mitre_id, 0)
evaluated_rules = evaluated_counts.get(tech.mitre_id, 0)
if total_rules > 0:
# Score based on rule availability (normalized) and evaluation ratio
availability_score = min((total_rules / max_rules) * 50, 50)
evaluation_score = (evaluated_rules / total_rules) * 50 if total_rules > 0 else 0
score = int(min(availability_score + evaluation_score, 100))
else:
score = 0
if score < min_score:
continue
layer["techniques"].append({
"techniqueID": tech.mitre_id,
"tactic": _format_tactic(tech.tactic),
"color": _score_to_color(score),
"score": score,
"comment": f"{total_rules} rules available, {evaluated_rules} evaluated",
"enabled": True,
"metadata": [
{"name": "total_rules", "value": str(total_rules)},
{"name": "evaluated_rules", "value": str(evaluated_rules)},
],
})
return layer
# ── GET /heatmap/campaign/{campaign_id} ─────────────────────────────── # ── GET /heatmap/campaign/{campaign_id} ───────────────────────────────
@@ -372,107 +87,26 @@ def heatmap_campaign(
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
): ):
"""Campaign layer — only techniques in the campaign, colored by test state.""" """Campaign layer — only techniques in the campaign, colored by test state."""
campaign = db.query(Campaign).filter(Campaign.id == campaign_id).first() layer = heatmap_service.build_campaign_layer(
if not campaign: db, campaign_id, platforms=platforms, tactics=tactics, min_score=min_score,
)
if layer is None:
raise HTTPException(status_code=404, detail="Campaign not found") raise HTTPException(status_code=404, detail="Campaign not found")
layer = _build_layer_skeleton(
f"Campaign: {campaign.name}",
f"Progress of campaign '{campaign.name}'",
)
# Get campaign tests with their associated techniques
campaign_tests = (
db.query(CampaignTest)
.filter(CampaignTest.campaign_id == campaign.id)
.all()
)
if not campaign_tests:
return layer
# Map test_id -> test for all tests in campaign
test_ids = [ct.test_id for ct in campaign_tests]
tests = db.query(Test).filter(Test.id.in_(test_ids)).all()
test_map = {t.id: t for t in tests}
# Map technique_id -> technique
technique_ids = {t.technique_id for t in tests if t.technique_id}
techniques = db.query(Technique).filter(Technique.id.in_(technique_ids)).all()
tech_map = {t.id: t for t in techniques}
# Score mapping for test states
test_state_score = {
TestState.validated: 100,
TestState.in_review: 70,
TestState.blue_evaluating: 50,
TestState.red_executing: 30,
TestState.draft: 10,
TestState.rejected: 5,
}
# Group by technique (a technique may have multiple tests in a campaign)
tech_scores: dict = {}
for ct in campaign_tests:
test = test_map.get(ct.test_id)
if not test:
continue
tech = tech_map.get(test.technique_id)
if not tech:
continue
state_score = test_state_score.get(test.state, 0)
if tech.mitre_id not in tech_scores:
tech_scores[tech.mitre_id] = {
"technique": tech,
"max_score": state_score,
"tests": [],
}
else:
tech_scores[tech.mitre_id]["max_score"] = max(
tech_scores[tech.mitre_id]["max_score"], state_score
)
tech_scores[tech.mitre_id]["tests"].append(test)
platform_list = [p.strip() for p in platforms.split(",")] if platforms else None
tactic_list = [t.strip() for t in tactics.split(",")] if tactics else None
for mitre_id, info in tech_scores.items():
tech = info["technique"]
score = info["max_score"]
# Apply filters
if platform_list:
tech_platforms = tech.platforms or []
if not any(p in tech_platforms for p in platform_list):
continue
if tactic_list:
tech_tactics = (tech.tactic or "").lower().split(",")
tech_tactics = [t.strip() for t in tech_tactics]
if not any(t in tech_tactics for t in tactic_list):
continue
if score < min_score:
continue
test_states = [t.state.value for t in info["tests"]]
layer["techniques"].append({
"techniqueID": mitre_id,
"tactic": _format_tactic(tech.tactic),
"color": _score_to_color(score),
"score": score,
"comment": f"Campaign tests: {', '.join(test_states)}",
"enabled": True,
"metadata": [
{"name": "campaign_tests", "value": str(len(info["tests"]))},
{"name": "best_state", "value": max(test_states) if test_states else "none"},
],
})
return layer return layer
# ── GET /heatmap/export-navigator ───────────────────────────────────── # ── GET /heatmap/export-navigator ─────────────────────────────────────
_LAYER_BUILDERS = {
"coverage": lambda db, **kw: heatmap_service.build_coverage_layer(db, **kw),
"detection-rules": lambda db, **kw: heatmap_service.build_detection_rules_layer(db, **kw),
}
_LAYER_BUILDERS_WITH_ID = {
"threat-actor": lambda db, lid, **kw: heatmap_service.build_threat_actor_layer(db, lid, **kw),
"campaign": lambda db, lid, **kw: heatmap_service.build_campaign_layer(db, lid, **kw),
}
@router.get("/export-navigator") @router.get("/export-navigator")
def export_navigator( def export_navigator(
@@ -485,43 +119,24 @@ def export_navigator(
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
): ):
"""Export a heatmap layer as a downloadable JSON file for ATT&CK Navigator.""" """Export a heatmap layer as a downloadable JSON file for ATT&CK Navigator."""
# Delegate to the appropriate layer endpoint kwargs = dict(platforms=platforms, tactics=tactics, min_score=min_score)
if layer == "coverage":
data = heatmap_coverage( if layer in _LAYER_BUILDERS:
platforms=platforms, tactics=tactics, min_score=min_score, data = _LAYER_BUILDERS[layer](db, **kwargs)
db=db, current_user=current_user, elif layer in _LAYER_BUILDERS_WITH_ID:
)
elif layer == "threat-actor":
if not layer_id: if not layer_id:
raise HTTPException(status_code=400, detail="layer_id required for threat-actor layer") raise HTTPException(status_code=400, detail=f"layer_id required for {layer} layer")
data = heatmap_threat_actor( data = _LAYER_BUILDERS_WITH_ID[layer](db, layer_id, **kwargs)
actor_id=layer_id, platforms=platforms, tactics=tactics, if data is None:
min_score=min_score, db=db, current_user=current_user, raise HTTPException(status_code=404, detail=f"{layer} not found")
)
elif layer == "detection-rules":
data = heatmap_detection_rules(
platforms=platforms, tactics=tactics, min_score=min_score,
db=db, current_user=current_user,
)
elif layer == "campaign":
if not layer_id:
raise HTTPException(status_code=400, detail="layer_id required for campaign layer")
data = heatmap_campaign(
campaign_id=layer_id, platforms=platforms, tactics=tactics,
min_score=min_score, db=db, current_user=current_user,
)
else: else:
raise HTTPException(status_code=400, detail=f"Unknown layer type: {layer}") raise HTTPException(status_code=400, detail=f"Unknown layer type: {layer}")
# Convert to JSON and return as downloadable file
json_content = json.dumps(data, indent=2, default=str) json_content = json.dumps(data, indent=2, default=str)
buffer = io.BytesIO(json_content.encode("utf-8")) buffer = io.BytesIO(json_content.encode("utf-8"))
filename = f"aegis_{layer}_layer.json"
return StreamingResponse( return StreamingResponse(
buffer, buffer,
media_type="application/json", media_type="application/json",
headers={ headers={"Content-Disposition": f"attachment; filename=aegis_{layer}_layer.json"},
"Content-Disposition": f"attachment; filename={filename}",
},
) )

View File

@@ -0,0 +1,452 @@
"""Heatmap service — ATT&CK Navigator-compatible layer generation.
Builds multiple layer types (coverage, threat actor, detection rules,
campaign) as plain dictionaries ready for JSON serialisation.
This module is framework-agnostic: no FastAPI imports, no HTTPException,
no ``db.commit()``.
"""
from __future__ import annotations
import json
from typing import Optional
from sqlalchemy import func, or_
from sqlalchemy.orm import Session
from app.models.campaign import Campaign, CampaignTest
from app.models.detection_rule import DetectionRule
from app.models.defensive_technique import DefensiveTechniqueMapping
from app.models.enums import TechniqueStatus, TestState
from app.models.technique import Technique
from app.models.test import Test
from app.models.test_detection_result import TestDetectionResult
from app.models.threat_actor import ThreatActor, ThreatActorTechnique
from app.utils import escape_like
# ── Constants ─────────────────────────────────────────────────────────
ATTACK_VERSION = "15"
NAVIGATOR_VERSION = "5.0"
LAYER_VERSION = "4.5"
DOMAIN = "enterprise-attack"
STATUS_SCORE_MAP: dict[TechniqueStatus, int] = {
TechniqueStatus.validated: 100,
TechniqueStatus.partial: 60,
TechniqueStatus.in_progress: 30,
TechniqueStatus.not_covered: 10,
TechniqueStatus.not_evaluated: 0,
TechniqueStatus.review_required: 10,
}
TEST_STATE_SCORE: dict[TestState, int] = {
TestState.validated: 100,
TestState.in_review: 70,
TestState.blue_evaluating: 50,
TestState.red_executing: 30,
TestState.draft: 10,
TestState.rejected: 5,
}
# ── Internal helpers ──────────────────────────────────────────────────
def _score_to_color(score: int) -> str:
"""Map a 0-100 score to a red-yellow-green colour hex."""
if score <= 0:
return "#d3d3d3"
if score <= 25:
return "#ff6666"
if score <= 50:
return "#ff9933"
if score <= 75:
return "#ffff66"
return "#66ff66"
def _build_layer_skeleton(
name: str,
description: str,
gradient_colors: list[str] | None = None,
) -> dict:
"""Return a base layer dict compatible with ATT&CK Navigator."""
return {
"name": name,
"versions": {
"attack": ATTACK_VERSION,
"navigator": NAVIGATOR_VERSION,
"layer": LAYER_VERSION,
},
"domain": DOMAIN,
"description": description,
"filters": {"platforms": ["windows", "linux", "macos"]},
"gradient": {
"colors": gradient_colors or ["#ff6666", "#ffff66", "#66ff66"],
"minValue": 0,
"maxValue": 100,
},
"techniques": [],
}
def _apply_filters(
query,
model,
platforms: list[str] | None = None,
tactics: list[str] | None = None,
):
"""Apply common platform and tactic filters to a technique query."""
if platforms:
platform_filters = [
model.platforms.op("@>")(json.dumps([p])) for p in platforms
]
query = query.filter(or_(*platform_filters))
if tactics:
tactic_filters = [
model.tactic.ilike(f"%{escape_like(t)}%") for t in tactics
]
query = query.filter(or_(*tactic_filters))
return query
def _format_tactic(tactic_str: str | None) -> str:
"""Normalize tactic string to ATT&CK Navigator format (kebab-case)."""
if not tactic_str:
return ""
return tactic_str.split(",")[0].strip().lower()
def _parse_csv(value: str | None) -> list[str] | None:
"""Split a comma-separated string into a trimmed list, or ``None``."""
if not value:
return None
return [v.strip() for v in value.split(",") if v.strip()]
# ── Public API ────────────────────────────────────────────────────────
def build_coverage_layer(
db: Session,
*,
platforms: str | None = None,
tactics: str | None = None,
min_score: int = 0,
) -> dict:
"""Coverage layer -- score based on ``status_global`` of each technique."""
layer = _build_layer_skeleton("Aegis Coverage", "Coverage layer generated by Aegis")
query = _apply_filters(
db.query(Technique), Technique,
_parse_csv(platforms), _parse_csv(tactics),
)
techniques = query.all()
# Bulk-fetch test counts and rule counts to avoid N+1
tech_ids = [t.id for t in techniques]
mitre_ids = [t.mitre_id for t in techniques]
test_counts = dict(
db.query(Test.technique_id, func.count(Test.id))
.filter(Test.technique_id.in_(tech_ids), Test.state == TestState.validated)
.group_by(Test.technique_id)
.all()
) if tech_ids else {}
rule_counts = dict(
db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id))
.filter(DetectionRule.mitre_technique_id.in_(mitre_ids))
.group_by(DetectionRule.mitre_technique_id)
.all()
) if mitre_ids else {}
for tech in techniques:
score = STATUS_SCORE_MAP.get(tech.status_global, 0)
if score < min_score:
continue
tc = test_counts.get(tech.id, 0)
rc = rule_counts.get(tech.mitre_id, 0)
metadata = [
{"name": "tests_count", "value": str(tc)},
{"name": "detection_rules", "value": str(rc)},
]
if tech.last_review_date:
metadata.append(
{"name": "last_validated", "value": tech.last_review_date.strftime("%Y-%m-%d")}
)
comment_parts = [
f"Status: {tech.status_global.value}",
f"{tc} tests validated",
f"{rc} detection rules",
]
layer["techniques"].append({
"techniqueID": tech.mitre_id,
"tactic": _format_tactic(tech.tactic),
"color": _score_to_color(score),
"score": score,
"comment": " - ".join(comment_parts),
"enabled": True,
"metadata": metadata,
})
return layer
def build_threat_actor_layer(
db: Session,
actor_id: str,
*,
platforms: str | None = None,
tactics: str | None = None,
min_score: int = 0,
) -> dict | None:
"""Threat actor layer -- techniques used by an actor with coverage colour.
Returns ``None`` if the actor does not exist.
"""
actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first()
if not actor:
return None
layer = _build_layer_skeleton(
f"Threat Actor: {actor.name}",
f"Techniques used by {actor.name} with coverage overlay",
gradient_colors=["#808080", "#ff6666", "#66ff66"],
)
actor_technique_ids = {
row.technique_id
for row in db.query(ThreatActorTechnique.technique_id)
.filter(ThreatActorTechnique.threat_actor_id == actor.id)
.all()
}
if not actor_technique_ids:
return layer
query = _apply_filters(
db.query(Technique), Technique,
_parse_csv(platforms), _parse_csv(tactics),
)
techniques = query.all()
# Bulk-fetch metadata for actor techniques only
test_counts = dict(
db.query(Test.technique_id, func.count(Test.id))
.filter(Test.technique_id.in_(actor_technique_ids), Test.state == TestState.validated)
.group_by(Test.technique_id)
.all()
)
actor_mitre_ids = [t.mitre_id for t in techniques if t.id in actor_technique_ids]
rule_counts = dict(
db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id))
.filter(DetectionRule.mitre_technique_id.in_(actor_mitre_ids))
.group_by(DetectionRule.mitre_technique_id)
.all()
) if actor_mitre_ids else {}
for tech in techniques:
is_actor_technique = tech.id in actor_technique_ids
score = STATUS_SCORE_MAP.get(tech.status_global, 0) if is_actor_technique else 0
if is_actor_technique and score < min_score:
continue
if is_actor_technique:
tc = test_counts.get(tech.id, 0)
rc = rule_counts.get(tech.mitre_id, 0)
metadata = [
{"name": "tests_count", "value": str(tc)},
{"name": "detection_rules", "value": str(rc)},
]
if tech.last_review_date:
metadata.append(
{"name": "last_validated", "value": tech.last_review_date.strftime("%Y-%m-%d")}
)
layer["techniques"].append({
"techniqueID": tech.mitre_id,
"tactic": _format_tactic(tech.tactic),
"color": _score_to_color(score),
"score": score,
"comment": f"Used by {actor.name} - Coverage: {tech.status_global.value}",
"enabled": True,
"metadata": metadata,
})
else:
layer["techniques"].append({
"techniqueID": tech.mitre_id,
"tactic": _format_tactic(tech.tactic),
"color": "",
"score": 0,
"comment": "",
"enabled": False,
"metadata": [],
})
return layer
def build_detection_rules_layer(
db: Session,
*,
platforms: str | None = None,
tactics: str | None = None,
min_score: int = 0,
) -> dict:
"""Detection rules layer -- score based on rule availability and evaluation ratio."""
layer = _build_layer_skeleton(
"Detection Rules Coverage",
"Coverage of detection rules per technique",
)
query = _apply_filters(
db.query(Technique), Technique,
_parse_csv(platforms), _parse_csv(tactics),
)
techniques = query.all()
rule_counts = dict(
db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id))
.filter(DetectionRule.is_active == True) # noqa: E712
.group_by(DetectionRule.mitre_technique_id)
.all()
)
max_rules = max(rule_counts.values()) if rule_counts else 1
evaluated_counts = dict(
db.query(DetectionRule.mitre_technique_id, func.count(TestDetectionResult.id))
.join(TestDetectionResult, TestDetectionResult.detection_rule_id == DetectionRule.id)
.filter(TestDetectionResult.triggered.isnot(None))
.group_by(DetectionRule.mitre_technique_id)
.all()
)
for tech in techniques:
total_rules = rule_counts.get(tech.mitre_id, 0)
evaluated_rules = evaluated_counts.get(tech.mitre_id, 0)
if total_rules > 0:
availability_score = min((total_rules / max_rules) * 50, 50)
evaluation_score = (evaluated_rules / total_rules) * 50
score = int(min(availability_score + evaluation_score, 100))
else:
score = 0
if score < min_score:
continue
layer["techniques"].append({
"techniqueID": tech.mitre_id,
"tactic": _format_tactic(tech.tactic),
"color": _score_to_color(score),
"score": score,
"comment": f"{total_rules} rules available, {evaluated_rules} evaluated",
"enabled": True,
"metadata": [
{"name": "total_rules", "value": str(total_rules)},
{"name": "evaluated_rules", "value": str(evaluated_rules)},
],
})
return layer
def build_campaign_layer(
db: Session,
campaign_id: str,
*,
platforms: str | None = None,
tactics: str | None = None,
min_score: int = 0,
) -> dict | None:
"""Campaign layer -- techniques in a campaign, coloured by test state.
Returns ``None`` if the campaign does not exist.
"""
campaign = db.query(Campaign).filter(Campaign.id == campaign_id).first()
if not campaign:
return None
layer = _build_layer_skeleton(
f"Campaign: {campaign.name}",
f"Progress of campaign '{campaign.name}'",
)
campaign_tests = (
db.query(CampaignTest)
.filter(CampaignTest.campaign_id == campaign.id)
.all()
)
if not campaign_tests:
return layer
test_ids = [ct.test_id for ct in campaign_tests]
tests = db.query(Test).filter(Test.id.in_(test_ids)).all()
test_map = {t.id: t for t in tests}
technique_ids = {t.technique_id for t in tests if t.technique_id}
techniques = db.query(Technique).filter(Technique.id.in_(technique_ids)).all()
tech_map = {t.id: t for t in techniques}
# Group tests by technique, keeping the best state score
tech_scores: dict = {}
for ct in campaign_tests:
test = test_map.get(ct.test_id)
if not test:
continue
tech = tech_map.get(test.technique_id)
if not tech:
continue
state_score = TEST_STATE_SCORE.get(test.state, 0)
if tech.mitre_id not in tech_scores:
tech_scores[tech.mitre_id] = {
"technique": tech,
"max_score": state_score,
"tests": [],
}
else:
tech_scores[tech.mitre_id]["max_score"] = max(
tech_scores[tech.mitre_id]["max_score"], state_score,
)
tech_scores[tech.mitre_id]["tests"].append(test)
platform_list = _parse_csv(platforms)
tactic_list = _parse_csv(tactics)
for mitre_id, info in tech_scores.items():
tech = info["technique"]
score = info["max_score"]
if platform_list:
tech_platforms = tech.platforms or []
if not any(p in tech_platforms for p in platform_list):
continue
if tactic_list:
tech_tactics = [t.strip() for t in (tech.tactic or "").lower().split(",")]
if not any(t in tech_tactics for t in tactic_list):
continue
if score < min_score:
continue
test_states = [t.state.value for t in info["tests"]]
layer["techniques"].append({
"techniqueID": mitre_id,
"tactic": _format_tactic(tech.tactic),
"color": _score_to_color(score),
"score": score,
"comment": f"Campaign tests: {', '.join(test_states)}",
"enabled": True,
"metadata": [
{"name": "campaign_tests", "value": str(len(info["tests"]))},
{"name": "best_state", "value": max(test_states) if test_states else "none"},
],
})
return layer