Files
Aegis/backend/tests/test_t106_workflow_service.py
Kitos 6d18a5417d
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(phase-34): resolve blocking tech debt — Redis, domain exceptions, indexes, CI
Foundational changes required before any new feature work can begin.

- 0.1 Redis infrastructure: add redis:7-alpine to docker-compose dev and prod,
  REDIS_URL config, singleton client in app/infrastructure/redis_client.py
- 0.2 Token blacklist on Redis SEC-001: replace in-memory dict with Redis SETEX
  keyed by jti, auto-expiring TTL derived from token exp
- 0.3 Database indexes SR-006: Alembic migration b019 with 5 composite indexes
  for scoring, MTTD/MTTR, remediation, and notification queries
- 0.4 Domain exceptions TD-003: app/domain/exceptions.py with typed errors,
  error_handler middleware mapping them to HTTP, services decoupled from FastAPI
- 0.5 Fix silenced exceptions TD-007: replace 4 bare except-pass blocks in
  test_workflow_service with logger.warning with exc_info
- 0.6 CI pipeline TD-009: GitHub Actions workflow with Postgres and Redis
  service containers, ruff lint, pytest; ruff.toml for baseline config
2026-02-17 15:43:05 +01:00

356 lines
12 KiB
Python

"""Validation tests for T-106: Test Workflow Service.
Uses mock objects to avoid needing a running database.
The database module is stubbed before any app imports.
"""
import sys
import os
import uuid
from unittest.mock import MagicMock, patch
from types import ModuleType
from datetime import datetime
# ---------------------------------------------------------------------------
# 0. Stub heavy dependencies BEFORE importing any app modules
# ---------------------------------------------------------------------------
# Put the parent of this file's directory at the front of sys.path so that
# the `app` package can be imported from here.
backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
if backend_dir not in sys.path:
    sys.path.insert(0, backend_dir)
# Register a minimal pydantic_settings module, but only when no module of
# that name is already present in sys.modules.
if "pydantic_settings" not in sys.modules:

    class _BaseSettings:
        """Inert replacement for ``pydantic_settings.BaseSettings``."""

        def __init__(self, **kwargs):
            """Accept any keyword arguments and discard them."""

        def __init_subclass__(cls, **kwargs):
            # Defer straight to the default subclass hook.
            super().__init_subclass__(**kwargs)

    pydantic_settings_mock = ModuleType("pydantic_settings")
    pydantic_settings_mock.BaseSettings = _BaseSettings
    sys.modules["pydantic_settings"] = pydantic_settings_mock
# Replace app.config with a stub module whose `settings` attribute is a
# _FakeSettings instance carrying fixed values.
class _FakeSettings:
    """Static configuration values exposed as ``app.config.settings``."""

    # Database and token settings
    DATABASE_URL = "sqlite:///:memory:"
    SECRET_KEY = "test"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 60

    # Redis
    REDIS_URL = "redis://localhost:6379/0"

    # MinIO
    MINIO_ENDPOINT = "localhost:9000"
    MINIO_ACCESS_KEY = "test"
    MINIO_SECRET_KEY = "test"
    MINIO_BUCKET = "test"
    MINIO_SECURE = False

    # Miscellaneous application settings
    MAX_RETEST_COUNT = 3
    CORS_ORIGINS = "http://localhost:3000"

    # Scoring weights (these five values add up to 100)
    SCORING_WEIGHT_TESTS = 40
    SCORING_WEIGHT_DETECTION_RULES = 20
    SCORING_WEIGHT_D3FEND = 15
    SCORING_WEIGHT_FRESHNESS = 15
    SCORING_WEIGHT_PLATFORM_DIVERSITY = 10


config_mod = ModuleType("app.config")
config_mod.settings = _FakeSettings()
sys.modules[config_mod.__name__] = config_mod
# Register a stand-in app.database module: `Base` is a plain class with a
# mocked `metadata`, and `get_db` is a MagicMock, so no real engine is built.
db_mod = ModuleType("app.database")
sys.modules[db_mod.__name__] = db_mod
db_mod.get_db = MagicMock()
db_mod.Base = type("Base", (), dict(metadata=MagicMock()))
# Register empty taxii2client and taxii2client.v20 modules; the latter
# exposes `Server` as the MagicMock class itself.
taxii_v20 = ModuleType("taxii2client.v20")
taxii_v20.Server = MagicMock
sys.modules.update(
    {
        "taxii2client": ModuleType("taxii2client"),
        "taxii2client.v20": taxii_v20,
    }
)
# Register a stand-in `jose` module: `jwt` is a MagicMock and `JWTError`
# is aliased to the built-in Exception.
jose_mod = ModuleType("jose")
jose_mod.jwt = MagicMock()
jose_mod.JWTError = Exception
sys.modules[jose_mod.__name__] = jose_mod
# Register stand-ins for boto3 (with a mocked `client`) and for botocore /
# botocore.exceptions (with `ClientError` aliased to Exception).
boto3_mod = ModuleType("boto3")
boto3_mod.client = MagicMock()
sys.modules["boto3"] = boto3_mod
sys.modules.update(
    {name: ModuleType(name) for name in ("botocore", "botocore.exceptions")}
)
setattr(sys.modules["botocore.exceptions"], "ClientError", Exception)
# Register empty modules for the apscheduler package paths listed below, then
# expose BackgroundScheduler and CronTrigger as the MagicMock class.
sys.modules.update(
    {
        name: ModuleType(name)
        for name in (
            "apscheduler",
            "apscheduler.schedulers",
            "apscheduler.schedulers.background",
            "apscheduler.triggers",
            "apscheduler.triggers.cron",
        )
    }
)
sys.modules["apscheduler.schedulers.background"].BackgroundScheduler = MagicMock
sys.modules["apscheduler.triggers.cron"].CronTrigger = MagicMock
# ---------------------------------------------------------------------------
# Now we can safely import
# ---------------------------------------------------------------------------
from app.models.enums import TestState
from app.services.test_workflow_service import (
VALID_TRANSITIONS,
can_transition,
transition_state,
start_execution,
submit_red_evidence,
submit_blue_evidence,
validate_as_red_lead,
validate_as_blue_lead,
check_dual_validation,
reopen_test,
)
# We need the domain exceptions for assertions
from app.domain.exceptions import InvalidTransitionError
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_test(state: TestState = TestState.draft, **kwargs) -> MagicMock:
    """Build a MagicMock standing in for a test record.

    Args:
        state: Value assigned to the mock's ``state`` attribute.
        **kwargs: Only ``red_validation_status`` and
            ``blue_validation_status`` are read (each defaulting to None);
            any other key is ignored.

    Returns:
        A MagicMock with ``id``, ``name``, ``technique_id``, ``state``, the
        red/blue validation fields and ``execution_date`` set explicitly.
    """
    mock_test = MagicMock()
    mock_test.id = uuid.uuid4()
    mock_test.name = "Mock Test"
    mock_test.technique_id = uuid.uuid4()
    mock_test.state = state
    for side in ("red", "blue"):
        status_field = f"{side}_validation_status"
        setattr(mock_test, status_field, kwargs.get(status_field))
        # Validator, timestamp and notes always start out empty.
        for suffix in ("validated_by", "validated_at", "validation_notes"):
            setattr(mock_test, f"{side}_{suffix}", None)
    mock_test.execution_date = None
    return mock_test
def _make_user(role: str = "red_tech") -> MagicMock:
    """Return a MagicMock user with a random UUID ``id`` and the given ``role``."""
    return MagicMock(id=uuid.uuid4(), role=role)
def _make_db() -> MagicMock:
    """Return a fresh MagicMock used as the ``db`` argument of the service calls."""
    session = MagicMock()
    return session
# ---------------------------------------------------------------------------
# 1. draft -> red_executing works
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_draft_to_red_executing(mock_log):
    """start_execution on a draft test yields red_executing with an execution date.

    Also checks that the db mock was committed and that the patched
    ``log_action`` was called.
    """
    session = _make_db()
    actor = _make_user("red_tech")
    draft = _make_test(TestState.draft)

    outcome = start_execution(session, draft, actor)

    assert outcome.state == TestState.red_executing
    assert outcome.execution_date is not None
    session.commit.assert_called()
    mock_log.assert_called()
    print(" [PASS] Transition draft -> red_executing works")
# ---------------------------------------------------------------------------
# 2. draft -> validated fails (not allowed)
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_draft_to_validated_fails(mock_log):
    """transition_state from draft straight to validated must be refused.

    Expects InvalidTransitionError carrying code ``INVALID_TRANSITION`` and
    the current/target state names.

    Raises:
        AssertionError: If no InvalidTransitionError is raised, or if the
            exception attributes differ from the expected values.
    """
    test = _make_test(TestState.draft)
    user = _make_user("admin")
    db = _make_db()
    try:
        # Only the call under test lives in the try body.
        transition_state(db, test, TestState.validated, user)
    except InvalidTransitionError as exc:
        assert exc.code == "INVALID_TRANSITION"
        assert exc.current_state == "draft"
        assert exc.target_state == "validated"
    else:
        # Explicit raise instead of `assert False`: assert statements are
        # removed under `python -O`, which would let a missing exception pass.
        raise AssertionError("Should have raised InvalidTransitionError")
    print(" [PASS] Transition draft -> validated correctly fails")
# ---------------------------------------------------------------------------
# 3. red_executing -> blue_evaluating works
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_red_executing_to_blue_evaluating(mock_log):
    """submit_red_evidence moves a red_executing test to blue_evaluating.

    Also checks that the db mock was committed and that the patched
    ``log_action`` was called.
    """
    session = _make_db()
    actor = _make_user("red_tech")
    executing = _make_test(TestState.red_executing)

    outcome = submit_red_evidence(session, executing, actor)

    assert outcome.state == TestState.blue_evaluating
    session.commit.assert_called()
    mock_log.assert_called()
    print(" [PASS] Transition red_executing -> blue_evaluating works")
# ---------------------------------------------------------------------------
# 4. check_dual_validation -> validated when both approved
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_dual_validation_both_approved(mock_log):
    """An in_review test ends up validated once red and blue leads both approve.

    NOTE(review): check_dual_validation is not called directly here;
    presumably the validate_as_* functions invoke it -- confirm in the service.
    """
    session = _make_db()
    under_review = _make_test(TestState.in_review)
    red_lead = _make_user("red_lead")
    blue_lead = _make_user("blue_lead")

    validate_as_red_lead(session, under_review, red_lead, "approved", "LGTM")
    validate_as_blue_lead(session, under_review, blue_lead, "approved", "Detection OK")

    assert under_review.state == TestState.validated
    print(" [PASS] check_dual_validation -> validated when both approved")
# ---------------------------------------------------------------------------
# 5. check_dual_validation -> rejected when one rejects
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_dual_validation_one_rejected(mock_log):
    """A single rejection by the red lead moves an in_review test to rejected."""
    session = _make_db()
    under_review = _make_test(TestState.in_review)
    red_lead = _make_user("red_lead")

    validate_as_red_lead(
        session, under_review, red_lead, "rejected", "Insufficient evidence"
    )

    assert under_review.state == TestState.rejected
    print(" [PASS] check_dual_validation -> rejected when one rejects")
# ---------------------------------------------------------------------------
# 6. reopen_test clears validation fields
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_reopen_clears_validation(mock_log):
    """reopen_test returns a rejected test to draft with all validation fields None.

    Also checks that the db mock was committed.
    """
    session = _make_db()
    reviewer = _make_user("red_lead")
    rejected = _make_test(
        TestState.rejected,
        red_validation_status="rejected",
        blue_validation_status="approved",
    )

    reopened = reopen_test(session, rejected, reviewer)

    assert reopened.state == TestState.draft
    # Every validation-related attribute must have been reset.
    cleared_fields = (
        "red_validation_status",
        "blue_validation_status",
        "red_validated_by",
        "red_validated_at",
        "red_validation_notes",
        "blue_validated_by",
        "blue_validated_at",
        "blue_validation_notes",
    )
    for field_name in cleared_fields:
        assert getattr(reopened, field_name) is None
    session.commit.assert_called()
    print(" [PASS] reopen_test clears validation fields and moves to draft")
# ---------------------------------------------------------------------------
# 7. Every transition generates an audit log
# ---------------------------------------------------------------------------
@patch("app.services.test_workflow_service.log_action")
def test_transitions_generate_audit_logs(mock_log):
    """Each of three successive transitions increases log_action's call count."""
    session = _make_db()
    actor = _make_user("red_tech")
    subject = _make_test(TestState.draft)

    start_execution(session, subject, actor)
    calls_after_start = mock_log.call_count
    assert calls_after_start >= 1

    submit_red_evidence(session, subject, actor)
    calls_after_red = mock_log.call_count
    assert calls_after_red > calls_after_start

    submit_blue_evidence(session, subject, actor)
    assert mock_log.call_count > calls_after_red
    print(" [PASS] Each transition generates an audit log")
# ---------------------------------------------------------------------------
# 8. can_transition correctness
# ---------------------------------------------------------------------------
def test_can_transition_map():
    """Check can_transition against a table of (current, target, allowed) cases."""
    expectations = (
        (TestState.draft, TestState.red_executing, True),
        (TestState.draft, TestState.validated, False),
        (TestState.draft, TestState.blue_evaluating, False),
        (TestState.red_executing, TestState.blue_evaluating, True),
        (TestState.red_executing, TestState.draft, False),
        (TestState.blue_evaluating, TestState.in_review, True),
        (TestState.in_review, TestState.validated, True),
        (TestState.in_review, TestState.rejected, True),
        (TestState.in_review, TestState.draft, False),
        (TestState.rejected, TestState.draft, True),
        (TestState.validated, TestState.draft, False),
        (TestState.validated, TestState.rejected, False),
    )
    # One mock is reused; only its state is swapped between checks.
    probe = _make_test(TestState.draft)
    for current, target, allowed in expectations:
        probe.state = current
        assert can_transition(probe, target) is allowed
    print(" [PASS] can_transition map is correct")
# ---------------------------------------------------------------------------
# Run all
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Script mode: run every validation in order, framed by a banner.
    banner = "=" * 50
    print("T-106 Validation: Test Workflow Service")
    print(banner)
    for check in (
        test_draft_to_red_executing,
        test_draft_to_validated_fails,
        test_red_executing_to_blue_evaluating,
        test_dual_validation_both_approved,
        test_dual_validation_one_rejected,
        test_reopen_clears_validation,
        test_transitions_generate_audit_logs,
        test_can_transition_map,
    ):
        check()
    print(banner)
    print("ALL T-106 validations PASSED!")