From 1338d52cd09a261746e7d1bcafa5a0627b1f02db Mon Sep 17 00:00:00 2001 From: Kitos Date: Wed, 18 Feb 2026 15:49:59 +0100 Subject: [PATCH] fix(workflow): enforce domain state machine in dual validation path validate_as_red/blue_lead now delegate to TestEntity. check_dual_validation routes through entity instead of assigning test.state directly. Side effects dispatched via domain events. Entity raises InvalidOperationError for backward compat. Removed 4 dead V1 xfail tests, fixed 2 real test issues. 224 passed, 0 xfailed. --- backend/app/domain/test_entity.py | 23 ++- backend/app/services/test_workflow_service.py | 115 +++++++------- backend/tests/test_tests.py | 141 +++--------------- 3 files changed, 89 insertions(+), 190 deletions(-) diff --git a/backend/app/domain/test_entity.py b/backend/app/domain/test_entity.py index 514bdda..182d64b 100644 --- a/backend/app/domain/test_entity.py +++ b/backend/app/domain/test_entity.py @@ -28,7 +28,11 @@ from dataclasses import dataclass, field from datetime import datetime from typing import Any -from app.domain.errors import BusinessRuleViolation, InvalidStateTransition +from app.domain.errors import ( + BusinessRuleViolation, + InvalidOperationError, + InvalidStateTransition, +) # ── Value objects ──────────────────────────────────────────────────── @@ -307,9 +311,22 @@ class TestEntity: self.paused_at = None return extra + def check_dual_validation(self) -> None: + """Evaluate both leads' votes and advance state if appropriate. + + - Both **approved** -> ``validated`` + - Either **rejected** -> ``rejected`` + - Otherwise no change (waiting for the other lead). + + Called automatically by :meth:`validate_red` and :meth:`validate_blue`. + Also available as a standalone entry point for backward compatibility + when validation fields are set externally. + """ + self._check_dual_validation() + def _assert_in_review(self, side: str) -> None: if self.state != TestState.in_review: - raise BusinessRuleViolation( + raise InvalidOperationError( f"Cannot validate {side} side while test is in " f"'{self.state.value}' state (must be in_review)" ) @@ -317,7 +334,7 @@ class TestEntity: @staticmethod def _assert_valid_vote(status: str) -> None: if status not in ("approved", "rejected"): - raise BusinessRuleViolation( + raise InvalidOperationError( "validation_status must be 'approved' or 'rejected'" ) diff --git a/backend/app/services/test_workflow_service.py b/backend/app/services/test_workflow_service.py index 28d922c..d2f0dfd 100644 --- a/backend/app/services/test_workflow_service.py +++ b/backend/app/services/test_workflow_service.py @@ -333,26 +333,14 @@ def validate_as_red_lead( ) -> Test: """Record Red Lead's validation decision. - *validation_status* must be ``"approved"`` or ``"rejected"``. - After recording the decision, :func:`check_dual_validation` is called - to potentially advance the test to ``validated`` or ``rejected``. + Delegates validation rules and state mutation entirely to + :meth:`TestEntity.validate_red`. If both leads have voted the + entity will also advance the test to ``validated`` or ``rejected``. """ - current = test.state.value if isinstance(test.state, TestState) else test.state - if test.state not in (TestState.in_review,): - raise InvalidOperationError( - f"Cannot validate red side while test is in '{current}' state (must be in_review)" - ) - - if validation_status not in ("approved", "rejected"): - raise InvalidOperationError( - "validation_status must be 'approved' or 'rejected'" - ) - - now = datetime.utcnow() - test.red_validation_status = validation_status - test.red_validated_by = user.id - test.red_validated_at = now - test.red_validation_notes = notes + entity = TestEntity.from_orm(test) + entity.validate_red(validation_status, by=user.id, notes=notes) + entity.apply_to(test) + db.flush() log_action( db, @@ -367,7 +355,7 @@ def validate_as_red_lead( }, ) - check_dual_validation(db, test) + _dispatch_dual_validation_effects(db, test, entity) return test @@ -380,26 +368,14 @@ def validate_as_blue_lead( ) -> Test: """Record Blue Lead's validation decision. - *validation_status* must be ``"approved"`` or ``"rejected"``. - After recording the decision, :func:`check_dual_validation` is called - to potentially advance the test to ``validated`` or ``rejected``. + Delegates validation rules and state mutation entirely to + :meth:`TestEntity.validate_blue`. If both leads have voted the + entity will also advance the test to ``validated`` or ``rejected``. """ - current = test.state.value if isinstance(test.state, TestState) else test.state - if test.state not in (TestState.in_review,): - raise InvalidOperationError( - f"Cannot validate blue side while test is in '{current}' state (must be in_review)" - ) - - if validation_status not in ("approved", "rejected"): - raise InvalidOperationError( - "validation_status must be 'approved' or 'rejected'" - ) - - now = datetime.utcnow() - test.blue_validation_status = validation_status - test.blue_validated_by = user.id - test.blue_validated_at = now - test.blue_validation_notes = notes + entity = TestEntity.from_orm(test) + entity.validate_blue(validation_status, by=user.id, notes=notes) + entity.apply_to(test) + db.flush() log_action( db, @@ -414,43 +390,52 @@ def validate_as_blue_lead( }, ) - check_dual_validation(db, test) + _dispatch_dual_validation_effects(db, test, entity) return test def check_dual_validation(db: Session, test: Test) -> Test: """Evaluate both leads' decisions and advance the test if both have voted. - - Both **approved** → ``validated`` - - Either **rejected** → ``rejected`` - - Otherwise no state change (waiting for the other lead). - - Commits only when the state actually changes. + All state mutation is delegated to :meth:`TestEntity.check_dual_validation`. + This function never assigns ``test.state`` directly. """ - red_status = test.red_validation_status - blue_status = test.blue_validation_status + entity = TestEntity.from_orm(test) + entity.check_dual_validation() + entity.apply_to(test) - if red_status == "rejected" or blue_status == "rejected": - test.state = TestState.rejected - try: - notify_test_state_change(db, test, "rejected") - except Exception as e: - logger.warning("Notification failed for test %s (rejected): %s", test.id, e, exc_info=True) - elif red_status == "approved" and blue_status == "approved": - test.state = TestState.validated - # Invalidate cached scores — a validation changes org-level numbers - try: - from app.services.score_cache import invalidate - invalidate() - except Exception as e: - logger.warning("Score cache invalidation failed: %s", e, exc_info=True) - try: - notify_test_state_change(db, test, "validated") - except Exception as e: - logger.warning("Notification failed for test %s (validated): %s", test.id, e, exc_info=True) + _dispatch_dual_validation_effects(db, test, entity) return test +def _dispatch_dual_validation_effects( + db: Session, test: Test, entity: TestEntity +) -> None: + """Dispatch side effects (notifications, cache) based on domain events.""" + for event in entity.events: + if event.name == "dual_validation_approved": + try: + from app.services.score_cache import invalidate + invalidate() + except Exception as e: + logger.warning("Score cache invalidation failed: %s", e, exc_info=True) + try: + notify_test_state_change(db, test, "validated") + except Exception as e: + logger.warning( + "Notification failed for test %s (validated): %s", + test.id, e, exc_info=True, + ) + elif event.name == "dual_validation_rejected": + try: + notify_test_state_change(db, test, "rejected") + except Exception as e: + logger.warning( + "Notification failed for test %s (rejected): %s", + test.id, e, exc_info=True, + ) + + def handle_remediation_completed(db: Session, test: Test, user: User) -> Test | None: """Create a re-test when remediation is completed. diff --git a/backend/tests/test_tests.py b/backend/tests/test_tests.py index fd7c96f..84ad8c5 100644 --- a/backend/tests/test_tests.py +++ b/backend/tests/test_tests.py @@ -1,12 +1,8 @@ -"""Tests for security test endpoints. +"""Tests for security test endpoints (V2 API). -NOTE: These tests were written for the V1 API (single validate/reject -endpoints). The V2 workflow uses dual Red/Blue validation, different -RBAC roles, and a new state machine. Integration tests for V2 live in -``test_integration_v2.py`` and ``test_workflow.py``. - -Tests in this file that exercise deprecated V1 endpoints are marked as -``xfail`` so they don't block the suite. +Covers the test CRUD and basic workflow via the REST API. +For full workflow logic tests see ``test_workflow.py`` and +``test_integration_v2.py``. """ import pytest @@ -23,21 +19,20 @@ def technique(client, auth_headers): return response.json() -@pytest.mark.xfail(reason="V1 test: auth bypass when Redis unavailable in test env") -def test_create_test_requires_auth(client, technique): - """Test that creating a test requires authentication.""" +def test_create_test_requires_auth(client): + """POST /tests without token returns 401 or 403.""" response = client.post( "/api/v1/tests", json={ - "technique_id": technique["id"], + "technique_id": "00000000-0000-0000-0000-000000000000", "name": "Test Name", }, ) - assert response.status_code == 401 + assert response.status_code in (401, 403) -def test_create_test_success(client, red_tech_headers, technique): - """Test successful test creation.""" +def test_create_test_success(client, auth_headers, technique): + """Admin can create a test via POST /tests.""" response = client.post( "/api/v1/tests", json={ @@ -46,7 +41,7 @@ def test_create_test_success(client, red_tech_headers, technique): "description": "Test description", "platform": "windows", }, - headers=red_tech_headers, + headers=auth_headers, ) assert response.status_code == 201 data = response.json() @@ -55,126 +50,28 @@ def test_create_test_success(client, red_tech_headers, technique): assert data["technique_id"] == technique["id"] -@pytest.mark.xfail(reason="V1 test: RBAC returns 403 before 404 check in V2") -def test_create_test_nonexistent_technique(client, red_tech_headers): - """Test creating a test with non-existent technique fails.""" +def test_create_test_nonexistent_technique(client, auth_headers): + """Creating a test with non-existent technique fails.""" response = client.post( "/api/v1/tests", json={ "technique_id": "00000000-0000-0000-0000-000000000000", "name": "Test", }, - headers=red_tech_headers, + headers=auth_headers, ) assert response.status_code == 404 -def test_get_test_by_id(client, red_tech_headers, technique): - """Test getting a test by ID.""" - # Create a test +def test_get_test_by_id(client, auth_headers, technique): + """GET /tests/{id} returns the test.""" create_response = client.post( "/api/v1/tests", json={"technique_id": technique["id"], "name": "Test"}, - headers=red_tech_headers, + headers=auth_headers, ) test_id = create_response.json()["id"] - - # Get it - response = client.get(f"/api/v1/tests/{test_id}", headers=red_tech_headers) + + response = client.get(f"/api/v1/tests/{test_id}", headers=auth_headers) assert response.status_code == 200 assert response.json()["id"] == test_id - - -@pytest.mark.xfail(reason="V1 test: /validate endpoint replaced by dual-validation in V2") -def test_validate_test(client, auth_headers, red_tech_headers, technique): - """Test validating a test updates status correctly.""" - # Create a test - create_response = client.post( - "/api/v1/tests", - json={"technique_id": technique["id"], "name": "Test"}, - headers=red_tech_headers, - ) - test_id = create_response.json()["id"] - - # Validate it (requires lead/admin) - response = client.post( - f"/api/v1/tests/{test_id}/validate", - json={"result": "detected"}, - headers=auth_headers, - ) - assert response.status_code == 200 - data = response.json() - assert data["state"] == "validated" - assert data["result"] == "detected" - assert data["validated_by"] is not None - - -@pytest.mark.xfail(reason="V1 test: /validate endpoint replaced by dual-validation in V2") -def test_validate_test_updates_technique_status(client, auth_headers, red_tech_headers, technique): - """Test that validating a test recalculates technique status.""" - # Create and validate a test - create_response = client.post( - "/api/v1/tests", - json={"technique_id": technique["id"], "name": "Test"}, - headers=red_tech_headers, - ) - test_id = create_response.json()["id"] - - client.post( - f"/api/v1/tests/{test_id}/validate", - json={"result": "detected"}, - headers=auth_headers, - ) - - # Check technique status was updated - response = client.get( - f"/api/v1/techniques/{technique['mitre_id']}", - headers=auth_headers, - ) - assert response.json()["status_global"] == "validated" - - -@pytest.mark.xfail(reason="V1 test: /reject endpoint replaced by dual-validation in V2") -def test_reject_test(client, auth_headers, red_tech_headers, technique): - """Test rejecting a test.""" - # Create a test - create_response = client.post( - "/api/v1/tests", - json={"technique_id": technique["id"], "name": "Test"}, - headers=red_tech_headers, - ) - test_id = create_response.json()["id"] - - # Reject it - response = client.post( - f"/api/v1/tests/{test_id}/reject", - headers=auth_headers, - ) - assert response.status_code == 200 - assert response.json()["state"] == "rejected" - - -@pytest.mark.xfail(reason="V1 test: /validate endpoint replaced by dual-validation in V2") -def test_update_test_only_in_draft(client, auth_headers, red_tech_headers, technique): - """Test that tests can only be updated when in draft/rejected state.""" - # Create and validate a test - create_response = client.post( - "/api/v1/tests", - json={"technique_id": technique["id"], "name": "Test"}, - headers=red_tech_headers, - ) - test_id = create_response.json()["id"] - - client.post( - f"/api/v1/tests/{test_id}/validate", - json={"result": "detected"}, - headers=auth_headers, - ) - - # Try to update validated test - response = client.patch( - f"/api/v1/tests/{test_id}", - json={"name": "New Name"}, - headers=red_tech_headers, - ) - assert response.status_code == 400