"""Port defining the common interface for data import services. All import services (Atomic Red Team, Sigma, CALDERA, etc.) follow the same contract: they receive a database session and return a summary dict with import statistics. New import sources can be added by: 1. Implementing the ``ImportService`` protocol in a new module 2. Registering the handler in the ``IMPORT_REGISTRY`` This satisfies the Open/Closed Principle — the system is open for new import sources without modifying existing code. """ from __future__ import annotations from typing import Any, Protocol, runtime_checkable from sqlalchemy.orm import Session @runtime_checkable class ImportService(Protocol): """Contract for any data-import operation. Each implementation is a callable ``(Session) -> dict`` that downloads, parses, and upserts records from an external source. """ def __call__(self, db: Session) -> dict[str, Any]: ... class ImportServiceEntry: """Lazy-loading wrapper that resolves a module-level function on first call.""" __slots__ = ("_module_path", "_func_name", "_resolved") def __init__(self, module_path: str, func_name: str) -> None: self._module_path = module_path self._func_name = func_name self._resolved: ImportService | None = None def __call__(self, db: Session) -> dict[str, Any]: if self._resolved is None: import importlib mod = importlib.import_module(self._module_path) self._resolved = getattr(mod, self._func_name) return self._resolved(db) @property def source_info(self) -> str: return f"{self._module_path}.{self._func_name}" IMPORT_REGISTRY: dict[str, ImportServiceEntry] = { "atomic_red_team": ImportServiceEntry( "app.services.atomic_import_service", "import_atomic_red_team", ), "sigma": ImportServiceEntry( "app.services.sigma_import_service", "sync", ), "lolbas": ImportServiceEntry( "app.services.lolbas_import_service", "sync", ), "gtfobins": ImportServiceEntry( "app.services.lolbas_import_service", "sync_gtfobins", ), "caldera": ImportServiceEntry( "app.services.caldera_import_service", "sync", ), "elastic_rules": ImportServiceEntry( "app.services.elastic_import_service", "sync", ), "mitre_cti": ImportServiceEntry( "app.services.threat_actor_import_service", "sync", ), "d3fend": ImportServiceEntry( "app.services.d3fend_import_service", "sync", ), } def get_import_handler(source_name: str) -> ImportServiceEntry | None: """Look up the import handler for *source_name*. Returns ``None`` when no handler is registered. """ return IMPORT_REGISTRY.get(source_name)