Files
Aegis/backend/tests/test_t110_evidence_router.py

284 lines
9.9 KiB
Python

"""Validation tests for T-110: Evidence Router with Red/Blue separation.
Tests the permission logic and endpoint structure.
"""
import sys
import os
import uuid
from unittest.mock import MagicMock
from types import ModuleType
# ---------------------------------------------------------------------------
# Stubs
# ---------------------------------------------------------------------------
backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if backend_dir not in sys.path:
sys.path.insert(0, backend_dir)
if "pydantic_settings" not in sys.modules:
pydantic_settings_mock = ModuleType("pydantic_settings")
class _BaseSettings:
def __init__(self, **kwargs): pass
def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs)
pydantic_settings_mock.BaseSettings = _BaseSettings
sys.modules["pydantic_settings"] = pydantic_settings_mock
if "app.config" not in sys.modules:
config_mod = ModuleType("app.config")
class _FakeSettings:
DATABASE_URL = "sqlite:///:memory:"
SECRET_KEY = "test"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60
MINIO_ENDPOINT = "localhost:9000"
MINIO_ACCESS_KEY = "test"
MINIO_SECRET_KEY = "test"
MINIO_BUCKET = "test"
REPORT_TEMPLATES_DIR = "app/templates/reports"
REPORT_OUTPUT_DIR = "/tmp/aegis_reports"
COMPANY_NAME = "Test Org"
COMPANY_LOGO_PATH = "app/templates/reports/assets/logo.png"
JIRA_ENABLED = False
JIRA_URL = ""
JIRA_USERNAME = ""
JIRA_API_TOKEN = ""
JIRA_IS_CLOUD = True
JIRA_DEFAULT_PROJECT = ""
JIRA_ISSUE_TYPE_TEST = "Task"
JIRA_ISSUE_TYPE_CAMPAIGN = "Epic"
TEMPO_ENABLED = False
TEMPO_API_TOKEN = ""
TEMPO_DEFAULT_WORK_TYPE = "Red Team"
NVD_API_KEY = ""
STALE_THRESHOLD_DAYS = 365
CORS_ORIGINS = "http://localhost:3000"
SCORING_WEIGHT_TESTS = 40
SCORING_WEIGHT_DETECTION_RULES = 20
SCORING_WEIGHT_D3FEND = 15
SCORING_WEIGHT_FRESHNESS = 15
SCORING_WEIGHT_PLATFORM_DIVERSITY = 10
config_mod.settings = _FakeSettings()
sys.modules["app.config"] = config_mod
if "app.database" not in sys.modules:
db_mod = ModuleType("app.database")
db_mod.Base = type("Base", (), {"metadata": MagicMock()})
db_mod.get_db = MagicMock()
sys.modules["app.database"] = db_mod
for mod_name in [
"taxii2client", "taxii2client.v20",
"jose", "boto3", "botocore", "botocore.exceptions",
"apscheduler", "apscheduler.schedulers",
"apscheduler.schedulers.background",
"apscheduler.triggers", "apscheduler.triggers.cron",
]:
if mod_name not in sys.modules:
m = ModuleType(mod_name)
if mod_name == "taxii2client.v20": m.Server = MagicMock
elif mod_name == "jose": m.JWTError = Exception; m.jwt = MagicMock()
elif mod_name == "boto3": m.client = MagicMock()
elif mod_name == "botocore.exceptions": m.ClientError = Exception
elif mod_name == "apscheduler.schedulers.background": m.BackgroundScheduler = MagicMock
elif mod_name == "apscheduler.triggers.cron": m.CronTrigger = MagicMock
sys.modules[mod_name] = m
# ---------------------------------------------------------------------------
# Imports
# ---------------------------------------------------------------------------
from app.domain.errors import PermissionViolation
from app.models.enums import TeamSide, TestState
from app.routers.evidence import router
from app.services.evidence_service import (
validate_delete_permission,
validate_upload_permission,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_test(state):
t = MagicMock()
t.id = uuid.uuid4()
t.state = state
return t
def _make_user(role):
u = MagicMock()
u.id = uuid.uuid4()
u.role = role
return u
def _make_evidence(team, uploaded_by=None, test_id=None):
e = MagicMock()
e.id = uuid.uuid4()
e.test_id = test_id or uuid.uuid4()
e.team = team
e.uploaded_by = uploaded_by or uuid.uuid4()
return e
# ---------------------------------------------------------------------------
# 1. red_tech can upload team=red in red_executing
# ---------------------------------------------------------------------------
def test_red_tech_upload_red_in_red_executing():
test = _make_test(TestState.red_executing)
user = _make_user("red_tech")
# Should not raise
validate_upload_permission(test, TeamSide.red, user.role)
print(" [PASS] red_tech can upload team=red in red_executing")
# ---------------------------------------------------------------------------
# 2. red_tech can upload team=red in draft
# ---------------------------------------------------------------------------
def test_red_tech_upload_red_in_draft():
test = _make_test(TestState.draft)
user = _make_user("red_tech")
validate_upload_permission(test, TeamSide.red, user.role)
print(" [PASS] red_tech can upload team=red in draft")
# ---------------------------------------------------------------------------
# 3. red_tech CANNOT upload team=blue (403)
# ---------------------------------------------------------------------------
def test_red_tech_cannot_upload_blue():
test = _make_test(TestState.red_executing)
user = _make_user("red_tech")
try:
validate_upload_permission(test, TeamSide.blue, user.role)
assert False, "Should have raised PermissionViolation"
except PermissionViolation:
pass
print(" [PASS] red_tech CANNOT upload team=blue (403)")
# ---------------------------------------------------------------------------
# 4. blue_tech can upload team=blue in blue_evaluating
# ---------------------------------------------------------------------------
def test_blue_tech_upload_blue_in_blue_evaluating():
test = _make_test(TestState.blue_evaluating)
user = _make_user("blue_tech")
validate_upload_permission(test, TeamSide.blue, user.role)
print(" [PASS] blue_tech can upload team=blue in blue_evaluating")
# ---------------------------------------------------------------------------
# 5. blue_tech CANNOT upload team=red (403)
# ---------------------------------------------------------------------------
def test_blue_tech_cannot_upload_red():
test = _make_test(TestState.blue_evaluating)
user = _make_user("blue_tech")
try:
validate_upload_permission(test, TeamSide.red, user.role)
assert False, "Should have raised PermissionViolation"
except PermissionViolation:
pass
print(" [PASS] blue_tech CANNOT upload team=red (403)")
# ---------------------------------------------------------------------------
# 6. GET /tests/{id}/evidence?team=red — endpoint exists with team filter
# ---------------------------------------------------------------------------
def test_list_evidence_endpoint():
routes = {}
for route in router.routes:
path = getattr(route, "path", "")
methods = getattr(route, "methods", set())
for method in methods:
routes[f"{method} {path}"] = route
found = any(
"GET" in k and "/evidence" in k and "{test_id}" in k
for k in routes
)
assert found, f"GET /tests/{{test_id}}/evidence not found. Routes: {list(routes.keys())}"
print(" [PASS] GET /tests/{id}/evidence endpoint exists (filterable by team)")
# ---------------------------------------------------------------------------
# 7. DELETE in in_review → 403
# ---------------------------------------------------------------------------
def test_delete_in_review_fails():
test = _make_test(TestState.in_review)
user = _make_user("red_tech")
evidence = _make_evidence(TeamSide.red, uploaded_by=user.id)
try:
validate_delete_permission(test, evidence, user.role, user.id)
assert False, "Should have raised PermissionViolation"
except PermissionViolation:
pass
print(" [PASS] DELETE in in_review -> 403")
# ---------------------------------------------------------------------------
# 8. DELETE red evidence in red_executing → allowed
# ---------------------------------------------------------------------------
def test_delete_red_evidence_in_red_executing():
test = _make_test(TestState.red_executing)
user = _make_user("red_tech")
evidence = _make_evidence(TeamSide.red, uploaded_by=user.id)
# Should not raise
validate_delete_permission(test, evidence, user.role, user.id)
print(" [PASS] DELETE red evidence in red_executing -> allowed")
# ---------------------------------------------------------------------------
# 9. Admin can upload any team in any state
# ---------------------------------------------------------------------------
def test_admin_bypass():
admin = _make_user("admin")
# Red in blue_evaluating (normally blocked)
test1 = _make_test(TestState.blue_evaluating)
validate_upload_permission(test1, TeamSide.red, admin.role)
# Blue in draft (normally blocked)
test2 = _make_test(TestState.draft)
validate_upload_permission(test2, TeamSide.blue, admin.role)
print(" [PASS] Admin can upload any team in any state")
# ---------------------------------------------------------------------------
# Run all
# ---------------------------------------------------------------------------
if __name__ == "__main__":
print("T-110 Validation: Evidence Router with Red/Blue Separation")
print("=" * 60)
test_red_tech_upload_red_in_red_executing()
test_red_tech_upload_red_in_draft()
test_red_tech_cannot_upload_blue()
test_blue_tech_upload_blue_in_blue_evaluating()
test_blue_tech_cannot_upload_red()
test_list_evidence_endpoint()
test_delete_in_review_fails()
test_delete_red_evidence_in_red_executing()
test_admin_bypass()
print("=" * 60)
print("ALL T-110 validations PASSED!")