test: gap verification script for Phase 13 gaps
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
This commit is contained in:
124
scripts/verify_gaps.py
Normal file
124
scripts/verify_gaps.py
Normal file
@@ -0,0 +1,124 @@
|
|||||||
|
"""
|
||||||
|
Verify Phase 13 gap fixes:
|
||||||
|
1. Hourly alert_evaluation job is registered in APScheduler
|
||||||
|
2. evaluate_all_rules creates in-app notifications for admins
|
||||||
|
3. webhook dispatch_webhook_targeted exists and is callable
|
||||||
|
"""
|
||||||
|
import requests, sys
|
||||||
|
|
||||||
|
BASE = "http://localhost:8000/api/v1"
|
||||||
|
PASS = "\033[92m✓\033[0m"
|
||||||
|
FAIL = "\033[91m✗\033[0m"
|
||||||
|
passed = 0
|
||||||
|
failed = 0
|
||||||
|
|
||||||
|
|
||||||
|
def check(label, cond, detail=""):
|
||||||
|
global passed, failed
|
||||||
|
if cond:
|
||||||
|
passed += 1
|
||||||
|
print(f" {PASS} {label}")
|
||||||
|
else:
|
||||||
|
failed += 1
|
||||||
|
print(f" {FAIL} {label}" + (f" — {detail}" if detail else ""))
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
print("\n====== Phase 13 Gap Verification ======\n")
|
||||||
|
|
||||||
|
# ── Gap 1: hourly job registered ──────────────────────────────────────────
|
||||||
|
print("── Gap 1: Hourly APScheduler job ──")
|
||||||
|
try:
|
||||||
|
from app.jobs.mitre_sync_job import scheduler
|
||||||
|
job_ids = [j.id for j in scheduler.get_jobs()]
|
||||||
|
check("alert_evaluation job registered", "alert_evaluation" in job_ids,
|
||||||
|
f"registered jobs: {job_ids}")
|
||||||
|
if "alert_evaluation" in job_ids:
|
||||||
|
job = next(j for j in scheduler.get_jobs() if j.id == "alert_evaluation")
|
||||||
|
trigger_str = str(job.trigger)
|
||||||
|
check("alert_evaluation trigger is interval", "interval" in trigger_str.lower(),
|
||||||
|
trigger_str)
|
||||||
|
print(f" next run: {job.next_run_time}")
|
||||||
|
except Exception as exc:
|
||||||
|
check("scheduler import", False, str(exc))
|
||||||
|
|
||||||
|
print()
|
||||||
|
|
||||||
|
# ── Gap 2: in-app notifications dispatched ────────────────────────────────
|
||||||
|
print("── Gap 2: In-app notifications on alert fire ──")
|
||||||
|
tok = requests.post(f"{BASE}/auth/login",
|
||||||
|
data={"username": "administrator", "password": "admin123"})
|
||||||
|
if tok.status_code != 200:
|
||||||
|
print(f" Login failed: {tok.text}"); sys.exit(1)
|
||||||
|
h = {"Authorization": f"Bearer {tok.json().get('access_token')}"}
|
||||||
|
|
||||||
|
# Mark all existing notifications as read so we start clean
|
||||||
|
existing = requests.get(f"{BASE}/notifications", headers=h).json()
|
||||||
|
existing_ids = [n["id"] for n in (existing if isinstance(existing, list) else [])]
|
||||||
|
|
||||||
|
# Reset rule cooldowns to 0 so they all fire
|
||||||
|
rules = requests.get(f"{BASE}/alerts/rules/list", headers=h,
|
||||||
|
params={"include_disabled": "true"}).json()
|
||||||
|
for rule in rules:
|
||||||
|
requests.patch(f"{BASE}/alerts/rules/{rule['id']}", headers=h,
|
||||||
|
json={"cooldown_hours": 0})
|
||||||
|
|
||||||
|
# Evaluate
|
||||||
|
eval_res = requests.post(f"{BASE}/alerts/evaluate", headers=h).json()
|
||||||
|
fired = eval_res.get("alerts_fired", 0)
|
||||||
|
check("evaluate_all_rules fires alerts", fired > 0, f"fired={fired}")
|
||||||
|
|
||||||
|
if fired > 0:
|
||||||
|
# Check notifications were created
|
||||||
|
new_notifs = requests.get(f"{BASE}/notifications", headers=h).json()
|
||||||
|
new_notifs = new_notifs if isinstance(new_notifs, list) else []
|
||||||
|
alert_notifs = [n for n in new_notifs
|
||||||
|
if n.get("type") == "alert_fired"
|
||||||
|
and n["id"] not in existing_ids]
|
||||||
|
check("In-app notifications created for alert_fired",
|
||||||
|
len(alert_notifs) > 0,
|
||||||
|
f"found {len(alert_notifs)} new alert_fired notifications")
|
||||||
|
if alert_notifs:
|
||||||
|
n0 = alert_notifs[0]
|
||||||
|
check("Notification has title", bool(n0.get("title")))
|
||||||
|
check("Notification entity_type = alert_instance",
|
||||||
|
n0.get("entity_type") == "alert_instance")
|
||||||
|
check("Notification entity_id set", bool(n0.get("entity_id")))
|
||||||
|
else:
|
||||||
|
print(" (no alerts fired — skipping notification check)")
|
||||||
|
|
||||||
|
# Restore cooldowns
|
||||||
|
for rule in rules:
|
||||||
|
requests.patch(f"{BASE}/alerts/rules/{rule['id']}", headers=h,
|
||||||
|
json={"cooldown_hours": rule.get("cooldown_hours", 24)})
|
||||||
|
|
||||||
|
print()
|
||||||
|
|
||||||
|
# ── Gap 3: webhook_service has dispatch_webhook_targeted ─────────────────
|
||||||
|
print("── Gap 3: dispatch_webhook_targeted exists ──")
|
||||||
|
try:
|
||||||
|
from app.services.webhook_service import dispatch_webhook_targeted
|
||||||
|
check("dispatch_webhook_targeted importable", True)
|
||||||
|
import inspect
|
||||||
|
sig = inspect.signature(dispatch_webhook_targeted)
|
||||||
|
params = list(sig.parameters)
|
||||||
|
check("signature: (webhook_id, event_type, payload)",
|
||||||
|
params == ["webhook_id", "event_type", "payload"],
|
||||||
|
str(params))
|
||||||
|
except ImportError as exc:
|
||||||
|
check("dispatch_webhook_targeted importable", False, str(exc))
|
||||||
|
|
||||||
|
print()
|
||||||
|
|
||||||
|
# ── Summary ───────────────────────────────────────────────────────────────
|
||||||
|
total = passed + failed
|
||||||
|
print(f"====== Results: {passed}/{total} passed", end="")
|
||||||
|
if failed:
|
||||||
|
print(f" — \033[91m{failed} FAILED\033[0m ======\n")
|
||||||
|
sys.exit(1)
|
||||||
|
else:
|
||||||
|
print(" ✓ ALL PASSED ======\n")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Reference in New Issue
Block a user