diff --git a/scripts/qa_runner.py b/scripts/qa_runner.py new file mode 100644 index 0000000..6ad78cf --- /dev/null +++ b/scripts/qa_runner.py @@ -0,0 +1,647 @@ +#!/usr/bin/env python3 +""" +Aegis QA Runner — Tests all roles and access-control rules. +Run inside the server with: python /tmp/qa_runner.py +""" + +import json +import sys +import time +import uuid +from typing import Optional + +import requests + +BASE = "http://localhost:8000/api/v1" +PASS = "Admin1234!" +ADMIN_USER = "admin" +ADMIN_PASS = "admin" # default from docker-compose — will discover at runtime + +RESULTS: list[dict] = [] + + +def _r(label: str, ok: bool, detail: str = "") -> None: + symbol = "✅" if ok else "❌" + RESULTS.append({"label": label, "ok": ok, "detail": detail}) + print(f" {symbol} {label}" + (f" [{detail}]" if detail else "")) + + +def login(username: str, password: str) -> Optional[requests.Session]: + s = requests.Session() + resp = s.post(f"{BASE}/auth/login", + json={"username": username, "password": password}, timeout=10) + if resp.status_code == 200: + return s + return None + + +# ─── helpers ───────────────────────────────────────────────────────────────── + +def expect(label: str, resp: requests.Response, expected_status: int) -> bool: + ok = resp.status_code == expected_status + _r(label, ok, f"got {resp.status_code} expected {expected_status}") + return ok + + +def expect_403(label: str, resp: requests.Response) -> bool: + return expect(label, resp, 403) + + +def expect_ok(label: str, resp: requests.Response) -> bool: + ok = resp.status_code in (200, 201, 202, 204) + _r(label, ok, f"got {resp.status_code}") + return ok + + +# ─── setup ──────────────────────────────────────────────────────────────────── + +def find_admin_credentials() -> tuple[str, str]: + """Try a few common admin passwords and return the working one.""" + for pw in ["admin", "Admin1234!", "admin123", "password", "aegis"]: + s = login("admin", pw) + if s: + print(f" [admin password found: {pw}]") + return "admin", pw + # Try to find any admin user in DB + raise RuntimeError("Cannot find working admin credentials") + + +def create_test_users(admin_session: requests.Session) -> dict[str, str]: + """Create one user per role. Returns {role: username}.""" + roles = ["red_lead", "blue_lead", "red_tech", "blue_tech", "viewer"] + users = {} + suffix = str(uuid.uuid4())[:8] + + for role in roles: + uname = f"qa_{role}_{suffix}" + resp = admin_session.post(f"{BASE}/users", json={ + "username": uname, + "password": PASS, + "email": f"{uname}@qa.test", + "full_name": f"QA {role}", + "role": role, + }, timeout=10) + if resp.status_code == 201: + users[role] = uname + print(f" Created {role}: {uname}") + else: + print(f" WARN: could not create {role}: {resp.status_code} {resp.text[:100]}") + + return users + + +def get_sessions(users: dict[str, str], admin_user: str, admin_pass: str) -> dict[str, requests.Session]: + sessions = {} + admin_s = login(admin_user, admin_pass) + if admin_s: + sessions["admin"] = admin_s + for role, uname in users.items(): + s = login(uname, PASS) + if s: + # If must_change_password, change it first + me = s.get(f"{BASE}/auth/me").json() + if me.get("must_change_password"): + s.post(f"{BASE}/auth/change-password", + json={"current_password": PASS, "new_password": PASS}) + sessions[role] = s + else: + print(f" WARN: login failed for {role}:{uname}") + return sessions + + +# ─── test suites ───────────────────────────────────────────────────────────── + +def test_auth(sessions: dict) -> None: + print("\n── 1. Auth ──────────────────────────────────────────────────") + s = sessions.get("viewer") + if not s: + return + expect_ok("GET /auth/me (viewer)", s.get(f"{BASE}/auth/me")) + + # Rate limit: 5 bad logins + print(" Testing rate limit on login (6 bad attempts)...") + blocked = False + for i in range(6): + r = requests.post(f"{BASE}/auth/login", + json={"username": "nobody", "password": f"bad{i}"}, timeout=5) + if r.status_code == 429: + blocked = True + break + _r("Login rate-limited after 5 failures", blocked, f"got 429={'yes' if blocked else 'no'}") + + +def test_user_management(sessions: dict) -> None: + print("\n── 2. User management ───────────────────────────────────────") + admin = sessions.get("admin") + red_lead = sessions.get("red_lead") + viewer = sessions.get("viewer") + + if admin: + expect_ok("admin: GET /users (list all)", admin.get(f"{BASE}/users")) + if red_lead: + expect_403("red_lead: GET /users → 403", red_lead.get(f"{BASE}/users")) + if viewer: + expect_403("viewer: GET /users → 403", viewer.get(f"{BASE}/users")) + expect_403("viewer: POST /users → 403", viewer.post(f"{BASE}/users", + json={"username": "hack", "password": "X", "email": "h@h.com", "role": "admin"})) + + +def test_techniques(sessions: dict) -> None: + print("\n── 3. Techniques (MITRE) ────────────────────────────────────") + admin = sessions.get("admin") + red_tech = sessions.get("red_tech") + viewer = sessions.get("viewer") + + if viewer: + r = viewer.get(f"{BASE}/techniques?limit=5") + expect_ok("viewer: GET /techniques", r) + + if red_tech: + expect_403("red_tech: POST /techniques → 403", + red_tech.post(f"{BASE}/techniques", + json={"mitre_id": "T9999", "name": "Hack", "tactic": "execution"})) + r = red_tech.get(f"{BASE}/techniques?status=not_covered&limit=5") + expect_ok("red_tech: GET /techniques?status=not_covered", r) + + +def test_full_test_lifecycle(sessions: dict) -> dict: + """Creates a test and walks it through the full state machine.""" + print("\n── 4. Full test lifecycle ────────────────────────────────────") + admin = sessions.get("admin") + red_lead = sessions.get("red_lead") + blue_lead = sessions.get("blue_lead") + red_tech = sessions.get("red_tech") + blue_tech = sessions.get("blue_tech") + viewer = sessions.get("viewer") + + state = {} + + if not red_lead: + print(" SKIP: no red_lead session") + return state + + # Get a technique + techs = red_lead.get(f"{BASE}/techniques?limit=5").json() + if not techs: + print(" SKIP: no techniques in DB") + return state + + tech_id = techs[0]["id"] + state["technique_id"] = tech_id + + # Create test as red_lead + r = red_lead.post(f"{BASE}/tests", json={ + "technique_id": tech_id, + "name": f"QA Test {uuid.uuid4().hex[:6]}", + "platform": "windows", + "description": "Automated QA test", + }) + if not expect_ok("red_lead: POST /tests (create)", r): + return state + + test_id = r.json()["id"] + state["test_id"] = test_id + print(f" Test created: {test_id}") + + # red_tech cannot create tests + if red_tech: + expect_403("red_tech: POST /tests → 403", + red_tech.post(f"{BASE}/tests", json={ + "technique_id": tech_id, "name": "Hack", "platform": "windows"})) + + # viewer cannot create tests + if viewer: + expect_403("viewer: POST /tests → 403", + viewer.post(f"{BASE}/tests", json={ + "technique_id": tech_id, "name": "Hack", "platform": "windows"})) + + # Check state = draft + t = red_lead.get(f"{BASE}/tests/{test_id}").json() + _r("test state = draft after creation", t.get("state") == "draft", + f"state={t.get('state')}") + + # Start execution (red_tech) + if red_tech: + r = red_tech.post(f"{BASE}/tests/{test_id}/start-execution") + expect_ok("red_tech: start-execution", r) + else: + r = red_lead.post(f"{BASE}/tests/{test_id}/start-execution") + expect_ok("red_lead: start-execution", r) + + # Verify state = red_executing + t = red_lead.get(f"{BASE}/tests/{test_id}").json() + _r("test state = red_executing", t.get("state") == "red_executing", + f"state={t.get('state')}") + + # blue_tech cannot start-execution + if blue_tech: + r2 = red_lead.post(f"{BASE}/tests", json={ + "technique_id": tech_id, + "name": f"QA Test2 {uuid.uuid4().hex[:6]}", + "platform": "linux", + }) + if r2.status_code == 201: + t2_id = r2.json()["id"] + expect_403("blue_tech: start-execution → 403", + blue_tech.post(f"{BASE}/tests/{t2_id}/start-execution")) + # Clean up test2 + admin.delete(f"{BASE}/tests/{t2_id}") if admin else None + + # Update red fields + actor = red_tech or red_lead + r = actor.post(f"{BASE}/tests/{test_id}/red", + json={"tool_used": "QA Tool", "command_executed": "echo test"}) + if r.status_code == 405: + r = actor.patch(f"{BASE}/tests/{test_id}/red", + json={"tool_used": "QA Tool", "command_executed": "echo test"}) + expect_ok("red actor: PATCH /red fields", r) + + # blue_tech cannot update red fields + if blue_tech: + rb = blue_tech.patch(f"{BASE}/tests/{test_id}/red", + json={"tool_used": "evil"}) + expect_403("blue_tech: PATCH /red → 403", rb) + + # Submit red + r = actor.post(f"{BASE}/tests/{test_id}/submit-red") + expect_ok("red actor: submit-red", r) + + t = red_lead.get(f"{BASE}/tests/{test_id}").json() + _r("test state = blue_evaluating", t.get("state") == "blue_evaluating", + f"state={t.get('state')}") + + # red_tech cannot submit-blue + if red_tech: + expect_403("red_tech: submit-blue → 403", + red_tech.post(f"{BASE}/tests/{test_id}/submit-blue")) + + # Update blue fields + bactor = blue_tech or blue_lead + if bactor: + r = bactor.patch(f"{BASE}/tests/{test_id}/blue", + json={"detection_result": "detected", "detection_notes": "QA test detection"}) + expect_ok("blue actor: PATCH /blue fields", r) + + # red_lead cannot update blue fields + if red_lead: + expect_403("red_lead: PATCH /blue → 403", + red_lead.patch(f"{BASE}/tests/{test_id}/blue", + json={"detection_result": "not_detected"})) + + # Submit blue + r = bactor.post(f"{BASE}/tests/{test_id}/submit-blue") + expect_ok("blue actor: submit-blue", r) + + t = red_lead.get(f"{BASE}/tests/{test_id}").json() + _r("test state = in_review", t.get("state") == "in_review", + f"state={t.get('state')}") + + # Validate red (red_lead) + if red_lead: + r = red_lead.post(f"{BASE}/tests/{test_id}/validate-red", + json={"red_validation_status": "approved", + "red_validation_notes": "QA approved"}) + expect_ok("red_lead: validate-red", r) + + # Validate blue (blue_lead) + if blue_lead: + r = blue_lead.post(f"{BASE}/tests/{test_id}/validate-blue", + json={"blue_validation_status": "approved", + "blue_validation_notes": "QA approved"}) + expect_ok("blue_lead: validate-blue", r) + + t = red_lead.get(f"{BASE}/tests/{test_id}").json() if red_lead else blue_lead.get(f"{BASE}/tests/{test_id}").json() + _r("test state = validated", t.get("state") == "validated", + f"state={t.get('state')}") + + return state + + +def test_knowledge(sessions: dict) -> None: + print("\n── 5. Knowledge (Playbooks + Lessons) ───────────────────────") + red_lead = sessions.get("red_lead") + red_tech = sessions.get("red_tech") + viewer = sessions.get("viewer") + + if red_lead: + r = red_lead.post(f"{BASE}/knowledge/playbooks", json={ + "title": f"QA Playbook {uuid.uuid4().hex[:6]}", + "playbook_type": "attack", + "content": "QA test content", + }) + expect_ok("red_lead: POST /knowledge/playbooks", r) + + r2 = red_lead.post(f"{BASE}/knowledge/lessons", json={ + "title": "QA Lesson", + "content": "What we learned in QA", + "severity": "medium", + }) + expect_ok("red_lead: POST /knowledge/lessons", r2) + + if red_tech: + expect_403("red_tech: POST /knowledge/playbooks → 403", + red_tech.post(f"{BASE}/knowledge/playbooks", json={ + "title": "Hack", "playbook_type": "attack", "content": "X"})) + expect_403("red_tech: POST /knowledge/lessons → 403", + red_tech.post(f"{BASE}/knowledge/lessons", json={ + "title": "X", "content": "Y", "severity": "low"})) + + if viewer: + expect_ok("viewer: GET /knowledge/playbooks", viewer.get(f"{BASE}/knowledge/playbooks")) + expect_ok("viewer: GET /knowledge/lessons", viewer.get(f"{BASE}/knowledge/lessons")) + expect_403("viewer: POST /knowledge/playbooks → 403", + viewer.post(f"{BASE}/knowledge/playbooks", json={ + "title": "X", "playbook_type": "attack", "content": "Y"})) + + +def test_alerts(sessions: dict) -> None: + print("\n── 6. Operational Alerts ─────────────────────────────────────") + admin = sessions.get("admin") + red_lead = sessions.get("red_lead") + red_tech = sessions.get("red_tech") + viewer = sessions.get("viewer") + + if red_lead: + expect_ok("red_lead: GET /alerts", red_lead.get(f"{BASE}/alerts")) + expect_ok("red_lead: GET /alerts/summary", red_lead.get(f"{BASE}/alerts/summary")) + expect_ok("red_lead: GET /alerts/rules/list", red_lead.get(f"{BASE}/alerts/rules/list")) + r = red_lead.post(f"{BASE}/alerts/evaluate") + expect_ok("red_lead: POST /alerts/evaluate", r) + + if red_tech: + expect_ok("red_tech: GET /alerts", red_tech.get(f"{BASE}/alerts")) + expect_403("red_tech: POST /alerts/evaluate → 403", + red_tech.post(f"{BASE}/alerts/evaluate")) + + if viewer: + expect_ok("viewer: GET /alerts", viewer.get(f"{BASE}/alerts")) + expect_403("viewer: POST /alerts/evaluate → 403", + viewer.post(f"{BASE}/alerts/evaluate")) + + # Try to acknowledge an alert if any exist + if red_lead: + alerts = red_lead.get(f"{BASE}/alerts?status=open&limit=1").json() + if alerts: + alert_id = alerts[0]["id"] + if red_tech: + expect_403("red_tech: acknowledge alert → 403", + red_tech.post(f"{BASE}/alerts/{alert_id}/acknowledge")) + expect_ok("red_lead: acknowledge alert", + red_lead.post(f"{BASE}/alerts/{alert_id}/acknowledge")) + + +def test_snapshots(sessions: dict) -> None: + print("\n── 7. Snapshots ─────────────────────────────────────────────") + admin = sessions.get("admin") + red_lead = sessions.get("red_lead") + viewer = sessions.get("viewer") + + if viewer: + expect_ok("viewer: GET /snapshots", viewer.get(f"{BASE}/snapshots")) + expect_ok("viewer: GET /snapshots/evolution", viewer.get(f"{BASE}/snapshots/evolution")) + expect_403("viewer: POST /snapshots → 403", + viewer.post(f"{BASE}/snapshots", json={"label": "QA Snap"})) + + if red_lead: + r = red_lead.post(f"{BASE}/snapshots", json={"label": "QA Snap"}) + expect_ok("red_lead: POST /snapshots", r) + if r.status_code in (200, 201): + snap_id = r.json()["id"] + expect_403("red_lead: DELETE /snapshots/{id} → 403", + red_lead.delete(f"{BASE}/snapshots/{snap_id}")) + if admin: + expect_ok("admin: DELETE /snapshots/{id}", + admin.delete(f"{BASE}/snapshots/{snap_id}")) + + +def test_dashboard(sessions: dict) -> None: + print("\n── 8. Executive Dashboard ───────────────────────────────────") + viewer = sessions.get("viewer") + red_tech = sessions.get("red_tech") + + for role, s in sessions.items(): + if s: + r = s.get(f"{BASE}/dashboard/kpis") + expect_ok(f"{role}: GET /dashboard/kpis", r) + break # just test once + + if viewer: + expect_ok("viewer: GET /dashboard/executive", viewer.get(f"{BASE}/dashboard/executive")) + expect_403("viewer: POST /dashboard/posture-snapshot → 403", + viewer.post(f"{BASE}/dashboard/posture-snapshot")) + + if red_tech: + expect_403("red_tech: POST /dashboard/posture-snapshot → 403", + red_tech.post(f"{BASE}/dashboard/posture-snapshot")) + + +def test_campaigns(sessions: dict) -> None: + print("\n── 9. Campaigns ─────────────────────────────────────────────") + red_lead = sessions.get("red_lead") + blue_lead = sessions.get("blue_lead") + red_tech = sessions.get("red_tech") + + if red_tech: + expect_ok("red_tech: GET /campaigns", red_tech.get(f"{BASE}/campaigns")) + expect_403("red_tech: POST /campaigns → 403", + red_tech.post(f"{BASE}/campaigns", + json={"name": "Hack Camp", "campaign_type": "purple_team"})) + + if red_lead: + r = red_lead.post(f"{BASE}/campaigns", + json={"name": f"QA Campaign {uuid.uuid4().hex[:6]}", + "campaign_type": "purple_team"}) + expect_ok("red_lead: POST /campaigns", r) + if r.status_code == 201: + camp_id = r.json()["id"] + # blue_lead cannot complete + if blue_lead: + expect_403("blue_lead: POST /campaigns/{id}/complete → 403", + blue_lead.post(f"{BASE}/campaigns/{camp_id}/complete")) + # red_lead can complete + expect_ok("red_lead: POST /campaigns/{id}/complete", + red_lead.post(f"{BASE}/campaigns/{camp_id}/complete")) + + +def test_webhooks(sessions: dict) -> None: + print("\n── 10. Webhooks (admin only) ─────────────────────────────────") + admin = sessions.get("admin") + red_tech = sessions.get("red_tech") + red_lead = sessions.get("red_lead") + + if red_tech: + expect_403("red_tech: GET /webhooks → 403", red_tech.get(f"{BASE}/webhooks")) + if red_lead: + # red_lead should be 403 + r = red_lead.get(f"{BASE}/webhooks") + # Could be 403 or 200 depending on if router uses admin-only + print(f" red_lead GET /webhooks: {r.status_code}") + if admin: + expect_ok("admin: GET /webhooks", admin.get(f"{BASE}/webhooks")) + # Test SSRF protection + r = admin.post(f"{BASE}/webhooks", json={ + "name": "SSRF Test", + "url": "http://192.168.1.1/steal", + "events": ["test.created"], + }) + _r("SSRF webhook blocked (private IP)", r.status_code == 422, + f"got {r.status_code}") + + +def test_audit_logs(sessions: dict) -> None: + print("\n── 11. Audit Logs ───────────────────────────────────────────") + admin = sessions.get("admin") + red_lead = sessions.get("red_lead") + viewer = sessions.get("viewer") + + if admin: + expect_ok("admin: GET /audit-logs", admin.get(f"{BASE}/audit-logs")) + if red_lead: + expect_403("red_lead: GET /audit-logs → 403", red_lead.get(f"{BASE}/audit-logs")) + if viewer: + expect_403("viewer: GET /audit-logs → 403", viewer.get(f"{BASE}/audit-logs")) + + +def test_system(sessions: dict) -> None: + print("\n── 12. System / Scheduler ───────────────────────────────────") + admin = sessions.get("admin") + red_lead = sessions.get("red_lead") + viewer = sessions.get("viewer") + + if admin: + r = admin.get(f"{BASE}/system/scheduler-status") + expect_ok("admin: GET /system/scheduler-status", r) + if r.status_code == 200: + data = r.json() + _r("Scheduler is running", data.get("running") is True, str(data.get("running"))) + + if red_lead: + expect_403("red_lead: POST /system/sync-mitre → 403", + red_lead.post(f"{BASE}/system/sync-mitre")) + if viewer: + expect_403("viewer: POST /system/sync-mitre → 403", + viewer.post(f"{BASE}/system/sync-mitre")) + + +def test_reports(sessions: dict) -> None: + print("\n── 13. Reports ──────────────────────────────────────────────") + viewer = sessions.get("viewer") + red_tech = sessions.get("red_tech") + + if viewer: + expect_ok("viewer: GET /reports/coverage-summary", viewer.get(f"{BASE}/reports/coverage-summary")) + expect_ok("viewer: GET /reports/generate/executive-summary?format=html", + viewer.get(f"{BASE}/reports/generate/executive-summary?format=html")) + if red_tech: + expect_403("red_tech: GET /reports/generate/coverage-summary → 403", + red_tech.get(f"{BASE}/reports/generate/coverage-summary")) + + +def test_api_keys(sessions: dict) -> None: + print("\n── 14. API Keys ─────────────────────────────────────────────") + admin = sessions.get("admin") + red_lead = sessions.get("red_lead") + + if admin: + # Create read-only API key + r = admin.post(f"{BASE}/api-keys", json={ + "name": "QA Read Key", + "scopes": ["read"], + "expires_days": 1, + }) + expect_ok("admin: POST /api-keys (read scope)", r) + if r.status_code == 201: + raw_key = r.json().get("raw_key") or r.json().get("key") + key_id = r.json().get("id") + if raw_key: + # Test that read-scope key cannot write + s_key = requests.Session() + s_key.headers["X-API-Key"] = raw_key + # Should work for GET + rg = s_key.get(f"{BASE}/techniques?limit=1") + expect_ok("read-scope API key: GET /techniques", rg) + # Should fail for POST (write) + rp = s_key.post(f"{BASE}/tests", json={ + "technique_id": str(uuid.uuid4()), + "name": "scope test", + "platform": "windows" + }) + _r("read-scope API key: POST /tests → 403", + rp.status_code == 403, + f"got {rp.status_code}") + + if key_id: + expect_ok("admin: DELETE /api-keys/{id}", admin.delete(f"{BASE}/api-keys/{key_id}")) + + +# ─── main ───────────────────────────────────────────────────────────────────── + +def main(): + print("=" * 60) + print(" AEGIS QA RUNNER") + print("=" * 60) + + print("\n[Setup] Finding admin credentials...") + try: + admin_user, admin_pass = find_admin_credentials() + except RuntimeError as e: + print(f"FATAL: {e}") + sys.exit(1) + + admin_session = login(admin_user, admin_pass) + if not admin_session: + print("FATAL: Cannot login as admin") + sys.exit(1) + + print("\n[Setup] Creating test users...") + users = create_test_users(admin_session) + + print("\n[Setup] Logging in all users...") + sessions = get_sessions(users, admin_user, admin_pass) + print(f" Active sessions: {list(sessions.keys())}") + + # Run test suites + test_auth(sessions) + test_user_management(sessions) + test_techniques(sessions) + test_full_test_lifecycle(sessions) + test_knowledge(sessions) + test_alerts(sessions) + test_snapshots(sessions) + test_dashboard(sessions) + test_campaigns(sessions) + test_webhooks(sessions) + test_audit_logs(sessions) + test_system(sessions) + test_reports(sessions) + test_api_keys(sessions) + + # Cleanup: delete QA test users + print("\n[Cleanup] Removing QA test users...") + all_users = admin_session.get(f"{BASE}/users").json() + if isinstance(all_users, list): + for u in all_users: + if u.get("username", "").startswith("qa_"): + admin_session.delete(f"{BASE}/users/{u['id']}") + print(f" Deleted {u['username']}") + + # Summary + total = len(RESULTS) + passed = sum(1 for r in RESULTS if r["ok"]) + failed = total - passed + + print("\n" + "=" * 60) + print(f" RESULTS: {passed}/{total} PASSED | {failed} FAILED") + print("=" * 60) + + if failed: + print("\nFAILED TESTS:") + for r in RESULTS: + if not r["ok"]: + print(f" ❌ {r['label']} [{r['detail']}]") + + return 0 if failed == 0 else 1 + + +if __name__ == "__main__": + sys.exit(main())