Files
Aegis/backend/app/services/advanced_metrics_service.py
Kitos a8a24b5429
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
fix(metrics): correct never-tested technique query [FASE-2.6]
Use distinct technique_id list filtering so untested techniques are returned reliably on SQLite and Postgres.
2026-05-18 14:00:48 +02:00

165 lines
5.1 KiB
Python

"""Advanced metrics service — coverage by tactic, never-tested, avg validation time, detection trend."""
from __future__ import annotations
from datetime import datetime, timedelta
from sqlalchemy import case, func
from sqlalchemy.orm import Session
from app.models.technique import Technique
from app.models.test import Test
from app.models.enums import TestResult
def get_coverage_by_tactic(db: Session) -> list[dict]:
    """Return technique counts and validated-coverage percentage per tactic.

    Techniques are grouped by ``Technique.tactic`` (results ordered by tactic)
    and counted once in total and once per ``status_global`` value of
    interest. A falsy tactic is reported as ``"Unknown"``.

    Returns:
        One dict per tactic with the keys ``tactic``, ``total``,
        ``validated``, ``partial``, ``not_covered``, ``in_progress`` and
        ``coverage_pct`` (validated / total as a percentage, one decimal).
    """

    def status_count(status: str):
        # SUM over a 1/0 CASE counts the rows whose status matches.
        return func.sum(case((Technique.status_global == status, 1), else_=0))

    rows = (
        db.query(
            Technique.tactic,
            func.count(Technique.id).label("total"),
            status_count("validated").label("validated"),
            status_count("partial").label("partial"),
            status_count("not_covered").label("not_covered"),
            status_count("in_progress").label("in_progress"),
        )
        .group_by(Technique.tactic)
        .order_by(Technique.tactic)
        .all()
    )

    breakdown = []
    for tactic, total, validated, partial, not_covered, in_progress in rows:
        # The aggregate may come back as a non-int numeric; normalise first.
        validated_count = int(validated)
        if total > 0:
            coverage = round((validated_count / total) * 100, 1)
        else:
            coverage = 0
        breakdown.append(
            {
                "tactic": tactic or "Unknown",
                "total": total,
                "validated": validated_count,
                "partial": int(partial),
                "not_covered": int(not_covered),
                "in_progress": int(in_progress),
                "coverage_pct": coverage,
            }
        )
    return breakdown
def get_never_tested_techniques(db: Session) -> list[dict]:
    """Return the techniques that no test row references.

    The distinct, non-null ``Test.technique_id`` values are fetched first and
    then excluded from the ``Technique`` query. When no test references any
    technique, every technique is returned.

    Returns:
        One dict per untested technique, ordered by ``mitre_id``, with the
        keys ``mitre_id``, ``name``, ``tactic`` and ``is_subtechnique``.
    """
    referenced_rows = (
        db.query(Test.technique_id)
        .filter(Test.technique_id.isnot(None))
        .distinct()
        .all()
    )
    tested_ids = [technique_id for (technique_id,) in referenced_rows]

    untested_query = db.query(Technique)
    # Only add the exclusion when there is something to exclude.
    if tested_ids:
        untested_query = untested_query.filter(~Technique.id.in_(tested_ids))

    never_tested = []
    for technique in untested_query.order_by(Technique.mitre_id).all():
        never_tested.append(
            {
                "mitre_id": technique.mitre_id,
                "name": technique.name,
                "tactic": technique.tactic,
                "is_subtechnique": technique.is_subtechnique,
            }
        )
    return never_tested
def get_avg_validation_time(db: Session) -> dict:
    """Compute average durations, in hours, over tests in the validated state.

    Three averages are produced, each over only the tests that have both
    timestamps needed for that measurement:

    * total: ``created_at`` to ``red_validated_at``
    * red phase: ``red_started_at`` to ``blue_started_at`` minus
      ``red_paused_seconds``, floored at zero
    * blue phase: ``blue_started_at`` to ``blue_validated_at`` minus
      ``blue_paused_seconds``, floored at zero

    Returns:
        A dict with ``total_validated`` (number of validated tests) and
        ``avg_total_hours``, ``avg_red_phase_hours``, ``avg_blue_phase_hours``
        (each rounded to two decimals, or ``0`` when no test qualifies).
    """
    tests = db.query(Test).filter(Test.state == "validated").all()

    overall_seconds = []
    red_seconds = []
    blue_seconds = []

    for item in tests:
        if item.created_at and item.red_validated_at:
            elapsed = item.red_validated_at - item.created_at
            overall_seconds.append(elapsed.total_seconds())

        if item.red_started_at and item.blue_started_at:
            elapsed = (item.blue_started_at - item.red_started_at).total_seconds()
            active = elapsed - (item.red_paused_seconds or 0)
            red_seconds.append(max(active, 0))

        if item.blue_started_at and item.blue_validated_at:
            elapsed = (item.blue_validated_at - item.blue_started_at).total_seconds()
            active = elapsed - (item.blue_paused_seconds or 0)
            blue_seconds.append(max(active, 0))

    def mean_hours(samples: list[float]) -> float:
        # An empty sample set reports 0 rather than dividing by zero.
        if samples:
            return round(sum(samples) / len(samples) / 3600, 2)
        return 0

    # With no validated tests every list is empty, so all values are 0.
    return {
        "total_validated": len(tests),
        "avg_total_hours": mean_hours(overall_seconds),
        "avg_red_phase_hours": mean_hours(red_seconds),
        "avg_blue_phase_hours": mean_hours(blue_seconds),
    }
def get_detection_rate_trend(db: Session) -> list[dict]:
    """Monthly detection rate trend for the last 12 calendar months.

    Tests in the ``validated`` state are bucketed by the calendar month of
    their ``created_at`` timestamp. The buckets cover the current month and
    the 11 months before it, oldest first.

    Returns:
        A list of 12 dicts, one per month, each with the keys ``month``
        (``"YYYY-MM"``), ``validated`` (validated tests created that month),
        ``detected`` (those whose ``detection_result`` equals
        ``TestResult.detected``) and ``detection_rate`` (detected / validated
        as a percentage with one decimal, ``0`` when nothing was validated).
    """
    from datetime import timezone  # local import: only this function needs it

    # Naive UTC "now": same value the deprecated datetime.utcnow() produced.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    # Number months as year * 12 + zero-based month so stepping backwards
    # across a year boundary is plain integer arithmetic. Stepping in fixed
    # 30-day chunks drifts off month boundaries, which repeats some month
    # labels and skips others.
    current_index = now.year * 12 + (now.month - 1)

    def first_of_month(index: int) -> datetime:
        year, month0 = divmod(index, 12)
        return datetime(year, month0 + 1, 1)

    months = []
    for offset in range(11, -1, -1):
        month_start = first_of_month(current_index - offset)
        # Exclusive upper bound: the first instant of the following month.
        month_end = first_of_month(current_index - offset + 1)

        in_month = (
            Test.state == "validated",
            Test.created_at >= month_start,
            Test.created_at < month_end,
        )
        validated = (
            db.query(func.count(Test.id))
            .filter(*in_month)
            .scalar() or 0
        )
        detected = (
            db.query(func.count(Test.id))
            .filter(*in_month, Test.detection_result == TestResult.detected)
            .scalar() or 0
        )

        if validated > 0:
            detection_rate = round((detected / validated) * 100, 1)
        else:
            detection_rate = 0

        months.append({
            "month": month_start.strftime("%Y-%m"),
            "validated": validated,
            "detected": detected,
            "detection_rate": detection_rate,
        })
    return months