feat(phase-28): add scoring system, operational metrics and executive dashboard (T-224 to T-226)

This commit is contained in:
2026-02-09 17:24:44 +01:00
parent a911ddeb52
commit 12f33307fd
11 changed files with 1930 additions and 0 deletions

View File

@@ -0,0 +1,467 @@
"""Scoring service — granular 0-100 scoring for techniques, tactics, actors, and org.
Uses configurable weights from Settings to compute coverage scores with
detailed breakdowns.
"""
from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.config import settings
from app.models.technique import Technique
from app.models.test import Test
from app.models.detection_rule import DetectionRule
from app.models.test_detection_result import TestDetectionResult
from app.models.defensive_technique import DefensiveTechniqueMapping
from app.models.threat_actor import ThreatActor, ThreatActorTechnique
from app.models.enums import TestState, TestResult
# ── Technique-level scoring ──────────────────────────────────────────
def calculate_technique_score(technique: Technique, db: Session) -> dict:
"""Calculate a 0-100 score for a technique with detailed breakdown.
Weights (configurable via settings):
- tests_validated: weight from SCORING_WEIGHT_TESTS
- detection_rules: weight from SCORING_WEIGHT_DETECTION_RULES
- d3fend_coverage: weight from SCORING_WEIGHT_D3FEND
- freshness: weight from SCORING_WEIGHT_FRESHNESS
- platform_diversity: weight from SCORING_WEIGHT_PLATFORM_DIVERSITY
"""
w_tests = settings.SCORING_WEIGHT_TESTS
w_detection = settings.SCORING_WEIGHT_DETECTION_RULES
w_d3fend = settings.SCORING_WEIGHT_D3FEND
w_freshness = settings.SCORING_WEIGHT_FRESHNESS
w_diversity = settings.SCORING_WEIGHT_PLATFORM_DIVERSITY
breakdown = {}
# ── 1. Tests validated with detection ──────────────────────────
all_tests = (
db.query(Test)
.filter(Test.technique_id == technique.id)
.all()
)
validated_tests = [t for t in all_tests if t.state == TestState.validated]
detected_tests = [
t for t in validated_tests
if t.detection_result == TestResult.detected
]
if validated_tests:
test_ratio = len(detected_tests) / len(validated_tests)
test_score = round(test_ratio * w_tests, 1)
else:
test_ratio = 0
test_score = 0
breakdown["tests_validated"] = {
"score": test_score,
"max": w_tests,
"detail": f"{len(detected_tests)}/{len(validated_tests)} tests detected"
if validated_tests
else "No validated tests",
}
# ── 2. Detection rules coverage ───────────────────────────────
total_rules = (
db.query(func.count(DetectionRule.id))
.filter(
DetectionRule.mitre_technique_id == technique.mitre_id,
DetectionRule.is_active == True,
)
.scalar()
) or 0
triggered_rules = 0
if total_rules > 0:
triggered_rules = (
db.query(func.count(TestDetectionResult.id))
.join(
DetectionRule,
DetectionRule.id == TestDetectionResult.detection_rule_id,
)
.filter(
DetectionRule.mitre_technique_id == technique.mitre_id,
TestDetectionResult.triggered == True,
)
.scalar()
) or 0
detection_ratio = min(triggered_rules / total_rules, 1.0)
detection_score = round(detection_ratio * w_detection, 1)
else:
detection_ratio = 0
detection_score = 0
breakdown["detection_rules"] = {
"score": detection_score,
"max": w_detection,
"detail": f"{triggered_rules}/{total_rules} rules triggered"
if total_rules > 0
else "No detection rules available",
}
# ── 3. D3FEND coverage ────────────────────────────────────────
total_countermeasures = (
db.query(func.count(DefensiveTechniqueMapping.id))
.filter(DefensiveTechniqueMapping.attack_technique_id == technique.id)
.scalar()
) or 0
# Consider a countermeasure "verified" if we have validated tests
# with detection for the technique (simplified heuristic)
verified_countermeasures = 0
if total_countermeasures > 0 and len(detected_tests) > 0:
# Rough heuristic: each detected test validates ~1 countermeasure
verified_countermeasures = min(len(detected_tests), total_countermeasures)
d3fend_ratio = verified_countermeasures / total_countermeasures
d3fend_score = round(d3fend_ratio * w_d3fend, 1)
else:
d3fend_ratio = 0
d3fend_score = 0
breakdown["d3fend_coverage"] = {
"score": d3fend_score,
"max": w_d3fend,
"detail": f"{verified_countermeasures}/{total_countermeasures} countermeasures"
if total_countermeasures > 0
else "No D3FEND mappings",
}
# ── 4. Freshness ──────────────────────────────────────────────
# Most recent validated test date
most_recent_test = (
db.query(func.max(Test.red_validated_at))
.filter(
Test.technique_id == technique.id,
Test.state == TestState.validated,
)
.scalar()
)
now = datetime.utcnow()
if most_recent_test:
days_ago = (now - most_recent_test).days
if days_ago < 90:
freshness_pct = 1.0
elif days_ago < 180:
freshness_pct = 0.5
else:
freshness_pct = 0.0
freshness_score = round(freshness_pct * w_freshness, 1)
freshness_detail = f"Last test {days_ago} days ago"
else:
freshness_pct = 0
freshness_score = 0
freshness_detail = "No validated tests"
breakdown["freshness"] = {
"score": freshness_score,
"max": w_freshness,
"detail": freshness_detail,
}
# ── 5. Platform diversity ─────────────────────────────────────
available_platforms = technique.platforms or []
total_platforms = len(available_platforms) if available_platforms else 3 # default 3
tested_platforms = set()
for t in validated_tests:
if t.platform:
tested_platforms.add(t.platform.lower())
if total_platforms > 0 and tested_platforms:
diversity_ratio = min(len(tested_platforms) / total_platforms, 1.0)
diversity_score = round(diversity_ratio * w_diversity, 1)
else:
diversity_ratio = 0
diversity_score = 0
breakdown["platform_diversity"] = {
"score": diversity_score,
"max": w_diversity,
"detail": f"{len(tested_platforms)}/{total_platforms} platforms covered"
if tested_platforms
else "No platforms tested",
}
# ── Total ─────────────────────────────────────────────────────
total = min(
test_score + detection_score + d3fend_score + freshness_score + diversity_score,
100,
)
return {
"total_score": round(total, 1),
"breakdown": breakdown,
}
# ── Tactic-level scoring ─────────────────────────────────────────────
def calculate_tactic_score(tactic: str, db: Session) -> dict:
"""Calculate average score for all techniques in a tactic."""
techniques = (
db.query(Technique)
.filter(Technique.tactic.ilike(f"%{tactic}%"))
.all()
)
if not techniques:
return {
"tactic": tactic,
"average_score": 0,
"techniques_count": 0,
"techniques_scored": 0,
}
scores = []
for tech in techniques:
result = calculate_technique_score(tech, db)
scores.append(result["total_score"])
return {
"tactic": tactic,
"average_score": round(sum(scores) / len(scores), 1) if scores else 0,
"techniques_count": len(techniques),
"techniques_scored": len([s for s in scores if s > 0]),
}
# ── Threat actor scoring ─────────────────────────────────────────────
def calculate_actor_coverage_score(actor_id: str, db: Session) -> dict:
"""Calculate coverage score for a specific threat actor's techniques."""
actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first()
if not actor:
return {"total_score": 0, "techniques_count": 0, "techniques_covered": 0}
# Get all techniques used by this actor
actor_techniques = (
db.query(ThreatActorTechnique)
.filter(ThreatActorTechnique.threat_actor_id == actor.id)
.all()
)
technique_ids = [at.technique_id for at in actor_techniques]
if not technique_ids:
return {
"actor_id": str(actor.id),
"actor_name": actor.name,
"total_score": 0,
"techniques_count": 0,
"techniques_covered": 0,
"techniques_detail": [],
}
techniques = (
db.query(Technique)
.filter(Technique.id.in_(technique_ids))
.all()
)
scores = []
details = []
for tech in techniques:
result = calculate_technique_score(tech, db)
score = result["total_score"]
scores.append(score)
details.append({
"mitre_id": tech.mitre_id,
"name": tech.name,
"score": score,
"breakdown": result["breakdown"],
})
avg_score = round(sum(scores) / len(scores), 1) if scores else 0
return {
"actor_id": str(actor.id),
"actor_name": actor.name,
"total_score": avg_score,
"techniques_count": len(techniques),
"techniques_covered": len([s for s in scores if s > 50]),
"techniques_detail": details,
}
# ── Organization-level scoring ────────────────────────────────────────
def calculate_organization_score(db: Session) -> dict:
"""Calculate the overall organization security score."""
# All techniques
all_techniques = db.query(Technique).all()
total_count = len(all_techniques)
if total_count == 0:
return {
"overall_score": 0,
"total_coverage": 0,
"critical_coverage": 0,
"detection_maturity": 0,
"response_readiness": 0,
"techniques_evaluated": 0,
"techniques_total": 0,
}
# Calculate scores for all techniques (with caching for performance)
all_scores = []
evaluated_count = 0
for tech in all_techniques:
result = calculate_technique_score(tech, db)
score = result["total_score"]
all_scores.append(score)
if score > 0:
evaluated_count += 1
# Total coverage: average of all evaluated techniques
evaluated_scores = [s for s in all_scores if s > 0]
total_coverage = (
round(sum(evaluated_scores) / len(evaluated_scores), 1)
if evaluated_scores
else 0
)
# Critical coverage: techniques with high-severity templates
# (simplified: techniques that have tests are "critical")
from app.models.test_template import TestTemplate
critical_mitre_ids = set(
row[0]
for row in db.query(TestTemplate.mitre_technique_id)
.filter(TestTemplate.severity.in_(["high", "critical"]))
.distinct()
.all()
)
critical_techniques = [
t for t in all_techniques if t.mitre_id in critical_mitre_ids
]
if critical_techniques:
critical_scores = []
for tech in critical_techniques:
result = calculate_technique_score(tech, db)
critical_scores.append(result["total_score"])
critical_coverage = round(sum(critical_scores) / len(critical_scores), 1)
else:
critical_coverage = 0
# Detection maturity: based on detection rule coverage
total_rules = (
db.query(func.count(DetectionRule.id))
.filter(DetectionRule.is_active == True)
.scalar()
) or 0
triggered_total = (
db.query(func.count(TestDetectionResult.id))
.filter(TestDetectionResult.triggered == True)
.scalar()
) or 0
detection_maturity = (
round((triggered_total / total_rules) * 100, 1)
if total_rules > 0
else 0
)
detection_maturity = min(detection_maturity, 100)
# Response readiness: based on remediation completion
remediation_total = (
db.query(func.count(Test.id))
.filter(Test.remediation_status.isnot(None))
.scalar()
) or 0
remediation_completed = (
db.query(func.count(Test.id))
.filter(Test.remediation_status == "completed")
.scalar()
) or 0
response_readiness = (
round((remediation_completed / remediation_total) * 100, 1)
if remediation_total > 0
else 0
)
# Overall score: weighted average of sub-scores
overall = round(
total_coverage * 0.4
+ critical_coverage * 0.25
+ detection_maturity * 0.2
+ response_readiness * 0.15,
1,
)
return {
"overall_score": overall,
"total_coverage": total_coverage,
"critical_coverage": critical_coverage,
"detection_maturity": detection_maturity,
"response_readiness": response_readiness,
"techniques_evaluated": evaluated_count,
"techniques_total": total_count,
}
# ── Score history ────────────────────────────────────────────────────
def get_score_history(db: Session, period: str = "90d") -> list:
"""Get historical score snapshots.
Since we don't have a dedicated history table, we approximate by
computing scores based on test dates within time windows.
Returns a list of weekly data points.
"""
from app.models.audit import AuditLog
now = datetime.utcnow()
if period == "30d":
start = now - timedelta(days=30)
elif period == "1y":
start = now - timedelta(days=365)
else: # 90d default
start = now - timedelta(days=90)
# Group validated tests by week
weeks = []
current = start
while current < now:
week_end = min(current + timedelta(days=7), now)
# Count validated tests up to this week
validated_up_to = (
db.query(func.count(Test.id))
.filter(
Test.state == TestState.validated,
Test.red_validated_at <= week_end,
)
.scalar()
) or 0
total_techniques = (
db.query(func.count(Technique.id)).scalar()
) or 1
# Simple approximation: coverage percentage as score proxy
score_approx = round((validated_up_to / total_techniques) * 100, 1)
weeks.append({
"date": current.strftime("%Y-%m-%d"),
"score": min(score_approx, 100),
"validated_tests": validated_up_to,
})
current = week_end
return weeks