"""T-125: Tests del flujo de trabajo Red/Blue. Comprehensive tests covering the full test lifecycle: draft -> red_executing -> blue_evaluating -> in_review -> validated/rejected Uses mock objects to test the workflow service and router logic without requiring a running database. """ import sys import os import uuid from unittest.mock import MagicMock, patch from types import ModuleType from datetime import datetime # --------------------------------------------------------------------------- # Stub heavy dependencies before importing app modules # --------------------------------------------------------------------------- backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) if backend_dir not in sys.path: sys.path.insert(0, backend_dir) if "pydantic_settings" not in sys.modules: _ps = ModuleType("pydantic_settings") class _BaseSettings: def __init__(self, **kwargs): pass def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) _ps.BaseSettings = _BaseSettings sys.modules["pydantic_settings"] = _ps if "app.config" not in sys.modules: _cfg = ModuleType("app.config") class _FakeSettings: DATABASE_URL = "sqlite:///:memory:" SECRET_KEY = "test" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 REDIS_URL = "redis://localhost:6379/0" MINIO_ENDPOINT = "localhost:9000" MINIO_ACCESS_KEY = "test" MINIO_SECRET_KEY = "test" MINIO_BUCKET = "test" MINIO_SECURE = False MAX_RETEST_COUNT = 3 REPORT_TEMPLATES_DIR = "app/templates/reports" REPORT_OUTPUT_DIR = "/tmp/aegis_reports" COMPANY_NAME = "Test Org" COMPANY_LOGO_PATH = "app/templates/reports/assets/logo.png" JIRA_ENABLED = False JIRA_URL = "" JIRA_USERNAME = "" JIRA_API_TOKEN = "" JIRA_IS_CLOUD = True JIRA_DEFAULT_PROJECT = "" JIRA_ISSUE_TYPE_TEST = "Task" JIRA_ISSUE_TYPE_CAMPAIGN = "Epic" TEMPO_ENABLED = False TEMPO_API_TOKEN = "" TEMPO_DEFAULT_WORK_TYPE = "Red Team" NVD_API_KEY = "" STALE_THRESHOLD_DAYS = 365 CORS_ORIGINS = "http://localhost:3000" SCORING_WEIGHT_TESTS = 40 SCORING_WEIGHT_DETECTION_RULES = 20 SCORING_WEIGHT_D3FEND = 15 SCORING_WEIGHT_FRESHNESS = 15 SCORING_WEIGHT_PLATFORM_DIVERSITY = 10 _cfg.settings = _FakeSettings() sys.modules["app.config"] = _cfg if "app.database" not in sys.modules: _db = ModuleType("app.database") _db.Base = type("Base", (), {"metadata": MagicMock()}) _db.get_db = MagicMock() sys.modules["app.database"] = _db for _mod in [ "taxii2client", "taxii2client.v20", "jose", "boto3", "botocore", "botocore.exceptions", "apscheduler", "apscheduler.schedulers", "apscheduler.schedulers.background", "apscheduler.triggers", "apscheduler.triggers.cron", ]: if _mod not in sys.modules: m = ModuleType(_mod) if _mod == "taxii2client.v20": m.Server = MagicMock elif _mod == "jose": m.JWTError = Exception; m.jwt = MagicMock() elif _mod == "boto3": m.client = MagicMock() elif _mod == "botocore.exceptions": m.ClientError = Exception elif _mod == "apscheduler.schedulers.background": m.BackgroundScheduler = MagicMock elif _mod == "apscheduler.triggers.cron": m.CronTrigger = MagicMock sys.modules[_mod] = m # --------------------------------------------------------------------------- # Imports # --------------------------------------------------------------------------- from fastapi import HTTPException from app.domain.exceptions import InvalidOperationError, InvalidTransitionError from app.models.enums import TestState, TestResult from app.services.test_workflow_service import ( VALID_TRANSITIONS, can_transition, transition_state, start_execution, submit_red_evidence, submit_blue_evidence, validate_as_red_lead, validate_as_blue_lead, check_dual_validation, reopen_test, ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_test(state: TestState = TestState.draft, **kwargs) -> MagicMock: t = MagicMock() t.id = uuid.uuid4() t.name = "Test Security Check" t.technique_id = uuid.uuid4() t.state = state t.red_validation_status = kwargs.get("red_validation_status", None) t.blue_validation_status = kwargs.get("blue_validation_status", None) t.red_validated_by = kwargs.get("red_validated_by", None) t.red_validated_at = kwargs.get("red_validated_at", None) t.red_validation_notes = kwargs.get("red_validation_notes", None) t.blue_validated_by = kwargs.get("blue_validated_by", None) t.blue_validated_at = kwargs.get("blue_validated_at", None) t.blue_validation_notes = kwargs.get("blue_validation_notes", None) t.execution_date = kwargs.get("execution_date", None) t.red_started_at = kwargs.get("red_started_at", None) t.blue_started_at = kwargs.get("blue_started_at", None) t.paused_at = kwargs.get("paused_at", None) t.red_paused_seconds = kwargs.get("red_paused_seconds", 0) t.blue_paused_seconds = kwargs.get("blue_paused_seconds", 0) return t def _make_user(role: str = "red_tech") -> MagicMock: user = MagicMock() user.id = uuid.uuid4() user.role = role return user def _make_db() -> MagicMock: return MagicMock() # =========================================================================== # 1. test_full_happy_path # draft -> red_executing -> blue_evaluating -> in_review -> validated # =========================================================================== @patch("app.services.test_workflow_service.log_action") def test_full_happy_path(mock_log): """draft -> red_executing -> blue_evaluating -> in_review -> validated""" test = _make_test(TestState.draft) red_tech = _make_user("red_tech") blue_tech = _make_user("blue_tech") red_lead = _make_user("red_lead") blue_lead = _make_user("blue_lead") db = _make_db() # Step 1: draft -> red_executing result = start_execution(db, test, red_tech) assert result.state == TestState.red_executing assert result.execution_date is not None # Step 2: red_executing -> blue_evaluating result = submit_red_evidence(db, result, red_tech) assert result.state == TestState.blue_evaluating # Step 3: blue_evaluating -> in_review result = submit_blue_evidence(db, result, blue_tech) assert result.state == TestState.in_review # Step 4: Red Lead approves result = validate_as_red_lead(db, result, red_lead, "approved", "Attack well documented") assert result.red_validation_status == "approved" assert result.red_validated_by == red_lead.id assert result.red_validated_at is not None assert result.red_validation_notes == "Attack well documented" # Still in_review (waiting for blue lead) assert result.state == TestState.in_review # Step 5: Blue Lead approves -> validated result = validate_as_blue_lead(db, result, blue_lead, "approved", "Detection confirmed") assert result.blue_validation_status == "approved" assert result.state == TestState.validated # Verify audit logs were generated at each step assert mock_log.call_count >= 5 # =========================================================================== # 2. test_rejection_and_reopen # in_review -> rejected -> draft -> red_executing -> ... # =========================================================================== @patch("app.services.test_workflow_service.log_action") def test_rejection_and_reopen(mock_log): """in_review -> rejected -> draft -> red_executing -> ...""" test = _make_test(TestState.draft) red_tech = _make_user("red_tech") blue_tech = _make_user("blue_tech") red_lead = _make_user("red_lead") db = _make_db() # Advance to in_review start_execution(db, test, red_tech) submit_red_evidence(db, test, red_tech) submit_blue_evidence(db, test, blue_tech) assert test.state == TestState.in_review # Red Lead rejects -> rejected validate_as_red_lead(db, test, red_lead, "rejected", "Need more evidence") assert test.state == TestState.rejected # Reopen -> draft reopen_test(db, test, red_lead) assert test.state == TestState.draft # Restart the cycle start_execution(db, test, red_tech) assert test.state == TestState.red_executing # =========================================================================== # 3. test_invalid_transitions # =========================================================================== @patch("app.services.test_workflow_service.log_action") def test_invalid_transitions(mock_log): """Verify that invalid state transitions raise InvalidTransitionError.""" db = _make_db() user = _make_user("admin") # draft -> validated (should fail) test = _make_test(TestState.draft) try: transition_state(db, test, TestState.validated, user) assert False, "Should have raised InvalidTransitionError" except InvalidTransitionError as exc: assert exc.code == "INVALID_TRANSITION" # draft -> blue_evaluating (should fail) test = _make_test(TestState.draft) try: transition_state(db, test, TestState.blue_evaluating, user) assert False, "Should have raised InvalidTransitionError" except InvalidTransitionError as exc: assert exc.code == "INVALID_TRANSITION" # red_executing -> in_review (should fail, must go through blue_evaluating) test = _make_test(TestState.red_executing) try: transition_state(db, test, TestState.in_review, user) assert False, "Should have raised InvalidTransitionError" except InvalidTransitionError as exc: assert exc.code == "INVALID_TRANSITION" # validated -> anything (terminal state) test = _make_test(TestState.validated) try: transition_state(db, test, TestState.draft, user) assert False, "Should have raised InvalidTransitionError" except InvalidTransitionError as exc: assert exc.code == "INVALID_TRANSITION" # rejected -> red_executing (must go through draft first) test = _make_test(TestState.rejected) try: transition_state(db, test, TestState.red_executing, user) assert False, "Should have raised InvalidTransitionError" except InvalidTransitionError as exc: assert exc.code == "INVALID_TRANSITION" # =========================================================================== # 4. test_red_tech_cannot_access_blue_phase # =========================================================================== @patch("app.services.test_workflow_service.log_action") def test_red_tech_cannot_access_blue_phase(mock_log): """Red tech cannot submit blue evidence (wrong transition from wrong state).""" db = _make_db() red_tech = _make_user("red_tech") # A test in red_executing cannot jump to in_review test = _make_test(TestState.red_executing) try: submit_blue_evidence(db, test, red_tech) assert False, "Should have raised InvalidTransitionError" except InvalidTransitionError as exc: assert exc.code == "INVALID_TRANSITION" # Red tech cannot validate (test must be in blue_evaluating for submit_blue) test2 = _make_test(TestState.draft) try: submit_blue_evidence(db, test2, red_tech) assert False, "Should have raised InvalidTransitionError" except InvalidTransitionError as exc: assert exc.code == "INVALID_TRANSITION" # =========================================================================== # 5. test_blue_tech_cannot_access_red_phase # =========================================================================== @patch("app.services.test_workflow_service.log_action") def test_blue_tech_cannot_access_red_phase(mock_log): """Blue tech cannot start execution or submit red evidence.""" db = _make_db() blue_tech = _make_user("blue_tech") # Blue tech cannot start execution (test must be in draft -> red_executing) # The workflow service doesn't check role, but the router does. # At service level, blue_evaluating -> blue_evaluating is invalid transition: test = _make_test(TestState.blue_evaluating) try: start_execution(db, test, blue_tech) assert False, "Should have raised InvalidTransitionError" except InvalidTransitionError as exc: assert exc.code == "INVALID_TRANSITION" # Blue tech cannot submit red evidence on a draft test test2 = _make_test(TestState.draft) try: submit_red_evidence(db, test2, blue_tech) assert False, "Should have raised InvalidTransitionError" except InvalidTransitionError as exc: assert exc.code == "INVALID_TRANSITION" # =========================================================================== # 6. test_dual_validation_both_approve # =========================================================================== @patch("app.services.test_workflow_service.log_action") def test_dual_validation_both_approve(mock_log): """Both managers approve -> test becomes validated.""" test = _make_test(TestState.in_review) red_lead = _make_user("red_lead") blue_lead = _make_user("blue_lead") db = _make_db() # Red Lead approves first validate_as_red_lead(db, test, red_lead, "approved", "LGTM") assert test.red_validation_status == "approved" # Not yet validated — waiting for blue assert test.state == TestState.in_review # Blue Lead approves validate_as_blue_lead(db, test, blue_lead, "approved", "Detection verified") assert test.blue_validation_status == "approved" assert test.state == TestState.validated # =========================================================================== # 7. test_dual_validation_one_rejects # =========================================================================== @patch("app.services.test_workflow_service.log_action") def test_dual_validation_one_rejects(mock_log): """One manager rejects -> test becomes rejected immediately.""" test = _make_test(TestState.in_review) red_lead = _make_user("red_lead") db = _make_db() validate_as_red_lead(db, test, red_lead, "rejected", "Insufficient evidence") assert test.red_validation_status == "rejected" assert test.state == TestState.rejected @patch("app.services.test_workflow_service.log_action") def test_dual_validation_blue_rejects_first(mock_log): """Blue Lead rejects first -> test becomes rejected immediately.""" test = _make_test(TestState.in_review) blue_lead = _make_user("blue_lead") db = _make_db() validate_as_blue_lead(db, test, blue_lead, "rejected", "Detection not adequate") assert test.blue_validation_status == "rejected" assert test.state == TestState.rejected @patch("app.services.test_workflow_service.log_action") def test_dual_validation_red_approves_blue_rejects(mock_log): """Red approves, then blue rejects -> rejected.""" test = _make_test(TestState.in_review) red_lead = _make_user("red_lead") blue_lead = _make_user("blue_lead") db = _make_db() validate_as_red_lead(db, test, red_lead, "approved", "Good attack") assert test.state == TestState.in_review # waiting for blue validate_as_blue_lead(db, test, blue_lead, "rejected", "Bad detection") assert test.state == TestState.rejected # =========================================================================== # 8. test_evidence_team_separation # =========================================================================== def test_evidence_team_separation(): """Verify evidence router logic separates red and blue evidence correctly.""" from app.domain.errors import BusinessRuleViolation, PermissionViolation from app.models.enums import TeamSide from app.services.evidence_service import validate_upload_permission # Red tech can upload red evidence in draft test = _make_test(TestState.draft) red_user = _make_user("red_tech") red_user.role = "red_tech" validate_upload_permission(test, TeamSide.red, red_user.role) # should not raise # Red tech can upload red evidence in red_executing test.state = TestState.red_executing validate_upload_permission(test, TeamSide.red, red_user.role) # should not raise # Red tech CANNOT upload red evidence in blue_evaluating (state violation -> 400) test.state = TestState.blue_evaluating try: validate_upload_permission(test, TeamSide.red, red_user.role) assert False, "Should have raised BusinessRuleViolation" except BusinessRuleViolation: pass # Red tech CANNOT upload blue evidence (role violation -> 403) test.state = TestState.blue_evaluating try: validate_upload_permission(test, TeamSide.blue, red_user.role) assert False, "Should have raised PermissionViolation" except PermissionViolation: pass # Blue tech can upload blue evidence in blue_evaluating test.state = TestState.blue_evaluating blue_user = _make_user("blue_tech") blue_user.role = "blue_tech" validate_upload_permission(test, TeamSide.blue, blue_user.role) # should not raise # Blue tech CANNOT upload blue evidence in draft (state violation -> 400) test.state = TestState.draft try: validate_upload_permission(test, TeamSide.blue, blue_user.role) assert False, "Should have raised BusinessRuleViolation" except BusinessRuleViolation: pass # Blue tech CANNOT upload red evidence (role violation -> 403) test.state = TestState.draft try: validate_upload_permission(test, TeamSide.red, blue_user.role) assert False, "Should have raised PermissionViolation" except PermissionViolation: pass # =========================================================================== # 9. test_red_edit_allowed_in_draft_and_red_executing # =========================================================================== def test_red_edit_allowed_in_draft_and_red_executing(): """Verify the red update checks that state is draft or red_executing.""" from app.services.test_crud_service import update_test_red import inspect source = inspect.getsource(update_test_red) # The service must guard against states other than draft/red_executing assert "draft" in source, "Red update must allow draft state" assert "red_executing" in source, "Red update must allow red_executing state" assert "BusinessRuleViolation" in source, "Must raise domain exception for invalid state (mapped to 400)" # =========================================================================== # 10. test_reopen_clears_validation_fields # =========================================================================== @patch("app.services.test_workflow_service.log_action") def test_reopen_clears_validation_fields(mock_log): """Reopen clears all red/blue validation status, notes, timestamps.""" test = _make_test( TestState.rejected, red_validation_status="rejected", red_validated_by=uuid.uuid4(), red_validated_at=datetime.utcnow(), red_validation_notes="Bad attack", blue_validation_status="approved", blue_validated_by=uuid.uuid4(), blue_validated_at=datetime.utcnow(), blue_validation_notes="Good detection", ) user = _make_user("red_lead") db = _make_db() result = reopen_test(db, test, user) assert result.state == TestState.draft assert result.red_validation_status is None assert result.red_validated_by is None assert result.red_validated_at is None assert result.red_validation_notes is None assert result.blue_validation_status is None assert result.blue_validated_by is None assert result.blue_validated_at is None assert result.blue_validation_notes is None db.flush.assert_called() # =========================================================================== # 11. test_cannot_validate_outside_in_review # =========================================================================== @patch("app.services.test_workflow_service.log_action") def test_cannot_validate_outside_in_review(mock_log): """Managers cannot validate a test that is not in in_review state.""" db = _make_db() red_lead = _make_user("red_lead") blue_lead = _make_user("blue_lead") for state in [TestState.draft, TestState.red_executing, TestState.blue_evaluating, TestState.validated, TestState.rejected]: test = _make_test(state) try: validate_as_red_lead(db, test, red_lead, "approved", "OK") assert False, f"Red Lead should not validate in {state.value}" except InvalidOperationError as exc: assert exc.code == "INVALID_OPERATION" test2 = _make_test(state) try: validate_as_blue_lead(db, test2, blue_lead, "approved", "OK") assert False, f"Blue Lead should not validate in {state.value}" except InvalidOperationError as exc: assert exc.code == "INVALID_OPERATION" # =========================================================================== # 12. test_cannot_reopen_non_rejected_test # =========================================================================== @patch("app.services.test_workflow_service.log_action") def test_cannot_reopen_non_rejected_test(mock_log): """Reopen only works on rejected tests.""" db = _make_db() user = _make_user("red_lead") for state in [TestState.draft, TestState.red_executing, TestState.blue_evaluating, TestState.in_review, TestState.validated]: test = _make_test(state) try: reopen_test(db, test, user) assert False, f"Should not reopen from {state.value}" except InvalidTransitionError as exc: assert exc.code == "INVALID_TRANSITION" # --------------------------------------------------------------------------- # Run all # --------------------------------------------------------------------------- if __name__ == "__main__": print("T-125 Validation: Workflow Tests") print("=" * 55) test_full_happy_path() test_rejection_and_reopen() test_invalid_transitions() test_red_tech_cannot_access_blue_phase() test_blue_tech_cannot_access_red_phase() test_dual_validation_both_approve() test_dual_validation_one_rejects() test_dual_validation_blue_rejects_first() test_dual_validation_red_approves_blue_rejects() test_evidence_team_separation() test_red_edit_allowed_in_draft_and_red_executing() test_reopen_clears_validation_fields() test_cannot_validate_outside_in_review() test_cannot_reopen_non_rejected_test() print("=" * 55) print("ALL T-125 validations PASSED!")