"""Scoring service — granular 0-100 scoring for techniques, tactics, actors, and org. Reads configurable weights from the ``scoring_config`` table (falling back to env-var defaults) to compute coverage scores with detailed breakdowns. Bulk helpers (``bulk_technique_scores``) pre-fetch all scoring data in a fixed number of aggregated queries so that organisation-wide calculations never produce N+1 traffic. """ # Import datetime, timedelta, timezone from datetime from datetime import datetime, timedelta, timezone # Import case, func from sqlalchemy from sqlalchemy import case, func # Import Session from sqlalchemy.orm from sqlalchemy.orm import Session # Import EntityNotFoundError from app.domain.errors from app.domain.errors import EntityNotFoundError # Import DefensiveTechniqueMapping from app.models.defensive_technique from app.models.defensive_technique import DefensiveTechniqueMapping # Import DetectionRule from app.models.detection_rule from app.models.detection_rule import DetectionRule # Import TestResult, TestState from app.models.enums from app.models.enums import TestResult, TestState # Import Technique from app.models.technique from app.models.technique import Technique # Import Test from app.models.test from app.models.test import Test # Import TestDetectionResult from app.models.test_detection_result from app.models.test_detection_result import TestDetectionResult # Import ThreatActor, ThreatActorTechnique from app.models.threat_actor from app.models.threat_actor import ThreatActor, ThreatActorTechnique # Import get_scoring_weights from app.services.scoring_config_service from app.services.scoring_config_service import get_scoring_weights # Assign _SEVERITY_FACTORS = { _SEVERITY_FACTORS: dict[str, float] = { # Literal argument value "critical": 1.0, # Literal argument value "high": 0.85, # Literal argument value "medium": 0.65, # Literal argument value "low": 0.5, } # Define function _recency_factor def _recency_factor(last_tested: datetime | None) -> float: """Return a recency decay factor: 1.0 when recent, decreasing over time. Args: last_tested (datetime | None): Datetime of the most recent validated test, or ``None`` if the technique has never been tested. Returns: float: A multiplier between 0.0 and 1.0; 0.0 when untested, 1.0 when tested within the last 90 days. """ # Check: not last_tested if not last_tested: # Return 0.0 return 0.0 # Assign now = datetime.now(timezone.utc) now = datetime.now(timezone.utc) # Assign tested = last_tested tested = last_tested # Check: tested.tzinfo is None if tested.tzinfo is None: # Assign tested = tested.replace(tzinfo=timezone.utc) tested = tested.replace(tzinfo=timezone.utc) # Assign days_ago = (now - tested).days days_ago = (now - tested).days # Check: days_ago <= 90 if days_ago <= 90: # Return 1.0 return 1.0 # Check: days_ago <= 180 if days_ago <= 180: # Return 0.8 return 0.8 # Check: days_ago <= 365 if days_ago <= 365: # Return 0.5 return 0.5 # Return 0.2 return 0.2 # Define function _severity_factor def _severity_factor(severity_label: str | None) -> float: """Map template severity to a 0–1 multiplier. Args: severity_label (str | None): Severity string from the test template (e.g. ``"critical"``, ``"high"``). Case-insensitive. Returns: float: A multiplier between 0.5 and 1.0; defaults to 0.7 for unknown or missing labels. """ # Check: not severity_label if not severity_label: # Return 0.7 return 0.7 # Return _SEVERITY_FACTORS.get(severity_label.lower(), 0.7) return _SEVERITY_FACTORS.get(severity_label.lower(), 0.7) # Define function _max_severity_by_mitre def _max_severity_by_mitre(db: Session) -> dict[str, str]: """Return the highest severity label per MITRE ID from active test templates. Args: db (Session): Active SQLAlchemy database session. Returns: dict[str, str]: Mapping of MITRE technique ID to the highest severity label (``"critical"`` > ``"high"`` > ``"medium"`` > ``"low"``) found among active test templates for that technique. """ # Import TestTemplate from app.models.test_template from app.models.test_template import TestTemplate # Assign order = {"critical": 4, "high": 3, "medium": 2, "low": 1} order = {"critical": 4, "high": 3, "medium": 2, "low": 1} # Assign rows = ( rows = ( db.query(TestTemplate.mitre_technique_id, TestTemplate.severity) # Chain .filter() call .filter( TestTemplate.is_active == True, # noqa: E712 TestTemplate.severity.isnot(None), ) # Chain .all() call .all() ) # Assign best = {} best: dict[str, str] = {} # Iterate over rows for mitre_id, severity in rows: # Check: not mitre_id or not severity if not mitre_id or not severity: # Skip to the next loop iteration continue # Assign current = best.get(mitre_id) current = best.get(mitre_id) # Check: current is None or order.get(severity.lower(), 0) > order.get(curre... if current is None or order.get(severity.lower(), 0) > order.get(current.lower(), 0): # Assign best[mitre_id] = severity best[mitre_id] = severity # Return best return best # ── Bulk scoring helpers (5 queries for ALL techniques) ─────────────── def bulk_technique_scores(db: Session) -> dict: """Pre-fetch all scoring data and compute per-technique scores in memory. Executes exactly 5 queries regardless of technique count: Q1 — Test aggregates per technique (validated / detected / platforms / freshness) Q2 — Detection rules per mitre_id Q3 — Triggered rules per mitre_id Q4 — D3FEND mapping counts per technique Q5 — All techniques Returns ``{technique_id: {"total_score": float, "breakdown": dict}}``. """ # Assign w = get_scoring_weights(db) w = get_scoring_weights(db) # Assign w_tests = w.tests w_tests = w.tests # Assign w_detection = w.detection_rules w_detection = w.detection_rules # Assign w_d3fend = w.d3fend w_d3fend = w.d3fend # Assign w_recency = w.recency w_recency = w.recency # Assign w_severity = w.severity w_severity = w.severity # Assign severity_by_mitre = _max_severity_by_mitre(db) severity_by_mitre = _max_severity_by_mitre(db) # Assign last_validated = func.coalesce( last_validated = func.coalesce( Test.blue_validated_at, Test.red_validated_at, Test.created_at, ) # Q1: test stats grouped by technique_id test_rows = ( db.query( Test.technique_id, func.count(Test.id).label("validated_count"), func.count( case((Test.detection_result == TestResult.detected, Test.id)) ).label("detected_count"), func.max(last_validated).label("latest_validated_at"), ) # Chain .filter() call .filter(Test.state == TestState.validated) # Chain .group_by() call .group_by(Test.technique_id) # Chain .all() call .all() ) # Assign test_stats = {} test_stats: dict = {} # Iterate over test_rows for row in test_rows: # Assign test_stats[row.technique_id] = { test_stats[row.technique_id] = { # Literal argument value "validated": row.validated_count, # Literal argument value "detected": row.detected_count, # Literal argument value "latest_validated_at": row.latest_validated_at, } # Q2: active detection rules per mitre_id rule_rows = ( db.query( DetectionRule.mitre_technique_id, func.count(DetectionRule.id).label("total"), ) # Chain .filter() call .filter(DetectionRule.is_active == True) # noqa: E712 # Chain .group_by() call .group_by(DetectionRule.mitre_technique_id) # Chain .all() call .all() ) # Assign rules_by_mitre = {r.mitre_technique_id: r.total for r in rule_rows} rules_by_mitre: dict[str, int] = {r.mitre_technique_id: r.total for r in rule_rows} # Q3: triggered rules per mitre_id triggered_rows = ( db.query( DetectionRule.mitre_technique_id, func.count(TestDetectionResult.id).label("triggered"), ) # Chain .join() call .join(DetectionRule, DetectionRule.id == TestDetectionResult.detection_rule_id) # Chain .filter() call .filter(TestDetectionResult.triggered == True) # noqa: E712 # Chain .group_by() call .group_by(DetectionRule.mitre_technique_id) # Chain .all() call .all() ) # Assign triggered_by_mitre = { triggered_by_mitre: dict[str, int] = { r.mitre_technique_id: r.triggered for r in triggered_rows } # Q4: D3FEND mapping counts per technique d3fend_rows = ( db.query( DefensiveTechniqueMapping.attack_technique_id, func.count(DefensiveTechniqueMapping.id).label("total"), ) # Chain .group_by() call .group_by(DefensiveTechniqueMapping.attack_technique_id) # Chain .all() call .all() ) # Assign d3fend_by_tech = {r.attack_technique_id: r.total for r in d3fend_rows} d3fend_by_tech: dict = {r.attack_technique_id: r.total for r in d3fend_rows} # Q5: all techniques techniques = db.query(Technique).all() # Assign results = {} results: dict = {} # Iterate over techniques for tech in techniques: # Assign ts = test_stats.get(tech.id, {}) ts = test_stats.get(tech.id, {}) # Assign validated = ts.get("validated", 0) validated = ts.get("validated", 0) # Assign detected = ts.get("detected", 0) detected = ts.get("detected", 0) # Assign latest_at = ts.get("latest_validated_at") latest_at = ts.get("latest_validated_at") # Assign breakdown = {} breakdown = {} # 1. Tests validated with detection if validated > 0: # Assign test_ratio = detected / validated test_ratio = detected / validated # Assign test_score = round(test_ratio * w_tests, 1) test_score = round(test_ratio * w_tests, 1) # Fallback: handle remaining cases else: # Assign test_ratio = 0 test_ratio = 0 # Assign test_score = 0 test_score = 0 # Assign breakdown["tests_validated"] = { breakdown["tests_validated"] = { # Literal argument value "score": test_score, # Literal argument value "max": w_tests, # Literal argument value "detail": ( f"{detected}/{validated} tests detected" if validated else "No validated tests" ), } # 2. Detection rules total_rules = rules_by_mitre.get(tech.mitre_id, 0) # Assign triggered_rules = triggered_by_mitre.get(tech.mitre_id, 0) triggered_rules = triggered_by_mitre.get(tech.mitre_id, 0) # Check: total_rules > 0 if total_rules > 0: # Assign detection_ratio = min(triggered_rules / total_rules, 1.0) detection_ratio = min(triggered_rules / total_rules, 1.0) # Assign detection_score = round(detection_ratio * w_detection, 1) detection_score = round(detection_ratio * w_detection, 1) # Fallback: handle remaining cases else: # Assign detection_ratio = 0 detection_ratio = 0 # Assign detection_score = 0 detection_score = 0 # Assign breakdown["detection_rules"] = { breakdown["detection_rules"] = { # Literal argument value "score": detection_score, # Literal argument value "max": w_detection, # Literal argument value "detail": ( f"{triggered_rules}/{total_rules} rules triggered" if total_rules > 0 else "No detection rules available" ), } # 3. D3FEND coverage total_cm = d3fend_by_tech.get(tech.id, 0) # Check: total_cm > 0 and detected > 0 if total_cm > 0 and detected > 0: # Assign verified_cm = min(detected, total_cm) verified_cm = min(detected, total_cm) # Assign d3fend_score = round((verified_cm / total_cm) * w_d3fend, 1) d3fend_score = round((verified_cm / total_cm) * w_d3fend, 1) # Fallback: handle remaining cases else: # Assign verified_cm = 0 verified_cm = 0 # Assign d3fend_score = 0 d3fend_score = 0 # Assign breakdown["d3fend_coverage"] = { breakdown["d3fend_coverage"] = { # Literal argument value "score": d3fend_score, # Literal argument value "max": w_d3fend, # Literal argument value "detail": ( f"{verified_cm}/{total_cm} countermeasures" if total_cm > 0 else "No D3FEND mappings" ), } # 4. Recency decay recency_mult = _recency_factor(latest_at) # Assign recency_score = round(recency_mult * w_recency, 1) recency_score = round(recency_mult * w_recency, 1) # Check: latest_at if latest_at: # Assign tested = latest_at tested = latest_at # Check: tested.tzinfo is None if tested.tzinfo is None: # Assign days_ago = (datetime.utcnow() - tested).days days_ago = (datetime.utcnow() - tested).days # Fallback: handle remaining cases else: # Assign days_ago = (datetime.now(timezone.utc) - tested.astimezone(timezone.utc)).days days_ago = (datetime.now(timezone.utc) - tested.astimezone(timezone.utc)).days # Assign recency_detail = f"Last validated {days_ago} days ago (factor {recency_mult})" recency_detail = f"Last validated {days_ago} days ago (factor {recency_mult})" # Fallback: handle remaining cases else: # Assign recency_detail = "No validated tests" recency_detail = "No validated tests" # Assign breakdown["recency"] = { breakdown["recency"] = { # Literal argument value "score": recency_score, # Literal argument value "max": w_recency, # Literal argument value "detail": recency_detail, } # 5. Severity / criticality (template-driven) sev_label = severity_by_mitre.get(tech.mitre_id) # Assign sev_mult = _severity_factor(sev_label) sev_mult = _severity_factor(sev_label) # Assign severity_score = round(sev_mult * w_severity, 1) severity_score = round(sev_mult * w_severity, 1) # Assign breakdown["severity"] = { breakdown["severity"] = { # Literal argument value "score": severity_score, # Literal argument value "max": w_severity, # Literal argument value "detail": ( f"Template severity: {sev_label} (factor {sev_mult})" if sev_label else "No severity template (default factor)" ), } # Assign total = min( total = min( test_score + detection_score + d3fend_score + recency_score + severity_score, # Literal argument value 100, ) # Assign results[tech.id] = { results[tech.id] = { # Literal argument value "total_score": round(total, 1), # Literal argument value "breakdown": breakdown, # Literal argument value "mitre_id": tech.mitre_id, # Literal argument value "tactic": tech.tactic, } # Return results return results # ── Technique-level scoring (single technique — preserved API) ──────── def score_technique_by_mitre_id(db: Session, mitre_id: str) -> dict: """Return detailed score with breakdown for a technique by MITRE ID. Args: db (Session): Active SQLAlchemy database session. mitre_id (str): MITRE ATT&CK technique identifier (e.g. ``"T1059"``). Returns: dict: Scoring result containing ``mitre_id``, ``name``, ``tactic``, ``status_global``, ``total_score``, and ``breakdown``. """ # Assign technique = db.query(Technique).filter(Technique.mitre_id == mitre_id).first() technique = db.query(Technique).filter(Technique.mitre_id == mitre_id).first() # Check: not technique if not technique: # Raise EntityNotFoundError raise EntityNotFoundError("Technique", mitre_id) # Assign result = calculate_technique_score(technique, db) result = calculate_technique_score(technique, db) # Return { return { # Literal argument value "mitre_id": technique.mitre_id, # Literal argument value "name": technique.name, # Literal argument value "tactic": technique.tactic, # Literal argument value "status_global": technique.status_global.value if technique.status_global else None, **result, } # Define function score_actor_by_id def score_actor_by_id(db: Session, actor_id: str) -> dict: """Return coverage score for a threat actor by ID. Args: db (Session): Active SQLAlchemy database session. actor_id (str): UUID string identifying the threat actor. Returns: dict: Coverage score dictionary from :func:`calculate_actor_coverage_score`. """ # Assign actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first() actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first() # Check: not actor if not actor: # Raise EntityNotFoundError raise EntityNotFoundError("ThreatActor", actor_id) # Return calculate_actor_coverage_score(actor_id, db) return calculate_actor_coverage_score(actor_id, db) # Define function calculate_technique_score def calculate_technique_score(technique: Technique, db: Session) -> dict: """Calculate a 0-100 score for a technique with detailed breakdown. Weights are read from the ``scoring_config`` table (or env defaults). Args: technique (Technique): The technique ORM object to score. db (Session): Active SQLAlchemy database session. Returns: dict: Dictionary with ``total_score`` (float) and ``breakdown`` (dict mapping component name to score, max, and detail string). """ # Assign w = get_scoring_weights(db) w = get_scoring_weights(db) # Assign w_tests = w.tests w_tests = w.tests # Assign w_detection = w.detection_rules w_detection = w.detection_rules # Assign w_d3fend = w.d3fend w_d3fend = w.d3fend # Assign w_recency = w.recency w_recency = w.recency # Assign w_severity = w.severity w_severity = w.severity # Assign severity_by_mitre = _max_severity_by_mitre(db) severity_by_mitre = _max_severity_by_mitre(db) # Assign breakdown = {} breakdown = {} # ── 1. Tests validated with detection ────────────────────────── all_tests = ( db.query(Test) # Chain .filter() call .filter(Test.technique_id == technique.id) # Chain .all() call .all() ) # Assign validated_tests = [t for t in all_tests if t.state == TestState.validated] validated_tests = [t for t in all_tests if t.state == TestState.validated] # Assign detected_tests = [ detected_tests = [ t for t in validated_tests if t.detection_result == TestResult.detected ] # Check: validated_tests if validated_tests: # Assign test_ratio = len(detected_tests) / len(validated_tests) test_ratio = len(detected_tests) / len(validated_tests) # Assign test_score = round(test_ratio * w_tests, 1) test_score = round(test_ratio * w_tests, 1) # Fallback: handle remaining cases else: # Assign test_ratio = 0 test_ratio = 0 # Assign test_score = 0 test_score = 0 # Assign breakdown["tests_validated"] = { breakdown["tests_validated"] = { # Literal argument value "score": test_score, # Literal argument value "max": w_tests, # Literal argument value "detail": f"{len(detected_tests)}/{len(validated_tests)} tests detected" if validated_tests else "No validated tests", } # ── 2. Detection rules coverage ─────────────────────────────── total_rules = ( db.query(func.count(DetectionRule.id)) # Chain .filter() call .filter( DetectionRule.mitre_technique_id == technique.mitre_id, DetectionRule.is_active == True, # noqa: E712 ) # Chain .scalar() call .scalar() ) or 0 # Assign triggered_rules = 0 triggered_rules = 0 # Check: total_rules > 0 if total_rules > 0: # Assign triggered_rules = ( triggered_rules = ( db.query(func.count(TestDetectionResult.id)) # Chain .join() call .join( DetectionRule, DetectionRule.id == TestDetectionResult.detection_rule_id, ) # Chain .filter() call .filter( DetectionRule.mitre_technique_id == technique.mitre_id, TestDetectionResult.triggered == True, # noqa: E712 ) # Chain .scalar() call .scalar() ) or 0 # Assign detection_ratio = min(triggered_rules / total_rules, 1.0) detection_ratio = min(triggered_rules / total_rules, 1.0) # Assign detection_score = round(detection_ratio * w_detection, 1) detection_score = round(detection_ratio * w_detection, 1) # Fallback: handle remaining cases else: # Assign detection_ratio = 0 detection_ratio = 0 # Assign detection_score = 0 detection_score = 0 # Assign breakdown["detection_rules"] = { breakdown["detection_rules"] = { # Literal argument value "score": detection_score, # Literal argument value "max": w_detection, # Literal argument value "detail": f"{triggered_rules}/{total_rules} rules triggered" if total_rules > 0 else "No detection rules available", } # ── 3. D3FEND coverage ──────────────────────────────────────── total_countermeasures = ( db.query(func.count(DefensiveTechniqueMapping.id)) # Chain .filter() call .filter(DefensiveTechniqueMapping.attack_technique_id == technique.id) # Chain .scalar() call .scalar() ) or 0 # Assign verified_countermeasures = 0 verified_countermeasures = 0 # Check: total_countermeasures > 0 and len(detected_tests) > 0 if total_countermeasures > 0 and len(detected_tests) > 0: # Assign verified_countermeasures = min(len(detected_tests), total_countermeasures) verified_countermeasures = min(len(detected_tests), total_countermeasures) # Assign d3fend_ratio = verified_countermeasures / total_countermeasures d3fend_ratio = verified_countermeasures / total_countermeasures # Assign d3fend_score = round(d3fend_ratio * w_d3fend, 1) d3fend_score = round(d3fend_ratio * w_d3fend, 1) # Fallback: handle remaining cases else: # Assign d3fend_ratio = 0 d3fend_ratio = 0 # Assign d3fend_score = 0 d3fend_score = 0 # Assign breakdown["d3fend_coverage"] = { breakdown["d3fend_coverage"] = { # Literal argument value "score": d3fend_score, # Literal argument value "max": w_d3fend, # Literal argument value "detail": f"{verified_countermeasures}/{total_countermeasures} countermeasures" if total_countermeasures > 0 else "No D3FEND mappings", } # ── 4. Recency ──────────────────────────────────────────────── most_recent_test = None # Iterate over validated_tests for t in validated_tests: # Assign candidate = t.blue_validated_at or t.red_validated_at or t.created_at candidate = t.blue_validated_at or t.red_validated_at or t.created_at # Check: candidate and (most_recent_test is None or candidate > most_recent_... if candidate and (most_recent_test is None or candidate > most_recent_test): # Assign most_recent_test = candidate most_recent_test = candidate # Assign recency_mult = _recency_factor(most_recent_test) recency_mult = _recency_factor(most_recent_test) # Assign recency_score = round(recency_mult * w_recency, 1) recency_score = round(recency_mult * w_recency, 1) # Check: most_recent_test if most_recent_test: # Assign days_ago = ( days_ago = ( datetime.now(timezone.utc) - ( most_recent_test.replace(tzinfo=timezone.utc) if most_recent_test.tzinfo is None else most_recent_test.astimezone(timezone.utc) ) ).days # Assign recency_detail = f"Last validated {days_ago} days ago (factor {recency_mult})" recency_detail = f"Last validated {days_ago} days ago (factor {recency_mult})" # Fallback: handle remaining cases else: # Assign recency_detail = "No validated tests" recency_detail = "No validated tests" # Assign breakdown["recency"] = { breakdown["recency"] = { # Literal argument value "score": recency_score, # Literal argument value "max": w_recency, # Literal argument value "detail": recency_detail, } # ── 5. Severity ─────────────────────────────────────────────── sev_label = severity_by_mitre.get(technique.mitre_id) # Assign sev_mult = _severity_factor(sev_label) sev_mult = _severity_factor(sev_label) # Assign severity_score = round(sev_mult * w_severity, 1) severity_score = round(sev_mult * w_severity, 1) # Assign breakdown["severity"] = { breakdown["severity"] = { # Literal argument value "score": severity_score, # Literal argument value "max": w_severity, # Literal argument value "detail": ( f"Template severity: {sev_label} (factor {sev_mult})" if sev_label else "No severity template (default factor)" ), } # ── Total ───────────────────────────────────────────────────── total = min( test_score + detection_score + d3fend_score + recency_score + severity_score, # Literal argument value 100, ) # Return { return { # Literal argument value "total_score": round(total, 1), # Literal argument value "breakdown": breakdown, } # ── Tactic-level scoring ───────────────────────────────────────────── def calculate_tactic_score(tactic: str, db: Session) -> dict: """Calculate average score for all techniques in a tactic. Args: tactic (str): Tactic name used for case-insensitive substring matching against technique tactic fields. db (Session): Active SQLAlchemy database session. Returns: dict: Contains ``tactic``, ``average_score``, ``techniques_count``, and ``techniques_scored`` keys. """ # Assign scores_map = bulk_technique_scores(db) scores_map = bulk_technique_scores(db) # Assign matching = [ matching = [ v["total_score"] for v in scores_map.values() if v.get("tactic") and tactic.lower() in v["tactic"].lower() ] # Return { return { # Literal argument value "tactic": tactic, # Literal argument value "average_score": round(sum(matching) / len(matching), 1) if matching else 0, # Literal argument value "techniques_count": len(matching), # Literal argument value "techniques_scored": len([s for s in matching if s > 0]), } # ── Threat actor scoring ───────────────────────────────────────────── def calculate_actor_coverage_score(actor_id: str, db: Session) -> dict: """Calculate coverage score for a specific threat actor's techniques. Args: actor_id (str): UUID string identifying the threat actor. db (Session): Active SQLAlchemy database session. Returns: dict: Contains ``actor_id``, ``actor_name``, ``total_score``, ``techniques_count``, ``techniques_covered``, and ``techniques_detail`` keys. """ # Assign actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first() actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first() # Check: not actor if not actor: # Return {"total_score": 0, "techniques_count": 0, "techniques_covered": 0} return {"total_score": 0, "techniques_count": 0, "techniques_covered": 0} # Assign actor_techniques = ( actor_techniques = ( db.query(ThreatActorTechnique) # Chain .filter() call .filter(ThreatActorTechnique.threat_actor_id == actor.id) # Chain .all() call .all() ) # Assign technique_ids = {at.technique_id for at in actor_techniques} technique_ids = {at.technique_id for at in actor_techniques} # Check: not technique_ids if not technique_ids: # Return { return { # Literal argument value "actor_id": str(actor.id), # Literal argument value "actor_name": actor.name, # Literal argument value "total_score": 0, # Literal argument value "techniques_count": 0, # Literal argument value "techniques_covered": 0, # Literal argument value "techniques_detail": [], } # Assign scores_map = bulk_technique_scores(db) scores_map = bulk_technique_scores(db) # Assign scores = [] scores = [] # Assign details = [] details = [] # Iterate over technique_ids for tid in technique_ids: # Assign entry = scores_map.get(tid) entry = scores_map.get(tid) # Check: not entry if not entry: # Skip to the next loop iteration continue # Assign score = entry["total_score"] score = entry["total_score"] # Call scores.append() scores.append(score) # Call details.append() details.append({ # Literal argument value "mitre_id": entry["mitre_id"], # Literal argument value "name": entry.get("name", ""), # Literal argument value "score": score, # Literal argument value "breakdown": entry["breakdown"], }) # Assign avg_score = round(sum(scores) / len(scores), 1) if scores else 0 avg_score = round(sum(scores) / len(scores), 1) if scores else 0 # Return { return { # Literal argument value "actor_id": str(actor.id), # Literal argument value "actor_name": actor.name, # Literal argument value "total_score": avg_score, # Literal argument value "techniques_count": len(technique_ids), # Literal argument value "techniques_covered": len([s for s in scores if s > 50]), # Literal argument value "techniques_detail": details, } # ── Organization-level scoring ──────────────────────────────────────── def calculate_organization_score(db: Session) -> dict: """Calculate the overall organization security score. Uses ``bulk_technique_scores`` to compute all technique scores in 5 aggregated queries instead of N*5. Args: db (Session): Active SQLAlchemy database session. Returns: dict: Contains ``overall_score``, ``total_coverage``, ``critical_coverage``, ``detection_maturity``, ``response_readiness``, ``techniques_evaluated``, and ``techniques_total``. """ # Assign scores_map = bulk_technique_scores(db) scores_map = bulk_technique_scores(db) # Assign total_count = len(scores_map) total_count = len(scores_map) # Check: total_count == 0 if total_count == 0: # Return { return { # Literal argument value "overall_score": 0, # Literal argument value "total_coverage": 0, # Literal argument value "critical_coverage": 0, # Literal argument value "detection_maturity": 0, # Literal argument value "response_readiness": 0, # Literal argument value "techniques_evaluated": 0, # Literal argument value "techniques_total": 0, } # Assign all_scores = [v["total_score"] for v in scores_map.values()] all_scores = [v["total_score"] for v in scores_map.values()] # Assign evaluated_scores = [s for s in all_scores if s > 0] evaluated_scores = [s for s in all_scores if s > 0] # Assign evaluated_count = len(evaluated_scores) evaluated_count = len(evaluated_scores) # Assign total_coverage = ( total_coverage = ( round(sum(evaluated_scores) / len(evaluated_scores), 1) if evaluated_scores else 0 ) # Critical coverage: techniques with high/critical severity templates from app.models.test_template import TestTemplate # Assign critical_mitre_ids = set( critical_mitre_ids = set( row[0] for row in db.query(TestTemplate.mitre_technique_id) # Chain .filter() call .filter(TestTemplate.severity.in_(["high", "critical"])) # Chain .distinct() call .distinct() # Chain .all() call .all() ) # Assign critical_scores = [ critical_scores = [ v["total_score"] for v in scores_map.values() if v.get("mitre_id") in critical_mitre_ids ] # Assign critical_coverage = ( critical_coverage = ( round(sum(critical_scores) / len(critical_scores), 1) if critical_scores else 0 ) # Detection maturity (2 scalar queries — already efficient) total_rules = ( db.query(func.count(DetectionRule.id)) # Chain .filter() call .filter(DetectionRule.is_active == True) # noqa: E712 # Chain .scalar() call .scalar() ) or 0 # Assign triggered_total = ( triggered_total = ( db.query(func.count(TestDetectionResult.id)) # Chain .filter() call .filter(TestDetectionResult.triggered == True) # noqa: E712 # Chain .scalar() call .scalar() ) or 0 # Assign detection_maturity = ( detection_maturity = ( round((triggered_total / total_rules) * 100, 1) if total_rules > 0 else 0 ) # Assign detection_maturity = min(detection_maturity, 100) detection_maturity = min(detection_maturity, 100) # Response readiness (2 scalar queries — already efficient) remediation_total = ( db.query(func.count(Test.id)) # Chain .filter() call .filter(Test.remediation_status.isnot(None)) # Chain .scalar() call .scalar() ) or 0 # Assign remediation_completed = ( remediation_completed = ( db.query(func.count(Test.id)) # Chain .filter() call .filter(Test.remediation_status == "completed") # Chain .scalar() call .scalar() ) or 0 # Assign response_readiness = ( response_readiness = ( round((remediation_completed / remediation_total) * 100, 1) if remediation_total > 0 else 0 ) # Assign overall = round( overall = round( total_coverage * 0.4 + critical_coverage * 0.25 + detection_maturity * 0.2 + response_readiness * 0.15, # Literal argument value 1, ) # Return { return { # Literal argument value "overall_score": overall, # Literal argument value "total_coverage": total_coverage, # Literal argument value "critical_coverage": critical_coverage, # Literal argument value "detection_maturity": detection_maturity, # Literal argument value "response_readiness": response_readiness, # Literal argument value "techniques_evaluated": evaluated_count, # Literal argument value "techniques_total": total_count, } # ── Score history ──────────────────────────────────────────────────── def get_score_history(db: Session, period: str = "90d") -> list: """Return historical score snapshots approximated from test dates. Since there is no dedicated history table, scores are approximated by counting validated tests within weekly time windows. Args: db (Session): Active SQLAlchemy database session. period (str): Lookback period; one of ``"30d"``, ``"90d"`` (default), or ``"1y"``. Returns: list: Weekly data points, each a dict with ``date``, ``score``, and ``validated_tests``. """ # Assign now = datetime.utcnow() now = datetime.utcnow() # Check: period == "30d" if period == "30d": # Assign start = now - timedelta(days=30) start = now - timedelta(days=30) # Alternative: period == "1y" elif period == "1y": # Assign start = now - timedelta(days=365) start = now - timedelta(days=365) # else: # 90d default else: # 90d default # Assign start = now - timedelta(days=90) start = now - timedelta(days=90) # Group validated tests by week weeks = [] # Assign current = start current = start # Loop while current < now while current < now: # Assign week_end = min(current + timedelta(days=7), now) week_end = min(current + timedelta(days=7), now) # Count validated tests up to this week validated_up_to = ( db.query(func.count(Test.id)) # Chain .filter() call .filter( Test.state == TestState.validated, Test.red_validated_at <= week_end, ) # Chain .scalar() call .scalar() ) or 0 # Assign total_techniques = ( total_techniques = ( db.query(func.count(Technique.id)).scalar() ) or 1 # Simple approximation: coverage percentage as score proxy score_approx = round((validated_up_to / total_techniques) * 100, 1) # Call weeks.append() weeks.append({ # Literal argument value "date": current.strftime("%Y-%m-%d"), # Literal argument value "score": min(score_approx, 100), # Literal argument value "validated_tests": validated_up_to, }) # Assign current = week_end current = week_end # Return weeks return weeks