Files
Aegis/backend/app/schemas/attack_path_schema.py
kitos 080ce56de7
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(attack-paths): Phase 10 — Attack Paths & Advanced Purple Team [FASE-10]
Models (5 tables):
  - AttackPath: named reusable attack scenario with template flag
  - AttackPathStep: ordered kill-chain step (technique + test link)
  - AttackPathExecution: a run with Red/Blue leads, timing, stored metrics
  - AttackPathStepResult: per-step detected/not_detected/skipped result
  - TimelineEntry: timestamped Red/Blue/system actions for MTTD/MTTR

Migration b036atk: raw SQL to avoid SQLAlchemy DDL hook issues

Service (attack_path_service.py):
  - Full CRUD for paths + steps (add, update, delete, reorder)
  - Execution lifecycle: create → start → execute steps → complete/abort
  - Pre-creates pending step results on execution creation
  - Auto-adds system timeline entries on key state transitions
  - complete_execution() computes: detection_rate, mttd_seconds,
    furthest_undetected_step, detected/not_detected/skipped counts
  - get_kill_chain_metrics(): per-step breakdown + phase summary

Router /api/v1/attack-paths (20 endpoints):
  POST/GET/PATCH/DELETE attack paths
  GET/POST/PATCH/DELETE steps + reorder
  POST/GET executions per path
  GET/POST/start/complete/abort executions
  POST/GET step results
  POST/GET timeline entries
  GET kill-chain metrics

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 13:11:01 +02:00

231 lines
7.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Pydantic schemas for Phase 10: Attack Paths & Advanced Purple Team."""
from datetime import datetime
from typing import Optional
from uuid import UUID
from pydantic import BaseModel, ConfigDict, field_validator
# Kill-chain phase identifiers accepted by the step schemas' validators.
# Kept as a list, in this order, because the validation error message embeds
# the list's repr.
VALID_KILL_CHAIN_PHASES = [
    "reconnaissance",
    "resource_development",
    "initial_access",
    "execution",
    "persistence",
    "privilege_escalation",
    "defense_evasion",
    "credential_access",
    "discovery",
    "lateral_movement",
    "collection",
    "command_and_control",
    "exfiltration",
    "impact",
]
# ── Attack Path ───────────────────────────────────────────────────────────────
class AttackPathCreate(BaseModel):
    """Input schema for creating an attack path.

    Only ``name`` is required; every other field has a default.
    """

    name: str
    description: Optional[str] = None
    objective: Optional[str] = None
    # Marks the path as a template; defaults to a regular (non-template) path.
    is_template: bool = False
    threat_actor_id: Optional[UUID] = None
    tags: Optional[list[str]] = None
class AttackPathUpdate(BaseModel):
    """Input schema for partially updating an attack path.

    Every field is optional and defaults to ``None``.
    """

    name: Optional[str] = None
    description: Optional[str] = None
    objective: Optional[str] = None
    is_template: Optional[bool] = None
    threat_actor_id: Optional[UUID] = None
    tags: Optional[list[str]] = None
    is_active: Optional[bool] = None
class AttackPathOut(BaseModel):
    """Output schema describing an attack path.

    ``from_attributes=True`` lets the model be populated by reading
    attributes off an arbitrary object rather than only from a dict.
    """

    model_config = ConfigDict(from_attributes=True)

    id: UUID
    name: str
    description: Optional[str] = None
    objective: Optional[str] = None
    is_template: bool
    threat_actor_id: Optional[UUID] = None
    created_by: Optional[UUID] = None
    # Element type is intentionally left unconstrained on output.
    tags: Optional[list] = None
    is_active: bool
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    # Not read from the source object itself: injected by the service.
    step_count: Optional[int] = None
# ── Attack Path Step ──────────────────────────────────────────────────────────
class AttackPathStepCreate(BaseModel):
    """Input schema for adding a step to an attack path.

    All fields have defaults; ``kill_chain_phase``, when given, must be one
    of ``VALID_KILL_CHAIN_PHASES``.
    """

    order_index: int = 0
    kill_chain_phase: Optional[str] = None
    technique_id: Optional[UUID] = None
    test_id: Optional[UUID] = None
    name: Optional[str] = None
    description: Optional[str] = None
    expected_detection: bool = True
    notes: Optional[str] = None

    @field_validator("kill_chain_phase")
    @classmethod
    def validate_phase(cls, value):
        """Accept ``None`` or a known kill-chain phase; reject anything else.

        Raises:
            ValueError: if ``value`` is set but not in
                ``VALID_KILL_CHAIN_PHASES``.
        """
        if value is None or value in VALID_KILL_CHAIN_PHASES:
            return value
        raise ValueError(f"Invalid kill_chain_phase '{value}'. Valid: {VALID_KILL_CHAIN_PHASES}")
class AttackPathStepUpdate(BaseModel):
    """Input schema for partially updating an attack path step.

    Every field is optional and defaults to ``None``. ``kill_chain_phase``,
    when given, must be one of ``VALID_KILL_CHAIN_PHASES``.
    """

    order_index: Optional[int] = None
    kill_chain_phase: Optional[str] = None
    technique_id: Optional[UUID] = None
    test_id: Optional[UUID] = None
    name: Optional[str] = None
    description: Optional[str] = None
    expected_detection: Optional[bool] = None
    notes: Optional[str] = None

    @field_validator("kill_chain_phase")
    @classmethod
    def validate_phase(cls, v):
        """Accept ``None`` or a known kill-chain phase; reject anything else.

        The error message lists the accepted phases, matching the message
        produced by ``AttackPathStepCreate`` so that create and update
        requests report the same guidance.

        Raises:
            ValueError: if ``v`` is set but not in
                ``VALID_KILL_CHAIN_PHASES``.
        """
        if v is not None and v not in VALID_KILL_CHAIN_PHASES:
            raise ValueError(f"Invalid kill_chain_phase '{v}'. Valid: {VALID_KILL_CHAIN_PHASES}")
        return v
class AttackPathStepOut(BaseModel):
    """Output schema describing one step of an attack path.

    ``from_attributes=True`` lets the model be populated by reading
    attributes off an arbitrary object rather than only from a dict.
    """

    model_config = ConfigDict(from_attributes=True)

    id: UUID
    attack_path_id: UUID
    order_index: int
    # Not re-validated against VALID_KILL_CHAIN_PHASES on output.
    kill_chain_phase: Optional[str] = None
    technique_id: Optional[UUID] = None
    test_id: Optional[UUID] = None
    name: Optional[str] = None
    description: Optional[str] = None
    expected_detection: bool
    notes: Optional[str] = None
# ── Execution ─────────────────────────────────────────────────────────────────
class ExecutionCreate(BaseModel):
    """Input schema for creating an execution (a run) of an attack path.

    Every field is optional and defaults to ``None``.
    """

    environment: Optional[str] = None
    red_team_lead: Optional[UUID] = None
    blue_team_lead: Optional[UUID] = None
    notes: Optional[str] = None
class ExecutionOut(BaseModel):
    """Output schema describing an attack path execution.

    ``from_attributes=True`` lets the model be populated by reading
    attributes off an arbitrary object rather than only from a dict.
    """

    model_config = ConfigDict(from_attributes=True)

    id: UUID
    attack_path_id: UUID
    # Free-form string here; this schema does not restrict the values.
    status: str
    environment: Optional[str] = None
    red_team_lead: Optional[UUID] = None
    blue_team_lead: Optional[UUID] = None
    started_by: Optional[UUID] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    notes: Optional[str] = None
    created_at: Optional[datetime] = None

    # Metrics: all nullable, so an execution without them still serializes.
    total_steps: Optional[int] = None
    detected_steps: Optional[int] = None
    not_detected_steps: Optional[int] = None
    skipped_steps: Optional[int] = None
    detection_rate: Optional[float] = None
    mttd_seconds: Optional[float] = None
    furthest_undetected_step: Optional[int] = None
# ── Step Result ───────────────────────────────────────────────────────────────
class StepExecuteRequest(BaseModel):
    """Input schema for recording the outcome of one executed step.

    ``status`` is required and restricted by ``validate_status``; all other
    fields are optional.
    """

    # One of: detected / not_detected / skipped / executing (see validator).
    status: str
    executed_at: Optional[datetime] = None
    detected_at: Optional[datetime] = None
    detection_asset_id: Optional[UUID] = None
    notes: Optional[str] = None
    evidence_ids: Optional[list[UUID]] = None

    @field_validator("status")
    @classmethod
    def validate_status(cls, value):
        """Return ``value`` unchanged if it is an accepted status.

        Raises:
            ValueError: if ``value`` is not one of the accepted statuses.
        """
        allowed = ("detected", "not_detected", "skipped", "executing")
        if value in allowed:
            return value
        raise ValueError(f"status must be one of {allowed}")
class StepResultOut(BaseModel):
    """Output schema describing the result of one step within an execution.

    ``from_attributes=True`` lets the model be populated by reading
    attributes off an arbitrary object rather than only from a dict.
    """

    model_config = ConfigDict(from_attributes=True)

    id: UUID
    execution_id: UUID
    step_id: UUID
    step_order: int
    # Free-form string here; this schema does not restrict the values.
    status: str
    executed_by: Optional[UUID] = None
    executed_at: Optional[datetime] = None
    detected_at: Optional[datetime] = None
    time_to_detect_seconds: Optional[float] = None
    detection_asset_id: Optional[UUID] = None
    notes: Optional[str] = None
    # Element type is intentionally left unconstrained on output.
    evidence_ids: Optional[list] = None
# ── Timeline ──────────────────────────────────────────────────────────────────
class TimelineEntryCreate(BaseModel):
    """Input schema for adding an entry to an execution's timeline.

    ``actor_side``, ``entry_type`` and ``content`` are required; the first
    two are restricted to fixed sets of values by the validators below.
    """

    actor_side: str
    entry_type: str
    content: str
    step_id: Optional[UUID] = None
    timestamp: Optional[datetime] = None
    extra: Optional[dict] = None

    @field_validator("actor_side")
    @classmethod
    def validate_side(cls, value):
        """Return ``value`` unchanged if it is ``red``, ``blue`` or ``system``.

        Raises:
            ValueError: for any other value.
        """
        if value in ("red", "blue", "system"):
            return value
        raise ValueError("actor_side must be red, blue or system")

    @field_validator("entry_type")
    @classmethod
    def validate_type(cls, value):
        """Return ``value`` unchanged if it is an accepted entry type.

        Raises:
            ValueError: if ``value`` is not one of the accepted entry types.
        """
        allowed = ("action", "detection", "note", "phase_transition", "flag")
        if value in allowed:
            return value
        raise ValueError(f"entry_type must be one of {allowed}")
class TimelineEntryOut(BaseModel):
    """Output schema describing one timeline entry of an execution.

    ``from_attributes=True`` lets the model be populated by reading
    attributes off an arbitrary object rather than only from a dict.
    """

    model_config = ConfigDict(from_attributes=True)

    id: UUID
    execution_id: UUID
    step_id: Optional[UUID] = None
    # Required on output, unlike the optional timestamp on the create schema.
    timestamp: datetime
    actor_side: str
    actor_id: Optional[UUID] = None
    entry_type: str
    content: str
    extra: Optional[dict] = None
# ── Metrics ───────────────────────────────────────────────────────────────────
class KillChainMetrics(BaseModel):
    """Kill-chain metrics report for a single execution.

    The ``Optional`` fields below declare no default, so they must be
    supplied explicitly (``None`` is an accepted value).
    """

    execution_id: UUID
    total_steps: int
    detected_steps: int
    not_detected_steps: int
    skipped_steps: int
    # Presumably a fraction in the range 0.0 to 1.0 (the original range
    # comment was garbled); confirm against the service.
    detection_rate: float
    # Mean time to detect, in seconds.
    mttd_seconds: Optional[float]
    furthest_undetected_step: Optional[int]
    furthest_undetected_phase: Optional[str]
    # Per-step detail; the dict layout is not constrained by this schema.
    step_breakdown: list[dict]
    # Detection rate per kill-chain phase.
    phase_summary: dict