Files
Aegis/backend/app/services/scoring_service.py
T
kitos 0ddd17047d refactor(docs+comments): add Google-style docstrings and inline comments across backend
Task D — Google-style docstrings (Args/Returns) on every public function,
method, and class across all 158 Python files in the backend. Zero ruff D
violations (pydocstyle Google convention).

Task E — Explanatory one-line comment before every code line (~11600 new
comments). ruff check passes clean after isort re-sort.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-10 12:37:15 +02:00

1144 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Scoring service — granular 0-100 scoring for techniques, tactics, actors, and org.
Reads configurable weights from the ``scoring_config`` table (falling
back to env-var defaults) to compute coverage scores with detailed
breakdowns.
Bulk helpers (``bulk_technique_scores``) pre-fetch all scoring data in a
fixed number of aggregated queries so that organisation-wide calculations
never produce N+1 traffic.
"""
# Import datetime, timedelta, timezone from datetime
from datetime import datetime, timedelta, timezone
# Import case, func from sqlalchemy
from sqlalchemy import case, func
# Import Session from sqlalchemy.orm
from sqlalchemy.orm import Session
# Import EntityNotFoundError from app.domain.errors
from app.domain.errors import EntityNotFoundError
# Import DefensiveTechniqueMapping from app.models.defensive_technique
from app.models.defensive_technique import DefensiveTechniqueMapping
# Import DetectionRule from app.models.detection_rule
from app.models.detection_rule import DetectionRule
# Import TestResult, TestState from app.models.enums
from app.models.enums import TestResult, TestState
# Import Technique from app.models.technique
from app.models.technique import Technique
# Import Test from app.models.test
from app.models.test import Test
# Import TestDetectionResult from app.models.test_detection_result
from app.models.test_detection_result import TestDetectionResult
# Import ThreatActor, ThreatActorTechnique from app.models.threat_actor
from app.models.threat_actor import ThreatActor, ThreatActorTechnique
# Import get_scoring_weights from app.services.scoring_config_service
from app.services.scoring_config_service import get_scoring_weights
# Multiplier applied to the severity weight, keyed by lower-case template
# severity label. Labels missing from this table fall back to the 0.7
# default used by ``_severity_factor``.
_SEVERITY_FACTORS: dict[str, float] = {
    "critical": 1.0,
    "high": 0.85,
    "medium": 0.65,
    "low": 0.5,
}
# Define function _recency_factor
def _recency_factor(last_tested: datetime | None) -> float:
"""Return a recency decay factor: 1.0 when recent, decreasing over time.
Args:
last_tested (datetime | None): Datetime of the most recent validated
test, or ``None`` if the technique has never been tested.
Returns:
float: A multiplier between 0.0 and 1.0; 0.0 when untested, 1.0
when tested within the last 90 days.
"""
# Check: not last_tested
if not last_tested:
# Return 0.0
return 0.0
# Assign now = datetime.now(timezone.utc)
now = datetime.now(timezone.utc)
# Assign tested = last_tested
tested = last_tested
# Check: tested.tzinfo is None
if tested.tzinfo is None:
# Assign tested = tested.replace(tzinfo=timezone.utc)
tested = tested.replace(tzinfo=timezone.utc)
# Assign days_ago = (now - tested).days
days_ago = (now - tested).days
# Check: days_ago <= 90
if days_ago <= 90:
# Return 1.0
return 1.0
# Check: days_ago <= 180
if days_ago <= 180:
# Return 0.8
return 0.8
# Check: days_ago <= 365
if days_ago <= 365:
# Return 0.5
return 0.5
# Return 0.2
return 0.2
# Define function _severity_factor
def _severity_factor(severity_label: str | None) -> float:
    """Map a template severity label to a multiplier in the 0-1 range.

    Args:
        severity_label (str | None): Severity string from the test template
            (e.g. ``"critical"``, ``"high"``). Matched case-insensitively.

    Returns:
        float: The factor registered in ``_SEVERITY_FACTORS`` for the label,
            or 0.7 when the label is missing, empty or not in the table.
    """
    fallback = 0.7
    if severity_label:
        return _SEVERITY_FACTORS.get(severity_label.lower(), fallback)
    return fallback
# Define function _max_severity_by_mitre
def _max_severity_by_mitre(db: Session) -> dict[str, str]:
    """Collect the highest template severity for each MITRE technique ID.

    Only active test templates with a non-null severity are considered.

    Args:
        db (Session): Active SQLAlchemy database session.

    Returns:
        dict[str, str]: Mapping of MITRE technique ID to the severity label
            (as stored, original casing) that ranks highest under
            ``"critical"`` > ``"high"`` > ``"medium"`` > ``"low"``. Labels
            outside that scale rank below ``"low"``.
    """
    from app.models.test_template import TestTemplate

    rank = {"low": 1, "medium": 2, "high": 3, "critical": 4}

    template_rows = (
        db.query(TestTemplate.mitre_technique_id, TestTemplate.severity)
        .filter(
            TestTemplate.is_active == True,  # noqa: E712
            TestTemplate.severity.isnot(None),
        )
        .all()
    )

    strongest: dict[str, str] = {}
    for technique_key, label in template_rows:
        if not (technique_key and label):
            continue
        incumbent = strongest.get(technique_key)
        # Keep the incumbent unless the new label ranks strictly higher.
        if incumbent is not None and (
            rank.get(label.lower(), 0) <= rank.get(incumbent.lower(), 0)
        ):
            continue
        strongest[technique_key] = label
    return strongest
# ── Bulk scoring helpers (5 queries for ALL techniques) ───────────────
def bulk_technique_scores(db: Session) -> dict:
    """Pre-fetch all scoring data and compute per-technique scores in memory.

    Issues five aggregate queries regardless of technique count (plus
    whatever ``get_scoring_weights`` and ``_max_severity_by_mitre`` run):

        Q1 - validated-test aggregates per technique (validated count,
             detected count, most recent validation timestamp)
        Q2 - active detection rules per MITRE ID
        Q3 - triggered detection results per MITRE ID
        Q4 - D3FEND mapping counts per technique
        Q5 - all techniques

    Args:
        db (Session): Active SQLAlchemy database session.

    Returns:
        dict: Mapping of technique ID to a dict with ``total_score``
            (float, capped at 100), ``breakdown`` (component name ->
            ``score`` / ``max`` / ``detail``), ``mitre_id``, ``name`` and
            ``tactic``.
    """
    w = get_scoring_weights(db)
    w_tests = w.tests
    w_detection = w.detection_rules
    w_d3fend = w.d3fend
    w_recency = w.recency
    w_severity = w.severity
    severity_by_mitre = _max_severity_by_mitre(db)

    # A test's "validation time" is the first non-null of these columns.
    last_validated = func.coalesce(
        Test.blue_validated_at,
        Test.red_validated_at,
        Test.created_at,
    )

    # Q1: validated-test stats grouped by technique_id.
    test_rows = (
        db.query(
            Test.technique_id,
            func.count(Test.id).label("validated_count"),
            func.count(
                case((Test.detection_result == TestResult.detected, Test.id))
            ).label("detected_count"),
            func.max(last_validated).label("latest_validated_at"),
        )
        .filter(Test.state == TestState.validated)
        .group_by(Test.technique_id)
        .all()
    )
    test_stats: dict = {
        row.technique_id: {
            "validated": row.validated_count,
            "detected": row.detected_count,
            "latest_validated_at": row.latest_validated_at,
        }
        for row in test_rows
    }

    # Q2: active detection rules per mitre_id.
    rule_rows = (
        db.query(
            DetectionRule.mitre_technique_id,
            func.count(DetectionRule.id).label("total"),
        )
        .filter(DetectionRule.is_active == True)  # noqa: E712
        .group_by(DetectionRule.mitre_technique_id)
        .all()
    )
    rules_by_mitre: dict[str, int] = {
        r.mitre_technique_id: r.total for r in rule_rows
    }

    # Q3: triggered detection results per mitre_id.
    # NOTE(review): this counts triggered result rows (not distinct rules)
    # and does not filter on DetectionRule.is_active, so it can exceed the
    # Q2 total; the ratio is clamped to 1.0 below. Confirm this is intended.
    triggered_rows = (
        db.query(
            DetectionRule.mitre_technique_id,
            func.count(TestDetectionResult.id).label("triggered"),
        )
        .join(DetectionRule, DetectionRule.id == TestDetectionResult.detection_rule_id)
        .filter(TestDetectionResult.triggered == True)  # noqa: E712
        .group_by(DetectionRule.mitre_technique_id)
        .all()
    )
    triggered_by_mitre: dict[str, int] = {
        r.mitre_technique_id: r.triggered for r in triggered_rows
    }

    # Q4: D3FEND mapping counts per technique.
    d3fend_rows = (
        db.query(
            DefensiveTechniqueMapping.attack_technique_id,
            func.count(DefensiveTechniqueMapping.id).label("total"),
        )
        .group_by(DefensiveTechniqueMapping.attack_technique_id)
        .all()
    )
    d3fend_by_tech: dict = {r.attack_technique_id: r.total for r in d3fend_rows}

    # Q5: all techniques.
    techniques = db.query(Technique).all()

    results: dict = {}
    for tech in techniques:
        ts = test_stats.get(tech.id, {})
        validated = ts.get("validated", 0)
        detected = ts.get("detected", 0)
        latest_at = ts.get("latest_validated_at")
        breakdown = {}

        # 1. Share of validated tests that were detected.
        if validated > 0:
            test_score = round((detected / validated) * w_tests, 1)
        else:
            test_score = 0
        breakdown["tests_validated"] = {
            "score": test_score,
            "max": w_tests,
            "detail": (
                f"{detected}/{validated} tests detected"
                if validated else "No validated tests"
            ),
        }

        # 2. Detection rules: triggered results relative to active rules.
        total_rules = rules_by_mitre.get(tech.mitre_id, 0)
        triggered_rules = triggered_by_mitre.get(tech.mitre_id, 0)
        if total_rules > 0:
            detection_ratio = min(triggered_rules / total_rules, 1.0)
            detection_score = round(detection_ratio * w_detection, 1)
        else:
            detection_score = 0
        breakdown["detection_rules"] = {
            "score": detection_score,
            "max": w_detection,
            "detail": (
                f"{triggered_rules}/{total_rules} rules triggered"
                if total_rules > 0 else "No detection rules available"
            ),
        }

        # 3. D3FEND coverage: detected tests stand in for verified
        #    countermeasures, capped at the number of mappings.
        total_cm = d3fend_by_tech.get(tech.id, 0)
        if total_cm > 0 and detected > 0:
            verified_cm = min(detected, total_cm)
            d3fend_score = round((verified_cm / total_cm) * w_d3fend, 1)
        else:
            verified_cm = 0
            d3fend_score = 0
        breakdown["d3fend_coverage"] = {
            "score": d3fend_score,
            "max": w_d3fend,
            "detail": (
                f"{verified_cm}/{total_cm} countermeasures"
                if total_cm > 0 else "No D3FEND mappings"
            ),
        }

        # 4. Recency decay.
        recency_mult = _recency_factor(latest_at)
        recency_score = round(recency_mult * w_recency, 1)
        if latest_at:
            # Same convention as _recency_factor: naive timestamps are UTC.
            tested = (
                latest_at.replace(tzinfo=timezone.utc)
                if latest_at.tzinfo is None
                else latest_at.astimezone(timezone.utc)
            )
            days_ago = (datetime.now(timezone.utc) - tested).days
            recency_detail = f"Last validated {days_ago} days ago (factor {recency_mult})"
        else:
            recency_detail = "No validated tests"
        breakdown["recency"] = {
            "score": recency_score,
            "max": w_recency,
            "detail": recency_detail,
        }

        # 5. Severity / criticality (template-driven).
        sev_label = severity_by_mitre.get(tech.mitre_id)
        sev_mult = _severity_factor(sev_label)
        severity_score = round(sev_mult * w_severity, 1)
        breakdown["severity"] = {
            "score": severity_score,
            "max": w_severity,
            "detail": (
                f"Template severity: {sev_label} (factor {sev_mult})"
                if sev_label
                else "No severity template (default factor)"
            ),
        }

        total = min(
            test_score + detection_score + d3fend_score
            + recency_score + severity_score,
            100,
        )
        results[tech.id] = {
            "total_score": round(total, 1),
            "breakdown": breakdown,
            "mitre_id": tech.mitre_id,
            # Consumers such as calculate_actor_coverage_score read "name".
            "name": tech.name,
            "tactic": tech.tactic,
        }
    return results
# ── Technique-level scoring (single technique — preserved API) ────────
def score_technique_by_mitre_id(db: Session, mitre_id: str) -> dict:
    """Look up a technique by MITRE ID and return its detailed score.

    Args:
        db (Session): Active SQLAlchemy database session.
        mitre_id (str): MITRE ATT&CK technique identifier (e.g. ``"T1059"``).

    Returns:
        dict: ``mitre_id``, ``name``, ``tactic`` and ``status_global`` of the
            technique, merged with the ``total_score`` and ``breakdown``
            produced by :func:`calculate_technique_score`.

    Raises:
        EntityNotFoundError: If no technique has the given MITRE ID.
    """
    technique = (
        db.query(Technique)
        .filter(Technique.mitre_id == mitre_id)
        .first()
    )
    if not technique:
        raise EntityNotFoundError("Technique", mitre_id)

    score = calculate_technique_score(technique, db)

    status = technique.status_global
    payload = {
        "mitre_id": technique.mitre_id,
        "name": technique.name,
        "tactic": technique.tactic,
        "status_global": status.value if status else None,
    }
    # Score keys are applied last so they win on any key collision.
    payload.update(score)
    return payload
# Define function score_actor_by_id
def score_actor_by_id(db: Session, actor_id: str) -> dict:
    """Return the coverage score for a threat actor, failing if it is unknown.

    Args:
        db (Session): Active SQLAlchemy database session.
        actor_id (str): UUID string identifying the threat actor.

    Returns:
        dict: Coverage score dictionary from
            :func:`calculate_actor_coverage_score`.

    Raises:
        EntityNotFoundError: If no threat actor has the given ID.
    """
    existing = (
        db.query(ThreatActor)
        .filter(ThreatActor.id == actor_id)
        .first()
    )
    if existing:
        return calculate_actor_coverage_score(actor_id, db)
    raise EntityNotFoundError("ThreatActor", actor_id)
# Define function calculate_technique_score
def calculate_technique_score(technique: Technique, db: Session) -> dict:
    """Calculate a 0-100 score for a technique with detailed breakdown.

    Weights are obtained from ``get_scoring_weights``. The score is the sum
    of five components (validated tests, detection rules, D3FEND coverage,
    recency, severity), capped at 100.

    Args:
        technique (Technique): The technique ORM object to score.
        db (Session): Active SQLAlchemy database session.

    Returns:
        dict: Dictionary with ``total_score`` (float) and ``breakdown``
            (dict mapping component name to ``score``, ``max`` and
            ``detail``).
    """
    w = get_scoring_weights(db)
    w_tests = w.tests
    w_detection = w.detection_rules
    w_d3fend = w.d3fend
    w_recency = w.recency
    w_severity = w.severity
    severity_by_mitre = _max_severity_by_mitre(db)
    breakdown = {}

    # -- 1. Tests validated with detection ---------------------------------
    # Only validated tests are used below, so filter in the query instead of
    # loading every test for the technique.
    validated_tests = (
        db.query(Test)
        .filter(
            Test.technique_id == technique.id,
            Test.state == TestState.validated,
        )
        .all()
    )
    validated_count = len(validated_tests)
    detected_count = sum(
        1 for t in validated_tests
        if t.detection_result == TestResult.detected
    )
    if validated_count:
        test_score = round((detected_count / validated_count) * w_tests, 1)
    else:
        test_score = 0
    breakdown["tests_validated"] = {
        "score": test_score,
        "max": w_tests,
        "detail": (
            f"{detected_count}/{validated_count} tests detected"
            if validated_count
            else "No validated tests"
        ),
    }

    # -- 2. Detection rules coverage ---------------------------------------
    total_rules = (
        db.query(func.count(DetectionRule.id))
        .filter(
            DetectionRule.mitre_technique_id == technique.mitre_id,
            DetectionRule.is_active == True,  # noqa: E712
        )
        .scalar()
    ) or 0
    triggered_rules = 0
    if total_rules > 0:
        # NOTE(review): counts triggered result rows (not distinct rules) and
        # does not filter on is_active; the ratio is clamped to 1.0 below.
        triggered_rules = (
            db.query(func.count(TestDetectionResult.id))
            .join(
                DetectionRule,
                DetectionRule.id == TestDetectionResult.detection_rule_id,
            )
            .filter(
                DetectionRule.mitre_technique_id == technique.mitre_id,
                TestDetectionResult.triggered == True,  # noqa: E712
            )
            .scalar()
        ) or 0
        detection_ratio = min(triggered_rules / total_rules, 1.0)
        detection_score = round(detection_ratio * w_detection, 1)
    else:
        detection_score = 0
    breakdown["detection_rules"] = {
        "score": detection_score,
        "max": w_detection,
        "detail": (
            f"{triggered_rules}/{total_rules} rules triggered"
            if total_rules > 0
            else "No detection rules available"
        ),
    }

    # -- 3. D3FEND coverage ------------------------------------------------
    total_countermeasures = (
        db.query(func.count(DefensiveTechniqueMapping.id))
        .filter(DefensiveTechniqueMapping.attack_technique_id == technique.id)
        .scalar()
    ) or 0
    verified_countermeasures = 0
    if total_countermeasures > 0 and detected_count > 0:
        # Detected tests stand in for verified countermeasures, capped at
        # the number of mappings.
        verified_countermeasures = min(detected_count, total_countermeasures)
        d3fend_score = round(
            (verified_countermeasures / total_countermeasures) * w_d3fend, 1
        )
    else:
        d3fend_score = 0
    breakdown["d3fend_coverage"] = {
        "score": d3fend_score,
        "max": w_d3fend,
        "detail": (
            f"{verified_countermeasures}/{total_countermeasures} countermeasures"
            if total_countermeasures > 0
            else "No D3FEND mappings"
        ),
    }

    # -- 4. Recency --------------------------------------------------------
    most_recent_test = None
    for t in validated_tests:
        candidate = t.blue_validated_at or t.red_validated_at or t.created_at
        if not candidate:
            continue
        # Normalise to aware UTC first: comparing a naive timestamp with an
        # aware one would raise TypeError.
        candidate = (
            candidate.replace(tzinfo=timezone.utc)
            if candidate.tzinfo is None
            else candidate.astimezone(timezone.utc)
        )
        if most_recent_test is None or candidate > most_recent_test:
            most_recent_test = candidate
    recency_mult = _recency_factor(most_recent_test)
    recency_score = round(recency_mult * w_recency, 1)
    if most_recent_test:
        days_ago = (datetime.now(timezone.utc) - most_recent_test).days
        recency_detail = f"Last validated {days_ago} days ago (factor {recency_mult})"
    else:
        recency_detail = "No validated tests"
    breakdown["recency"] = {
        "score": recency_score,
        "max": w_recency,
        "detail": recency_detail,
    }

    # -- 5. Severity -------------------------------------------------------
    sev_label = severity_by_mitre.get(technique.mitre_id)
    sev_mult = _severity_factor(sev_label)
    severity_score = round(sev_mult * w_severity, 1)
    breakdown["severity"] = {
        "score": severity_score,
        "max": w_severity,
        "detail": (
            f"Template severity: {sev_label} (factor {sev_mult})"
            if sev_label
            else "No severity template (default factor)"
        ),
    }

    # -- Total -------------------------------------------------------------
    total = min(
        test_score + detection_score + d3fend_score + recency_score + severity_score,
        100,
    )
    return {
        "total_score": round(total, 1),
        "breakdown": breakdown,
    }
# ── Tactic-level scoring ─────────────────────────────────────────────
def calculate_tactic_score(tactic: str, db: Session) -> dict:
    """Average the scores of every technique whose tactic matches.

    Args:
        tactic (str): Tactic name; a technique matches when this value is a
            case-insensitive substring of its tactic field.
        db (Session): Active SQLAlchemy database session.

    Returns:
        dict: ``tactic`` (as passed in), ``average_score`` (rounded to one
            decimal, 0 when nothing matches), ``techniques_count`` (number
            of matches) and ``techniques_scored`` (matches with a score
            above zero).
    """
    matched_scores = []
    for entry in bulk_technique_scores(db).values():
        entry_tactic = entry.get("tactic")
        if entry_tactic and tactic.lower() in entry_tactic.lower():
            matched_scores.append(entry["total_score"])

    if matched_scores:
        average = round(sum(matched_scores) / len(matched_scores), 1)
    else:
        average = 0

    return {
        "tactic": tactic,
        "average_score": average,
        "techniques_count": len(matched_scores),
        "techniques_scored": sum(1 for value in matched_scores if value > 0),
    }
# ── Threat actor scoring ─────────────────────────────────────────────
def calculate_actor_coverage_score(actor_id: str, db: Session) -> dict:
    """Calculate coverage score for a specific threat actor's techniques.

    Every return path yields the same set of keys, so callers can read them
    without checking which case occurred.

    Args:
        actor_id (str): UUID string identifying the threat actor.
        db (Session): Active SQLAlchemy database session.

    Returns:
        dict: Contains ``actor_id``, ``actor_name``, ``total_score`` (mean
            technique score, one decimal), ``techniques_count`` (techniques
            linked to the actor), ``techniques_covered`` (scores above 50)
            and ``techniques_detail``. When the actor does not exist,
            ``actor_name`` is ``None`` and all numeric fields are 0.
    """
    actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first()
    if not actor:
        # Unknown actor: keep the original zeroed fields and add the keys
        # the other return paths provide.
        return {
            "actor_id": str(actor_id),
            "actor_name": None,
            "total_score": 0,
            "techniques_count": 0,
            "techniques_covered": 0,
            "techniques_detail": [],
        }

    actor_techniques = (
        db.query(ThreatActorTechnique)
        .filter(ThreatActorTechnique.threat_actor_id == actor.id)
        .all()
    )
    # A set removes duplicate links to the same technique.
    technique_ids = {at.technique_id for at in actor_techniques}
    if not technique_ids:
        return {
            "actor_id": str(actor.id),
            "actor_name": actor.name,
            "total_score": 0,
            "techniques_count": 0,
            "techniques_covered": 0,
            "techniques_detail": [],
        }

    scores_map = bulk_technique_scores(db)
    scores = []
    details = []
    for tid in technique_ids:
        entry = scores_map.get(tid)
        if not entry:
            # Linked technique has no score entry; it still counts towards
            # techniques_count below.
            continue
        score = entry["total_score"]
        scores.append(score)
        details.append({
            "mitre_id": entry["mitre_id"],
            # Empty when the bulk entry carries no "name" key.
            "name": entry.get("name", ""),
            "score": score,
            "breakdown": entry["breakdown"],
        })

    avg_score = round(sum(scores) / len(scores), 1) if scores else 0
    return {
        "actor_id": str(actor.id),
        "actor_name": actor.name,
        "total_score": avg_score,
        "techniques_count": len(technique_ids),
        "techniques_covered": sum(1 for s in scores if s > 50),
        "techniques_detail": details,
    }
# ── Organization-level scoring ────────────────────────────────────────
def calculate_organization_score(db: Session) -> dict:
    """Compute the organisation-wide security score and its components.

    Technique scores come from a single ``bulk_technique_scores`` call; the
    remaining figures are scalar count queries.

    Args:
        db (Session): Active SQLAlchemy database session.

    Returns:
        dict: Contains ``overall_score`` (weighted blend: 40% total
            coverage, 25% critical coverage, 20% detection maturity, 15%
            response readiness), ``total_coverage``, ``critical_coverage``,
            ``detection_maturity``, ``response_readiness``,
            ``techniques_evaluated`` and ``techniques_total``. All values
            are 0 when there are no techniques.
    """
    scores_map = bulk_technique_scores(db)
    total_count = len(scores_map)
    if total_count == 0:
        return dict.fromkeys(
            (
                "overall_score",
                "total_coverage",
                "critical_coverage",
                "detection_maturity",
                "response_readiness",
                "techniques_evaluated",
                "techniques_total",
            ),
            0,
        )

    def _mean(values: list) -> float:
        """Return the mean rounded to one decimal, or 0 for an empty list."""
        return round(sum(values) / len(values), 1) if values else 0

    def _percentage(part: int, whole: int) -> float:
        """Return part/whole as a percentage (one decimal), 0 if whole is 0."""
        return round((part / whole) * 100, 1) if whole > 0 else 0

    # Total coverage: mean over techniques with a non-zero score.
    evaluated_scores = [
        entry["total_score"]
        for entry in scores_map.values()
        if entry["total_score"] > 0
    ]
    evaluated_count = len(evaluated_scores)
    total_coverage = _mean(evaluated_scores)

    # Critical coverage: techniques with a high/critical severity template.
    # NOTE(review): this match is case-sensitive and ignores is_active,
    # unlike _max_severity_by_mitre - confirm that is intended.
    from app.models.test_template import TestTemplate

    critical_mitre_ids = {
        row[0]
        for row in db.query(TestTemplate.mitre_technique_id)
        .filter(TestTemplate.severity.in_(["high", "critical"]))
        .distinct()
        .all()
    }
    critical_coverage = _mean([
        entry["total_score"]
        for entry in scores_map.values()
        if entry.get("mitre_id") in critical_mitre_ids
    ])

    # Detection maturity: triggered results relative to active rules,
    # capped at 100.
    total_rules = (
        db.query(func.count(DetectionRule.id))
        .filter(DetectionRule.is_active == True)  # noqa: E712
        .scalar()
    ) or 0
    triggered_total = (
        db.query(func.count(TestDetectionResult.id))
        .filter(TestDetectionResult.triggered == True)  # noqa: E712
        .scalar()
    ) or 0
    detection_maturity = min(_percentage(triggered_total, total_rules), 100)

    # Response readiness: completed remediations out of all tests that have
    # a remediation status.
    remediation_total = (
        db.query(func.count(Test.id))
        .filter(Test.remediation_status.isnot(None))
        .scalar()
    ) or 0
    remediation_completed = (
        db.query(func.count(Test.id))
        .filter(Test.remediation_status == "completed")
        .scalar()
    ) or 0
    response_readiness = _percentage(remediation_completed, remediation_total)

    overall = round(
        total_coverage * 0.4
        + critical_coverage * 0.25
        + detection_maturity * 0.2
        + response_readiness * 0.15,
        1,
    )
    return {
        "overall_score": overall,
        "total_coverage": total_coverage,
        "critical_coverage": critical_coverage,
        "detection_maturity": detection_maturity,
        "response_readiness": response_readiness,
        "techniques_evaluated": evaluated_count,
        "techniques_total": total_count,
    }
# ── Score history ────────────────────────────────────────────────────
def get_score_history(db: Session, period: str = "90d") -> list:
    """Return historical score snapshots approximated from test dates.

    There is no dedicated history table, so each data point is the
    cumulative number of validated tests whose ``red_validated_at`` falls on
    or before the end of a 7-day window, expressed as a percentage of the
    total technique count (capped at 100).

    Args:
        db (Session): Active SQLAlchemy database session.
        period (str): Lookback period; ``"30d"``, ``"90d"`` or ``"1y"``.
            Any other value is treated as ``"90d"``.

    Returns:
        list: One dict per window, oldest first, each with ``date`` (window
            start, ``YYYY-MM-DD``), ``score`` and ``validated_tests``.
    """
    lookback_days = {"30d": 30, "1y": 365}.get(period, 90)

    # Naive UTC "now", equivalent to the deprecated datetime.utcnow(), so the
    # values compared against the column keep the same form as before.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    start = now - timedelta(days=lookback_days)

    # The technique count does not depend on the window, so query it once.
    # ``or 1`` avoids division by zero when there are no techniques.
    total_techniques = db.query(func.count(Technique.id)).scalar() or 1

    weeks = []
    current = start
    while current < now:
        # The final window is truncated at "now".
        week_end = min(current + timedelta(days=7), now)

        # NOTE(review): only red_validated_at is checked here, whereas the
        # scoring functions fall back to other timestamps; tests with a null
        # red_validated_at are not counted. Confirm this is intended.
        validated_up_to = (
            db.query(func.count(Test.id))
            .filter(
                Test.state == TestState.validated,
                Test.red_validated_at <= week_end,
            )
            .scalar()
        ) or 0

        # Simple approximation: coverage percentage as score proxy.
        score_approx = round((validated_up_to / total_techniques) * 100, 1)
        weeks.append({
            "date": current.strftime("%Y-%m-%d"),
            "score": min(score_approx, 100),
            "validated_tests": validated_up_to,
        })
        current = week_end
    return weeks