feat(attack-paths): Phase 10 — Attack Paths & Advanced Purple Team [FASE-10]
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled

Models (5 tables):
  - AttackPath: named reusable attack scenario with template flag
  - AttackPathStep: ordered kill-chain step (technique + test link)
  - AttackPathExecution: a run with Red/Blue leads, timing, stored metrics
  - AttackPathStepResult: per-step detected/not_detected/skipped result
  - TimelineEntry: timestamped Red/Blue/system actions for MTTD/MTTR

Migration b036atk: raw SQL to avoid SQLAlchemy DDL hook issues

Service (attack_path_service.py):
  - Full CRUD for paths + steps (add, update, delete, reorder)
  - Execution lifecycle: create → start → execute steps → complete/abort
  - Pre-creates pending step results on execution creation
  - Auto-adds system timeline entries on key state transitions
  - complete_execution() computes: detection_rate, mttd_seconds,
    furthest_undetected_step, detected/not_detected/skipped counts
  - get_kill_chain_metrics(): per-step breakdown + phase summary

Router /api/v1/attack-paths (20 endpoints):
  POST/GET/PATCH/DELETE attack paths
  GET/POST/PATCH/DELETE steps + reorder
  POST/GET executions per path
  GET/POST/start/complete/abort executions
  POST/GET step results
  POST/GET timeline entries
  GET kill-chain metrics

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
kitos
2026-05-20 13:11:01 +02:00
parent 4ece2293ec
commit 080ce56de7
7 changed files with 1480 additions and 0 deletions

View File

@@ -0,0 +1,230 @@
"""Pydantic schemas for Phase 10: Attack Paths & Advanced Purple Team."""
from datetime import datetime
from typing import Optional
from uuid import UUID
from pydantic import BaseModel, ConfigDict, field_validator
VALID_KILL_CHAIN_PHASES = [
"reconnaissance", "resource_development", "initial_access", "execution",
"persistence", "privilege_escalation", "defense_evasion", "credential_access",
"discovery", "lateral_movement", "collection", "command_and_control",
"exfiltration", "impact",
]
# ── Attack Path ───────────────────────────────────────────────────────────────
class AttackPathCreate(BaseModel):
name: str
description: Optional[str] = None
objective: Optional[str] = None
is_template: bool = False
threat_actor_id: Optional[UUID] = None
tags: Optional[list[str]] = None
class AttackPathUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
objective: Optional[str] = None
is_template: Optional[bool] = None
threat_actor_id: Optional[UUID] = None
tags: Optional[list[str]] = None
is_active: Optional[bool] = None
class AttackPathOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: UUID
name: str
description: Optional[str] = None
objective: Optional[str] = None
is_template: bool
threat_actor_id: Optional[UUID] = None
created_by: Optional[UUID] = None
tags: Optional[list] = None
is_active: bool
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
step_count: Optional[int] = None # injected by service
# ── Attack Path Step ──────────────────────────────────────────────────────────
class AttackPathStepCreate(BaseModel):
order_index: int = 0
kill_chain_phase: Optional[str] = None
technique_id: Optional[UUID] = None
test_id: Optional[UUID] = None
name: Optional[str] = None
description: Optional[str] = None
expected_detection: bool = True
notes: Optional[str] = None
@field_validator("kill_chain_phase")
@classmethod
def validate_phase(cls, v):
if v is not None and v not in VALID_KILL_CHAIN_PHASES:
raise ValueError(f"Invalid kill_chain_phase '{v}'. Valid: {VALID_KILL_CHAIN_PHASES}")
return v
class AttackPathStepUpdate(BaseModel):
order_index: Optional[int] = None
kill_chain_phase: Optional[str] = None
technique_id: Optional[UUID] = None
test_id: Optional[UUID] = None
name: Optional[str] = None
description: Optional[str] = None
expected_detection: Optional[bool] = None
notes: Optional[str] = None
@field_validator("kill_chain_phase")
@classmethod
def validate_phase(cls, v):
if v is not None and v not in VALID_KILL_CHAIN_PHASES:
raise ValueError(f"Invalid kill_chain_phase '{v}'.")
return v
class AttackPathStepOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: UUID
attack_path_id: UUID
order_index: int
kill_chain_phase: Optional[str] = None
technique_id: Optional[UUID] = None
test_id: Optional[UUID] = None
name: Optional[str] = None
description: Optional[str] = None
expected_detection: bool
notes: Optional[str] = None
# ── Execution ─────────────────────────────────────────────────────────────────
class ExecutionCreate(BaseModel):
environment: Optional[str] = None
red_team_lead: Optional[UUID] = None
blue_team_lead: Optional[UUID] = None
notes: Optional[str] = None
class ExecutionOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: UUID
attack_path_id: UUID
status: str
environment: Optional[str] = None
red_team_lead: Optional[UUID] = None
blue_team_lead: Optional[UUID] = None
started_by: Optional[UUID] = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
notes: Optional[str] = None
created_at: Optional[datetime] = None
# metrics
total_steps: Optional[int] = None
detected_steps: Optional[int] = None
not_detected_steps: Optional[int] = None
skipped_steps: Optional[int] = None
detection_rate: Optional[float] = None
mttd_seconds: Optional[float] = None
furthest_undetected_step: Optional[int] = None
# ── Step Result ───────────────────────────────────────────────────────────────
class StepExecuteRequest(BaseModel):
status: str # detected / not_detected / skipped
executed_at: Optional[datetime] = None
detected_at: Optional[datetime] = None
detection_asset_id: Optional[UUID] = None
notes: Optional[str] = None
evidence_ids: Optional[list[UUID]] = None
@field_validator("status")
@classmethod
def validate_status(cls, v):
valid = ("detected", "not_detected", "skipped", "executing")
if v not in valid:
raise ValueError(f"status must be one of {valid}")
return v
class StepResultOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: UUID
execution_id: UUID
step_id: UUID
step_order: int
status: str
executed_by: Optional[UUID] = None
executed_at: Optional[datetime] = None
detected_at: Optional[datetime] = None
time_to_detect_seconds: Optional[float] = None
detection_asset_id: Optional[UUID] = None
notes: Optional[str] = None
evidence_ids: Optional[list] = None
# ── Timeline ──────────────────────────────────────────────────────────────────
class TimelineEntryCreate(BaseModel):
actor_side: str
entry_type: str
content: str
step_id: Optional[UUID] = None
timestamp: Optional[datetime] = None
extra: Optional[dict] = None
@field_validator("actor_side")
@classmethod
def validate_side(cls, v):
if v not in ("red", "blue", "system"):
raise ValueError("actor_side must be red, blue or system")
return v
@field_validator("entry_type")
@classmethod
def validate_type(cls, v):
valid = ("action", "detection", "note", "phase_transition", "flag")
if v not in valid:
raise ValueError(f"entry_type must be one of {valid}")
return v
class TimelineEntryOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: UUID
execution_id: UUID
step_id: Optional[UUID] = None
timestamp: datetime
actor_side: str
actor_id: Optional[UUID] = None
entry_type: str
content: str
extra: Optional[dict] = None
# ── Metrics ───────────────────────────────────────────────────────────────────
class KillChainMetrics(BaseModel):
execution_id: UUID
total_steps: int
detected_steps: int
not_detected_steps: int
skipped_steps: int
detection_rate: float # 0.01.0
mttd_seconds: Optional[float] # mean time to detect
furthest_undetected_step: Optional[int]
furthest_undetected_phase: Optional[str]
step_breakdown: list[dict] # per-step detail
phase_summary: dict # detection rate per kill-chain phase