Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
Models (5 tables):
- AttackPath: named reusable attack scenario with template flag
- AttackPathStep: ordered kill-chain step (technique + test link)
- AttackPathExecution: a run with Red/Blue leads, timing, stored metrics
- AttackPathStepResult: per-step detected/not_detected/skipped result
- TimelineEntry: timestamped Red/Blue/system actions for MTTD/MTTR
Migration b036atk: raw SQL to avoid SQLAlchemy DDL hook issues
Service (attack_path_service.py):
- Full CRUD for paths + steps (add, update, delete, reorder)
- Execution lifecycle: create → start → execute steps → complete/abort
- Pre-creates pending step results on execution creation
- Auto-adds system timeline entries on key state transitions
- complete_execution() computes: detection_rate, mttd_seconds,
furthest_undetected_step, detected/not_detected/skipped counts
- get_kill_chain_metrics(): per-step breakdown + phase summary
Router /api/v1/attack-paths (20 endpoints):
POST/GET/PATCH/DELETE attack paths
GET/POST/PATCH/DELETE steps + reorder
POST/GET executions per path
GET/POST/start/complete/abort executions
POST/GET step results
POST/GET timeline entries
GET kill-chain metrics
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
254 lines
9.6 KiB
Python
254 lines
9.6 KiB
Python
"""Phase 10: Attack Paths & Advanced Purple Team models."""
|
||
|
||
import enum
|
||
import uuid
|
||
from datetime import datetime
|
||
|
||
from sqlalchemy import (
|
||
Boolean, Column, DateTime, Enum, Float, ForeignKey,
|
||
Index, Integer, String, Text,
|
||
)
|
||
from sqlalchemy.dialects.postgresql import UUID, JSONB
|
||
from sqlalchemy.orm import relationship
|
||
|
||
from app.database import Base
|
||
|
||
|
||
class ExecutionStatus(str, enum.Enum):
|
||
planned = "planned"
|
||
in_progress = "in_progress"
|
||
completed = "completed"
|
||
aborted = "aborted"
|
||
|
||
|
||
class StepResultStatus(str, enum.Enum):
|
||
pending = "pending"
|
||
executing = "executing"
|
||
detected = "detected"
|
||
not_detected = "not_detected"
|
||
skipped = "skipped"
|
||
|
||
|
||
class TimelineActorSide(str, enum.Enum):
|
||
red = "red"
|
||
blue = "blue"
|
||
system = "system"
|
||
|
||
|
||
class TimelineEntryType(str, enum.Enum):
|
||
action = "action"
|
||
detection = "detection"
|
||
note = "note"
|
||
phase_transition = "phase_transition"
|
||
flag = "flag"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class AttackPath(Base):
|
||
"""
|
||
A reusable attack scenario composed of ordered kill-chain steps.
|
||
Can be a template (shared) or a one-off scenario.
|
||
"""
|
||
__tablename__ = "attack_paths"
|
||
|
||
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
||
name = Column(String(300), nullable=False)
|
||
description = Column(Text, nullable=True)
|
||
objective = Column(Text, nullable=True) # what the attacker aims to achieve
|
||
is_template = Column(Boolean, default=False) # reusable template flag
|
||
threat_actor_id = Column(
|
||
UUID(as_uuid=True), ForeignKey("threat_actors.id", ondelete="SET NULL"), nullable=True
|
||
)
|
||
created_by = Column(
|
||
UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True
|
||
)
|
||
tags = Column(JSONB, nullable=True, default=list)
|
||
is_active = Column(Boolean, default=True)
|
||
created_at = Column(DateTime, default=datetime.utcnow)
|
||
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
|
||
|
||
steps = relationship(
|
||
"AttackPathStep", back_populates="attack_path",
|
||
cascade="all, delete-orphan",
|
||
order_by="AttackPathStep.order_index",
|
||
)
|
||
executions = relationship("AttackPathExecution", back_populates="attack_path")
|
||
creator = relationship("User", foreign_keys=[created_by])
|
||
threat_actor = relationship("ThreatActor", foreign_keys=[threat_actor_id])
|
||
|
||
__table_args__ = (
|
||
Index("ix_attack_paths_created_by", "created_by"),
|
||
Index("ix_attack_paths_is_template", "is_template"),
|
||
)
|
||
|
||
|
||
class AttackPathStep(Base):
|
||
"""One step in an attack path — maps to a kill-chain phase + technique."""
|
||
|
||
__tablename__ = "attack_path_steps"
|
||
|
||
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
||
attack_path_id = Column(
|
||
UUID(as_uuid=True), ForeignKey("attack_paths.id", ondelete="CASCADE"), nullable=False
|
||
)
|
||
order_index = Column(Integer, nullable=False, default=0)
|
||
kill_chain_phase = Column(String(60), nullable=True) # initial_access, execution, …
|
||
technique_id = Column(
|
||
UUID(as_uuid=True), ForeignKey("techniques.id", ondelete="SET NULL"), nullable=True
|
||
)
|
||
test_id = Column(
|
||
UUID(as_uuid=True), ForeignKey("tests.id", ondelete="SET NULL"), nullable=True
|
||
)
|
||
name = Column(String(300), nullable=True) # human label for the step
|
||
description = Column(Text, nullable=True)
|
||
expected_detection = Column(Boolean, default=True) # do we expect blue to detect this?
|
||
notes = Column(Text, nullable=True)
|
||
|
||
attack_path = relationship("AttackPath", back_populates="steps")
|
||
technique = relationship("Technique", foreign_keys=[technique_id])
|
||
test = relationship("Test", foreign_keys=[test_id])
|
||
|
||
__table_args__ = (
|
||
Index("ix_ap_steps_path_id", "attack_path_id"),
|
||
Index("ix_ap_steps_technique_id", "technique_id"),
|
||
)
|
||
|
||
|
||
class AttackPathExecution(Base):
|
||
"""
|
||
A single run of an attack path.
|
||
Tracks Red/Blue participants, timing, and aggregated kill-chain metrics.
|
||
"""
|
||
__tablename__ = "attack_path_executions"
|
||
|
||
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
||
attack_path_id = Column(
|
||
UUID(as_uuid=True), ForeignKey("attack_paths.id", ondelete="CASCADE"), nullable=False
|
||
)
|
||
status = Column(
|
||
Enum(ExecutionStatus, name="execution_status"), nullable=False,
|
||
default=ExecutionStatus.planned,
|
||
)
|
||
environment = Column(String(100), nullable=True) # prod, staging, lab
|
||
red_team_lead = Column(
|
||
UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True
|
||
)
|
||
blue_team_lead = Column(
|
||
UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True
|
||
)
|
||
started_by = Column(
|
||
UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True
|
||
)
|
||
started_at = Column(DateTime, nullable=True)
|
||
completed_at = Column(DateTime, nullable=True)
|
||
notes = Column(Text, nullable=True)
|
||
created_at = Column(DateTime, default=datetime.utcnow)
|
||
|
||
# ── Computed kill-chain metrics (written on complete) ─────────────────
|
||
total_steps = Column(Integer, nullable=True)
|
||
detected_steps = Column(Integer, nullable=True)
|
||
not_detected_steps = Column(Integer, nullable=True)
|
||
skipped_steps = Column(Integer, nullable=True)
|
||
detection_rate = Column(Float, nullable=True) # 0.0–1.0
|
||
mttd_seconds = Column(Float, nullable=True) # mean time to detect (avg across detected)
|
||
furthest_undetected_step = Column(Integer, nullable=True) # order_index of deepest undetected step
|
||
|
||
attack_path = relationship("AttackPath", back_populates="executions")
|
||
step_results = relationship(
|
||
"AttackPathStepResult", back_populates="execution",
|
||
cascade="all, delete-orphan",
|
||
order_by="AttackPathStepResult.step_order",
|
||
)
|
||
timeline = relationship(
|
||
"TimelineEntry", back_populates="execution",
|
||
cascade="all, delete-orphan",
|
||
order_by="TimelineEntry.timestamp",
|
||
)
|
||
red_lead_user = relationship("User", foreign_keys=[red_team_lead])
|
||
blue_lead_user = relationship("User", foreign_keys=[blue_team_lead])
|
||
|
||
__table_args__ = (
|
||
Index("ix_ap_exec_path_id", "attack_path_id"),
|
||
Index("ix_ap_exec_status", "status"),
|
||
)
|
||
|
||
|
||
class AttackPathStepResult(Base):
|
||
"""Result of executing one step in an attack path execution."""
|
||
|
||
__tablename__ = "attack_path_step_results"
|
||
|
||
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
||
execution_id = Column(
|
||
UUID(as_uuid=True), ForeignKey("attack_path_executions.id", ondelete="CASCADE"),
|
||
nullable=False,
|
||
)
|
||
step_id = Column(
|
||
UUID(as_uuid=True), ForeignKey("attack_path_steps.id", ondelete="CASCADE"),
|
||
nullable=False,
|
||
)
|
||
step_order = Column(Integer, nullable=False, default=0) # denormalized for sorting
|
||
status = Column(
|
||
Enum(StepResultStatus, name="step_result_status"), nullable=False,
|
||
default=StepResultStatus.pending,
|
||
)
|
||
executed_by = Column(
|
||
UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True
|
||
)
|
||
executed_at = Column(DateTime, nullable=True)
|
||
detected_at = Column(DateTime, nullable=True)
|
||
time_to_detect_seconds = Column(Float, nullable=True)
|
||
detection_asset_id = Column(
|
||
UUID(as_uuid=True),
|
||
ForeignKey("detection_assets.id", ondelete="SET NULL"), nullable=True
|
||
)
|
||
notes = Column(Text, nullable=True)
|
||
evidence_ids = Column(JSONB, nullable=True, default=list)
|
||
|
||
execution = relationship("AttackPathExecution", back_populates="step_results")
|
||
step = relationship("AttackPathStep")
|
||
detection_asset = relationship("DetectionAsset", foreign_keys=[detection_asset_id])
|
||
executor = relationship("User", foreign_keys=[executed_by])
|
||
|
||
__table_args__ = (
|
||
Index("ix_ap_stepres_execution_id", "execution_id"),
|
||
Index("ix_ap_stepres_step_id", "step_id"),
|
||
)
|
||
|
||
|
||
class TimelineEntry(Base):
|
||
"""Timestamped Red/Blue action during an execution — used for MTTD/MTTR."""
|
||
|
||
__tablename__ = "attack_path_timeline"
|
||
|
||
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
||
execution_id = Column(
|
||
UUID(as_uuid=True), ForeignKey("attack_path_executions.id", ondelete="CASCADE"),
|
||
nullable=False,
|
||
)
|
||
step_id = Column(
|
||
UUID(as_uuid=True), ForeignKey("attack_path_steps.id", ondelete="SET NULL"),
|
||
nullable=True,
|
||
)
|
||
timestamp = Column(DateTime, nullable=False, default=datetime.utcnow)
|
||
actor_side = Column(
|
||
Enum(TimelineActorSide, name="timeline_actor_side"), nullable=False,
|
||
)
|
||
actor_id = Column(
|
||
UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True
|
||
)
|
||
entry_type = Column(
|
||
Enum(TimelineEntryType, name="timeline_entry_type"), nullable=False,
|
||
)
|
||
content = Column(Text, nullable=False)
|
||
extra = Column(JSONB, nullable=True)
|
||
|
||
execution = relationship("AttackPathExecution", back_populates="timeline")
|
||
actor = relationship("User", foreign_keys=[actor_id])
|
||
|
||
__table_args__ = (
|
||
Index("ix_timeline_execution_id", "execution_id"),
|
||
Index("ix_timeline_timestamp", "timestamp"),
|
||
)
|