"""
QA script for Phase 11 — Knowledge Management (Playbooks + Lessons Learned).
Run with: python -X utf8 scripts/qa_phase11.py

The script talks to a running API at ``BASE``, prints one ✓/✗ line per check
and exits with status 1 when at least one check failed.
"""
import os
import sys

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import requests  # noqa: E402  (kept after the sys.path tweak, as in the original order)

BASE = "http://localhost:8000/api/v1"
# Per-request timeout in seconds. Without one, `requests` waits indefinitely
# and a stalled server would hang the whole QA run.
TIMEOUT = 10
PASS = "\033[92m✓\033[0m"
FAIL = "\033[91m✗\033[0m"

# Running totals, updated by check() and reported by main().
passed = 0
failed = 0


def check(label: str, condition: bool, detail: str = ""):
    """Record one check result and print it.

    Args:
        label: Human-readable description of the check.
        condition: Truthy when the check passed.
        detail: Extra text appended to the line, shown only on failure.
    """
    global passed, failed
    if condition:
        passed += 1
        print(f" {PASS} {label}")
        return
    failed += 1
    msg = f" {FAIL} {label}"
    if detail:
        msg += f" — {detail}"
    print(msg)


def _api(method, path, **kwargs):
    """Send ``method`` to ``BASE + path`` with the default timeout applied."""
    kwargs.setdefault("timeout", TIMEOUT)
    return requests.request(method, f"{BASE}{path}", **kwargs)


def _json_dict(response):
    """Return the JSON body when it is an object, otherwise an empty dict."""
    try:
        data = response.json()
    except ValueError:  # body is empty or not valid JSON
        return {}
    return data if isinstance(data, dict) else {}


def _json_rows(response):
    """Return the object elements of a JSON array body, otherwise an empty list."""
    try:
        data = response.json()
    except ValueError:  # body is empty or not valid JSON
        return []
    if not isinstance(data, list):
        return []
    return [row for row in data if isinstance(row, dict)]


def get_token(username="administrator", password="admin123"):
    """Log in and return the bearer token.

    Raises:
        RuntimeError: if the login request does not return 200, or the
            response carries neither ``access_token`` nor ``token``.
    """
    # Auth router uses OAuth2PasswordRequestForm (form data)
    r = _api("POST", "/auth/login", data={"username": username, "password": password})
    if r.status_code != 200:
        raise RuntimeError(f"Login failed: {r.status_code} {r.text[:200]}")
    payload = _json_dict(r)
    token = payload.get("access_token") or payload.get("token")
    if not token:
        # Fail here rather than sending "Bearer None" with every later request.
        raise RuntimeError(f"Login failed: no token in response {r.text[:200]}")
    return token


def auth(token):
    """Build the Authorization header dict for ``token``."""
    return {"Authorization": f"Bearer {token}"}


def get_first_technique(headers):
    """Return the id of the first technique listed by the API.

    Accepts either a bare JSON array or an object with an ``items`` array.

    Raises:
        RuntimeError: if the request fails, the body is not JSON, or no
            technique is returned.
    """
    r = _api("GET", "/techniques", headers=headers, params={"limit": 1})
    if r.status_code != 200:
        # Report the real cause instead of the misleading "seed the DB" hint.
        raise RuntimeError(f"Listing techniques failed: {r.status_code} {r.text[:200]}")
    try:
        items = r.json()
    except ValueError as exc:
        raise RuntimeError(f"Listing techniques returned non-JSON: {r.text[:200]}") from exc
    if isinstance(items, dict):
        items = items.get("items", [])
    if not items:
        raise RuntimeError("No techniques found — seed the DB first")
    return items[0]["id"]


# ─────────────────────────────────────────────────────────────────────────────

def main():
    """Run every Phase 11 check in order, print a summary, exit 1 on failure."""
    print("\n====== Phase 11 QA — Knowledge Management ======\n")
    token = get_token()
    h = auth(token)
    tid = get_first_technique(h)
    print(f" Using technique_id: {tid}\n")

    # ── Block 1: Playbook CRUD ────────────────────────────────────────────────
    print("── Block 1: Playbook CRUD ──")

    # Create detect playbook
    r = _api("POST", "/knowledge/playbooks", headers=h, json={
        "technique_id": tid,
        "playbook_type": "detect",
        "title": "Detection Playbook v1",
        "content": "## Detection\nLook for suspicious PowerShell events.",
        "tools": ["Splunk", "Sigma"],
        "prerequisites": ["Log ingestion enabled"],
        "change_note": "Initial version",
    })
    check("POST /knowledge/playbooks → 201", r.status_code == 201, r.text[:120])
    pb1 = _json_dict(r) if r.status_code == 201 else {}
    check("Playbook has id", bool(pb1.get("id")))
    check("Playbook version = 1", pb1.get("version") == 1)
    check("Playbook tools list", pb1.get("tools") == ["Splunk", "Sigma"])

    # Create attack playbook
    r = _api("POST", "/knowledge/playbooks", headers=h, json={
        "technique_id": tid,
        "playbook_type": "attack",
        "title": "Attack Playbook",
        "content": "## Attack\nUse PowerShell to run encoded commands.",
        "tools": ["Cobalt Strike"],
        "prerequisites": ["Domain user access"],
    })
    check("POST second playbook (attack type) → 201", r.status_code == 201, r.text[:120])
    pb2 = _json_dict(r) if r.status_code == 201 else {}

    # Duplicate → 409
    r = _api("POST", "/knowledge/playbooks", headers=h, json={
        "technique_id": tid,
        "playbook_type": "detect",
        "title": "Duplicate",
        "content": "dup",
    })
    check("POST duplicate (same technique+type) → 409", r.status_code == 409, r.text[:120])

    # Invalid playbook_type → 422
    r = _api("POST", "/knowledge/playbooks", headers=h, json={
        "technique_id": tid,
        "playbook_type": "badtype",
        "title": "Bad",
        "content": "x",
    })
    check("POST invalid playbook_type → 422", r.status_code == 422, r.text[:120])

    # GET list
    r = _api("GET", "/knowledge/playbooks", headers=h)
    check("GET /knowledge/playbooks → 200", r.status_code == 200)
    check("List has ≥2 playbooks", len(_json_rows(r)) >= 2)

    # GET filtered by technique
    r = _api("GET", "/knowledge/playbooks", headers=h, params={"technique_id": tid})
    check("GET ?technique_id filter → 200", r.status_code == 200)
    # Require a 200 so an error body cannot pass the all() check vacuously.
    check("Filter returns only this technique's playbooks",
          r.status_code == 200
          and all(p.get("technique_id") == tid for p in _json_rows(r)))

    # GET filtered by type
    r = _api("GET", "/knowledge/playbooks", headers=h, params={"playbook_type": "detect"})
    check("GET ?playbook_type=detect filter → 200", r.status_code == 200)
    check("Filter returns only detect playbooks",
          r.status_code == 200
          and all(p.get("playbook_type") == "detect" for p in _json_rows(r)))

    # GET single
    pb1_id = pb1.get("id")
    r = _api("GET", f"/knowledge/playbooks/{pb1_id}", headers=h)
    check("GET /knowledge/playbooks/{id} → 200", r.status_code == 200)
    # bool(pb1_id) stops a failed create (id None) from matching a body with no id.
    check("Correct playbook returned", bool(pb1_id) and _json_dict(r).get("id") == pb1_id)

    # GET 404
    r = _api("GET", "/knowledge/playbooks/00000000-0000-0000-0000-000000000001", headers=h)
    check("GET non-existent playbook → 404", r.status_code == 404)

    print()

    # ── Block 2: Playbook Update + Versioning ─────────────────────────────────
    print("── Block 2: Playbook Update + Versioning ──")

    r = _api("PATCH", f"/knowledge/playbooks/{pb1_id}", headers=h, json={
        "content": "## Detection v2\nAlso check Sysmon event 4688.",
        "tools": ["Splunk", "Sigma", "Sysmon"],
        "change_note": "Added Sysmon detection rule",
    })
    check("PATCH /knowledge/playbooks/{id} → 200", r.status_code == 200)
    pb1_v2 = _json_dict(r)
    check("Version incremented to 2", pb1_v2.get("version") == 2)
    # "or" fallbacks cover keys that are present but null.
    check("Content updated", "Sysmon" in (pb1_v2.get("content") or ""))
    check("Tools updated (3 items)", len(pb1_v2.get("tools") or []) == 3)

    # PATCH again → version 3
    r = _api("PATCH", f"/knowledge/playbooks/{pb1_id}", headers=h, json={
        "title": "Detection Playbook v3",
        "change_note": "Renamed",
    })
    check("PATCH again → version 3", _json_dict(r).get("version") == 3)

    # List versions
    r = _api("GET", f"/knowledge/playbooks/{pb1_id}/versions", headers=h)
    check("GET /playbooks/{id}/versions → 200", r.status_code == 200)
    versions = _json_rows(r)
    check("3 snapshots saved", len(versions) == 3, f"got {len(versions)}")
    # An empty list or a missing "version" key fails the check instead of raising.
    newest = versions[0].get("version") if versions else None
    oldest = versions[-1].get("version") if versions else None
    check("Versions in desc order",
          newest is not None and oldest is not None and newest >= oldest)

    # Restore version 1
    r = _api("POST", f"/knowledge/playbooks/{pb1_id}/restore/1", headers=h)
    check("POST /restore/1 → 200", r.status_code == 200)
    restored = _json_dict(r)
    check("Version incremented after restore", restored.get("version") == 4)
    # Require a 200 so an error body (no content at all) does not count as restored.
    check("Content restored to v1 content",
          r.status_code == 200 and "Sysmon" not in (restored.get("content") or ""))

    # Restore non-existent version → 404
    r = _api("POST", f"/knowledge/playbooks/{pb1_id}/restore/999", headers=h)
    check("POST /restore/999 (non-existent) → 404", r.status_code == 404)

    print()

    # ── Block 3: Technique convenience endpoints ──────────────────────────────
    print("── Block 3: Technique convenience endpoints ──")

    r = _api("GET", f"/knowledge/techniques/{tid}/playbooks", headers=h)
    check("GET /knowledge/techniques/{id}/playbooks → 200", r.status_code == 200)
    check("Returns playbooks for this technique", len(_json_rows(r)) >= 1)

    r = _api("GET", f"/knowledge/techniques/{tid}/playbooks/detect", headers=h)
    check("GET /knowledge/techniques/{id}/playbooks/detect → 200", r.status_code == 200)
    check("Returns detect playbook", _json_dict(r).get("playbook_type") == "detect")

    r = _api("GET", f"/knowledge/techniques/{tid}/playbooks/respond", headers=h)
    check("GET /knowledge/techniques/{id}/playbooks/respond → 404", r.status_code == 404)

    print()

    # ── Block 4: Lessons Learned CRUD ─────────────────────────────────────────
    print("── Block 4: Lessons Learned CRUD ──")

    r = _api("POST", "/knowledge/lessons", headers=h, json={
        "title": "T1059 — Detection gap in PowerShell logging",
        "what_happened": "Red team ran encoded PS commands without triggering any alert.",
        "root_cause": "ScriptBlock logging was disabled on 80% of endpoints.",
        "fix_applied": "Enabled ScriptBlock logging via GPO. Validated in 48h.",
        "severity": "high",
        "entity_type": "manual",
        "technique_ids": [tid],
        "tags": ["powershell", "logging", "gpo"],
    })
    check("POST /knowledge/lessons → 201", r.status_code == 201, r.text[:120])
    ll1 = _json_dict(r) if r.status_code == 201 else {}
    check("Lesson has id", bool(ll1.get("id")))
    check("Severity correct", ll1.get("severity") == "high")
    check("Tags saved", ll1.get("tags") == ["powershell", "logging", "gpo"])
    check("technique_ids saved", tid in (ll1.get("technique_ids") or []))

    # Second lesson — critical
    r = _api("POST", "/knowledge/lessons", headers=h, json={
        "title": "Lateral movement undetected",
        "what_happened": "PsExec used for lateral movement — zero detections.",
        "root_cause": "SMB traffic not monitored in SIEM.",
        "severity": "critical",
        "entity_type": "campaign",
        "tags": ["smb", "lateral-movement"],
    })
    check("POST second lesson (critical) → 201", r.status_code == 201)
    ll2 = _json_dict(r) if r.status_code == 201 else {}

    # Invalid severity → 422
    r = _api("POST", "/knowledge/lessons", headers=h, json={
        "title": "Bad severity",
        "what_happened": "x",
        "root_cause": "y",
        "severity": "extreme",
    })
    check("POST invalid severity → 422", r.status_code == 422)

    # Invalid entity_type → 422
    r = _api("POST", "/knowledge/lessons", headers=h, json={
        "title": "Bad entity_type",
        "what_happened": "x",
        "root_cause": "y",
        "entity_type": "incident",
    })
    check("POST invalid entity_type → 422", r.status_code == 422)

    # GET list
    r = _api("GET", "/knowledge/lessons", headers=h)
    check("GET /knowledge/lessons → 200", r.status_code == 200)
    check("List has ≥2 lessons", len(_json_rows(r)) >= 2)

    # GET single
    ll1_id = ll1.get("id")
    r = _api("GET", f"/knowledge/lessons/{ll1_id}", headers=h)
    check("GET /knowledge/lessons/{id} → 200", r.status_code == 200)
    # Same guard as for playbooks: a failed create must not produce None == None.
    check("Correct lesson returned", bool(ll1_id) and _json_dict(r).get("id") == ll1_id)

    print()

    # ── Block 5: Lessons filtering ────────────────────────────────────────────
    print("── Block 5: Lessons Learned filtering ──")

    r = _api("GET", "/knowledge/lessons", headers=h, params={"severity": "critical"})
    check("Filter by severity=critical → 200", r.status_code == 200)
    check("Only critical lessons returned",
          r.status_code == 200
          and all(lesson.get("severity") == "critical" for lesson in _json_rows(r)))

    r = _api("GET", "/knowledge/lessons", headers=h, params={"entity_type": "manual"})
    check("Filter by entity_type=manual → 200", r.status_code == 200)
    check("Only manual lessons returned",
          r.status_code == 200
          and all(lesson.get("entity_type") == "manual" for lesson in _json_rows(r)))

    r = _api("GET", "/knowledge/lessons", headers=h, params={"tag": "powershell"})
    check("Filter by tag=powershell → 200", r.status_code == 200)
    check("Lesson with powershell tag returned", len(_json_rows(r)) >= 1)

    r = _api("GET", "/knowledge/lessons", headers=h, params={"technique_id": tid})
    check("Filter by technique_id → 200", r.status_code == 200)
    check("Lesson linked to technique returned", len(_json_rows(r)) >= 1)

    print()

    # ── Block 6: Lesson Update ────────────────────────────────────────────────
    print("── Block 6: Lesson Learned update ──")

    r = _api("PATCH", f"/knowledge/lessons/{ll1_id}", headers=h, json={
        "fix_applied": "Updated fix: also enabled Module logging.",
        "tags": ["powershell", "logging", "gpo", "module-logging"],
    })
    check("PATCH /knowledge/lessons/{id} → 200", r.status_code == 200)
    updated = _json_dict(r)
    check("Fix updated", "Module logging" in (updated.get("fix_applied") or ""))
    check("Tags extended", len(updated.get("tags") or []) == 4)

    # Invalid severity via PATCH → 422
    r = _api("PATCH", f"/knowledge/lessons/{ll1_id}", headers=h, json={
        "severity": "extreme",
    })
    check("PATCH invalid severity → 422", r.status_code == 422)

    print()

    # ── Block 7: Knowledge Stats ──────────────────────────────────────────────
    print("── Block 7: Knowledge Stats ──")

    r = _api("GET", "/knowledge/stats", headers=h)
    check("GET /knowledge/stats → 200", r.status_code == 200)
    stats = _json_dict(r)
    by_severity = stats.get("lessons_by_severity") or {}
    by_type = stats.get("playbooks_by_type") or {}
    check("total_playbooks ≥ 2", (stats.get("total_playbooks") or 0) >= 2)
    check("total_lessons ≥ 2", (stats.get("total_lessons") or 0) >= 2)
    check("lessons_by_severity has 'high'", "high" in by_severity)
    check("playbooks_by_type has 'detect'", "detect" in by_type)
    check("Stats detect count ≥ 1",
          isinstance(by_type, dict) and (by_type.get("detect") or 0) >= 1)

    print()

    # ── Block 8: Soft-Delete ──────────────────────────────────────────────────
    print("── Block 8: Soft-delete ──")

    ll2_id = ll2.get("id")
    r = _api("DELETE", f"/knowledge/lessons/{ll2_id}", headers=h)
    check("DELETE /knowledge/lessons/{id} → 204", r.status_code == 204)

    r = _api("GET", f"/knowledge/lessons/{ll2_id}", headers=h)
    check("GET deleted lesson → 404", r.status_code == 404)

    r = _api("GET", "/knowledge/lessons", headers=h, params={"include_inactive": True})
    check("GET ?include_inactive=true includes deleted lesson", r.status_code == 200)
    all_ids = [lesson.get("id") for lesson in _json_rows(r)]
    check("Deleted lesson visible with include_inactive", bool(ll2_id) and ll2_id in all_ids)

    # Playbook soft-delete
    pb2_id = pb2.get("id")
    r = _api("DELETE", f"/knowledge/playbooks/{pb2_id}", headers=h)
    check("DELETE playbook → 204", r.status_code == 204)

    r = _api("GET", f"/knowledge/playbooks/{pb2_id}", headers=h)
    check("GET deleted playbook → 404", r.status_code == 404)

    print()

    # ── Block 9: Auth protection ──────────────────────────────────────────────
    print("── Block 9: Auth protection (no token) ──")

    endpoints = [
        ("GET", "/knowledge/playbooks"),
        ("POST", "/knowledge/playbooks"),
        ("GET", "/knowledge/lessons"),
        ("POST", "/knowledge/lessons"),
        ("GET", "/knowledge/stats"),
    ]
    # One body is sent to every endpoint, GETs included, and no auth header.
    unauthenticated_body = {
        "title": "x",
        "what_happened": "x",
        "root_cause": "x",
        "technique_id": tid,
        "playbook_type": "hunt",
        "content": "x",
    }
    for method, path in endpoints:
        r = _api(method, path, json=unauthenticated_body)
        check(f"{method} {path} without auth → 401", r.status_code == 401)

    print()

    # ── Block 10: Regression ──────────────────────────────────────────────────
    print("── Block 10: Regression (Phase 10 still works) ──")

    r = _api("GET", "/attack-paths", headers=h)
    check("GET /attack-paths still works", r.status_code == 200)

    r = _api("GET", "/ownership/analyst-dashboard", headers=h)
    check("GET /ownership/analyst-dashboard still works", r.status_code == 200)

    r = _api("GET", "/detection-lifecycle/assets", headers=h)
    check("GET /detection-lifecycle/assets still works", r.status_code == 200)

    print()

    # ── Summary ───────────────────────────────────────────────────────────────
    total = passed + failed
    print(f"====== Results: {passed}/{total} passed", end="")
    if failed:
        print(f" — \033[91m{failed} FAILED\033[0m ======\n")
        sys.exit(1)
    else:
        print(" ✓ ALL PASSED ======\n")


if __name__ == "__main__":
    main()