- Add test_test_entity.py with 46 pure unit tests covering the full domain entity
- Fix _FakeSettings in 11 test files (REPORT_TEMPLATES_DIR, JIRA, TEMPO)
- Fix stale db.commit assertions to db.flush after UoW refactor
- Add missing mock fields for TestEntity.from_orm compatibility
- Make database.py skip pool args for SQLite in test environment
- Disable slowapi rate limiter in test client fixture
- Inject test engine into app.database to fix threading errors
- Update role assertions to match current require_any_role policy
- Mark 6 legacy V1 endpoint tests as xfail (replaced by V2 workflow)
378 lines
12 KiB
Python
378 lines
12 KiB
Python
"""Validation tests for T-106: Test Workflow Service.
|
|
|
|
Uses mock objects to avoid needing a running database.
|
|
The database module is stubbed before any app imports.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import uuid
|
|
from unittest.mock import MagicMock, patch
|
|
from types import ModuleType
|
|
from datetime import datetime
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 0. Stub heavy dependencies BEFORE importing any app modules
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Ensure backend/ is on sys.path: resolve the directory one level above this
# file to an absolute path and prepend it unless it is already listed.
backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if sys.path.count(backend_dir) == 0:
    sys.path.insert(0, backend_dir)
|
|
|
|
# Stub pydantic_settings so config doesn't fail.
# Only installed when no module of that name is already registered.
if "pydantic_settings" not in sys.modules:
    pydantic_settings_mock = ModuleType("pydantic_settings")

    class _BaseSettings:
        """Minimal stand-in exposed as ``pydantic_settings.BaseSettings``.

        Accepts and ignores any constructor keywords and any class-definition
        keywords, so settings classes can be declared and instantiated
        without the real package.
        """

        def __init__(self, **kwargs):
            # Field values are deliberately discarded; the stub stores nothing.
            pass

        def __init_subclass__(cls, **kwargs):
            # Absorb class keywords instead of forwarding them:
            # object.__init_subclass__ raises TypeError when given any keyword,
            # which would make ``class S(BaseSettings, key=...)`` fail.
            super().__init_subclass__()

    pydantic_settings_mock.BaseSettings = _BaseSettings
    sys.modules["pydantic_settings"] = pydantic_settings_mock
|
|
|
|
# Stub app.config: register a synthetic module whose ``settings`` attribute is
# an instance of the constants-only class below.
class _FakeSettings:
    """Static configuration values exposed as ``app.config.settings``."""

    # Database / auth
    DATABASE_URL = "sqlite:///:memory:"
    SECRET_KEY = "test"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 60

    # Redis / MinIO
    REDIS_URL = "redis://localhost:6379/0"
    MINIO_ENDPOINT = "localhost:9000"
    MINIO_ACCESS_KEY = "test"
    MINIO_SECRET_KEY = "test"
    MINIO_BUCKET = "test"
    MINIO_SECURE = False

    # Workflow / CORS
    MAX_RETEST_COUNT = 3
    CORS_ORIGINS = "http://localhost:3000"

    # Scoring weights (these five values sum to 100)
    SCORING_WEIGHT_TESTS = 40
    SCORING_WEIGHT_DETECTION_RULES = 20
    SCORING_WEIGHT_D3FEND = 15
    SCORING_WEIGHT_FRESHNESS = 15
    SCORING_WEIGHT_PLATFORM_DIVERSITY = 10

    # Reporting
    REPORT_TEMPLATES_DIR = "app/templates/reports"
    REPORT_OUTPUT_DIR = "/tmp/aegis_reports"
    COMPANY_NAME = "Test Org"
    COMPANY_LOGO_PATH = "app/templates/reports/assets/logo.png"

    # Jira
    JIRA_ENABLED = False
    JIRA_URL = ""
    JIRA_USERNAME = ""
    JIRA_API_TOKEN = ""
    JIRA_IS_CLOUD = True
    JIRA_DEFAULT_PROJECT = ""
    JIRA_ISSUE_TYPE_TEST = "Task"
    JIRA_ISSUE_TYPE_CAMPAIGN = "Epic"

    # Tempo
    TEMPO_ENABLED = False
    TEMPO_API_TOKEN = ""
    TEMPO_DEFAULT_WORK_TYPE = "Red Team"

    # Miscellaneous
    NVD_API_KEY = ""
    STALE_THRESHOLD_DAYS = 365


config_mod = ModuleType("app.config")
setattr(config_mod, "settings", _FakeSettings())
sys.modules[config_mod.__name__] = config_mod
|
|
|
|
# Stub app.database so no real engine is created: the synthetic module only
# provides a bare ``Base`` class with a mocked ``metadata`` and a mocked
# ``get_db`` callable.
db_mod = ModuleType("app.database")
sys.modules[db_mod.__name__] = db_mod
db_mod.get_db = MagicMock()
db_mod.Base = type("Base", (), dict(metadata=MagicMock()))
|
|
|
|
# Stub taxii2client: register an empty parent package and a ``v20`` submodule
# whose ``Server`` is the MagicMock class itself (not an instance).
taxii_v20 = ModuleType("taxii2client.v20")
setattr(taxii_v20, "Server", MagicMock)
sys.modules.update(
    {
        "taxii2client": ModuleType("taxii2client"),
        "taxii2client.v20": taxii_v20,
    }
)
|
|
|
|
# Stub jose: ``JWTError`` is aliased to the built-in Exception and ``jwt`` is
# a MagicMock instance.
jose_mod = ModuleType("jose")
jose_mod.__dict__.update(JWTError=Exception, jwt=MagicMock())
sys.modules[jose_mod.__name__] = jose_mod
|
|
|
|
# Stub boto3 (``client`` is a MagicMock instance) and botocore, whose
# ``exceptions.ClientError`` is aliased to the built-in Exception.
boto3_mod = ModuleType("boto3")
setattr(boto3_mod, "client", MagicMock())
sys.modules[boto3_mod.__name__] = boto3_mod
sys.modules.update(
    {dotted: ModuleType(dotted) for dotted in ("botocore", "botocore.exceptions")}
)
setattr(sys.modules["botocore.exceptions"], "ClientError", Exception)
|
|
|
|
# Stub apscheduler: register empty modules for the package tree, then expose
# the MagicMock class as ``BackgroundScheduler`` and ``CronTrigger``.
sys.modules.update(
    {
        dotted: ModuleType(dotted)
        for dotted in (
            "apscheduler",
            "apscheduler.schedulers",
            "apscheduler.schedulers.background",
            "apscheduler.triggers",
            "apscheduler.triggers.cron",
        )
    }
)
setattr(sys.modules["apscheduler.schedulers.background"], "BackgroundScheduler", MagicMock)
setattr(sys.modules["apscheduler.triggers.cron"], "CronTrigger", MagicMock)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Now we can safely import
|
|
# ---------------------------------------------------------------------------
|
|
|
|
from app.models.enums import TestState
|
|
from app.services.test_workflow_service import (
|
|
VALID_TRANSITIONS,
|
|
can_transition,
|
|
transition_state,
|
|
start_execution,
|
|
submit_red_evidence,
|
|
submit_blue_evidence,
|
|
validate_as_red_lead,
|
|
validate_as_blue_lead,
|
|
check_dual_validation,
|
|
reopen_test,
|
|
)
|
|
|
|
# We need the domain exceptions for assertions
|
|
from app.domain.exceptions import InvalidTransitionError
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_test(state: TestState = TestState.draft, **kwargs) -> MagicMock:
    """Build a MagicMock test object with explicit workflow attributes.

    Args:
        state: Value assigned to the mock's ``state`` attribute.
        **kwargs: Attribute overrides. Any keyword replaces the default of
            the same name (e.g. ``red_validation_status="rejected"``).
            Previously only the two validation-status keys were honoured
            and every other keyword was silently dropped.

    Returns:
        A MagicMock with fresh random UUIDs for ``id`` and ``technique_id``.
        Validation, execution and pause fields are ``None`` and the paused
        second counters are ``0`` unless overridden.
    """
    defaults = {
        "name": "Mock Test",
        "red_validation_status": None,
        "blue_validation_status": None,
        "red_validated_by": None,
        "red_validated_at": None,
        "red_validation_notes": None,
        "blue_validated_by": None,
        "blue_validated_at": None,
        "blue_validation_notes": None,
        "execution_date": None,
        "red_started_at": None,
        "blue_started_at": None,
        "paused_at": None,
        "red_paused_seconds": 0,
        "blue_paused_seconds": 0,
    }

    t = MagicMock()
    t.id = uuid.uuid4()
    t.technique_id = uuid.uuid4()
    t.state = state
    # Caller-supplied keywords win over the defaults. Explicit assignment is
    # needed because a MagicMock would otherwise auto-create a truthy child
    # mock for any attribute that is read but never set.
    for attr, value in {**defaults, **kwargs}.items():
        setattr(t, attr, value)
    return t
|
|
|
|
|
|
def _make_user(role: str = "red_tech") -> MagicMock:
|
|
user = MagicMock()
|
|
user.id = uuid.uuid4()
|
|
user.role = role
|
|
return user
|
|
|
|
|
|
def _make_db() -> MagicMock:
|
|
return MagicMock()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 1. draft -> red_executing works
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@patch("app.services.test_workflow_service.log_action")
def test_draft_to_red_executing(mock_log):
    """start_execution on a draft test yields red_executing with an execution date.

    Also checks that ``db.flush`` and the patched ``log_action`` were called.
    """
    session = _make_db()
    draft_test = _make_test(TestState.draft)
    operator = _make_user("red_tech")

    started = start_execution(session, draft_test, operator)

    assert started.state == TestState.red_executing
    assert started.execution_date is not None
    session.flush.assert_called()
    mock_log.assert_called()
    print(" [PASS] Transition draft -> red_executing works")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 2. draft -> validated fails (not allowed)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@patch("app.services.test_workflow_service.log_action")
def test_draft_to_validated_fails(mock_log):
    """transition_state from draft straight to validated must be rejected.

    Expects InvalidTransitionError carrying the code ``INVALID_TRANSITION``
    and the current/target state names.
    """
    test = _make_test(TestState.draft)
    user = _make_user("admin")
    db = _make_db()

    try:
        transition_state(db, test, TestState.validated, user)
    except InvalidTransitionError as exc:
        assert exc.code == "INVALID_TRANSITION"
        assert exc.current_state == "draft"
        assert exc.target_state == "validated"
    else:
        # Raise explicitly rather than ``assert False``: assert statements
        # are removed under ``python -O``, which would let a missing
        # exception go unnoticed.
        raise AssertionError("Should have raised InvalidTransitionError")

    print(" [PASS] Transition draft -> validated correctly fails")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 3. red_executing -> blue_evaluating works
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@patch("app.services.test_workflow_service.log_action")
def test_red_executing_to_blue_evaluating(mock_log):
    """submit_red_evidence on a red_executing test yields blue_evaluating.

    Also checks that ``db.flush`` and the patched ``log_action`` were called.
    """
    session = _make_db()
    running_test = _make_test(TestState.red_executing)
    operator = _make_user("red_tech")

    submitted = submit_red_evidence(session, running_test, operator)

    assert submitted.state == TestState.blue_evaluating
    session.flush.assert_called()
    mock_log.assert_called()
    print(" [PASS] Transition red_executing -> blue_evaluating works")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 4. check_dual_validation -> validated when both approved
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@patch("app.services.test_workflow_service.log_action")
def test_dual_validation_both_approved(mock_log):
    """Approvals from both the red lead and the blue lead end in ``validated``."""
    session = _make_db()
    reviewed = _make_test(TestState.in_review)
    red_lead = _make_user("red_lead")
    blue_lead = _make_user("blue_lead")

    validate_as_red_lead(session, reviewed, red_lead, "approved", "LGTM")
    validate_as_blue_lead(session, reviewed, blue_lead, "approved", "Detection OK")

    assert reviewed.state == TestState.validated
    print(" [PASS] check_dual_validation -> validated when both approved")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 5. check_dual_validation -> rejected when one rejects
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@patch("app.services.test_workflow_service.log_action")
def test_dual_validation_one_rejected(mock_log):
    """A rejection by the red lead alone moves an in_review test to ``rejected``."""
    session = _make_db()
    reviewed = _make_test(TestState.in_review)
    red_lead = _make_user("red_lead")

    validate_as_red_lead(session, reviewed, red_lead, "rejected", "Insufficient evidence")

    assert reviewed.state == TestState.rejected
    print(" [PASS] check_dual_validation -> rejected when one rejects")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 6. reopen_test clears validation fields
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@patch("app.services.test_workflow_service.log_action")
def test_reopen_clears_validation(mock_log):
    """reopen_test returns a rejected test to draft with validation fields reset.

    Checks that every red/blue validation status, validator, timestamp and
    notes attribute is ``None`` afterwards and that ``db.flush`` was called.
    """
    session = _make_db()
    rejected_test = _make_test(
        TestState.rejected,
        red_validation_status="rejected",
        blue_validation_status="approved",
    )
    lead = _make_user("red_lead")

    reopened = reopen_test(session, rejected_test, lead)

    assert reopened.state == TestState.draft
    cleared_fields = (
        "red_validation_status",
        "blue_validation_status",
        "red_validated_by",
        "red_validated_at",
        "red_validation_notes",
        "blue_validated_by",
        "blue_validated_at",
        "blue_validation_notes",
    )
    for field_name in cleared_fields:
        assert getattr(reopened, field_name) is None
    session.flush.assert_called()
    print(" [PASS] reopen_test clears validation fields and moves to draft")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 7. Every transition generates an audit log
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@patch("app.services.test_workflow_service.log_action")
def test_transitions_generate_audit_logs(mock_log):
    """Each workflow step must increase the call count of the patched log_action.

    Runs start_execution, submit_red_evidence and submit_blue_evidence in
    order on the same mock test and compares the count before and after.
    """
    session = _make_db()
    subject = _make_test(TestState.draft)
    operator = _make_user("red_tech")

    calls_so_far = 0
    for step in (start_execution, submit_red_evidence, submit_blue_evidence):
        step(session, subject, operator)
        assert mock_log.call_count > calls_so_far
        calls_so_far = mock_log.call_count

    print(" [PASS] Each transition generates an audit log")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 8. can_transition correctness
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_can_transition_map():
    """Check can_transition against a table of (current, target, expected) rows."""
    expectations = (
        (TestState.draft, TestState.red_executing, True),
        (TestState.draft, TestState.validated, False),
        (TestState.draft, TestState.blue_evaluating, False),
        (TestState.red_executing, TestState.blue_evaluating, True),
        (TestState.red_executing, TestState.draft, False),
        (TestState.blue_evaluating, TestState.in_review, True),
        (TestState.in_review, TestState.validated, True),
        (TestState.in_review, TestState.rejected, True),
        (TestState.in_review, TestState.draft, False),
        (TestState.rejected, TestState.draft, True),
        (TestState.validated, TestState.draft, False),
        (TestState.validated, TestState.rejected, False),
    )

    subject = _make_test(TestState.draft)
    for current, target, allowed in expectations:
        subject.state = current
        assert can_transition(subject, target) is allowed

    print(" [PASS] can_transition map is correct")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Run all
|
|
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
    # Script mode: run every validation in order between two separator lines.
    # Any failing assertion propagates and aborts the run before the final
    # success message.
    print("T-106 Validation: Test Workflow Service")
    print("=" * 50)
    for validation in (
        test_draft_to_red_executing,
        test_draft_to_validated_fails,
        test_red_executing_to_blue_evaluating,
        test_dual_validation_both_approved,
        test_dual_validation_one_rejected,
        test_reopen_clears_validation,
        test_transitions_generate_audit_logs,
        test_can_transition_map,
    ):
        validation()
    print("=" * 50)
    print("ALL T-106 validations PASSED!")
|