Files
Aegis/scripts/qa_phase14.py
kitos d81fc04b8f
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(enterprise): Phase 14 — API Key Management + SSO/SAML 2.0
- ApiKey model (SHA-256 hash, prefix, scopes, expiry) + Alembic migration (b040ent)
- SsoConfig model for SAML 2.0 IdP settings (attribute mapping, auto-provision)
- API key auth integrated into get_current_user (aegis_ prefix detection)
- Routers: /api/v1/api-keys (full CRUD + revoke) and /api/v1/sso (metadata, login, callback, config)
- python3-saml added to requirements; Dockerfile adds libxmlsec1-dev for SAML XML signing
- QA script: 52 assertions covering key lifecycle, API key auth, SSO config

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 16:43:57 +02:00

306 lines
13 KiB
Python

"""
QA script for Phase 14 — Enterprise Readiness (API Keys + SSO Config).
Run with: python -X utf8 scripts/qa_phase14.py
"""
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import requests
BASE = "http://localhost:8000/api/v1"
PASS = "\033[92m✓\033[0m"
FAIL = "\033[91m✗\033[0m"
passed = 0
failed = 0
def check(label: str, condition: bool, detail: str = ""):
global passed, failed
if condition:
passed += 1
print(f" {PASS} {label}")
else:
failed += 1
msg = f" {FAIL} {label}"
if detail:
msg += f"{detail}"
print(msg)
def get_token(username="administrator", password="admin123"):
r = requests.post(f"{BASE}/auth/login",
data={"username": username, "password": password})
if r.status_code == 200:
return r.json().get("access_token") or r.json().get("token")
raise RuntimeError(f"Login failed: {r.status_code} {r.text[:200]}")
def auth(token):
return {"Authorization": f"Bearer {token}"}
# ─────────────────────────────────────────────────────────────────────────────
def main():
print("\n====== Phase 14 QA — Enterprise Readiness ======\n")
token = get_token()
h = auth(token)
# ── Block 1: Create API key ───────────────────────────────────────────────
print("── Block 1: Create API key ──")
r = requests.post(f"{BASE}/api-keys", headers=h, json={
"name": "QA Test Key",
"description": "Automated QA key",
"scopes": ["read", "write"],
})
check("POST /api-keys → 201", r.status_code == 201, r.text[:200])
created = r.json() if r.status_code == 201 else {}
check("Created key has id", bool(created.get("id")))
check("Created key has raw_key", bool(created.get("raw_key")))
check("raw_key starts with aegis_", str(created.get("raw_key", "")).startswith("aegis_"))
check("key_prefix present", bool(created.get("key_prefix")))
check("key_prefix matches raw_key start",
str(created.get("raw_key", "")).startswith(str(created.get("key_prefix", "X"))))
check("scopes correct", set(created.get("scopes", [])) == {"read", "write"})
check("is_active = True", created.get("is_active") == True)
check("raw_key NOT in key_prefix", created.get("raw_key") != created.get("key_prefix"))
raw_key = created.get("raw_key", "")
key_id = created.get("id", "")
# Create a read-only key
r2 = requests.post(f"{BASE}/api-keys", headers=h, json={
"name": "QA Read-only Key",
"scopes": ["read"],
})
check("POST second key (read-only) → 201", r2.status_code == 201, r2.text[:100])
raw_key_ro = r2.json().get("raw_key", "") if r2.status_code == 201 else ""
key_id_ro = r2.json().get("id", "") if r2.status_code == 201 else ""
# Invalid scope → 422
r3 = requests.post(f"{BASE}/api-keys", headers=h, json={
"name": "Bad Scope Key",
"scopes": ["superuser"],
})
check("POST with invalid scope → 422", r3.status_code == 422)
print()
# ── Block 2: List & Get API keys ─────────────────────────────────────────
print("── Block 2: List & Get API keys ──")
r = requests.get(f"{BASE}/api-keys", headers=h)
check("GET /api-keys → 200", r.status_code == 200, r.text[:100])
keys = r.json() if r.status_code == 200 else []
check("List not empty", len(keys) > 0)
check("List contains our key", any(k["id"] == key_id for k in keys))
check("raw_key NOT in list response", all("raw_key" not in k for k in keys))
check("key_hash NOT in list response", all("key_hash" not in k for k in keys))
r = requests.get(f"{BASE}/api-keys/{key_id}", headers=h)
check("GET /api-keys/{id} → 200", r.status_code == 200)
k = r.json() if r.status_code == 200 else {}
check("Get returns correct id", k.get("id") == key_id)
check("raw_key NOT in get response", "raw_key" not in k)
# 404 for non-existent key
r = requests.get(
f"{BASE}/api-keys/00000000-0000-0000-0000-000000000001", headers=h
)
check("GET non-existent key → 404", r.status_code == 404)
print()
# ── Block 3: Authenticate with API key ───────────────────────────────────
print("── Block 3: Authenticate with API key ──")
if raw_key:
api_key_header = {"Authorization": f"Bearer {raw_key}"}
r = requests.get(f"{BASE}/techniques", headers=api_key_header)
check("GET /techniques with API key → 200", r.status_code == 200,
r.text[:150])
r = requests.get(f"{BASE}/risk/summary", headers=api_key_header)
check("GET /risk/summary with API key → 200", r.status_code == 200)
r = requests.get(f"{BASE}/dashboard/kpis", headers=api_key_header)
check("GET /dashboard/kpis with API key → 200", r.status_code == 200)
# Verify last_used_at was updated
r = requests.get(f"{BASE}/api-keys/{key_id}", headers=h)
check("last_used_at updated after API key use",
r.status_code == 200 and r.json().get("last_used_at") is not None)
# Invalid / garbage key → 401
r = requests.get(f"{BASE}/techniques",
headers={"Authorization": "Bearer aegis_deadbeefdeadbeef"})
check("Invalid API key → 401", r.status_code == 401)
print()
# ── Block 4: Update API key ───────────────────────────────────────────────
print("── Block 4: Update API key ──")
r = requests.patch(f"{BASE}/api-keys/{key_id}", headers=h, json={
"name": "QA Test Key (updated)",
"scopes": ["read"],
})
check("PATCH /api-keys/{id} → 200", r.status_code == 200, r.text[:150])
upd = r.json() if r.status_code == 200 else {}
check("Name updated", upd.get("name") == "QA Test Key (updated)")
check("Scopes updated to [read]", upd.get("scopes") == ["read"])
print()
# ── Block 5: Revoke API key ───────────────────────────────────────────────
print("── Block 5: Revoke API key ──")
if key_id_ro:
r = requests.post(f"{BASE}/api-keys/{key_id_ro}/revoke", headers=h)
check("POST /api-keys/{id}/revoke → 200", r.status_code == 200,
r.text[:100])
rev = r.json() if r.status_code == 200 else {}
check("is_active = False after revoke", rev.get("is_active") == False)
# Revoked key should no longer authenticate
if raw_key_ro:
r = requests.get(f"{BASE}/techniques",
headers={"Authorization": f"Bearer {raw_key_ro}"})
check("Revoked API key → 401", r.status_code == 401)
print()
# ── Block 6: SSO status (public) ─────────────────────────────────────────
print("── Block 6: SSO status & config ──")
r = requests.get(f"{BASE}/sso/status")
check("GET /sso/status → 200 (no auth required)", r.status_code == 200,
r.text[:150])
status_data = r.json() if r.status_code == 200 else {}
check("SSO status has 'enabled' field", "enabled" in status_data)
check("SSO status has 'configured' field", "configured" in status_data)
# SP metadata (may be 503 if no config yet, but endpoint exists)
r = requests.get(f"{BASE}/sso/metadata")
check("GET /sso/metadata returns XML or 404/503",
r.status_code in (200, 404, 503))
if r.status_code == 200:
check("Metadata is XML", "EntityDescriptor" in r.text
or "xml" in r.headers.get("content-type", ""))
# Get config (none yet → 404 or empty)
r = requests.get(f"{BASE}/sso/config", headers=h)
check("GET /sso/config (admin) → 200 or 404", r.status_code in (200, 404))
# Create/update SSO config
sso_payload = {
"is_enabled": False,
"provider_name": "Test IdP",
"sp_entity_id": "https://aegis.example.com/api/v1/sso/metadata",
"sp_acs_url": "https://aegis.example.com/api/v1/sso/callback",
"idp_entity_id": "https://idp.example.com",
"idp_sso_url": "https://idp.example.com/sso/saml",
"idp_certificate": "MIIC...fakecert",
"attr_email": "email",
"attr_username": "username",
"attr_role": "role",
"default_role": "viewer",
"auto_provision": True,
}
r = requests.put(f"{BASE}/sso/config", headers=h, json=sso_payload)
check("PUT /sso/config (admin) → 200", r.status_code == 200, r.text[:200])
cfg = r.json() if r.status_code == 200 else {}
check("Config has provider_name", cfg.get("provider_name") == "Test IdP")
check("Config has idp_entity_id", bool(cfg.get("idp_entity_id")))
check("sp_private_key NOT in response", "sp_private_key" not in cfg)
# Re-fetch config
r = requests.get(f"{BASE}/sso/config", headers=h)
check("GET /sso/config → 200 after upsert", r.status_code == 200)
# Status should now reflect configured=True
r = requests.get(f"{BASE}/sso/status")
st = r.json() if r.status_code == 200 else {}
check("SSO configured=True after upsert", st.get("configured") == True)
check("SSO enabled=False (we set it off)", st.get("enabled") == False)
print()
# ── Block 7: SSO login endpoint (disabled → 503) ──────────────────────────
print("── Block 7: SSO login (disabled) ──")
r = requests.get(f"{BASE}/sso/login", allow_redirects=False)
# Should be 503 (disabled) or 307/302 (if enabled and library present)
check("GET /sso/login returns 5xx when disabled or no lib",
r.status_code in (503, 302, 307, 400))
print()
# ── Block 8: Auth protection ──────────────────────────────────────────────
print("── Block 8: Auth protection ──")
r = requests.get(f"{BASE}/api-keys")
check("GET /api-keys without auth → 401", r.status_code == 401)
r = requests.post(f"{BASE}/api-keys", json={"name": "x", "scopes": ["read"]})
check("POST /api-keys without auth → 401", r.status_code == 401)
r = requests.get(f"{BASE}/sso/config")
check("GET /sso/config without auth → 401", r.status_code == 401)
r = requests.put(f"{BASE}/sso/config", json={"is_enabled": False})
check("PUT /sso/config without auth → 401", r.status_code == 401)
# Non-admin cannot delete keys
# (would need a non-admin user setup — skip for now)
print()
# ── Block 9: Cleanup ─────────────────────────────────────────────────────
print("── Block 9: Cleanup ──")
# Hard-delete the QA keys (admin)
for kid in [key_id, key_id_ro]:
if kid:
r = requests.delete(f"{BASE}/api-keys/{kid}", headers=h)
check(f"DELETE /api-keys/{kid[:8]}... → 204", r.status_code == 204,
r.text[:100])
print()
# ── Block 10: Regression ─────────────────────────────────────────────────
print("── Block 10: Regression ──")
r = requests.get(f"{BASE}/dashboard/kpis", headers=h)
check("GET /dashboard/kpis still works", r.status_code == 200)
r = requests.get(f"{BASE}/risk/summary", headers=h)
check("GET /risk/summary still works", r.status_code == 200)
r = requests.get(f"{BASE}/knowledge/playbooks", headers=h)
check("GET /knowledge/playbooks still works", r.status_code == 200)
r = requests.get(f"{BASE}/techniques", headers=h)
check("GET /techniques still works", r.status_code == 200)
print()
# ── Summary ───────────────────────────────────────────────────────────────
total = passed + failed
print(f"====== Results: {passed}/{total} passed", end="")
if failed:
print(f"\033[91m{failed} FAILED\033[0m ======\n")
sys.exit(1)
else:
print(" ✓ ALL PASSED ======\n")
if __name__ == "__main__":
main()