feat: add Campaign/Compliance domain entities and extract users/audit/data_sources to services (LP-2 through LP-6)

This commit is contained in:
2026-02-20 13:28:14 +01:00
parent 44621364be
commit c0c6cda11d
11 changed files with 939 additions and 319 deletions

View File

@@ -1,3 +1,15 @@
from app.domain.entities.campaign import CampaignEntity
from app.domain.entities.compliance import (
ComplianceControlEntity,
ComplianceFrameworkEntity,
ControlCoverageStatus,
)
from app.domain.entities.technique import TechniqueEntity
__all__ = ["TechniqueEntity"]
__all__ = [
"CampaignEntity",
"ComplianceControlEntity",
"ComplianceFrameworkEntity",
"ControlCoverageStatus",
"TechniqueEntity",
]

View File

@@ -0,0 +1,103 @@
"""Campaign domain entity with lifecycle validation.
Pure domain logic — no framework imports.
"""
from __future__ import annotations
import enum
import uuid
from dataclasses import dataclass, field
from typing import Any
from app.domain.errors import BusinessRuleViolation, InvalidStateTransition
class CampaignStatus(str, enum.Enum):
draft = "draft"
active = "active"
completed = "completed"
archived = "archived"
class CampaignType(str, enum.Enum):
custom = "custom"
apt_emulation = "apt_emulation"
kill_chain = "kill_chain"
compliance = "compliance"
VALID_TRANSITIONS: dict[CampaignStatus, list[CampaignStatus]] = {
CampaignStatus.draft: [CampaignStatus.active],
CampaignStatus.active: [CampaignStatus.completed],
CampaignStatus.completed: [CampaignStatus.archived],
CampaignStatus.archived: [],
}
@dataclass
class CampaignEntity:
name: str
type: CampaignType = CampaignType.custom
status: CampaignStatus = CampaignStatus.draft
id: uuid.UUID | None = None
description: str | None = None
threat_actor_id: uuid.UUID | None = None
created_by: uuid.UUID | None = None
target_platform: str | None = None
tags: list[str] = field(default_factory=list)
test_count: int = 0
def can_transition_to(self, target: CampaignStatus) -> bool:
return target in VALID_TRANSITIONS.get(self.status, [])
def activate(self) -> None:
if not self.can_transition_to(CampaignStatus.active):
raise InvalidStateTransition(
self.status.value, CampaignStatus.active.value,
[s.value for s in VALID_TRANSITIONS[self.status]],
)
if self.test_count == 0:
raise BusinessRuleViolation(
"Campaign must have at least one test to activate"
)
self.status = CampaignStatus.active
def complete(self) -> None:
if not self.can_transition_to(CampaignStatus.completed):
raise InvalidStateTransition(
self.status.value, CampaignStatus.completed.value,
[s.value for s in VALID_TRANSITIONS[self.status]],
)
self.status = CampaignStatus.completed
def archive(self) -> None:
if not self.can_transition_to(CampaignStatus.archived):
raise InvalidStateTransition(
self.status.value, CampaignStatus.archived.value,
[s.value for s in VALID_TRANSITIONS[self.status]],
)
self.status = CampaignStatus.archived
def ensure_modifiable(self) -> None:
if self.status not in (CampaignStatus.draft, CampaignStatus.active):
raise BusinessRuleViolation(
f"Cannot modify campaign in '{self.status.value}' state"
)
@classmethod
def from_orm(cls, orm: Any) -> CampaignEntity:
"""Build a CampaignEntity from a SQLAlchemy Campaign model."""
test_count = len(getattr(orm, "campaign_tests", None) or [])
return cls(
id=orm.id,
name=orm.name,
type=CampaignType(orm.type) if orm.type else CampaignType.custom,
status=CampaignStatus(orm.status) if orm.status else CampaignStatus.draft,
description=orm.description,
threat_actor_id=orm.threat_actor_id,
created_by=orm.created_by,
target_platform=orm.target_platform,
tags=orm.tags or [],
test_count=test_count,
)

View File

@@ -0,0 +1,71 @@
"""Compliance domain entities with coverage calculation logic.
Pure domain logic — no framework imports.
"""
from __future__ import annotations
import enum
import uuid
from dataclasses import dataclass, field
class ControlCoverageStatus(str, enum.Enum):
covered = "covered"
partially_covered = "partially_covered"
not_covered = "not_covered"
@dataclass
class ComplianceControlEntity:
control_id: str
title: str
id: uuid.UUID | None = None
description: str | None = None
category: str | None = None
technique_statuses: list[str] = field(default_factory=list)
@property
def coverage_status(self) -> ControlCoverageStatus:
if not self.technique_statuses:
return ControlCoverageStatus.not_covered
covered_statuses = {"validated", "partial"}
covered = [s for s in self.technique_statuses if s in covered_statuses]
if len(covered) == len(self.technique_statuses):
return ControlCoverageStatus.covered
elif len(covered) > 0:
return ControlCoverageStatus.partially_covered
return ControlCoverageStatus.not_covered
@dataclass
class ComplianceFrameworkEntity:
name: str
id: uuid.UUID | None = None
version: str | None = None
description: str | None = None
is_active: bool = True
controls: list[ComplianceControlEntity] = field(default_factory=list)
@property
def total_controls(self) -> int:
return len(self.controls)
@property
def covered_controls(self) -> int:
return sum(
1 for c in self.controls
if c.coverage_status == ControlCoverageStatus.covered
)
@property
def coverage_pct(self) -> float:
if self.total_controls == 0:
return 0.0
return round(self.covered_controls / self.total_controls * 100, 1)
def get_gap_controls(self) -> list[ComplianceControlEntity]:
return [
c for c in self.controls
if c.coverage_status != ControlCoverageStatus.covered
]