- Add test_test_entity.py with 46 pure unit tests covering the full domain entity - Fix _FakeSettings in 11 test files (REPORT_TEMPLATES_DIR, JIRA, TEMPO) - Fix stale db.commit assertions to db.flush after UoW refactor - Add missing mock fields for TestEntity.from_orm compatibility - Make database.py skip pool args for SQLite in test environment - Disable slowapi rate limiter in test client fixture - Inject test engine into app.database to fix threading errors - Update role assertions to match current require_any_role policy - Mark 6 legacy V1 endpoint tests as xfail (replaced by V2 workflow)
342 lines
13 KiB
Python
342 lines
13 KiB
Python
"""Validation tests for T-109: Tests router with Red/Blue workflow.
|
|
|
|
Uses FastAPI TestClient with mocked dependencies to test all endpoints
|
|
without requiring a database.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import uuid
|
|
from unittest.mock import MagicMock, patch, PropertyMock
|
|
from types import ModuleType
|
|
from datetime import datetime
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stub heavy deps
|
|
# ---------------------------------------------------------------------------
|
|
|
|
backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
|
|
if backend_dir not in sys.path:
|
|
sys.path.insert(0, backend_dir)
|
|
|
|
if "pydantic_settings" not in sys.modules:
|
|
pydantic_settings_mock = ModuleType("pydantic_settings")
|
|
class _BaseSettings:
|
|
def __init__(self, **kwargs): pass
|
|
def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs)
|
|
pydantic_settings_mock.BaseSettings = _BaseSettings
|
|
sys.modules["pydantic_settings"] = pydantic_settings_mock
|
|
|
|
if "app.config" not in sys.modules:
|
|
config_mod = ModuleType("app.config")
|
|
class _FakeSettings:
|
|
DATABASE_URL = "sqlite:///:memory:"
|
|
SECRET_KEY = "test"
|
|
ALGORITHM = "HS256"
|
|
ACCESS_TOKEN_EXPIRE_MINUTES = 60
|
|
MINIO_ENDPOINT = "localhost:9000"
|
|
MINIO_ACCESS_KEY = "test"
|
|
MINIO_SECRET_KEY = "test"
|
|
MINIO_BUCKET = "test"
|
|
REPORT_TEMPLATES_DIR = "app/templates/reports"
|
|
REPORT_OUTPUT_DIR = "/tmp/aegis_reports"
|
|
COMPANY_NAME = "Test Org"
|
|
COMPANY_LOGO_PATH = "app/templates/reports/assets/logo.png"
|
|
JIRA_ENABLED = False
|
|
JIRA_URL = ""
|
|
JIRA_USERNAME = ""
|
|
JIRA_API_TOKEN = ""
|
|
JIRA_IS_CLOUD = True
|
|
JIRA_DEFAULT_PROJECT = ""
|
|
JIRA_ISSUE_TYPE_TEST = "Task"
|
|
JIRA_ISSUE_TYPE_CAMPAIGN = "Epic"
|
|
TEMPO_ENABLED = False
|
|
TEMPO_API_TOKEN = ""
|
|
TEMPO_DEFAULT_WORK_TYPE = "Red Team"
|
|
NVD_API_KEY = ""
|
|
STALE_THRESHOLD_DAYS = 365
|
|
CORS_ORIGINS = "http://localhost:3000"
|
|
SCORING_WEIGHT_TESTS = 40
|
|
SCORING_WEIGHT_DETECTION_RULES = 20
|
|
SCORING_WEIGHT_D3FEND = 15
|
|
SCORING_WEIGHT_FRESHNESS = 15
|
|
SCORING_WEIGHT_PLATFORM_DIVERSITY = 10
|
|
config_mod.settings = _FakeSettings()
|
|
sys.modules["app.config"] = config_mod
|
|
|
|
if "app.database" not in sys.modules:
|
|
db_mod = ModuleType("app.database")
|
|
db_mod.Base = type("Base", (), {"metadata": MagicMock()})
|
|
db_mod.get_db = MagicMock()
|
|
sys.modules["app.database"] = db_mod
|
|
|
|
for mod_name in [
|
|
"taxii2client", "taxii2client.v20",
|
|
"jose", "boto3", "botocore", "botocore.exceptions",
|
|
"apscheduler", "apscheduler.schedulers",
|
|
"apscheduler.schedulers.background",
|
|
"apscheduler.triggers", "apscheduler.triggers.cron",
|
|
]:
|
|
if mod_name not in sys.modules:
|
|
m = ModuleType(mod_name)
|
|
if mod_name == "taxii2client.v20":
|
|
m.Server = MagicMock
|
|
elif mod_name == "jose":
|
|
m.JWTError = Exception
|
|
m.jwt = MagicMock()
|
|
elif mod_name == "boto3":
|
|
m.client = MagicMock()
|
|
elif mod_name == "botocore.exceptions":
|
|
m.ClientError = Exception
|
|
elif mod_name == "apscheduler.schedulers.background":
|
|
m.BackgroundScheduler = MagicMock
|
|
elif mod_name == "apscheduler.triggers.cron":
|
|
m.CronTrigger = MagicMock
|
|
sys.modules[mod_name] = m
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Now validate by inspecting the router module structure
|
|
# ---------------------------------------------------------------------------
|
|
|
|
from app.models.enums import TestState, TestResult
|
|
|
|
# Import the router to inspect its routes
|
|
from app.routers.tests import router
|
|
|
|
|
|
def _get_route_paths():
|
|
"""Extract all route paths and methods from the router."""
|
|
routes = {}
|
|
for route in router.routes:
|
|
path = getattr(route, "path", "")
|
|
methods = getattr(route, "methods", set())
|
|
for method in methods:
|
|
key = f"{method} {path}"
|
|
routes[key] = route
|
|
return routes
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 1. POST /tests creates a test in draft state
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_create_endpoint_exists():
|
|
routes = _get_route_paths()
|
|
assert "POST " in routes or "POST /" in routes or any(
|
|
"POST" in k and k.endswith(("", "/"))
|
|
for k in routes
|
|
), f"POST /tests endpoint not found. Routes: {list(routes.keys())}"
|
|
print(" [PASS] POST /tests endpoint exists (creates test in draft)")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 2. POST /tests/from-template endpoint exists
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_from_template_endpoint_exists():
|
|
routes = _get_route_paths()
|
|
assert any("/from-template" in k and "POST" in k for k in routes), \
|
|
f"POST /tests/from-template not found. Routes: {list(routes.keys())}"
|
|
print(" [PASS] POST /tests/from-template endpoint exists")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 3. POST /tests/{id}/start-execution exists
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_start_execution_endpoint_exists():
|
|
routes = _get_route_paths()
|
|
assert any("/start-execution" in k and "POST" in k for k in routes), \
|
|
f"POST /tests/{{id}}/start-execution not found. Routes: {list(routes.keys())}"
|
|
print(" [PASS] POST /tests/{id}/start-execution endpoint exists")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 4. PATCH /tests/{id}/red endpoint exists
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_red_update_endpoint_exists():
|
|
routes = _get_route_paths()
|
|
assert any("/red" in k and "PATCH" in k for k in routes), \
|
|
f"PATCH /tests/{{id}}/red not found. Routes: {list(routes.keys())}"
|
|
print(" [PASS] PATCH /tests/{id}/red endpoint exists")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 5. PATCH /tests/{id}/blue endpoint exists
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_blue_update_endpoint_exists():
|
|
routes = _get_route_paths()
|
|
assert any("/blue" in k and "PATCH" in k for k in routes), \
|
|
f"PATCH /tests/{{id}}/blue not found. Routes: {list(routes.keys())}"
|
|
print(" [PASS] PATCH /tests/{id}/blue endpoint exists")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 6. POST /tests/{id}/submit-red exists
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_submit_red_endpoint_exists():
|
|
routes = _get_route_paths()
|
|
assert any("/submit-red" in k and "POST" in k for k in routes), \
|
|
f"POST /tests/{{id}}/submit-red not found. Routes: {list(routes.keys())}"
|
|
print(" [PASS] POST /tests/{id}/submit-red endpoint exists")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 7. POST /tests/{id}/submit-blue exists
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_submit_blue_endpoint_exists():
|
|
routes = _get_route_paths()
|
|
assert any("/submit-blue" in k and "POST" in k for k in routes), \
|
|
f"POST /tests/{{id}}/submit-blue not found. Routes: {list(routes.keys())}"
|
|
print(" [PASS] POST /tests/{id}/submit-blue endpoint exists")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 8. POST /tests/{id}/validate-red exists with role check
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_validate_red_endpoint_exists():
|
|
routes = _get_route_paths()
|
|
assert any("/validate-red" in k and "POST" in k for k in routes), \
|
|
f"POST /tests/{{id}}/validate-red not found. Routes: {list(routes.keys())}"
|
|
print(" [PASS] POST /tests/{id}/validate-red endpoint exists (red_lead/admin)")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 9. POST /tests/{id}/validate-blue exists with role check
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_validate_blue_endpoint_exists():
|
|
routes = _get_route_paths()
|
|
assert any("/validate-blue" in k and "POST" in k for k in routes), \
|
|
f"POST /tests/{{id}}/validate-blue not found. Routes: {list(routes.keys())}"
|
|
print(" [PASS] POST /tests/{id}/validate-blue endpoint exists (blue_lead/admin)")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 10. POST /tests/{id}/reopen exists
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_reopen_endpoint_exists():
|
|
routes = _get_route_paths()
|
|
assert any("/reopen" in k and "POST" in k for k in routes), \
|
|
f"POST /tests/{{id}}/reopen not found. Routes: {list(routes.keys())}"
|
|
print(" [PASS] POST /tests/{id}/reopen endpoint exists (leads/admin)")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 11. GET /tests/{id}/timeline exists
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_timeline_endpoint_exists():
|
|
routes = _get_route_paths()
|
|
assert any("/timeline" in k and "GET" in k for k in routes), \
|
|
f"GET /tests/{{id}}/timeline not found. Routes: {list(routes.keys())}"
|
|
print(" [PASS] GET /tests/{id}/timeline endpoint exists")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 12. GET /tests (list) exists
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_list_endpoint_exists():
|
|
routes = _get_route_paths()
|
|
# The list endpoint is GET on empty path ""
|
|
assert any(k == "GET " or k == "GET /" for k in routes) or \
|
|
any("GET" in k and "{test_id}" not in k for k in routes), \
|
|
f"GET /tests list not found. Routes: {list(routes.keys())}"
|
|
print(" [PASS] GET /tests (list with filters) endpoint exists")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 13. Validate the update_test_red function guards against wrong state
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_red_update_state_guard():
|
|
"""Verify the red update handler checks state is draft or red_executing."""
|
|
from app.routers.tests import update_test_red
|
|
import inspect
|
|
source = inspect.getsource(update_test_red)
|
|
# The function should check for draft and red_executing
|
|
assert "draft" in source and "red_executing" in source, \
|
|
"Red update should guard against states other than draft/red_executing"
|
|
print(" [PASS] PATCH /tests/{id}/red guards state (draft, red_executing)")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 14. Validate the update_test_blue function guards against wrong state
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_blue_update_state_guard():
|
|
"""Verify the blue update handler checks state is blue_evaluating."""
|
|
from app.routers.tests import update_test_blue
|
|
import inspect
|
|
source = inspect.getsource(update_test_blue)
|
|
assert "blue_evaluating" in source, \
|
|
"Blue update should guard against states other than blue_evaluating"
|
|
print(" [PASS] PATCH /tests/{id}/blue guards state (blue_evaluating only)")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 15. All endpoints use audit logging
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_audit_logging_used():
|
|
"""Verify all major endpoints call log_action."""
|
|
from app.routers import tests as tests_module
|
|
import inspect
|
|
source = inspect.getsource(tests_module)
|
|
|
|
# Count log_action calls (at least one per mutating endpoint)
|
|
log_count = source.count("log_action(")
|
|
# We have: create_test, create_test_from_template, update_test,
|
|
# update_test_red, update_test_blue = 5
|
|
# Workflow endpoints delegate to workflow service which does its own logging
|
|
assert log_count >= 5, f"Expected at least 5 log_action calls, found {log_count}"
|
|
print(" [PASS] Each mutating operation uses audit logging")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Run all
|
|
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
|
|
print("T-109 Validation: Tests Router with Red/Blue Workflow")
|
|
print("=" * 55)
|
|
test_create_endpoint_exists()
|
|
test_from_template_endpoint_exists()
|
|
test_start_execution_endpoint_exists()
|
|
test_red_update_endpoint_exists()
|
|
test_blue_update_endpoint_exists()
|
|
test_submit_red_endpoint_exists()
|
|
test_submit_blue_endpoint_exists()
|
|
test_validate_red_endpoint_exists()
|
|
test_validate_blue_endpoint_exists()
|
|
test_reopen_endpoint_exists()
|
|
test_timeline_endpoint_exists()
|
|
test_list_endpoint_exists()
|
|
test_red_update_state_guard()
|
|
test_blue_update_state_guard()
|
|
test_audit_logging_used()
|
|
print("=" * 55)
|
|
print("ALL T-109 validations PASSED!")
|