"""Phase 10: Attack Paths & Advanced Purple Team models.""" import enum import uuid from datetime import datetime from sqlalchemy import ( Boolean, Column, DateTime, Enum, Float, ForeignKey, Index, Integer, String, Text, ) from sqlalchemy.dialects.postgresql import UUID, JSONB from sqlalchemy.orm import relationship from app.database import Base class ExecutionStatus(str, enum.Enum): planned = "planned" in_progress = "in_progress" completed = "completed" aborted = "aborted" class StepResultStatus(str, enum.Enum): pending = "pending" executing = "executing" detected = "detected" not_detected = "not_detected" skipped = "skipped" class TimelineActorSide(str, enum.Enum): red = "red" blue = "blue" system = "system" class TimelineEntryType(str, enum.Enum): action = "action" detection = "detection" note = "note" phase_transition = "phase_transition" flag = "flag" # --------------------------------------------------------------------------- class AttackPath(Base): """ A reusable attack scenario composed of ordered kill-chain steps. Can be a template (shared) or a one-off scenario. """ __tablename__ = "attack_paths" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) name = Column(String(300), nullable=False) description = Column(Text, nullable=True) objective = Column(Text, nullable=True) # what the attacker aims to achieve is_template = Column(Boolean, default=False) # reusable template flag threat_actor_id = Column( UUID(as_uuid=True), ForeignKey("threat_actors.id", ondelete="SET NULL"), nullable=True ) created_by = Column( UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True ) tags = Column(JSONB, nullable=True, default=list) is_active = Column(Boolean, default=True) created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) steps = relationship( "AttackPathStep", back_populates="attack_path", cascade="all, delete-orphan", order_by="AttackPathStep.order_index", ) executions = relationship("AttackPathExecution", back_populates="attack_path") creator = relationship("User", foreign_keys=[created_by]) threat_actor = relationship("ThreatActor", foreign_keys=[threat_actor_id]) __table_args__ = ( Index("ix_attack_paths_created_by", "created_by"), Index("ix_attack_paths_is_template", "is_template"), ) class AttackPathStep(Base): """One step in an attack path — maps to a kill-chain phase + technique.""" __tablename__ = "attack_path_steps" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) attack_path_id = Column( UUID(as_uuid=True), ForeignKey("attack_paths.id", ondelete="CASCADE"), nullable=False ) order_index = Column(Integer, nullable=False, default=0) kill_chain_phase = Column(String(60), nullable=True) # initial_access, execution, … technique_id = Column( UUID(as_uuid=True), ForeignKey("techniques.id", ondelete="SET NULL"), nullable=True ) test_id = Column( UUID(as_uuid=True), ForeignKey("tests.id", ondelete="SET NULL"), nullable=True ) name = Column(String(300), nullable=True) # human label for the step description = Column(Text, nullable=True) expected_detection = Column(Boolean, default=True) # do we expect blue to detect this? notes = Column(Text, nullable=True) attack_path = relationship("AttackPath", back_populates="steps") technique = relationship("Technique", foreign_keys=[technique_id]) test = relationship("Test", foreign_keys=[test_id]) __table_args__ = ( Index("ix_ap_steps_path_id", "attack_path_id"), Index("ix_ap_steps_technique_id", "technique_id"), ) class AttackPathExecution(Base): """ A single run of an attack path. Tracks Red/Blue participants, timing, and aggregated kill-chain metrics. """ __tablename__ = "attack_path_executions" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) attack_path_id = Column( UUID(as_uuid=True), ForeignKey("attack_paths.id", ondelete="CASCADE"), nullable=False ) status = Column( Enum(ExecutionStatus, name="execution_status"), nullable=False, default=ExecutionStatus.planned, ) environment = Column(String(100), nullable=True) # prod, staging, lab red_team_lead = Column( UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True ) blue_team_lead = Column( UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True ) started_by = Column( UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True ) started_at = Column(DateTime, nullable=True) completed_at = Column(DateTime, nullable=True) notes = Column(Text, nullable=True) created_at = Column(DateTime, default=datetime.utcnow) # ── Computed kill-chain metrics (written on complete) ───────────────── total_steps = Column(Integer, nullable=True) detected_steps = Column(Integer, nullable=True) not_detected_steps = Column(Integer, nullable=True) skipped_steps = Column(Integer, nullable=True) detection_rate = Column(Float, nullable=True) # 0.0–1.0 mttd_seconds = Column(Float, nullable=True) # mean time to detect (avg across detected) furthest_undetected_step = Column(Integer, nullable=True) # order_index of deepest undetected step attack_path = relationship("AttackPath", back_populates="executions") step_results = relationship( "AttackPathStepResult", back_populates="execution", cascade="all, delete-orphan", order_by="AttackPathStepResult.step_order", ) timeline = relationship( "TimelineEntry", back_populates="execution", cascade="all, delete-orphan", order_by="TimelineEntry.timestamp", ) red_lead_user = relationship("User", foreign_keys=[red_team_lead]) blue_lead_user = relationship("User", foreign_keys=[blue_team_lead]) __table_args__ = ( Index("ix_ap_exec_path_id", "attack_path_id"), Index("ix_ap_exec_status", "status"), ) class AttackPathStepResult(Base): """Result of executing one step in an attack path execution.""" __tablename__ = "attack_path_step_results" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) execution_id = Column( UUID(as_uuid=True), ForeignKey("attack_path_executions.id", ondelete="CASCADE"), nullable=False, ) step_id = Column( UUID(as_uuid=True), ForeignKey("attack_path_steps.id", ondelete="CASCADE"), nullable=False, ) step_order = Column(Integer, nullable=False, default=0) # denormalized for sorting status = Column( Enum(StepResultStatus, name="step_result_status"), nullable=False, default=StepResultStatus.pending, ) executed_by = Column( UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True ) executed_at = Column(DateTime, nullable=True) detected_at = Column(DateTime, nullable=True) time_to_detect_seconds = Column(Float, nullable=True) detection_asset_id = Column( UUID(as_uuid=True), ForeignKey("detection_assets.id", ondelete="SET NULL"), nullable=True ) notes = Column(Text, nullable=True) evidence_ids = Column(JSONB, nullable=True, default=list) execution = relationship("AttackPathExecution", back_populates="step_results") step = relationship("AttackPathStep") detection_asset = relationship("DetectionAsset", foreign_keys=[detection_asset_id]) executor = relationship("User", foreign_keys=[executed_by]) __table_args__ = ( Index("ix_ap_stepres_execution_id", "execution_id"), Index("ix_ap_stepres_step_id", "step_id"), ) class TimelineEntry(Base): """Timestamped Red/Blue action during an execution — used for MTTD/MTTR.""" __tablename__ = "attack_path_timeline" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) execution_id = Column( UUID(as_uuid=True), ForeignKey("attack_path_executions.id", ondelete="CASCADE"), nullable=False, ) step_id = Column( UUID(as_uuid=True), ForeignKey("attack_path_steps.id", ondelete="SET NULL"), nullable=True, ) timestamp = Column(DateTime, nullable=False, default=datetime.utcnow) actor_side = Column( Enum(TimelineActorSide, name="timeline_actor_side"), nullable=False, ) actor_id = Column( UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True ) entry_type = Column( Enum(TimelineEntryType, name="timeline_entry_type"), nullable=False, ) content = Column(Text, nullable=False) extra = Column(JSONB, nullable=True) execution = relationship("AttackPathExecution", back_populates="timeline") actor = relationship("User", foreign_keys=[actor_id]) __table_args__ = ( Index("ix_timeline_execution_id", "execution_id"), Index("ix_timeline_timestamp", "timestamp"), )