310 lines
12 KiB
Python
310 lines
12 KiB
Python
"""T-126: Tests de TestTemplates — CRUD, filters, instantiation, permissions.
|
|
|
|
Tests the template CRUD endpoints, filter logic, template instantiation,
|
|
soft-delete behaviour, and admin-only access control.
|
|
Uses mock objects and router inspection to avoid needing a database.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import uuid
|
|
import inspect
|
|
from unittest.mock import MagicMock
|
|
from types import ModuleType
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stub heavy dependencies before importing app modules
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Put the parent of this file's directory at the front of sys.path so the
# `app` package imported below can be resolved when the file is run directly.
backend_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir)
)
if backend_dir not in sys.path:
    sys.path.insert(0, backend_dir)
|
|
|
|
if "pydantic_settings" not in sys.modules:
    # Stand-in for pydantic_settings.BaseSettings: accepts and ignores any
    # keyword arguments, both on instantiation and on subclassing.
    class _BaseSettings:
        def __init__(self, **kwargs):
            pass

        def __init_subclass__(cls, **kwargs):
            super().__init_subclass__(**kwargs)

    _ps = ModuleType("pydantic_settings")
    _ps.BaseSettings = _BaseSettings
    sys.modules["pydantic_settings"] = _ps
|
|
|
|
if "app.config" not in sys.modules:
    # Fixed configuration values exposed as `app.config.settings`, registered
    # only when no `app.config` module has been loaded yet.
    class _FakeSettings:
        # Database / auth
        DATABASE_URL = "sqlite:///:memory:"
        SECRET_KEY = "test"
        ALGORITHM = "HS256"
        ACCESS_TOKEN_EXPIRE_MINUTES = 60

        # Object storage
        MINIO_ENDPOINT = "localhost:9000"
        MINIO_ACCESS_KEY = "test"
        MINIO_SECRET_KEY = "test"
        MINIO_BUCKET = "test"

        # Reporting
        REPORT_TEMPLATES_DIR = "app/templates/reports"
        REPORT_OUTPUT_DIR = "/tmp/aegis_reports"
        COMPANY_NAME = "Test Org"
        COMPANY_LOGO_PATH = "app/templates/reports/assets/logo.png"

        # Jira
        JIRA_ENABLED = False
        JIRA_URL = ""
        JIRA_USERNAME = ""
        JIRA_API_TOKEN = ""
        JIRA_IS_CLOUD = True
        JIRA_DEFAULT_PROJECT = ""
        JIRA_ISSUE_TYPE_TEST = "Task"
        JIRA_ISSUE_TYPE_CAMPAIGN = "Epic"

        # Tempo
        TEMPO_ENABLED = False
        TEMPO_API_TOKEN = ""
        TEMPO_DEFAULT_WORK_TYPE = "Red Team"

        # Miscellaneous
        NVD_API_KEY = ""
        STALE_THRESHOLD_DAYS = 365
        CORS_ORIGINS = "http://localhost:3000"

        # Scoring weights
        SCORING_WEIGHT_TESTS = 40
        SCORING_WEIGHT_DETECTION_RULES = 20
        SCORING_WEIGHT_D3FEND = 15
        SCORING_WEIGHT_FRESHNESS = 15
        SCORING_WEIGHT_PLATFORM_DIVERSITY = 10

    _cfg = ModuleType("app.config")
    _cfg.settings = _FakeSettings()
    sys.modules["app.config"] = _cfg
|
|
|
|
if "app.database" not in sys.modules:
    # Replacement `app.database` module: `Base` is a plain class whose
    # `metadata` is a MagicMock, and `get_db` is a MagicMock callable.
    _db = ModuleType("app.database")
    _db.get_db = MagicMock()
    _db.Base = type("Base", (), dict(metadata=MagicMock()))
    sys.modules["app.database"] = _db
|
|
|
|
# Register empty placeholder modules for optional third-party packages that
# are not already loaded, attaching the few attributes listed below.
for _mod in (
    "taxii2client",
    "taxii2client.v20",
    "jose",
    "boto3",
    "botocore",
    "botocore.exceptions",
    "apscheduler",
    "apscheduler.schedulers",
    "apscheduler.schedulers.background",
    "apscheduler.triggers",
    "apscheduler.triggers.cron",
):
    if _mod in sys.modules:
        # Never replace a module that is already loaded.
        continue

    m = ModuleType(_mod)
    if _mod == "taxii2client.v20":
        m.Server = MagicMock
    elif _mod == "jose":
        m.JWTError = Exception
        m.jwt = MagicMock()
    elif _mod == "boto3":
        m.client = MagicMock()
    elif _mod == "botocore.exceptions":
        m.ClientError = Exception
    elif _mod == "apscheduler.schedulers.background":
        m.BackgroundScheduler = MagicMock
    elif _mod == "apscheduler.triggers.cron":
        m.CronTrigger = MagicMock
    sys.modules[_mod] = m
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Imports
|
|
# ---------------------------------------------------------------------------
|
|
|
|
from app.routers.test_templates import (
|
|
router,
|
|
list_templates,
|
|
templates_by_technique,
|
|
create_template,
|
|
delete_template,
|
|
toggle_template_active,
|
|
template_stats,
|
|
)
|
|
from app.services.test_crud_service import create_test_from_template as crud_create_from_template
|
|
from app.schemas.test_template import TestTemplateCreate
|
|
|
|
|
|
def _get_route_paths():
    """Index the routes registered on ``router`` by ``"<METHOD> <path>"``.

    Returns:
        A dict mapping keys such as ``"GET /some/path"`` to the route object
        registered for that method and path. A route without a ``path``
        attribute is keyed with an empty path; a route whose ``methods``
        attribute is missing or ``None`` contributes no entries.
    """
    routes = {}
    for route in router.routes:
        path = getattr(route, "path", "")
        # `methods` can be present but None; treat that like "no methods"
        # rather than raising TypeError when iterating it.
        methods = getattr(route, "methods", None) or ()
        for method in methods:
            routes[f"{method} {path}"] = route
    return routes
|
|
|
|
|
|
# ===========================================================================
|
|
# 1. test_create_template — admin can create a template
|
|
# ===========================================================================
|
|
|
|
|
|
def test_create_template():
    """A POST route without ``{template_id}`` exists and create_template is role-guarded."""
    routes = _get_route_paths()
    collection_posts = [
        key for key in routes
        if "POST" in key and "{template_id}" not in key
    ]
    assert collection_posts, f"POST /test-templates not found. Routes: {list(routes.keys())}"

    # The role dependency is detected by name in the endpoint's source text.
    endpoint_src = inspect.getsource(create_template)
    role_guarded = any(
        marker in endpoint_src for marker in ("require_any_role", "require_role")
    )
    assert role_guarded, \
        "create_template must require role authorization"
|
|
|
|
|
|
# ===========================================================================
|
|
# 2. test_list_templates_with_filters — source, platform, severity work
|
|
# ===========================================================================
|
|
|
|
|
|
def test_list_templates_with_filters():
    """list_templates mentions every expected filter name and uses ``ilike``."""
    source = inspect.getsource(list_templates)

    # Each filter name must appear somewhere in the endpoint's source text.
    expected_filters = (
        "source",
        "platform",
        "severity",
        "search",
        "mitre_technique_id",
    )
    for filter_name in expected_filters:
        assert filter_name in source, f"List must accept {filter_name} filter"

    # Free-text search is expected to go through a case-insensitive match.
    assert "ilike" in source, "Search should use ilike for case-insensitive matching"
|
|
|
|
|
|
# ===========================================================================
|
|
# 3. test_get_templates_by_technique — filter by MITRE technique
|
|
# ===========================================================================
|
|
|
|
|
|
def test_get_templates_by_technique():
    """A GET ``by-technique`` route exists and its handler references the expected fields."""
    routes = _get_route_paths()
    matching = [key for key in routes if "by-technique" in key and "GET" in key]
    assert matching, f"GET /test-templates/by-technique/{{mitre_id}} not found. Routes: {list(routes.keys())}"

    handler_src = inspect.getsource(templates_by_technique)
    assert "mitre_technique_id" in handler_src, "Must filter by mitre_technique_id"
    assert "is_active" in handler_src, "Must filter only active templates"
|
|
|
|
|
|
# ===========================================================================
|
|
# 4. test_instantiate_template — create test from template pre-fills fields
|
|
# ===========================================================================
|
|
|
|
|
|
def test_instantiate_template():
    """The create-from-template service reads template fields and mentions the draft state."""
    # The check targets the service function rather than the router endpoint.
    source = inspect.getsource(crud_create_from_template)

    assert "template" in source, "Must reference template"

    # Fields that must be read directly off the template object.
    for field in ("name", "description", "platform"):
        assert f"template.{field}" in source, f"Must copy {field} from template"

    procedure_markers = ("template.attack_procedure", "attack_procedure")
    assert any(marker in source for marker in procedure_markers), \
        "Must copy attack_procedure from template"

    # The newly created test is expected to start out as a draft.
    assert "draft" in source, "New test from template must be in draft state"
|
|
|
|
|
|
# ===========================================================================
|
|
# 5. test_soft_delete_template — deactivation doesn't physically remove
|
|
# ===========================================================================
|
|
|
|
|
|
def test_soft_delete_template():
    """delete_template flips ``is_active`` and never calls ``db.delete``."""
    source = inspect.getsource(delete_template)
    lowered = source.lower()

    assert "is_active" in source, "Must set is_active"
    assert "False" in source, "Must set is_active to False"

    # A physical delete would show up as a db.delete(...) call.
    assert "db.delete" not in source, "Should NOT physically delete the template"

    message_markers = ("deactivated", "soft", "detail")
    assert any(marker in lowered for marker in message_markers), \
        "Should return a deactivation message"
|
|
|
|
|
|
# ===========================================================================
|
|
# 6. test_non_admin_cannot_create_template — only admin role
|
|
# ===========================================================================
|
|
|
|
|
|
def test_non_admin_cannot_create_template():
    """create, update and delete endpoints each reference a role-checking dependency."""
    role_markers = ("require_any_role", "require_role")

    create_src = inspect.getsource(create_template)
    assert any(marker in create_src for marker in role_markers), \
        "create_template must enforce role authorization"

    # update_template is imported here, after the create check has run.
    from app.routers.test_templates import update_template

    remaining = (
        ("update_template", update_template),
        ("delete_template", delete_template),
    )
    for label, endpoint in remaining:
        endpoint_src = inspect.getsource(endpoint)
        assert any(marker in endpoint_src for marker in role_markers), \
            f"{label} must enforce role authorization"
|
|
|
|
|
|
# ===========================================================================
|
|
# 7. test_toggle_active_endpoint — toggle between active/inactive
|
|
# ===========================================================================
|
|
|
|
|
|
def test_toggle_active_endpoint():
    """A PATCH ``toggle-active`` route exists and its handler negates ``is_active``."""
    routes = _get_route_paths()
    toggle_routes = [key for key in routes if "toggle-active" in key and "PATCH" in key]
    assert toggle_routes, f"PATCH /test-templates/{{id}}/toggle-active not found. Routes: {list(routes.keys())}"

    handler_src = inspect.getsource(toggle_template_active)
    assert "is_active" in handler_src, "Must reference is_active"
    # Textual check only: any occurrence of "not" in the source satisfies it.
    assert "not" in handler_src, "Must toggle (negate) the is_active value"

    role_guarded = any(
        marker in handler_src for marker in ("require_any_role", "require_role")
    )
    assert role_guarded, \
        "Must require role authorization"
|
|
|
|
|
|
# ===========================================================================
|
|
# 8. test_stats_endpoint — catalog statistics
|
|
# ===========================================================================
|
|
|
|
|
|
def test_stats_endpoint():
    """A GET ``stats`` route exists and its handler exposes the expected breakdown keys."""
    routes = _get_route_paths()
    stats_routes = [key for key in routes if "stats" in key and "GET" in key]
    assert stats_routes, f"GET /test-templates/stats not found. Routes: {list(routes.keys())}"

    handler_src = inspect.getsource(template_stats)
    assert "by_source" in handler_src, "Must return breakdown by source"
    assert "by_platform" in handler_src, "Must return breakdown by platform"
    assert "active" in handler_src, "Must return active count"

    role_guarded = any(
        marker in handler_src for marker in ("require_any_role", "require_role")
    )
    assert role_guarded, \
        "Must require role authorization"
|
|
|
|
|
|
# ===========================================================================
|
|
# 9. test_list_supports_active_filter — list accepts an is_active filter
|
|
# ===========================================================================
|
|
|
|
|
|
def test_list_supports_active_filter():
    """list_templates references ``is_active`` in its source."""
    listing_src = inspect.getsource(list_templates)
    mentions_active = "is_active" in listing_src
    assert mentions_active, \
        "List must support is_active filter parameter"
|
|
|
|
|
|
# ===========================================================================
|
|
# 10. test_pagination_support
|
|
# ===========================================================================
|
|
|
|
|
|
def test_pagination_support():
    """list_templates references both ``offset`` and ``limit`` in its source."""
    listing_src = inspect.getsource(list_templates)
    for param in ("offset", "limit"):
        assert param in listing_src, f"Must accept {param} parameter"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Run all
|
|
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run every check in this file in order. Any failing
    # assertion aborts the run before the final success banner is printed.
    print("T-126 Validation: TestTemplates CRUD Tests")
    print("=" * 55)
    test_create_template()
    test_list_templates_with_filters()
    test_get_templates_by_technique()
    test_instantiate_template()
    test_soft_delete_template()
    test_non_admin_cannot_create_template()
    test_toggle_active_endpoint()
    test_stats_endpoint()
    # The function defined in this file is test_list_supports_active_filter;
    # the previously called test_list_only_active_by_default does not exist
    # and raised NameError here.
    test_list_supports_active_filter()
    test_pagination_support()
    print("=" * 55)
    print("ALL T-126 validations PASSED!")
|