T-109: Rewrite tests router with full Red/Blue workflow endpoints - list with filters, create from template, Red/Blue team updates with state guards, start-execution, submit-red, submit-blue, validate-red, validate-blue, reopen, and timeline. All using workflow service from Phase 11. T-110: Rewrite evidence router with Red/Blue separation - upload with team field, list with team filter, delete with state-based permissions. Red Team edits in draft/red_executing, Blue Team in blue_evaluating, admin bypasses all. T-111: Create test_templates router with full CRUD - paginated list with source/platform/severity/search filters, by-technique lookup, admin-only create/update, and soft delete. Registered in main.py. T-112: Add POST /system/import-atomic-tests endpoint to system router - admin-only trigger for Atomic Red Team import with error handling and statistics response. Includes validation tests for all four tasks (35 checks total).
149 lines
5.4 KiB
Python
149 lines
5.4 KiB
Python
"""Validation tests for T-112: System endpoint for Atomic Red Team import.
|
|
|
|
Tests endpoint existence, admin-only access, and audit logging.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import uuid
|
|
from unittest.mock import MagicMock
|
|
from types import ModuleType
|
|
import inspect
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stubs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Make the directory one level above this file importable so that
# ``app.*`` imports further down resolve without installing the project.
_this_dir = os.path.dirname(__file__)
backend_dir = os.path.abspath(os.path.join(_this_dir, ".."))
if backend_dir not in sys.path:
    sys.path.insert(0, backend_dir)
|
|
|
|
if "pydantic_settings" not in sys.modules:
|
|
pydantic_settings_mock = ModuleType("pydantic_settings")
|
|
class _BaseSettings:
|
|
def __init__(self, **kwargs): pass
|
|
def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs)
|
|
pydantic_settings_mock.BaseSettings = _BaseSettings
|
|
sys.modules["pydantic_settings"] = pydantic_settings_mock
|
|
|
|
if "app.config" not in sys.modules:
|
|
config_mod = ModuleType("app.config")
|
|
class _FakeSettings:
|
|
DATABASE_URL = "sqlite:///:memory:"
|
|
SECRET_KEY = "test"
|
|
ALGORITHM = "HS256"
|
|
ACCESS_TOKEN_EXPIRE_MINUTES = 60
|
|
MINIO_ENDPOINT = "localhost:9000"
|
|
MINIO_ACCESS_KEY = "test"
|
|
MINIO_SECRET_KEY = "test"
|
|
MINIO_BUCKET = "test"
|
|
config_mod.settings = _FakeSettings()
|
|
sys.modules["app.config"] = config_mod
|
|
|
|
if "app.database" not in sys.modules:
|
|
db_mod = ModuleType("app.database")
|
|
db_mod.Base = type("Base", (), {"metadata": MagicMock()})
|
|
db_mod.get_db = MagicMock()
|
|
db_mod.SessionLocal = MagicMock()
|
|
sys.modules["app.database"] = db_mod
|
|
elif not hasattr(sys.modules["app.database"], "SessionLocal"):
|
|
sys.modules["app.database"].SessionLocal = MagicMock()
|
|
|
|
# Attribute factories for the stub modules that need more than an empty
# namespace. Each factory runs only when its module is actually stubbed.
_STUB_ATTRIBUTES = {
    "taxii2client.v20": lambda: {"Server": MagicMock},
    "jose": lambda: {"JWTError": Exception, "jwt": MagicMock()},
    "boto3": lambda: {"client": MagicMock()},
    "botocore.exceptions": lambda: {"ClientError": Exception},
    "apscheduler.schedulers.background": lambda: {"BackgroundScheduler": MagicMock},
    "apscheduler.triggers.cron": lambda: {"CronTrigger": MagicMock},
}

# Install an empty stand-in module for every listed name that is not already
# loaded, then attach the placeholder attributes defined above.
for mod_name in (
    "taxii2client",
    "taxii2client.v20",
    "jose",
    "boto3",
    "botocore",
    "botocore.exceptions",
    "apscheduler",
    "apscheduler.schedulers",
    "apscheduler.schedulers.background",
    "apscheduler.triggers",
    "apscheduler.triggers.cron",
):
    if mod_name in sys.modules:
        continue
    m = ModuleType(mod_name)
    # ``dict`` as the fallback factory yields an empty attribute mapping.
    for _attr_name, _attr_value in _STUB_ATTRIBUTES.get(mod_name, dict)().items():
        setattr(m, _attr_name, _attr_value)
    sys.modules[mod_name] = m
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Imports
|
|
# ---------------------------------------------------------------------------
|
|
|
|
from app.routers.system import router
|
|
|
|
|
|
def _get_route_paths():
    """Index the routes of the module-level ``router`` by method and path.

    Returns:
        dict: maps ``"<METHOD> <path>"`` strings to the route object that
        produced them. A route contributes one key per entry in its
        ``methods``; if two routes yield the same key, the later one wins.
    """
    routes = {}
    for route in router.routes:
        path = getattr(route, "path", "")
        # ``methods`` can be absent or ``None`` on some route objects
        # (NOTE(review): e.g. Starlette routes created without explicit
        # methods — confirm); treat both cases as "no methods" instead of
        # failing on iteration over ``None``.
        methods = getattr(route, "methods", None) or ()
        for method in methods:
            routes[f"{method} {path}"] = route
    return routes
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 1. POST /system/import-atomic-tests endpoint exists
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_import_endpoint_exists():
    """Assert that some route key mentions both POST and import-atomic-tests."""
    routes = _get_route_paths()
    matching_keys = [
        key for key in routes
        if "POST" in key and "import-atomic-tests" in key
    ]
    found = bool(matching_keys)
    assert found, f"POST /system/import-atomic-tests not found. Routes: {list(routes.keys())}"
    print(" [PASS] POST /system/import-atomic-tests endpoint exists")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 2. Only admin can execute
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_admin_only():
    """Assert that the import handler's source guards it with an admin role.

    The previous check accepted any occurrence of ``require_role``, so it
    passed even when the endpoint was guarded by a non-admin role. This
    version requires ``admin`` to appear inside the ``require_role(...)``
    call, in any quoting or casing (e.g. ``"admin"``, ``'admin'``,
    ``Role.ADMIN``).

    Raises:
        AssertionError: if no ``require_role(...)`` call mentioning
            ``admin`` is found in the handler's source text.
    """
    import re

    from app.routers.system import trigger_atomic_import

    source = inspect.getsource(trigger_atomic_import)
    # Match ``admin`` only between ``require_role(`` and the first ``)``,
    # so a mention elsewhere in the handler does not satisfy the check.
    admin_guard = re.search(r"require_role\([^)]*admin", source, re.IGNORECASE)
    assert admin_guard, (
        "trigger_atomic_import is not guarded by require_role with an admin role"
    )
    print(" [PASS] Only admin can execute the import")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 3. Audit log is registered (via atomic_import_service)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_audit_log_in_service():
    """Check the import service's source text for its audit-logging markers."""
    from app.services.atomic_import_service import import_atomic_red_team

    service_src = inspect.getsource(import_atomic_red_team)
    # NOTE(review): the second marker is the function's own name, so it is
    # presumably satisfied by the ``def`` line alone — confirm whether a
    # stricter check on the logged action was intended.
    for marker in ("log_action", "import_atomic_red_team"):
        assert marker in service_src
    print(" [PASS] Audit log is registered in the import service")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 4. Response includes imported and skipped counts
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_response_format():
    """Check that the handler's source mentions both response count keys."""
    from app.routers.system import trigger_atomic_import

    handler_src = inspect.getsource(trigger_atomic_import)
    # Each key may be written with either quoting style in the handler.
    quoted_keys = (
        ('"imported"', "'imported'"),
        ('"skipped"', "'skipped'"),
    )
    for spellings in quoted_keys:
        assert any(spelling in handler_src for spelling in spellings)
    print(" [PASS] Response includes imported and skipped counts")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Run all
|
|
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
|
|
print("T-112 Validation: System Import Atomic Red Team Endpoint")
|
|
print("=" * 58)
|
|
test_import_endpoint_exists()
|
|
test_admin_only()
|
|
test_audit_log_in_service()
|
|
test_response_format()
|
|
print("=" * 58)
|
|
print("ALL T-112 validations PASSED!")
|