#!/usr/bin/env python3
"""
Aegis QA Runner — Tests all roles and access-control rules.
Run inside the server with: python /tmp/qa_runner.py
"""
import json
import sys
import time
import uuid
from typing import Optional

import requests

BASE = "http://localhost:8000/api/v1"
PASS = "Admin1234!QA99"  # 14 chars — meets 12-char complexity requirement
ADMIN_USER = "administrator"
ADMIN_PASS = "Admin1234!"
DEFAULT_TIMEOUT = 10  # seconds; applied to every request that does not set its own

# One entry per recorded check: {"label": str, "ok": bool, "detail": str}
RESULTS: list[dict] = []


class _TimeoutSession(requests.Session):
    """Session that applies DEFAULT_TIMEOUT when the caller gives no timeout.

    ``requests`` never times out by default, so a stalled server would hang
    the whole run.
    """

    def request(self, method, url, **kwargs):
        kwargs.setdefault("timeout", DEFAULT_TIMEOUT)
        return super().request(method, url, **kwargs)


def _r(label: str, ok: bool, detail: str = "") -> None:
    """Record one check in RESULTS and print its PASS/FAIL line."""
    symbol = "[PASS]" if ok else "[FAIL]"
    RESULTS.append({"label": label, "ok": ok, "detail": detail})
    print(f" {symbol} {label}" + (f" [{detail}]" if detail else ""))


def _json(resp: requests.Response, default=None):
    """Return the decoded JSON body of *resp*, or *default* if it is not JSON."""
    try:
        return resp.json()
    except ValueError:  # requests' JSONDecodeError is a ValueError subclass
        return default


def _first_id(resp: requests.Response):
    """Return the ``id`` of the first item of a JSON list response, else None."""
    payload = _json(resp)
    if isinstance(payload, list) and payload and isinstance(payload[0], dict):
        return payload[0].get("id")
    return None


def _run_suite(suite, *args):
    """Run one suite and return its result.

    If the suite raises, a FAIL is recorded and None is returned so that the
    remaining suites (and the final cleanup) still run.
    """
    try:
        return suite(*args)
    except Exception as exc:  # top-level boundary: report, do not abort the run
        _r(f"{suite.__name__}: aborted by unexpected error", False,
           f"{type(exc).__name__}: {exc}")
        return None


def login(username: str, password: str) -> Optional[requests.Session]:
    """Log in and return an authenticated session, or None on a non-200 reply."""
    s = _TimeoutSession()
    # Login uses OAuth2PasswordRequestForm (form-encoded, not JSON)
    resp = s.post(
        f"{BASE}/auth/login",
        data={"username": username, "password": password},
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout=10,
    )
    if resp.status_code != 200:
        s.close()  # do not leak the connection pool of a rejected login
        return None
    # Also set Authorization header for API-key-style usage if token in body
    body = _json(resp, {})
    token = body.get("access_token") if isinstance(body, dict) else None
    if token:
        s.headers["Authorization"] = f"Bearer {token}"
    return s


# ─── helpers ─────────────────────────────────────────────────────────────────

def expect(label: str, resp: requests.Response, expected_status: int) -> bool:
    """Record whether *resp* has exactly *expected_status*; return the verdict."""
    ok = resp.status_code == expected_status
    _r(label, ok, f"got {resp.status_code} expected {expected_status}")
    return ok


def expect_403(label: str, resp: requests.Response) -> bool:
    """Record whether *resp* was rejected with HTTP 403."""
    return expect(label, resp, 403)


def expect_ok(label: str, resp: requests.Response) -> bool:
    """Record whether *resp* is a success status (200, 201, 202 or 204)."""
    ok = resp.status_code in (200, 201, 202, 204)
    _r(label, ok, f"got {resp.status_code}")
    return ok


# ─── setup ────────────────────────────────────────────────────────────────────

def find_admin_session() -> requests.Session:
    """Login as admin and return the session (1 login call total).

    Raises:
        RuntimeError: if the configured admin credentials are rejected.
    """
    s = login(ADMIN_USER, ADMIN_PASS)
    if s:
        print(f" [admin login OK: {ADMIN_USER}]")
        return s
    raise RuntimeError(f"Cannot login as {ADMIN_USER} with configured password")


def create_test_users(admin_session: requests.Session) -> dict[str, str]:
    """Create one user per role. Returns {role: username}.

    Roles whose creation does not return 201 are reported and left out.
    """
    roles = ["red_lead", "blue_lead", "red_tech", "blue_tech", "viewer"]
    users = {}
    suffix = str(uuid.uuid4())[:8]
    for role in roles:
        uname = f"qa_{role}_{suffix}"
        resp = admin_session.post(f"{BASE}/users", json={
            "username": uname,
            "password": PASS,
            "email": f"{uname}@qa.test",
            "full_name": f"QA {role}",
            "role": role,
        }, timeout=10)
        if resp.status_code == 201:
            users[role] = uname
            print(f" Created {role}: {uname}")
        else:
            print(f" WARN: could not create {role}: {resp.status_code} {resp.text[:100]}")
    return users


def get_sessions(users: dict[str, str],
                 admin_session: requests.Session) -> dict[str, requests.Session]:
    """Login all test users. Reuses the existing admin_session to avoid extra login calls."""
    sessions = {"admin": admin_session}
    for role, uname in users.items():
        # Small delay between logins to avoid rate-limit (5/minute per IP)
        time.sleep(13)  # 60s / 5 requests = 12s spacing, +1s buffer
        s = login(uname, PASS)
        if not s:
            print(f" WARN: login failed for {role}:{uname}")
            continue
        # If must_change_password, change it first
        me = _json(s.get(f"{BASE}/auth/me"), {})
        if isinstance(me, dict) and me.get("must_change_password"):
            # NOTE(review): new password equals the current one; confirm the
            # server accepts that, otherwise the flag stays set.
            changed = s.post(f"{BASE}/auth/change-password",
                             json={"current_password": PASS, "new_password": PASS})
            if changed.status_code not in (200, 201, 202, 204):
                print(f" WARN: change-password failed for {role}: {changed.status_code}")
        sessions[role] = s
    return sessions


# ─── test suites ─────────────────────────────────────────────────────────────

def test_auth(sessions: dict) -> None:
    """Check /auth/me for the viewer and probe the login rate limiter."""
    print("\n── 1. Auth ──────────────────────────────────────────────────")
    s = sessions.get("viewer")
    if not s:
        print(" SKIP: no viewer session")
        return
    expect_ok("GET /auth/me (viewer)", s.get(f"{BASE}/auth/me"))

    # Rate limit: 5 bad logins — Note: when called from localhost (inside container),
    # some rate limiters exempt 127.0.0.1. Mark as info-only.
    print(" Testing rate limit on login (6 bad attempts from loopback)...")
    blocked = False
    for i in range(6):
        r = requests.post(
            f"{BASE}/auth/login",
            data={"username": "nobody_rate_test", "password": f"bad{i}"},
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            timeout=5,
        )
        if r.status_code == 429:
            blocked = True
            break
    if blocked:
        _r("Login rate-limited after 5 failures", True, "429 received")
    else:
        print(" [INFO] Rate limit not triggered from loopback (may be exempt on localhost) — expected in production")


def test_user_management(sessions: dict) -> None:
    """Only admin may list or create users."""
    print("\n── 2. User management ───────────────────────────────────────")
    admin = sessions.get("admin")
    red_lead = sessions.get("red_lead")
    viewer = sessions.get("viewer")

    if admin:
        expect_ok("admin: GET /users (list all)", admin.get(f"{BASE}/users"))
    if red_lead:
        expect_403("red_lead: GET /users → 403", red_lead.get(f"{BASE}/users"))
    if viewer:
        expect_403("viewer: GET /users → 403", viewer.get(f"{BASE}/users"))
        expect_403("viewer: POST /users → 403",
                   viewer.post(f"{BASE}/users",
                               json={"username": "hack", "password": "X",
                                     "email": "h@h.com", "role": "admin"}))


def test_techniques(sessions: dict) -> None:
    """Techniques are readable by viewer/red_tech; red_tech may not create them."""
    print("\n── 3. Techniques (MITRE) ────────────────────────────────────")
    red_tech = sessions.get("red_tech")
    viewer = sessions.get("viewer")

    if viewer:
        r = viewer.get(f"{BASE}/techniques?limit=5")
        expect_ok("viewer: GET /techniques", r)
    if red_tech:
        expect_403("red_tech: POST /techniques → 403",
                   red_tech.post(f"{BASE}/techniques",
                                 json={"mitre_id": "T9999", "name": "Hack",
                                       "tactic": "execution"}))
        r = red_tech.get(f"{BASE}/techniques?status=not_covered&limit=5")
        expect_ok("red_tech: GET /techniques?status=not_covered", r)


def test_full_test_lifecycle(sessions: dict) -> dict:
    """Creates a test and walks it through the full state machine.

    Returns a dict that may hold ``technique_id`` and ``test_id`` for reuse by
    later suites; it is empty or partial when the suite had to stop early.
    """
    print("\n── 4. Full test lifecycle ────────────────────────────────────")
    admin = sessions.get("admin")
    red_lead = sessions.get("red_lead")
    blue_lead = sessions.get("blue_lead")
    red_tech = sessions.get("red_tech")
    blue_tech = sessions.get("blue_tech")
    viewer = sessions.get("viewer")
    state = {}

    if not red_lead:
        print(" SKIP: no red_lead session")
        return state

    # Get a technique
    tech_id = _first_id(red_lead.get(f"{BASE}/techniques?limit=5"))
    if not tech_id:
        print(" SKIP: no techniques in DB")
        return state
    state["technique_id"] = tech_id

    # Create test as red_lead
    r = red_lead.post(f"{BASE}/tests", json={
        "technique_id": tech_id,
        "name": f"QA Test {uuid.uuid4().hex[:6]}",
        "platform": "windows",
        "description": "Automated QA test",
    })
    if not expect_ok("red_lead: POST /tests (create)", r):
        return state
    test_id = r.json()["id"]
    state["test_id"] = test_id
    print(f" Test created: {test_id}")
    test_url = f"{BASE}/tests/{test_id}"

    def check_state(label: str, wanted: str) -> None:
        # Re-read the test as red_lead and record whether it is in `wanted`.
        current = _json(red_lead.get(test_url), {})
        actual = current.get("state") if isinstance(current, dict) else None
        _r(label, actual == wanted, f"state={actual}")

    # red_tech cannot create tests
    if red_tech:
        expect_403("red_tech: POST /tests → 403",
                   red_tech.post(f"{BASE}/tests", json={
                       "technique_id": tech_id, "name": "Hack", "platform": "windows"}))
    # viewer cannot create tests
    if viewer:
        expect_403("viewer: POST /tests → 403",
                   viewer.post(f"{BASE}/tests", json={
                       "technique_id": tech_id, "name": "Hack", "platform": "windows"}))

    # Check state = draft
    check_state("test state = draft after creation", "draft")

    # Start execution (red_tech)
    if red_tech:
        r = red_tech.post(f"{test_url}/start-execution")
        expect_ok("red_tech: start-execution", r)
    else:
        r = red_lead.post(f"{test_url}/start-execution")
        expect_ok("red_lead: start-execution", r)

    # Verify state = red_executing
    check_state("test state = red_executing", "red_executing")

    # blue_tech cannot start-execution
    if blue_tech:
        r2 = red_lead.post(f"{BASE}/tests", json={
            "technique_id": tech_id,
            "name": f"QA Test2 {uuid.uuid4().hex[:6]}",
            "platform": "linux",
        })
        if r2.status_code == 201:
            t2_id = r2.json()["id"]
            expect_403("blue_tech: start-execution → 403",
                       blue_tech.post(f"{BASE}/tests/{t2_id}/start-execution"))
            # Clean up test2
            if admin:
                admin.delete(f"{BASE}/tests/{t2_id}")

    # Update red fields
    actor = red_tech or red_lead
    red_fields = {"tool_used": "QA Tool", "command_executed": "echo test"}
    r = actor.post(f"{test_url}/red", json=red_fields)
    if r.status_code == 405:  # endpoint only accepts PATCH
        r = actor.patch(f"{test_url}/red", json=red_fields)
    expect_ok("red actor: PATCH /red fields", r)

    # blue_tech cannot update red fields
    if blue_tech:
        rb = blue_tech.patch(f"{test_url}/red", json={"tool_used": "evil"})
        expect_403("blue_tech: PATCH /red → 403", rb)

    # Submit red
    r = actor.post(f"{test_url}/submit-red")
    expect_ok("red actor: submit-red", r)
    check_state("test state = blue_evaluating", "blue_evaluating")

    # red_tech cannot submit-blue
    if red_tech:
        expect_403("red_tech: submit-blue → 403",
                   red_tech.post(f"{test_url}/submit-blue"))

    # Update blue fields
    bactor = blue_tech or blue_lead
    if bactor:
        r = bactor.patch(f"{test_url}/blue",
                         json={"detection_result": "detected",
                               "detection_notes": "QA test detection"})
        expect_ok("blue actor: PATCH /blue fields", r)

    # red_lead cannot update blue fields
    expect_403("red_lead: PATCH /blue → 403",
               red_lead.patch(f"{test_url}/blue",
                              json={"detection_result": "not_detected"}))

    # Without any blue session the remaining transitions cannot be driven;
    # stop here instead of calling .post() on None.
    if not bactor:
        print(" SKIP: no blue_tech/blue_lead session — cannot continue lifecycle")
        return state

    # Submit blue
    r = bactor.post(f"{test_url}/submit-blue")
    expect_ok("blue actor: submit-blue", r)
    check_state("test state = in_review", "in_review")

    # Validate red (red_lead)
    r = red_lead.post(f"{test_url}/validate-red",
                      json={"red_validation_status": "approved",
                            "red_validation_notes": "QA approved"})
    expect_ok("red_lead: validate-red", r)

    # Validate blue (blue_lead)
    if blue_lead:
        r = blue_lead.post(f"{test_url}/validate-blue",
                           json={"blue_validation_status": "approved",
                                 "blue_validation_notes": "QA approved"})
        expect_ok("blue_lead: validate-blue", r)

    check_state("test state = validated", "validated")
    return state


def test_knowledge(sessions: dict, state: dict) -> None:
    """Playbooks and lessons: red_lead may write, red_tech/viewer may not."""
    print("\n── 5. Knowledge (Playbooks + Lessons) ───────────────────────")
    red_lead = sessions.get("red_lead")
    red_tech = sessions.get("red_tech")
    viewer = sessions.get("viewer")

    # Get a technique_id to satisfy the required field
    technique_id = state.get("technique_id")
    if not technique_id and red_lead:
        technique_id = _first_id(red_lead.get(f"{BASE}/techniques?limit=1"))

    if red_lead and technique_id:
        r = red_lead.post(f"{BASE}/knowledge/playbooks", json={
            "title": f"QA Playbook {uuid.uuid4().hex[:6]}",
            "playbook_type": "attack",
            "content": "QA test content",
            "technique_id": technique_id,
        })
        # 201 = created, 409 = already exists (unique per technique+type) — both mean auth OK
        ok = r.status_code in (201, 409)
        _r("red_lead: POST /knowledge/playbooks (201 or 409)", ok, f"got {r.status_code}")

        r2 = red_lead.post(f"{BASE}/knowledge/lessons", json={
            "title": "QA Lesson",
            "what_happened": "During QA testing, the attack path was discovered",
            "root_cause": "Missing detection rule for the technique",
            "severity": "medium",
        })
        expect_ok("red_lead: POST /knowledge/lessons", r2)
    elif red_lead:
        print(" SKIP: no technique_id available for knowledge tests")

    if red_tech:
        tid = technique_id or str(uuid.uuid4())
        expect_403("red_tech: POST /knowledge/playbooks → 403",
                   red_tech.post(f"{BASE}/knowledge/playbooks", json={
                       "title": "Hack", "playbook_type": "attack",
                       "content": "X", "technique_id": tid}))
        expect_403("red_tech: POST /knowledge/lessons → 403",
                   red_tech.post(f"{BASE}/knowledge/lessons", json={
                       "title": "X", "what_happened": "Y",
                       "root_cause": "Z", "severity": "low"}))

    if viewer:
        expect_ok("viewer: GET /knowledge/playbooks",
                  viewer.get(f"{BASE}/knowledge/playbooks"))
        expect_ok("viewer: GET /knowledge/lessons",
                  viewer.get(f"{BASE}/knowledge/lessons"))
        expect_403("viewer: POST /knowledge/playbooks → 403",
                   viewer.post(f"{BASE}/knowledge/playbooks", json={
                       "title": "X", "playbook_type": "attack",
                       "content": "Y", "technique_id": str(uuid.uuid4())}))


def test_alerts(sessions: dict) -> None:
    """Alerts are readable by all roles; evaluate/acknowledge need red_lead."""
    print("\n── 6. Operational Alerts ─────────────────────────────────────")
    red_lead = sessions.get("red_lead")
    red_tech = sessions.get("red_tech")
    viewer = sessions.get("viewer")

    if red_lead:
        expect_ok("red_lead: GET /alerts", red_lead.get(f"{BASE}/alerts"))
        expect_ok("red_lead: GET /alerts/summary", red_lead.get(f"{BASE}/alerts/summary"))
        expect_ok("red_lead: GET /alerts/rules/list",
                  red_lead.get(f"{BASE}/alerts/rules/list"))
        r = red_lead.post(f"{BASE}/alerts/evaluate")
        expect_ok("red_lead: POST /alerts/evaluate", r)

    if red_tech:
        expect_ok("red_tech: GET /alerts", red_tech.get(f"{BASE}/alerts"))
        expect_403("red_tech: POST /alerts/evaluate → 403",
                   red_tech.post(f"{BASE}/alerts/evaluate"))

    if viewer:
        expect_ok("viewer: GET /alerts", viewer.get(f"{BASE}/alerts"))
        expect_403("viewer: POST /alerts/evaluate → 403",
                   viewer.post(f"{BASE}/alerts/evaluate"))

    # Try to acknowledge an alert if any exist
    if red_lead:
        alert_id = _first_id(red_lead.get(f"{BASE}/alerts?status=open&limit=1"))
        if alert_id:
            if red_tech:
                expect_403("red_tech: acknowledge alert → 403",
                           red_tech.post(f"{BASE}/alerts/{alert_id}/acknowledge"))
            expect_ok("red_lead: acknowledge alert",
                      red_lead.post(f"{BASE}/alerts/{alert_id}/acknowledge"))


def test_snapshots(sessions: dict) -> None:
    """Snapshots: viewer reads, red_lead creates, only admin deletes."""
    print("\n── 7. Snapshots ─────────────────────────────────────────────")
    admin = sessions.get("admin")
    red_lead = sessions.get("red_lead")
    viewer = sessions.get("viewer")

    if viewer:
        expect_ok("viewer: GET /snapshots", viewer.get(f"{BASE}/snapshots"))
        expect_ok("viewer: GET /snapshots/evolution",
                  viewer.get(f"{BASE}/snapshots/evolution"))
        expect_403("viewer: POST /snapshots → 403",
                   viewer.post(f"{BASE}/snapshots", json={"label": "QA Snap"}))

    if red_lead:
        r = red_lead.post(f"{BASE}/snapshots", json={"label": "QA Snap"})
        expect_ok("red_lead: POST /snapshots", r)
        if r.status_code in (200, 201):
            snap_id = r.json()["id"]
            expect_403("red_lead: DELETE /snapshots/{id} → 403",
                       red_lead.delete(f"{BASE}/snapshots/{snap_id}"))
            if admin:
                expect_ok("admin: DELETE /snapshots/{id}",
                          admin.delete(f"{BASE}/snapshots/{snap_id}"))


def test_dashboard(sessions: dict) -> None:
    """Dashboard reads are open; posture-snapshot is denied to viewer/red_tech."""
    print("\n── 8. Executive Dashboard ───────────────────────────────────")
    viewer = sessions.get("viewer")
    red_tech = sessions.get("red_tech")

    # KPIs: just test once, with the first available session
    first = next(((role, s) for role, s in sessions.items() if s), None)
    if first:
        role, s = first
        r = s.get(f"{BASE}/dashboard/kpis")
        expect_ok(f"{role}: GET /dashboard/kpis", r)

    if viewer:
        expect_ok("viewer: GET /dashboard/executive",
                  viewer.get(f"{BASE}/dashboard/executive"))
        expect_403("viewer: POST /dashboard/posture-snapshot → 403",
                   viewer.post(f"{BASE}/dashboard/posture-snapshot"))
    if red_tech:
        expect_403("red_tech: POST /dashboard/posture-snapshot → 403",
                   red_tech.post(f"{BASE}/dashboard/posture-snapshot"))


def test_campaigns(sessions: dict, state: dict) -> None:
    """Campaigns: red_tech reads only; red_lead creates, activates and completes."""
    print("\n── 9. Campaigns ─────────────────────────────────────────────")
    red_lead = sessions.get("red_lead")
    blue_lead = sessions.get("blue_lead")
    red_tech = sessions.get("red_tech")

    if red_tech:
        expect_ok("red_tech: GET /campaigns", red_tech.get(f"{BASE}/campaigns"))
        expect_403("red_tech: POST /campaigns → 403",
                   red_tech.post(f"{BASE}/campaigns",
                                 json={"name": "Hack Camp",
                                       "campaign_type": "purple_team"}))

    if not red_lead:
        return
    r = red_lead.post(f"{BASE}/campaigns",
                      json={"name": f"QA Campaign {uuid.uuid4().hex[:6]}",
                            "campaign_type": "purple_team"})
    expect_ok("red_lead: POST /campaigns", r)
    if r.status_code != 201:
        return
    camp_id = r.json()["id"]
    camp_url = f"{BASE}/campaigns/{camp_id}"

    # blue_lead cannot complete (on a fresh campaign — state check doesn't matter)
    if blue_lead:
        expect_403("blue_lead: POST /campaigns/{id}/complete → 403",
                   blue_lead.post(f"{camp_url}/complete"))

    # To complete a campaign: need a test, then activate, then complete
    test_id = state.get("test_id")
    technique_id = state.get("technique_id")
    # Add an existing test from state or create a new one
    if test_id:
        red_lead.post(f"{camp_url}/tests", json={"test_id": test_id})
    elif technique_id:
        # Create a fresh test and add it
        rt = red_lead.post(f"{BASE}/tests", json={
            "technique_id": technique_id,
            "name": "QA Campaign Test",
            "platform": "windows",
        })
        if rt.status_code == 201:
            test_id = rt.json()["id"]
            red_lead.post(f"{camp_url}/tests", json={"test_id": test_id})

    # Activate the campaign (needs at least one test)
    r_act = red_lead.post(f"{camp_url}/activate")
    if r_act.status_code in (200, 204):
        # Now complete it
        expect_ok("red_lead: POST /campaigns/{id}/complete",
                  red_lead.post(f"{camp_url}/complete"))
    else:
        print(f" NOTE: campaign activate returned {r_act.status_code}: {r_act.text[:100]}")
        # Still test that complete is allowed for red_lead (even if state error)
        r_comp = red_lead.post(f"{camp_url}/complete")
        _r("red_lead: POST /campaigns/{id}/complete",
           r_comp.status_code in (200, 204, 400),  # 400=business rule, not 403
           f"got {r_comp.status_code}")


def test_webhooks(sessions: dict) -> None:
    """Webhooks are admin-only, and private-IP targets must be rejected."""
    print("\n── 10. Webhooks (admin only) ─────────────────────────────────")
    admin = sessions.get("admin")
    red_tech = sessions.get("red_tech")
    red_lead = sessions.get("red_lead")

    if red_tech:
        expect_403("red_tech: GET /webhooks → 403", red_tech.get(f"{BASE}/webhooks"))
    if red_lead:
        expect_403("red_lead: GET /webhooks → 403 (admin-only)",
                   red_lead.get(f"{BASE}/webhooks"))
    if admin:
        expect_ok("admin: GET /webhooks", admin.get(f"{BASE}/webhooks"))
        # Test SSRF protection
        r = admin.post(f"{BASE}/webhooks", json={
            "name": "SSRF Test",
            "url": "http://192.168.1.1/steal",
            "events": ["test.created"],
        })
        _r("SSRF webhook blocked (private IP)", r.status_code == 422,
           f"got {r.status_code}")


def test_audit_logs(sessions: dict) -> None:
    """Audit logs are readable by admin only."""
    print("\n── 11. Audit Logs ───────────────────────────────────────────")
    admin = sessions.get("admin")
    red_lead = sessions.get("red_lead")
    viewer = sessions.get("viewer")

    if admin:
        expect_ok("admin: GET /audit-logs", admin.get(f"{BASE}/audit-logs"))
    if red_lead:
        expect_403("red_lead: GET /audit-logs → 403", red_lead.get(f"{BASE}/audit-logs"))
    if viewer:
        expect_403("viewer: GET /audit-logs → 403", viewer.get(f"{BASE}/audit-logs"))


def test_system(sessions: dict) -> None:
    """Scheduler status for admin; sync-mitre denied to red_lead and viewer."""
    print("\n── 12. System / Scheduler ───────────────────────────────────")
    admin = sessions.get("admin")
    red_lead = sessions.get("red_lead")
    viewer = sessions.get("viewer")

    if admin:
        r = admin.get(f"{BASE}/system/scheduler-status")
        expect_ok("admin: GET /system/scheduler-status", r)
        if r.status_code == 200:
            data = r.json()
            _r("Scheduler is running", data.get("running") is True,
               str(data.get("running")))
    if red_lead:
        expect_403("red_lead: POST /system/sync-mitre → 403",
                   red_lead.post(f"{BASE}/system/sync-mitre"))
    if viewer:
        expect_403("viewer: POST /system/sync-mitre → 403",
                   viewer.post(f"{BASE}/system/sync-mitre"))


def test_reports(sessions: dict) -> None:
    """Report endpoints: viewer may read; red_tech may not generate."""
    print("\n── 13. Reports ──────────────────────────────────────────────")
    viewer = sessions.get("viewer")
    red_tech = sessions.get("red_tech")

    if viewer:
        expect_ok("viewer: GET /reports/coverage-summary",
                  viewer.get(f"{BASE}/reports/coverage-summary"))
        expect_ok("viewer: GET /reports/generate/executive-summary?format=html",
                  viewer.get(f"{BASE}/reports/generate/executive-summary?format=html"))
    if red_tech:
        expect_403("red_tech: GET /reports/generate/coverage-summary → 403",
                   red_tech.get(f"{BASE}/reports/generate/coverage-summary"))


def test_api_keys(sessions: dict) -> None:
    """A read-scope API key may GET but not POST; admin can revoke it."""
    print("\n── 14. API Keys ─────────────────────────────────────────────")
    admin = sessions.get("admin")
    if not admin:
        return

    # Create read-only API key
    r = admin.post(f"{BASE}/api-keys", json={
        "name": "QA Read Key",
        "scopes": ["read"],
        "expires_days": 1,
    })
    expect_ok("admin: POST /api-keys (read scope)", r)
    if r.status_code != 201:
        return

    created = r.json()
    raw_key = created.get("raw_key") or created.get("key")
    key_id = created.get("id")

    if raw_key:
        # Test that read-scope key cannot write
        # API keys are passed as Bearer tokens (Authorization: Bearer aegis_...)
        with _TimeoutSession() as s_key:
            s_key.headers["Authorization"] = f"Bearer {raw_key}"
            # Should work for GET
            rg = s_key.get(f"{BASE}/techniques?limit=1")
            expect_ok("read-scope API key: GET /techniques", rg)
            # Should fail for POST (write)
            rp = s_key.post(f"{BASE}/tests", json={
                "technique_id": str(uuid.uuid4()),
                "name": "scope test",
                "platform": "windows",
            })
            _r("read-scope API key: POST /tests → 403", rp.status_code == 403,
               f"got {rp.status_code}")

    if key_id:
        expect_ok("admin: DELETE /api-keys/{id}",
                  admin.delete(f"{BASE}/api-keys/{key_id}"))


# ─── main ─────────────────────────────────────────────────────────────────────

def _cleanup_users(admin_session: requests.Session) -> None:
    """Delete every user whose username starts with ``qa_`` (best effort)."""
    print("\n[Cleanup] Removing QA test users...")
    try:
        all_users = _json(admin_session.get(f"{BASE}/users"))
        if not isinstance(all_users, list):
            return
        for u in all_users:
            if not (isinstance(u, dict) and u.get("username", "").startswith("qa_")):
                continue
            resp = admin_session.delete(f"{BASE}/users/{u['id']}")
            if resp.status_code in (200, 202, 204):
                print(f" Deleted {u['username']}")
            else:
                print(f" WARN: could not delete {u['username']}: {resp.status_code}")
    except requests.RequestException as exc:
        # Cleanup must never mask the test results.
        print(f" WARN: cleanup failed: {exc}")


def main() -> int:
    """Run every suite and return the process exit code (0 = all checks passed)."""
    print("=" * 60)
    print(" AEGIS QA RUNNER")
    print("=" * 60)

    print("\n[Setup] Logging in as admin...")
    try:
        admin_session = find_admin_session()  # 1 login call — avoids wasting rate limit budget
    except (RuntimeError, requests.RequestException) as e:
        print(f"FATAL: {e}")
        sys.exit(1)

    try:
        print("\n[Setup] Creating test users...")
        users = create_test_users(admin_session)

        print("\n[Setup] Logging in all users (with 13s spacing to avoid rate limit)...")
        sessions = get_sessions(users, admin_session)
        print(f" Active sessions: {list(sessions.keys())}")

        # Run test suites — state passes shared data (technique_id, test_id) between suites
        _run_suite(test_auth, sessions)
        _run_suite(test_user_management, sessions)
        _run_suite(test_techniques, sessions)
        state = _run_suite(test_full_test_lifecycle, sessions) or {}
        _run_suite(test_knowledge, sessions, state)
        _run_suite(test_alerts, sessions)
        _run_suite(test_snapshots, sessions)
        _run_suite(test_dashboard, sessions)
        _run_suite(test_campaigns, sessions, state)
        _run_suite(test_webhooks, sessions)
        _run_suite(test_audit_logs, sessions)
        _run_suite(test_system, sessions)
        _run_suite(test_reports, sessions)
        _run_suite(test_api_keys, sessions)
    finally:
        # Cleanup: delete QA test users, even if setup or a suite blew up
        _cleanup_users(admin_session)

    # Summary
    total = len(RESULTS)
    passed = sum(1 for r in RESULTS if r["ok"])
    failed = total - passed
    print("\n" + "=" * 60)
    print(f" RESULTS: {passed}/{total} PASSED | {failed} FAILED")
    print("=" * 60)
    if failed:
        print("\nFAILED TESTS:")
        for r in RESULTS:
            if not r["ok"]:
                print(f" ❌ {r['label']} [{r['detail']}]")
    return 0 if failed == 0 else 1


if __name__ == "__main__":
    sys.exit(main())