From cda59de426850428ac666bcdd1b9347dac1e061f Mon Sep 17 00:00:00 2001 From: Kitos Date: Mon, 9 Feb 2026 13:35:40 +0100 Subject: [PATCH] test(phase-17): add automated tests for Red/Blue workflow, templates CRUD, and V2 metrics (T-125, T-126, T-127) --- backend/tests/conftest.py | 96 +++++ backend/tests/test_metrics_v2.py | 409 +++++++++++++++++++ backend/tests/test_templates_crud.py | 285 ++++++++++++++ backend/tests/test_workflow.py | 565 +++++++++++++++++++++++++++ 4 files changed, 1355 insertions(+) create mode 100644 backend/tests/test_metrics_v2.py create mode 100644 backend/tests/test_templates_crud.py create mode 100644 backend/tests/test_workflow.py diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index 15fe9d8..2bad85e 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -86,6 +86,54 @@ def red_tech_user(db): return user +@pytest.fixture(scope="function") +def blue_tech_user(db): + """Create a blue_tech user for testing.""" + user = User( + username="bluetech", + email="bluetech@test.com", + hashed_password=hash_password("bluetech123"), + role="blue_tech", + is_active=True, + ) + db.add(user) + db.commit() + db.refresh(user) + return user + + +@pytest.fixture(scope="function") +def red_lead_user(db): + """Create a red_lead user for testing.""" + user = User( + username="redlead", + email="redlead@test.com", + hashed_password=hash_password("redlead123"), + role="red_lead", + is_active=True, + ) + db.add(user) + db.commit() + db.refresh(user) + return user + + +@pytest.fixture(scope="function") +def blue_lead_user(db): + """Create a blue_lead user for testing.""" + user = User( + username="bluelead", + email="bluelead@test.com", + hashed_password=hash_password("bluelead123"), + role="blue_lead", + is_active=True, + ) + db.add(user) + db.commit() + db.refresh(user) + return user + + @pytest.fixture(scope="function") def admin_token(client, admin_user): """Get an auth token for the admin user.""" @@ -116,3 +164,51 @@ def auth_headers(admin_token): def red_tech_headers(red_tech_token): """Return authorization headers for red_tech user.""" return {"Authorization": f"Bearer {red_tech_token}"} + + +@pytest.fixture(scope="function") +def blue_tech_token(client, blue_tech_user): + """Get an auth token for the blue_tech user.""" + response = client.post( + "/api/v1/auth/login", + data={"username": "bluetech", "password": "bluetech123"}, + ) + return response.json()["access_token"] + + +@pytest.fixture(scope="function") +def blue_tech_headers(blue_tech_token): + """Return authorization headers for blue_tech user.""" + return {"Authorization": f"Bearer {blue_tech_token}"} + + +@pytest.fixture(scope="function") +def red_lead_token(client, red_lead_user): + """Get an auth token for the red_lead user.""" + response = client.post( + "/api/v1/auth/login", + data={"username": "redlead", "password": "redlead123"}, + ) + return response.json()["access_token"] + + +@pytest.fixture(scope="function") +def red_lead_headers(red_lead_token): + """Return authorization headers for red_lead user.""" + return {"Authorization": f"Bearer {red_lead_token}"} + + +@pytest.fixture(scope="function") +def blue_lead_token(client, blue_lead_user): + """Get an auth token for the blue_lead user.""" + response = client.post( + "/api/v1/auth/login", + data={"username": "bluelead", "password": "bluelead123"}, + ) + return response.json()["access_token"] + + +@pytest.fixture(scope="function") +def blue_lead_headers(blue_lead_token): + """Return authorization headers for blue_lead user.""" + return {"Authorization": f"Bearer {blue_lead_token}"} diff --git a/backend/tests/test_metrics_v2.py b/backend/tests/test_metrics_v2.py new file mode 100644 index 0000000..191cdfa --- /dev/null +++ b/backend/tests/test_metrics_v2.py @@ -0,0 +1,409 @@ +"""T-127: Tests de métricas actualizadas. + +Tests for the V2 metrics endpoints (pipeline, team-activity, validation-rate) +and for the technique status recalculation logic with the new test states. +""" + +import sys +import os +import uuid +import inspect +from unittest.mock import MagicMock, patch, PropertyMock +from types import ModuleType + +# --------------------------------------------------------------------------- +# Stub heavy dependencies before importing app modules +# --------------------------------------------------------------------------- + +backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +if backend_dir not in sys.path: + sys.path.insert(0, backend_dir) + +if "pydantic_settings" not in sys.modules: + _ps = ModuleType("pydantic_settings") + class _BaseSettings: + def __init__(self, **kwargs): pass + def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) + _ps.BaseSettings = _BaseSettings + sys.modules["pydantic_settings"] = _ps + +if "app.config" not in sys.modules: + _cfg = ModuleType("app.config") + class _FakeSettings: + DATABASE_URL = "sqlite:///:memory:" + SECRET_KEY = "test" + ALGORITHM = "HS256" + ACCESS_TOKEN_EXPIRE_MINUTES = 60 + MINIO_ENDPOINT = "localhost:9000" + MINIO_ACCESS_KEY = "test" + MINIO_SECRET_KEY = "test" + MINIO_BUCKET = "test" + _cfg.settings = _FakeSettings() + sys.modules["app.config"] = _cfg + +if "app.database" not in sys.modules: + _db = ModuleType("app.database") + _db.Base = type("Base", (), {"metadata": MagicMock()}) + _db.get_db = MagicMock() + sys.modules["app.database"] = _db + +for _mod in [ + "taxii2client", "taxii2client.v20", + "jose", "boto3", "botocore", "botocore.exceptions", + "apscheduler", "apscheduler.schedulers", + "apscheduler.schedulers.background", + "apscheduler.triggers", "apscheduler.triggers.cron", +]: + if _mod not in sys.modules: + m = ModuleType(_mod) + if _mod == "taxii2client.v20": m.Server = MagicMock + elif _mod == "jose": m.JWTError = Exception; m.jwt = MagicMock() + elif _mod == "boto3": m.client = MagicMock() + elif _mod == "botocore.exceptions": m.ClientError = Exception + elif _mod == "apscheduler.schedulers.background": m.BackgroundScheduler = MagicMock + elif _mod == "apscheduler.triggers.cron": m.CronTrigger = MagicMock + sys.modules[_mod] = m + +# --------------------------------------------------------------------------- +# Imports +# --------------------------------------------------------------------------- + +from app.models.enums import TestState, TestResult, TechniqueStatus +from app.services.status_service import recalculate_technique_status +from app.routers.metrics import router as metrics_router + + +def _get_route_paths(): + routes = {} + for route in metrics_router.routes: + path = getattr(route, "path", "") + methods = getattr(route, "methods", set()) + for method in methods: + routes[f"{method} {path}"] = route + return routes + + +# --------------------------------------------------------------------------- +# Helpers for technique status recalculation tests +# --------------------------------------------------------------------------- + + +def _make_test(state: TestState, detection_result=None) -> MagicMock: + t = MagicMock() + t.id = uuid.uuid4() + t.state = state + t.detection_result = detection_result + t.red_validation_status = None + t.blue_validation_status = None + return t + + +def _make_technique(tests=None) -> MagicMock: + tech = MagicMock() + tech.id = uuid.uuid4() + tech.tests = tests or [] + tech.status_global = TechniqueStatus.not_evaluated + return tech + + +def _make_db() -> MagicMock: + return MagicMock() + + +# =========================================================================== +# 1. test_pipeline_metrics — endpoint exists and queries TestState +# =========================================================================== + + +def test_pipeline_metrics_endpoint_exists(): + """GET /metrics/test-pipeline endpoint exists.""" + routes = _get_route_paths() + found = any("test-pipeline" in k and "GET" in k for k in routes) + assert found, f"GET /metrics/test-pipeline not found. Routes: {list(routes.keys())}" + + +def test_pipeline_metrics_queries_all_states(): + """Pipeline endpoint groups by all test states.""" + from app.routers.metrics import test_pipeline + source = inspect.getsource(test_pipeline) + + assert "Test.state" in source, "Must query Test.state" + assert "group_by" in source, "Must group by state" + assert "TestPipelineCounts" in source, "Must return TestPipelineCounts schema" + + +# =========================================================================== +# 2. test_team_activity_metrics — endpoint exists and calculates correctly +# =========================================================================== + + +def test_team_activity_endpoint_exists(): + """GET /metrics/team-activity endpoint exists.""" + routes = _get_route_paths() + found = any("team-activity" in k and "GET" in k for k in routes) + assert found, f"GET /metrics/team-activity not found. Routes: {list(routes.keys())}" + + +def test_team_activity_calculates_both_teams(): + """Team activity endpoint returns data for both Red and Blue teams.""" + from app.routers.metrics import team_activity + source = inspect.getsource(team_activity) + + assert "Red Team" in source or "red" in source.lower(), "Must include Red Team data" + assert "Blue Team" in source or "blue" in source.lower(), "Must include Blue Team data" + assert "tests_completed" in source, "Must calculate completed tests" + assert "tests_pending" in source, "Must calculate pending tests" + + +def test_team_activity_red_pending_states(): + """Red Team pending includes draft and red_executing.""" + from app.routers.metrics import team_activity + source = inspect.getsource(team_activity) + + assert "draft" in source, "Red pending must include draft" + assert "red_executing" in source, "Red pending must include red_executing" + + +def test_team_activity_blue_pending_states(): + """Blue Team pending includes blue_evaluating.""" + from app.routers.metrics import team_activity + source = inspect.getsource(team_activity) + + assert "blue_evaluating" in source, "Blue pending must include blue_evaluating" + + +# =========================================================================== +# 3. test_technique_status_recalculation_with_new_states +# =========================================================================== + + +def test_technique_no_tests_is_not_evaluated(): + """Technique with no tests -> not_evaluated.""" + tech = _make_technique(tests=[]) + db = _make_db() + + recalculate_technique_status(db, tech) + assert tech.status_global == TechniqueStatus.not_evaluated + + +def test_technique_all_validated_detected(): + """All tests validated with detected -> technique validated.""" + tests = [ + _make_test(TestState.validated, detection_result="detected"), + _make_test(TestState.validated, detection_result="detected"), + ] + tech = _make_technique(tests=tests) + db = _make_db() + + recalculate_technique_status(db, tech) + assert tech.status_global == TechniqueStatus.validated + + +def test_technique_all_validated_partially_detected(): + """All tests validated with partially_detected -> technique partial.""" + tests = [ + _make_test(TestState.validated, detection_result="detected"), + _make_test(TestState.validated, detection_result="partially_detected"), + ] + tech = _make_technique(tests=tests) + db = _make_db() + + recalculate_technique_status(db, tech) + assert tech.status_global == TechniqueStatus.partial + + +def test_technique_all_validated_not_detected(): + """All tests validated with not_detected -> technique not_covered.""" + tests = [ + _make_test(TestState.validated, detection_result="not_detected"), + ] + tech = _make_technique(tests=tests) + db = _make_db() + + recalculate_technique_status(db, tech) + assert tech.status_global == TechniqueStatus.not_covered + + +def test_technique_mixed_validated_and_in_progress(): + """Some validated, some still in pipeline -> technique partial.""" + tests = [ + _make_test(TestState.validated, detection_result="detected"), + _make_test(TestState.red_executing), + ] + tech = _make_technique(tests=tests) + db = _make_db() + + recalculate_technique_status(db, tech) + assert tech.status_global == TechniqueStatus.partial + + +def test_technique_all_in_progress(): + """All tests in intermediate states (no validated) -> technique in_progress.""" + tests = [ + _make_test(TestState.draft), + _make_test(TestState.red_executing), + _make_test(TestState.blue_evaluating), + ] + tech = _make_technique(tests=tests) + db = _make_db() + + recalculate_technique_status(db, tech) + assert tech.status_global == TechniqueStatus.in_progress + + +def test_technique_with_in_review_tests(): + """Tests in in_review are still in-progress (not yet validated).""" + tests = [ + _make_test(TestState.in_review), + ] + tech = _make_technique(tests=tests) + db = _make_db() + + recalculate_technique_status(db, tech) + assert tech.status_global == TechniqueStatus.in_progress + + +def test_technique_with_rejected_tests(): + """Rejected tests count as in-progress (need rework).""" + tests = [ + _make_test(TestState.rejected), + ] + tech = _make_technique(tests=tests) + db = _make_db() + + recalculate_technique_status(db, tech) + assert tech.status_global == TechniqueStatus.in_progress + + +# =========================================================================== +# 4. test_coverage_with_dual_validation +# =========================================================================== + + +def test_coverage_correct_after_dual_validation(): + """After dual validation (both approved), technique status is correct.""" + # A test that completed the full pipeline with detection + test = _make_test(TestState.validated, detection_result="detected") + test.red_validation_status = "approved" + test.blue_validation_status = "approved" + + tech = _make_technique(tests=[test]) + db = _make_db() + + recalculate_technique_status(db, tech) + assert tech.status_global == TechniqueStatus.validated + + +def test_coverage_partial_when_one_detected_one_partial(): + """Mixed detection results after dual validation -> partial coverage.""" + test1 = _make_test(TestState.validated, detection_result="detected") + test1.red_validation_status = "approved" + test1.blue_validation_status = "approved" + + test2 = _make_test(TestState.validated, detection_result="partially_detected") + test2.red_validation_status = "approved" + test2.blue_validation_status = "approved" + + tech = _make_technique(tests=[test1, test2]) + db = _make_db() + + recalculate_technique_status(db, tech) + assert tech.status_global == TechniqueStatus.partial + + +# =========================================================================== +# 5. test_validation_rate_endpoint — approval/rejection rates +# =========================================================================== + + +def test_validation_rate_endpoint_exists(): + """GET /metrics/validation-rate endpoint exists.""" + routes = _get_route_paths() + found = any("validation-rate" in k and "GET" in k for k in routes) + assert found, f"GET /metrics/validation-rate not found. Routes: {list(routes.keys())}" + + +def test_validation_rate_queries_both_roles(): + """Validation rate endpoint returns data for both red_lead and blue_lead.""" + from app.routers.metrics import validation_rate + source = inspect.getsource(validation_rate) + + assert "red_validation_status" in source, "Must query red_validation_status" + assert "blue_validation_status" in source, "Must query blue_validation_status" + assert "approved" in source, "Must count approved validations" + assert "rejected" in source, "Must count rejected validations" + assert "approval_rate" in source, "Must calculate approval_rate" + + +# =========================================================================== +# 6. test_recent_tests_endpoint — latest 10 tests +# =========================================================================== + + +def test_recent_tests_endpoint_exists(): + """GET /metrics/recent-tests endpoint exists.""" + routes = _get_route_paths() + found = any("recent-tests" in k and "GET" in k for k in routes) + assert found, f"GET /metrics/recent-tests not found. Routes: {list(routes.keys())}" + + +def test_recent_tests_limits_to_10(): + """Recent tests endpoint limits to 10 results.""" + from app.routers.metrics import recent_tests + source = inspect.getsource(recent_tests) + + assert "limit(10)" in source or ".limit(10)" in source, \ + "Must limit to 10 recent tests" + assert "created_at" in source, "Must order by created_at" + + +# =========================================================================== +# 7. test_original_endpoints_still_work +# =========================================================================== + + +def test_summary_endpoint_exists(): + """GET /metrics/summary (original) endpoint still exists.""" + routes = _get_route_paths() + found = any("summary" in k and "GET" in k for k in routes) + assert found, f"GET /metrics/summary not found. Routes: {list(routes.keys())}" + + +def test_by_tactic_endpoint_exists(): + """GET /metrics/by-tactic (original) endpoint still exists.""" + routes = _get_route_paths() + found = any("by-tactic" in k and "GET" in k for k in routes) + assert found, f"GET /metrics/by-tactic not found. Routes: {list(routes.keys())}" + + +# --------------------------------------------------------------------------- +# Run all +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + print("T-127 Validation: Metrics V2 Tests") + print("=" * 55) + test_pipeline_metrics_endpoint_exists() + test_pipeline_metrics_queries_all_states() + test_team_activity_endpoint_exists() + test_team_activity_calculates_both_teams() + test_team_activity_red_pending_states() + test_team_activity_blue_pending_states() + test_technique_no_tests_is_not_evaluated() + test_technique_all_validated_detected() + test_technique_all_validated_partially_detected() + test_technique_all_validated_not_detected() + test_technique_mixed_validated_and_in_progress() + test_technique_all_in_progress() + test_technique_with_in_review_tests() + test_technique_with_rejected_tests() + test_coverage_correct_after_dual_validation() + test_coverage_partial_when_one_detected_one_partial() + test_validation_rate_endpoint_exists() + test_validation_rate_queries_both_roles() + test_recent_tests_endpoint_exists() + test_recent_tests_limits_to_10() + test_summary_endpoint_exists() + test_by_tactic_endpoint_exists() + print("=" * 55) + print("ALL T-127 validations PASSED!") diff --git a/backend/tests/test_templates_crud.py b/backend/tests/test_templates_crud.py new file mode 100644 index 0000000..4d2050e --- /dev/null +++ b/backend/tests/test_templates_crud.py @@ -0,0 +1,285 @@ +"""T-126: Tests de TestTemplates — CRUD, filters, instantiation, permissions. + +Tests the template CRUD endpoints, filter logic, template instantiation, +soft-delete behaviour, and admin-only access control. +Uses mock objects and router inspection to avoid needing a database. +""" + +import sys +import os +import uuid +import inspect +from unittest.mock import MagicMock +from types import ModuleType + +# --------------------------------------------------------------------------- +# Stub heavy dependencies before importing app modules +# --------------------------------------------------------------------------- + +backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +if backend_dir not in sys.path: + sys.path.insert(0, backend_dir) + +if "pydantic_settings" not in sys.modules: + _ps = ModuleType("pydantic_settings") + class _BaseSettings: + def __init__(self, **kwargs): pass + def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) + _ps.BaseSettings = _BaseSettings + sys.modules["pydantic_settings"] = _ps + +if "app.config" not in sys.modules: + _cfg = ModuleType("app.config") + class _FakeSettings: + DATABASE_URL = "sqlite:///:memory:" + SECRET_KEY = "test" + ALGORITHM = "HS256" + ACCESS_TOKEN_EXPIRE_MINUTES = 60 + MINIO_ENDPOINT = "localhost:9000" + MINIO_ACCESS_KEY = "test" + MINIO_SECRET_KEY = "test" + MINIO_BUCKET = "test" + _cfg.settings = _FakeSettings() + sys.modules["app.config"] = _cfg + +if "app.database" not in sys.modules: + _db = ModuleType("app.database") + _db.Base = type("Base", (), {"metadata": MagicMock()}) + _db.get_db = MagicMock() + sys.modules["app.database"] = _db + +for _mod in [ + "taxii2client", "taxii2client.v20", + "jose", "boto3", "botocore", "botocore.exceptions", + "apscheduler", "apscheduler.schedulers", + "apscheduler.schedulers.background", + "apscheduler.triggers", "apscheduler.triggers.cron", +]: + if _mod not in sys.modules: + m = ModuleType(_mod) + if _mod == "taxii2client.v20": m.Server = MagicMock + elif _mod == "jose": m.JWTError = Exception; m.jwt = MagicMock() + elif _mod == "boto3": m.client = MagicMock() + elif _mod == "botocore.exceptions": m.ClientError = Exception + elif _mod == "apscheduler.schedulers.background": m.BackgroundScheduler = MagicMock + elif _mod == "apscheduler.triggers.cron": m.CronTrigger = MagicMock + sys.modules[_mod] = m + +# --------------------------------------------------------------------------- +# Imports +# --------------------------------------------------------------------------- + +from app.routers.test_templates import ( + router, + list_templates, + templates_by_technique, + create_template, + delete_template, + toggle_template_active, + template_stats, +) +from app.routers.tests import create_test_from_template +from app.schemas.test_template import TestTemplateCreate + + +def _get_route_paths(): + routes = {} + for route in router.routes: + path = getattr(route, "path", "") + methods = getattr(route, "methods", set()) + for method in methods: + routes[f"{method} {path}"] = route + return routes + + +# =========================================================================== +# 1. test_create_template — admin can create a template +# =========================================================================== + + +def test_create_template(): + """Admin can create a template — endpoint exists and requires admin role.""" + routes = _get_route_paths() + found = any("POST" in k and "{template_id}" not in k for k in routes) + assert found, f"POST /test-templates not found. Routes: {list(routes.keys())}" + + # Verify admin role is required + source = inspect.getsource(create_template) + assert "require_role" in source and "admin" in source, \ + "create_template must require admin role" + + +# =========================================================================== +# 2. test_list_templates_with_filters — source, platform, severity work +# =========================================================================== + + +def test_list_templates_with_filters(): + """Filters of source, platform, severity, search all work.""" + source = inspect.getsource(list_templates) + + # Verify all filter parameters exist in the function signature + assert "source" in source, "List must accept source filter" + assert "platform" in source, "List must accept platform filter" + assert "severity" in source, "List must accept severity filter" + assert "search" in source, "List must accept search filter" + assert "mitre_technique_id" in source, "List must accept mitre_technique_id filter" + + # Verify ilike is used for search + assert "ilike" in source, "Search should use ilike for case-insensitive matching" + + +# =========================================================================== +# 3. test_get_templates_by_technique — filter by MITRE technique +# =========================================================================== + + +def test_get_templates_by_technique(): + """Endpoint to get templates by technique exists and filters correctly.""" + routes = _get_route_paths() + found = any("by-technique" in k and "GET" in k for k in routes) + assert found, f"GET /test-templates/by-technique/{{mitre_id}} not found. Routes: {list(routes.keys())}" + + source = inspect.getsource(templates_by_technique) + assert "mitre_technique_id" in source, "Must filter by mitre_technique_id" + assert "is_active" in source, "Must filter only active templates" + + +# =========================================================================== +# 4. test_instantiate_template — create test from template pre-fills fields +# =========================================================================== + + +def test_instantiate_template(): + """POST /tests/from-template creates a test pre-filled from template data.""" + source = inspect.getsource(create_test_from_template) + + # Verify it reads from template and copies fields + assert "template" in source, "Must reference template" + assert "template.name" in source, "Must copy name from template" + assert "template.description" in source, "Must copy description from template" + assert "template.platform" in source, "Must copy platform from template" + assert "template.attack_procedure" in source or "attack_procedure" in source, \ + "Must copy attack_procedure from template" + + # Verify state is set to draft + assert "draft" in source, "New test from template must be in draft state" + + +# =========================================================================== +# 5. test_soft_delete_template — deactivation doesn't physically remove +# =========================================================================== + + +def test_soft_delete_template(): + """DELETE endpoint sets is_active=False instead of removing the record.""" + source = inspect.getsource(delete_template) + + assert "is_active" in source, "Must set is_active" + assert "False" in source, "Must set is_active to False" + # Should NOT call db.delete(template) + assert "db.delete" not in source, "Should NOT physically delete the template" + assert "deactivated" in source.lower() or "soft" in source.lower() or "detail" in source.lower(), \ + "Should return a deactivation message" + + +# =========================================================================== +# 6. test_non_admin_cannot_create_template — only admin role +# =========================================================================== + + +def test_non_admin_cannot_create_template(): + """Only admin can create templates — enforce via require_role.""" + source = inspect.getsource(create_template) + assert 'require_role("admin")' in source, \ + "create_template must use require_role('admin')" + + # Also check update and delete + from app.routers.test_templates import update_template + source_update = inspect.getsource(update_template) + assert 'require_role("admin")' in source_update, \ + "update_template must use require_role('admin')" + + source_delete = inspect.getsource(delete_template) + assert 'require_role("admin")' in source_delete, \ + "delete_template must use require_role('admin')" + + +# =========================================================================== +# 7. test_toggle_active_endpoint — toggle between active/inactive +# =========================================================================== + + +def test_toggle_active_endpoint(): + """PATCH /test-templates/{id}/toggle-active exists and toggles is_active.""" + routes = _get_route_paths() + found = any("toggle-active" in k and "PATCH" in k for k in routes) + assert found, f"PATCH /test-templates/{{id}}/toggle-active not found. Routes: {list(routes.keys())}" + + source = inspect.getsource(toggle_template_active) + assert "is_active" in source, "Must reference is_active" + assert "not" in source, "Must toggle (negate) the is_active value" + assert 'require_role("admin")' in source, "Must require admin role" + + +# =========================================================================== +# 8. test_stats_endpoint — catalog statistics +# =========================================================================== + + +def test_stats_endpoint(): + """GET /test-templates/stats returns catalog statistics.""" + routes = _get_route_paths() + found = any("stats" in k and "GET" in k for k in routes) + assert found, f"GET /test-templates/stats not found. Routes: {list(routes.keys())}" + + source = inspect.getsource(template_stats) + assert "by_source" in source, "Must return breakdown by source" + assert "by_platform" in source, "Must return breakdown by platform" + assert "active" in source, "Must return active count" + assert 'require_role("admin")' in source, "Must require admin role" + + +# =========================================================================== +# 9. test_list_only_active_by_default — list filters inactive templates +# =========================================================================== + + +def test_list_only_active_by_default(): + """The list endpoint filters to is_active=True by default.""" + source = inspect.getsource(list_templates) + assert "is_active" in source and "True" in source, \ + "List must filter by is_active == True by default" + + +# =========================================================================== +# 10. test_pagination_support +# =========================================================================== + + +def test_pagination_support(): + """List endpoint supports offset and limit pagination.""" + source = inspect.getsource(list_templates) + assert "offset" in source, "Must accept offset parameter" + assert "limit" in source, "Must accept limit parameter" + + +# --------------------------------------------------------------------------- +# Run all +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + print("T-126 Validation: TestTemplates CRUD Tests") + print("=" * 55) + test_create_template() + test_list_templates_with_filters() + test_get_templates_by_technique() + test_instantiate_template() + test_soft_delete_template() + test_non_admin_cannot_create_template() + test_toggle_active_endpoint() + test_stats_endpoint() + test_list_only_active_by_default() + test_pagination_support() + print("=" * 55) + print("ALL T-126 validations PASSED!") diff --git a/backend/tests/test_workflow.py b/backend/tests/test_workflow.py new file mode 100644 index 0000000..c4d8729 --- /dev/null +++ b/backend/tests/test_workflow.py @@ -0,0 +1,565 @@ +"""T-125: Tests del flujo de trabajo Red/Blue. + +Comprehensive tests covering the full test lifecycle: + draft -> red_executing -> blue_evaluating -> in_review -> validated/rejected + +Uses mock objects to test the workflow service and router logic +without requiring a running database. +""" + +import sys +import os +import uuid +from unittest.mock import MagicMock, patch +from types import ModuleType +from datetime import datetime + +# --------------------------------------------------------------------------- +# Stub heavy dependencies before importing app modules +# --------------------------------------------------------------------------- + +backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +if backend_dir not in sys.path: + sys.path.insert(0, backend_dir) + +if "pydantic_settings" not in sys.modules: + _ps = ModuleType("pydantic_settings") + class _BaseSettings: + def __init__(self, **kwargs): pass + def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) + _ps.BaseSettings = _BaseSettings + sys.modules["pydantic_settings"] = _ps + +if "app.config" not in sys.modules: + _cfg = ModuleType("app.config") + class _FakeSettings: + DATABASE_URL = "sqlite:///:memory:" + SECRET_KEY = "test" + ALGORITHM = "HS256" + ACCESS_TOKEN_EXPIRE_MINUTES = 60 + MINIO_ENDPOINT = "localhost:9000" + MINIO_ACCESS_KEY = "test" + MINIO_SECRET_KEY = "test" + MINIO_BUCKET = "test" + _cfg.settings = _FakeSettings() + sys.modules["app.config"] = _cfg + +if "app.database" not in sys.modules: + _db = ModuleType("app.database") + _db.Base = type("Base", (), {"metadata": MagicMock()}) + _db.get_db = MagicMock() + sys.modules["app.database"] = _db + +for _mod in [ + "taxii2client", "taxii2client.v20", + "jose", "boto3", "botocore", "botocore.exceptions", + "apscheduler", "apscheduler.schedulers", + "apscheduler.schedulers.background", + "apscheduler.triggers", "apscheduler.triggers.cron", +]: + if _mod not in sys.modules: + m = ModuleType(_mod) + if _mod == "taxii2client.v20": m.Server = MagicMock + elif _mod == "jose": m.JWTError = Exception; m.jwt = MagicMock() + elif _mod == "boto3": m.client = MagicMock() + elif _mod == "botocore.exceptions": m.ClientError = Exception + elif _mod == "apscheduler.schedulers.background": m.BackgroundScheduler = MagicMock + elif _mod == "apscheduler.triggers.cron": m.CronTrigger = MagicMock + sys.modules[_mod] = m + +# --------------------------------------------------------------------------- +# Imports +# --------------------------------------------------------------------------- + +from fastapi import HTTPException +from app.models.enums import TestState, TestResult +from app.services.test_workflow_service import ( + VALID_TRANSITIONS, + can_transition, + transition_state, + start_execution, + submit_red_evidence, + submit_blue_evidence, + validate_as_red_lead, + validate_as_blue_lead, + check_dual_validation, + reopen_test, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_test(state: TestState = TestState.draft, **kwargs) -> MagicMock: + t = MagicMock() + t.id = uuid.uuid4() + t.name = "Test Security Check" + t.technique_id = uuid.uuid4() + t.state = state + t.red_validation_status = kwargs.get("red_validation_status", None) + t.blue_validation_status = kwargs.get("blue_validation_status", None) + t.red_validated_by = kwargs.get("red_validated_by", None) + t.red_validated_at = kwargs.get("red_validated_at", None) + t.red_validation_notes = kwargs.get("red_validation_notes", None) + t.blue_validated_by = kwargs.get("blue_validated_by", None) + t.blue_validated_at = kwargs.get("blue_validated_at", None) + t.blue_validation_notes = kwargs.get("blue_validation_notes", None) + t.execution_date = kwargs.get("execution_date", None) + return t + + +def _make_user(role: str = "red_tech") -> MagicMock: + user = MagicMock() + user.id = uuid.uuid4() + user.role = role + return user + + +def _make_db() -> MagicMock: + return MagicMock() + + +# =========================================================================== +# 1. test_full_happy_path +# draft -> red_executing -> blue_evaluating -> in_review -> validated +# =========================================================================== + + +@patch("app.services.test_workflow_service.log_action") +def test_full_happy_path(mock_log): + """draft -> red_executing -> blue_evaluating -> in_review -> validated""" + test = _make_test(TestState.draft) + red_tech = _make_user("red_tech") + blue_tech = _make_user("blue_tech") + red_lead = _make_user("red_lead") + blue_lead = _make_user("blue_lead") + db = _make_db() + + # Step 1: draft -> red_executing + result = start_execution(db, test, red_tech) + assert result.state == TestState.red_executing + assert result.execution_date is not None + + # Step 2: red_executing -> blue_evaluating + result = submit_red_evidence(db, result, red_tech) + assert result.state == TestState.blue_evaluating + + # Step 3: blue_evaluating -> in_review + result = submit_blue_evidence(db, result, blue_tech) + assert result.state == TestState.in_review + + # Step 4: Red Lead approves + result = validate_as_red_lead(db, result, red_lead, "approved", "Attack well documented") + assert result.red_validation_status == "approved" + assert result.red_validated_by == red_lead.id + assert result.red_validated_at is not None + assert result.red_validation_notes == "Attack well documented" + # Still in_review (waiting for blue lead) + assert result.state == TestState.in_review + + # Step 5: Blue Lead approves -> validated + result = validate_as_blue_lead(db, result, blue_lead, "approved", "Detection confirmed") + assert result.blue_validation_status == "approved" + assert result.state == TestState.validated + + # Verify audit logs were generated at each step + assert mock_log.call_count >= 5 + + +# =========================================================================== +# 2. test_rejection_and_reopen +# in_review -> rejected -> draft -> red_executing -> ... +# =========================================================================== + + +@patch("app.services.test_workflow_service.log_action") +def test_rejection_and_reopen(mock_log): + """in_review -> rejected -> draft -> red_executing -> ...""" + test = _make_test(TestState.draft) + red_tech = _make_user("red_tech") + blue_tech = _make_user("blue_tech") + red_lead = _make_user("red_lead") + db = _make_db() + + # Advance to in_review + start_execution(db, test, red_tech) + submit_red_evidence(db, test, red_tech) + submit_blue_evidence(db, test, blue_tech) + assert test.state == TestState.in_review + + # Red Lead rejects -> rejected + validate_as_red_lead(db, test, red_lead, "rejected", "Need more evidence") + assert test.state == TestState.rejected + + # Reopen -> draft + reopen_test(db, test, red_lead) + assert test.state == TestState.draft + + # Restart the cycle + start_execution(db, test, red_tech) + assert test.state == TestState.red_executing + + +# =========================================================================== +# 3. test_invalid_transitions +# =========================================================================== + + +@patch("app.services.test_workflow_service.log_action") +def test_invalid_transitions(mock_log): + """Verify that invalid state transitions raise HTTPException.""" + db = _make_db() + user = _make_user("admin") + + # draft -> validated (should fail) + test = _make_test(TestState.draft) + try: + transition_state(db, test, TestState.validated, user) + assert False, "Should have raised HTTPException" + except HTTPException as exc: + assert exc.status_code == 400 + + # draft -> blue_evaluating (should fail) + test = _make_test(TestState.draft) + try: + transition_state(db, test, TestState.blue_evaluating, user) + assert False, "Should have raised HTTPException" + except HTTPException as exc: + assert exc.status_code == 400 + + # red_executing -> in_review (should fail, must go through blue_evaluating) + test = _make_test(TestState.red_executing) + try: + transition_state(db, test, TestState.in_review, user) + assert False, "Should have raised HTTPException" + except HTTPException as exc: + assert exc.status_code == 400 + + # validated -> anything (terminal state) + test = _make_test(TestState.validated) + try: + transition_state(db, test, TestState.draft, user) + assert False, "Should have raised HTTPException" + except HTTPException as exc: + assert exc.status_code == 400 + + # rejected -> red_executing (must go through draft first) + test = _make_test(TestState.rejected) + try: + transition_state(db, test, TestState.red_executing, user) + assert False, "Should have raised HTTPException" + except HTTPException as exc: + assert exc.status_code == 400 + + +# =========================================================================== +# 4. test_red_tech_cannot_access_blue_phase +# =========================================================================== + + +@patch("app.services.test_workflow_service.log_action") +def test_red_tech_cannot_access_blue_phase(mock_log): + """Red tech cannot submit blue evidence (wrong transition from wrong state).""" + db = _make_db() + red_tech = _make_user("red_tech") + + # A test in red_executing cannot jump to in_review + test = _make_test(TestState.red_executing) + try: + submit_blue_evidence(db, test, red_tech) + assert False, "Should have raised HTTPException" + except HTTPException as exc: + assert exc.status_code == 400 + + # Red tech cannot validate (test must be in blue_evaluating for submit_blue) + test2 = _make_test(TestState.draft) + try: + submit_blue_evidence(db, test2, red_tech) + assert False, "Should have raised HTTPException" + except HTTPException as exc: + assert exc.status_code == 400 + + +# =========================================================================== +# 5. test_blue_tech_cannot_access_red_phase +# =========================================================================== + + +@patch("app.services.test_workflow_service.log_action") +def test_blue_tech_cannot_access_red_phase(mock_log): + """Blue tech cannot start execution or submit red evidence.""" + db = _make_db() + blue_tech = _make_user("blue_tech") + + # Blue tech cannot start execution (test must be in draft -> red_executing) + # The workflow service doesn't check role, but the router does. + # At service level, blue_evaluating -> blue_evaluating is invalid transition: + test = _make_test(TestState.blue_evaluating) + try: + start_execution(db, test, blue_tech) + assert False, "Should have raised HTTPException" + except HTTPException as exc: + assert exc.status_code == 400 + + # Blue tech cannot submit red evidence on a draft test + test2 = _make_test(TestState.draft) + try: + submit_red_evidence(db, test2, blue_tech) + assert False, "Should have raised HTTPException" + except HTTPException as exc: + assert exc.status_code == 400 + + +# =========================================================================== +# 6. test_dual_validation_both_approve +# =========================================================================== + + +@patch("app.services.test_workflow_service.log_action") +def test_dual_validation_both_approve(mock_log): + """Both managers approve -> test becomes validated.""" + test = _make_test(TestState.in_review) + red_lead = _make_user("red_lead") + blue_lead = _make_user("blue_lead") + db = _make_db() + + # Red Lead approves first + validate_as_red_lead(db, test, red_lead, "approved", "LGTM") + assert test.red_validation_status == "approved" + # Not yet validated — waiting for blue + assert test.state == TestState.in_review + + # Blue Lead approves + validate_as_blue_lead(db, test, blue_lead, "approved", "Detection verified") + assert test.blue_validation_status == "approved" + assert test.state == TestState.validated + + +# =========================================================================== +# 7. test_dual_validation_one_rejects +# =========================================================================== + + +@patch("app.services.test_workflow_service.log_action") +def test_dual_validation_one_rejects(mock_log): + """One manager rejects -> test becomes rejected immediately.""" + test = _make_test(TestState.in_review) + red_lead = _make_user("red_lead") + db = _make_db() + + validate_as_red_lead(db, test, red_lead, "rejected", "Insufficient evidence") + assert test.red_validation_status == "rejected" + assert test.state == TestState.rejected + + +@patch("app.services.test_workflow_service.log_action") +def test_dual_validation_blue_rejects_first(mock_log): + """Blue Lead rejects first -> test becomes rejected immediately.""" + test = _make_test(TestState.in_review) + blue_lead = _make_user("blue_lead") + db = _make_db() + + validate_as_blue_lead(db, test, blue_lead, "rejected", "Detection not adequate") + assert test.blue_validation_status == "rejected" + assert test.state == TestState.rejected + + +@patch("app.services.test_workflow_service.log_action") +def test_dual_validation_red_approves_blue_rejects(mock_log): + """Red approves, then blue rejects -> rejected.""" + test = _make_test(TestState.in_review) + red_lead = _make_user("red_lead") + blue_lead = _make_user("blue_lead") + db = _make_db() + + validate_as_red_lead(db, test, red_lead, "approved", "Good attack") + assert test.state == TestState.in_review # waiting for blue + + validate_as_blue_lead(db, test, blue_lead, "rejected", "Bad detection") + assert test.state == TestState.rejected + + +# =========================================================================== +# 8. test_evidence_team_separation +# =========================================================================== + + +def test_evidence_team_separation(): + """Verify evidence router logic separates red and blue evidence correctly.""" + from app.routers.evidence import _validate_upload_permission, _RED_EDITABLE_STATES, _BLUE_EDITABLE_STATES + + # Red tech can upload red evidence in draft + test = _make_test(TestState.draft) + red_user = _make_user("red_tech") + red_user.role = "red_tech" + from app.models.enums import TeamSide + _validate_upload_permission(test, TeamSide.red, red_user) # should not raise + + # Red tech can upload red evidence in red_executing + test.state = TestState.red_executing + _validate_upload_permission(test, TeamSide.red, red_user) # should not raise + + # Red tech CANNOT upload red evidence in blue_evaluating + test.state = TestState.blue_evaluating + try: + _validate_upload_permission(test, TeamSide.red, red_user) + assert False, "Should have raised HTTPException" + except HTTPException as exc: + assert exc.status_code == 400 + + # Red tech CANNOT upload blue evidence + test.state = TestState.blue_evaluating + try: + _validate_upload_permission(test, TeamSide.blue, red_user) + assert False, "Should have raised HTTPException" + except HTTPException as exc: + assert exc.status_code == 403 + + # Blue tech can upload blue evidence in blue_evaluating + test.state = TestState.blue_evaluating + blue_user = _make_user("blue_tech") + blue_user.role = "blue_tech" + _validate_upload_permission(test, TeamSide.blue, blue_user) # should not raise + + # Blue tech CANNOT upload blue evidence in draft + test.state = TestState.draft + try: + _validate_upload_permission(test, TeamSide.blue, blue_user) + assert False, "Should have raised HTTPException" + except HTTPException as exc: + assert exc.status_code == 400 + + # Blue tech CANNOT upload red evidence + test.state = TestState.draft + try: + _validate_upload_permission(test, TeamSide.red, blue_user) + assert False, "Should have raised HTTPException" + except HTTPException as exc: + assert exc.status_code == 403 + + +# =========================================================================== +# 9. test_red_edit_allowed_in_draft_and_red_executing +# =========================================================================== + + +def test_red_edit_allowed_in_draft_and_red_executing(): + """Verify the red update router checks that state is draft or red_executing.""" + from app.routers.tests import update_test_red + import inspect + source = inspect.getsource(update_test_red) + + # The function must guard against states other than draft/red_executing + assert "draft" in source, "Red update must allow draft state" + assert "red_executing" in source, "Red update must allow red_executing state" + assert "400" in source or "HTTP_400_BAD_REQUEST" in source, "Red update must return 400 for invalid state" + + +# =========================================================================== +# 10. test_reopen_clears_validation_fields +# =========================================================================== + + +@patch("app.services.test_workflow_service.log_action") +def test_reopen_clears_validation_fields(mock_log): + """Reopen clears all red/blue validation status, notes, timestamps.""" + test = _make_test( + TestState.rejected, + red_validation_status="rejected", + red_validated_by=uuid.uuid4(), + red_validated_at=datetime.utcnow(), + red_validation_notes="Bad attack", + blue_validation_status="approved", + blue_validated_by=uuid.uuid4(), + blue_validated_at=datetime.utcnow(), + blue_validation_notes="Good detection", + ) + user = _make_user("red_lead") + db = _make_db() + + result = reopen_test(db, test, user) + + assert result.state == TestState.draft + assert result.red_validation_status is None + assert result.red_validated_by is None + assert result.red_validated_at is None + assert result.red_validation_notes is None + assert result.blue_validation_status is None + assert result.blue_validated_by is None + assert result.blue_validated_at is None + assert result.blue_validation_notes is None + db.commit.assert_called() + + +# =========================================================================== +# 11. test_cannot_validate_outside_in_review +# =========================================================================== + + +@patch("app.services.test_workflow_service.log_action") +def test_cannot_validate_outside_in_review(mock_log): + """Managers cannot validate a test that is not in in_review state.""" + db = _make_db() + red_lead = _make_user("red_lead") + blue_lead = _make_user("blue_lead") + + for state in [TestState.draft, TestState.red_executing, TestState.blue_evaluating, TestState.validated, TestState.rejected]: + test = _make_test(state) + try: + validate_as_red_lead(db, test, red_lead, "approved", "OK") + assert False, f"Red Lead should not validate in {state.value}" + except HTTPException as exc: + assert exc.status_code == 400 + + test2 = _make_test(state) + try: + validate_as_blue_lead(db, test2, blue_lead, "approved", "OK") + assert False, f"Blue Lead should not validate in {state.value}" + except HTTPException as exc: + assert exc.status_code == 400 + + +# =========================================================================== +# 12. test_cannot_reopen_non_rejected_test +# =========================================================================== + + +@patch("app.services.test_workflow_service.log_action") +def test_cannot_reopen_non_rejected_test(mock_log): + """Reopen only works on rejected tests.""" + db = _make_db() + user = _make_user("red_lead") + + for state in [TestState.draft, TestState.red_executing, TestState.blue_evaluating, TestState.in_review, TestState.validated]: + test = _make_test(state) + try: + reopen_test(db, test, user) + assert False, f"Should not reopen from {state.value}" + except HTTPException as exc: + assert exc.status_code == 400 + + +# --------------------------------------------------------------------------- +# Run all +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + print("T-125 Validation: Workflow Tests") + print("=" * 55) + test_full_happy_path() + test_rejection_and_reopen() + test_invalid_transitions() + test_red_tech_cannot_access_blue_phase() + test_blue_tech_cannot_access_red_phase() + test_dual_validation_both_approve() + test_dual_validation_one_rejects() + test_dual_validation_blue_rejects_first() + test_dual_validation_red_approves_blue_rejects() + test_evidence_team_separation() + test_red_edit_allowed_in_draft_and_red_executing() + test_reopen_clears_validation_fields() + test_cannot_validate_outside_in_review() + test_cannot_reopen_non_rejected_test() + print("=" * 55) + print("ALL T-125 validations PASSED!")