Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
- ApiKey model (SHA-256 hash, prefix, scopes, expiry) + Alembic migration (b040ent) - SsoConfig model for SAML 2.0 IdP settings (attribute mapping, auto-provision) - API key auth integrated into get_current_user (aegis_ prefix detection) - Routers: /api/v1/api-keys (full CRUD + revoke) and /api/v1/sso (metadata, login, callback, config) - python3-saml added to requirements; Dockerfile adds libxmlsec1-dev for SAML XML signing - QA script: 52 assertions covering key lifecycle, API key auth, SSO config Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
306 lines
13 KiB
Python
306 lines
13 KiB
Python
"""
|
|
QA script for Phase 14 — Enterprise Readiness (API Keys + SSO Config).
|
|
Run with: python -X utf8 scripts/qa_phase14.py
|
|
"""
|
|
|
|
import sys, os
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
import requests
|
|
|
|
BASE = "http://localhost:8000/api/v1"
|
|
PASS = "\033[92m✓\033[0m"
|
|
FAIL = "\033[91m✗\033[0m"
|
|
|
|
passed = 0
|
|
failed = 0
|
|
|
|
|
|
def check(label: str, condition: bool, detail: str = ""):
|
|
global passed, failed
|
|
if condition:
|
|
passed += 1
|
|
print(f" {PASS} {label}")
|
|
else:
|
|
failed += 1
|
|
msg = f" {FAIL} {label}"
|
|
if detail:
|
|
msg += f" — {detail}"
|
|
print(msg)
|
|
|
|
|
|
def get_token(username="administrator", password="admin123"):
|
|
r = requests.post(f"{BASE}/auth/login",
|
|
data={"username": username, "password": password})
|
|
if r.status_code == 200:
|
|
return r.json().get("access_token") or r.json().get("token")
|
|
raise RuntimeError(f"Login failed: {r.status_code} {r.text[:200]}")
|
|
|
|
|
|
def auth(token):
|
|
return {"Authorization": f"Bearer {token}"}
|
|
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
def main():
|
|
print("\n====== Phase 14 QA — Enterprise Readiness ======\n")
|
|
|
|
token = get_token()
|
|
h = auth(token)
|
|
|
|
# ── Block 1: Create API key ───────────────────────────────────────────────
|
|
print("── Block 1: Create API key ──")
|
|
|
|
r = requests.post(f"{BASE}/api-keys", headers=h, json={
|
|
"name": "QA Test Key",
|
|
"description": "Automated QA key",
|
|
"scopes": ["read", "write"],
|
|
})
|
|
check("POST /api-keys → 201", r.status_code == 201, r.text[:200])
|
|
created = r.json() if r.status_code == 201 else {}
|
|
|
|
check("Created key has id", bool(created.get("id")))
|
|
check("Created key has raw_key", bool(created.get("raw_key")))
|
|
check("raw_key starts with aegis_", str(created.get("raw_key", "")).startswith("aegis_"))
|
|
check("key_prefix present", bool(created.get("key_prefix")))
|
|
check("key_prefix matches raw_key start",
|
|
str(created.get("raw_key", "")).startswith(str(created.get("key_prefix", "X"))))
|
|
check("scopes correct", set(created.get("scopes", [])) == {"read", "write"})
|
|
check("is_active = True", created.get("is_active") == True)
|
|
check("raw_key NOT in key_prefix", created.get("raw_key") != created.get("key_prefix"))
|
|
|
|
raw_key = created.get("raw_key", "")
|
|
key_id = created.get("id", "")
|
|
|
|
# Create a read-only key
|
|
r2 = requests.post(f"{BASE}/api-keys", headers=h, json={
|
|
"name": "QA Read-only Key",
|
|
"scopes": ["read"],
|
|
})
|
|
check("POST second key (read-only) → 201", r2.status_code == 201, r2.text[:100])
|
|
raw_key_ro = r2.json().get("raw_key", "") if r2.status_code == 201 else ""
|
|
key_id_ro = r2.json().get("id", "") if r2.status_code == 201 else ""
|
|
|
|
# Invalid scope → 422
|
|
r3 = requests.post(f"{BASE}/api-keys", headers=h, json={
|
|
"name": "Bad Scope Key",
|
|
"scopes": ["superuser"],
|
|
})
|
|
check("POST with invalid scope → 422", r3.status_code == 422)
|
|
|
|
print()
|
|
|
|
# ── Block 2: List & Get API keys ─────────────────────────────────────────
|
|
print("── Block 2: List & Get API keys ──")
|
|
|
|
r = requests.get(f"{BASE}/api-keys", headers=h)
|
|
check("GET /api-keys → 200", r.status_code == 200, r.text[:100])
|
|
keys = r.json() if r.status_code == 200 else []
|
|
check("List not empty", len(keys) > 0)
|
|
check("List contains our key", any(k["id"] == key_id for k in keys))
|
|
check("raw_key NOT in list response", all("raw_key" not in k for k in keys))
|
|
check("key_hash NOT in list response", all("key_hash" not in k for k in keys))
|
|
|
|
r = requests.get(f"{BASE}/api-keys/{key_id}", headers=h)
|
|
check("GET /api-keys/{id} → 200", r.status_code == 200)
|
|
k = r.json() if r.status_code == 200 else {}
|
|
check("Get returns correct id", k.get("id") == key_id)
|
|
check("raw_key NOT in get response", "raw_key" not in k)
|
|
|
|
# 404 for non-existent key
|
|
r = requests.get(
|
|
f"{BASE}/api-keys/00000000-0000-0000-0000-000000000001", headers=h
|
|
)
|
|
check("GET non-existent key → 404", r.status_code == 404)
|
|
|
|
print()
|
|
|
|
# ── Block 3: Authenticate with API key ───────────────────────────────────
|
|
print("── Block 3: Authenticate with API key ──")
|
|
|
|
if raw_key:
|
|
api_key_header = {"Authorization": f"Bearer {raw_key}"}
|
|
|
|
r = requests.get(f"{BASE}/techniques", headers=api_key_header)
|
|
check("GET /techniques with API key → 200", r.status_code == 200,
|
|
r.text[:150])
|
|
|
|
r = requests.get(f"{BASE}/risk/summary", headers=api_key_header)
|
|
check("GET /risk/summary with API key → 200", r.status_code == 200)
|
|
|
|
r = requests.get(f"{BASE}/dashboard/kpis", headers=api_key_header)
|
|
check("GET /dashboard/kpis with API key → 200", r.status_code == 200)
|
|
|
|
# Verify last_used_at was updated
|
|
r = requests.get(f"{BASE}/api-keys/{key_id}", headers=h)
|
|
check("last_used_at updated after API key use",
|
|
r.status_code == 200 and r.json().get("last_used_at") is not None)
|
|
|
|
# Invalid / garbage key → 401
|
|
r = requests.get(f"{BASE}/techniques",
|
|
headers={"Authorization": "Bearer aegis_deadbeefdeadbeef"})
|
|
check("Invalid API key → 401", r.status_code == 401)
|
|
|
|
print()
|
|
|
|
# ── Block 4: Update API key ───────────────────────────────────────────────
|
|
print("── Block 4: Update API key ──")
|
|
|
|
r = requests.patch(f"{BASE}/api-keys/{key_id}", headers=h, json={
|
|
"name": "QA Test Key (updated)",
|
|
"scopes": ["read"],
|
|
})
|
|
check("PATCH /api-keys/{id} → 200", r.status_code == 200, r.text[:150])
|
|
upd = r.json() if r.status_code == 200 else {}
|
|
check("Name updated", upd.get("name") == "QA Test Key (updated)")
|
|
check("Scopes updated to [read]", upd.get("scopes") == ["read"])
|
|
|
|
print()
|
|
|
|
# ── Block 5: Revoke API key ───────────────────────────────────────────────
|
|
print("── Block 5: Revoke API key ──")
|
|
|
|
if key_id_ro:
|
|
r = requests.post(f"{BASE}/api-keys/{key_id_ro}/revoke", headers=h)
|
|
check("POST /api-keys/{id}/revoke → 200", r.status_code == 200,
|
|
r.text[:100])
|
|
rev = r.json() if r.status_code == 200 else {}
|
|
check("is_active = False after revoke", rev.get("is_active") == False)
|
|
|
|
# Revoked key should no longer authenticate
|
|
if raw_key_ro:
|
|
r = requests.get(f"{BASE}/techniques",
|
|
headers={"Authorization": f"Bearer {raw_key_ro}"})
|
|
check("Revoked API key → 401", r.status_code == 401)
|
|
|
|
print()
|
|
|
|
# ── Block 6: SSO status (public) ─────────────────────────────────────────
|
|
print("── Block 6: SSO status & config ──")
|
|
|
|
r = requests.get(f"{BASE}/sso/status")
|
|
check("GET /sso/status → 200 (no auth required)", r.status_code == 200,
|
|
r.text[:150])
|
|
status_data = r.json() if r.status_code == 200 else {}
|
|
check("SSO status has 'enabled' field", "enabled" in status_data)
|
|
check("SSO status has 'configured' field", "configured" in status_data)
|
|
|
|
# SP metadata (may be 503 if no config yet, but endpoint exists)
|
|
r = requests.get(f"{BASE}/sso/metadata")
|
|
check("GET /sso/metadata returns XML or 404/503",
|
|
r.status_code in (200, 404, 503))
|
|
if r.status_code == 200:
|
|
check("Metadata is XML", "EntityDescriptor" in r.text
|
|
or "xml" in r.headers.get("content-type", ""))
|
|
|
|
# Get config (none yet → 404 or empty)
|
|
r = requests.get(f"{BASE}/sso/config", headers=h)
|
|
check("GET /sso/config (admin) → 200 or 404", r.status_code in (200, 404))
|
|
|
|
# Create/update SSO config
|
|
sso_payload = {
|
|
"is_enabled": False,
|
|
"provider_name": "Test IdP",
|
|
"sp_entity_id": "https://aegis.example.com/api/v1/sso/metadata",
|
|
"sp_acs_url": "https://aegis.example.com/api/v1/sso/callback",
|
|
"idp_entity_id": "https://idp.example.com",
|
|
"idp_sso_url": "https://idp.example.com/sso/saml",
|
|
"idp_certificate": "MIIC...fakecert",
|
|
"attr_email": "email",
|
|
"attr_username": "username",
|
|
"attr_role": "role",
|
|
"default_role": "viewer",
|
|
"auto_provision": True,
|
|
}
|
|
r = requests.put(f"{BASE}/sso/config", headers=h, json=sso_payload)
|
|
check("PUT /sso/config (admin) → 200", r.status_code == 200, r.text[:200])
|
|
cfg = r.json() if r.status_code == 200 else {}
|
|
check("Config has provider_name", cfg.get("provider_name") == "Test IdP")
|
|
check("Config has idp_entity_id", bool(cfg.get("idp_entity_id")))
|
|
check("sp_private_key NOT in response", "sp_private_key" not in cfg)
|
|
|
|
# Re-fetch config
|
|
r = requests.get(f"{BASE}/sso/config", headers=h)
|
|
check("GET /sso/config → 200 after upsert", r.status_code == 200)
|
|
|
|
# Status should now reflect configured=True
|
|
r = requests.get(f"{BASE}/sso/status")
|
|
st = r.json() if r.status_code == 200 else {}
|
|
check("SSO configured=True after upsert", st.get("configured") == True)
|
|
check("SSO enabled=False (we set it off)", st.get("enabled") == False)
|
|
|
|
print()
|
|
|
|
# ── Block 7: SSO login endpoint (disabled → 503) ──────────────────────────
|
|
print("── Block 7: SSO login (disabled) ──")
|
|
|
|
r = requests.get(f"{BASE}/sso/login", allow_redirects=False)
|
|
# Should be 503 (disabled) or 307/302 (if enabled and library present)
|
|
check("GET /sso/login returns 5xx when disabled or no lib",
|
|
r.status_code in (503, 302, 307, 400))
|
|
|
|
print()
|
|
|
|
# ── Block 8: Auth protection ──────────────────────────────────────────────
|
|
print("── Block 8: Auth protection ──")
|
|
|
|
r = requests.get(f"{BASE}/api-keys")
|
|
check("GET /api-keys without auth → 401", r.status_code == 401)
|
|
|
|
r = requests.post(f"{BASE}/api-keys", json={"name": "x", "scopes": ["read"]})
|
|
check("POST /api-keys without auth → 401", r.status_code == 401)
|
|
|
|
r = requests.get(f"{BASE}/sso/config")
|
|
check("GET /sso/config without auth → 401", r.status_code == 401)
|
|
|
|
r = requests.put(f"{BASE}/sso/config", json={"is_enabled": False})
|
|
check("PUT /sso/config without auth → 401", r.status_code == 401)
|
|
|
|
# Non-admin cannot delete keys
|
|
# (would need a non-admin user setup — skip for now)
|
|
|
|
print()
|
|
|
|
# ── Block 9: Cleanup ─────────────────────────────────────────────────────
|
|
print("── Block 9: Cleanup ──")
|
|
|
|
# Hard-delete the QA keys (admin)
|
|
for kid in [key_id, key_id_ro]:
|
|
if kid:
|
|
r = requests.delete(f"{BASE}/api-keys/{kid}", headers=h)
|
|
check(f"DELETE /api-keys/{kid[:8]}... → 204", r.status_code == 204,
|
|
r.text[:100])
|
|
|
|
print()
|
|
|
|
# ── Block 10: Regression ─────────────────────────────────────────────────
|
|
print("── Block 10: Regression ──")
|
|
|
|
r = requests.get(f"{BASE}/dashboard/kpis", headers=h)
|
|
check("GET /dashboard/kpis still works", r.status_code == 200)
|
|
|
|
r = requests.get(f"{BASE}/risk/summary", headers=h)
|
|
check("GET /risk/summary still works", r.status_code == 200)
|
|
|
|
r = requests.get(f"{BASE}/knowledge/playbooks", headers=h)
|
|
check("GET /knowledge/playbooks still works", r.status_code == 200)
|
|
|
|
r = requests.get(f"{BASE}/techniques", headers=h)
|
|
check("GET /techniques still works", r.status_code == 200)
|
|
|
|
print()
|
|
|
|
# ── Summary ───────────────────────────────────────────────────────────────
|
|
total = passed + failed
|
|
print(f"====== Results: {passed}/{total} passed", end="")
|
|
if failed:
|
|
print(f" — \033[91m{failed} FAILED\033[0m ======\n")
|
|
sys.exit(1)
|
|
else:
|
|
print(" ✓ ALL PASSED ======\n")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|