feat(phase-12): implement Red/Blue API endpoints (T-109, T-110, T-111, T-112)
T-109: Rewrite tests router with full Red/Blue workflow endpoints - list with filters, create from template, Red/Blue team updates with state guards, start-execution, submit-red, submit-blue, validate-red, validate-blue, reopen, and timeline. All using workflow service from Phase 11. T-110: Rewrite evidence router with Red/Blue separation - upload with team field, list with team filter, delete with state-based permissions. Red Team edits in draft/red_executing, Blue Team in blue_evaluating, admin bypasses all. T-111: Create test_templates router with full CRUD - paginated list with source/platform/severity/search filters, by-technique lookup, admin-only create/update, and soft delete. Registered in main.py. T-112: Add POST /system/import-atomic-tests endpoint to system router - admin-only trigger for Atomic Red Team import with error handling and statistics response. Includes validation tests for all four tasks (35 checks total).
This commit is contained in:
148
backend/tests/test_t112_system_import.py
Normal file
148
backend/tests/test_t112_system_import.py
Normal file
@@ -0,0 +1,148 @@
|
||||
"""Validation tests for T-112: System endpoint for Atomic Red Team import.
|
||||
|
||||
Tests endpoint existence, admin-only access, and audit logging.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import uuid
|
||||
from unittest.mock import MagicMock
|
||||
from types import ModuleType
|
||||
import inspect
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Stubs
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
|
||||
if backend_dir not in sys.path:
|
||||
sys.path.insert(0, backend_dir)
|
||||
|
||||
if "pydantic_settings" not in sys.modules:
|
||||
pydantic_settings_mock = ModuleType("pydantic_settings")
|
||||
class _BaseSettings:
|
||||
def __init__(self, **kwargs): pass
|
||||
def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs)
|
||||
pydantic_settings_mock.BaseSettings = _BaseSettings
|
||||
sys.modules["pydantic_settings"] = pydantic_settings_mock
|
||||
|
||||
if "app.config" not in sys.modules:
|
||||
config_mod = ModuleType("app.config")
|
||||
class _FakeSettings:
|
||||
DATABASE_URL = "sqlite:///:memory:"
|
||||
SECRET_KEY = "test"
|
||||
ALGORITHM = "HS256"
|
||||
ACCESS_TOKEN_EXPIRE_MINUTES = 60
|
||||
MINIO_ENDPOINT = "localhost:9000"
|
||||
MINIO_ACCESS_KEY = "test"
|
||||
MINIO_SECRET_KEY = "test"
|
||||
MINIO_BUCKET = "test"
|
||||
config_mod.settings = _FakeSettings()
|
||||
sys.modules["app.config"] = config_mod
|
||||
|
||||
if "app.database" not in sys.modules:
|
||||
db_mod = ModuleType("app.database")
|
||||
db_mod.Base = type("Base", (), {"metadata": MagicMock()})
|
||||
db_mod.get_db = MagicMock()
|
||||
db_mod.SessionLocal = MagicMock()
|
||||
sys.modules["app.database"] = db_mod
|
||||
elif not hasattr(sys.modules["app.database"], "SessionLocal"):
|
||||
sys.modules["app.database"].SessionLocal = MagicMock()
|
||||
|
||||
for mod_name in [
|
||||
"taxii2client", "taxii2client.v20",
|
||||
"jose", "boto3", "botocore", "botocore.exceptions",
|
||||
"apscheduler", "apscheduler.schedulers",
|
||||
"apscheduler.schedulers.background",
|
||||
"apscheduler.triggers", "apscheduler.triggers.cron",
|
||||
]:
|
||||
if mod_name not in sys.modules:
|
||||
m = ModuleType(mod_name)
|
||||
if mod_name == "taxii2client.v20": m.Server = MagicMock
|
||||
elif mod_name == "jose": m.JWTError = Exception; m.jwt = MagicMock()
|
||||
elif mod_name == "boto3": m.client = MagicMock()
|
||||
elif mod_name == "botocore.exceptions": m.ClientError = Exception
|
||||
elif mod_name == "apscheduler.schedulers.background": m.BackgroundScheduler = MagicMock
|
||||
elif mod_name == "apscheduler.triggers.cron": m.CronTrigger = MagicMock
|
||||
sys.modules[mod_name] = m
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Imports
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
from app.routers.system import router
|
||||
|
||||
|
||||
def _get_route_paths():
|
||||
routes = {}
|
||||
for route in router.routes:
|
||||
path = getattr(route, "path", "")
|
||||
methods = getattr(route, "methods", set())
|
||||
for method in methods:
|
||||
routes[f"{method} {path}"] = route
|
||||
return routes
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 1. POST /system/import-atomic-tests endpoint exists
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_import_endpoint_exists():
|
||||
routes = _get_route_paths()
|
||||
found = any("import-atomic-tests" in k and "POST" in k for k in routes)
|
||||
assert found, f"POST /system/import-atomic-tests not found. Routes: {list(routes.keys())}"
|
||||
print(" [PASS] POST /system/import-atomic-tests endpoint exists")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 2. Only admin can execute
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_admin_only():
|
||||
from app.routers.system import trigger_atomic_import
|
||||
source = inspect.getsource(trigger_atomic_import)
|
||||
assert 'require_role("admin")' in source or "require_role" in source
|
||||
print(" [PASS] Only admin can execute the import")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 3. Audit log is registered (via atomic_import_service)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_audit_log_in_service():
|
||||
from app.services.atomic_import_service import import_atomic_red_team
|
||||
source = inspect.getsource(import_atomic_red_team)
|
||||
assert "log_action" in source
|
||||
assert "import_atomic_red_team" in source
|
||||
print(" [PASS] Audit log is registered in the import service")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 4. Response includes imported and skipped counts
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_response_format():
|
||||
from app.routers.system import trigger_atomic_import
|
||||
source = inspect.getsource(trigger_atomic_import)
|
||||
assert '"imported"' in source or "'imported'" in source
|
||||
assert '"skipped"' in source or "'skipped'" in source
|
||||
print(" [PASS] Response includes imported and skipped counts")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Run all
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("T-112 Validation: System Import Atomic Red Team Endpoint")
|
||||
print("=" * 58)
|
||||
test_import_endpoint_exists()
|
||||
test_admin_only()
|
||||
test_audit_log_in_service()
|
||||
test_response_format()
|
||||
print("=" * 58)
|
||||
print("ALL T-112 validations PASSED!")
|
||||
Reference in New Issue
Block a user