"""Validation tests for T-110: Evidence Router with Red/Blue separation. Tests the permission logic and endpoint structure. """ import sys import os import uuid from unittest.mock import MagicMock from types import ModuleType # --------------------------------------------------------------------------- # Stubs # --------------------------------------------------------------------------- backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) if backend_dir not in sys.path: sys.path.insert(0, backend_dir) if "pydantic_settings" not in sys.modules: pydantic_settings_mock = ModuleType("pydantic_settings") class _BaseSettings: def __init__(self, **kwargs): pass def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) pydantic_settings_mock.BaseSettings = _BaseSettings sys.modules["pydantic_settings"] = pydantic_settings_mock if "app.config" not in sys.modules: config_mod = ModuleType("app.config") class _FakeSettings: DATABASE_URL = "sqlite:///:memory:" SECRET_KEY = "test" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 MINIO_ENDPOINT = "localhost:9000" MINIO_ACCESS_KEY = "test" MINIO_SECRET_KEY = "test" MINIO_BUCKET = "test" REPORT_TEMPLATES_DIR = "app/templates/reports" REPORT_OUTPUT_DIR = "/tmp/aegis_reports" COMPANY_NAME = "Test Org" COMPANY_LOGO_PATH = "app/templates/reports/assets/logo.png" JIRA_ENABLED = False JIRA_URL = "" JIRA_USERNAME = "" JIRA_API_TOKEN = "" JIRA_IS_CLOUD = True JIRA_DEFAULT_PROJECT = "" JIRA_ISSUE_TYPE_TEST = "Task" JIRA_ISSUE_TYPE_CAMPAIGN = "Epic" TEMPO_ENABLED = False TEMPO_API_TOKEN = "" TEMPO_DEFAULT_WORK_TYPE = "Red Team" NVD_API_KEY = "" STALE_THRESHOLD_DAYS = 365 CORS_ORIGINS = "http://localhost:3000" SCORING_WEIGHT_TESTS = 40 SCORING_WEIGHT_DETECTION_RULES = 20 SCORING_WEIGHT_D3FEND = 15 SCORING_WEIGHT_FRESHNESS = 15 SCORING_WEIGHT_PLATFORM_DIVERSITY = 10 config_mod.settings = _FakeSettings() sys.modules["app.config"] = config_mod if "app.database" not in sys.modules: db_mod = ModuleType("app.database") db_mod.Base = type("Base", (), {"metadata": MagicMock()}) db_mod.get_db = MagicMock() sys.modules["app.database"] = db_mod for mod_name in [ "taxii2client", "taxii2client.v20", "jose", "boto3", "botocore", "botocore.exceptions", "apscheduler", "apscheduler.schedulers", "apscheduler.schedulers.background", "apscheduler.triggers", "apscheduler.triggers.cron", ]: if mod_name not in sys.modules: m = ModuleType(mod_name) if mod_name == "taxii2client.v20": m.Server = MagicMock elif mod_name == "jose": m.JWTError = Exception; m.jwt = MagicMock() elif mod_name == "boto3": m.client = MagicMock() elif mod_name == "botocore.exceptions": m.ClientError = Exception elif mod_name == "apscheduler.schedulers.background": m.BackgroundScheduler = MagicMock elif mod_name == "apscheduler.triggers.cron": m.CronTrigger = MagicMock sys.modules[mod_name] = m # --------------------------------------------------------------------------- # Imports # --------------------------------------------------------------------------- from fastapi import HTTPException from app.models.enums import TeamSide, TestState from app.routers.evidence import ( router, _validate_upload_permission, _validate_delete_permission, ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_test(state): t = MagicMock() t.id = uuid.uuid4() t.state = state return t def _make_user(role): u = MagicMock() u.id = uuid.uuid4() u.role = role return u def _make_evidence(team, uploaded_by=None, test_id=None): e = MagicMock() e.id = uuid.uuid4() e.test_id = test_id or uuid.uuid4() e.team = team e.uploaded_by = uploaded_by or uuid.uuid4() return e # --------------------------------------------------------------------------- # 1. red_tech can upload team=red in red_executing # --------------------------------------------------------------------------- def test_red_tech_upload_red_in_red_executing(): test = _make_test(TestState.red_executing) user = _make_user("red_tech") # Should not raise _validate_upload_permission(test, TeamSide.red, user) print(" [PASS] red_tech can upload team=red in red_executing") # --------------------------------------------------------------------------- # 2. red_tech can upload team=red in draft # --------------------------------------------------------------------------- def test_red_tech_upload_red_in_draft(): test = _make_test(TestState.draft) user = _make_user("red_tech") _validate_upload_permission(test, TeamSide.red, user) print(" [PASS] red_tech can upload team=red in draft") # --------------------------------------------------------------------------- # 3. red_tech CANNOT upload team=blue (403) # --------------------------------------------------------------------------- def test_red_tech_cannot_upload_blue(): test = _make_test(TestState.red_executing) user = _make_user("red_tech") try: _validate_upload_permission(test, TeamSide.blue, user) assert False, "Should have raised HTTPException" except HTTPException as exc: assert exc.status_code == 403 print(" [PASS] red_tech CANNOT upload team=blue (403)") # --------------------------------------------------------------------------- # 4. blue_tech can upload team=blue in blue_evaluating # --------------------------------------------------------------------------- def test_blue_tech_upload_blue_in_blue_evaluating(): test = _make_test(TestState.blue_evaluating) user = _make_user("blue_tech") _validate_upload_permission(test, TeamSide.blue, user) print(" [PASS] blue_tech can upload team=blue in blue_evaluating") # --------------------------------------------------------------------------- # 5. blue_tech CANNOT upload team=red (403) # --------------------------------------------------------------------------- def test_blue_tech_cannot_upload_red(): test = _make_test(TestState.blue_evaluating) user = _make_user("blue_tech") try: _validate_upload_permission(test, TeamSide.red, user) assert False, "Should have raised HTTPException" except HTTPException as exc: assert exc.status_code == 403 print(" [PASS] blue_tech CANNOT upload team=red (403)") # --------------------------------------------------------------------------- # 6. GET /tests/{id}/evidence?team=red — endpoint exists with team filter # --------------------------------------------------------------------------- def test_list_evidence_endpoint(): routes = {} for route in router.routes: path = getattr(route, "path", "") methods = getattr(route, "methods", set()) for method in methods: routes[f"{method} {path}"] = route found = any( "GET" in k and "/evidence" in k and "{test_id}" in k for k in routes ) assert found, f"GET /tests/{{test_id}}/evidence not found. Routes: {list(routes.keys())}" print(" [PASS] GET /tests/{id}/evidence endpoint exists (filterable by team)") # --------------------------------------------------------------------------- # 7. DELETE in in_review → 403 # --------------------------------------------------------------------------- def test_delete_in_review_fails(): test = _make_test(TestState.in_review) user = _make_user("red_tech") evidence = _make_evidence(TeamSide.red, uploaded_by=user.id) try: _validate_delete_permission(test, evidence, user) assert False, "Should have raised HTTPException" except HTTPException as exc: assert exc.status_code == 403 print(" [PASS] DELETE in in_review -> 403") # --------------------------------------------------------------------------- # 8. DELETE red evidence in red_executing → allowed # --------------------------------------------------------------------------- def test_delete_red_evidence_in_red_executing(): test = _make_test(TestState.red_executing) user = _make_user("red_tech") evidence = _make_evidence(TeamSide.red, uploaded_by=user.id) # Should not raise _validate_delete_permission(test, evidence, user) print(" [PASS] DELETE red evidence in red_executing -> allowed") # --------------------------------------------------------------------------- # 9. Admin can upload any team in any state # --------------------------------------------------------------------------- def test_admin_bypass(): admin = _make_user("admin") # Red in blue_evaluating (normally blocked) test1 = _make_test(TestState.blue_evaluating) _validate_upload_permission(test1, TeamSide.red, admin) # Blue in draft (normally blocked) test2 = _make_test(TestState.draft) _validate_upload_permission(test2, TeamSide.blue, admin) print(" [PASS] Admin can upload any team in any state") # --------------------------------------------------------------------------- # Run all # --------------------------------------------------------------------------- if __name__ == "__main__": print("T-110 Validation: Evidence Router with Red/Blue Separation") print("=" * 60) test_red_tech_upload_red_in_red_executing() test_red_tech_upload_red_in_draft() test_red_tech_cannot_upload_blue() test_blue_tech_upload_blue_in_blue_evaluating() test_blue_tech_cannot_upload_red() test_list_evidence_endpoint() test_delete_in_review_fails() test_delete_red_evidence_in_red_executing() test_admin_bypass() print("=" * 60) print("ALL T-110 validations PASSED!")