Files
Aegis/backend/tests/test_t106_workflow_service.py
Kitos 9e204b78ec test: add TestEntity tests and fix test infrastructure (222 green)
- Add test_test_entity.py with 46 pure unit tests covering the full domain entity

- Fix _FakeSettings in 11 test files (REPORT_TEMPLATES_DIR, JIRA, TEMPO)

- Fix stale db.commit assertions to db.flush after UoW refactor

- Add missing mock fields for TestEntity.from_orm compatibility

- Make database.py skip pool args for SQLite in test environment

- Disable slowapi rate limiter in test client fixture

- Inject test engine into app.database to fix threading errors

- Update role assertions to match current require_any_role policy

- Mark 6 legacy V1 endpoint tests as xfail (replaced by V2 workflow)
2026-02-18 15:29:24 +01:00

378 lines
12 KiB
Python

"""Validation tests for T-106: Test Workflow Service.
Uses mock objects to avoid needing a running database.
The database module is stubbed before any app imports.
"""
import sys
import os
import uuid
from unittest.mock import MagicMock, patch
from types import ModuleType
from datetime import datetime
# ---------------------------------------------------------------------------
# 0. Stub heavy dependencies BEFORE importing any app modules
# ---------------------------------------------------------------------------
# Ensure backend/ is on sys.path
# The directory one level above this file is resolved to an absolute path and
# pushed to the front of sys.path (once) so the ``app`` package is importable.
_tests_dir = os.path.dirname(__file__)
backend_dir = os.path.abspath(os.path.join(_tests_dir, ".."))
if backend_dir not in sys.path:
    sys.path.insert(0, backend_dir)
# Stub pydantic_settings so config doesn't fail
# The stub is only installed when no module of that name is already registered.
if "pydantic_settings" not in sys.modules:

    class _BaseSettings:
        """Inert replacement for ``pydantic_settings.BaseSettings``."""

        def __init__(self, **kwargs):
            # Accept any keyword arguments and ignore them.
            pass

        def __init_subclass__(cls, **kwargs):
            # Delegate subclass hooks unchanged to the default implementation.
            super().__init_subclass__(**kwargs)

    pydantic_settings_mock = ModuleType("pydantic_settings")
    pydantic_settings_mock.BaseSettings = _BaseSettings
    sys.modules["pydantic_settings"] = pydantic_settings_mock
# Stub app.config
# A plain class of constants stands in for the real settings object; an
# instance is exposed as ``app.config.settings`` through sys.modules.
class _FakeSettings:
    """Static configuration values served to app modules during these tests."""

    # --- Core / auth ---
    DATABASE_URL = "sqlite:///:memory:"
    SECRET_KEY = "test"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 60

    # --- Redis / MinIO ---
    REDIS_URL = "redis://localhost:6379/0"
    MINIO_ENDPOINT = "localhost:9000"
    MINIO_ACCESS_KEY = "test"
    MINIO_SECRET_KEY = "test"
    MINIO_BUCKET = "test"
    MINIO_SECURE = False

    # --- Workflow / HTTP ---
    MAX_RETEST_COUNT = 3
    CORS_ORIGINS = "http://localhost:3000"

    # --- Scoring weights ---
    SCORING_WEIGHT_TESTS = 40
    SCORING_WEIGHT_DETECTION_RULES = 20
    SCORING_WEIGHT_D3FEND = 15
    SCORING_WEIGHT_FRESHNESS = 15
    SCORING_WEIGHT_PLATFORM_DIVERSITY = 10

    # --- Reporting ---
    REPORT_TEMPLATES_DIR = "app/templates/reports"
    REPORT_OUTPUT_DIR = "/tmp/aegis_reports"
    COMPANY_NAME = "Test Org"
    COMPANY_LOGO_PATH = "app/templates/reports/assets/logo.png"

    # --- Jira ---
    JIRA_ENABLED = False
    JIRA_URL = ""
    JIRA_USERNAME = ""
    JIRA_API_TOKEN = ""
    JIRA_IS_CLOUD = True
    JIRA_DEFAULT_PROJECT = ""
    JIRA_ISSUE_TYPE_TEST = "Task"
    JIRA_ISSUE_TYPE_CAMPAIGN = "Epic"

    # --- Tempo ---
    TEMPO_ENABLED = False
    TEMPO_API_TOKEN = ""
    TEMPO_DEFAULT_WORK_TYPE = "Red Team"

    # --- Miscellaneous ---
    NVD_API_KEY = ""
    STALE_THRESHOLD_DAYS = 365


config_mod = ModuleType("app.config")
config_mod.settings = _FakeSettings()
sys.modules["app.config"] = config_mod
# Stub app.database so no real engine is created
# The stand-in module is registered in sys.modules ahead of the app imports
# further down, so ``app.database`` resolves to it instead of the real module.
db_mod = ModuleType("app.database")
# ``Base`` is a bare class whose only attribute is a mocked ``metadata``.
db_mod.Base = type("Base", (), {"metadata": MagicMock()})
# ``get_db`` is a mock; the tests in this file build their own db via _make_db().
db_mod.get_db = MagicMock()
sys.modules["app.database"] = db_mod
# Stub taxii2client
# ``Server`` is bound to the MagicMock class itself, so instantiating it
# produces a MagicMock instance.
taxii_v20 = ModuleType("taxii2client.v20")
taxii_v20.Server = MagicMock
sys.modules["taxii2client"] = ModuleType("taxii2client")
sys.modules["taxii2client.v20"] = taxii_v20
# Stub jose
# ``JWTError`` is aliased to the built-in Exception; ``jwt`` is a mock object.
jose_mod = ModuleType("jose")
jose_mod.JWTError = Exception
jose_mod.jwt = MagicMock()
sys.modules["jose"] = jose_mod
# Stub boto3
# ``boto3.client`` is a mock callable; ``botocore.exceptions.ClientError`` is
# aliased to the built-in Exception.
boto3_mod = ModuleType("boto3")
boto3_mod.client = MagicMock()
sys.modules["boto3"] = boto3_mod
sys.modules["botocore"] = ModuleType("botocore")
sys.modules["botocore.exceptions"] = ModuleType("botocore.exceptions")
sys.modules["botocore.exceptions"].ClientError = Exception
# Stub apscheduler
# Each dotted package level gets its own empty module entry; only
# ``BackgroundScheduler`` and ``CronTrigger`` are provided (both the MagicMock class).
sys.modules["apscheduler"] = ModuleType("apscheduler")
sys.modules["apscheduler.schedulers"] = ModuleType("apscheduler.schedulers")
sys.modules["apscheduler.schedulers.background"] = ModuleType("apscheduler.schedulers.background")
sys.modules["apscheduler.schedulers.background"].BackgroundScheduler = MagicMock
sys.modules["apscheduler.triggers"] = ModuleType("apscheduler.triggers")
sys.modules["apscheduler.triggers.cron"] = ModuleType("apscheduler.triggers.cron")
sys.modules["apscheduler.triggers.cron"].CronTrigger = MagicMock
# ---------------------------------------------------------------------------
# Now we can safely import
# ---------------------------------------------------------------------------
from app.models.enums import TestState
from app.services.test_workflow_service import (
VALID_TRANSITIONS,
can_transition,
transition_state,
start_execution,
submit_red_evidence,
submit_blue_evidence,
validate_as_red_lead,
validate_as_blue_lead,
check_dual_validation,
reopen_test,
)
# We need the domain exceptions for assertions
from app.domain.exceptions import InvalidTransitionError
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_test(state: TestState = TestState.draft, **kwargs) -> MagicMock:
    """Build a mock test record with its workflow fields preset.

    Args:
        state: Workflow state assigned to the mock's ``state`` attribute.
        **kwargs: Attribute overrides applied on top of the defaults below,
            e.g. ``red_validation_status="rejected"``. Any attribute name may
            be overridden; previously every key other than the two validation
            statuses was silently dropped.

    Returns:
        A ``MagicMock`` with fresh UUIDs for ``id`` and ``technique_id``, the
        name ``"Mock Test"``, the requested state, ``None`` for every
        validation/timestamp field and ``0`` for both paused-seconds counters,
        unless overridden.
    """
    defaults = {
        "id": uuid.uuid4(),
        "name": "Mock Test",
        "technique_id": uuid.uuid4(),
        "state": state,
        # Validation bookkeeping
        "red_validation_status": None,
        "blue_validation_status": None,
        "red_validated_by": None,
        "red_validated_at": None,
        "red_validation_notes": None,
        "blue_validated_by": None,
        "blue_validated_at": None,
        "blue_validation_notes": None,
        # Execution timing
        "execution_date": None,
        "red_started_at": None,
        "blue_started_at": None,
        "paused_at": None,
        "red_paused_seconds": 0,
        "blue_paused_seconds": 0,
    }
    t = MagicMock()
    # Caller-supplied overrides win over the defaults.
    for attr, value in {**defaults, **kwargs}.items():
        setattr(t, attr, value)
    return t
def _make_user(role: str = "red_tech") -> MagicMock:
    """Return a mock user with a random UUID ``id`` and the given ``role``."""
    # Keyword arguments to MagicMock are set as attributes on the new mock.
    return MagicMock(id=uuid.uuid4(), role=role)
def _make_db() -> MagicMock:
    """Return a fresh mock object used in place of a database session."""
    session = MagicMock()
    return session
# ---------------------------------------------------------------------------
# 1. draft -> red_executing works
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_draft_to_red_executing(mock_log):
    """start_execution moves a draft test to ``red_executing``.

    Also checks that an execution date is set, the db mock is flushed and
    the patched ``log_action`` is invoked.
    """
    session = _make_db()
    operator = _make_user("red_tech")
    draft = _make_test(TestState.draft)

    outcome = start_execution(session, draft, operator)

    assert outcome.state == TestState.red_executing
    assert outcome.execution_date is not None
    session.flush.assert_called()
    mock_log.assert_called()
    print(" [PASS] Transition draft -> red_executing works")
# ---------------------------------------------------------------------------
# 2. draft -> validated fails (not allowed)
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_draft_to_validated_fails(mock_log):
    """transition_state rejects a direct draft -> validated jump.

    Expects ``InvalidTransitionError`` carrying the code
    ``"INVALID_TRANSITION"`` and the current/target state names.
    """
    test = _make_test(TestState.draft)
    user = _make_user("admin")
    db = _make_db()
    try:
        # Only the call under test lives in the try body.
        transition_state(db, test, TestState.validated, user)
    except InvalidTransitionError as exc:
        assert exc.code == "INVALID_TRANSITION"
        assert exc.current_state == "draft"
        assert exc.target_state == "validated"
    else:
        # Explicit raise instead of ``assert False``: assert statements are
        # removed under ``python -O``, which would let a missing exception
        # pass unnoticed when this file is run as a script.
        raise AssertionError("Should have raised InvalidTransitionError")
    print(" [PASS] Transition draft -> validated correctly fails")
# ---------------------------------------------------------------------------
# 3. red_executing -> blue_evaluating works
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_red_executing_to_blue_evaluating(mock_log):
    """submit_red_evidence moves a ``red_executing`` test to ``blue_evaluating``.

    Also checks that the db mock is flushed and ``log_action`` is invoked.
    """
    session = _make_db()
    operator = _make_user("red_tech")
    running = _make_test(TestState.red_executing)

    outcome = submit_red_evidence(session, running, operator)

    assert outcome.state == TestState.blue_evaluating
    session.flush.assert_called()
    mock_log.assert_called()
    print(" [PASS] Transition red_executing -> blue_evaluating works")
# ---------------------------------------------------------------------------
# 4. check_dual_validation -> validated when both approved
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_dual_validation_both_approved(mock_log):
    """An in-review test ends up ``validated`` once both leads approve."""
    session = _make_db()
    under_review = _make_test(TestState.in_review)
    red_lead = _make_user("red_lead")
    blue_lead = _make_user("blue_lead")

    # Red lead approves first, then blue lead; the state is checked afterwards.
    validate_as_red_lead(session, under_review, red_lead, "approved", "LGTM")
    validate_as_blue_lead(session, under_review, blue_lead, "approved", "Detection OK")

    assert under_review.state == TestState.validated
    print(" [PASS] check_dual_validation -> validated when both approved")
# ---------------------------------------------------------------------------
# 5. check_dual_validation -> rejected when one rejects
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_dual_validation_one_rejected(mock_log):
    """A single rejection by the red lead moves an in-review test to ``rejected``."""
    session = _make_db()
    under_review = _make_test(TestState.in_review)
    red_lead = _make_user("red_lead")

    validate_as_red_lead(session, under_review, red_lead, "rejected", "Insufficient evidence")

    assert under_review.state == TestState.rejected
    print(" [PASS] check_dual_validation -> rejected when one rejects")
# ---------------------------------------------------------------------------
# 6. reopen_test clears validation fields
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_reopen_clears_validation(mock_log):
    """reopen_test returns a rejected test to draft and clears validation data.

    All eight validation fields are populated before the call so that each
    ``is None`` assertion proves the field was actually cleared, rather than
    passing because the mock started out with ``None``.
    """
    validation_fields = (
        "red_validation_status",
        "blue_validation_status",
        "red_validated_by",
        "red_validated_at",
        "red_validation_notes",
        "blue_validated_by",
        "blue_validated_at",
        "blue_validation_notes",
    )
    test = _make_test(
        TestState.rejected,
        red_validation_status="rejected",
        blue_validation_status="approved",
    )
    # Fill in the reviewer / timestamp / notes fields, which the helper
    # leaves as None.
    # NOTE(review): naive datetimes are used here — confirm that matches what
    # the service layer expects for *_validated_at.
    test.red_validated_by = uuid.uuid4()
    test.red_validated_at = datetime(2026, 1, 1, 12, 0, 0)
    test.red_validation_notes = "Insufficient evidence"
    test.blue_validated_by = uuid.uuid4()
    test.blue_validated_at = datetime(2026, 1, 1, 12, 30, 0)
    test.blue_validation_notes = "Detection OK"
    user = _make_user("red_lead")
    db = _make_db()

    result = reopen_test(db, test, user)

    assert result.state == TestState.draft
    for field in validation_fields:
        assert getattr(result, field) is None, f"{field} was not cleared"
    db.flush.assert_called()
    print(" [PASS] reopen_test clears validation fields and moves to draft")
# ---------------------------------------------------------------------------
# 7. Every transition generates an audit log
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_transitions_generate_audit_logs(mock_log):
    """Each of three consecutive workflow steps increases the log_action call count."""
    session = _make_db()
    operator = _make_user("red_tech")
    subject = _make_test(TestState.draft)

    # The same mock test is driven through the steps in order; after every
    # step the patched logger must have been called more often than before.
    calls_so_far = 0
    for step in (start_execution, submit_red_evidence, submit_blue_evidence):
        step(session, subject, operator)
        assert mock_log.call_count > calls_so_far
        calls_so_far = mock_log.call_count
    print(" [PASS] Each transition generates an audit log")
# ---------------------------------------------------------------------------
# 8. can_transition correctness
# ---------------------------------------------------------------------------
def test_can_transition_map():
    """can_transition returns the expected boolean for a table of state pairs."""
    # (current state, target state, expected result), checked in order.
    expectations = (
        (TestState.draft, TestState.red_executing, True),
        (TestState.draft, TestState.validated, False),
        (TestState.draft, TestState.blue_evaluating, False),
        (TestState.red_executing, TestState.blue_evaluating, True),
        (TestState.red_executing, TestState.draft, False),
        (TestState.blue_evaluating, TestState.in_review, True),
        (TestState.in_review, TestState.validated, True),
        (TestState.in_review, TestState.rejected, True),
        (TestState.in_review, TestState.draft, False),
        (TestState.rejected, TestState.draft, True),
        (TestState.validated, TestState.draft, False),
        (TestState.validated, TestState.rejected, False),
    )
    subject = _make_test(TestState.draft)
    for current, target, expected in expectations:
        subject.state = current
        assert can_transition(subject, target) is expected
    print(" [PASS] can_transition map is correct")
# ---------------------------------------------------------------------------
# Run all
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Script entry point: run every validation in its numbered order and
    # print a banner around the per-test [PASS] lines.
    print("T-106 Validation: Test Workflow Service")
    print("=" * 50)
    for validation in (
        test_draft_to_red_executing,
        test_draft_to_validated_fails,
        test_red_executing_to_blue_evaluating,
        test_dual_validation_both_approved,
        test_dual_validation_one_rejected,
        test_reopen_clears_validation,
        test_transitions_generate_audit_logs,
        test_can_transition_map,
    ):
        validation()
    print("=" * 50)
    print("ALL T-106 validations PASSED!")