From 0d211d51569a76e34df835d5071acc451057a2f6 Mon Sep 17 00:00:00 2001 From: Kitos Date: Fri, 20 Feb 2026 15:02:38 +0100 Subject: [PATCH] feat: add ThreatActorEntity domain entity with coverage analysis (Tier 4) --- backend/app/domain/entities/__init__.py | 3 + backend/app/domain/entities/threat_actor.py | 96 +++++++++++++++ backend/tests/test_threat_actor_entity.py | 123 ++++++++++++++++++++ 3 files changed, 222 insertions(+) create mode 100644 backend/app/domain/entities/threat_actor.py create mode 100644 backend/tests/test_threat_actor_entity.py diff --git a/backend/app/domain/entities/__init__.py b/backend/app/domain/entities/__init__.py index 63969bf..29c3ae2 100644 --- a/backend/app/domain/entities/__init__.py +++ b/backend/app/domain/entities/__init__.py @@ -5,6 +5,7 @@ from app.domain.entities.compliance import ( ControlCoverageStatus, ) from app.domain.entities.technique import TechniqueEntity +from app.domain.entities.threat_actor import ThreatActorEntity, ThreatActorTechniqueRef __all__ = [ "CampaignEntity", @@ -12,4 +13,6 @@ __all__ = [ "ComplianceFrameworkEntity", "ControlCoverageStatus", "TechniqueEntity", + "ThreatActorEntity", + "ThreatActorTechniqueRef", ] diff --git a/backend/app/domain/entities/threat_actor.py b/backend/app/domain/entities/threat_actor.py new file mode 100644 index 0000000..d477014 --- /dev/null +++ b/backend/app/domain/entities/threat_actor.py @@ -0,0 +1,96 @@ +"""Threat actor domain entity with coverage analysis logic. + +Pure domain logic — no framework imports. +""" + +from __future__ import annotations + +import uuid +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class ThreatActorTechniqueRef: + """Lightweight reference to a technique used by an actor.""" + + technique_id: uuid.UUID + mitre_id: str | None = None + name: str | None = None + status: str | None = None + usage_description: str | None = None + + +@dataclass +class ThreatActorEntity: + name: str + id: uuid.UUID | None = None + mitre_id: str | None = None + aliases: list[str] = field(default_factory=list) + description: str | None = None + country: str | None = None + target_sectors: list[str] = field(default_factory=list) + target_regions: list[str] = field(default_factory=list) + motivation: str | None = None + sophistication: str | None = None + first_seen: str | None = None + last_seen: str | None = None + is_active: bool = True + techniques: list[ThreatActorTechniqueRef] = field(default_factory=list) + + @property + def technique_count(self) -> int: + return len(self.techniques) + + @property + def covered_techniques(self) -> list[ThreatActorTechniqueRef]: + return [ + t for t in self.techniques + if t.status in ("validated", "partial") + ] + + @property + def uncovered_techniques(self) -> list[ThreatActorTechniqueRef]: + return [ + t for t in self.techniques + if t.status not in ("validated", "partial") + ] + + @property + def coverage_pct(self) -> float: + if not self.techniques: + return 0.0 + return round(len(self.covered_techniques) / len(self.techniques) * 100, 1) + + @classmethod + def from_orm(cls, orm: Any) -> ThreatActorEntity: + techs: list[ThreatActorTechniqueRef] = [] + for tat in getattr(orm, "techniques", None) or []: + technique = getattr(tat, "technique", None) + techs.append(ThreatActorTechniqueRef( + technique_id=tat.technique_id, + mitre_id=getattr(technique, "mitre_id", None) if technique else None, + name=getattr(technique, "name", None) if technique else None, + status=( + technique.status_global.value + if technique and hasattr(technique.status_global, "value") + else getattr(technique, "status_global", None) if technique else None + ), + usage_description=tat.usage_description, + )) + return cls( + id=orm.id, + name=orm.name, + mitre_id=orm.mitre_id, + aliases=orm.aliases or [], + description=orm.description, + country=orm.country, + target_sectors=orm.target_sectors or [], + target_regions=orm.target_regions or [], + motivation=orm.motivation, + sophistication=orm.sophistication, + first_seen=orm.first_seen, + last_seen=orm.last_seen, + is_active=orm.is_active if orm.is_active is not None else True, + techniques=techs, + ) diff --git a/backend/tests/test_threat_actor_entity.py b/backend/tests/test_threat_actor_entity.py new file mode 100644 index 0000000..41d1f7d --- /dev/null +++ b/backend/tests/test_threat_actor_entity.py @@ -0,0 +1,123 @@ +"""Tests for the ThreatActorEntity domain entity.""" + +import uuid +from types import SimpleNamespace + +from app.domain.entities.threat_actor import ( + ThreatActorEntity, + ThreatActorTechniqueRef, +) + + +def _make_ref(status: str = "not_evaluated") -> ThreatActorTechniqueRef: + return ThreatActorTechniqueRef( + technique_id=uuid.uuid4(), + mitre_id="T1059", + name="Command and Scripting Interpreter", + status=status, + ) + + +def test_coverage_pct_all_covered(): + actor = ThreatActorEntity( + name="APT29", + techniques=[_make_ref("validated"), _make_ref("partial")], + ) + assert actor.coverage_pct == 100.0 + + +def test_coverage_pct_partial(): + actor = ThreatActorEntity( + name="APT28", + techniques=[_make_ref("validated"), _make_ref("not_evaluated")], + ) + assert actor.coverage_pct == 50.0 + + +def test_coverage_pct_none_covered(): + actor = ThreatActorEntity( + name="Lazarus", + techniques=[_make_ref("not_evaluated"), _make_ref("in_progress")], + ) + assert actor.coverage_pct == 0.0 + + +def test_coverage_pct_no_techniques(): + actor = ThreatActorEntity(name="Unknown") + assert actor.coverage_pct == 0.0 + + +def test_covered_and_uncovered_techniques(): + t1 = _make_ref("validated") + t2 = _make_ref("not_evaluated") + t3 = _make_ref("partial") + actor = ThreatActorEntity(name="Test", techniques=[t1, t2, t3]) + assert len(actor.covered_techniques) == 2 + assert len(actor.uncovered_techniques) == 1 + + +def test_technique_count(): + actor = ThreatActorEntity( + name="Test", + techniques=[_make_ref(), _make_ref(), _make_ref()], + ) + assert actor.technique_count == 3 + + +def test_from_orm_basic(): + orm = SimpleNamespace( + id=uuid.uuid4(), + name="APT29", + mitre_id="G0016", + aliases=["Cozy Bear", "The Dukes"], + description="Russian APT group", + country="Russia", + target_sectors=["government"], + target_regions=["north-america"], + motivation="espionage", + sophistication="advanced", + first_seen="2008", + last_seen="2023", + is_active=True, + techniques=[], + ) + entity = ThreatActorEntity.from_orm(orm) + assert entity.name == "APT29" + assert entity.mitre_id == "G0016" + assert entity.country == "Russia" + assert entity.aliases == ["Cozy Bear", "The Dukes"] + assert entity.technique_count == 0 + + +def test_from_orm_with_techniques(): + tech_orm = SimpleNamespace( + mitre_id="T1059", + name="Command and Scripting Interpreter", + status_global=SimpleNamespace(value="validated"), + ) + tat_orm = SimpleNamespace( + technique_id=uuid.uuid4(), + technique=tech_orm, + usage_description="Uses PowerShell", + ) + orm = SimpleNamespace( + id=uuid.uuid4(), + name="APT28", + mitre_id="G0007", + aliases=None, + description=None, + country=None, + target_sectors=None, + target_regions=None, + motivation=None, + sophistication=None, + first_seen=None, + last_seen=None, + is_active=None, + techniques=[tat_orm], + ) + entity = ThreatActorEntity.from_orm(orm) + assert entity.technique_count == 1 + assert entity.techniques[0].mitre_id == "T1059" + assert entity.techniques[0].status == "validated" + assert entity.is_active is True # defaults when None