""" QA script for Phase 13 — Operational Alerts. Run with: python -X utf8 scripts/qa_phase13_alerts.py """ import sys, os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import requests BASE = "http://localhost:8000/api/v1" PASS = "\033[92m✓\033[0m" FAIL = "\033[91m✗\033[0m" passed = 0 failed = 0 VALID_SEVERITIES = {"critical", "high", "medium", "low", "info"} VALID_STATUSES = {"open", "acknowledged", "resolved", "dismissed"} def check(label: str, condition: bool, detail: str = ""): global passed, failed if condition: passed += 1 print(f" {PASS} {label}") else: failed += 1 msg = f" {FAIL} {label}" if detail: msg += f" — {detail}" print(msg) def get_token(username="administrator", password="admin123"): r = requests.post(f"{BASE}/auth/login", data={"username": username, "password": password}) if r.status_code == 200: return r.json().get("access_token") or r.json().get("token") raise RuntimeError(f"Login failed: {r.status_code} {r.text[:200]}") def auth(token): return {"Authorization": f"Bearer {token}"} # ───────────────────────────────────────────────────────────────────────────── def main(): print("\n====== Phase 13 QA — Operational Alerts ======\n") token = get_token() h = auth(token) # ── Block 1: System rules seeded ───────────────────────────────────────── print("── Block 1: System rules seeded at startup ──") r = requests.get(f"{BASE}/alerts/rules/list", headers=h, params={"include_disabled": True}) check("GET /alerts/rules/list → 200", r.status_code == 200, r.text[:150]) rules = r.json() if r.status_code == 200 else [] check("At least 8 system rules seeded", len(rules) >= 8, f"got {len(rules)}") system_rules = [r for r in rules if r.get("is_system")] check("All seeded rules are system rules", len(system_rules) == len(rules)) check("Rules have valid severities", all(r["severity"] in VALID_SEVERITIES for r in rules)) check("Rules have config dict", all(isinstance(r.get("config"), dict) for r in rules)) rule_types = {r["rule_type"] for r in rules} for rt in ("high_risk", "stale_technique", "coverage_regression", "expiry_wave", "new_technique", "orphan_spike"): check(f"Rule type '{rt}' present", rt in rule_types) print() # ── Block 2: Evaluate rules ─────────────────────────────────────────────── print("── Block 2: Evaluate alert rules ──") r = requests.post(f"{BASE}/alerts/evaluate", headers=h) check("POST /alerts/evaluate → 202", r.status_code == 202, r.text[:200]) result = r.json() if r.status_code == 202 else {} check("rules_evaluated > 0", result.get("rules_evaluated", 0) > 0, f"got {result.get('rules_evaluated')}") check("alerts_fired is int", isinstance(result.get("alerts_fired"), int)) check("duration_seconds present", "duration_seconds" in result) check("alerts list present", isinstance(result.get("alerts"), list)) print(f" fired={result.get('alerts_fired')} alerts in " f"{result.get('duration_seconds', '?')}s") if result.get("alerts"): a0 = result["alerts"][0] check("Alert has title", bool(a0.get("title"))) check("Alert has message", bool(a0.get("message"))) check("Alert has severity", a0.get("severity") in VALID_SEVERITIES) check("Alert status = open", a0.get("status") == "open") print() # ── Block 3: List & Get alerts ──────────────────────────────────────────── print("── Block 3: List & Get alert instances ──") r = requests.get(f"{BASE}/alerts", headers=h) check("GET /alerts → 200", r.status_code == 200, r.text[:150]) alerts = r.json() if r.status_code == 200 else [] check("Alerts is list", isinstance(alerts, list)) # Filter by status=open r = requests.get(f"{BASE}/alerts", headers=h, params={"status": "open"}) check("GET /alerts?status=open → 200", r.status_code == 200) open_alerts = r.json() if r.status_code == 200 else [] if open_alerts: check("All filtered alerts have status=open", all(a["status"] == "open" for a in open_alerts)) # Filter by severity r = requests.get(f"{BASE}/alerts", headers=h, params={"severity": "critical"}) check("GET /alerts?severity=critical → 200", r.status_code == 200) # Pagination r = requests.get(f"{BASE}/alerts", headers=h, params={"limit": 3, "offset": 0}) check("GET /alerts?limit=3 returns ≤3", r.status_code == 200 and len(r.json()) <= 3) # Get single (use first from open_alerts if available) if open_alerts: aid = open_alerts[0]["id"] r = requests.get(f"{BASE}/alerts/{aid}", headers=h) check("GET /alerts/{id} → 200", r.status_code == 200) check("Correct id returned", r.json().get("id") == aid) # 404 for non-existent r = requests.get( f"{BASE}/alerts/00000000-0000-0000-0000-000000000001", headers=h ) check("GET non-existent alert → 404", r.status_code == 404) print() # ── Block 4: Alert lifecycle (acknowledge → resolve) ────────────────────── print("── Block 4: Alert lifecycle ──") # Re-evaluate to ensure we have open alerts (reset cooldowns first by creating custom rule) r_open = requests.get(f"{BASE}/alerts", headers=h, params={"status": "open", "limit": 1}) open_now = r_open.json() if r_open.status_code == 200 else [] if open_now: aid = open_now[0]["id"] # Acknowledge r = requests.post(f"{BASE}/alerts/{aid}/acknowledge", headers=h) check("POST /alerts/{id}/acknowledge → 200", r.status_code == 200, r.text[:150]) ack = r.json() if r.status_code == 200 else {} check("Status = acknowledged after ack", ack.get("status") == "acknowledged") check("acknowledged_at set", bool(ack.get("acknowledged_at"))) check("acknowledged_by set", bool(ack.get("acknowledged_by"))) # Can't acknowledge again r = requests.post(f"{BASE}/alerts/{aid}/acknowledge", headers=h) check("Double-acknowledge → 400", r.status_code == 400) # Resolve r = requests.post(f"{BASE}/alerts/{aid}/resolve", headers=h) check("POST /alerts/{id}/resolve → 200", r.status_code == 200, r.text[:150]) res = r.json() if r.status_code == 200 else {} check("Status = resolved after resolve", res.get("status") == "resolved") check("resolved_at set", bool(res.get("resolved_at"))) # Can't resolve again r = requests.post(f"{BASE}/alerts/{aid}/resolve", headers=h) check("Double-resolve → 400", r.status_code == 400) else: print(" (no open alerts to test lifecycle — skipping acknowledge/resolve)") # Test dismiss on a different alert r_open2 = requests.get(f"{BASE}/alerts", headers=h, params={"status": "open", "limit": 1}) open_now2 = r_open2.json() if r_open2.status_code == 200 else [] if open_now2: aid2 = open_now2[0]["id"] r = requests.post(f"{BASE}/alerts/{aid2}/dismiss", headers=h) check("POST /alerts/{id}/dismiss → 200", r.status_code == 200, r.text[:100]) dis = r.json() if r.status_code == 200 else {} check("Status = dismissed", dis.get("status") == "dismissed") else: print(" (no open alerts to dismiss — skipping)") print() # ── Block 5: Summary ────────────────────────────────────────────────────── print("── Block 5: Alert summary ──") r = requests.get(f"{BASE}/alerts/summary", headers=h) check("GET /alerts/summary → 200", r.status_code == 200, r.text[:150]) summary = r.json() if r.status_code == 200 else {} check("total_open is int", isinstance(summary.get("total_open"), int)) check("total_acknowledged is int", isinstance(summary.get("total_acknowledged"), int)) check("total_resolved is int", isinstance(summary.get("total_resolved"), int)) check("by_severity has all levels", all(k in summary.get("by_severity", {}) for k in ("critical", "high", "medium", "low", "info"))) check("by_rule_type is dict", isinstance(summary.get("by_rule_type"), dict)) check("recent_alerts is list", isinstance(summary.get("recent_alerts"), list)) print() # ── Block 6: Custom rule CRUD ───────────────────────────────────────────── print("── Block 6: Custom rule CRUD ──") custom_rule_body = { "name": "QA Custom Rule", "description": "Temporary rule for QA testing", "rule_type": "high_risk", "severity": "low", "config": {"min_risk_score": 99.9, "min_count": 9999}, # won't fire "cooldown_hours": 0, } r = requests.post(f"{BASE}/alerts/rules", headers=h, json=custom_rule_body) check("POST /alerts/rules → 201", r.status_code == 201, r.text[:200]) custom = r.json() if r.status_code == 201 else {} check("Custom rule is_system=False", custom.get("is_system") == False) custom_id = custom.get("id", "") # Get it if custom_id: r = requests.get(f"{BASE}/alerts/rules/{custom_id}", headers=h) check("GET /alerts/rules/{id} → 200", r.status_code == 200) # Update: disable it if custom_id: r = requests.patch(f"{BASE}/alerts/rules/{custom_id}", headers=h, json={"is_enabled": False, "severity": "info"}) check("PATCH /alerts/rules/{id} → 200", r.status_code == 200, r.text[:100]) upd = r.json() if r.status_code == 200 else {} check("is_enabled = False after patch", upd.get("is_enabled") == False) check("severity = info after patch", upd.get("severity") == "info") # System rule cannot be deleted system_rule_id = next((r["id"] for r in rules if r.get("is_system")), None) if system_rule_id: r = requests.delete(f"{BASE}/alerts/rules/{system_rule_id}", headers=h) check("DELETE system rule → 400", r.status_code == 400) # Delete custom rule if custom_id: r = requests.delete(f"{BASE}/alerts/rules/{custom_id}", headers=h) check("DELETE custom rule → 204", r.status_code == 204) # Invalid rule_type → 422 r = requests.post(f"{BASE}/alerts/rules", headers=h, json={ "name": "Bad", "rule_type": "nonexistent", "severity": "low", "config": {}, }) check("POST rule with invalid rule_type → 422", r.status_code == 422) print() # ── Block 7: Auth protection ────────────────────────────────────────────── print("── Block 7: Auth protection ──") for method, url in [ ("GET", f"{BASE}/alerts"), ("GET", f"{BASE}/alerts/summary"), ("GET", f"{BASE}/alerts/rules/list"), ]: r = requests.request(method, url) ep = url.split("/api/v1")[1] check(f"{method} {ep} without auth → 401", r.status_code == 401) r = requests.post(f"{BASE}/alerts/evaluate") check("POST /alerts/evaluate without auth → 401", r.status_code == 401) print() # ── Block 8: Regression ─────────────────────────────────────────────────── print("── Block 8: Regression ──") r = requests.get(f"{BASE}/dashboard/kpis", headers=h) check("GET /dashboard/kpis still works", r.status_code == 200) r = requests.get(f"{BASE}/risk/summary", headers=h) check("GET /risk/summary still works", r.status_code == 200) r = requests.get(f"{BASE}/api-keys", headers=h) check("GET /api-keys still works", r.status_code == 200) r = requests.get(f"{BASE}/techniques", headers=h) check("GET /techniques still works", r.status_code == 200) print() # ── Summary ─────────────────────────────────────────────────────────────── total = passed + failed print(f"====== Results: {passed}/{total} passed", end="") if failed: print(f" — \033[91m{failed} FAILED\033[0m ======\n") sys.exit(1) else: print(" ✓ ALL PASSED ======\n") if __name__ == "__main__": main()