test(phase-17): add automated tests for Red/Blue workflow, templates CRUD, and V2 metrics (T-125, T-126, T-127)

This commit is contained in:
2026-02-09 13:35:40 +01:00
parent a95defcee4
commit cda59de426
4 changed files with 1355 additions and 0 deletions

View File

@@ -86,6 +86,54 @@ def red_tech_user(db):
return user return user
@pytest.fixture(scope="function")
def blue_tech_user(db):
"""Create a blue_tech user for testing."""
user = User(
username="bluetech",
email="bluetech@test.com",
hashed_password=hash_password("bluetech123"),
role="blue_tech",
is_active=True,
)
db.add(user)
db.commit()
db.refresh(user)
return user
@pytest.fixture(scope="function")
def red_lead_user(db):
"""Create a red_lead user for testing."""
user = User(
username="redlead",
email="redlead@test.com",
hashed_password=hash_password("redlead123"),
role="red_lead",
is_active=True,
)
db.add(user)
db.commit()
db.refresh(user)
return user
@pytest.fixture(scope="function")
def blue_lead_user(db):
"""Create a blue_lead user for testing."""
user = User(
username="bluelead",
email="bluelead@test.com",
hashed_password=hash_password("bluelead123"),
role="blue_lead",
is_active=True,
)
db.add(user)
db.commit()
db.refresh(user)
return user
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def admin_token(client, admin_user): def admin_token(client, admin_user):
"""Get an auth token for the admin user.""" """Get an auth token for the admin user."""
@@ -116,3 +164,51 @@ def auth_headers(admin_token):
def red_tech_headers(red_tech_token): def red_tech_headers(red_tech_token):
"""Return authorization headers for red_tech user.""" """Return authorization headers for red_tech user."""
return {"Authorization": f"Bearer {red_tech_token}"} return {"Authorization": f"Bearer {red_tech_token}"}
@pytest.fixture(scope="function")
def blue_tech_token(client, blue_tech_user):
"""Get an auth token for the blue_tech user."""
response = client.post(
"/api/v1/auth/login",
data={"username": "bluetech", "password": "bluetech123"},
)
return response.json()["access_token"]
@pytest.fixture(scope="function")
def blue_tech_headers(blue_tech_token):
"""Return authorization headers for blue_tech user."""
return {"Authorization": f"Bearer {blue_tech_token}"}
@pytest.fixture(scope="function")
def red_lead_token(client, red_lead_user):
"""Get an auth token for the red_lead user."""
response = client.post(
"/api/v1/auth/login",
data={"username": "redlead", "password": "redlead123"},
)
return response.json()["access_token"]
@pytest.fixture(scope="function")
def red_lead_headers(red_lead_token):
"""Return authorization headers for red_lead user."""
return {"Authorization": f"Bearer {red_lead_token}"}
@pytest.fixture(scope="function")
def blue_lead_token(client, blue_lead_user):
"""Get an auth token for the blue_lead user."""
response = client.post(
"/api/v1/auth/login",
data={"username": "bluelead", "password": "bluelead123"},
)
return response.json()["access_token"]
@pytest.fixture(scope="function")
def blue_lead_headers(blue_lead_token):
"""Return authorization headers for blue_lead user."""
return {"Authorization": f"Bearer {blue_lead_token}"}

View File

@@ -0,0 +1,409 @@
"""T-127: Tests de métricas actualizadas.
Tests for the V2 metrics endpoints (pipeline, team-activity, validation-rate)
and for the technique status recalculation logic with the new test states.
"""
import sys
import os
import uuid
import inspect
from unittest.mock import MagicMock, patch, PropertyMock
from types import ModuleType
# ---------------------------------------------------------------------------
# Stub heavy dependencies before importing app modules
# ---------------------------------------------------------------------------
backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if backend_dir not in sys.path:
sys.path.insert(0, backend_dir)
if "pydantic_settings" not in sys.modules:
_ps = ModuleType("pydantic_settings")
class _BaseSettings:
def __init__(self, **kwargs): pass
def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs)
_ps.BaseSettings = _BaseSettings
sys.modules["pydantic_settings"] = _ps
if "app.config" not in sys.modules:
_cfg = ModuleType("app.config")
class _FakeSettings:
DATABASE_URL = "sqlite:///:memory:"
SECRET_KEY = "test"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60
MINIO_ENDPOINT = "localhost:9000"
MINIO_ACCESS_KEY = "test"
MINIO_SECRET_KEY = "test"
MINIO_BUCKET = "test"
_cfg.settings = _FakeSettings()
sys.modules["app.config"] = _cfg
if "app.database" not in sys.modules:
_db = ModuleType("app.database")
_db.Base = type("Base", (), {"metadata": MagicMock()})
_db.get_db = MagicMock()
sys.modules["app.database"] = _db
for _mod in [
"taxii2client", "taxii2client.v20",
"jose", "boto3", "botocore", "botocore.exceptions",
"apscheduler", "apscheduler.schedulers",
"apscheduler.schedulers.background",
"apscheduler.triggers", "apscheduler.triggers.cron",
]:
if _mod not in sys.modules:
m = ModuleType(_mod)
if _mod == "taxii2client.v20": m.Server = MagicMock
elif _mod == "jose": m.JWTError = Exception; m.jwt = MagicMock()
elif _mod == "boto3": m.client = MagicMock()
elif _mod == "botocore.exceptions": m.ClientError = Exception
elif _mod == "apscheduler.schedulers.background": m.BackgroundScheduler = MagicMock
elif _mod == "apscheduler.triggers.cron": m.CronTrigger = MagicMock
sys.modules[_mod] = m
# ---------------------------------------------------------------------------
# Imports
# ---------------------------------------------------------------------------
from app.models.enums import TestState, TestResult, TechniqueStatus
from app.services.status_service import recalculate_technique_status
from app.routers.metrics import router as metrics_router
def _get_route_paths():
routes = {}
for route in metrics_router.routes:
path = getattr(route, "path", "")
methods = getattr(route, "methods", set())
for method in methods:
routes[f"{method} {path}"] = route
return routes
# ---------------------------------------------------------------------------
# Helpers for technique status recalculation tests
# ---------------------------------------------------------------------------
def _make_test(state: TestState, detection_result=None) -> MagicMock:
t = MagicMock()
t.id = uuid.uuid4()
t.state = state
t.detection_result = detection_result
t.red_validation_status = None
t.blue_validation_status = None
return t
def _make_technique(tests=None) -> MagicMock:
tech = MagicMock()
tech.id = uuid.uuid4()
tech.tests = tests or []
tech.status_global = TechniqueStatus.not_evaluated
return tech
def _make_db() -> MagicMock:
return MagicMock()
# ===========================================================================
# 1. test_pipeline_metrics — endpoint exists and queries TestState
# ===========================================================================
def test_pipeline_metrics_endpoint_exists():
"""GET /metrics/test-pipeline endpoint exists."""
routes = _get_route_paths()
found = any("test-pipeline" in k and "GET" in k for k in routes)
assert found, f"GET /metrics/test-pipeline not found. Routes: {list(routes.keys())}"
def test_pipeline_metrics_queries_all_states():
"""Pipeline endpoint groups by all test states."""
from app.routers.metrics import test_pipeline
source = inspect.getsource(test_pipeline)
assert "Test.state" in source, "Must query Test.state"
assert "group_by" in source, "Must group by state"
assert "TestPipelineCounts" in source, "Must return TestPipelineCounts schema"
# ===========================================================================
# 2. test_team_activity_metrics — endpoint exists and calculates correctly
# ===========================================================================
def test_team_activity_endpoint_exists():
"""GET /metrics/team-activity endpoint exists."""
routes = _get_route_paths()
found = any("team-activity" in k and "GET" in k for k in routes)
assert found, f"GET /metrics/team-activity not found. Routes: {list(routes.keys())}"
def test_team_activity_calculates_both_teams():
"""Team activity endpoint returns data for both Red and Blue teams."""
from app.routers.metrics import team_activity
source = inspect.getsource(team_activity)
assert "Red Team" in source or "red" in source.lower(), "Must include Red Team data"
assert "Blue Team" in source or "blue" in source.lower(), "Must include Blue Team data"
assert "tests_completed" in source, "Must calculate completed tests"
assert "tests_pending" in source, "Must calculate pending tests"
def test_team_activity_red_pending_states():
"""Red Team pending includes draft and red_executing."""
from app.routers.metrics import team_activity
source = inspect.getsource(team_activity)
assert "draft" in source, "Red pending must include draft"
assert "red_executing" in source, "Red pending must include red_executing"
def test_team_activity_blue_pending_states():
"""Blue Team pending includes blue_evaluating."""
from app.routers.metrics import team_activity
source = inspect.getsource(team_activity)
assert "blue_evaluating" in source, "Blue pending must include blue_evaluating"
# ===========================================================================
# 3. test_technique_status_recalculation_with_new_states
# ===========================================================================
def test_technique_no_tests_is_not_evaluated():
"""Technique with no tests -> not_evaluated."""
tech = _make_technique(tests=[])
db = _make_db()
recalculate_technique_status(db, tech)
assert tech.status_global == TechniqueStatus.not_evaluated
def test_technique_all_validated_detected():
"""All tests validated with detected -> technique validated."""
tests = [
_make_test(TestState.validated, detection_result="detected"),
_make_test(TestState.validated, detection_result="detected"),
]
tech = _make_technique(tests=tests)
db = _make_db()
recalculate_technique_status(db, tech)
assert tech.status_global == TechniqueStatus.validated
def test_technique_all_validated_partially_detected():
"""All tests validated with partially_detected -> technique partial."""
tests = [
_make_test(TestState.validated, detection_result="detected"),
_make_test(TestState.validated, detection_result="partially_detected"),
]
tech = _make_technique(tests=tests)
db = _make_db()
recalculate_technique_status(db, tech)
assert tech.status_global == TechniqueStatus.partial
def test_technique_all_validated_not_detected():
"""All tests validated with not_detected -> technique not_covered."""
tests = [
_make_test(TestState.validated, detection_result="not_detected"),
]
tech = _make_technique(tests=tests)
db = _make_db()
recalculate_technique_status(db, tech)
assert tech.status_global == TechniqueStatus.not_covered
def test_technique_mixed_validated_and_in_progress():
"""Some validated, some still in pipeline -> technique partial."""
tests = [
_make_test(TestState.validated, detection_result="detected"),
_make_test(TestState.red_executing),
]
tech = _make_technique(tests=tests)
db = _make_db()
recalculate_technique_status(db, tech)
assert tech.status_global == TechniqueStatus.partial
def test_technique_all_in_progress():
"""All tests in intermediate states (no validated) -> technique in_progress."""
tests = [
_make_test(TestState.draft),
_make_test(TestState.red_executing),
_make_test(TestState.blue_evaluating),
]
tech = _make_technique(tests=tests)
db = _make_db()
recalculate_technique_status(db, tech)
assert tech.status_global == TechniqueStatus.in_progress
def test_technique_with_in_review_tests():
"""Tests in in_review are still in-progress (not yet validated)."""
tests = [
_make_test(TestState.in_review),
]
tech = _make_technique(tests=tests)
db = _make_db()
recalculate_technique_status(db, tech)
assert tech.status_global == TechniqueStatus.in_progress
def test_technique_with_rejected_tests():
"""Rejected tests count as in-progress (need rework)."""
tests = [
_make_test(TestState.rejected),
]
tech = _make_technique(tests=tests)
db = _make_db()
recalculate_technique_status(db, tech)
assert tech.status_global == TechniqueStatus.in_progress
# ===========================================================================
# 4. test_coverage_with_dual_validation
# ===========================================================================
def test_coverage_correct_after_dual_validation():
"""After dual validation (both approved), technique status is correct."""
# A test that completed the full pipeline with detection
test = _make_test(TestState.validated, detection_result="detected")
test.red_validation_status = "approved"
test.blue_validation_status = "approved"
tech = _make_technique(tests=[test])
db = _make_db()
recalculate_technique_status(db, tech)
assert tech.status_global == TechniqueStatus.validated
def test_coverage_partial_when_one_detected_one_partial():
"""Mixed detection results after dual validation -> partial coverage."""
test1 = _make_test(TestState.validated, detection_result="detected")
test1.red_validation_status = "approved"
test1.blue_validation_status = "approved"
test2 = _make_test(TestState.validated, detection_result="partially_detected")
test2.red_validation_status = "approved"
test2.blue_validation_status = "approved"
tech = _make_technique(tests=[test1, test2])
db = _make_db()
recalculate_technique_status(db, tech)
assert tech.status_global == TechniqueStatus.partial
# ===========================================================================
# 5. test_validation_rate_endpoint — approval/rejection rates
# ===========================================================================
def test_validation_rate_endpoint_exists():
"""GET /metrics/validation-rate endpoint exists."""
routes = _get_route_paths()
found = any("validation-rate" in k and "GET" in k for k in routes)
assert found, f"GET /metrics/validation-rate not found. Routes: {list(routes.keys())}"
def test_validation_rate_queries_both_roles():
"""Validation rate endpoint returns data for both red_lead and blue_lead."""
from app.routers.metrics import validation_rate
source = inspect.getsource(validation_rate)
assert "red_validation_status" in source, "Must query red_validation_status"
assert "blue_validation_status" in source, "Must query blue_validation_status"
assert "approved" in source, "Must count approved validations"
assert "rejected" in source, "Must count rejected validations"
assert "approval_rate" in source, "Must calculate approval_rate"
# ===========================================================================
# 6. test_recent_tests_endpoint — latest 10 tests
# ===========================================================================
def test_recent_tests_endpoint_exists():
"""GET /metrics/recent-tests endpoint exists."""
routes = _get_route_paths()
found = any("recent-tests" in k and "GET" in k for k in routes)
assert found, f"GET /metrics/recent-tests not found. Routes: {list(routes.keys())}"
def test_recent_tests_limits_to_10():
"""Recent tests endpoint limits to 10 results."""
from app.routers.metrics import recent_tests
source = inspect.getsource(recent_tests)
assert "limit(10)" in source or ".limit(10)" in source, \
"Must limit to 10 recent tests"
assert "created_at" in source, "Must order by created_at"
# ===========================================================================
# 7. test_original_endpoints_still_work
# ===========================================================================
def test_summary_endpoint_exists():
"""GET /metrics/summary (original) endpoint still exists."""
routes = _get_route_paths()
found = any("summary" in k and "GET" in k for k in routes)
assert found, f"GET /metrics/summary not found. Routes: {list(routes.keys())}"
def test_by_tactic_endpoint_exists():
"""GET /metrics/by-tactic (original) endpoint still exists."""
routes = _get_route_paths()
found = any("by-tactic" in k and "GET" in k for k in routes)
assert found, f"GET /metrics/by-tactic not found. Routes: {list(routes.keys())}"
# ---------------------------------------------------------------------------
# Run all
# ---------------------------------------------------------------------------
if __name__ == "__main__":
print("T-127 Validation: Metrics V2 Tests")
print("=" * 55)
test_pipeline_metrics_endpoint_exists()
test_pipeline_metrics_queries_all_states()
test_team_activity_endpoint_exists()
test_team_activity_calculates_both_teams()
test_team_activity_red_pending_states()
test_team_activity_blue_pending_states()
test_technique_no_tests_is_not_evaluated()
test_technique_all_validated_detected()
test_technique_all_validated_partially_detected()
test_technique_all_validated_not_detected()
test_technique_mixed_validated_and_in_progress()
test_technique_all_in_progress()
test_technique_with_in_review_tests()
test_technique_with_rejected_tests()
test_coverage_correct_after_dual_validation()
test_coverage_partial_when_one_detected_one_partial()
test_validation_rate_endpoint_exists()
test_validation_rate_queries_both_roles()
test_recent_tests_endpoint_exists()
test_recent_tests_limits_to_10()
test_summary_endpoint_exists()
test_by_tactic_endpoint_exists()
print("=" * 55)
print("ALL T-127 validations PASSED!")

View File

@@ -0,0 +1,285 @@
"""T-126: Tests de TestTemplates — CRUD, filters, instantiation, permissions.
Tests the template CRUD endpoints, filter logic, template instantiation,
soft-delete behaviour, and admin-only access control.
Uses mock objects and router inspection to avoid needing a database.
"""
import sys
import os
import uuid
import inspect
from unittest.mock import MagicMock
from types import ModuleType
# ---------------------------------------------------------------------------
# Stub heavy dependencies before importing app modules
# ---------------------------------------------------------------------------
backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if backend_dir not in sys.path:
sys.path.insert(0, backend_dir)
if "pydantic_settings" not in sys.modules:
_ps = ModuleType("pydantic_settings")
class _BaseSettings:
def __init__(self, **kwargs): pass
def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs)
_ps.BaseSettings = _BaseSettings
sys.modules["pydantic_settings"] = _ps
if "app.config" not in sys.modules:
_cfg = ModuleType("app.config")
class _FakeSettings:
DATABASE_URL = "sqlite:///:memory:"
SECRET_KEY = "test"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60
MINIO_ENDPOINT = "localhost:9000"
MINIO_ACCESS_KEY = "test"
MINIO_SECRET_KEY = "test"
MINIO_BUCKET = "test"
_cfg.settings = _FakeSettings()
sys.modules["app.config"] = _cfg
if "app.database" not in sys.modules:
_db = ModuleType("app.database")
_db.Base = type("Base", (), {"metadata": MagicMock()})
_db.get_db = MagicMock()
sys.modules["app.database"] = _db
for _mod in [
"taxii2client", "taxii2client.v20",
"jose", "boto3", "botocore", "botocore.exceptions",
"apscheduler", "apscheduler.schedulers",
"apscheduler.schedulers.background",
"apscheduler.triggers", "apscheduler.triggers.cron",
]:
if _mod not in sys.modules:
m = ModuleType(_mod)
if _mod == "taxii2client.v20": m.Server = MagicMock
elif _mod == "jose": m.JWTError = Exception; m.jwt = MagicMock()
elif _mod == "boto3": m.client = MagicMock()
elif _mod == "botocore.exceptions": m.ClientError = Exception
elif _mod == "apscheduler.schedulers.background": m.BackgroundScheduler = MagicMock
elif _mod == "apscheduler.triggers.cron": m.CronTrigger = MagicMock
sys.modules[_mod] = m
# ---------------------------------------------------------------------------
# Imports
# ---------------------------------------------------------------------------
from app.routers.test_templates import (
router,
list_templates,
templates_by_technique,
create_template,
delete_template,
toggle_template_active,
template_stats,
)
from app.routers.tests import create_test_from_template
from app.schemas.test_template import TestTemplateCreate
def _get_route_paths():
routes = {}
for route in router.routes:
path = getattr(route, "path", "")
methods = getattr(route, "methods", set())
for method in methods:
routes[f"{method} {path}"] = route
return routes
# ===========================================================================
# 1. test_create_template — admin can create a template
# ===========================================================================
def test_create_template():
"""Admin can create a template — endpoint exists and requires admin role."""
routes = _get_route_paths()
found = any("POST" in k and "{template_id}" not in k for k in routes)
assert found, f"POST /test-templates not found. Routes: {list(routes.keys())}"
# Verify admin role is required
source = inspect.getsource(create_template)
assert "require_role" in source and "admin" in source, \
"create_template must require admin role"
# ===========================================================================
# 2. test_list_templates_with_filters — source, platform, severity work
# ===========================================================================
def test_list_templates_with_filters():
"""Filters of source, platform, severity, search all work."""
source = inspect.getsource(list_templates)
# Verify all filter parameters exist in the function signature
assert "source" in source, "List must accept source filter"
assert "platform" in source, "List must accept platform filter"
assert "severity" in source, "List must accept severity filter"
assert "search" in source, "List must accept search filter"
assert "mitre_technique_id" in source, "List must accept mitre_technique_id filter"
# Verify ilike is used for search
assert "ilike" in source, "Search should use ilike for case-insensitive matching"
# ===========================================================================
# 3. test_get_templates_by_technique — filter by MITRE technique
# ===========================================================================
def test_get_templates_by_technique():
"""Endpoint to get templates by technique exists and filters correctly."""
routes = _get_route_paths()
found = any("by-technique" in k and "GET" in k for k in routes)
assert found, f"GET /test-templates/by-technique/{{mitre_id}} not found. Routes: {list(routes.keys())}"
source = inspect.getsource(templates_by_technique)
assert "mitre_technique_id" in source, "Must filter by mitre_technique_id"
assert "is_active" in source, "Must filter only active templates"
# ===========================================================================
# 4. test_instantiate_template — create test from template pre-fills fields
# ===========================================================================
def test_instantiate_template():
"""POST /tests/from-template creates a test pre-filled from template data."""
source = inspect.getsource(create_test_from_template)
# Verify it reads from template and copies fields
assert "template" in source, "Must reference template"
assert "template.name" in source, "Must copy name from template"
assert "template.description" in source, "Must copy description from template"
assert "template.platform" in source, "Must copy platform from template"
assert "template.attack_procedure" in source or "attack_procedure" in source, \
"Must copy attack_procedure from template"
# Verify state is set to draft
assert "draft" in source, "New test from template must be in draft state"
# ===========================================================================
# 5. test_soft_delete_template — deactivation doesn't physically remove
# ===========================================================================
def test_soft_delete_template():
"""DELETE endpoint sets is_active=False instead of removing the record."""
source = inspect.getsource(delete_template)
assert "is_active" in source, "Must set is_active"
assert "False" in source, "Must set is_active to False"
# Should NOT call db.delete(template)
assert "db.delete" not in source, "Should NOT physically delete the template"
assert "deactivated" in source.lower() or "soft" in source.lower() or "detail" in source.lower(), \
"Should return a deactivation message"
# ===========================================================================
# 6. test_non_admin_cannot_create_template — only admin role
# ===========================================================================
def test_non_admin_cannot_create_template():
"""Only admin can create templates — enforce via require_role."""
source = inspect.getsource(create_template)
assert 'require_role("admin")' in source, \
"create_template must use require_role('admin')"
# Also check update and delete
from app.routers.test_templates import update_template
source_update = inspect.getsource(update_template)
assert 'require_role("admin")' in source_update, \
"update_template must use require_role('admin')"
source_delete = inspect.getsource(delete_template)
assert 'require_role("admin")' in source_delete, \
"delete_template must use require_role('admin')"
# ===========================================================================
# 7. test_toggle_active_endpoint — toggle between active/inactive
# ===========================================================================
def test_toggle_active_endpoint():
"""PATCH /test-templates/{id}/toggle-active exists and toggles is_active."""
routes = _get_route_paths()
found = any("toggle-active" in k and "PATCH" in k for k in routes)
assert found, f"PATCH /test-templates/{{id}}/toggle-active not found. Routes: {list(routes.keys())}"
source = inspect.getsource(toggle_template_active)
assert "is_active" in source, "Must reference is_active"
assert "not" in source, "Must toggle (negate) the is_active value"
assert 'require_role("admin")' in source, "Must require admin role"
# ===========================================================================
# 8. test_stats_endpoint — catalog statistics
# ===========================================================================
def test_stats_endpoint():
"""GET /test-templates/stats returns catalog statistics."""
routes = _get_route_paths()
found = any("stats" in k and "GET" in k for k in routes)
assert found, f"GET /test-templates/stats not found. Routes: {list(routes.keys())}"
source = inspect.getsource(template_stats)
assert "by_source" in source, "Must return breakdown by source"
assert "by_platform" in source, "Must return breakdown by platform"
assert "active" in source, "Must return active count"
assert 'require_role("admin")' in source, "Must require admin role"
# ===========================================================================
# 9. test_list_only_active_by_default — list filters inactive templates
# ===========================================================================
def test_list_only_active_by_default():
"""The list endpoint filters to is_active=True by default."""
source = inspect.getsource(list_templates)
assert "is_active" in source and "True" in source, \
"List must filter by is_active == True by default"
# ===========================================================================
# 10. test_pagination_support
# ===========================================================================
def test_pagination_support():
"""List endpoint supports offset and limit pagination."""
source = inspect.getsource(list_templates)
assert "offset" in source, "Must accept offset parameter"
assert "limit" in source, "Must accept limit parameter"
# ---------------------------------------------------------------------------
# Run all
# ---------------------------------------------------------------------------
if __name__ == "__main__":
print("T-126 Validation: TestTemplates CRUD Tests")
print("=" * 55)
test_create_template()
test_list_templates_with_filters()
test_get_templates_by_technique()
test_instantiate_template()
test_soft_delete_template()
test_non_admin_cannot_create_template()
test_toggle_active_endpoint()
test_stats_endpoint()
test_list_only_active_by_default()
test_pagination_support()
print("=" * 55)
print("ALL T-126 validations PASSED!")

View File

@@ -0,0 +1,565 @@
"""T-125: Tests del flujo de trabajo Red/Blue.
Comprehensive tests covering the full test lifecycle:
draft -> red_executing -> blue_evaluating -> in_review -> validated/rejected
Uses mock objects to test the workflow service and router logic
without requiring a running database.
"""
import sys
import os
import uuid
from unittest.mock import MagicMock, patch
from types import ModuleType
from datetime import datetime
# ---------------------------------------------------------------------------
# Stub heavy dependencies before importing app modules
# ---------------------------------------------------------------------------
backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if backend_dir not in sys.path:
sys.path.insert(0, backend_dir)
if "pydantic_settings" not in sys.modules:
_ps = ModuleType("pydantic_settings")
class _BaseSettings:
def __init__(self, **kwargs): pass
def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs)
_ps.BaseSettings = _BaseSettings
sys.modules["pydantic_settings"] = _ps
if "app.config" not in sys.modules:
_cfg = ModuleType("app.config")
class _FakeSettings:
DATABASE_URL = "sqlite:///:memory:"
SECRET_KEY = "test"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60
MINIO_ENDPOINT = "localhost:9000"
MINIO_ACCESS_KEY = "test"
MINIO_SECRET_KEY = "test"
MINIO_BUCKET = "test"
_cfg.settings = _FakeSettings()
sys.modules["app.config"] = _cfg
if "app.database" not in sys.modules:
_db = ModuleType("app.database")
_db.Base = type("Base", (), {"metadata": MagicMock()})
_db.get_db = MagicMock()
sys.modules["app.database"] = _db
for _mod in [
"taxii2client", "taxii2client.v20",
"jose", "boto3", "botocore", "botocore.exceptions",
"apscheduler", "apscheduler.schedulers",
"apscheduler.schedulers.background",
"apscheduler.triggers", "apscheduler.triggers.cron",
]:
if _mod not in sys.modules:
m = ModuleType(_mod)
if _mod == "taxii2client.v20": m.Server = MagicMock
elif _mod == "jose": m.JWTError = Exception; m.jwt = MagicMock()
elif _mod == "boto3": m.client = MagicMock()
elif _mod == "botocore.exceptions": m.ClientError = Exception
elif _mod == "apscheduler.schedulers.background": m.BackgroundScheduler = MagicMock
elif _mod == "apscheduler.triggers.cron": m.CronTrigger = MagicMock
sys.modules[_mod] = m
# ---------------------------------------------------------------------------
# Imports
# ---------------------------------------------------------------------------
from fastapi import HTTPException
from app.models.enums import TestState, TestResult
from app.services.test_workflow_service import (
VALID_TRANSITIONS,
can_transition,
transition_state,
start_execution,
submit_red_evidence,
submit_blue_evidence,
validate_as_red_lead,
validate_as_blue_lead,
check_dual_validation,
reopen_test,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_test(state: TestState = TestState.draft, **kwargs) -> MagicMock:
t = MagicMock()
t.id = uuid.uuid4()
t.name = "Test Security Check"
t.technique_id = uuid.uuid4()
t.state = state
t.red_validation_status = kwargs.get("red_validation_status", None)
t.blue_validation_status = kwargs.get("blue_validation_status", None)
t.red_validated_by = kwargs.get("red_validated_by", None)
t.red_validated_at = kwargs.get("red_validated_at", None)
t.red_validation_notes = kwargs.get("red_validation_notes", None)
t.blue_validated_by = kwargs.get("blue_validated_by", None)
t.blue_validated_at = kwargs.get("blue_validated_at", None)
t.blue_validation_notes = kwargs.get("blue_validation_notes", None)
t.execution_date = kwargs.get("execution_date", None)
return t
def _make_user(role: str = "red_tech") -> MagicMock:
user = MagicMock()
user.id = uuid.uuid4()
user.role = role
return user
def _make_db() -> MagicMock:
return MagicMock()
# ===========================================================================
# 1. test_full_happy_path
# draft -> red_executing -> blue_evaluating -> in_review -> validated
# ===========================================================================
@patch("app.services.test_workflow_service.log_action")
def test_full_happy_path(mock_log):
"""draft -> red_executing -> blue_evaluating -> in_review -> validated"""
test = _make_test(TestState.draft)
red_tech = _make_user("red_tech")
blue_tech = _make_user("blue_tech")
red_lead = _make_user("red_lead")
blue_lead = _make_user("blue_lead")
db = _make_db()
# Step 1: draft -> red_executing
result = start_execution(db, test, red_tech)
assert result.state == TestState.red_executing
assert result.execution_date is not None
# Step 2: red_executing -> blue_evaluating
result = submit_red_evidence(db, result, red_tech)
assert result.state == TestState.blue_evaluating
# Step 3: blue_evaluating -> in_review
result = submit_blue_evidence(db, result, blue_tech)
assert result.state == TestState.in_review
# Step 4: Red Lead approves
result = validate_as_red_lead(db, result, red_lead, "approved", "Attack well documented")
assert result.red_validation_status == "approved"
assert result.red_validated_by == red_lead.id
assert result.red_validated_at is not None
assert result.red_validation_notes == "Attack well documented"
# Still in_review (waiting for blue lead)
assert result.state == TestState.in_review
# Step 5: Blue Lead approves -> validated
result = validate_as_blue_lead(db, result, blue_lead, "approved", "Detection confirmed")
assert result.blue_validation_status == "approved"
assert result.state == TestState.validated
# Verify audit logs were generated at each step
assert mock_log.call_count >= 5
# ===========================================================================
# 2. test_rejection_and_reopen
# in_review -> rejected -> draft -> red_executing -> ...
# ===========================================================================
@patch("app.services.test_workflow_service.log_action")
def test_rejection_and_reopen(mock_log):
"""in_review -> rejected -> draft -> red_executing -> ..."""
test = _make_test(TestState.draft)
red_tech = _make_user("red_tech")
blue_tech = _make_user("blue_tech")
red_lead = _make_user("red_lead")
db = _make_db()
# Advance to in_review
start_execution(db, test, red_tech)
submit_red_evidence(db, test, red_tech)
submit_blue_evidence(db, test, blue_tech)
assert test.state == TestState.in_review
# Red Lead rejects -> rejected
validate_as_red_lead(db, test, red_lead, "rejected", "Need more evidence")
assert test.state == TestState.rejected
# Reopen -> draft
reopen_test(db, test, red_lead)
assert test.state == TestState.draft
# Restart the cycle
start_execution(db, test, red_tech)
assert test.state == TestState.red_executing
# ===========================================================================
# 3. test_invalid_transitions
# ===========================================================================
@patch("app.services.test_workflow_service.log_action")
def test_invalid_transitions(mock_log):
"""Verify that invalid state transitions raise HTTPException."""
db = _make_db()
user = _make_user("admin")
# draft -> validated (should fail)
test = _make_test(TestState.draft)
try:
transition_state(db, test, TestState.validated, user)
assert False, "Should have raised HTTPException"
except HTTPException as exc:
assert exc.status_code == 400
# draft -> blue_evaluating (should fail)
test = _make_test(TestState.draft)
try:
transition_state(db, test, TestState.blue_evaluating, user)
assert False, "Should have raised HTTPException"
except HTTPException as exc:
assert exc.status_code == 400
# red_executing -> in_review (should fail, must go through blue_evaluating)
test = _make_test(TestState.red_executing)
try:
transition_state(db, test, TestState.in_review, user)
assert False, "Should have raised HTTPException"
except HTTPException as exc:
assert exc.status_code == 400
# validated -> anything (terminal state)
test = _make_test(TestState.validated)
try:
transition_state(db, test, TestState.draft, user)
assert False, "Should have raised HTTPException"
except HTTPException as exc:
assert exc.status_code == 400
# rejected -> red_executing (must go through draft first)
test = _make_test(TestState.rejected)
try:
transition_state(db, test, TestState.red_executing, user)
assert False, "Should have raised HTTPException"
except HTTPException as exc:
assert exc.status_code == 400
# ===========================================================================
# 4. test_red_tech_cannot_access_blue_phase
# ===========================================================================
@patch("app.services.test_workflow_service.log_action")
def test_red_tech_cannot_access_blue_phase(mock_log):
"""Red tech cannot submit blue evidence (wrong transition from wrong state)."""
db = _make_db()
red_tech = _make_user("red_tech")
# A test in red_executing cannot jump to in_review
test = _make_test(TestState.red_executing)
try:
submit_blue_evidence(db, test, red_tech)
assert False, "Should have raised HTTPException"
except HTTPException as exc:
assert exc.status_code == 400
# Red tech cannot validate (test must be in blue_evaluating for submit_blue)
test2 = _make_test(TestState.draft)
try:
submit_blue_evidence(db, test2, red_tech)
assert False, "Should have raised HTTPException"
except HTTPException as exc:
assert exc.status_code == 400
# ===========================================================================
# 5. test_blue_tech_cannot_access_red_phase
# ===========================================================================
@patch("app.services.test_workflow_service.log_action")
def test_blue_tech_cannot_access_red_phase(mock_log):
"""Blue tech cannot start execution or submit red evidence."""
db = _make_db()
blue_tech = _make_user("blue_tech")
# Blue tech cannot start execution (test must be in draft -> red_executing)
# The workflow service doesn't check role, but the router does.
# At service level, blue_evaluating -> blue_evaluating is invalid transition:
test = _make_test(TestState.blue_evaluating)
try:
start_execution(db, test, blue_tech)
assert False, "Should have raised HTTPException"
except HTTPException as exc:
assert exc.status_code == 400
# Blue tech cannot submit red evidence on a draft test
test2 = _make_test(TestState.draft)
try:
submit_red_evidence(db, test2, blue_tech)
assert False, "Should have raised HTTPException"
except HTTPException as exc:
assert exc.status_code == 400
# ===========================================================================
# 6. test_dual_validation_both_approve
# ===========================================================================
@patch("app.services.test_workflow_service.log_action")
def test_dual_validation_both_approve(mock_log):
"""Both managers approve -> test becomes validated."""
test = _make_test(TestState.in_review)
red_lead = _make_user("red_lead")
blue_lead = _make_user("blue_lead")
db = _make_db()
# Red Lead approves first
validate_as_red_lead(db, test, red_lead, "approved", "LGTM")
assert test.red_validation_status == "approved"
# Not yet validated — waiting for blue
assert test.state == TestState.in_review
# Blue Lead approves
validate_as_blue_lead(db, test, blue_lead, "approved", "Detection verified")
assert test.blue_validation_status == "approved"
assert test.state == TestState.validated
# ===========================================================================
# 7. test_dual_validation_one_rejects
# ===========================================================================
@patch("app.services.test_workflow_service.log_action")
def test_dual_validation_one_rejects(mock_log):
"""One manager rejects -> test becomes rejected immediately."""
test = _make_test(TestState.in_review)
red_lead = _make_user("red_lead")
db = _make_db()
validate_as_red_lead(db, test, red_lead, "rejected", "Insufficient evidence")
assert test.red_validation_status == "rejected"
assert test.state == TestState.rejected
@patch("app.services.test_workflow_service.log_action")
def test_dual_validation_blue_rejects_first(mock_log):
"""Blue Lead rejects first -> test becomes rejected immediately."""
test = _make_test(TestState.in_review)
blue_lead = _make_user("blue_lead")
db = _make_db()
validate_as_blue_lead(db, test, blue_lead, "rejected", "Detection not adequate")
assert test.blue_validation_status == "rejected"
assert test.state == TestState.rejected
@patch("app.services.test_workflow_service.log_action")
def test_dual_validation_red_approves_blue_rejects(mock_log):
"""Red approves, then blue rejects -> rejected."""
test = _make_test(TestState.in_review)
red_lead = _make_user("red_lead")
blue_lead = _make_user("blue_lead")
db = _make_db()
validate_as_red_lead(db, test, red_lead, "approved", "Good attack")
assert test.state == TestState.in_review # waiting for blue
validate_as_blue_lead(db, test, blue_lead, "rejected", "Bad detection")
assert test.state == TestState.rejected
# ===========================================================================
# 8. test_evidence_team_separation
# ===========================================================================
def test_evidence_team_separation():
"""Verify evidence router logic separates red and blue evidence correctly."""
from app.routers.evidence import _validate_upload_permission, _RED_EDITABLE_STATES, _BLUE_EDITABLE_STATES
# Red tech can upload red evidence in draft
test = _make_test(TestState.draft)
red_user = _make_user("red_tech")
red_user.role = "red_tech"
from app.models.enums import TeamSide
_validate_upload_permission(test, TeamSide.red, red_user) # should not raise
# Red tech can upload red evidence in red_executing
test.state = TestState.red_executing
_validate_upload_permission(test, TeamSide.red, red_user) # should not raise
# Red tech CANNOT upload red evidence in blue_evaluating
test.state = TestState.blue_evaluating
try:
_validate_upload_permission(test, TeamSide.red, red_user)
assert False, "Should have raised HTTPException"
except HTTPException as exc:
assert exc.status_code == 400
# Red tech CANNOT upload blue evidence
test.state = TestState.blue_evaluating
try:
_validate_upload_permission(test, TeamSide.blue, red_user)
assert False, "Should have raised HTTPException"
except HTTPException as exc:
assert exc.status_code == 403
# Blue tech can upload blue evidence in blue_evaluating
test.state = TestState.blue_evaluating
blue_user = _make_user("blue_tech")
blue_user.role = "blue_tech"
_validate_upload_permission(test, TeamSide.blue, blue_user) # should not raise
# Blue tech CANNOT upload blue evidence in draft
test.state = TestState.draft
try:
_validate_upload_permission(test, TeamSide.blue, blue_user)
assert False, "Should have raised HTTPException"
except HTTPException as exc:
assert exc.status_code == 400
# Blue tech CANNOT upload red evidence
test.state = TestState.draft
try:
_validate_upload_permission(test, TeamSide.red, blue_user)
assert False, "Should have raised HTTPException"
except HTTPException as exc:
assert exc.status_code == 403
# ===========================================================================
# 9. test_red_edit_allowed_in_draft_and_red_executing
# ===========================================================================
def test_red_edit_allowed_in_draft_and_red_executing():
"""Verify the red update router checks that state is draft or red_executing."""
from app.routers.tests import update_test_red
import inspect
source = inspect.getsource(update_test_red)
# The function must guard against states other than draft/red_executing
assert "draft" in source, "Red update must allow draft state"
assert "red_executing" in source, "Red update must allow red_executing state"
assert "400" in source or "HTTP_400_BAD_REQUEST" in source, "Red update must return 400 for invalid state"
# ===========================================================================
# 10. test_reopen_clears_validation_fields
# ===========================================================================
@patch("app.services.test_workflow_service.log_action")
def test_reopen_clears_validation_fields(mock_log):
"""Reopen clears all red/blue validation status, notes, timestamps."""
test = _make_test(
TestState.rejected,
red_validation_status="rejected",
red_validated_by=uuid.uuid4(),
red_validated_at=datetime.utcnow(),
red_validation_notes="Bad attack",
blue_validation_status="approved",
blue_validated_by=uuid.uuid4(),
blue_validated_at=datetime.utcnow(),
blue_validation_notes="Good detection",
)
user = _make_user("red_lead")
db = _make_db()
result = reopen_test(db, test, user)
assert result.state == TestState.draft
assert result.red_validation_status is None
assert result.red_validated_by is None
assert result.red_validated_at is None
assert result.red_validation_notes is None
assert result.blue_validation_status is None
assert result.blue_validated_by is None
assert result.blue_validated_at is None
assert result.blue_validation_notes is None
db.commit.assert_called()
# ===========================================================================
# 11. test_cannot_validate_outside_in_review
# ===========================================================================
@patch("app.services.test_workflow_service.log_action")
def test_cannot_validate_outside_in_review(mock_log):
"""Managers cannot validate a test that is not in in_review state."""
db = _make_db()
red_lead = _make_user("red_lead")
blue_lead = _make_user("blue_lead")
for state in [TestState.draft, TestState.red_executing, TestState.blue_evaluating, TestState.validated, TestState.rejected]:
test = _make_test(state)
try:
validate_as_red_lead(db, test, red_lead, "approved", "OK")
assert False, f"Red Lead should not validate in {state.value}"
except HTTPException as exc:
assert exc.status_code == 400
test2 = _make_test(state)
try:
validate_as_blue_lead(db, test2, blue_lead, "approved", "OK")
assert False, f"Blue Lead should not validate in {state.value}"
except HTTPException as exc:
assert exc.status_code == 400
# ===========================================================================
# 12. test_cannot_reopen_non_rejected_test
# ===========================================================================
@patch("app.services.test_workflow_service.log_action")
def test_cannot_reopen_non_rejected_test(mock_log):
"""Reopen only works on rejected tests."""
db = _make_db()
user = _make_user("red_lead")
for state in [TestState.draft, TestState.red_executing, TestState.blue_evaluating, TestState.in_review, TestState.validated]:
test = _make_test(state)
try:
reopen_test(db, test, user)
assert False, f"Should not reopen from {state.value}"
except HTTPException as exc:
assert exc.status_code == 400
# ---------------------------------------------------------------------------
# Run all
# ---------------------------------------------------------------------------
if __name__ == "__main__":
print("T-125 Validation: Workflow Tests")
print("=" * 55)
test_full_happy_path()
test_rejection_and_reopen()
test_invalid_transitions()
test_red_tech_cannot_access_blue_phase()
test_blue_tech_cannot_access_red_phase()
test_dual_validation_both_approve()
test_dual_validation_one_rejects()
test_dual_validation_blue_rejects_first()
test_dual_validation_red_approves_blue_rejects()
test_evidence_team_separation()
test_red_edit_allowed_in_draft_and_red_executing()
test_reopen_clears_validation_fields()
test_cannot_validate_outside_in_review()
test_cannot_reopen_non_rejected_test()
print("=" * 55)
print("ALL T-125 validations PASSED!")