Files
Aegis/backend/tests/test_t107_status_service.py
Kitos 7af6be10be feat(phase-11): implement Red/Blue business logic services (T-106, T-107, T-108)
T-106: Create test_workflow_service.py with state-machine transitions for the complete test lifecycle (draft -> red_executing -> blue_evaluating -> in_review -> validated/rejected), dual validation by Red/Blue leads, and reopen capability with field cleanup.

T-107: Update status_service.py to use detection_result from Blue Team instead of legacy result field, and differentiate between partial progress (some validated) vs all-in-progress states.

T-108: Create atomic_import_service.py that downloads the Atomic Red Team repo as a ZIP (avoiding API rate limits), parses all atomics YAML files, and creates idempotent TestTemplate records mapped to MITRE techniques.

Includes validation tests for all three tasks (19 checks total).
2026-02-09 09:58:54 +01:00

230 lines
8.0 KiB
Python

"""Validation tests for T-107: Updated status recalculation service.
Verifies the new logic that considers dual validation and detection_result.
"""
import sys
import os
import uuid
from unittest.mock import MagicMock
from types import ModuleType
# ---------------------------------------------------------------------------
# Stub heavy dependencies
# ---------------------------------------------------------------------------
# Put the parent of this file's directory on the import path so that the
# ``app`` package can be imported by the statements further down.
backend_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir)
)
if backend_dir not in sys.path:
    # Prepend so this location is searched before any other entry.
    sys.path.insert(0, backend_dir)
# Only stub if not already stubbed (in case tests run together)
if "pydantic_settings" not in sys.modules:

    class _BaseSettings:
        """Inert replacement for ``pydantic_settings.BaseSettings``.

        Accepts and discards any keyword arguments, both on instantiation
        and on subclassing.
        """

        def __init__(self, **kwargs):
            pass

        def __init_subclass__(cls, **kwargs):
            super().__init_subclass__(**kwargs)

    # Register a bare module object that exposes only ``BaseSettings``.
    pydantic_settings_mock = ModuleType("pydantic_settings")
    setattr(pydantic_settings_mock, "BaseSettings", _BaseSettings)
    sys.modules["pydantic_settings"] = pydantic_settings_mock
if "app.config" not in sys.modules:

    class _FakeSettings:
        """Static configuration values served in place of the real settings."""

        # Database / auth
        DATABASE_URL = "sqlite:///:memory:"
        SECRET_KEY = "test"
        ALGORITHM = "HS256"
        ACCESS_TOKEN_EXPIRE_MINUTES = 60

        # Object storage
        MINIO_ENDPOINT = "localhost:9000"
        MINIO_ACCESS_KEY = "test"
        MINIO_SECRET_KEY = "test"
        MINIO_BUCKET = "test"

    # Expose a single ``settings`` instance on a synthetic ``app.config``.
    config_mod = ModuleType("app.config")
    setattr(config_mod, "settings", _FakeSettings())
    sys.modules["app.config"] = config_mod
if "app.database" not in sys.modules:
    # Synthetic ``app.database``: a mocked ``get_db`` plus a plain ``Base``
    # class whose only attribute is a mocked ``metadata``.
    db_mod = ModuleType("app.database")
    db_mod.get_db = MagicMock()
    db_mod.Base = type("Base", (object,), dict(metadata=MagicMock()))
    sys.modules["app.database"] = db_mod
if "taxii2client" not in sys.modules:
    # ``Server`` is bound to the MagicMock class itself (not an instance),
    # so calling ``Server(...)`` yields a fresh mock each time.
    taxii_v20 = ModuleType("taxii2client.v20")
    taxii_v20.Server = MagicMock
    sys.modules.update(
        {
            "taxii2client": ModuleType("taxii2client"),
            "taxii2client.v20": taxii_v20,
        }
    )
if "jose" not in sys.modules:
    # Minimal ``jose`` replacement: ``JWTError`` aliases the built-in
    # Exception and ``jwt`` is a mock object.
    jose_mod = ModuleType("jose")
    setattr(jose_mod, "jwt", MagicMock())
    setattr(jose_mod, "JWTError", Exception)
    sys.modules["jose"] = jose_mod
if "boto3" not in sys.modules:
    # ``boto3.client`` becomes a mock callable.
    boto3_mod = ModuleType("boto3")
    setattr(boto3_mod, "client", MagicMock())
    # Register boto3 together with the botocore modules, in the same order
    # as before: boto3, botocore, botocore.exceptions.
    sys.modules.update(
        {
            "boto3": boto3_mod,
            "botocore": ModuleType("botocore"),
            "botocore.exceptions": ModuleType("botocore.exceptions"),
        }
    )
    # ``ClientError`` aliases the built-in Exception.
    setattr(sys.modules["botocore.exceptions"], "ClientError", Exception)
if "apscheduler" not in sys.modules:
    # Register an empty module object for the package and each submodule.
    sys.modules.update(
        {
            dotted: ModuleType(dotted)
            for dotted in (
                "apscheduler",
                "apscheduler.schedulers",
                "apscheduler.schedulers.background",
                "apscheduler.triggers",
                "apscheduler.triggers.cron",
            )
        }
    )
    # Both classes are bound to the MagicMock class itself, so instantiating
    # them produces mocks.
    sys.modules["apscheduler.schedulers.background"].BackgroundScheduler = MagicMock
    sys.modules["apscheduler.triggers.cron"].CronTrigger = MagicMock
# ---------------------------------------------------------------------------
# Imports
# ---------------------------------------------------------------------------
from app.models.enums import TechniqueStatus, TestState, TestResult
from app.services.status_service import recalculate_technique_status
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_test_obj(state, detection_result=None):
    """Return a MagicMock standing in for a test record.

    The mock's ``state`` and ``detection_result`` attributes are set to the
    given values; ``detection_result`` defaults to ``None``.
    """
    # Keyword arguments to the mock constructor are applied as attributes.
    return MagicMock(state=state, detection_result=detection_result)
def _make_technique(tests=None):
    """Return a MagicMock standing in for a technique.

    ``tests`` holds the supplied list, or a new empty list when the
    argument is falsy, and ``status_global`` starts out as ``None``.
    """
    mock_technique = MagicMock()
    mock_technique.configure_mock(
        tests=tests if tests else [],
        status_global=None,
    )
    return mock_technique
def _make_db():
    """Return a fresh MagicMock to pass as the ``db`` argument."""
    mock_session = MagicMock()
    return mock_session
# ---------------------------------------------------------------------------
# 1. No tests -> not_evaluated
# ---------------------------------------------------------------------------
def test_no_tests():
    """A technique with an empty test list must become ``not_evaluated``.

    Also asserts that ``commit`` was called on the db mock.
    """
    session = _make_db()
    empty_technique = _make_technique([])

    recalculate_technique_status(session, empty_technique)

    assert empty_technique.status_global == TechniqueStatus.not_evaluated
    session.commit.assert_called()
    print(" [PASS] No tests -> not_evaluated")
# ---------------------------------------------------------------------------
# 2. All validated with detection=detected -> validated
# ---------------------------------------------------------------------------
def test_all_validated_all_detected():
    """Two validated tests, both detected, must yield ``validated``."""
    detected_pair = [
        _make_test_obj(TestState.validated, TestResult.detected)
        for _ in range(2)
    ]
    technique = _make_technique(detected_pair)

    recalculate_technique_status(_make_db(), technique)

    assert technique.status_global == TechniqueStatus.validated
    print(" [PASS] All validated, all detected -> validated")
# ---------------------------------------------------------------------------
# 3. Some validated, others in progress -> partial
# ---------------------------------------------------------------------------
def test_some_validated_some_in_progress():
    """One validated test plus one still executing must yield ``partial``."""
    finished = _make_test_obj(TestState.validated, TestResult.detected)
    still_running = _make_test_obj(TestState.red_executing, None)
    technique = _make_technique([finished, still_running])

    recalculate_technique_status(_make_db(), technique)

    assert technique.status_global == TechniqueStatus.partial
    print(" [PASS] Some validated, some in progress -> partial")
# ---------------------------------------------------------------------------
# 4. All in intermediate states -> in_progress
# ---------------------------------------------------------------------------
def test_all_intermediate():
    """Tests that are all in intermediate states must yield ``in_progress``."""
    intermediate_states = (TestState.red_executing, TestState.blue_evaluating)
    pending = [_make_test_obj(state, None) for state in intermediate_states]
    technique = _make_technique(pending)

    recalculate_technique_status(_make_db(), technique)

    assert technique.status_global == TechniqueStatus.in_progress
    print(" [PASS] All intermediate -> in_progress")
# ---------------------------------------------------------------------------
# 5. All validated with detection=not_detected -> not_covered
# ---------------------------------------------------------------------------
def test_all_validated_not_detected():
    """Two validated tests, neither detected, must yield ``not_covered``."""
    missed_pair = [
        _make_test_obj(TestState.validated, TestResult.not_detected)
        for _ in range(2)
    ]
    technique = _make_technique(missed_pair)

    recalculate_technique_status(_make_db(), technique)

    assert technique.status_global == TechniqueStatus.not_covered
    print(" [PASS] All validated, not_detected -> not_covered")
# ---------------------------------------------------------------------------
# Bonus: All validated with partially_detected -> partial
# ---------------------------------------------------------------------------
def test_all_validated_partially_detected():
    """Validated tests mixing detected and partially detected -> ``partial``."""
    outcomes = (TestResult.detected, TestResult.partially_detected)
    validated = [
        _make_test_obj(TestState.validated, outcome) for outcome in outcomes
    ]
    technique = _make_technique(validated)

    recalculate_technique_status(_make_db(), technique)

    assert technique.status_global == TechniqueStatus.partial
    print(" [PASS] All validated, partially_detected -> partial")
# ---------------------------------------------------------------------------
# Run all
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Script mode: run every check in order; a failing assert aborts the run
    # before the final banner is printed.
    print("T-107 Validation: Status Service Recalculation")
    print("=" * 50)
    for check in (
        test_no_tests,
        test_all_validated_all_detected,
        test_some_validated_some_in_progress,
        test_all_intermediate,
        test_all_validated_not_detected,
        test_all_validated_partially_detected,
    ):
        check()
    print("=" * 50)
    print("ALL T-107 validations PASSED!")