feat(alerts): Phase 13 — Operational Alert Engine
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled

AlertRule + AlertInstance models (b041alerts migration), 8 pre-seeded system
rules (high_risk x2, stale_technique, coverage_regression, low_coverage,
expiry_wave, new_technique, orphan_spike), evaluation engine with per-rule
cooldown, full alert lifecycle (acknowledge/resolve/dismiss), custom rule CRUD,
and summary endpoint. Rules seeded at app startup.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
kitos
2026-05-21 15:25:55 +02:00
parent d81fc04b8f
commit d4b147da7c
8 changed files with 1387 additions and 0 deletions

View File

@@ -0,0 +1,302 @@
"""
QA script for Phase 13 — Operational Alerts.
Run with: python -X utf8 scripts/qa_phase13_alerts.py
"""
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import requests
BASE = "http://localhost:8000/api/v1"
PASS = "\033[92m✓\033[0m"
FAIL = "\033[91m✗\033[0m"
passed = 0
failed = 0
VALID_SEVERITIES = {"critical", "high", "medium", "low", "info"}
VALID_STATUSES = {"open", "acknowledged", "resolved", "dismissed"}
def check(label: str, condition: bool, detail: str = ""):
global passed, failed
if condition:
passed += 1
print(f" {PASS} {label}")
else:
failed += 1
msg = f" {FAIL} {label}"
if detail:
msg += f"{detail}"
print(msg)
def get_token(username="administrator", password="admin123"):
r = requests.post(f"{BASE}/auth/login",
data={"username": username, "password": password})
if r.status_code == 200:
return r.json().get("access_token") or r.json().get("token")
raise RuntimeError(f"Login failed: {r.status_code} {r.text[:200]}")
def auth(token):
return {"Authorization": f"Bearer {token}"}
# ─────────────────────────────────────────────────────────────────────────────
def main():
print("\n====== Phase 13 QA — Operational Alerts ======\n")
token = get_token()
h = auth(token)
# ── Block 1: System rules seeded ─────────────────────────────────────────
print("── Block 1: System rules seeded at startup ──")
r = requests.get(f"{BASE}/alerts/rules/list", headers=h,
params={"include_disabled": True})
check("GET /alerts/rules/list → 200", r.status_code == 200, r.text[:150])
rules = r.json() if r.status_code == 200 else []
check("At least 8 system rules seeded", len(rules) >= 8,
f"got {len(rules)}")
system_rules = [r for r in rules if r.get("is_system")]
check("All seeded rules are system rules", len(system_rules) == len(rules))
check("Rules have valid severities",
all(r["severity"] in VALID_SEVERITIES for r in rules))
check("Rules have config dict",
all(isinstance(r.get("config"), dict) for r in rules))
rule_types = {r["rule_type"] for r in rules}
for rt in ("high_risk", "stale_technique", "coverage_regression",
"expiry_wave", "new_technique", "orphan_spike"):
check(f"Rule type '{rt}' present", rt in rule_types)
print()
# ── Block 2: Evaluate rules ───────────────────────────────────────────────
print("── Block 2: Evaluate alert rules ──")
r = requests.post(f"{BASE}/alerts/evaluate", headers=h)
check("POST /alerts/evaluate → 202", r.status_code == 202, r.text[:200])
result = r.json() if r.status_code == 202 else {}
check("rules_evaluated > 0", result.get("rules_evaluated", 0) > 0,
f"got {result.get('rules_evaluated')}")
check("alerts_fired is int", isinstance(result.get("alerts_fired"), int))
check("duration_seconds present", "duration_seconds" in result)
check("alerts list present", isinstance(result.get("alerts"), list))
print(f" fired={result.get('alerts_fired')} alerts in "
f"{result.get('duration_seconds', '?')}s")
if result.get("alerts"):
a0 = result["alerts"][0]
check("Alert has title", bool(a0.get("title")))
check("Alert has message", bool(a0.get("message")))
check("Alert has severity", a0.get("severity") in VALID_SEVERITIES)
check("Alert status = open", a0.get("status") == "open")
print()
# ── Block 3: List & Get alerts ────────────────────────────────────────────
print("── Block 3: List & Get alert instances ──")
r = requests.get(f"{BASE}/alerts", headers=h)
check("GET /alerts → 200", r.status_code == 200, r.text[:150])
alerts = r.json() if r.status_code == 200 else []
check("Alerts is list", isinstance(alerts, list))
# Filter by status=open
r = requests.get(f"{BASE}/alerts", headers=h, params={"status": "open"})
check("GET /alerts?status=open → 200", r.status_code == 200)
open_alerts = r.json() if r.status_code == 200 else []
if open_alerts:
check("All filtered alerts have status=open",
all(a["status"] == "open" for a in open_alerts))
# Filter by severity
r = requests.get(f"{BASE}/alerts", headers=h, params={"severity": "critical"})
check("GET /alerts?severity=critical → 200", r.status_code == 200)
# Pagination
r = requests.get(f"{BASE}/alerts", headers=h, params={"limit": 3, "offset": 0})
check("GET /alerts?limit=3 returns ≤3", r.status_code == 200 and len(r.json()) <= 3)
# Get single (use first from open_alerts if available)
if open_alerts:
aid = open_alerts[0]["id"]
r = requests.get(f"{BASE}/alerts/{aid}", headers=h)
check("GET /alerts/{id} → 200", r.status_code == 200)
check("Correct id returned", r.json().get("id") == aid)
# 404 for non-existent
r = requests.get(
f"{BASE}/alerts/00000000-0000-0000-0000-000000000001", headers=h
)
check("GET non-existent alert → 404", r.status_code == 404)
print()
# ── Block 4: Alert lifecycle (acknowledge → resolve) ──────────────────────
print("── Block 4: Alert lifecycle ──")
# Re-evaluate to ensure we have open alerts (reset cooldowns first by creating custom rule)
r_open = requests.get(f"{BASE}/alerts", headers=h, params={"status": "open", "limit": 1})
open_now = r_open.json() if r_open.status_code == 200 else []
if open_now:
aid = open_now[0]["id"]
# Acknowledge
r = requests.post(f"{BASE}/alerts/{aid}/acknowledge", headers=h)
check("POST /alerts/{id}/acknowledge → 200", r.status_code == 200,
r.text[:150])
ack = r.json() if r.status_code == 200 else {}
check("Status = acknowledged after ack", ack.get("status") == "acknowledged")
check("acknowledged_at set", bool(ack.get("acknowledged_at")))
check("acknowledged_by set", bool(ack.get("acknowledged_by")))
# Can't acknowledge again
r = requests.post(f"{BASE}/alerts/{aid}/acknowledge", headers=h)
check("Double-acknowledge → 400", r.status_code == 400)
# Resolve
r = requests.post(f"{BASE}/alerts/{aid}/resolve", headers=h)
check("POST /alerts/{id}/resolve → 200", r.status_code == 200,
r.text[:150])
res = r.json() if r.status_code == 200 else {}
check("Status = resolved after resolve", res.get("status") == "resolved")
check("resolved_at set", bool(res.get("resolved_at")))
# Can't resolve again
r = requests.post(f"{BASE}/alerts/{aid}/resolve", headers=h)
check("Double-resolve → 400", r.status_code == 400)
else:
print(" (no open alerts to test lifecycle — skipping acknowledge/resolve)")
# Test dismiss on a different alert
r_open2 = requests.get(f"{BASE}/alerts", headers=h, params={"status": "open", "limit": 1})
open_now2 = r_open2.json() if r_open2.status_code == 200 else []
if open_now2:
aid2 = open_now2[0]["id"]
r = requests.post(f"{BASE}/alerts/{aid2}/dismiss", headers=h)
check("POST /alerts/{id}/dismiss → 200", r.status_code == 200, r.text[:100])
dis = r.json() if r.status_code == 200 else {}
check("Status = dismissed", dis.get("status") == "dismissed")
else:
print(" (no open alerts to dismiss — skipping)")
print()
# ── Block 5: Summary ──────────────────────────────────────────────────────
print("── Block 5: Alert summary ──")
r = requests.get(f"{BASE}/alerts/summary", headers=h)
check("GET /alerts/summary → 200", r.status_code == 200, r.text[:150])
summary = r.json() if r.status_code == 200 else {}
check("total_open is int", isinstance(summary.get("total_open"), int))
check("total_acknowledged is int", isinstance(summary.get("total_acknowledged"), int))
check("total_resolved is int", isinstance(summary.get("total_resolved"), int))
check("by_severity has all levels",
all(k in summary.get("by_severity", {})
for k in ("critical", "high", "medium", "low", "info")))
check("by_rule_type is dict", isinstance(summary.get("by_rule_type"), dict))
check("recent_alerts is list", isinstance(summary.get("recent_alerts"), list))
print()
# ── Block 6: Custom rule CRUD ─────────────────────────────────────────────
print("── Block 6: Custom rule CRUD ──")
custom_rule_body = {
"name": "QA Custom Rule",
"description": "Temporary rule for QA testing",
"rule_type": "high_risk",
"severity": "low",
"config": {"min_risk_score": 99.9, "min_count": 9999}, # won't fire
"cooldown_hours": 0,
}
r = requests.post(f"{BASE}/alerts/rules", headers=h, json=custom_rule_body)
check("POST /alerts/rules → 201", r.status_code == 201, r.text[:200])
custom = r.json() if r.status_code == 201 else {}
check("Custom rule is_system=False", custom.get("is_system") == False)
custom_id = custom.get("id", "")
# Get it
if custom_id:
r = requests.get(f"{BASE}/alerts/rules/{custom_id}", headers=h)
check("GET /alerts/rules/{id} → 200", r.status_code == 200)
# Update: disable it
if custom_id:
r = requests.patch(f"{BASE}/alerts/rules/{custom_id}", headers=h,
json={"is_enabled": False, "severity": "info"})
check("PATCH /alerts/rules/{id} → 200", r.status_code == 200, r.text[:100])
upd = r.json() if r.status_code == 200 else {}
check("is_enabled = False after patch", upd.get("is_enabled") == False)
check("severity = info after patch", upd.get("severity") == "info")
# System rule cannot be deleted
system_rule_id = next((r["id"] for r in rules if r.get("is_system")), None)
if system_rule_id:
r = requests.delete(f"{BASE}/alerts/rules/{system_rule_id}", headers=h)
check("DELETE system rule → 400", r.status_code == 400)
# Delete custom rule
if custom_id:
r = requests.delete(f"{BASE}/alerts/rules/{custom_id}", headers=h)
check("DELETE custom rule → 204", r.status_code == 204)
# Invalid rule_type → 422
r = requests.post(f"{BASE}/alerts/rules", headers=h, json={
"name": "Bad", "rule_type": "nonexistent", "severity": "low", "config": {},
})
check("POST rule with invalid rule_type → 422", r.status_code == 422)
print()
# ── Block 7: Auth protection ──────────────────────────────────────────────
print("── Block 7: Auth protection ──")
for method, url in [
("GET", f"{BASE}/alerts"),
("GET", f"{BASE}/alerts/summary"),
("GET", f"{BASE}/alerts/rules/list"),
]:
r = requests.request(method, url)
ep = url.split("/api/v1")[1]
check(f"{method} {ep} without auth → 401", r.status_code == 401)
r = requests.post(f"{BASE}/alerts/evaluate")
check("POST /alerts/evaluate without auth → 401", r.status_code == 401)
print()
# ── Block 8: Regression ───────────────────────────────────────────────────
print("── Block 8: Regression ──")
r = requests.get(f"{BASE}/dashboard/kpis", headers=h)
check("GET /dashboard/kpis still works", r.status_code == 200)
r = requests.get(f"{BASE}/risk/summary", headers=h)
check("GET /risk/summary still works", r.status_code == 200)
r = requests.get(f"{BASE}/api-keys", headers=h)
check("GET /api-keys still works", r.status_code == 200)
r = requests.get(f"{BASE}/techniques", headers=h)
check("GET /techniques still works", r.status_code == 200)
print()
# ── Summary ───────────────────────────────────────────────────────────────
total = passed + failed
print(f"====== Results: {passed}/{total} passed", end="")
if failed:
print(f"\033[91m{failed} FAILED\033[0m ======\n")
sys.exit(1)
else:
print(" ✓ ALL PASSED ======\n")
if __name__ == "__main__":
main()