fix(metrics): use direct timestamp fields instead of audit log lookups
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled

MTTD: was querying AuditLog for action names that don't match actual
logged actions. Now uses red_started_at → blue_started_at directly
(both stored on the Test record). Net of red_paused_seconds.

MTTR: was searching for remediation_status=completed (no data). Redefined
as total pipeline time: red_started_at → blue_validated_at net of all
paused time. Only counts fully validated tests.

Red avg time: was using red_validated_at - created_at (created_at NULL
for many tests). Now uses blue_started_at - red_started_at net paused.

Blue avg time: was using blue_validated_at - red_validated_at (wrong
phase boundary). Now uses blue_work_started_at (or blue_started_at
fallback) → blue_validated_at net of blue_paused_seconds.
This commit is contained in:
kitos
2026-06-03 10:40:05 +02:00
parent 56d49f6de7
commit 06e8effaa4
2 changed files with 48 additions and 65 deletions

View File

@@ -37,45 +37,28 @@ def _safe_stats(values: list[float]) -> dict:
def calculate_mttd(db: Session) -> Optional[dict]:
"""Calculate Mean Time to Detect.
For each validated test: time between entering red_executing and
entering blue_evaluating (extracted from audit_log timestamps).
Uses direct timestamp fields on the Test record:
red_started_at → when Red Team started the attack
blue_started_at → when Red Team submitted to Blue (attack entered detection phase)
MTTD = blue_started_at - red_started_at - red_paused_seconds
Represents how long Red Team spent executing before Blue received the test.
"""
# Get validated tests that have both timestamps available
# Using audit log entries for state transitions
tests = (
db.query(Test)
.filter(Test.state == TestState.validated)
.filter(
Test.red_started_at.isnot(None),
Test.blue_started_at.isnot(None),
)
.all()
)
detection_times = []
for test in tests:
# Find the red_executing and blue_evaluating transition timestamps
red_start = (
db.query(AuditLog.timestamp)
.filter(
AuditLog.entity_type == "test",
AuditLog.entity_id == str(test.id),
AuditLog.action.in_(["test_start_execution", "start_execution"]),
)
.order_by(AuditLog.timestamp.asc())
.first()
)
blue_start = (
db.query(AuditLog.timestamp)
.filter(
AuditLog.entity_type == "test",
AuditLog.entity_id == str(test.id),
AuditLog.action.in_(["test_submit_red", "submit_red"]),
)
.order_by(AuditLog.timestamp.asc())
.first()
)
if red_start and blue_start and blue_start[0] > red_start[0]:
hours = (blue_start[0] - red_start[0]).total_seconds() / 3600
detection_times.append(hours)
for t in tests:
gross_secs = (t.blue_started_at - t.red_started_at).total_seconds()
net_secs = gross_secs - (t.red_paused_seconds or 0)
if net_secs > 0:
detection_times.append(net_secs / 3600)
return _safe_stats(detection_times)
@@ -86,38 +69,29 @@ def calculate_mttd(db: Session) -> Optional[dict]:
def calculate_mttr(db: Session) -> Optional[dict]:
"""Calculate Mean Time to Respond.
For tests with remediation_status = completed: time between
detection_result being set and remediation_status = completed.
Redefined as total pipeline time from attack start to full validation:
red_started_at → blue_validated_at (net of paused time).
Represents how long the full security testing cycle takes end-to-end.
Only uses tests that have been fully validated (both sides approved).
"""
# Tests with completed remediation
tests = (
db.query(Test)
.filter(
Test.remediation_status == "completed",
Test.state == TestState.validated,
Test.red_started_at.isnot(None),
Test.blue_validated_at.isnot(None),
)
.all()
)
response_times = []
for test in tests:
# Find when remediation was completed from audit log
remediation_complete = (
db.query(AuditLog.timestamp)
.filter(
AuditLog.entity_type == "test",
AuditLog.entity_id == str(test.id),
AuditLog.action.ilike("%remediation%"),
)
.order_by(AuditLog.timestamp.desc())
.first()
)
detection_time = test.blue_validated_at
if remediation_complete and detection_time:
hours = (remediation_complete[0] - detection_time).total_seconds() / 3600
if hours > 0:
response_times.append(hours)
for t in tests:
gross_secs = (t.blue_validated_at - t.red_started_at).total_seconds()
paused = (t.red_paused_seconds or 0) + (t.blue_paused_seconds or 0)
net_secs = gross_secs - paused
if net_secs > 0:
response_times.append(net_secs / 3600)
return _safe_stats(response_times)
@@ -427,20 +401,24 @@ def get_metrics_by_team(db: Session) -> dict:
red_avg_time = None
red_times = []
# Time for red team to complete their phase
# Red team avg execution time: red_started_at → blue_started_at (net of paused)
tests_with_red = (
db.query(Test)
.filter(Test.red_validated_at.isnot(None), Test.created_at.isnot(None))
.filter(
Test.red_started_at.isnot(None),
Test.blue_started_at.isnot(None),
)
.all()
)
for t in tests_with_red:
hours = (t.red_validated_at - t.created_at).total_seconds() / 3600
if hours > 0:
red_times.append(hours)
gross = (t.blue_started_at - t.red_started_at).total_seconds()
net = gross - (t.red_paused_seconds or 0)
if net > 0:
red_times.append(net / 3600)
if red_times:
red_avg_time = round(sum(red_times) / len(red_times), 1)
# Blue team metrics
# Blue team: count tests that reached the blue evaluation phase
blue_tests_completed = (
db.query(func.count(Test.id))
.filter(Test.state.in_([
@@ -451,20 +429,25 @@ def get_metrics_by_team(db: Session) -> dict:
.scalar()
) or 0
# Blue avg evaluation time:
# Prefer blue_work_started_at (actual pick-up) → blue_validated_at.
# Fall back to blue_started_at if blue_work_started_at is not set.
blue_avg_time = None
blue_times = []
tests_with_blue = (
db.query(Test)
.filter(
Test.blue_started_at.isnot(None),
Test.blue_validated_at.isnot(None),
Test.red_validated_at.isnot(None),
)
.all()
)
for t in tests_with_blue:
hours = (t.blue_validated_at - t.red_validated_at).total_seconds() / 3600
if hours > 0:
blue_times.append(hours)
phase_start = t.blue_work_started_at or t.blue_started_at
gross = (t.blue_validated_at - phase_start).total_seconds()
net = gross - (t.blue_paused_seconds or 0)
if net > 0:
blue_times.append(net / 3600)
if blue_times:
blue_avg_time = round(sum(blue_times) / len(blue_times), 1)

View File

@@ -581,7 +581,7 @@ export default function ExecutiveDashboardPage() {
label="MTTR"
value={opMetrics?.mttr?.mean_hours ?? "N/A"}
unit={opMetrics?.mttr ? "hrs" : undefined}
tooltip={{ description: "Mean Time To Respond — average hours for a test to go from execution through the full Red/Blue validation and reach a final result.", context: "Lower is better. Reflects process efficiency and team responsiveness." }}
tooltip={{ description: "Mean Time To Respond — average hours from Red Team attack start to full validation (both Red and Blue leads approved). Measures the total security test cycle time end-to-end.", context: "Lower is better. Long MTTR may indicate bottlenecks in the review pipeline." }}
/>
<KPICard
label="Detection Efficacy"