feat(dashboard): Phase 13 — Executive Dashboard
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled

PostureSnapshot model, Alembic migration (b039exec), schemas, service
aggregating all phases (coverage/risk/operations/knowledge/MTTD), and
router at /api/v1/dashboard with executive view, KPIs, coverage-by-tactic,
posture-history, posture-snapshot, and activity-feed endpoints.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
kitos
2026-05-20 16:20:21 +02:00
parent 41a0c536bb
commit ab591d30c4
8 changed files with 997 additions and 0 deletions

250
scripts/qa_phase13.py Normal file
View File

@@ -0,0 +1,250 @@
"""
QA script for Phase 13 — Executive Dashboard.
Run with: python -X utf8 scripts/qa_phase13.py
"""
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import requests
BASE = "http://localhost:8000/api/v1"
PASS = "\033[92m✓\033[0m"
FAIL = "\033[91m✗\033[0m"
passed = 0
failed = 0
def check(label: str, condition: bool, detail: str = ""):
global passed, failed
if condition:
passed += 1
print(f" {PASS} {label}")
else:
failed += 1
msg = f" {FAIL} {label}"
if detail:
msg += f"{detail}"
print(msg)
def get_token(username="administrator", password="admin123"):
r = requests.post(f"{BASE}/auth/login",
data={"username": username, "password": password})
if r.status_code == 200:
return r.json().get("access_token") or r.json().get("token")
raise RuntimeError(f"Login failed: {r.status_code} {r.text[:200]}")
def auth(token):
return {"Authorization": f"Bearer {token}"}
# ─────────────────────────────────────────────────────────────────────────────
def main():
print("\n====== Phase 13 QA — Executive Dashboard ======\n")
token = get_token()
h = auth(token)
# ── Block 1: Take posture snapshot ───────────────────────────────────────
print("── Block 1: Take posture snapshot ──")
r = requests.post(f"{BASE}/dashboard/posture-snapshot", headers=h)
check("POST /dashboard/posture-snapshot → 201", r.status_code == 201,
r.text[:200])
snap = r.json() if r.status_code == 201 else {}
check("Snapshot has id", bool(snap.get("id")))
check("Snapshot has snapshot_date", bool(snap.get("snapshot_date")))
check("total_techniques > 0", snap.get("total_techniques", 0) > 0,
f"total={snap.get('total_techniques')}")
check("coverage_pct is float 0-100",
isinstance(snap.get("coverage_pct"), (int, float))
and 0 <= snap.get("coverage_pct", -1) <= 100)
check("avg_risk_score is float 0-100",
isinstance(snap.get("avg_risk_score"), (int, float))
and 0 <= snap.get("avg_risk_score", -1) <= 100)
check("validated_count + partial_count + not_covered_count == total_techniques",
snap.get("validated_count", 0)
+ snap.get("partial_count", 0)
+ snap.get("not_covered_count", 0)
== snap.get("total_techniques", -1))
check("open_queue_items >= 0", snap.get("open_queue_items", -1) >= 0)
check("orphan_techniques >= 0", snap.get("orphan_techniques", -1) >= 0)
check("playbook_count >= 0", snap.get("playbook_count", -1) >= 0)
check("lesson_count >= 0", snap.get("lesson_count", -1) >= 0)
check("executions_30d >= 0", snap.get("executions_30d", -1) >= 0)
# Idempotent: taking again today returns same date
r2 = requests.post(f"{BASE}/dashboard/posture-snapshot", headers=h)
check("POST again today → 201 (upsert)", r2.status_code == 201,
r2.text[:100])
snap2 = r2.json() if r2.status_code == 201 else {}
check("Same snapshot_date on upsert",
snap2.get("snapshot_date") == snap.get("snapshot_date"))
print()
# ── Block 2: KPIs ────────────────────────────────────────────────────────
print("── Block 2: KPIs ──")
r = requests.get(f"{BASE}/dashboard/kpis", headers=h)
check("GET /dashboard/kpis → 200", r.status_code == 200, r.text[:150])
kpis = r.json() if r.status_code == 200 else {}
check("coverage_pct present", "coverage_pct" in kpis)
check("avg_risk_score present", "avg_risk_score" in kpis)
check("critical_count present", "critical_count" in kpis)
check("open_queue_items present", "open_queue_items" in kpis)
check("playbook_count present", "playbook_count" in kpis)
check("lesson_count present", "lesson_count" in kpis)
check("snapshot_date present", "snapshot_date" in kpis)
print()
# ── Block 3: Coverage by tactic ──────────────────────────────────────────
print("── Block 3: Coverage by tactic ──")
r = requests.get(f"{BASE}/dashboard/coverage-by-tactic", headers=h)
check("GET /dashboard/coverage-by-tactic → 200", r.status_code == 200,
r.text[:150])
tactics = r.json() if r.status_code == 200 else []
check("Coverage by tactic returns list", isinstance(tactics, list))
check("At least 1 tactic", len(tactics) > 0)
if tactics:
t0 = tactics[0]
check("Tactic entry has tactic name", bool(t0.get("tactic")))
check("Tactic entry has total", "total" in t0)
check("Tactic entry has validated", "validated" in t0)
check("Tactic entry has partial", "partial" in t0)
check("Tactic entry has coverage_pct", "coverage_pct" in t0)
check("coverage_pct in 0-100",
0 <= t0.get("coverage_pct", -1) <= 100)
print()
# ── Block 4: Posture history ──────────────────────────────────────────────
print("── Block 4: Posture history ──")
r = requests.get(f"{BASE}/dashboard/posture-history", headers=h)
check("GET /dashboard/posture-history → 200", r.status_code == 200,
r.text[:150])
history = r.json() if r.status_code == 200 else []
check("History is list", isinstance(history, list))
check("At least 1 history entry (today's snapshot)", len(history) >= 1)
if history:
h0 = history[0]
check("History entry has snapshot_date", "snapshot_date" in h0)
check("History entry has coverage_pct", "coverage_pct" in h0)
check("History entry has avg_risk_score", "avg_risk_score" in h0)
# Custom days parameter
r = requests.get(f"{BASE}/dashboard/posture-history", headers=h,
params={"days": 7})
check("GET /posture-history?days=7 → 200", r.status_code == 200)
print()
# ── Block 5: Executive view ───────────────────────────────────────────────
print("── Block 5: Executive view ──")
r = requests.get(f"{BASE}/dashboard/executive", headers=h)
check("GET /dashboard/executive → 200", r.status_code == 200,
r.text[:200])
exec_data = r.json() if r.status_code == 200 else {}
check("executive has snapshot", "snapshot" in exec_data)
check("executive has coverage_trend", "coverage_trend" in exec_data)
check("executive has risk_trend", "risk_trend" in exec_data)
check("executive has top_risks", "top_risks" in exec_data)
check("executive has coverage_by_tactic", "coverage_by_tactic" in exec_data)
check("executive has recent_activity", "recent_activity" in exec_data)
snap_inner = exec_data.get("snapshot", {})
check("Embedded snapshot has total_techniques",
snap_inner.get("total_techniques", -1) > 0)
if exec_data.get("top_risks"):
tr0 = exec_data["top_risks"][0]
check("Top risk has technique_name", "technique_name" in tr0)
check("Top risk has risk_score", "risk_score" in tr0)
check("Top risk has risk_level", "risk_level" in tr0)
print()
# ── Block 6: Activity feed ────────────────────────────────────────────────
print("── Block 6: Activity feed ──")
r = requests.get(f"{BASE}/dashboard/activity", headers=h)
check("GET /dashboard/activity → 200", r.status_code == 200,
r.text[:150])
activity = r.json() if r.status_code == 200 else []
check("Activity is list", isinstance(activity, list))
if activity:
a0 = activity[0]
check("Activity entry has ts", "ts" in a0)
check("Activity entry has category", "category" in a0)
check("Activity entry has title", "title" in a0)
# Custom limit
r = requests.get(f"{BASE}/dashboard/activity", headers=h,
params={"limit": 5})
check("GET /activity?limit=5 returns ≤5 items",
r.status_code == 200 and len(r.json()) <= 5)
print()
# ── Block 7: Auth protection ──────────────────────────────────────────────
print("── Block 7: Auth protection ──")
protected = [
("GET", f"{BASE}/dashboard/executive"),
("GET", f"{BASE}/dashboard/kpis"),
("GET", f"{BASE}/dashboard/coverage-by-tactic"),
("GET", f"{BASE}/dashboard/posture-history"),
("GET", f"{BASE}/dashboard/activity"),
]
for method, url in protected:
r = requests.request(method, url)
ep = url.split("/api/v1")[1]
check(f"{method} {ep} without auth → 401", r.status_code == 401)
# snapshot requires admin/lead role
r = requests.post(f"{BASE}/dashboard/posture-snapshot")
check("POST /posture-snapshot without auth → 401", r.status_code == 401)
print()
# ── Block 8: Regression ──────────────────────────────────────────────────
print("── Block 8: Regression ──")
r = requests.get(f"{BASE}/risk/summary", headers=h)
check("GET /risk/summary still works", r.status_code == 200)
r = requests.get(f"{BASE}/knowledge/playbooks", headers=h)
check("GET /knowledge/playbooks still works", r.status_code == 200)
r = requests.get(f"{BASE}/attack-paths", headers=h)
check("GET /attack-paths still works", r.status_code == 200)
r = requests.get(f"{BASE}/techniques", headers=h)
check("GET /techniques still works", r.status_code == 200)
print()
# ── Summary ───────────────────────────────────────────────────────────────
total = passed + failed
print(f"====== Results: {passed}/{total} passed", end="")
if failed:
print(f"\033[91m{failed} FAILED\033[0m ======\n")
sys.exit(1)
else:
print(" ✓ ALL PASSED ======\n")
if __name__ == "__main__":
main()