Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
Models (5 tables):
- AttackPath: named reusable attack scenario with template flag
- AttackPathStep: ordered kill-chain step (technique + test link)
- AttackPathExecution: a run with Red/Blue leads, timing, stored metrics
- AttackPathStepResult: per-step detected/not_detected/skipped result
- TimelineEntry: timestamped Red/Blue/system actions for MTTD/MTTR
Migration b036atk: raw SQL to avoid SQLAlchemy DDL hook issues
Service (attack_path_service.py):
- Full CRUD for paths + steps (add, update, delete, reorder)
- Execution lifecycle: create → start → execute steps → complete/abort
- Pre-creates pending step results on execution creation
- Auto-adds system timeline entries on key state transitions
- complete_execution() computes: detection_rate, mttd_seconds,
furthest_undetected_step, detected/not_detected/skipped counts
- get_kill_chain_metrics(): per-step breakdown + phase summary
Router /api/v1/attack-paths (20 endpoints):
POST/GET/PATCH/DELETE attack paths
GET/POST/PATCH/DELETE steps + reorder
POST/GET executions per path
GET/POST/start/complete/abort executions
POST/GET step results
POST/GET timeline entries
GET kill-chain metrics
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
231 lines
7.7 KiB
Python
231 lines
7.7 KiB
Python
"""Pydantic schemas for Phase 10: Attack Paths & Advanced Purple Team."""
|
||
|
||
from datetime import datetime
|
||
from typing import Optional
|
||
from uuid import UUID
|
||
|
||
from pydantic import BaseModel, ConfigDict, field_validator
|
||
|
||
# Phase names accepted by the `kill_chain_phase` validators in this module.
# NOTE(review): the names and their order appear to mirror the MITRE ATT&CK
# Enterprise tactic list — confirm before adding or reordering entries.
# Kept as a list (not a tuple/set) because its repr is embedded verbatim in a
# validation error message.
VALID_KILL_CHAIN_PHASES = [
    "reconnaissance",
    "resource_development",
    "initial_access",
    "execution",
    "persistence",
    "privilege_escalation",
    "defense_evasion",
    "credential_access",
    "discovery",
    "lateral_movement",
    "collection",
    "command_and_control",
    "exfiltration",
    "impact",
]
|
||
|
||
|
||
# ── Attack Path ───────────────────────────────────────────────────────────────
|
||
|
||
class AttackPathCreate(BaseModel):
    # Request body for creating an attack path.
    # `name` is the only required field; everything else has a default.

    name: str

    # Free-text metadata.
    description: Optional[str] = None
    objective: Optional[str] = None

    # A path is a regular (non-template) path unless the caller says otherwise.
    is_template: bool = False

    # Optional link to a threat actor record and a list of string tags.
    threat_actor_id: Optional[UUID] = None
    tags: Optional[list[str]] = None
|
||
|
||
|
||
class AttackPathUpdate(BaseModel):
    # Request body for updating an attack path.
    # Every field is optional and defaults to None.
    # NOTE(review): presumably the service only applies fields the client
    # actually sent (partial update) — confirm against the service layer.

    name: Optional[str] = None
    description: Optional[str] = None
    objective: Optional[str] = None

    is_template: Optional[bool] = None
    threat_actor_id: Optional[UUID] = None
    tags: Optional[list[str]] = None

    # Not settable at creation time; only exposed on update.
    is_active: Optional[bool] = None
|
||
|
||
|
||
class AttackPathOut(BaseModel):
    # Response representation of an attack path.

    # from_attributes=True lets the model be validated from an object's
    # attributes (e.g. an ORM instance) instead of only from a mapping.
    model_config = ConfigDict(from_attributes=True)

    # Identity and descriptive fields.
    id: UUID
    name: str
    description: Optional[str] = None
    objective: Optional[str] = None

    # Classification and ownership.
    is_template: bool
    threat_actor_id: Optional[UUID] = None
    created_by: Optional[UUID] = None
    tags: Optional[list] = None  # element type is left unconstrained on output
    is_active: bool

    # Audit timestamps.
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    step_count: Optional[int] = None  # injected by service
|
||
|
||
|
||
# ── Attack Path Step ──────────────────────────────────────────────────────────
|
||
|
||
class AttackPathStepCreate(BaseModel):
    # Request body for adding a step to an attack path.
    # No field is required: order_index defaults to 0 and
    # expected_detection defaults to True.

    order_index: int = 0
    kill_chain_phase: Optional[str] = None  # checked by validate_phase below

    # Optional links to a technique record and a test record.
    technique_id: Optional[UUID] = None
    test_id: Optional[UUID] = None

    name: Optional[str] = None
    description: Optional[str] = None
    expected_detection: bool = True
    notes: Optional[str] = None

    @field_validator("kill_chain_phase")
    @classmethod
    def validate_phase(cls, v):
        """Return ``v`` if it is None or a known phase name.

        Raises:
            ValueError: if ``v`` is a value not listed in
                VALID_KILL_CHAIN_PHASES; the message lists the valid names.
        """
        if v is None or v in VALID_KILL_CHAIN_PHASES:
            return v
        raise ValueError(f"Invalid kill_chain_phase '{v}'. Valid: {VALID_KILL_CHAIN_PHASES}")
|
||
|
||
|
||
class AttackPathStepUpdate(BaseModel):
    # Request body for updating an attack path step.
    # Every field is optional and defaults to None.
    # NOTE(review): presumably the service only applies fields the client
    # actually sent (partial update) — confirm against the service layer.

    order_index: Optional[int] = None
    kill_chain_phase: Optional[str] = None  # checked by validate_phase below

    # Optional links to a technique record and a test record.
    technique_id: Optional[UUID] = None
    test_id: Optional[UUID] = None

    name: Optional[str] = None
    description: Optional[str] = None
    expected_detection: Optional[bool] = None
    notes: Optional[str] = None

    @field_validator("kill_chain_phase")
    @classmethod
    def validate_phase(cls, v):
        """Return ``v`` if it is None or a known phase name.

        The error message lists the accepted names, matching the message
        produced by the equivalent validator on AttackPathStepCreate, so a
        rejected update is as actionable as a rejected create.

        Raises:
            ValueError: if ``v`` is a value not listed in
                VALID_KILL_CHAIN_PHASES.
        """
        if v is not None and v not in VALID_KILL_CHAIN_PHASES:
            raise ValueError(
                f"Invalid kill_chain_phase '{v}'. Valid: {VALID_KILL_CHAIN_PHASES}"
            )
        return v
|
||
|
||
|
||
class AttackPathStepOut(BaseModel):
    # Response representation of a single attack path step.

    # from_attributes=True lets the model be validated from an object's
    # attributes (e.g. an ORM instance) instead of only from a mapping.
    model_config = ConfigDict(from_attributes=True)

    # Identity and position within the parent path.
    id: UUID
    attack_path_id: UUID
    order_index: int

    kill_chain_phase: Optional[str] = None  # not re-validated on output

    # Optional links to a technique record and a test record.
    technique_id: Optional[UUID] = None
    test_id: Optional[UUID] = None

    name: Optional[str] = None
    description: Optional[str] = None
    expected_detection: bool
    notes: Optional[str] = None
|
||
|
||
|
||
# ── Execution ─────────────────────────────────────────────────────────────────
|
||
|
||
class ExecutionCreate(BaseModel):
    # Request body for creating an execution (a run) of an attack path.
    # All fields are optional; the target path is not part of this body.

    environment: Optional[str] = None

    # Optional identifiers of the red-team and blue-team leads.
    red_team_lead: Optional[UUID] = None
    blue_team_lead: Optional[UUID] = None

    notes: Optional[str] = None
|
||
|
||
|
||
class ExecutionOut(BaseModel):
    # Response representation of an attack path execution, including its
    # stored summary metrics.

    # from_attributes=True lets the model be validated from an object's
    # attributes (e.g. an ORM instance) instead of only from a mapping.
    model_config = ConfigDict(from_attributes=True)

    # Identity and state. `status` is a free-form string at this layer.
    id: UUID
    attack_path_id: UUID
    status: str
    environment: Optional[str] = None

    # People involved.
    red_team_lead: Optional[UUID] = None
    blue_team_lead: Optional[UUID] = None
    started_by: Optional[UUID] = None

    # Lifecycle timestamps.
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    notes: Optional[str] = None
    created_at: Optional[datetime] = None

    # metrics — all nullable.
    # NOTE(review): presumably these stay None until the execution has been
    # completed — confirm against the service layer.
    total_steps: Optional[int] = None
    detected_steps: Optional[int] = None
    not_detected_steps: Optional[int] = None
    skipped_steps: Optional[int] = None
    detection_rate: Optional[float] = None
    mttd_seconds: Optional[float] = None  # mean time to detect, in seconds
    furthest_undetected_step: Optional[int] = None
|
||
|
||
|
||
# ── Step Result ───────────────────────────────────────────────────────────────
|
||
|
||
class StepExecuteRequest(BaseModel):
    # Request body for recording the outcome of one step in an execution.
    # `status` is the only required field.

    status: str  # detected / not_detected / skipped / executing (see validate_status)

    # Optional timing of the step and of its detection.
    executed_at: Optional[datetime] = None
    detected_at: Optional[datetime] = None

    detection_asset_id: Optional[UUID] = None
    notes: Optional[str] = None
    evidence_ids: Optional[list[UUID]] = None

    @field_validator("status")
    @classmethod
    def validate_status(cls, v):
        """Return ``v`` if it is one of the four accepted status strings.

        Raises:
            ValueError: if ``v`` is any other value; the message includes the
                tuple of accepted values.
        """
        valid = ("detected", "not_detected", "skipped", "executing")
        if v in valid:
            return v
        raise ValueError(f"status must be one of {valid}")
|
||
|
||
|
||
class StepResultOut(BaseModel):
    # Response representation of a per-step result within an execution.

    # from_attributes=True lets the model be validated from an object's
    # attributes (e.g. an ORM instance) instead of only from a mapping.
    model_config = ConfigDict(from_attributes=True)

    # Identity: which execution, which step, and the step's order value.
    id: UUID
    execution_id: UUID
    step_id: UUID
    step_order: int

    status: str  # free-form string on output; not re-validated here

    # Who ran the step and when; when it was detected.
    executed_by: Optional[UUID] = None
    executed_at: Optional[datetime] = None
    detected_at: Optional[datetime] = None
    time_to_detect_seconds: Optional[float] = None

    detection_asset_id: Optional[UUID] = None
    notes: Optional[str] = None
    evidence_ids: Optional[list] = None  # element type is left unconstrained on output
|
||
|
||
|
||
# ── Timeline ──────────────────────────────────────────────────────────────────
|
||
|
||
class TimelineEntryCreate(BaseModel):
    # Request body for adding an entry to an execution's timeline.
    # `actor_side`, `entry_type` and `content` are required.

    actor_side: str  # red / blue / system (see validate_side)
    entry_type: str  # see validate_type for the accepted values
    content: str

    step_id: Optional[UUID] = None
    timestamp: Optional[datetime] = None
    extra: Optional[dict] = None  # arbitrary additional data; shape not constrained

    @field_validator("actor_side")
    @classmethod
    def validate_side(cls, v):
        """Return ``v`` if it is "red", "blue" or "system".

        Raises:
            ValueError: for any other value.
        """
        if v in ("red", "blue", "system"):
            return v
        raise ValueError("actor_side must be red, blue or system")

    @field_validator("entry_type")
    @classmethod
    def validate_type(cls, v):
        """Return ``v`` if it is one of the accepted entry types.

        Raises:
            ValueError: for any other value; the message includes the tuple
                of accepted values.
        """
        valid = ("action", "detection", "note", "phase_transition", "flag")
        if v in valid:
            return v
        raise ValueError(f"entry_type must be one of {valid}")
|
||
|
||
|
||
class TimelineEntryOut(BaseModel):
    # Response representation of a timeline entry.

    # from_attributes=True lets the model be validated from an object's
    # attributes (e.g. an ORM instance) instead of only from a mapping.
    model_config = ConfigDict(from_attributes=True)

    # Identity and optional link to a step.
    id: UUID
    execution_id: UUID
    step_id: Optional[UUID] = None

    # Unlike the create schema, the timestamp is required on output.
    timestamp: datetime

    # Who acted.
    actor_side: str
    actor_id: Optional[UUID] = None

    # What happened.
    entry_type: str
    content: str
    extra: Optional[dict] = None
|
||
|
||
|
||
# ── Metrics ───────────────────────────────────────────────────────────────────
|
||
|
||
class KillChainMetrics(BaseModel):
    # Kill-chain metrics report for a single execution.
    # No field declares a default, so every key must be supplied when the
    # model is built; the Optional fields may be passed as None explicitly.

    execution_id: UUID

    # Step counts.
    total_steps: int
    detected_steps: int
    not_detected_steps: int
    skipped_steps: int

    # Aggregates.
    detection_rate: float  # 0.0–1.0
    mttd_seconds: Optional[float]  # mean time to detect
    furthest_undetected_step: Optional[int]
    furthest_undetected_phase: Optional[str]

    # Detail sections; inner structure is not constrained by this schema.
    step_breakdown: list[dict]  # per-step detail
    phase_summary: dict  # detection rate per kill-chain phase
|