"""Validation tests for T-112: System endpoint for Atomic Red Team import. Tests endpoint existence, admin-only access, and audit logging. """ import sys import os import uuid from unittest.mock import MagicMock from types import ModuleType import inspect # --------------------------------------------------------------------------- # Stubs # --------------------------------------------------------------------------- backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) if backend_dir not in sys.path: sys.path.insert(0, backend_dir) if "pydantic_settings" not in sys.modules: pydantic_settings_mock = ModuleType("pydantic_settings") class _BaseSettings: def __init__(self, **kwargs): pass def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) pydantic_settings_mock.BaseSettings = _BaseSettings sys.modules["pydantic_settings"] = pydantic_settings_mock if "app.config" not in sys.modules: config_mod = ModuleType("app.config") class _FakeSettings: DATABASE_URL = "sqlite:///:memory:" SECRET_KEY = "test" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 MINIO_ENDPOINT = "localhost:9000" MINIO_ACCESS_KEY = "test" MINIO_SECRET_KEY = "test" MINIO_BUCKET = "test" REPORT_TEMPLATES_DIR = "app/templates/reports" REPORT_OUTPUT_DIR = "/tmp/aegis_reports" COMPANY_NAME = "Test Org" COMPANY_LOGO_PATH = "app/templates/reports/assets/logo.png" JIRA_ENABLED = False JIRA_URL = "" JIRA_USERNAME = "" JIRA_API_TOKEN = "" JIRA_IS_CLOUD = True JIRA_DEFAULT_PROJECT = "" JIRA_ISSUE_TYPE_TEST = "Task" JIRA_ISSUE_TYPE_CAMPAIGN = "Epic" TEMPO_ENABLED = False TEMPO_API_TOKEN = "" TEMPO_DEFAULT_WORK_TYPE = "Red Team" NVD_API_KEY = "" STALE_THRESHOLD_DAYS = 365 CORS_ORIGINS = "http://localhost:3000" SCORING_WEIGHT_TESTS = 40 SCORING_WEIGHT_DETECTION_RULES = 20 SCORING_WEIGHT_D3FEND = 15 SCORING_WEIGHT_FRESHNESS = 15 SCORING_WEIGHT_PLATFORM_DIVERSITY = 10 config_mod.settings = _FakeSettings() sys.modules["app.config"] = config_mod if "app.database" not in sys.modules: db_mod = ModuleType("app.database") db_mod.Base = type("Base", (), {"metadata": MagicMock()}) db_mod.get_db = MagicMock() db_mod.SessionLocal = MagicMock() sys.modules["app.database"] = db_mod elif not hasattr(sys.modules["app.database"], "SessionLocal"): sys.modules["app.database"].SessionLocal = MagicMock() for mod_name in [ "taxii2client", "taxii2client.v20", "jose", "boto3", "botocore", "botocore.exceptions", "apscheduler", "apscheduler.schedulers", "apscheduler.schedulers.background", "apscheduler.triggers", "apscheduler.triggers.cron", ]: if mod_name not in sys.modules: m = ModuleType(mod_name) if mod_name == "taxii2client.v20": m.Server = MagicMock elif mod_name == "jose": m.JWTError = Exception; m.jwt = MagicMock() elif mod_name == "boto3": m.client = MagicMock() elif mod_name == "botocore.exceptions": m.ClientError = Exception elif mod_name == "apscheduler.schedulers.background": m.BackgroundScheduler = MagicMock elif mod_name == "apscheduler.triggers.cron": m.CronTrigger = MagicMock sys.modules[mod_name] = m # --------------------------------------------------------------------------- # Imports # --------------------------------------------------------------------------- from app.routers.system import router def _get_route_paths(): routes = {} for route in router.routes: path = getattr(route, "path", "") methods = getattr(route, "methods", set()) for method in methods: routes[f"{method} {path}"] = route return routes # --------------------------------------------------------------------------- # 1. POST /system/import-atomic-tests endpoint exists # --------------------------------------------------------------------------- def test_import_endpoint_exists(): routes = _get_route_paths() found = any("import-atomic-tests" in k and "POST" in k for k in routes) assert found, f"POST /system/import-atomic-tests not found. Routes: {list(routes.keys())}" print(" [PASS] POST /system/import-atomic-tests endpoint exists") # --------------------------------------------------------------------------- # 2. Only admin can execute # --------------------------------------------------------------------------- def test_admin_only(): from app.routers.system import trigger_atomic_import source = inspect.getsource(trigger_atomic_import) assert 'require_role("admin")' in source or "require_role" in source print(" [PASS] Only admin can execute the import") # --------------------------------------------------------------------------- # 3. Audit log is registered (via atomic_import_service) # --------------------------------------------------------------------------- def test_audit_log_in_service(): from app.services.atomic_import_service import import_atomic_red_team source = inspect.getsource(import_atomic_red_team) assert "log_action" in source assert "import_atomic_red_team" in source print(" [PASS] Audit log is registered in the import service") # --------------------------------------------------------------------------- # 4. Response includes imported and skipped counts # --------------------------------------------------------------------------- def test_response_format(): from app.routers.system import trigger_atomic_import source = inspect.getsource(trigger_atomic_import) assert '"imported"' in source or "'imported'" in source assert '"skipped"' in source or "'skipped'" in source print(" [PASS] Response includes imported and skipped counts") # --------------------------------------------------------------------------- # Run all # --------------------------------------------------------------------------- if __name__ == "__main__": print("T-112 Validation: System Import Atomic Red Team Endpoint") print("=" * 58) test_import_endpoint_exists() test_admin_only() test_audit_log_in_service() test_response_format() print("=" * 58) print("ALL T-112 validations PASSED!")