diff --git a/scripts/verify_gaps.py b/scripts/verify_gaps.py new file mode 100644 index 0000000..f22d920 --- /dev/null +++ b/scripts/verify_gaps.py @@ -0,0 +1,124 @@ +""" +Verify Phase 13 gap fixes: +1. Hourly alert_evaluation job is registered in APScheduler +2. evaluate_all_rules creates in-app notifications for admins +3. webhook dispatch_webhook_targeted exists and is callable +""" +import requests, sys + +BASE = "http://localhost:8000/api/v1" +PASS = "\033[92m✓\033[0m" +FAIL = "\033[91m✗\033[0m" +passed = 0 +failed = 0 + + +def check(label, cond, detail=""): + global passed, failed + if cond: + passed += 1 + print(f" {PASS} {label}") + else: + failed += 1 + print(f" {FAIL} {label}" + (f" — {detail}" if detail else "")) + + +def main(): + print("\n====== Phase 13 Gap Verification ======\n") + + # ── Gap 1: hourly job registered ────────────────────────────────────────── + print("── Gap 1: Hourly APScheduler job ──") + try: + from app.jobs.mitre_sync_job import scheduler + job_ids = [j.id for j in scheduler.get_jobs()] + check("alert_evaluation job registered", "alert_evaluation" in job_ids, + f"registered jobs: {job_ids}") + if "alert_evaluation" in job_ids: + job = next(j for j in scheduler.get_jobs() if j.id == "alert_evaluation") + trigger_str = str(job.trigger) + check("alert_evaluation trigger is interval", "interval" in trigger_str.lower(), + trigger_str) + print(f" next run: {job.next_run_time}") + except Exception as exc: + check("scheduler import", False, str(exc)) + + print() + + # ── Gap 2: in-app notifications dispatched ──────────────────────────────── + print("── Gap 2: In-app notifications on alert fire ──") + tok = requests.post(f"{BASE}/auth/login", + data={"username": "administrator", "password": "admin123"}) + if tok.status_code != 200: + print(f" Login failed: {tok.text}"); sys.exit(1) + h = {"Authorization": f"Bearer {tok.json().get('access_token')}"} + + # Mark all existing notifications as read so we start clean + existing = requests.get(f"{BASE}/notifications", headers=h).json() + existing_ids = [n["id"] for n in (existing if isinstance(existing, list) else [])] + + # Reset rule cooldowns to 0 so they all fire + rules = requests.get(f"{BASE}/alerts/rules/list", headers=h, + params={"include_disabled": "true"}).json() + for rule in rules: + requests.patch(f"{BASE}/alerts/rules/{rule['id']}", headers=h, + json={"cooldown_hours": 0}) + + # Evaluate + eval_res = requests.post(f"{BASE}/alerts/evaluate", headers=h).json() + fired = eval_res.get("alerts_fired", 0) + check("evaluate_all_rules fires alerts", fired > 0, f"fired={fired}") + + if fired > 0: + # Check notifications were created + new_notifs = requests.get(f"{BASE}/notifications", headers=h).json() + new_notifs = new_notifs if isinstance(new_notifs, list) else [] + alert_notifs = [n for n in new_notifs + if n.get("type") == "alert_fired" + and n["id"] not in existing_ids] + check("In-app notifications created for alert_fired", + len(alert_notifs) > 0, + f"found {len(alert_notifs)} new alert_fired notifications") + if alert_notifs: + n0 = alert_notifs[0] + check("Notification has title", bool(n0.get("title"))) + check("Notification entity_type = alert_instance", + n0.get("entity_type") == "alert_instance") + check("Notification entity_id set", bool(n0.get("entity_id"))) + else: + print(" (no alerts fired — skipping notification check)") + + # Restore cooldowns + for rule in rules: + requests.patch(f"{BASE}/alerts/rules/{rule['id']}", headers=h, + json={"cooldown_hours": rule.get("cooldown_hours", 24)}) + + print() + + # ── Gap 3: webhook_service has dispatch_webhook_targeted ───────────────── + print("── Gap 3: dispatch_webhook_targeted exists ──") + try: + from app.services.webhook_service import dispatch_webhook_targeted + check("dispatch_webhook_targeted importable", True) + import inspect + sig = inspect.signature(dispatch_webhook_targeted) + params = list(sig.parameters) + check("signature: (webhook_id, event_type, payload)", + params == ["webhook_id", "event_type", "payload"], + str(params)) + except ImportError as exc: + check("dispatch_webhook_targeted importable", False, str(exc)) + + print() + + # ── Summary ─────────────────────────────────────────────────────────────── + total = passed + failed + print(f"====== Results: {passed}/{total} passed", end="") + if failed: + print(f" — \033[91m{failed} FAILED\033[0m ======\n") + sys.exit(1) + else: + print(" ✓ ALL PASSED ======\n") + + +if __name__ == "__main__": + main()