"""Pydantic schemas for Phase 10: Attack Paths & Advanced Purple Team."""

from datetime import datetime
from typing import Optional
from uuid import UUID

from pydantic import BaseModel, ConfigDict, field_validator

# Accepted values for ``kill_chain_phase`` on attack path steps.
# Kept as a list (not a set/tuple) because its repr is part of the validation
# error message and other modules may import it.
VALID_KILL_CHAIN_PHASES = [
    "reconnaissance",
    "resource_development",
    "initial_access",
    "execution",
    "persistence",
    "privilege_escalation",
    "defense_evasion",
    "credential_access",
    "discovery",
    "lateral_movement",
    "collection",
    "command_and_control",
    "exfiltration",
    "impact",
]


def _validate_kill_chain_phase(value: Optional[str]) -> Optional[str]:
    """Return ``value`` unchanged if it is ``None`` or a known kill-chain phase.

    Shared by the create and update step schemas so both reject unknown
    phases with the same message.

    Raises:
        ValueError: if ``value`` is not ``None`` and is not listed in
            ``VALID_KILL_CHAIN_PHASES``.
    """
    if value is not None and value not in VALID_KILL_CHAIN_PHASES:
        raise ValueError(
            f"Invalid kill_chain_phase '{value}'. Valid: {VALID_KILL_CHAIN_PHASES}"
        )
    return value


# ── Attack Path ───────────────────────────────────────────────────────────────


class AttackPathCreate(BaseModel):
    """Fields accepted when creating an attack path; only ``name`` is required."""

    name: str
    description: Optional[str] = None
    objective: Optional[str] = None
    is_template: bool = False
    threat_actor_id: Optional[UUID] = None
    tags: Optional[list[str]] = None


class AttackPathUpdate(BaseModel):
    """Fields accepted when updating an attack path; every field defaults to None."""

    name: Optional[str] = None
    description: Optional[str] = None
    objective: Optional[str] = None
    is_template: Optional[bool] = None
    threat_actor_id: Optional[UUID] = None
    tags: Optional[list[str]] = None
    is_active: Optional[bool] = None


class AttackPathOut(BaseModel):
    """Serialized attack path; can be populated from object attributes."""

    model_config = ConfigDict(from_attributes=True)

    id: UUID
    name: str
    description: Optional[str] = None
    objective: Optional[str] = None
    is_template: bool
    threat_actor_id: Optional[UUID] = None
    created_by: Optional[UUID] = None
    tags: Optional[list] = None
    is_active: bool
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    step_count: Optional[int] = None  # injected by service


# ── Attack Path Step ──────────────────────────────────────────────────────────


class AttackPathStepCreate(BaseModel):
    """Fields accepted when creating a step of an attack path."""

    order_index: int = 0
    kill_chain_phase: Optional[str] = None
    technique_id: Optional[UUID] = None
    test_id: Optional[UUID] = None
    name: Optional[str] = None
    description: Optional[str] = None
    expected_detection: bool = True
    notes: Optional[str] = None

    @field_validator("kill_chain_phase")
    @classmethod
    def validate_phase(cls, v: Optional[str]) -> Optional[str]:
        """Reject a ``kill_chain_phase`` not in ``VALID_KILL_CHAIN_PHASES``."""
        return _validate_kill_chain_phase(v)


class AttackPathStepUpdate(BaseModel):
    """Fields accepted when updating a step; every field defaults to None."""

    order_index: Optional[int] = None
    kill_chain_phase: Optional[str] = None
    technique_id: Optional[UUID] = None
    test_id: Optional[UUID] = None
    name: Optional[str] = None
    description: Optional[str] = None
    expected_detection: Optional[bool] = None
    notes: Optional[str] = None

    @field_validator("kill_chain_phase")
    @classmethod
    def validate_phase(cls, v: Optional[str]) -> Optional[str]:
        """Reject a ``kill_chain_phase`` not in ``VALID_KILL_CHAIN_PHASES``."""
        return _validate_kill_chain_phase(v)


class AttackPathStepOut(BaseModel):
    """Serialized attack path step; can be populated from object attributes."""

    model_config = ConfigDict(from_attributes=True)

    id: UUID
    attack_path_id: UUID
    order_index: int
    kill_chain_phase: Optional[str] = None
    technique_id: Optional[UUID] = None
    test_id: Optional[UUID] = None
    name: Optional[str] = None
    description: Optional[str] = None
    expected_detection: bool
    notes: Optional[str] = None


# ── Execution ─────────────────────────────────────────────────────────────────


class ExecutionCreate(BaseModel):
    """Fields accepted when creating an execution; all are optional."""

    environment: Optional[str] = None
    red_team_lead: Optional[UUID] = None
    blue_team_lead: Optional[UUID] = None
    notes: Optional[str] = None


class ExecutionOut(BaseModel):
    """Serialized execution, including optional aggregate metrics."""

    model_config = ConfigDict(from_attributes=True)

    id: UUID
    attack_path_id: UUID
    status: str
    environment: Optional[str] = None
    red_team_lead: Optional[UUID] = None
    blue_team_lead: Optional[UUID] = None
    started_by: Optional[UUID] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    notes: Optional[str] = None
    created_at: Optional[datetime] = None
    # metrics
    total_steps: Optional[int] = None
    detected_steps: Optional[int] = None
    not_detected_steps: Optional[int] = None
    skipped_steps: Optional[int] = None
    detection_rate: Optional[float] = None
    mttd_seconds: Optional[float] = None
    furthest_undetected_step: Optional[int] = None


# ── Step Result ───────────────────────────────────────────────────────────────


class StepExecuteRequest(BaseModel):
    """Fields accepted when recording the outcome of a step."""

    status: str  # detected / not_detected / skipped / executing
    executed_at: Optional[datetime] = None
    detected_at: Optional[datetime] = None
    detection_asset_id: Optional[UUID] = None
    notes: Optional[str] = None
    evidence_ids: Optional[list[UUID]] = None

    @field_validator("status")
    @classmethod
    def validate_status(cls, v: str) -> str:
        """Reject a ``status`` outside the accepted set of step statuses."""
        valid = ("detected", "not_detected", "skipped", "executing")
        if v not in valid:
            raise ValueError(f"status must be one of {valid}")
        return v


class StepResultOut(BaseModel):
    """Serialized step result; can be populated from object attributes."""

    model_config = ConfigDict(from_attributes=True)

    id: UUID
    execution_id: UUID
    step_id: UUID
    step_order: int
    status: str
    executed_by: Optional[UUID] = None
    executed_at: Optional[datetime] = None
    detected_at: Optional[datetime] = None
    time_to_detect_seconds: Optional[float] = None
    detection_asset_id: Optional[UUID] = None
    notes: Optional[str] = None
    evidence_ids: Optional[list] = None


# ── Timeline ──────────────────────────────────────────────────────────────────


class TimelineEntryCreate(BaseModel):
    """Fields accepted when adding an entry to an execution timeline."""

    actor_side: str
    entry_type: str
    content: str
    step_id: Optional[UUID] = None
    timestamp: Optional[datetime] = None
    extra: Optional[dict] = None

    @field_validator("actor_side")
    @classmethod
    def validate_side(cls, v: str) -> str:
        """Reject an ``actor_side`` other than red, blue or system."""
        if v not in ("red", "blue", "system"):
            raise ValueError("actor_side must be red, blue or system")
        return v

    @field_validator("entry_type")
    @classmethod
    def validate_type(cls, v: str) -> str:
        """Reject an ``entry_type`` outside the accepted set of entry types."""
        valid = ("action", "detection", "note", "phase_transition", "flag")
        if v not in valid:
            raise ValueError(f"entry_type must be one of {valid}")
        return v


class TimelineEntryOut(BaseModel):
    """Serialized timeline entry; can be populated from object attributes."""

    model_config = ConfigDict(from_attributes=True)

    id: UUID
    execution_id: UUID
    step_id: Optional[UUID] = None
    timestamp: datetime
    actor_side: str
    actor_id: Optional[UUID] = None
    entry_type: str
    content: str
    extra: Optional[dict] = None


# ── Metrics ───────────────────────────────────────────────────────────────────


class KillChainMetrics(BaseModel):
    """Aggregate detection metrics for one execution.

    No field declares a default; the ``Optional`` fields accept ``None``.
    """

    execution_id: UUID
    total_steps: int
    detected_steps: int
    not_detected_steps: int
    skipped_steps: int
    detection_rate: float  # 0.0–1.0
    mttd_seconds: Optional[float]  # mean time to detect
    furthest_undetected_step: Optional[int]
    furthest_undetected_phase: Optional[str]
    step_breakdown: list[dict]  # per-step detail
    phase_summary: dict  # detection rate per kill-chain phase