Files
Aegis/backend/tests/test_t112_system_import.py
Kitos 9e204b78ec test: add TestEntity tests and fix test infrastructure (222 green)
- Add test_test_entity.py with 46 pure unit tests covering the full domain entity

- Fix _FakeSettings in 11 test files (REPORT_TEMPLATES_DIR, JIRA, TEMPO)

- Fix stale db.commit assertions to db.flush after UoW refactor

- Add missing mock fields for TestEntity.from_orm compatibility

- Make database.py skip pool args for SQLite in test environment

- Disable slowapi rate limiter in test client fixture

- Inject test engine into app.database to fix threading errors

- Update role assertions to match current require_any_role policy

- Mark 6 legacy V1 endpoint tests as xfail (replaced by V2 workflow)
2026-02-18 15:29:24 +01:00

172 lines
6.3 KiB
Python

"""Validation tests for T-112: System endpoint for Atomic Red Team import.
Tests endpoint existence, admin-only access, and audit logging.
"""
import sys
import os
import uuid
from unittest.mock import MagicMock
from types import ModuleType
import inspect
# ---------------------------------------------------------------------------
# Stubs
# ---------------------------------------------------------------------------
# Resolve the directory one level above this file's directory and make sure it
# is importable, so the ``app`` package imported further down can be found.
backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if backend_dir not in sys.path:
    # Prepend (rather than append) so this directory takes precedence over
    # any other entry on the import path.
    sys.path.insert(0, backend_dir)
# Register a minimal stand-in for ``pydantic_settings`` unless a module of that
# name is already loaded. Only ``BaseSettings`` is provided.
if "pydantic_settings" not in sys.modules:
    pydantic_settings_mock = ModuleType("pydantic_settings")

    class _BaseSettings:
        """Placeholder ``BaseSettings`` that accepts and ignores any keyword arguments."""

        def __init__(self, **kwargs):
            # Intentionally a no-op: nothing is validated or stored.
            pass

        def __init_subclass__(cls, **kwargs):
            # Plain pass-through to the default subclass hook.
            super().__init_subclass__(**kwargs)

    pydantic_settings_mock.BaseSettings = _BaseSettings
    sys.modules["pydantic_settings"] = pydantic_settings_mock
# Register a stand-in ``app.config`` module (exposing a ``settings`` object)
# unless a module of that name is already loaded.
if "app.config" not in sys.modules:
    config_mod = ModuleType("app.config")

    class _FakeSettings:
        """Static settings object used in place of the application's configuration."""

        # Database / auth
        DATABASE_URL = "sqlite:///:memory:"
        SECRET_KEY = "test"
        ALGORITHM = "HS256"
        ACCESS_TOKEN_EXPIRE_MINUTES = 60

        # MINIO_* settings
        MINIO_ENDPOINT = "localhost:9000"
        MINIO_ACCESS_KEY = "test"
        MINIO_SECRET_KEY = "test"
        MINIO_BUCKET = "test"

        # REPORT_* / COMPANY_* settings
        REPORT_TEMPLATES_DIR = "app/templates/reports"
        REPORT_OUTPUT_DIR = "/tmp/aegis_reports"
        COMPANY_NAME = "Test Org"
        COMPANY_LOGO_PATH = "app/templates/reports/assets/logo.png"

        # JIRA_* settings (disabled)
        JIRA_ENABLED = False
        JIRA_URL = ""
        JIRA_USERNAME = ""
        JIRA_API_TOKEN = ""
        JIRA_IS_CLOUD = True
        JIRA_DEFAULT_PROJECT = ""
        JIRA_ISSUE_TYPE_TEST = "Task"
        JIRA_ISSUE_TYPE_CAMPAIGN = "Epic"

        # TEMPO_* settings (disabled)
        TEMPO_ENABLED = False
        TEMPO_API_TOKEN = ""
        TEMPO_DEFAULT_WORK_TYPE = "Red Team"

        # Miscellaneous
        NVD_API_KEY = ""
        STALE_THRESHOLD_DAYS = 365
        CORS_ORIGINS = "http://localhost:3000"

        # SCORING_WEIGHT_* settings (the five values below sum to 100)
        SCORING_WEIGHT_TESTS = 40
        SCORING_WEIGHT_DETECTION_RULES = 20
        SCORING_WEIGHT_D3FEND = 15
        SCORING_WEIGHT_FRESHNESS = 15
        SCORING_WEIGHT_PLATFORM_DIVERSITY = 10

    config_mod.settings = _FakeSettings()
    sys.modules["app.config"] = config_mod
# Register a stand-in ``app.database`` module unless one is already loaded.
# If one is loaded but has no ``SessionLocal`` attribute, only that attribute
# is added; everything else on the existing module is left untouched.
if "app.database" not in sys.modules:
    db_mod = ModuleType("app.database")
    # ``Base`` is a bare class whose only attribute is a mocked ``metadata``.
    db_mod.Base = type("Base", (), {"metadata": MagicMock()})
    db_mod.get_db = MagicMock()
    db_mod.SessionLocal = MagicMock()
    sys.modules["app.database"] = db_mod
elif not hasattr(sys.modules["app.database"], "SessionLocal"):
    sys.modules["app.database"].SessionLocal = MagicMock()
# Register empty stand-in modules for third-party packages that are not already
# loaded, adding the few attributes listed below. Parents are listed before
# their submodules so each child can be attached to its parent.
for mod_name in [
    "taxii2client", "taxii2client.v20",
    "jose", "boto3", "botocore", "botocore.exceptions",
    "apscheduler", "apscheduler.schedulers",
    "apscheduler.schedulers.background",
    "apscheduler.triggers", "apscheduler.triggers.cron",
]:
    if mod_name not in sys.modules:
        m = ModuleType(mod_name)
        if mod_name == "taxii2client.v20":
            m.Server = MagicMock
        elif mod_name == "jose":
            # NOTE: aliasing to ``Exception`` means ``except JWTError`` catches
            # every ordinary exception while this stub is active.
            m.JWTError = Exception
            m.jwt = MagicMock()
        elif mod_name == "boto3":
            m.client = MagicMock()
        elif mod_name == "botocore.exceptions":
            m.ClientError = Exception
        elif mod_name == "apscheduler.schedulers.background":
            m.BackgroundScheduler = MagicMock
        elif mod_name == "apscheduler.triggers.cron":
            m.CronTrigger = MagicMock
        sys.modules[mod_name] = m

        # Modules served straight from ``sys.modules`` are never bound on their
        # parent by the import system, so ``import pkg.sub; pkg.sub.X`` would
        # raise AttributeError. Attach the child explicitly.
        parent_name, _sep, child_name = mod_name.rpartition(".")
        parent = sys.modules.get(parent_name)
        if parent is not None and not hasattr(parent, child_name):
            setattr(parent, child_name, m)
# ---------------------------------------------------------------------------
# Imports
# ---------------------------------------------------------------------------
from app.routers.system import router
def _get_route_paths(api_router=None):
    """Map ``"<METHOD> <path>"`` strings to the route objects of a router.

    Args:
        api_router: Object exposing a ``routes`` iterable. Defaults to the
            module-level ``router`` imported from ``app.routers.system``.

    Returns:
        A dict with one entry per (method, path) pair. Routes without a
        ``path`` attribute are keyed with an empty path; routes whose
        ``methods`` attribute is missing, ``None`` or empty contribute nothing.
        If two routes share the same method and path, the later one wins.
    """
    source = router if api_router is None else api_router
    routes = {}
    for route in source.routes:
        path = getattr(route, "path", "")
        # ``methods`` may exist but be ``None``; treat that the same as "no
        # methods" instead of failing on iteration.
        methods = getattr(route, "methods", None) or ()
        for method in methods:
            routes[f"{method} {path}"] = route
    return routes
# ---------------------------------------------------------------------------
# 1. POST /system/import-atomic-tests endpoint exists
# ---------------------------------------------------------------------------
def test_import_endpoint_exists():
    """Check that the system router registers a POST route for ``import-atomic-tests``.

    The match is by substring on the ``"<METHOD> <path>"`` keys returned by
    ``_get_route_paths``; no request is sent.
    """
    routes = _get_route_paths()
    found = any("import-atomic-tests" in k and "POST" in k for k in routes)
    assert found, f"POST /system/import-atomic-tests not found. Routes: {list(routes.keys())}"
    print(" [PASS] POST /system/import-atomic-tests endpoint exists")
# ---------------------------------------------------------------------------
# 2. Only admin can execute
# ---------------------------------------------------------------------------
def test_admin_only():
    """Check that the import handler's source text references ``require_role``.

    This is a static check on the source of ``trigger_atomic_import``; it does
    not call the endpoint or exercise the role dependency.
    """
    from app.routers.system import trigger_atomic_import

    source = inspect.getsource(trigger_atomic_import)
    # NOTE(review): the second alternative is a substring of the first, so this
    # passes for any ``require_role`` reference, not only the "admin" one.
    # Confirm whether the stricter check is intended before tightening.
    assert 'require_role("admin")' in source or "require_role" in source
    print(" [PASS] Only admin can execute the import")
# ---------------------------------------------------------------------------
# 3. Audit log is registered (via atomic_import_service)
# ---------------------------------------------------------------------------
def test_audit_log_in_service():
    """Check that the import service's source text mentions audit logging.

    Static check only: the source of ``import_atomic_red_team`` must contain
    both ``log_action`` and ``import_atomic_red_team``. The service itself is
    not executed.
    """
    from app.services.atomic_import_service import import_atomic_red_team

    source = inspect.getsource(import_atomic_red_team)
    assert "log_action" in source
    # The function's own ``def`` line contains its name, so this always holds
    # for a normally-defined function.
    assert "import_atomic_red_team" in source
    print(" [PASS] Audit log is registered in the import service")
# ---------------------------------------------------------------------------
# 4. Response includes imported and skipped counts
# ---------------------------------------------------------------------------
def test_response_format():
    """Check that the import handler's source text contains the response keys.

    Static check only: the source of ``trigger_atomic_import`` must contain the
    quoted strings ``imported`` and ``skipped`` (single or double quotes). The
    actual response payload is not inspected.
    """
    from app.routers.system import trigger_atomic_import

    source = inspect.getsource(trigger_atomic_import)
    assert '"imported"' in source or "'imported'" in source
    assert '"skipped"' in source or "'skipped'" in source
    print(" [PASS] Response includes imported and skipped counts")
# ---------------------------------------------------------------------------
# Run all
# ---------------------------------------------------------------------------
# Allow running this file directly (without pytest): execute every check in
# order and print a summary. The first failing assertion aborts the run.
if __name__ == "__main__":
    print("T-112 Validation: System Import Atomic Red Team Endpoint")
    print("=" * 58)
    test_import_endpoint_exists()
    test_admin_only()
    test_audit_log_in_service()
    test_response_format()
    print("=" * 58)
    print("ALL T-112 validations PASSED!")