Files
Aegis/backend/app/models/attack_path.py
kitos 080ce56de7
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(attack-paths): Phase 10 — Attack Paths & Advanced Purple Team [FASE-10]
Models (5 tables):
  - AttackPath: named reusable attack scenario with template flag
  - AttackPathStep: ordered kill-chain step (technique + test link)
  - AttackPathExecution: a run with Red/Blue leads, timing, stored metrics
  - AttackPathStepResult: per-step detected/not_detected/skipped result
  - TimelineEntry: timestamped Red/Blue/system actions for MTTD/MTTR

Migration b036atk: raw SQL to avoid SQLAlchemy DDL hook issues

Service (attack_path_service.py):
  - Full CRUD for paths + steps (add, update, delete, reorder)
  - Execution lifecycle: create → start → execute steps → complete/abort
  - Pre-creates pending step results on execution creation
  - Auto-adds system timeline entries on key state transitions
  - complete_execution() computes: detection_rate, mttd_seconds,
    furthest_undetected_step, detected/not_detected/skipped counts
  - get_kill_chain_metrics(): per-step breakdown + phase summary

Router /api/v1/attack-paths (20 endpoints):
  POST/GET/PATCH/DELETE attack paths
  GET/POST/PATCH/DELETE steps + reorder
  POST/GET executions per path
  GET/POST/start/complete/abort executions
  POST/GET step results
  POST/GET timeline entries
  GET kill-chain metrics

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 13:11:01 +02:00

254 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Phase 10: Attack Paths & Advanced Purple Team models."""
import enum
import uuid
from datetime import datetime
from sqlalchemy import (
Boolean, Column, DateTime, Enum, Float, ForeignKey,
Index, Integer, String, Text,
)
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import relationship
from app.database import Base
class ExecutionStatus(str, enum.Enum):
    """Lifecycle state of an attack-path execution.

    Members subclass ``str``, so each one compares equal to its raw string
    value.
    """

    planned = "planned"          # default state of a new execution
    in_progress = "in_progress"  # run is under way
    completed = "completed"      # run finished
    aborted = "aborted"          # run was stopped before finishing
class StepResultStatus(str, enum.Enum):
    """Outcome recorded for a single step within an execution.

    Members subclass ``str``, so each one compares equal to its raw string
    value.
    """

    pending = "pending"            # default state of a new step result
    executing = "executing"        # step is currently being run
    detected = "detected"          # step was detected
    not_detected = "not_detected"  # step was not detected
    skipped = "skipped"            # step was not run
class TimelineActorSide(str, enum.Enum):
    """Which side authored a timeline entry.

    Members subclass ``str``, so each one compares equal to its raw string
    value.
    """

    red = "red"        # Red team
    blue = "blue"      # Blue team
    system = "system"  # entry not attributed to either team
class TimelineEntryType(str, enum.Enum):
    """Category of a timeline entry.

    Members subclass ``str``, so each one compares equal to its raw string
    value.
    """

    action = "action"
    detection = "detection"
    note = "note"
    phase_transition = "phase_transition"
    flag = "flag"
# ---------------------------------------------------------------------------
class AttackPath(Base):
    """A reusable attack scenario composed of ordered kill-chain steps.

    A path can be flagged as a template (``is_template``) or used as a
    one-off scenario. It owns its steps and its executions: deleting a path
    through the ORM removes both, in line with the ``ON DELETE CASCADE``
    declared on the child foreign keys.
    """

    __tablename__ = "attack_paths"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(300), nullable=False)
    description = Column(Text, nullable=True)
    objective = Column(Text, nullable=True)  # what the attacker aims to achieve
    is_template = Column(Boolean, default=False)  # reusable template flag
    threat_actor_id = Column(
        UUID(as_uuid=True),
        ForeignKey("threat_actors.id", ondelete="SET NULL"),
        nullable=True,
    )
    created_by = Column(
        UUID(as_uuid=True),
        ForeignKey("users.id", ondelete="SET NULL"),
        nullable=True,
    )
    # ``default=list`` passes the callable, so every row gets its own new list.
    tags = Column(JSONB, nullable=True, default=list)
    # NOTE(review): presumably a soft-delete / visibility flag -- confirm in the service.
    is_active = Column(Boolean, default=True)
    # ``datetime.utcnow`` yields naive datetimes expressed in UTC.
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Steps are always loaded in kill-chain order.
    steps = relationship(
        "AttackPathStep",
        back_populates="attack_path",
        cascade="all, delete-orphan",
        order_by="AttackPathStep.order_index",
    )
    # AttackPathExecution.attack_path_id is NOT NULL with ON DELETE CASCADE.
    # Without a delete cascade here, deleting a path through the ORM would try
    # to set that FK to NULL on each execution and violate the constraint.
    # ``passive_deletes=True`` leaves unloaded children to the database cascade.
    executions = relationship(
        "AttackPathExecution",
        back_populates="attack_path",
        cascade="all, delete-orphan",
        passive_deletes=True,
    )
    creator = relationship("User", foreign_keys=[created_by])
    threat_actor = relationship("ThreatActor", foreign_keys=[threat_actor_id])

    __table_args__ = (
        Index("ix_attack_paths_created_by", "created_by"),
        Index("ix_attack_paths_is_template", "is_template"),
    )
class AttackPathStep(Base):
    """A single ordered step of an attack path.

    Each step can carry a kill-chain phase label and optional links to a
    technique and a test. ``order_index`` gives its position in the path.
    """

    __tablename__ = "attack_path_steps"
    __table_args__ = (
        Index("ix_ap_steps_path_id", "attack_path_id"),
        Index("ix_ap_steps_technique_id", "technique_id"),
    )

    # -- identity / ownership ------------------------------------------------
    id = Column(
        UUID(as_uuid=True),
        primary_key=True,
        default=uuid.uuid4,
    )
    attack_path_id = Column(
        UUID(as_uuid=True),
        ForeignKey("attack_paths.id", ondelete="CASCADE"),
        nullable=False,
    )

    # -- position and classification -----------------------------------------
    order_index = Column(Integer, nullable=False, default=0)
    kill_chain_phase = Column(String(60), nullable=True)  # e.g. initial_access, execution, ...
    technique_id = Column(
        UUID(as_uuid=True),
        ForeignKey("techniques.id", ondelete="SET NULL"),
        nullable=True,
    )
    test_id = Column(
        UUID(as_uuid=True),
        ForeignKey("tests.id", ondelete="SET NULL"),
        nullable=True,
    )

    # -- descriptive fields --------------------------------------------------
    name = Column(String(300), nullable=True)  # human-readable label for the step
    description = Column(Text, nullable=True)
    expected_detection = Column(Boolean, default=True)  # is Blue expected to detect this step?
    notes = Column(Text, nullable=True)

    # -- relationships -------------------------------------------------------
    attack_path = relationship("AttackPath", back_populates="steps")
    technique = relationship("Technique", foreign_keys=[technique_id])
    test = relationship("Test", foreign_keys=[test_id])
class AttackPathExecution(Base):
    """One run of an attack path.

    Records the Red/Blue leads, start and completion times, and the
    aggregated kill-chain metrics. Owns its step results and timeline
    entries; both are removed with the execution.
    """

    __tablename__ = "attack_path_executions"
    __table_args__ = (
        Index("ix_ap_exec_path_id", "attack_path_id"),
        Index("ix_ap_exec_status", "status"),
    )

    # -- identity / ownership ------------------------------------------------
    id = Column(
        UUID(as_uuid=True),
        primary_key=True,
        default=uuid.uuid4,
    )
    attack_path_id = Column(
        UUID(as_uuid=True),
        ForeignKey("attack_paths.id", ondelete="CASCADE"),
        nullable=False,
    )

    # -- run state -----------------------------------------------------------
    status = Column(
        Enum(ExecutionStatus, name="execution_status"),
        nullable=False,
        default=ExecutionStatus.planned,
    )
    environment = Column(String(100), nullable=True)  # prod, staging, lab

    # -- participants (all optional; cleared if the user row is deleted) ------
    red_team_lead = Column(
        UUID(as_uuid=True),
        ForeignKey("users.id", ondelete="SET NULL"),
        nullable=True,
    )
    blue_team_lead = Column(
        UUID(as_uuid=True),
        ForeignKey("users.id", ondelete="SET NULL"),
        nullable=True,
    )
    started_by = Column(
        UUID(as_uuid=True),
        ForeignKey("users.id", ondelete="SET NULL"),
        nullable=True,
    )

    # -- timing and free text ------------------------------------------------
    started_at = Column(DateTime, nullable=True)
    completed_at = Column(DateTime, nullable=True)
    notes = Column(Text, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)  # naive UTC timestamp

    # -- computed kill-chain metrics (written on complete) -------------------
    total_steps = Column(Integer, nullable=True)
    detected_steps = Column(Integer, nullable=True)
    not_detected_steps = Column(Integer, nullable=True)
    skipped_steps = Column(Integer, nullable=True)
    detection_rate = Column(Float, nullable=True)  # 0.0-1.0
    mttd_seconds = Column(Float, nullable=True)  # mean time to detect (avg across detected)
    furthest_undetected_step = Column(Integer, nullable=True)  # order_index of deepest undetected step

    # -- relationships -------------------------------------------------------
    attack_path = relationship("AttackPath", back_populates="executions")
    # Step results are loaded in step order.
    step_results = relationship(
        "AttackPathStepResult",
        back_populates="execution",
        cascade="all, delete-orphan",
        order_by="AttackPathStepResult.step_order",
    )
    # Timeline entries are loaded in chronological order.
    timeline = relationship(
        "TimelineEntry",
        back_populates="execution",
        cascade="all, delete-orphan",
        order_by="TimelineEntry.timestamp",
    )
    # Several FKs point at users, so each relationship names its own column.
    red_lead_user = relationship("User", foreign_keys=[red_team_lead])
    blue_lead_user = relationship("User", foreign_keys=[blue_team_lead])
class AttackPathStepResult(Base):
    """Outcome of running one step during an attack-path execution.

    Holds the step's status, who executed it and when, and the detection
    timing plus the detection asset when one is recorded.
    """

    __tablename__ = "attack_path_step_results"
    __table_args__ = (
        Index("ix_ap_stepres_execution_id", "execution_id"),
        Index("ix_ap_stepres_step_id", "step_id"),
    )

    # -- identity / ownership ------------------------------------------------
    id = Column(
        UUID(as_uuid=True),
        primary_key=True,
        default=uuid.uuid4,
    )
    execution_id = Column(
        UUID(as_uuid=True),
        ForeignKey("attack_path_executions.id", ondelete="CASCADE"),
        nullable=False,
    )
    step_id = Column(
        UUID(as_uuid=True),
        ForeignKey("attack_path_steps.id", ondelete="CASCADE"),
        nullable=False,
    )
    step_order = Column(Integer, nullable=False, default=0)  # denormalized for sorting

    # -- outcome -------------------------------------------------------------
    status = Column(
        Enum(StepResultStatus, name="step_result_status"),
        nullable=False,
        default=StepResultStatus.pending,
    )
    executed_by = Column(
        UUID(as_uuid=True),
        ForeignKey("users.id", ondelete="SET NULL"),
        nullable=True,
    )
    executed_at = Column(DateTime, nullable=True)
    detected_at = Column(DateTime, nullable=True)
    time_to_detect_seconds = Column(Float, nullable=True)
    detection_asset_id = Column(
        UUID(as_uuid=True),
        ForeignKey("detection_assets.id", ondelete="SET NULL"),
        nullable=True,
    )
    notes = Column(Text, nullable=True)
    # NOTE(review): presumably a list of evidence record ids -- confirm with the service.
    evidence_ids = Column(JSONB, nullable=True, default=list)

    # -- relationships -------------------------------------------------------
    execution = relationship("AttackPathExecution", back_populates="step_results")
    step = relationship("AttackPathStep")
    detection_asset = relationship("DetectionAsset", foreign_keys=[detection_asset_id])
    executor = relationship("User", foreign_keys=[executed_by])
class TimelineEntry(Base):
    """Timestamped Red/Blue/system entry recorded during an execution.

    Described by the original author as the basis for MTTD/MTTR. An entry
    always belongs to an execution and may optionally point at a step.
    """

    __tablename__ = "attack_path_timeline"
    __table_args__ = (
        Index("ix_timeline_execution_id", "execution_id"),
        Index("ix_timeline_timestamp", "timestamp"),
    )

    # -- identity / ownership ------------------------------------------------
    id = Column(
        UUID(as_uuid=True),
        primary_key=True,
        default=uuid.uuid4,
    )
    execution_id = Column(
        UUID(as_uuid=True),
        ForeignKey("attack_path_executions.id", ondelete="CASCADE"),
        nullable=False,
    )
    # Optional link to a step; cleared if the step row is deleted.
    step_id = Column(
        UUID(as_uuid=True),
        ForeignKey("attack_path_steps.id", ondelete="SET NULL"),
        nullable=True,
    )

    # -- what happened, when, and by whom ------------------------------------
    timestamp = Column(DateTime, nullable=False, default=datetime.utcnow)  # naive UTC timestamp
    actor_side = Column(
        Enum(TimelineActorSide, name="timeline_actor_side"),
        nullable=False,
    )
    actor_id = Column(
        UUID(as_uuid=True),
        ForeignKey("users.id", ondelete="SET NULL"),
        nullable=True,
    )
    entry_type = Column(
        Enum(TimelineEntryType, name="timeline_entry_type"),
        nullable=False,
    )
    content = Column(Text, nullable=False)
    extra = Column(JSONB, nullable=True)  # optional JSONB payload; schema not defined here

    # -- relationships -------------------------------------------------------
    execution = relationship("AttackPathExecution", back_populates="timeline")
    actor = relationship("User", foreign_keys=[actor_id])