Files
Aegis/backend/app/services/executive_dashboard_service.py
kitos ab591d30c4
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(dashboard): Phase 13 — Executive Dashboard
PostureSnapshot model, Alembic migration (b039exec), schemas, service
aggregating all phases (coverage/risk/operations/knowledge/MTTD), and
router at /api/v1/dashboard with executive view, KPIs, coverage-by-tactic,
posture-history, posture-snapshot, and activity-feed endpoints.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 16:20:21 +02:00

362 lines
12 KiB
Python

"""Phase 13: Executive Dashboard service — aggregate posture data across all phases."""
from __future__ import annotations
import time
from datetime import date, datetime, timedelta
from typing import List, Optional
from uuid import UUID
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.models.executive_dashboard import PostureSnapshot
from app.models.technique import Technique
from app.models.risk_intelligence import TechniqueRiskProfile
from app.models.ownership_queue import (
TechniqueOwnership, RevalidationQueueItem, QueueStatus,
)
from app.models.knowledge import Playbook, LessonLearned
from app.models.attack_path import AttackPathExecution, ExecutionStatus
from app.models.test import Test
from app.models.osint_item import OsintItem
from app.models.enums import TechniqueStatus
# ── Internal aggregation helpers ──────────────────────────────────────────────
def _aggregate_coverage(db: Session) -> dict:
    """Summarise technique coverage from the current ``Technique`` rows.

    Every technique is loaded and its ``status_global`` tallied.  Techniques
    that are neither validated nor partial are reported as not covered (the
    figure is derived as the remainder of the total).  In the coverage
    percentage a partial technique weighs half as much as a validated one.

    Returns a dict with ``total_techniques``, ``validated_count``,
    ``partial_count``, ``not_covered_count`` and ``coverage_pct``.
    """
    rows = db.query(Technique).all()
    total = len(rows)

    tally = dict.fromkeys(
        (
            TechniqueStatus.validated,
            TechniqueStatus.partial,
            TechniqueStatus.not_covered,
        ),
        0,
    )
    for row in rows:
        status = row.status_global
        if status in tally:
            tally[status] += 1

    validated = tally[TechniqueStatus.validated]
    partial = tally[TechniqueStatus.partial]

    if total > 0:
        coverage_pct = round((validated + partial * 0.5) / total * 100.0, 2)
    else:
        # No techniques at all: avoid dividing by zero.
        coverage_pct = 0.0

    return {
        "total_techniques": total,
        "validated_count": validated,
        "partial_count": partial,
        # Remainder, so unknown/None statuses also land here.
        "not_covered_count": total - validated - partial,
        "coverage_pct": coverage_pct,
    }
def _aggregate_risk(db: Session) -> dict:
    """Summarise ``TechniqueRiskProfile`` rows into an average and level counts.

    Returns a dict with ``avg_risk_score`` (rounded to two decimals) and the
    number of profiles at the ``critical``, ``high``, ``medium`` and ``low``
    levels.  Profiles without a risk level are tallied as ``"info"``, which is
    not part of the returned dict.  With no profiles every value is zero.
    """
    profiles = db.query(TechniqueRiskProfile).all()

    level_counts = dict.fromkeys(("critical", "high", "medium", "low", "info"), 0)
    running_total = 0.0
    for profile in profiles:
        running_total += profile.risk_score
        level = profile.risk_level or "info"
        level_counts[level] = level_counts.get(level, 0) + 1

    if profiles:
        average = round(running_total / len(profiles), 2)
    else:
        average = 0.0

    return {
        "avg_risk_score": average,
        "critical_count": level_counts["critical"],
        "high_count": level_counts["high"],
        "medium_count": level_counts["medium"],
        "low_count": level_counts["low"],
    }
def _aggregate_operations(db: Session) -> dict:
    """Aggregate operational queue and orphan counts.

    Returns a dict with:

    * ``open_queue_items`` - revalidation queue items whose status is
      ``pending`` or ``in_progress``.
    * ``orphan_techniques`` - techniques that have no ownership record with a
      non-null ``owner_id`` (never negative).
    """
    open_queue = (
        db.query(RevalidationQueueItem)
        .filter(
            RevalidationQueueItem.status.in_(
                [QueueStatus.pending, QueueStatus.in_progress]
            ),
        )
        .count()
    )

    total_tech = db.query(Technique).count()

    # Count DISTINCT technique ids rather than ownership rows: a technique
    # with more than one owned record must only be subtracted once, otherwise
    # the orphan figure is under-reported.
    owned_count = (
        db.query(func.count(TechniqueOwnership.technique_id.distinct()))
        .filter(TechniqueOwnership.owner_id.isnot(None))
        .scalar()
    ) or 0

    return {
        "open_queue_items": open_queue,
        # Clamp in case ownership rows reference techniques no longer counted.
        "orphan_techniques": max(total_tech - owned_count, 0),
    }
def _aggregate_knowledge(db: Session) -> dict:
    """Return how many playbooks and lessons learned are flagged active.

    The result has two keys: ``playbook_count`` and ``lesson_count``.
    """
    # `== True` is intentional: it builds a SQL expression, not a Python bool.
    active_playbooks = db.query(Playbook).filter(Playbook.is_active == True)  # noqa: E712
    active_lessons = db.query(LessonLearned).filter(LessonLearned.is_active == True)  # noqa: E712
    return {
        "playbook_count": active_playbooks.count(),
        "lesson_count": active_lessons.count(),
    }
def _aggregate_mttd(db: Session) -> dict:
    """Summarise attack-path executions completed within the last 30 days.

    Returns ``executions_30d`` (number of completed executions in the window),
    ``mttd_avg_seconds`` (mean of the non-null ``mttd_seconds`` values, two
    decimals) and ``detection_rate_30d`` (mean of the non-null
    ``detection_rate`` values, four decimals).  Either mean is ``None`` when
    no execution supplies a value for it.
    """
    window_start = datetime.utcnow() - timedelta(days=30)
    completed = (
        db.query(AttackPathExecution)
        .filter(
            AttackPathExecution.status == ExecutionStatus.completed,
            AttackPathExecution.completed_at >= window_start,
        )
        .all()
    )

    mttd_samples = []
    rate_samples = []
    for run in completed:
        if run.mttd_seconds is not None:
            mttd_samples.append(run.mttd_seconds)
        if run.detection_rate is not None:
            rate_samples.append(run.detection_rate)

    mttd_avg = None
    if mttd_samples:
        mttd_avg = round(sum(mttd_samples) / len(mttd_samples), 2)

    rate_avg = None
    if rate_samples:
        rate_avg = round(sum(rate_samples) / len(rate_samples), 4)

    return {
        "executions_30d": len(completed),
        "mttd_avg_seconds": mttd_avg,
        "detection_rate_30d": rate_avg,
    }
def _build_extra_breakdown(db: Session) -> dict:
    """Build the per-tactic coverage breakdown kept in the snapshot's ``extra`` field.

    Techniques are grouped by ``tactic`` (``"Unknown"`` when empty) and counted
    as validated, partial, or - for any other status - not covered.  The result
    is ``{"coverage_by_tactic": [...]}`` with one entry per tactic, sorted by
    tactic name.
    """
    per_tactic: dict = {}
    for technique in db.query(Technique).all():
        tactic = technique.tactic or "Unknown"
        bucket = per_tactic.setdefault(
            tactic,
            {"total": 0, "validated": 0, "partial": 0, "not_covered": 0},
        )
        bucket["total"] += 1

        status = technique.status_global
        if status == TechniqueStatus.validated:
            slot = "validated"
        elif status == TechniqueStatus.partial:
            slot = "partial"
        else:
            slot = "not_covered"
        bucket[slot] += 1

    breakdown = []
    for tactic, stats in sorted(per_tactic.items()):
        if stats["total"] > 0:
            # Partial coverage counts for half of a validated technique.
            pct = round(
                (stats["validated"] + stats["partial"] * 0.5) / stats["total"] * 100.0, 2
            )
        else:
            pct = 0.0
        breakdown.append(
            {
                "tactic": tactic,
                "total": stats["total"],
                "validated": stats["validated"],
                "partial": stats["partial"],
                "not_covered": stats["not_covered"],
                "coverage_pct": pct,
            }
        )

    return {"coverage_by_tactic": breakdown}
# ── Snapshot persistence ───────────────────────────────────────────────────────
def take_posture_snapshot(
    db: Session,
    created_by: Optional[UUID] = None,
) -> PostureSnapshot:
    """
    Run every aggregation and persist the result as today's PostureSnapshot.

    If a row with today's ``snapshot_date`` already exists it is overwritten in
    place (including ``created_by``, even when that is ``None``); otherwise a
    new row is inserted.  The session is committed and the refreshed row is
    returned.
    """
    today = date.today()

    # Same aggregation order as the KPI merge below.
    coverage = _aggregate_coverage(db)
    risk = _aggregate_risk(db)
    operations = _aggregate_operations(db)
    knowledge = _aggregate_knowledge(db)
    mttd = _aggregate_mttd(db)
    extra = _build_extra_breakdown(db)

    snapshot = (
        db.query(PostureSnapshot)
        .filter(PostureSnapshot.snapshot_date == today)
        .first()
    )

    values: dict = {}
    for section in (coverage, risk, operations, knowledge, mttd):
        values.update(section)
    values["extra"] = extra

    if snapshot:
        # Refresh the existing row for today.
        for field, value in values.items():
            setattr(snapshot, field, value)
        snapshot.created_by = created_by
    else:
        snapshot = PostureSnapshot(snapshot_date=today, created_by=created_by, **values)
        db.add(snapshot)

    db.commit()
    db.refresh(snapshot)
    return snapshot
# ── Live / read-only aggregations (no DB write) ───────────────────────────────
def get_live_kpis(db: Session) -> dict:
    """Compute the current KPIs on the fly; nothing is written to the database.

    The coverage, risk, operations, knowledge and MTTD aggregates are merged
    into one flat dict, with today's date added under ``snapshot_date``.
    """
    kpis: dict = {}
    for aggregate in (
        _aggregate_coverage,
        _aggregate_risk,
        _aggregate_operations,
        _aggregate_knowledge,
        _aggregate_mttd,
    ):
        kpis.update(aggregate(db))
    kpis["snapshot_date"] = date.today()
    return kpis
def get_coverage_by_tactic(db: Session) -> list:
    """Return the list of per-tactic validated/partial/not_covered entries."""
    return _build_extra_breakdown(db)["coverage_by_tactic"]
def get_posture_history(
    db: Session,
    days: int = 30,
) -> List[PostureSnapshot]:
    """Return snapshots dated on or after ``today - days``, oldest first."""
    earliest = date.today() - timedelta(days=days)
    in_window = db.query(PostureSnapshot).filter(
        PostureSnapshot.snapshot_date >= earliest
    )
    return in_window.order_by(PostureSnapshot.snapshot_date.asc()).all()
def get_top_risks(db: Session, limit: int = 5) -> list:
    """Return the ``limit`` highest-scoring risk profiles joined with their technique.

    Each entry is a plain dict carrying the technique's id, name, MITRE id and
    tactic together with the profile's score, level, likelihood, impact and
    detection gap.
    """
    ranked = (
        db.query(TechniqueRiskProfile, Technique)
        .join(Technique, TechniqueRiskProfile.technique_id == Technique.id)
        .order_by(TechniqueRiskProfile.risk_score.desc())
        .limit(limit)
        .all()
    )

    top: list = []
    for profile, technique in ranked:
        top.append(
            {
                "technique_id": str(profile.technique_id),
                "technique_name": technique.name,
                "technique_tid": technique.mitre_id,
                "tactic": technique.tactic,
                "risk_score": profile.risk_score,
                "risk_level": profile.risk_level,
                "likelihood": profile.likelihood,
                "impact": profile.impact,
                "detection_gap": profile.detection_gap,
            }
        )
    return top
def _activity_sort_key(ts) -> datetime:
    """Map an event timestamp onto a naive datetime so mixed values can be ordered.

    ``None`` becomes ``datetime.min``, a plain ``date`` is treated as midnight
    of that day, and a timezone-aware ``datetime`` is shifted to its naive UTC
    equivalent.  Naive datetimes are returned unchanged.
    """
    if ts is None:
        return datetime.min
    if not isinstance(ts, datetime):
        # `date` and `datetime` cannot be compared with each other directly.
        return datetime(ts.year, ts.month, ts.day)
    offset = ts.utcoffset()
    if offset is None:
        return ts
    # Aware and naive datetimes cannot be compared; normalise to naive UTC.
    return ts.replace(tzinfo=None) - offset


def get_recent_activity(db: Session, limit: int = 20) -> list:
    """Combine recent events from tests, attack-path executions, and OSINT.

    Up to ``limit`` tests, ``limit // 2`` completed attack-path executions and
    ``limit // 4`` OSINT items are fetched, merged, sorted newest first and
    truncated to ``limit`` entries.

    Each entry is a dict with ``ts``, ``category`` (``"test"``,
    ``"attack_path"`` or ``"osint"``), ``title`` and ``detail`` (the source
    row's id as a string).  Entries without a timestamp sort last.
    """
    events: list = []

    # Recent test executions (use execution_date, fall back to created_at).
    # NOTE(review): rows are selected by created_at, not execution_date.
    recent_tests = (
        db.query(Test)
        .filter(Test.result.isnot(None))
        .order_by(Test.created_at.desc())
        .limit(limit)
        .all()
    )
    for t in recent_tests:
        ts = t.execution_date or t.created_at
        events.append({
            "ts": ts,
            "category": "test",
            "title": f"Test executed — result: {t.result.value if t.result else 'pending'}",
            "detail": str(t.id),
        })

    # Recent completed attack-path executions
    recent_execs = (
        db.query(AttackPathExecution)
        .filter(
            AttackPathExecution.status == ExecutionStatus.completed,
            AttackPathExecution.completed_at.isnot(None),
        )
        .order_by(AttackPathExecution.completed_at.desc())
        .limit(limit // 2)
        .all()
    )
    for e in recent_execs:
        dr = f"{e.detection_rate * 100:.0f}%" if e.detection_rate is not None else "n/a"
        events.append({
            "ts": e.completed_at,
            "category": "attack_path",
            "title": f"Attack path completed — detection: {dr}",
            "detail": str(e.id),
        })

    # Recent OSINT items
    recent_osint = (
        db.query(OsintItem)
        .order_by(OsintItem.discovered_at.desc())
        .limit(limit // 4)
        .all()
    )
    for o in recent_osint:
        events.append({
            "ts": o.discovered_at,
            "category": "osint",
            "title": f"OSINT signal: {o.title or 'unknown'}",
            "detail": str(o.id),
        })

    # Sort all events descending by timestamp, return top `limit`.  The key is
    # normalised so that a mix of date/datetime or naive/aware values coming
    # from the different sources cannot raise TypeError during the sort.
    events.sort(key=lambda event: _activity_sort_key(event["ts"]), reverse=True)
    return events[:limit]
def get_executive_summary(db: Session) -> dict:
    """Assemble the full executive view.

    Note that this writes to the database: today's snapshot is created or
    updated first.  The result then combines that snapshot with 30-day
    coverage and risk trends, the top risks, the per-tactic coverage and the
    recent activity feed.
    """
    # Persist (or refresh) today's snapshot so the trend includes it.
    snapshot = take_posture_snapshot(db)

    # One pass over the 30-day history yields both trend series.
    coverage_trend = []
    risk_trend = []
    for past in get_posture_history(db, days=30):
        day = str(past.snapshot_date)
        coverage_trend.append({"date": day, "value": past.coverage_pct})
        risk_trend.append({"date": day, "value": past.avg_risk_score})

    top_risks = get_top_risks(db)
    by_tactic = get_coverage_by_tactic(db)
    activity = get_recent_activity(db)

    return {
        "snapshot": snapshot,
        "coverage_trend": coverage_trend,
        "risk_trend": risk_trend,
        "top_risks": top_risks,
        "coverage_by_tactic": by_tactic,
        "recent_activity": activity,
    }