# - Add test_test_entity.py with 46 pure unit tests covering the full domain entity
# - Fix _FakeSettings in 11 test files (REPORT_TEMPLATES_DIR, JIRA, TEMPO)
# - Fix stale db.commit assertions to db.flush after UoW refactor
# - Add missing mock fields for TestEntity.from_orm compatibility
# - Make database.py skip pool args for SQLite in test environment
# - Disable slowapi rate limiter in test client fixture
# - Inject test engine into app.database to fix threading errors
# - Update role assertions to match current require_any_role policy
# - Mark 6 legacy V1 endpoint tests as xfail (replaced by V2 workflow)
252 lines
8.8 KiB
Python
252 lines
8.8 KiB
Python
"""Validation tests for T-107: Updated status recalculation service.

Verifies the new logic that considers dual validation and detection_result.
"""
|
|
|
|
import sys
|
|
import os
|
|
import uuid
|
|
from unittest.mock import MagicMock
|
|
from types import ModuleType
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stub heavy dependencies
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Absolute path of the directory one level above this file's directory.
# Presumably this is the backend root that contains the `app` package
# imported further down -- confirm against the repository layout.
backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if backend_dir not in sys.path:
    # Prepend (index 0) so this directory is searched before other entries;
    # the membership check keeps repeated imports from adding duplicates.
    sys.path.insert(0, backend_dir)
|
|
|
|
# Only stub if not already stubbed (in case tests run together)
|
|
# Install a minimal stand-in for `pydantic_settings`, but only when no module
# of that name is registered yet (another test module may have stubbed it
# already when the tests run together).
if "pydantic_settings" not in sys.modules:
    pydantic_settings_mock = ModuleType("pydantic_settings")

    class _BaseSettings:
        """Inert replacement for ``pydantic_settings.BaseSettings``.

        Accepts and ignores any constructor keyword arguments and any class
        keyword arguments, performing no validation or environment loading.
        """

        def __init__(self, **kwargs):
            # Values are deliberately discarded; the stub stores nothing.
            pass

        def __init_subclass__(cls, **kwargs):
            # object.__init_subclass__ raises TypeError when handed keyword
            # arguments, so class-level options (e.g. ``env_prefix=...``)
            # are swallowed here instead of being forwarded.
            super().__init_subclass__()

    pydantic_settings_mock.BaseSettings = _BaseSettings
    sys.modules["pydantic_settings"] = pydantic_settings_mock
|
|
|
|
# Register a fake `app.config` module exposing a `settings` object, unless a
# module of that name is already present in sys.modules.
if "app.config" not in sys.modules:
    config_mod = ModuleType("app.config")

    class _FakeSettings:
        """Static settings values used in place of the real configuration.

        Every value is a plain class attribute; nothing is read from the
        environment or from disk.
        """

        # Database and token settings
        DATABASE_URL = "sqlite:///:memory:"
        SECRET_KEY = "test"
        ALGORITHM = "HS256"
        ACCESS_TOKEN_EXPIRE_MINUTES = 60

        # MinIO settings
        MINIO_ENDPOINT = "localhost:9000"
        MINIO_ACCESS_KEY = "test"
        MINIO_SECRET_KEY = "test"
        MINIO_BUCKET = "test"

        # Report settings
        REPORT_TEMPLATES_DIR = "app/templates/reports"
        REPORT_OUTPUT_DIR = "/tmp/aegis_reports"
        COMPANY_NAME = "Test Org"
        COMPANY_LOGO_PATH = "app/templates/reports/assets/logo.png"

        # Jira settings (JIRA_ENABLED is False)
        JIRA_ENABLED = False
        JIRA_URL = ""
        JIRA_USERNAME = ""
        JIRA_API_TOKEN = ""
        JIRA_IS_CLOUD = True
        JIRA_DEFAULT_PROJECT = ""
        JIRA_ISSUE_TYPE_TEST = "Task"
        JIRA_ISSUE_TYPE_CAMPAIGN = "Epic"

        # Tempo settings (TEMPO_ENABLED is False)
        TEMPO_ENABLED = False
        TEMPO_API_TOKEN = ""
        TEMPO_DEFAULT_WORK_TYPE = "Red Team"

        # Miscellaneous settings
        NVD_API_KEY = ""
        STALE_THRESHOLD_DAYS = 365
        CORS_ORIGINS = "http://localhost:3000"

        # Scoring weights; the five values below add up to 100
        SCORING_WEIGHT_TESTS = 40
        SCORING_WEIGHT_DETECTION_RULES = 20
        SCORING_WEIGHT_D3FEND = 15
        SCORING_WEIGHT_FRESHNESS = 15
        SCORING_WEIGHT_PLATFORM_DIVERSITY = 10

    config_mod.settings = _FakeSettings()
    sys.modules["app.config"] = config_mod
|
|
|
|
# Register a fake `app.database` module, unless one is already present in
# sys.modules.
if "app.database" not in sys.modules:
    db_mod = ModuleType("app.database")
    # Bare class named "Base" whose only attribute is a mocked `metadata`.
    db_mod.Base = type("Base", (), {"metadata": MagicMock()})
    # `get_db` is a MagicMock, so calling it returns another mock.
    db_mod.get_db = MagicMock()
    sys.modules["app.database"] = db_mod
|
|
|
|
# Register empty stand-ins for `taxii2client` and `taxii2client.v20`, unless
# `taxii2client` is already present in sys.modules.
if "taxii2client" not in sys.modules:
    sys.modules["taxii2client"] = ModuleType("taxii2client")
    taxii_v20 = ModuleType("taxii2client.v20")
    # The MagicMock class itself (not an instance) stands in for Server, so
    # `Server(...)` builds a fresh mock.
    taxii_v20.Server = MagicMock
    sys.modules["taxii2client.v20"] = taxii_v20
|
|
|
|
# Register a fake `jose` module, unless one is already present in sys.modules.
if "jose" not in sys.modules:
    jose_mod = ModuleType("jose")
    # JWTError is aliased to the built-in Exception, so an
    # `except JWTError` clause catches any Exception.
    jose_mod.JWTError = Exception
    jose_mod.jwt = MagicMock()
    sys.modules["jose"] = jose_mod
|
|
|
|
# Register fakes for `boto3`, `botocore` and `botocore.exceptions`. The guard
# checks only `boto3`; the two botocore entries are (re)assigned whenever the
# branch runs.
if "boto3" not in sys.modules:
    boto3_mod = ModuleType("boto3")
    boto3_mod.client = MagicMock()
    sys.modules["boto3"] = boto3_mod
    sys.modules["botocore"] = ModuleType("botocore")
    sys.modules["botocore.exceptions"] = ModuleType("botocore.exceptions")
    # ClientError is aliased to the built-in Exception.
    sys.modules["botocore.exceptions"].ClientError = Exception
|
|
|
|
# Register fakes for `apscheduler` and the submodules listed below. The guard
# checks only the top-level `apscheduler` entry.
if "apscheduler" not in sys.modules:
    sys.modules["apscheduler"] = ModuleType("apscheduler")
    sys.modules["apscheduler.schedulers"] = ModuleType("apscheduler.schedulers")
    sys.modules["apscheduler.schedulers.background"] = ModuleType(
        "apscheduler.schedulers.background"
    )
    # The MagicMock class itself stands in for the scheduler class, so
    # instantiating it yields a mock.
    sys.modules["apscheduler.schedulers.background"].BackgroundScheduler = MagicMock
    sys.modules["apscheduler.triggers"] = ModuleType("apscheduler.triggers")
    sys.modules["apscheduler.triggers.cron"] = ModuleType("apscheduler.triggers.cron")
    sys.modules["apscheduler.triggers.cron"].CronTrigger = MagicMock
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Imports
|
|
# ---------------------------------------------------------------------------
|
|
|
|
from app.models.enums import TechniqueStatus, TestState, TestResult
|
|
from app.services.status_service import recalculate_technique_status
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_test_obj(state, detection_result=None):
    """Create a mock test with the given state and detection_result.

    Args:
        state: Value stored on the mock's ``state`` attribute (the tests in
            this module pass ``TestState`` members).
        detection_result: Value stored on the mock's ``detection_result``
            attribute; defaults to ``None``.

    Returns:
        A ``MagicMock`` with ``state`` and ``detection_result`` set. Any other
        attribute access yields an auto-created mock.
    """
    t = MagicMock()
    t.state = state
    t.detection_result = detection_result
    return t
|
|
|
|
|
|
def _make_technique(tests=None):
    """Create a mock technique.

    Args:
        tests: Value stored on the mock's ``tests`` attribute. Any falsy
            value (``None``, an empty list, ...) is replaced by a new empty
            list.

    Returns:
        A ``MagicMock`` with ``tests`` set as described and ``status_global``
        initialised to ``None``.
    """
    technique = MagicMock()
    technique.tests = tests or []
    # Start from None so a later assertion can tell the attribute was written.
    technique.status_global = None
    return technique
|
|
|
|
|
|
def _make_db():
    """Return a fresh ``MagicMock`` used as the ``db`` argument in the tests."""
    return MagicMock()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 1. No tests -> not_evaluated
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_no_tests():
    """A technique with an empty test list ends up as ``not_evaluated``."""
    technique = _make_technique([])
    db = _make_db()
    recalculate_technique_status(db, technique)
    assert technique.status_global == TechniqueStatus.not_evaluated
    # Progress line for when the module is run directly as a script.
    print("  [PASS] No tests -> not_evaluated")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 2. All validated with detection=detected -> validated
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_all_validated_all_detected():
    """Two validated tests, both ``detected``, yield status ``validated``."""
    tests = [
        _make_test_obj(TestState.validated, TestResult.detected),
        _make_test_obj(TestState.validated, TestResult.detected),
    ]
    technique = _make_technique(tests)
    db = _make_db()
    recalculate_technique_status(db, technique)
    assert technique.status_global == TechniqueStatus.validated
    # Progress line for when the module is run directly as a script.
    print("  [PASS] All validated, all detected -> validated")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 3. Some validated, others in progress -> partial
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_some_validated_some_in_progress():
    """One validated/detected test plus one ``red_executing`` test yields ``partial``."""
    tests = [
        _make_test_obj(TestState.validated, TestResult.detected),
        _make_test_obj(TestState.red_executing, None),
    ]
    technique = _make_technique(tests)
    db = _make_db()
    recalculate_technique_status(db, technique)
    assert technique.status_global == TechniqueStatus.partial
    # Progress line for when the module is run directly as a script.
    print("  [PASS] Some validated, some in progress -> partial")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 4. All in intermediate states -> in_progress
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_all_intermediate():
    """Tests in ``red_executing`` and ``blue_evaluating`` only yield ``in_progress``."""
    tests = [
        _make_test_obj(TestState.red_executing, None),
        _make_test_obj(TestState.blue_evaluating, None),
    ]
    technique = _make_technique(tests)
    db = _make_db()
    recalculate_technique_status(db, technique)
    assert technique.status_global == TechniqueStatus.in_progress
    # Progress line for when the module is run directly as a script.
    print("  [PASS] All intermediate -> in_progress")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 5. All validated with detection=not_detected -> not_covered
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_all_validated_not_detected():
    """Two validated tests, both ``not_detected``, yield status ``not_covered``."""
    tests = [
        _make_test_obj(TestState.validated, TestResult.not_detected),
        _make_test_obj(TestState.validated, TestResult.not_detected),
    ]
    technique = _make_technique(tests)
    db = _make_db()
    recalculate_technique_status(db, technique)
    assert technique.status_global == TechniqueStatus.not_covered
    # Progress line for when the module is run directly as a script.
    print("  [PASS] All validated, not_detected -> not_covered")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bonus: All validated with partially_detected -> partial
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_all_validated_partially_detected():
    """Validated tests with ``detected`` and ``partially_detected`` results yield ``partial``."""
    tests = [
        _make_test_obj(TestState.validated, TestResult.detected),
        _make_test_obj(TestState.validated, TestResult.partially_detected),
    ]
    technique = _make_technique(tests)
    db = _make_db()
    recalculate_technique_status(db, technique)
    assert technique.status_global == TechniqueStatus.partial
    # Progress line for when the module is run directly as a script.
    print("  [PASS] All validated, partially_detected -> partial")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Run all
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Script entry point: run every validation in order without a test runner.
# A failing assertion raises AssertionError and stops the run before the
# final summary line is printed.
if __name__ == "__main__":
    print("T-107 Validation: Status Service Recalculation")
    print("=" * 50)
    test_no_tests()
    test_all_validated_all_detected()
    test_some_validated_some_in_progress()
    test_all_intermediate()
    test_all_validated_not_detected()
    test_all_validated_partially_detected()
    print("=" * 50)
    print("ALL T-107 validations PASSED!")
|