""" QA script for Phase 12 — Risk Intelligence. Run with: python -X utf8 scripts/qa_phase12.py """ import sys, os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import requests BASE = "http://localhost:8000/api/v1" PASS = "\033[92m✓\033[0m" FAIL = "\033[91m✗\033[0m" passed = 0 failed = 0 def check(label: str, condition: bool, detail: str = ""): global passed, failed if condition: passed += 1 print(f" {PASS} {label}") else: failed += 1 msg = f" {FAIL} {label}" if detail: msg += f" — {detail}" print(msg) def get_token(username="administrator", password="admin123"): r = requests.post(f"{BASE}/auth/login", data={"username": username, "password": password}) if r.status_code == 200: return r.json().get("access_token") or r.json().get("token") raise RuntimeError(f"Login failed: {r.status_code} {r.text[:200]}") def auth(token): return {"Authorization": f"Bearer {token}"} def get_first_technique(headers): r = requests.get(f"{BASE}/techniques", headers=headers, params={"limit": 5}) items = r.json() if isinstance(items, dict): items = items.get("items", []) if not items: raise RuntimeError("No techniques found") return items # ───────────────────────────────────────────────────────────────────────────── def main(): print("\n====== Phase 12 QA — Risk Intelligence ======\n") token = get_token() h = auth(token) techniques = get_first_technique(h) tid = techniques[0]["id"] print(f" Using technique_id: {tid}\n") # ── Block 1: Compute single technique ──────────────────────────────────── print("── Block 1: Compute single technique ──") r = requests.post(f"{BASE}/risk/profiles/{tid}/compute", headers=h) check("POST /risk/profiles/{id}/compute → 200", r.status_code == 200, r.text[:150]) profile = r.json() if r.status_code == 200 else {} check("Profile has technique_id", profile.get("technique_id") == tid) check("risk_score is float 0-100", isinstance(profile.get("risk_score"), (int, float)) and 0 <= profile.get("risk_score", -1) <= 100) check("risk_level is valid", profile.get("risk_level") in ("critical", "high", "medium", "low", "info")) check("detection_gap in 0-1", 0 <= profile.get("detection_gap", -1) <= 1) check("is_stale = False", profile.get("is_stale") == False) check("scoring_breakdown present", bool(profile.get("scoring_breakdown"))) check("recommendations is list", isinstance(profile.get("recommendations"), list)) # Compute a second technique if len(techniques) > 1: tid2 = techniques[1]["id"] r = requests.post(f"{BASE}/risk/profiles/{tid2}/compute", headers=h) check("POST compute second technique → 200", r.status_code == 200, r.text[:120]) # Compute non-existent technique → 404 r = requests.post( f"{BASE}/risk/profiles/00000000-0000-0000-0000-000000000001/compute", headers=h) check("POST compute non-existent technique → 404", r.status_code == 404) print() # ── Block 2: Compute all techniques ────────────────────────────────────── print("── Block 2: Compute ALL techniques ──") r = requests.post(f"{BASE}/risk/compute", headers=h) check("POST /risk/compute → 202", r.status_code == 202, r.text[:150]) result = r.json() if r.status_code == 202 else {} check("computed > 0", result.get("computed", 0) > 0, f"computed={result.get('computed')}") check("errors == 0", result.get("errors", 99) == 0, f"errors={result.get('errors')}") check("duration_seconds present", "duration_seconds" in result) print(f" computed={result.get('computed')} techniques in " f"{result.get('duration_seconds', '?')}s") print() # ── Block 3: List profiles ──────────────────────────────────────────────── print("── Block 3: List risk profiles ──") r = requests.get(f"{BASE}/risk/profiles", headers=h) check("GET /risk/profiles → 200", r.status_code == 200) profiles = r.json() if r.status_code == 200 else [] check("Profiles list not empty", len(profiles) > 0) check("Each profile has risk_score", all("risk_score" in p for p in profiles[:5])) check("Sorted by risk_score desc", all(profiles[i]["risk_score"] >= profiles[i+1]["risk_score"] for i in range(min(len(profiles)-1, 5)))) # Filter by risk_level for level in ("critical", "high", "medium", "low", "info"): r2 = requests.get(f"{BASE}/risk/profiles", headers=h, params={"risk_level": level}) if r2.status_code == 200 and r2.json(): check(f"Filter risk_level={level} → only that level", all(p["risk_level"] == level for p in r2.json())) break else: # Just verify the filter works without crashing r2 = requests.get(f"{BASE}/risk/profiles", headers=h, params={"risk_level": "info"}) check("Filter risk_level=info → 200", r2.status_code == 200) # Filter by min_score r = requests.get(f"{BASE}/risk/profiles", headers=h, params={"min_score": 0.0, "max_score": 100.0}) check("Filter min/max_score → 200", r.status_code == 200) # limit + offset r = requests.get(f"{BASE}/risk/profiles", headers=h, params={"limit": 2, "offset": 0}) check("GET ?limit=2 returns ≤2 profiles", r.status_code == 200 and len(r.json()) <= 2) print() # ── Block 4: Get single profile ─────────────────────────────────────────── print("── Block 4: Get single risk profile ──") r = requests.get(f"{BASE}/risk/profiles/{tid}", headers=h) check("GET /risk/profiles/{technique_id} → 200", r.status_code == 200) p = r.json() if r.status_code == 200 else {} check("Correct technique_id", p.get("technique_id") == tid) check("scoring_breakdown has detection_gap key", "detection_gap" in (p.get("scoring_breakdown") or {})) check("scoring_breakdown has threat_actor key", "threat_actor" in (p.get("scoring_breakdown") or {})) check("scoring_breakdown has osint key", "osint" in (p.get("scoring_breakdown") or {})) check("scoring_breakdown has test_failures key", "test_failures" in (p.get("scoring_breakdown") or {})) # Not found → 404 r = requests.get( f"{BASE}/risk/profiles/00000000-0000-0000-0000-000000000001", headers=h) check("GET non-existent profile → 404", r.status_code == 404) print() # ── Block 5: Risk matrix ────────────────────────────────────────────────── print("── Block 5: Risk matrix ──") r = requests.get(f"{BASE}/risk/matrix", headers=h) check("GET /risk/matrix → 200", r.status_code == 200) matrix = r.json() if r.status_code == 200 else [] check("Matrix not empty", len(matrix) > 0) if matrix: entry = matrix[0] check("Matrix entry has technique_name", "technique_name" in entry) check("Matrix entry has likelihood", "likelihood" in entry) check("Matrix entry has impact", "impact" in entry) check("Matrix entry has risk_level", "risk_level" in entry) print() # ── Block 6: Risk summary ───────────────────────────────────────────────── print("── Block 6: Risk summary ──") r = requests.get(f"{BASE}/risk/summary", headers=h) check("GET /risk/summary → 200", r.status_code == 200) summary = r.json() if r.status_code == 200 else {} check("total_techniques > 0", summary.get("total_techniques", 0) > 0) check("scored_techniques > 0", summary.get("scored_techniques", 0) > 0) check("by_level has all levels", all(k in summary.get("by_level", {}) for k in ("critical", "high", "medium", "low", "info"))) check("avg_risk_score is float", isinstance(summary.get("avg_risk_score"), (int, float))) check("top_risks is list", isinstance(summary.get("top_risks"), list)) if summary.get("top_risks"): top = summary["top_risks"][0] check("Top risk has technique_name", "technique_name" in top) check("Top risk has risk_score", "risk_score" in top) print() # ── Block 7: Recommendations ────────────────────────────────────────────── print("── Block 7: Recommendations ──") r = requests.get(f"{BASE}/risk/recommendations", headers=h) check("GET /risk/recommendations → 200", r.status_code == 200) recs = r.json() if r.status_code == 200 else [] check("Recommendations list not empty", len(recs) > 0) if recs: rec = recs[0] check("Recommendation has priority", "priority" in rec) check("Recommendation has recommendations list", isinstance(rec.get("recommendations"), list)) check("Recommendations sorted by priority", all(recs[i]["priority"] <= recs[i+1]["priority"] for i in range(min(len(recs)-1, 4)))) # Custom limit r = requests.get(f"{BASE}/risk/recommendations", headers=h, params={"limit": 3}) check("GET ?limit=3 returns ≤3 recommendations", r.status_code == 200 and len(r.json()) <= 3) print() # ── Block 8: Top risks ──────────────────────────────────────────────────── print("── Block 8: Top risks ──") r = requests.get(f"{BASE}/risk/top", headers=h) check("GET /risk/top → 200", r.status_code == 200) top = r.json() if r.status_code == 200 else [] check("Top risks not empty", len(top) > 0) r = requests.get(f"{BASE}/risk/top", headers=h, params={"limit": 3}) check("GET /risk/top?limit=3 → ≤3 results", r.status_code == 200 and len(r.json()) <= 3) print() # ── Block 9: Auth protection ────────────────────────────────────────────── print("── Block 9: Auth protection ──") no_auth = [ ("GET", f"{BASE}/risk/profiles"), ("GET", f"{BASE}/risk/matrix"), ("GET", f"{BASE}/risk/summary"), ("GET", f"{BASE}/risk/recommendations"), ("GET", f"{BASE}/risk/top"), ] for method, url in no_auth: r = requests.request(method, url) ep = url.split("/api/v1")[1] check(f"{method} {ep} without auth → 401", r.status_code == 401) # Admin-only compute endpoint # Create a non-admin token for role check r_na = requests.post(f"{BASE}/risk/compute") check("POST /risk/compute without auth → 401", r_na.status_code == 401) print() # ── Block 10: Regression ───────────────────────────────────────────────── print("── Block 10: Regression ──") r = requests.get(f"{BASE}/knowledge/playbooks", headers=h) check("GET /knowledge/playbooks still works", r.status_code == 200) r = requests.get(f"{BASE}/attack-paths", headers=h) check("GET /attack-paths still works", r.status_code == 200) r = requests.get(f"{BASE}/techniques", headers=h) check("GET /techniques still works", r.status_code == 200) print() # ── Summary ─────────────────────────────────────────────────────────────── total = passed + failed print(f"====== Results: {passed}/{total} passed", end="") if failed: print(f" — \033[91m{failed} FAILED\033[0m ======\n") sys.exit(1) else: print(" ✓ ALL PASSED ======\n") if __name__ == "__main__": main()