"""Validation tests for T-106: Test Workflow Service. Uses mock objects to avoid needing a running database. The database module is stubbed before any app imports. """ import sys import os import uuid from unittest.mock import MagicMock, patch from types import ModuleType from datetime import datetime # --------------------------------------------------------------------------- # 0. Stub heavy dependencies BEFORE importing any app modules # --------------------------------------------------------------------------- # Ensure backend/ is on sys.path backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) if backend_dir not in sys.path: sys.path.insert(0, backend_dir) # Stub pydantic_settings so config doesn't fail if "pydantic_settings" not in sys.modules: pydantic_settings_mock = ModuleType("pydantic_settings") class _BaseSettings: def __init__(self, **kwargs): pass def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) pydantic_settings_mock.BaseSettings = _BaseSettings sys.modules["pydantic_settings"] = pydantic_settings_mock # Stub app.config config_mod = ModuleType("app.config") class _FakeSettings: DATABASE_URL = "sqlite:///:memory:" SECRET_KEY = "test" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 REDIS_URL = "redis://localhost:6379/0" MINIO_ENDPOINT = "localhost:9000" MINIO_ACCESS_KEY = "test" MINIO_SECRET_KEY = "test" MINIO_BUCKET = "test" MINIO_SECURE = False MAX_RETEST_COUNT = 3 CORS_ORIGINS = "http://localhost:3000" SCORING_WEIGHT_TESTS = 40 SCORING_WEIGHT_DETECTION_RULES = 20 SCORING_WEIGHT_D3FEND = 15 SCORING_WEIGHT_FRESHNESS = 15 SCORING_WEIGHT_PLATFORM_DIVERSITY = 10 config_mod.settings = _FakeSettings() sys.modules["app.config"] = config_mod # Stub app.database so no real engine is created db_mod = ModuleType("app.database") db_mod.Base = type("Base", (), {"metadata": MagicMock()}) db_mod.get_db = MagicMock() sys.modules["app.database"] = db_mod # Stub taxii2client taxii_v20 = ModuleType("taxii2client.v20") taxii_v20.Server = MagicMock sys.modules["taxii2client"] = ModuleType("taxii2client") sys.modules["taxii2client.v20"] = taxii_v20 # Stub jose jose_mod = ModuleType("jose") jose_mod.JWTError = Exception jose_mod.jwt = MagicMock() sys.modules["jose"] = jose_mod # Stub boto3 boto3_mod = ModuleType("boto3") boto3_mod.client = MagicMock() sys.modules["boto3"] = boto3_mod sys.modules["botocore"] = ModuleType("botocore") sys.modules["botocore.exceptions"] = ModuleType("botocore.exceptions") sys.modules["botocore.exceptions"].ClientError = Exception # Stub apscheduler sys.modules["apscheduler"] = ModuleType("apscheduler") sys.modules["apscheduler.schedulers"] = ModuleType("apscheduler.schedulers") sys.modules["apscheduler.schedulers.background"] = ModuleType("apscheduler.schedulers.background") sys.modules["apscheduler.schedulers.background"].BackgroundScheduler = MagicMock sys.modules["apscheduler.triggers"] = ModuleType("apscheduler.triggers") sys.modules["apscheduler.triggers.cron"] = ModuleType("apscheduler.triggers.cron") sys.modules["apscheduler.triggers.cron"].CronTrigger = MagicMock # --------------------------------------------------------------------------- # Now we can safely import # --------------------------------------------------------------------------- from app.models.enums import TestState from app.services.test_workflow_service import ( VALID_TRANSITIONS, can_transition, transition_state, start_execution, submit_red_evidence, submit_blue_evidence, validate_as_red_lead, validate_as_blue_lead, check_dual_validation, reopen_test, ) # We need the domain exceptions for assertions from app.domain.exceptions import InvalidTransitionError # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_test(state: TestState = TestState.draft, **kwargs) -> MagicMock: t = MagicMock() t.id = uuid.uuid4() t.name = "Mock Test" t.technique_id = uuid.uuid4() t.state = state t.red_validation_status = kwargs.get("red_validation_status", None) t.blue_validation_status = kwargs.get("blue_validation_status", None) t.red_validated_by = None t.red_validated_at = None t.red_validation_notes = None t.blue_validated_by = None t.blue_validated_at = None t.blue_validation_notes = None t.execution_date = None return t def _make_user(role: str = "red_tech") -> MagicMock: user = MagicMock() user.id = uuid.uuid4() user.role = role return user def _make_db() -> MagicMock: return MagicMock() # --------------------------------------------------------------------------- # 1. draft -> red_executing works # --------------------------------------------------------------------------- @patch("app.services.test_workflow_service.log_action") def test_draft_to_red_executing(mock_log): test = _make_test(TestState.draft) user = _make_user("red_tech") db = _make_db() result = start_execution(db, test, user) assert result.state == TestState.red_executing assert result.execution_date is not None db.commit.assert_called() mock_log.assert_called() print(" [PASS] Transition draft -> red_executing works") # --------------------------------------------------------------------------- # 2. draft -> validated fails (not allowed) # --------------------------------------------------------------------------- @patch("app.services.test_workflow_service.log_action") def test_draft_to_validated_fails(mock_log): test = _make_test(TestState.draft) user = _make_user("admin") db = _make_db() try: transition_state(db, test, TestState.validated, user) assert False, "Should have raised InvalidTransitionError" except InvalidTransitionError as exc: assert exc.code == "INVALID_TRANSITION" assert exc.current_state == "draft" assert exc.target_state == "validated" print(" [PASS] Transition draft -> validated correctly fails") # --------------------------------------------------------------------------- # 3. red_executing -> blue_evaluating works # --------------------------------------------------------------------------- @patch("app.services.test_workflow_service.log_action") def test_red_executing_to_blue_evaluating(mock_log): test = _make_test(TestState.red_executing) user = _make_user("red_tech") db = _make_db() result = submit_red_evidence(db, test, user) assert result.state == TestState.blue_evaluating db.commit.assert_called() mock_log.assert_called() print(" [PASS] Transition red_executing -> blue_evaluating works") # --------------------------------------------------------------------------- # 4. check_dual_validation -> validated when both approved # --------------------------------------------------------------------------- @patch("app.services.test_workflow_service.log_action") def test_dual_validation_both_approved(mock_log): test = _make_test(TestState.in_review) user_red = _make_user("red_lead") user_blue = _make_user("blue_lead") db = _make_db() validate_as_red_lead(db, test, user_red, "approved", "LGTM") validate_as_blue_lead(db, test, user_blue, "approved", "Detection OK") assert test.state == TestState.validated print(" [PASS] check_dual_validation -> validated when both approved") # --------------------------------------------------------------------------- # 5. check_dual_validation -> rejected when one rejects # --------------------------------------------------------------------------- @patch("app.services.test_workflow_service.log_action") def test_dual_validation_one_rejected(mock_log): test = _make_test(TestState.in_review) user_red = _make_user("red_lead") db = _make_db() validate_as_red_lead(db, test, user_red, "rejected", "Insufficient evidence") assert test.state == TestState.rejected print(" [PASS] check_dual_validation -> rejected when one rejects") # --------------------------------------------------------------------------- # 6. reopen_test clears validation fields # --------------------------------------------------------------------------- @patch("app.services.test_workflow_service.log_action") def test_reopen_clears_validation(mock_log): test = _make_test( TestState.rejected, red_validation_status="rejected", blue_validation_status="approved", ) user = _make_user("red_lead") db = _make_db() result = reopen_test(db, test, user) assert result.state == TestState.draft assert result.red_validation_status is None assert result.blue_validation_status is None assert result.red_validated_by is None assert result.red_validated_at is None assert result.red_validation_notes is None assert result.blue_validated_by is None assert result.blue_validated_at is None assert result.blue_validation_notes is None db.commit.assert_called() print(" [PASS] reopen_test clears validation fields and moves to draft") # --------------------------------------------------------------------------- # 7. Every transition generates an audit log # --------------------------------------------------------------------------- @patch("app.services.test_workflow_service.log_action") def test_transitions_generate_audit_logs(mock_log): test = _make_test(TestState.draft) user = _make_user("red_tech") db = _make_db() start_execution(db, test, user) assert mock_log.call_count >= 1 c1 = mock_log.call_count submit_red_evidence(db, test, user) assert mock_log.call_count > c1 c2 = mock_log.call_count submit_blue_evidence(db, test, user) assert mock_log.call_count > c2 print(" [PASS] Each transition generates an audit log") # --------------------------------------------------------------------------- # 8. can_transition correctness # --------------------------------------------------------------------------- def test_can_transition_map(): test = _make_test(TestState.draft) assert can_transition(test, TestState.red_executing) is True assert can_transition(test, TestState.validated) is False assert can_transition(test, TestState.blue_evaluating) is False test.state = TestState.red_executing assert can_transition(test, TestState.blue_evaluating) is True assert can_transition(test, TestState.draft) is False test.state = TestState.blue_evaluating assert can_transition(test, TestState.in_review) is True test.state = TestState.in_review assert can_transition(test, TestState.validated) is True assert can_transition(test, TestState.rejected) is True assert can_transition(test, TestState.draft) is False test.state = TestState.rejected assert can_transition(test, TestState.draft) is True test.state = TestState.validated assert can_transition(test, TestState.draft) is False assert can_transition(test, TestState.rejected) is False print(" [PASS] can_transition map is correct") # --------------------------------------------------------------------------- # Run all # --------------------------------------------------------------------------- if __name__ == "__main__": print("T-106 Validation: Test Workflow Service") print("=" * 50) test_draft_to_red_executing() test_draft_to_validated_fails() test_red_executing_to_blue_evaluating() test_dual_validation_both_approved() test_dual_validation_one_rejected() test_reopen_clears_validation() test_transitions_generate_audit_logs() test_can_transition_map() print("=" * 50) print("ALL T-106 validations PASSED!")