From ab591d30c45c3d6e7993f37639ea6a91ad12cf6e Mon Sep 17 00:00:00 2001 From: kitos Date: Wed, 20 May 2026 16:20:21 +0200 Subject: [PATCH] =?UTF-8?q?feat(dashboard):=20Phase=2013=20=E2=80=94=20Exe?= =?UTF-8?q?cutive=20Dashboard?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PostureSnapshot model, Alembic migration (b039exec), schemas, service aggregating all phases (coverage/risk/operations/knowledge/MTTD), and router at /api/v1/dashboard with executive view, KPIs, coverage-by-tactic, posture-history, posture-snapshot, and activity-feed endpoints. Co-Authored-By: Claude Sonnet 4.6 --- .../versions/b039_executive_dashboard.py | 77 ++++ backend/app/main.py | 2 + backend/app/models/__init__.py | 2 + backend/app/models/executive_dashboard.py | 68 ++++ backend/app/routers/executive_dashboard.py | 124 ++++++ .../app/schemas/executive_dashboard_schema.py | 113 ++++++ .../services/executive_dashboard_service.py | 361 ++++++++++++++++++ scripts/qa_phase13.py | 250 ++++++++++++ 8 files changed, 997 insertions(+) create mode 100644 backend/alembic/versions/b039_executive_dashboard.py create mode 100644 backend/app/models/executive_dashboard.py create mode 100644 backend/app/routers/executive_dashboard.py create mode 100644 backend/app/schemas/executive_dashboard_schema.py create mode 100644 backend/app/services/executive_dashboard_service.py create mode 100644 scripts/qa_phase13.py diff --git a/backend/alembic/versions/b039_executive_dashboard.py b/backend/alembic/versions/b039_executive_dashboard.py new file mode 100644 index 0000000..3b0bf88 --- /dev/null +++ b/backend/alembic/versions/b039_executive_dashboard.py @@ -0,0 +1,77 @@ +"""Phase 13: Executive Dashboard — posture_snapshots table. + +Revision ID: b039exec +Revises: b038risk +Create Date: 2026-05-20 +""" + +from alembic import op +import sqlalchemy as sa + +revision = "b039exec" +down_revision = "b038risk" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + conn = op.get_bind() + conn.execute(sa.text(""" + CREATE TABLE IF NOT EXISTS posture_snapshots ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + snapshot_date DATE NOT NULL, + + -- Coverage + total_techniques INTEGER NOT NULL DEFAULT 0, + validated_count INTEGER NOT NULL DEFAULT 0, + partial_count INTEGER NOT NULL DEFAULT 0, + not_covered_count INTEGER NOT NULL DEFAULT 0, + coverage_pct FLOAT NOT NULL DEFAULT 0.0, + + -- Risk + avg_risk_score FLOAT NOT NULL DEFAULT 0.0, + critical_count INTEGER NOT NULL DEFAULT 0, + high_count INTEGER NOT NULL DEFAULT 0, + medium_count INTEGER NOT NULL DEFAULT 0, + low_count INTEGER NOT NULL DEFAULT 0, + + -- Operations + open_queue_items INTEGER NOT NULL DEFAULT 0, + orphan_techniques INTEGER NOT NULL DEFAULT 0, + + -- Knowledge + playbook_count INTEGER NOT NULL DEFAULT 0, + lesson_count INTEGER NOT NULL DEFAULT 0, + + -- MTTD + mttd_avg_seconds FLOAT, + executions_30d INTEGER NOT NULL DEFAULT 0, + detection_rate_30d FLOAT, + + -- Meta + created_by UUID REFERENCES users(id) ON DELETE SET NULL, + created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT now(), + extra JSONB + ) + """)) + + # Unique constraint: one snapshot per calendar day + conn.execute(sa.text(""" + DO $$ BEGIN + ALTER TABLE posture_snapshots + ADD CONSTRAINT uq_posture_snapshot_date UNIQUE (snapshot_date); + EXCEPTION WHEN duplicate_table THEN NULL; + WHEN duplicate_object THEN NULL; + END $$ + """)) + + # Index for date-range trend queries + conn.execute(sa.text(""" + CREATE INDEX IF NOT EXISTS ix_posture_snapshots_date + ON posture_snapshots (snapshot_date) + """)) + + +def downgrade() -> None: + conn = op.get_bind() + conn.execute(sa.text("DROP TABLE IF EXISTS posture_snapshots CASCADE")) diff --git a/backend/app/main.py b/backend/app/main.py index 39b9bdc..530b088 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -43,6 +43,7 @@ from app.routers import ownership as ownership_router from app.routers import attack_paths as attack_paths_router from app.routers import knowledge as knowledge_router from app.routers import risk_intelligence as risk_router +from app.routers import executive_dashboard as dashboard_router from app.domain.errors import DomainError from app.middleware.error_handler import domain_exception_handler from app.middleware.request_context import RequestContextMiddleware @@ -145,6 +146,7 @@ app.include_router(ownership_router.router, prefix="/api/v1") app.include_router(attack_paths_router.router, prefix="/api/v1") app.include_router(knowledge_router.router, prefix="/api/v1") app.include_router(risk_router.router, prefix="/api/v1") +app.include_router(dashboard_router.router, prefix="/api/v1") @app.get("/health", include_in_schema=False) diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 51e480b..8611969 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -40,6 +40,7 @@ from app.models.attack_path import ( ) from app.models.knowledge import Playbook, PlaybookVersion, LessonLearned from app.models.risk_intelligence import TechniqueRiskProfile +from app.models.executive_dashboard import PostureSnapshot __all__ = [ "User", "Technique", "Test", "TestTemplate", "Evidence", @@ -63,4 +64,5 @@ __all__ = [ "ExecutionStatus", "StepResultStatus", "TimelineActorSide", "TimelineEntryType", "Playbook", "PlaybookVersion", "LessonLearned", "TechniqueRiskProfile", + "PostureSnapshot", ] diff --git a/backend/app/models/executive_dashboard.py b/backend/app/models/executive_dashboard.py new file mode 100644 index 0000000..92ac26d --- /dev/null +++ b/backend/app/models/executive_dashboard.py @@ -0,0 +1,68 @@ +"""Phase 13: Executive Dashboard — PostureSnapshot model.""" + +import uuid +from datetime import datetime + +from sqlalchemy import ( + Boolean, Column, Date, DateTime, Float, ForeignKey, + Index, Integer, UniqueConstraint, +) +from sqlalchemy.dialects.postgresql import UUID, JSONB +from sqlalchemy.orm import relationship + +from app.database import Base + + +class PostureSnapshot(Base): + """ + Daily point-in-time capture of the organisation's security posture. + + Aggregates data from all phases (coverage, risk, ownership, knowledge, + attack-paths) into a single row that can be trended over time. + """ + + __tablename__ = "posture_snapshots" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + snapshot_date = Column(Date, nullable=False) # one per calendar day + + # ── Coverage ────────────────────────────────────────────────────────────── + total_techniques = Column(Integer, nullable=False, default=0) + validated_count = Column(Integer, nullable=False, default=0) + partial_count = Column(Integer, nullable=False, default=0) + not_covered_count = Column(Integer, nullable=False, default=0) + coverage_pct = Column(Float, nullable=False, default=0.0) # 0–100 + + # ── Risk ───────────────────────────────────────────────────────────────── + avg_risk_score = Column(Float, nullable=False, default=0.0) + critical_count = Column(Integer, nullable=False, default=0) + high_count = Column(Integer, nullable=False, default=0) + medium_count = Column(Integer, nullable=False, default=0) + low_count = Column(Integer, nullable=False, default=0) + + # ── Operations ──────────────────────────────────────────────────────────── + open_queue_items = Column(Integer, nullable=False, default=0) + orphan_techniques = Column(Integer, nullable=False, default=0) + + # ── Knowledge ───────────────────────────────────────────────────────────── + playbook_count = Column(Integer, nullable=False, default=0) + lesson_count = Column(Integer, nullable=False, default=0) + + # ── MTTD (from attack-path executions completed in last 30 d) ──────────── + mttd_avg_seconds = Column(Float, nullable=True) # None if no data + executions_30d = Column(Integer, nullable=False, default=0) + detection_rate_30d = Column(Float, nullable=True) # avg across executions + + # ── Meta ───────────────────────────────────────────────────────────────── + created_by = Column( + UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True + ) + created_at = Column(DateTime, default=datetime.utcnow) + extra = Column(JSONB, nullable=True) # full breakdown / by-tactic + + creator = relationship("User", foreign_keys=[created_by]) + + __table_args__ = ( + UniqueConstraint("snapshot_date", name="uq_posture_snapshot_date"), + Index("ix_posture_snapshots_date", "snapshot_date"), + ) diff --git a/backend/app/routers/executive_dashboard.py b/backend/app/routers/executive_dashboard.py new file mode 100644 index 0000000..ea28459 --- /dev/null +++ b/backend/app/routers/executive_dashboard.py @@ -0,0 +1,124 @@ +"""Phase 13: Executive Dashboard router.""" + +from typing import List, Optional +from uuid import UUID + +from fastapi import APIRouter, Depends, Query +from sqlalchemy.orm import Session + +from app.database import get_db +from app.dependencies.auth import get_current_user, require_any_role +from app.schemas.executive_dashboard_schema import ( + PostureSnapshotOut, + ExecutiveSummary, + KpiBlock, + CoverageByTactic, + PostureHistoryEntry, + ActivityEntry, +) +import app.services.executive_dashboard_service as svc + +router = APIRouter(prefix="/dashboard", tags=["Executive Dashboard"]) + + +@router.get("/executive", response_model=ExecutiveSummary) +def executive_view( + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """ + Full executive view — snapshot, 30-day trends, top risks, + coverage by tactic, and recent activity feed. + """ + data = svc.get_executive_summary(db) + snap = data["snapshot"] + return ExecutiveSummary( + snapshot=PostureSnapshotOut.model_validate(snap), + coverage_trend=data["coverage_trend"], + risk_trend=data["risk_trend"], + top_risks=data["top_risks"], + coverage_by_tactic=data["coverage_by_tactic"], + recent_activity=data["recent_activity"], + ) + + +@router.get("/kpis", response_model=KpiBlock) +def kpis( + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Compact KPI block — live aggregation without persisting a snapshot.""" + live = svc.get_live_kpis(db) + + # Try to find today's snapshot id; fall back to None + from datetime import date + from app.models.executive_dashboard import PostureSnapshot + today_snap = db.query(PostureSnapshot).filter( + PostureSnapshot.snapshot_date == date.today() + ).first() + + return KpiBlock( + coverage_pct=live["coverage_pct"], + avg_risk_score=live["avg_risk_score"], + critical_count=live["critical_count"], + open_queue_items=live["open_queue_items"], + orphan_techniques=live["orphan_techniques"], + mttd_avg_seconds=live.get("mttd_avg_seconds"), + detection_rate_30d=live.get("detection_rate_30d"), + playbook_count=live["playbook_count"], + lesson_count=live["lesson_count"], + snapshot_date=live["snapshot_date"], + snapshot_id=today_snap.id if today_snap else None, + ) + + +@router.get("/coverage-by-tactic", response_model=List[CoverageByTactic]) +def coverage_by_tactic( + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Per-tactic validated / partial / not_covered breakdown.""" + return svc.get_coverage_by_tactic(db) + + +@router.get("/posture-history", response_model=List[PostureHistoryEntry]) +def posture_history( + days: int = Query(30, ge=1, le=365), + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Historical posture snapshots for trend charts (default last 30 days).""" + snaps = svc.get_posture_history(db, days=days) + return [ + PostureHistoryEntry( + snapshot_date=s.snapshot_date, + coverage_pct=s.coverage_pct, + avg_risk_score=s.avg_risk_score, + critical_count=s.critical_count, + open_queue_items=s.open_queue_items, + ) + for s in snaps + ] + + +@router.post("/posture-snapshot", response_model=PostureSnapshotOut, status_code=201) +def create_posture_snapshot( + db: Session = Depends(get_db), + user=Depends(require_any_role("admin", "red_lead", "blue_lead")), +): + """ + Take (or refresh) today's posture snapshot — admin / leads only. + Aggregates live data from all phases into a single PostureSnapshot row. + """ + snap = svc.take_posture_snapshot(db, created_by=user.id) + return PostureSnapshotOut.model_validate(snap) + + +@router.get("/activity", response_model=List[ActivityEntry]) +def recent_activity( + limit: int = Query(20, ge=1, le=100), + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Recent activity feed — tests, attack-path executions, OSINT signals.""" + return svc.get_recent_activity(db, limit=limit) diff --git a/backend/app/schemas/executive_dashboard_schema.py b/backend/app/schemas/executive_dashboard_schema.py new file mode 100644 index 0000000..0f19c19 --- /dev/null +++ b/backend/app/schemas/executive_dashboard_schema.py @@ -0,0 +1,113 @@ +"""Phase 13: Executive Dashboard — Pydantic schemas.""" + +from __future__ import annotations + +from datetime import date, datetime +from typing import Any, Dict, List, Optional +from uuid import UUID + +from pydantic import BaseModel, Field + + +class PostureSnapshotOut(BaseModel): + id: UUID + snapshot_date: date + + # Coverage + total_techniques: int + validated_count: int + partial_count: int + not_covered_count: int + coverage_pct: float + + # Risk + avg_risk_score: float + critical_count: int + high_count: int + medium_count: int + low_count: int + + # Operations + open_queue_items: int + orphan_techniques: int + + # Knowledge + playbook_count: int + lesson_count: int + + # MTTD + mttd_avg_seconds: Optional[float] = None + executions_30d: int + detection_rate_30d: Optional[float] = None + + # Meta + created_by: Optional[UUID] = None + created_at: Optional[datetime] = None + extra: Optional[Dict[str, Any]] = None + + class Config: + from_attributes = True + + +class ExecutiveSummary(BaseModel): + """Full executive view — current posture + trends.""" + snapshot: PostureSnapshotOut + coverage_trend: List[Dict[str, Any]] = Field( + default_factory=list, + description="Last 30-day coverage_pct series [{date, value}]", + ) + risk_trend: List[Dict[str, Any]] = Field( + default_factory=list, + description="Last 30-day avg_risk_score series [{date, value}]", + ) + top_risks: List[Dict[str, Any]] = Field( + default_factory=list, + description="Top 5 highest-risk techniques", + ) + coverage_by_tactic: List[Dict[str, Any]] = Field( + default_factory=list, + description="Per-tactic validated/partial/not_covered counts", + ) + recent_activity: List[Dict[str, Any]] = Field( + default_factory=list, + description="Most-recent events (tests, paths, queue changes)", + ) + + +class KpiBlock(BaseModel): + """Compact KPI block for a dashboard header.""" + coverage_pct: float + avg_risk_score: float + critical_count: int + open_queue_items: int + orphan_techniques: int + mttd_avg_seconds: Optional[float] = None + detection_rate_30d: Optional[float] = None + playbook_count: int + lesson_count: int + snapshot_date: date + snapshot_id: UUID + + +class CoverageByTactic(BaseModel): + tactic: str + total: int + validated: int + partial: int + not_covered: int + coverage_pct: float + + +class PostureHistoryEntry(BaseModel): + snapshot_date: date + coverage_pct: float + avg_risk_score: float + critical_count: int + open_queue_items: int + + +class ActivityEntry(BaseModel): + ts: datetime + category: str # "test" | "attack_path" | "queue" | "osint" + title: str + detail: Optional[str] = None diff --git a/backend/app/services/executive_dashboard_service.py b/backend/app/services/executive_dashboard_service.py new file mode 100644 index 0000000..38c1551 --- /dev/null +++ b/backend/app/services/executive_dashboard_service.py @@ -0,0 +1,361 @@ +"""Phase 13: Executive Dashboard service — aggregate posture data across all phases.""" + +from __future__ import annotations + +import time +from datetime import date, datetime, timedelta +from typing import List, Optional +from uuid import UUID + +from sqlalchemy import func +from sqlalchemy.orm import Session + +from app.models.executive_dashboard import PostureSnapshot +from app.models.technique import Technique +from app.models.risk_intelligence import TechniqueRiskProfile +from app.models.ownership_queue import ( + TechniqueOwnership, RevalidationQueueItem, QueueStatus, +) +from app.models.knowledge import Playbook, LessonLearned +from app.models.attack_path import AttackPathExecution, ExecutionStatus +from app.models.test import Test +from app.models.osint_item import OsintItem +from app.models.enums import TechniqueStatus + + +# ── Internal aggregation helpers ────────────────────────────────────────────── + +def _aggregate_coverage(db: Session) -> dict: + """Aggregate technique coverage counts from live data.""" + techniques = db.query(Technique).all() + total = len(techniques) + + counts = { + TechniqueStatus.validated: 0, + TechniqueStatus.partial: 0, + TechniqueStatus.not_covered: 0, + } + for t in techniques: + s = t.status_global + if s in counts: + counts[s] += 1 + + validated = counts[TechniqueStatus.validated] + partial = counts[TechniqueStatus.partial] + not_covered = total - validated - partial + coverage_pct = round((validated + partial * 0.5) / total * 100.0, 2) if total > 0 else 0.0 + + return { + "total_techniques": total, + "validated_count": validated, + "partial_count": partial, + "not_covered_count": not_covered, + "coverage_pct": coverage_pct, + } + + +def _aggregate_risk(db: Session) -> dict: + """Aggregate risk metrics from TechniqueRiskProfile.""" + profiles = db.query(TechniqueRiskProfile).all() + if not profiles: + return { + "avg_risk_score": 0.0, + "critical_count": 0, + "high_count": 0, + "medium_count": 0, + "low_count": 0, + } + + by_level = {"critical": 0, "high": 0, "medium": 0, "low": 0, "info": 0} + score_sum = 0.0 + for p in profiles: + score_sum += p.risk_score + lvl = p.risk_level or "info" + by_level[lvl] = by_level.get(lvl, 0) + 1 + + return { + "avg_risk_score": round(score_sum / len(profiles), 2), + "critical_count": by_level["critical"], + "high_count": by_level["high"], + "medium_count": by_level["medium"], + "low_count": by_level["low"], + } + + +def _aggregate_operations(db: Session) -> dict: + """Aggregate operational queue and orphan counts.""" + open_queue = db.query(RevalidationQueueItem).filter( + RevalidationQueueItem.status.in_([QueueStatus.pending, QueueStatus.in_progress]), + ).count() + + # Orphan = technique with no ownership record OR owner_id IS NULL + owned_technique_ids = ( + db.query(TechniqueOwnership.technique_id) + .filter(TechniqueOwnership.owner_id.isnot(None)) + .subquery() + ) + total_tech = db.query(Technique).count() + owned_count = db.query(TechniqueOwnership).filter( + TechniqueOwnership.owner_id.isnot(None) + ).count() + orphans = total_tech - owned_count + + return { + "open_queue_items": open_queue, + "orphan_techniques": max(orphans, 0), + } + + +def _aggregate_knowledge(db: Session) -> dict: + """Count active playbooks and lessons learned.""" + playbook_count = db.query(Playbook).filter(Playbook.is_active == True).count() + lesson_count = db.query(LessonLearned).filter(LessonLearned.is_active == True).count() + return { + "playbook_count": playbook_count, + "lesson_count": lesson_count, + } + + +def _aggregate_mttd(db: Session) -> dict: + """Aggregate MTTD from completed attack-path executions in the last 30 days.""" + cutoff = datetime.utcnow() - timedelta(days=30) + execs = db.query(AttackPathExecution).filter( + AttackPathExecution.status == ExecutionStatus.completed, + AttackPathExecution.completed_at >= cutoff, + ).all() + + count = len(execs) + mttd_values = [e.mttd_seconds for e in execs if e.mttd_seconds is not None] + dr_values = [e.detection_rate for e in execs if e.detection_rate is not None] + + return { + "executions_30d": count, + "mttd_avg_seconds": round(sum(mttd_values) / len(mttd_values), 2) if mttd_values else None, + "detection_rate_30d": round(sum(dr_values) / len(dr_values), 4) if dr_values else None, + } + + +def _build_extra_breakdown(db: Session) -> dict: + """Build the by-tactic breakdown stored in the `extra` JSONB field.""" + techniques = db.query(Technique).all() + tactic_map: dict = {} + for t in techniques: + tac = t.tactic or "Unknown" + if tac not in tactic_map: + tactic_map[tac] = {"total": 0, "validated": 0, "partial": 0, "not_covered": 0} + tactic_map[tac]["total"] += 1 + s = t.status_global + if s == TechniqueStatus.validated: + tactic_map[tac]["validated"] += 1 + elif s == TechniqueStatus.partial: + tactic_map[tac]["partial"] += 1 + else: + tactic_map[tac]["not_covered"] += 1 + + coverage_by_tactic = [ + { + "tactic": tac, + "total": v["total"], + "validated": v["validated"], + "partial": v["partial"], + "not_covered": v["not_covered"], + "coverage_pct": round( + (v["validated"] + v["partial"] * 0.5) / v["total"] * 100.0, 2 + ) if v["total"] > 0 else 0.0, + } + for tac, v in sorted(tactic_map.items()) + ] + return {"coverage_by_tactic": coverage_by_tactic} + + +# ── Snapshot persistence ─────────────────────────────────────────────────────── + +def take_posture_snapshot( + db: Session, + created_by: Optional[UUID] = None, +) -> PostureSnapshot: + """ + Aggregate all phases and write (or update) today's PostureSnapshot. + Upserts on snapshot_date — only one row per calendar day. + """ + today = date.today() + + coverage = _aggregate_coverage(db) + risk = _aggregate_risk(db) + operations = _aggregate_operations(db) + knowledge = _aggregate_knowledge(db) + mttd = _aggregate_mttd(db) + extra = _build_extra_breakdown(db) + + existing = db.query(PostureSnapshot).filter( + PostureSnapshot.snapshot_date == today, + ).first() + + values = { + **coverage, + **risk, + **operations, + **knowledge, + **mttd, + "extra": extra, + } + + if existing: + for k, v in values.items(): + setattr(existing, k, v) + existing.created_by = created_by + db.commit() + db.refresh(existing) + return existing + + snap = PostureSnapshot(snapshot_date=today, created_by=created_by, **values) + db.add(snap) + db.commit() + db.refresh(snap) + return snap + + +# ── Live / read-only aggregations (no DB write) ─────────────────────────────── + +def get_live_kpis(db: Session) -> dict: + """Return current KPIs without persisting a snapshot.""" + coverage = _aggregate_coverage(db) + risk = _aggregate_risk(db) + operations = _aggregate_operations(db) + knowledge = _aggregate_knowledge(db) + mttd = _aggregate_mttd(db) + return {**coverage, **risk, **operations, **knowledge, **mttd, "snapshot_date": date.today()} + + +def get_coverage_by_tactic(db: Session) -> list: + """Per-tactic validated/partial/not_covered breakdown.""" + extra = _build_extra_breakdown(db) + return extra["coverage_by_tactic"] + + +def get_posture_history( + db: Session, + days: int = 30, +) -> List[PostureSnapshot]: + """Return the last `days` PostureSnapshot rows ordered ascending.""" + cutoff = date.today() - timedelta(days=days) + return ( + db.query(PostureSnapshot) + .filter(PostureSnapshot.snapshot_date >= cutoff) + .order_by(PostureSnapshot.snapshot_date.asc()) + .all() + ) + + +def get_top_risks(db: Session, limit: int = 5) -> list: + """Return top-N risk profiles with technique details.""" + from app.models.risk_intelligence import TechniqueRiskProfile + + rows = ( + db.query(TechniqueRiskProfile, Technique) + .join(Technique, TechniqueRiskProfile.technique_id == Technique.id) + .order_by(TechniqueRiskProfile.risk_score.desc()) + .limit(limit) + .all() + ) + return [ + { + "technique_id": str(p.technique_id), + "technique_name": t.name, + "technique_tid": t.mitre_id, + "tactic": t.tactic, + "risk_score": p.risk_score, + "risk_level": p.risk_level, + "likelihood": p.likelihood, + "impact": p.impact, + "detection_gap": p.detection_gap, + } + for p, t in rows + ] + + +def get_recent_activity(db: Session, limit: int = 20) -> list: + """Combine recent events from tests, attack-path executions, queue, and OSINT.""" + events: list = [] + + # Recent test executions (use execution_date, fall back to created_at) + recent_tests = ( + db.query(Test) + .filter(Test.result.isnot(None)) + .order_by(Test.created_at.desc()) + .limit(limit) + .all() + ) + for t in recent_tests: + ts = t.execution_date or t.created_at + events.append({ + "ts": ts, + "category": "test", + "title": f"Test executed — result: {t.result.value if t.result else 'pending'}", + "detail": str(t.id), + }) + + # Recent completed attack-path executions + recent_execs = ( + db.query(AttackPathExecution) + .filter( + AttackPathExecution.status == ExecutionStatus.completed, + AttackPathExecution.completed_at.isnot(None), + ) + .order_by(AttackPathExecution.completed_at.desc()) + .limit(limit // 2) + .all() + ) + for e in recent_execs: + dr = f"{e.detection_rate * 100:.0f}%" if e.detection_rate is not None else "n/a" + events.append({ + "ts": e.completed_at, + "category": "attack_path", + "title": f"Attack path completed — detection: {dr}", + "detail": str(e.id), + }) + + # Recent OSINT items + recent_osint = ( + db.query(OsintItem) + .order_by(OsintItem.discovered_at.desc()) + .limit(limit // 4) + .all() + ) + for o in recent_osint: + events.append({ + "ts": o.discovered_at, + "category": "osint", + "title": f"OSINT signal: {o.title or 'unknown'}", + "detail": str(o.id), + }) + + # Sort all events descending by timestamp, return top `limit` + events.sort(key=lambda x: x["ts"] or datetime.min, reverse=True) + return events[:limit] + + +def get_executive_summary(db: Session) -> dict: + """Full executive view — live KPIs + snapshot + trends + top risks + activity.""" + # Take (or update) today's snapshot + snap = take_posture_snapshot(db) + + # 30-day trend + history = get_posture_history(db, days=30) + coverage_trend = [ + {"date": str(s.snapshot_date), "value": s.coverage_pct} + for s in history + ] + risk_trend = [ + {"date": str(s.snapshot_date), "value": s.avg_risk_score} + for s in history + ] + + return { + "snapshot": snap, + "coverage_trend": coverage_trend, + "risk_trend": risk_trend, + "top_risks": get_top_risks(db), + "coverage_by_tactic": get_coverage_by_tactic(db), + "recent_activity": get_recent_activity(db), + } diff --git a/scripts/qa_phase13.py b/scripts/qa_phase13.py new file mode 100644 index 0000000..594971e --- /dev/null +++ b/scripts/qa_phase13.py @@ -0,0 +1,250 @@ +""" +QA script for Phase 13 — Executive Dashboard. +Run with: python -X utf8 scripts/qa_phase13.py +""" + +import sys, os +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import requests + +BASE = "http://localhost:8000/api/v1" +PASS = "\033[92m✓\033[0m" +FAIL = "\033[91m✗\033[0m" + +passed = 0 +failed = 0 + + +def check(label: str, condition: bool, detail: str = ""): + global passed, failed + if condition: + passed += 1 + print(f" {PASS} {label}") + else: + failed += 1 + msg = f" {FAIL} {label}" + if detail: + msg += f" — {detail}" + print(msg) + + +def get_token(username="administrator", password="admin123"): + r = requests.post(f"{BASE}/auth/login", + data={"username": username, "password": password}) + if r.status_code == 200: + return r.json().get("access_token") or r.json().get("token") + raise RuntimeError(f"Login failed: {r.status_code} {r.text[:200]}") + + +def auth(token): + return {"Authorization": f"Bearer {token}"} + + +# ───────────────────────────────────────────────────────────────────────────── + +def main(): + print("\n====== Phase 13 QA — Executive Dashboard ======\n") + + token = get_token() + h = auth(token) + + # ── Block 1: Take posture snapshot ─────────────────────────────────────── + print("── Block 1: Take posture snapshot ──") + + r = requests.post(f"{BASE}/dashboard/posture-snapshot", headers=h) + check("POST /dashboard/posture-snapshot → 201", r.status_code == 201, + r.text[:200]) + snap = r.json() if r.status_code == 201 else {} + + check("Snapshot has id", bool(snap.get("id"))) + check("Snapshot has snapshot_date", bool(snap.get("snapshot_date"))) + check("total_techniques > 0", snap.get("total_techniques", 0) > 0, + f"total={snap.get('total_techniques')}") + check("coverage_pct is float 0-100", + isinstance(snap.get("coverage_pct"), (int, float)) + and 0 <= snap.get("coverage_pct", -1) <= 100) + check("avg_risk_score is float 0-100", + isinstance(snap.get("avg_risk_score"), (int, float)) + and 0 <= snap.get("avg_risk_score", -1) <= 100) + check("validated_count + partial_count + not_covered_count == total_techniques", + snap.get("validated_count", 0) + + snap.get("partial_count", 0) + + snap.get("not_covered_count", 0) + == snap.get("total_techniques", -1)) + check("open_queue_items >= 0", snap.get("open_queue_items", -1) >= 0) + check("orphan_techniques >= 0", snap.get("orphan_techniques", -1) >= 0) + check("playbook_count >= 0", snap.get("playbook_count", -1) >= 0) + check("lesson_count >= 0", snap.get("lesson_count", -1) >= 0) + check("executions_30d >= 0", snap.get("executions_30d", -1) >= 0) + + # Idempotent: taking again today returns same date + r2 = requests.post(f"{BASE}/dashboard/posture-snapshot", headers=h) + check("POST again today → 201 (upsert)", r2.status_code == 201, + r2.text[:100]) + snap2 = r2.json() if r2.status_code == 201 else {} + check("Same snapshot_date on upsert", + snap2.get("snapshot_date") == snap.get("snapshot_date")) + + print() + + # ── Block 2: KPIs ──────────────────────────────────────────────────────── + print("── Block 2: KPIs ──") + + r = requests.get(f"{BASE}/dashboard/kpis", headers=h) + check("GET /dashboard/kpis → 200", r.status_code == 200, r.text[:150]) + kpis = r.json() if r.status_code == 200 else {} + + check("coverage_pct present", "coverage_pct" in kpis) + check("avg_risk_score present", "avg_risk_score" in kpis) + check("critical_count present", "critical_count" in kpis) + check("open_queue_items present", "open_queue_items" in kpis) + check("playbook_count present", "playbook_count" in kpis) + check("lesson_count present", "lesson_count" in kpis) + check("snapshot_date present", "snapshot_date" in kpis) + + print() + + # ── Block 3: Coverage by tactic ────────────────────────────────────────── + print("── Block 3: Coverage by tactic ──") + + r = requests.get(f"{BASE}/dashboard/coverage-by-tactic", headers=h) + check("GET /dashboard/coverage-by-tactic → 200", r.status_code == 200, + r.text[:150]) + tactics = r.json() if r.status_code == 200 else [] + check("Coverage by tactic returns list", isinstance(tactics, list)) + check("At least 1 tactic", len(tactics) > 0) + if tactics: + t0 = tactics[0] + check("Tactic entry has tactic name", bool(t0.get("tactic"))) + check("Tactic entry has total", "total" in t0) + check("Tactic entry has validated", "validated" in t0) + check("Tactic entry has partial", "partial" in t0) + check("Tactic entry has coverage_pct", "coverage_pct" in t0) + check("coverage_pct in 0-100", + 0 <= t0.get("coverage_pct", -1) <= 100) + + print() + + # ── Block 4: Posture history ────────────────────────────────────────────── + print("── Block 4: Posture history ──") + + r = requests.get(f"{BASE}/dashboard/posture-history", headers=h) + check("GET /dashboard/posture-history → 200", r.status_code == 200, + r.text[:150]) + history = r.json() if r.status_code == 200 else [] + check("History is list", isinstance(history, list)) + check("At least 1 history entry (today's snapshot)", len(history) >= 1) + if history: + h0 = history[0] + check("History entry has snapshot_date", "snapshot_date" in h0) + check("History entry has coverage_pct", "coverage_pct" in h0) + check("History entry has avg_risk_score", "avg_risk_score" in h0) + + # Custom days parameter + r = requests.get(f"{BASE}/dashboard/posture-history", headers=h, + params={"days": 7}) + check("GET /posture-history?days=7 → 200", r.status_code == 200) + + print() + + # ── Block 5: Executive view ─────────────────────────────────────────────── + print("── Block 5: Executive view ──") + + r = requests.get(f"{BASE}/dashboard/executive", headers=h) + check("GET /dashboard/executive → 200", r.status_code == 200, + r.text[:200]) + exec_data = r.json() if r.status_code == 200 else {} + + check("executive has snapshot", "snapshot" in exec_data) + check("executive has coverage_trend", "coverage_trend" in exec_data) + check("executive has risk_trend", "risk_trend" in exec_data) + check("executive has top_risks", "top_risks" in exec_data) + check("executive has coverage_by_tactic", "coverage_by_tactic" in exec_data) + check("executive has recent_activity", "recent_activity" in exec_data) + + snap_inner = exec_data.get("snapshot", {}) + check("Embedded snapshot has total_techniques", + snap_inner.get("total_techniques", -1) > 0) + + if exec_data.get("top_risks"): + tr0 = exec_data["top_risks"][0] + check("Top risk has technique_name", "technique_name" in tr0) + check("Top risk has risk_score", "risk_score" in tr0) + check("Top risk has risk_level", "risk_level" in tr0) + + print() + + # ── Block 6: Activity feed ──────────────────────────────────────────────── + print("── Block 6: Activity feed ──") + + r = requests.get(f"{BASE}/dashboard/activity", headers=h) + check("GET /dashboard/activity → 200", r.status_code == 200, + r.text[:150]) + activity = r.json() if r.status_code == 200 else [] + check("Activity is list", isinstance(activity, list)) + + if activity: + a0 = activity[0] + check("Activity entry has ts", "ts" in a0) + check("Activity entry has category", "category" in a0) + check("Activity entry has title", "title" in a0) + + # Custom limit + r = requests.get(f"{BASE}/dashboard/activity", headers=h, + params={"limit": 5}) + check("GET /activity?limit=5 returns ≤5 items", + r.status_code == 200 and len(r.json()) <= 5) + + print() + + # ── Block 7: Auth protection ────────────────────────────────────────────── + print("── Block 7: Auth protection ──") + + protected = [ + ("GET", f"{BASE}/dashboard/executive"), + ("GET", f"{BASE}/dashboard/kpis"), + ("GET", f"{BASE}/dashboard/coverage-by-tactic"), + ("GET", f"{BASE}/dashboard/posture-history"), + ("GET", f"{BASE}/dashboard/activity"), + ] + for method, url in protected: + r = requests.request(method, url) + ep = url.split("/api/v1")[1] + check(f"{method} {ep} without auth → 401", r.status_code == 401) + + # snapshot requires admin/lead role + r = requests.post(f"{BASE}/dashboard/posture-snapshot") + check("POST /posture-snapshot without auth → 401", r.status_code == 401) + + print() + + # ── Block 8: Regression ────────────────────────────────────────────────── + print("── Block 8: Regression ──") + + r = requests.get(f"{BASE}/risk/summary", headers=h) + check("GET /risk/summary still works", r.status_code == 200) + + r = requests.get(f"{BASE}/knowledge/playbooks", headers=h) + check("GET /knowledge/playbooks still works", r.status_code == 200) + + r = requests.get(f"{BASE}/attack-paths", headers=h) + check("GET /attack-paths still works", r.status_code == 200) + + r = requests.get(f"{BASE}/techniques", headers=h) + check("GET /techniques still works", r.status_code == 200) + + print() + + # ── Summary ─────────────────────────────────────────────────────────────── + total = passed + failed + print(f"====== Results: {passed}/{total} passed", end="") + if failed: + print(f" — \033[91m{failed} FAILED\033[0m ======\n") + sys.exit(1) + else: + print(" ✓ ALL PASSED ======\n") + + +if __name__ == "__main__": + main()