9472fe91fa
Aegis CI / lint-and-test (push) Has been cancelled
- Remove ANN (type annotations) and D (docstrings) from ruff select; not feasible to add thousands of missing annotations/docstrings across the codebase - Add I001 and E501 to ignore: comment-interleaved import style and SQLAlchemy FK definitions naturally exceed line limits - Fix F811 duplicate import blocks in main.py, models/__init__.py, routers (campaigns, system, tests, evidence) and services (test_workflow, test_crud, campaign_service, schemas/test) - Add missing Evidence/IntelItem/Technique/Test/TestTemplate/User imports to models/__init__.py (were only in duplicate block) - Fix F821: add missing JWTError import in auth.py - Fix F401 unused imports across 15+ files (jira_service, sso_service, notification_service, playbook_service, tempo_service, models, schemas, routers: admin_config, attack_paths, executive_dashboard, knowledge, ownership, risk_intelligence, sso, api_keys, email_service) - Fix F841 unused variables: owned_technique_ids (executive_dashboard_service), severity (jira_service), priority_order (revalidation_queue_service) - Fix F541 f-strings without placeholders in system.py and attck_evaluations_service - Fix F601 duplicate dict key G0067 in threat_actor_import_service - Fix E701 multiple-statements-on-one-line in risk_intelligence_service - Fix E741 ambiguous variable name l -> lvl in risk_intelligence_service - Fix N806 uppercase vars in functions: technique.py, heatmap_service.py; add noqa for compliance_import_service.py large unused constant dicts - Fix W293 whitespace on blank lines in tests/conftest.py
956 lines
33 KiB
Python
956 lines
33 KiB
Python
"""Heatmap service — ATT&CK Navigator-compatible layer generation.
|
|
|
|
Builds multiple layer types (coverage, threat actor, detection rules,
|
|
campaign) as plain dictionaries ready for JSON serialisation.
|
|
|
|
This module is framework-agnostic: no FastAPI imports, no HTTPException,
|
|
no ``db.commit()``.
|
|
"""
|
|
|
|
# Enable future language features for compatibility
|
|
from __future__ import annotations
|
|
|
|
# Import json
|
|
import json
|
|
|
|
# Import Callable from collections.abc
|
|
from collections.abc import Callable
|
|
|
|
# Import func, or_ from sqlalchemy
|
|
from sqlalchemy import func, or_
|
|
|
|
# Import Query, Session from sqlalchemy.orm
|
|
from sqlalchemy.orm import Query, Session
|
|
|
|
# Import BusinessRuleViolation, EntityNotFoundError from app.domain.errors
|
|
from app.domain.errors import BusinessRuleViolation, EntityNotFoundError
|
|
|
|
# Import Campaign, CampaignTest from app.models.campaign
|
|
from app.models.campaign import Campaign, CampaignTest
|
|
|
|
# Import DetectionRule from app.models.detection_rule
|
|
from app.models.detection_rule import DetectionRule
|
|
|
|
# Import TechniqueStatus, TestState from app.models.enums
|
|
from app.models.enums import TechniqueStatus, TestState
|
|
|
|
# Import Technique from app.models.technique
|
|
from app.models.technique import Technique
|
|
|
|
# Import Test from app.models.test
|
|
from app.models.test import Test
|
|
|
|
# Import TestDetectionResult from app.models.test_detection_result
|
|
from app.models.test_detection_result import TestDetectionResult
|
|
|
|
# Import ThreatActor, ThreatActorTechnique from app.models.threat_actor
|
|
from app.models.threat_actor import ThreatActor, ThreatActorTechnique
|
|
|
|
# Import escape_like from app.utils
|
|
from app.utils import escape_like
|
|
|
|
# ── Constants ─────────────────────────────────────────────────────────
|
|
|
|
# Version identifiers and domain embedded in every generated layer.
ATTACK_VERSION: str = "15"
NAVIGATOR_VERSION: str = "5.0"
LAYER_VERSION: str = "4.5"
DOMAIN: str = "enterprise-attack"
|
|
|
|
# Assign STATUS_SCORE_MAP = {
|
|
# Score (0-100) assigned to a technique for each global status value.
# Statuses missing from this table are scored 0 by the builders (``.get(..., 0)``).
STATUS_SCORE_MAP: dict[TechniqueStatus, int] = {
    TechniqueStatus.validated: 100,
    TechniqueStatus.partial: 60,
    TechniqueStatus.in_progress: 30,
    TechniqueStatus.not_covered: 10,
    TechniqueStatus.not_evaluated: 0,
    TechniqueStatus.review_required: 10,
}
|
|
|
|
# Assign TEST_STATE_SCORE = {
|
|
# Score (0-100) assigned to a test for each workflow state; used by the
# campaign layer to pick the best-scoring test per technique.
TEST_STATE_SCORE: dict[TestState, int] = {
    TestState.validated: 100,
    TestState.in_review: 70,
    TestState.blue_evaluating: 50,
    TestState.red_executing: 30,
    TestState.draft: 10,
    TestState.rejected: 5,
}
|
|
|
|
|
|
# ── Internal helpers ──────────────────────────────────────────────────
|
|
|
|
|
|
def _score_to_color(score: int) -> str:
|
|
"""Map a 0-100 score to a red-yellow-green colour hex.
|
|
|
|
Args:
|
|
score (int): Coverage score between 0 and 100 inclusive.
|
|
|
|
Returns:
|
|
str: Hex colour string representing the score tier.
|
|
"""
|
|
# Check: score <= 0
|
|
if score <= 0:
|
|
# Return "#d3d3d3"
|
|
return "#d3d3d3"
|
|
# Check: score <= 25
|
|
if score <= 25:
|
|
# Return "#ff6666"
|
|
return "#ff6666"
|
|
# Check: score <= 50
|
|
if score <= 50:
|
|
# Return "#ff9933"
|
|
return "#ff9933"
|
|
# Check: score <= 75
|
|
if score <= 75:
|
|
# Return "#ffff66"
|
|
return "#ffff66"
|
|
# Return "#66ff66"
|
|
return "#66ff66"
|
|
|
|
|
|
# Define function _build_layer_skeleton
|
|
def _build_layer_skeleton(
    name: str,
    description: str,
    gradient_colors: list[str] | None = None,
) -> dict:
    """Create the empty layer dictionary that every builder fills in.

    Args:
        name (str): Layer title stored under ``"name"``.
        description (str): Free text stored under ``"description"``.
        gradient_colors (list[str] | None): Colour stops for the score
            gradient. When omitted or empty, a red / yellow / green gradient
            is used.

    Returns:
        dict: Layer dictionary carrying the version block, domain, platform
        filter, a 0-100 gradient and an empty ``"techniques"`` list.
    """
    if gradient_colors:
        colour_stops = gradient_colors
    else:
        colour_stops = ["#ff6666", "#ffff66", "#66ff66"]

    versions = {
        "attack": ATTACK_VERSION,
        "navigator": NAVIGATOR_VERSION,
        "layer": LAYER_VERSION,
    }
    gradient = {
        "colors": colour_stops,
        "minValue": 0,
        "maxValue": 100,
    }

    skeleton: dict = {
        "name": name,
        "versions": versions,
        "domain": DOMAIN,
        "description": description,
        "filters": {"platforms": ["windows", "linux", "macos"]},
        "gradient": gradient,
        "techniques": [],
    }
    return skeleton
|
|
|
|
|
|
# Define function _apply_filters
|
|
def _apply_filters(
    query: Query,  # type: ignore[type-arg]
    model: type,
    platforms: list[str] | None = None,
    tactics: list[str] | None = None,
) -> Query:  # type: ignore[type-arg]
    """Narrow a technique query by platform and/or tactic.

    Args:
        query (Query): Query to extend.
        model (type): Model class exposing ``platforms`` and ``tactic``
            columns.
        platforms (list[str] | None): Platform names; a row matches when it
            satisfies the ``@>`` test for at least one of them.
        tactics (list[str] | None): Tactic fragments; a row matches when its
            ``tactic`` column contains at least one of them
            (case-insensitive).

    Returns:
        Query: The query with the requested filters added; unchanged when
        both filter lists are empty or ``None``.
    """
    if platforms:
        # NOTE(review): "@>" with a JSON-encoded one-element list looks like
        # PostgreSQL JSON containment — confirm against the column type.
        platform_clauses = []
        for platform in platforms:
            platform_clauses.append(model.platforms.op("@>")(json.dumps([platform])))
        query = query.filter(or_(*platform_clauses))

    if tactics:
        # LIKE wildcards in the user-supplied value are escaped before the
        # value is wrapped in "%...%".
        tactic_clauses = []
        for tactic in tactics:
            tactic_clauses.append(model.tactic.ilike(f"%{escape_like(tactic)}%"))
        query = query.filter(or_(*tactic_clauses))

    return query
|
|
|
|
|
|
# Define function _format_tactic
|
|
def _format_tactic(tactic_str: str | None) -> str:
|
|
"""Normalize tactic string to ATT&CK Navigator format (kebab-case).
|
|
|
|
Args:
|
|
tactic_str (str | None): Raw tactic string, possibly comma-separated
|
|
or mixed-case.
|
|
|
|
Returns:
|
|
str: First tactic value lowercased and trimmed, or empty string if
|
|
the input is falsy.
|
|
"""
|
|
# Check: not tactic_str
|
|
if not tactic_str:
|
|
# Return ""
|
|
return ""
|
|
# Return tactic_str.split(",")[0].strip().lower()
|
|
return tactic_str.split(",")[0].strip().lower()
|
|
|
|
|
|
# Define function _parse_csv
|
|
def _parse_csv(value: str | None) -> list[str] | None:
|
|
"""Split a comma-separated string into a trimmed list, or ``None``.
|
|
|
|
Args:
|
|
value (str | None): Comma-separated string to split, or ``None``.
|
|
|
|
Returns:
|
|
list[str] | None: Non-empty trimmed tokens, or ``None`` if the input
|
|
is falsy or produces no tokens.
|
|
"""
|
|
# Check: not value
|
|
if not value:
|
|
# Return None
|
|
return None
|
|
# Return [v.strip() for v in value.split(",") if v.strip()]
|
|
return [v.strip() for v in value.split(",") if v.strip()]
|
|
|
|
|
|
# ── Public API ────────────────────────────────────────────────────────
|
|
|
|
|
|
def build_coverage_layer(
    db: Session,
    *,
    platforms: str | None = None,
    tactics: str | None = None,
    min_score: int = 0,
) -> dict:
    """Build the coverage layer, scored from each technique's ``status_global``.

    Args:
        db (Session): Active SQLAlchemy database session.
        platforms (str | None): Comma-separated platform names used to filter
            techniques.
        tactics (str | None): Comma-separated tactic names used to filter
            techniques.
        min_score (int): Techniques scoring below this value are left out.

    Returns:
        dict: ATT&CK Navigator-compatible layer dictionary.
    """
    layer = _build_layer_skeleton("Aegis Coverage", "Coverage layer generated by Aegis")

    base_query = db.query(Technique)
    techniques = _apply_filters(
        base_query,
        Technique,
        _parse_csv(platforms),
        _parse_csv(tactics),
    ).all()
    if not techniques:
        # Nothing matched: no count queries are needed and the layer stays empty.
        return layer

    # Two grouped queries replace per-technique lookups (avoids N+1).
    validated_tests = dict(
        db.query(Test.technique_id, func.count(Test.id))
        .filter(
            Test.technique_id.in_([technique.id for technique in techniques]),
            Test.state == TestState.validated,
        )
        .group_by(Test.technique_id)
        .all()
    )
    rule_totals = dict(
        db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id))
        .filter(
            DetectionRule.mitre_technique_id.in_(
                [technique.mitre_id for technique in techniques]
            )
        )
        .group_by(DetectionRule.mitre_technique_id)
        .all()
    )

    entries = layer["techniques"]
    for technique in techniques:
        score = STATUS_SCORE_MAP.get(technique.status_global, 0)
        if score < min_score:
            continue

        n_tests = validated_tests.get(technique.id, 0)
        n_rules = rule_totals.get(technique.mitre_id, 0)

        metadata = [
            {"name": "tests_count", "value": str(n_tests)},
            {"name": "detection_rules", "value": str(n_rules)},
        ]
        reviewed_on = technique.last_review_date
        if reviewed_on:
            metadata.append(
                {"name": "last_validated", "value": reviewed_on.strftime("%Y-%m-%d")}
            )

        comment = (
            f"Status: {technique.status_global.value}"
            f" - {n_tests} tests validated"
            f" - {n_rules} detection rules"
        )

        entries.append({
            "techniqueID": technique.mitre_id,
            "tactic": _format_tactic(technique.tactic),
            "color": _score_to_color(score),
            "score": score,
            "comment": comment,
            "enabled": True,
            "metadata": metadata,
        })

    return layer
|
|
|
|
|
|
# Define function build_threat_actor_layer
|
|
def build_threat_actor_layer(
    db: Session,
    actor_id: str,
    *,
    platforms: str | None = None,
    tactics: str | None = None,
    min_score: int = 0,
) -> dict:
    """Threat actor layer -- techniques used by an actor with coverage colour.

    Only techniques linked to the actor are included, so tactics with no
    actor techniques stay empty in the matrix.

    Args:
        db (Session): Active SQLAlchemy database session.
        actor_id (str): Identifier of the threat actor.
        platforms (str | None): Optional comma-separated platform names to
            filter techniques.
        tactics (str | None): Optional comma-separated tactic names to filter
            techniques.
        min_score (int): Minimum score threshold for actor techniques.

    Returns:
        dict: ATT&CK Navigator-compatible layer dictionary coloured by
        coverage status for the specified actor.

    Raises:
        EntityNotFoundError: If no threat actor with ``actor_id`` exists.
    """
    actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first()
    if not actor:
        raise EntityNotFoundError("ThreatActor", actor_id)

    layer = _build_layer_skeleton(
        f"Threat Actor: {actor.name}",
        f"Techniques used by {actor.name} with coverage overlay",
        gradient_colors=["#808080", "#ff6666", "#66ff66"],
    )

    actor_technique_ids = {
        row.technique_id
        for row in db.query(ThreatActorTechnique.technique_id)
        .filter(ThreatActorTechnique.threat_actor_id == actor.id)
        .all()
    }
    if not actor_technique_ids:
        return layer

    # Restrict to the actor's techniques in SQL instead of loading every
    # technique and discarding the others in Python.
    query = _apply_filters(
        db.query(Technique).filter(Technique.id.in_(actor_technique_ids)),
        Technique,
        _parse_csv(platforms),
        _parse_csv(tactics),
    )
    techniques = query.all()
    if not techniques:
        return layer

    # Bulk-fetch per-technique counts to avoid N+1 lookups in the loop below.
    test_counts = dict(
        db.query(Test.technique_id, func.count(Test.id))
        .filter(Test.technique_id.in_(actor_technique_ids), Test.state == TestState.validated)
        .group_by(Test.technique_id)
        .all()
    )
    actor_mitre_ids = [t.mitre_id for t in techniques]
    rule_counts = dict(
        db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id))
        .filter(DetectionRule.mitre_technique_id.in_(actor_mitre_ids))
        .group_by(DetectionRule.mitre_technique_id)
        .all()
    )

    for tech in techniques:
        score = STATUS_SCORE_MAP.get(tech.status_global, 0)
        if score < min_score:
            continue

        tc = test_counts.get(tech.id, 0)
        rc = rule_counts.get(tech.mitre_id, 0)
        metadata = [
            {"name": "tests_count", "value": str(tc)},
            {"name": "detection_rules", "value": str(rc)},
        ]
        if tech.last_review_date:
            metadata.append(
                {"name": "last_validated", "value": tech.last_review_date.strftime("%Y-%m-%d")}
            )

        layer["techniques"].append({
            "techniqueID": tech.mitre_id,
            "tactic": _format_tactic(tech.tactic),
            "color": _score_to_color(score),
            "score": score,
            "comment": f"Used by {actor.name} - Coverage: {tech.status_global.value}",
            "enabled": True,
            "metadata": metadata,
        })

    return layer
|
|
|
|
|
|
# Define function build_detection_rules_layer
|
|
def build_detection_rules_layer(
    db: Session,
    *,
    platforms: str | None = None,
    tactics: str | None = None,
    min_score: int = 0,
) -> dict:
    """Build the detection-rules layer, scored by active rule count.

    The score depends only on how many active rules a technique has, never
    on other techniques: each rule is worth 25 points, capped at 100.

        0 rules  -> score 0   (gray)
        1 rule   -> score 25  (red)
        2 rules  -> score 50  (orange)
        3 rules  -> score 75  (yellow)
        4+ rules -> score 100 (green)

    Args:
        db (Session): Active SQLAlchemy database session.
        platforms (str | None): Comma-separated platform names used to filter
            techniques.
        tactics (str | None): Comma-separated tactic names used to filter
            techniques.
        min_score (int): Techniques scoring below this value are left out.

    Returns:
        dict: ATT&CK Navigator-compatible layer dictionary.
    """
    layer = _build_layer_skeleton(
        "Detection Rules Coverage",
        "Number of active detection rules per technique",
    )

    techniques = _apply_filters(
        db.query(Technique),
        Technique,
        _parse_csv(platforms),
        _parse_csv(tactics),
    ).all()

    # Active rule count keyed by MITRE technique id.
    active_by_mitre = dict(
        db.query(DetectionRule.mitre_technique_id, func.count(DetectionRule.id))
        .filter(DetectionRule.is_active == True)  # noqa: E712
        .group_by(DetectionRule.mitre_technique_id)
        .all()
    )

    # NOTE(review): this counts TestDetectionResult rows with a non-null
    # ``triggered`` value, not distinct rules, and does not filter on
    # ``is_active`` — confirm that is the intended meaning of "evaluated".
    evaluated_by_mitre = dict(
        db.query(DetectionRule.mitre_technique_id, func.count(TestDetectionResult.id))
        .join(TestDetectionResult, TestDetectionResult.detection_rule_id == DetectionRule.id)
        .filter(TestDetectionResult.triggered.isnot(None))
        .group_by(DetectionRule.mitre_technique_id)
        .all()
    )

    # Number of rules that counts as full coverage (score 100).
    rules_for_full_coverage = 4

    entries = layer["techniques"]
    for technique in techniques:
        active_rules = active_by_mitre.get(technique.mitre_id, 0)
        evaluated = evaluated_by_mitre.get(technique.mitre_id, 0)

        uncapped = int((active_rules / rules_for_full_coverage) * 100)
        score = uncapped if uncapped < 100 else 100
        if score < min_score:
            continue

        noun = "rule" if active_rules == 1 else "rules"
        comment = f"{active_rules} active {noun}"
        if evaluated > 0:
            comment += f", {evaluated} evaluated"

        entries.append({
            "techniqueID": technique.mitre_id,
            "tactic": _format_tactic(technique.tactic),
            "color": _score_to_color(score),
            "score": score,
            "comment": comment,
            "enabled": True,
            "metadata": [
                {"name": "total_rules", "value": str(active_rules)},
                {"name": "evaluated_rules", "value": str(evaluated)},
            ],
        })

    return layer
|
|
|
|
|
|
# Define function build_campaign_layer
|
|
def build_campaign_layer(
    db: Session,
    campaign_id: str,
    *,
    platforms: str | None = None,
    tactics: str | None = None,
    min_score: int = 0,
) -> dict:
    """Campaign layer -- techniques in a campaign, coloured by test state.

    Each technique is scored by the best-scoring test (per
    ``TEST_STATE_SCORE``) that the campaign holds for it. The ``best_state``
    metadata entry reports the state of that same test.

    Args:
        db (Session): Active SQLAlchemy database session.
        campaign_id (str): Identifier of the campaign.
        platforms (str | None): Optional comma-separated platform names; a
            technique is kept when it lists at least one of them.
        tactics (str | None): Optional comma-separated tactic names; a
            technique is kept when its tactic string contains at least one of
            them (case-insensitive substring match, mirroring the SQL filter
            used by the other layers).
        min_score (int): Minimum score threshold for techniques in the layer.

    Returns:
        dict: ATT&CK Navigator-compatible layer dictionary where each
        technique colour reflects the best test state within the campaign.

    Raises:
        EntityNotFoundError: If no campaign with ``campaign_id`` exists.
    """
    campaign = db.query(Campaign).filter(Campaign.id == campaign_id).first()
    if not campaign:
        raise EntityNotFoundError("Campaign", campaign_id)

    layer = _build_layer_skeleton(
        f"Campaign: {campaign.name}",
        f"Progress of campaign '{campaign.name}'",
    )

    campaign_tests = (
        db.query(CampaignTest)
        .filter(CampaignTest.campaign_id == campaign.id)
        .all()
    )
    if not campaign_tests:
        return layer

    test_ids = [ct.test_id for ct in campaign_tests]
    tests = db.query(Test).filter(Test.id.in_(test_ids)).all()
    test_map = {t.id: t for t in tests}

    technique_ids = {t.technique_id for t in tests if t.technique_id}
    if not technique_ids:
        # No test points at a technique: skip the empty IN () query.
        return layer
    techniques = db.query(Technique).filter(Technique.id.in_(technique_ids)).all()
    tech_map = {t.id: t for t in techniques}

    # Group tests by technique, remembering the best-scoring state.
    tech_scores: dict = {}
    for ct in campaign_tests:
        test = test_map.get(ct.test_id)
        if not test:
            continue
        tech = tech_map.get(test.technique_id)
        if not tech:
            continue

        state_score = TEST_STATE_SCORE.get(test.state, 0)
        info = tech_scores.get(tech.mitre_id)
        if info is None:
            info = {
                "technique": tech,
                "max_score": state_score,
                "best_state": test.state,
                "tests": [],
            }
            tech_scores[tech.mitre_id] = info
        elif state_score > info["max_score"]:
            info["max_score"] = state_score
            info["best_state"] = test.state
        info["tests"].append(test)

    platform_list = _parse_csv(platforms)
    # Lowercase the requested tactics once so the comparison below is
    # case-insensitive on both sides.
    tactic_list = [t.lower() for t in (_parse_csv(tactics) or [])]

    for mitre_id, info in tech_scores.items():
        tech = info["technique"]
        score = info["max_score"]

        if platform_list:
            tech_platforms = tech.platforms or []
            if not any(p in tech_platforms for p in platform_list):
                continue
        if tactic_list:
            tech_tactic = (tech.tactic or "").lower()
            if not any(t in tech_tactic for t in tactic_list):
                continue
        if score < min_score:
            continue

        test_states = [t.state.value for t in info["tests"]]
        layer["techniques"].append({
            "techniqueID": mitre_id,
            "tactic": _format_tactic(tech.tactic),
            "color": _score_to_color(score),
            "score": score,
            "comment": f"Campaign tests: {', '.join(test_states)}",
            "enabled": True,
            "metadata": [
                {"name": "campaign_tests", "value": str(len(info["tests"]))},
                # State of the highest-scoring test, not the alphabetically
                # greatest state name.
                {"name": "best_state", "value": info["best_state"].value},
            ],
        })

    return layer
|
|
|
|
|
|
# ── Layer registry (OCP-compliant dispatch) ──────────────────────────
|
|
#
|
|
# To add a new layer type:
|
|
# 1. Write a builder function: ``def build_X_layer(db, *, platforms, tactics, min_score) -> dict``
|
|
# 2. Call ``register_layer("x", build_X_layer)`` (or ``register_layer("x", fn, requires_id=True)``)
|
|
# 3. Optionally add a convenience endpoint in the router
|
|
#
|
|
# The ``/export-navigator?layer=x`` endpoint picks up new layers automatically.
|
|
|
|
|
|
class _LayerRegistry:
|
|
"""Extensible registry that maps layer type names to builder functions."""
|
|
|
|
# Assign __slots__ = ("_simple", "_with_id")
|
|
__slots__ = ("_simple", "_with_id")
|
|
|
|
# Define function __init__
|
|
def __init__(self) -> None:
|
|
# Assign self._simple = {}
|
|
self._simple: dict[str, object] = {}
|
|
# Assign self._with_id = {}
|
|
self._with_id: dict[str, object] = {}
|
|
|
|
# Define function register
|
|
def register(self, name: str, builder: Callable[..., dict], *, requires_id: bool = False) -> None:
|
|
"""Register a builder function under *name*.
|
|
|
|
Args:
|
|
name (str): Unique layer type identifier.
|
|
builder (Callable[..., dict]): Layer builder function.
|
|
requires_id (bool): Whether the builder needs a positional
|
|
``layer_id`` argument.
|
|
"""
|
|
# Assign target = self._with_id if requires_id else self._simple
|
|
target = self._with_id if requires_id else self._simple
|
|
# Assign target[name] = builder
|
|
target[name] = builder
|
|
|
|
# Apply the @property decorator
|
|
@property
|
|
# Define function supported_types
|
|
def supported_types(self) -> set[str]:
|
|
"""Return the set of all registered layer type names.
|
|
|
|
Returns:
|
|
set[str]: Union of simple and entity-bound layer type names.
|
|
"""
|
|
# Return set(self._simple) | set(self._with_id)
|
|
return set(self._simple) | set(self._with_id)
|
|
|
|
# Define function build
|
|
def build(
|
|
self,
|
|
# Entry: db
|
|
db: Session,
|
|
# Entry: layer_type
|
|
layer_type: str,
|
|
*,
|
|
# Entry: layer_id
|
|
layer_id: str | None = None,
|
|
# Entry: platforms
|
|
platforms: str | None = None,
|
|
# Entry: tactics
|
|
tactics: str | None = None,
|
|
# Entry: min_score
|
|
min_score: int = 0,
|
|
) -> dict:
|
|
"""Dispatch to the registered builder for *layer_type*.
|
|
|
|
Args:
|
|
db (Session): Active SQLAlchemy database session.
|
|
layer_type (str): Registered layer type name.
|
|
layer_id (str | None): Entity UUID for entity-bound layer types.
|
|
platforms (str | None): Optional comma-separated platform filter.
|
|
tactics (str | None): Optional comma-separated tactic filter.
|
|
min_score (int): Minimum score threshold.
|
|
|
|
Returns:
|
|
dict: ATT&CK Navigator-compatible layer dictionary.
|
|
"""
|
|
# Assign kwargs = dict(platforms=platforms, tactics=tactics, min_score=min_score)
|
|
kwargs = dict(platforms=platforms, tactics=tactics, min_score=min_score)
|
|
|
|
# Check: layer_type in self._simple
|
|
if layer_type in self._simple:
|
|
# Return self._simple[layer_type](db, **kwargs)
|
|
return self._simple[layer_type](db, **kwargs)
|
|
|
|
# Check: layer_type in self._with_id
|
|
if layer_type in self._with_id:
|
|
# Check: not layer_id
|
|
if not layer_id:
|
|
# Raise BusinessRuleViolation
|
|
raise BusinessRuleViolation(
|
|
f"layer_id is required for '{layer_type}' layer"
|
|
)
|
|
# Return self._with_id[layer_type](db, layer_id, **kwargs)
|
|
return self._with_id[layer_type](db, layer_id, **kwargs)
|
|
|
|
# Raise BusinessRuleViolation
|
|
raise BusinessRuleViolation(f"Unknown layer type: {layer_type}")
|
|
|
|
|
|
# Assign LAYER_REGISTRY = _LayerRegistry()
|
|
# Module-wide registry used by ``register_layer`` and ``build_navigator_export``.
LAYER_REGISTRY = _LayerRegistry()

# Built-in layers that need no entity id.
LAYER_REGISTRY.register("coverage", build_coverage_layer, requires_id=False)
LAYER_REGISTRY.register("detection-rules", build_detection_rules_layer, requires_id=False)

# Built-in layers bound to a specific entity (``layer_id`` is required).
LAYER_REGISTRY.register("threat-actor", build_threat_actor_layer, requires_id=True)
LAYER_REGISTRY.register("campaign", build_campaign_layer, requires_id=True)

# Snapshot of the built-in type names taken at import time; layers registered
# later are not reflected here (read ``LAYER_REGISTRY.supported_types`` instead).
SUPPORTED_LAYER_TYPES = LAYER_REGISTRY.supported_types
|
|
|
|
|
|
# Define function register_layer
|
|
def register_layer(name: str, builder: Callable[..., dict], *, requires_id: bool = False) -> None:
    """Add a heatmap layer type to the module-level ``LAYER_REGISTRY``.

    Args:
        name (str): Layer type identifier under which the builder is stored.
        builder (Callable[..., dict]): Function producing the layer dict. It
            is called as ``builder(db, platforms=..., tactics=...,
            min_score=...)``, with ``layer_id`` inserted as the second
            positional argument when ``requires_id`` is ``True``.
        requires_id (bool): ``True`` for entity-bound layers (such as the
            threat-actor and campaign layers) whose builder takes a
            ``layer_id``.
    """
    registry = LAYER_REGISTRY
    registry.register(name, builder, requires_id=requires_id)
|
|
|
|
|
|
# Define function build_navigator_export
|
|
def build_navigator_export(
    db: Session,
    layer_type: str,
    *,
    layer_id: str | None = None,
    platforms: str | None = None,
    tactics: str | None = None,
    min_score: int = 0,
) -> dict:
    """Build a heatmap layer by type name via the module-level registry.

    This is a thin wrapper around ``LAYER_REGISTRY.build``; every argument is
    forwarded unchanged.

    Args:
        db (Session): Active SQLAlchemy database session.
        layer_type (str): Registered layer type name (e.g. ``"coverage"``,
            ``"threat-actor"``).
        layer_id (str | None): Entity identifier, required for entity-bound
            layer types such as ``"threat-actor"`` and ``"campaign"``.
        platforms (str | None): Optional comma-separated platform filter.
        tactics (str | None): Optional comma-separated tactic filter.
        min_score (int): Minimum score; techniques below this are excluded.

    Returns:
        dict: ATT&CK Navigator-compatible layer dictionary.

    Raises:
        BusinessRuleViolation: For an unknown layer type or a missing
            ``layer_id`` on an entity-bound layer.
        EntityNotFoundError: When the threat-actor or campaign layer
            references a record that does not exist.
    """
    options = {
        "layer_id": layer_id,
        "platforms": platforms,
        "tactics": tactics,
        "min_score": min_score,
    }
    return LAYER_REGISTRY.build(db, layer_type, **options)
|