"""Advanced metrics endpoints — coverage by tactic, never-tested, avg validation time.""" from datetime import datetime from fastapi import APIRouter, Depends from sqlalchemy import func, case from sqlalchemy.orm import Session from app.database import get_db from app.dependencies.auth import get_current_user from app.models.audit import AuditLog from app.models.technique import Technique from app.models.test import Test from app.models.user import User router = APIRouter(prefix="/metrics/advanced", tags=["advanced-metrics"]) @router.get("/coverage-by-tactic") def coverage_by_tactic( db: Session = Depends(get_db), user: User = Depends(get_current_user), ): """Coverage percentage broken down by MITRE ATT&CK tactic.""" results = ( db.query( Technique.tactic, func.count(Technique.id).label("total"), func.sum( case((Technique.status_global == "validated", 1), else_=0) ).label("validated"), func.sum( case((Technique.status_global == "partial", 1), else_=0) ).label("partial"), func.sum( case((Technique.status_global == "not_covered", 1), else_=0) ).label("not_covered"), func.sum( case((Technique.status_global == "in_progress", 1), else_=0) ).label("in_progress"), ) .group_by(Technique.tactic) .order_by(Technique.tactic) .all() ) return [ { "tactic": r[0] or "Unknown", "total": r[1], "validated": int(r[2]), "partial": int(r[3]), "not_covered": int(r[4]), "in_progress": int(r[5]), "coverage_pct": round((int(r[2]) / r[1]) * 100, 1) if r[1] > 0 else 0, } for r in results ] @router.get("/never-tested") def never_tested_techniques( db: Session = Depends(get_db), user: User = Depends(get_current_user), ): """Techniques that have never had a test created.""" tested_technique_ids = ( db.query(Test.technique_id).distinct().subquery() ) techniques = ( db.query(Technique) .filter(~Technique.id.in_(db.query(tested_technique_ids))) .order_by(Technique.mitre_id) .all() ) return [ { "mitre_id": t.mitre_id, "name": t.name, "tactic": t.tactic, "is_subtechnique": t.is_subtechnique, } for t in techniques ] @router.get("/avg-validation-time") def avg_validation_time( db: Session = Depends(get_db), user: User = Depends(get_current_user), ): """Average time from test creation to validation, computed from audit logs. Returns overall average and per-phase averages where data is available. """ validated_tests = ( db.query(Test) .filter(Test.state == "validated") .all() ) if not validated_tests: return { "total_validated": 0, "avg_total_hours": 0, "avg_red_phase_hours": 0, "avg_blue_phase_hours": 0, } total_durations = [] red_durations = [] blue_durations = [] for test in validated_tests: if test.created_at and test.red_validated_at: total_seconds = (test.red_validated_at - test.created_at).total_seconds() total_durations.append(total_seconds) if test.red_started_at and test.blue_started_at: red_sec = (test.blue_started_at - test.red_started_at).total_seconds() red_paused = test.red_paused_seconds or 0 red_durations.append(max(red_sec - red_paused, 0)) if test.blue_started_at and test.blue_validated_at: blue_sec = (test.blue_validated_at - test.blue_started_at).total_seconds() blue_paused = test.blue_paused_seconds or 0 blue_durations.append(max(blue_sec - blue_paused, 0)) def avg_hours(durations: list[float]) -> float: if not durations: return 0 return round(sum(durations) / len(durations) / 3600, 2) return { "total_validated": len(validated_tests), "avg_total_hours": avg_hours(total_durations), "avg_red_phase_hours": avg_hours(red_durations), "avg_blue_phase_hours": avg_hours(blue_durations), } @router.get("/detection-rate-trend") def detection_rate_trend( db: Session = Depends(get_db), user: User = Depends(get_current_user), ): """Monthly detection rate trend for the last 12 months.""" from datetime import timedelta now = datetime.utcnow() months = [] for i in range(11, -1, -1): month_start = datetime(now.year, now.month, 1) - timedelta(days=i * 30) month_end = month_start + timedelta(days=30) validated = ( db.query(func.count(Test.id)) .filter( Test.state == "validated", Test.created_at >= month_start, Test.created_at < month_end, ) .scalar() or 0 ) detected = ( db.query(func.count(Test.id)) .filter( Test.state == "validated", Test.detection_result == "detected", Test.created_at >= month_start, Test.created_at < month_end, ) .scalar() or 0 ) months.append({ "month": month_start.strftime("%Y-%m"), "validated": validated, "detected": detected, "detection_rate": round((detected / validated) * 100, 1) if validated > 0 else 0, }) return months