diff --git a/backend/alembic/versions/b036_attack_paths.py b/backend/alembic/versions/b036_attack_paths.py new file mode 100644 index 0000000..3ef43b8 --- /dev/null +++ b/backend/alembic/versions/b036_attack_paths.py @@ -0,0 +1,184 @@ +"""Phase 10: Attack Paths & Advanced Purple Team + +Revision ID: b036atk +Revises: b035ownerq +Create Date: 2026-05-19 + +Uses raw SQL to avoid SQLAlchemy DDL hook issues with enum types. +""" + +from typing import Union +from alembic import op +import sqlalchemy as sa + +revision: str = "b036atk" +down_revision: Union[str, None] = "b035ownerq" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + conn = op.get_bind() + + # ── Enums ───────────────────────────────────────────────────────────────── + conn.execute(sa.text(""" + DO $$ BEGIN + CREATE TYPE execution_status AS ENUM + ('planned','in_progress','completed','aborted'); + EXCEPTION WHEN duplicate_object THEN NULL; END $$ + """)) + conn.execute(sa.text(""" + DO $$ BEGIN + CREATE TYPE step_result_status AS ENUM + ('pending','executing','detected','not_detected','skipped'); + EXCEPTION WHEN duplicate_object THEN NULL; END $$ + """)) + conn.execute(sa.text(""" + DO $$ BEGIN + CREATE TYPE timeline_actor_side AS ENUM ('red','blue','system'); + EXCEPTION WHEN duplicate_object THEN NULL; END $$ + """)) + conn.execute(sa.text(""" + DO $$ BEGIN + CREATE TYPE timeline_entry_type AS ENUM + ('action','detection','note','phase_transition','flag'); + EXCEPTION WHEN duplicate_object THEN NULL; END $$ + """)) + + # ── attack_paths ────────────────────────────────────────────────────────── + conn.execute(sa.text(""" + CREATE TABLE IF NOT EXISTS attack_paths ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name VARCHAR(300) NOT NULL, + description TEXT, + objective TEXT, + is_template BOOLEAN DEFAULT FALSE, + threat_actor_id UUID REFERENCES threat_actors(id) ON DELETE SET NULL, + created_by UUID REFERENCES users(id) ON DELETE SET NULL, + tags JSONB, + is_active BOOLEAN DEFAULT TRUE, + created_at TIMESTAMP DEFAULT now(), + updated_at TIMESTAMP DEFAULT now() + ) + """)) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_attack_paths_created_by ON attack_paths (created_by)" + )) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_attack_paths_is_template ON attack_paths (is_template)" + )) + + # ── attack_path_steps ───────────────────────────────────────────────────── + conn.execute(sa.text(""" + CREATE TABLE IF NOT EXISTS attack_path_steps ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + attack_path_id UUID NOT NULL REFERENCES attack_paths(id) ON DELETE CASCADE, + order_index INTEGER NOT NULL DEFAULT 0, + kill_chain_phase VARCHAR(60), + technique_id UUID REFERENCES techniques(id) ON DELETE SET NULL, + test_id UUID REFERENCES tests(id) ON DELETE SET NULL, + name VARCHAR(300), + description TEXT, + expected_detection BOOLEAN DEFAULT TRUE, + notes TEXT + ) + """)) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_ap_steps_path_id ON attack_path_steps (attack_path_id)" + )) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_ap_steps_technique_id ON attack_path_steps (technique_id)" + )) + + # ── attack_path_executions ──────────────────────────────────────────────── + conn.execute(sa.text(""" + CREATE TABLE IF NOT EXISTS attack_path_executions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + attack_path_id UUID NOT NULL REFERENCES attack_paths(id) ON DELETE CASCADE, + status execution_status NOT NULL DEFAULT 'planned', + environment VARCHAR(100), + red_team_lead UUID REFERENCES users(id) ON DELETE SET NULL, + blue_team_lead UUID REFERENCES users(id) ON DELETE SET NULL, + started_by UUID REFERENCES users(id) ON DELETE SET NULL, + started_at TIMESTAMP, + completed_at TIMESTAMP, + notes TEXT, + created_at TIMESTAMP DEFAULT now(), + -- kill-chain metrics (populated on completion) + total_steps INTEGER, + detected_steps INTEGER, + not_detected_steps INTEGER, + skipped_steps INTEGER, + detection_rate FLOAT, + mttd_seconds FLOAT, + furthest_undetected_step INTEGER + ) + """)) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_ap_exec_path_id ON attack_path_executions (attack_path_id)" + )) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_ap_exec_status ON attack_path_executions (status)" + )) + + # ── attack_path_step_results ────────────────────────────────────────────── + conn.execute(sa.text(""" + CREATE TABLE IF NOT EXISTS attack_path_step_results ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + execution_id UUID NOT NULL + REFERENCES attack_path_executions(id) ON DELETE CASCADE, + step_id UUID NOT NULL + REFERENCES attack_path_steps(id) ON DELETE CASCADE, + step_order INTEGER NOT NULL DEFAULT 0, + status step_result_status NOT NULL DEFAULT 'pending', + executed_by UUID REFERENCES users(id) ON DELETE SET NULL, + executed_at TIMESTAMP, + detected_at TIMESTAMP, + time_to_detect_seconds FLOAT, + detection_asset_id UUID + REFERENCES detection_assets(id) ON DELETE SET NULL, + notes TEXT, + evidence_ids JSONB + ) + """)) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_ap_stepres_exec ON attack_path_step_results (execution_id)" + )) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_ap_stepres_step ON attack_path_step_results (step_id)" + )) + + # ── attack_path_timeline ────────────────────────────────────────────────── + conn.execute(sa.text(""" + CREATE TABLE IF NOT EXISTS attack_path_timeline ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + execution_id UUID NOT NULL + REFERENCES attack_path_executions(id) ON DELETE CASCADE, + step_id UUID REFERENCES attack_path_steps(id) ON DELETE SET NULL, + timestamp TIMESTAMP NOT NULL DEFAULT now(), + actor_side timeline_actor_side NOT NULL, + actor_id UUID REFERENCES users(id) ON DELETE SET NULL, + entry_type timeline_entry_type NOT NULL, + content TEXT NOT NULL, + extra JSONB + ) + """)) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_timeline_execution_id ON attack_path_timeline (execution_id)" + )) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_timeline_timestamp ON attack_path_timeline (timestamp)" + )) + + +def downgrade() -> None: + conn = op.get_bind() + conn.execute(sa.text("DROP TABLE IF EXISTS attack_path_timeline")) + conn.execute(sa.text("DROP TABLE IF EXISTS attack_path_step_results")) + conn.execute(sa.text("DROP TABLE IF EXISTS attack_path_executions")) + conn.execute(sa.text("DROP TABLE IF EXISTS attack_path_steps")) + conn.execute(sa.text("DROP TABLE IF EXISTS attack_paths")) + conn.execute(sa.text("DROP TYPE IF EXISTS timeline_entry_type")) + conn.execute(sa.text("DROP TYPE IF EXISTS timeline_actor_side")) + conn.execute(sa.text("DROP TYPE IF EXISTS step_result_status")) + conn.execute(sa.text("DROP TYPE IF EXISTS execution_status")) diff --git a/backend/app/main.py b/backend/app/main.py index 582980b..c2f6558 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -40,6 +40,7 @@ from app.routers import osint as osint_router from app.routers import webhooks as webhooks_router from app.routers import detection_lifecycle as detection_lifecycle_router from app.routers import ownership as ownership_router +from app.routers import attack_paths as attack_paths_router from app.domain.errors import DomainError from app.middleware.error_handler import domain_exception_handler from app.middleware.request_context import RequestContextMiddleware @@ -139,6 +140,7 @@ app.include_router(osint_router.router, prefix="/api/v1") app.include_router(webhooks_router.router, prefix="/api/v1") app.include_router(detection_lifecycle_router.router, prefix="/api/v1") app.include_router(ownership_router.router, prefix="/api/v1") +app.include_router(attack_paths_router.router, prefix="/api/v1") @app.get("/health", include_in_schema=False) diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index c031561..d52aacb 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -33,6 +33,11 @@ from app.models.ownership_queue import ( TechniqueOwnership, RevalidationQueueItem, QueuePriority, QueueStatus, QueueReason, ) +from app.models.attack_path import ( + AttackPath, AttackPathStep, AttackPathExecution, + AttackPathStepResult, TimelineEntry, + ExecutionStatus, StepResultStatus, TimelineActorSide, TimelineEntryType, +) __all__ = [ "User", "Technique", "Test", "TestTemplate", "Evidence", @@ -51,4 +56,7 @@ __all__ = [ "TechniqueConfidenceScore", "InfrastructureChangeLog", "DecayPolicy", "TechniqueOwnership", "RevalidationQueueItem", "QueuePriority", "QueueStatus", "QueueReason", + "AttackPath", "AttackPathStep", "AttackPathExecution", + "AttackPathStepResult", "TimelineEntry", + "ExecutionStatus", "StepResultStatus", "TimelineActorSide", "TimelineEntryType", ] diff --git a/backend/app/models/attack_path.py b/backend/app/models/attack_path.py new file mode 100644 index 0000000..e83f7fa --- /dev/null +++ b/backend/app/models/attack_path.py @@ -0,0 +1,253 @@ +"""Phase 10: Attack Paths & Advanced Purple Team models.""" + +import enum +import uuid +from datetime import datetime + +from sqlalchemy import ( + Boolean, Column, DateTime, Enum, Float, ForeignKey, + Index, Integer, String, Text, +) +from sqlalchemy.dialects.postgresql import UUID, JSONB +from sqlalchemy.orm import relationship + +from app.database import Base + + +class ExecutionStatus(str, enum.Enum): + planned = "planned" + in_progress = "in_progress" + completed = "completed" + aborted = "aborted" + + +class StepResultStatus(str, enum.Enum): + pending = "pending" + executing = "executing" + detected = "detected" + not_detected = "not_detected" + skipped = "skipped" + + +class TimelineActorSide(str, enum.Enum): + red = "red" + blue = "blue" + system = "system" + + +class TimelineEntryType(str, enum.Enum): + action = "action" + detection = "detection" + note = "note" + phase_transition = "phase_transition" + flag = "flag" + + +# --------------------------------------------------------------------------- + +class AttackPath(Base): + """ + A reusable attack scenario composed of ordered kill-chain steps. + Can be a template (shared) or a one-off scenario. + """ + __tablename__ = "attack_paths" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + name = Column(String(300), nullable=False) + description = Column(Text, nullable=True) + objective = Column(Text, nullable=True) # what the attacker aims to achieve + is_template = Column(Boolean, default=False) # reusable template flag + threat_actor_id = Column( + UUID(as_uuid=True), ForeignKey("threat_actors.id", ondelete="SET NULL"), nullable=True + ) + created_by = Column( + UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True + ) + tags = Column(JSONB, nullable=True, default=list) + is_active = Column(Boolean, default=True) + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + steps = relationship( + "AttackPathStep", back_populates="attack_path", + cascade="all, delete-orphan", + order_by="AttackPathStep.order_index", + ) + executions = relationship("AttackPathExecution", back_populates="attack_path") + creator = relationship("User", foreign_keys=[created_by]) + threat_actor = relationship("ThreatActor", foreign_keys=[threat_actor_id]) + + __table_args__ = ( + Index("ix_attack_paths_created_by", "created_by"), + Index("ix_attack_paths_is_template", "is_template"), + ) + + +class AttackPathStep(Base): + """One step in an attack path — maps to a kill-chain phase + technique.""" + + __tablename__ = "attack_path_steps" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + attack_path_id = Column( + UUID(as_uuid=True), ForeignKey("attack_paths.id", ondelete="CASCADE"), nullable=False + ) + order_index = Column(Integer, nullable=False, default=0) + kill_chain_phase = Column(String(60), nullable=True) # initial_access, execution, … + technique_id = Column( + UUID(as_uuid=True), ForeignKey("techniques.id", ondelete="SET NULL"), nullable=True + ) + test_id = Column( + UUID(as_uuid=True), ForeignKey("tests.id", ondelete="SET NULL"), nullable=True + ) + name = Column(String(300), nullable=True) # human label for the step + description = Column(Text, nullable=True) + expected_detection = Column(Boolean, default=True) # do we expect blue to detect this? + notes = Column(Text, nullable=True) + + attack_path = relationship("AttackPath", back_populates="steps") + technique = relationship("Technique", foreign_keys=[technique_id]) + test = relationship("Test", foreign_keys=[test_id]) + + __table_args__ = ( + Index("ix_ap_steps_path_id", "attack_path_id"), + Index("ix_ap_steps_technique_id", "technique_id"), + ) + + +class AttackPathExecution(Base): + """ + A single run of an attack path. + Tracks Red/Blue participants, timing, and aggregated kill-chain metrics. + """ + __tablename__ = "attack_path_executions" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + attack_path_id = Column( + UUID(as_uuid=True), ForeignKey("attack_paths.id", ondelete="CASCADE"), nullable=False + ) + status = Column( + Enum(ExecutionStatus, name="execution_status"), nullable=False, + default=ExecutionStatus.planned, + ) + environment = Column(String(100), nullable=True) # prod, staging, lab + red_team_lead = Column( + UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True + ) + blue_team_lead = Column( + UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True + ) + started_by = Column( + UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True + ) + started_at = Column(DateTime, nullable=True) + completed_at = Column(DateTime, nullable=True) + notes = Column(Text, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow) + + # ── Computed kill-chain metrics (written on complete) ───────────────── + total_steps = Column(Integer, nullable=True) + detected_steps = Column(Integer, nullable=True) + not_detected_steps = Column(Integer, nullable=True) + skipped_steps = Column(Integer, nullable=True) + detection_rate = Column(Float, nullable=True) # 0.0–1.0 + mttd_seconds = Column(Float, nullable=True) # mean time to detect (avg across detected) + furthest_undetected_step = Column(Integer, nullable=True) # order_index of deepest undetected step + + attack_path = relationship("AttackPath", back_populates="executions") + step_results = relationship( + "AttackPathStepResult", back_populates="execution", + cascade="all, delete-orphan", + order_by="AttackPathStepResult.step_order", + ) + timeline = relationship( + "TimelineEntry", back_populates="execution", + cascade="all, delete-orphan", + order_by="TimelineEntry.timestamp", + ) + red_lead_user = relationship("User", foreign_keys=[red_team_lead]) + blue_lead_user = relationship("User", foreign_keys=[blue_team_lead]) + + __table_args__ = ( + Index("ix_ap_exec_path_id", "attack_path_id"), + Index("ix_ap_exec_status", "status"), + ) + + +class AttackPathStepResult(Base): + """Result of executing one step in an attack path execution.""" + + __tablename__ = "attack_path_step_results" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + execution_id = Column( + UUID(as_uuid=True), ForeignKey("attack_path_executions.id", ondelete="CASCADE"), + nullable=False, + ) + step_id = Column( + UUID(as_uuid=True), ForeignKey("attack_path_steps.id", ondelete="CASCADE"), + nullable=False, + ) + step_order = Column(Integer, nullable=False, default=0) # denormalized for sorting + status = Column( + Enum(StepResultStatus, name="step_result_status"), nullable=False, + default=StepResultStatus.pending, + ) + executed_by = Column( + UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True + ) + executed_at = Column(DateTime, nullable=True) + detected_at = Column(DateTime, nullable=True) + time_to_detect_seconds = Column(Float, nullable=True) + detection_asset_id = Column( + UUID(as_uuid=True), + ForeignKey("detection_assets.id", ondelete="SET NULL"), nullable=True + ) + notes = Column(Text, nullable=True) + evidence_ids = Column(JSONB, nullable=True, default=list) + + execution = relationship("AttackPathExecution", back_populates="step_results") + step = relationship("AttackPathStep") + detection_asset = relationship("DetectionAsset", foreign_keys=[detection_asset_id]) + executor = relationship("User", foreign_keys=[executed_by]) + + __table_args__ = ( + Index("ix_ap_stepres_execution_id", "execution_id"), + Index("ix_ap_stepres_step_id", "step_id"), + ) + + +class TimelineEntry(Base): + """Timestamped Red/Blue action during an execution — used for MTTD/MTTR.""" + + __tablename__ = "attack_path_timeline" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + execution_id = Column( + UUID(as_uuid=True), ForeignKey("attack_path_executions.id", ondelete="CASCADE"), + nullable=False, + ) + step_id = Column( + UUID(as_uuid=True), ForeignKey("attack_path_steps.id", ondelete="SET NULL"), + nullable=True, + ) + timestamp = Column(DateTime, nullable=False, default=datetime.utcnow) + actor_side = Column( + Enum(TimelineActorSide, name="timeline_actor_side"), nullable=False, + ) + actor_id = Column( + UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True + ) + entry_type = Column( + Enum(TimelineEntryType, name="timeline_entry_type"), nullable=False, + ) + content = Column(Text, nullable=False) + extra = Column(JSONB, nullable=True) + + execution = relationship("AttackPathExecution", back_populates="timeline") + actor = relationship("User", foreign_keys=[actor_id]) + + __table_args__ = ( + Index("ix_timeline_execution_id", "execution_id"), + Index("ix_timeline_timestamp", "timestamp"), + ) diff --git a/backend/app/routers/attack_paths.py b/backend/app/routers/attack_paths.py new file mode 100644 index 0000000..c27f9d9 --- /dev/null +++ b/backend/app/routers/attack_paths.py @@ -0,0 +1,250 @@ +"""Phase 10: Attack Paths & Advanced Purple Team router.""" + +from typing import Optional +from uuid import UUID + +from fastapi import APIRouter, Depends, Query +from sqlalchemy.orm import Session + +from app.database import get_db +from app.dependencies.auth import get_current_user, require_any_role +from app.schemas.attack_path_schema import ( + AttackPathCreate, AttackPathUpdate, AttackPathOut, + AttackPathStepCreate, AttackPathStepUpdate, AttackPathStepOut, + ExecutionCreate, ExecutionOut, + StepExecuteRequest, StepResultOut, + TimelineEntryCreate, TimelineEntryOut, + KillChainMetrics, +) +from app.services import attack_path_service as svc + +router = APIRouter(prefix="/attack-paths", tags=["attack-paths"]) + + +# ── Attack Paths CRUD ───────────────────────────────────────────────────────── + +@router.post("", response_model=AttackPathOut, status_code=201) +def create_attack_path( + body: AttackPathCreate, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return svc.create_attack_path(db, body.model_dump(), user.id) + + +@router.get("", response_model=list[AttackPathOut]) +def list_attack_paths( + is_template: Optional[bool] = None, + technique_id: Optional[UUID] = None, + is_active: Optional[bool] = True, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + paths = svc.list_attack_paths(db, is_template=is_template, + technique_id=technique_id, is_active=is_active) + # Inject step_count + result = [] + for p in paths: + d = AttackPathOut.model_validate(p) + d.step_count = len(p.steps) + result.append(d) + return result + + +@router.get("/{path_id}", response_model=AttackPathOut) +def get_attack_path( + path_id: UUID, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + p = svc.get_attack_path(db, path_id) + d = AttackPathOut.model_validate(p) + d.step_count = len(p.steps) + return d + + +@router.patch("/{path_id}", response_model=AttackPathOut) +def update_attack_path( + path_id: UUID, + body: AttackPathUpdate, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return svc.update_attack_path(db, path_id, body.model_dump(exclude_unset=True), user.id) + + +@router.delete("/{path_id}", status_code=204) +def delete_attack_path( + path_id: UUID, + db: Session = Depends(get_db), + user=Depends(require_any_role("admin", "red_lead", "blue_lead")), +): + svc.delete_attack_path(db, path_id, user.id) + + +# ── Steps ───────────────────────────────────────────────────────────────────── + +@router.get("/{path_id}/steps", response_model=list[AttackPathStepOut]) +def list_steps( + path_id: UUID, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + path = svc.get_attack_path(db, path_id) + return path.steps + + +@router.post("/{path_id}/steps", response_model=AttackPathStepOut, status_code=201) +def add_step( + path_id: UUID, + body: AttackPathStepCreate, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return svc.add_step(db, path_id, body.model_dump(), user.id) + + +@router.patch("/{path_id}/steps/{step_id}", response_model=AttackPathStepOut) +def update_step( + path_id: UUID, + step_id: UUID, + body: AttackPathStepUpdate, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return svc.update_step(db, step_id, body.model_dump(exclude_unset=True), user.id) + + +@router.delete("/{path_id}/steps/{step_id}", status_code=204) +def delete_step( + path_id: UUID, + step_id: UUID, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + svc.delete_step(db, step_id, user.id) + + +@router.post("/{path_id}/steps/reorder", response_model=list[AttackPathStepOut]) +def reorder_steps( + path_id: UUID, + step_ids: list[UUID], + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Pass an ordered list of step UUIDs to reorder the steps.""" + return svc.reorder_steps(db, path_id, step_ids, user.id) + + +# ── Executions ──────────────────────────────────────────────────────────────── + +@router.post("/{path_id}/executions", response_model=ExecutionOut, status_code=201) +def create_execution( + path_id: UUID, + body: ExecutionCreate, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return svc.create_execution(db, path_id, body.model_dump(), user.id) + + +@router.get("/{path_id}/executions", response_model=list[ExecutionOut]) +def list_executions( + path_id: UUID, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return svc.list_executions(db, path_id) + + +@router.get("/executions/{execution_id}", response_model=ExecutionOut) +def get_execution( + execution_id: UUID, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return svc.get_execution(db, execution_id) + + +@router.post("/executions/{execution_id}/start", response_model=ExecutionOut) +def start_execution( + execution_id: UUID, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return svc.start_execution(db, execution_id, user.id) + + +@router.post("/executions/{execution_id}/steps/{step_id}", response_model=StepResultOut) +def execute_step( + execution_id: UUID, + step_id: UUID, + body: StepExecuteRequest, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Record the result of one step (detected / not_detected / skipped).""" + return svc.execute_step(db, execution_id, step_id, body.model_dump(), user.id) + + +@router.get("/executions/{execution_id}/steps", response_model=list[StepResultOut]) +def list_step_results( + execution_id: UUID, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + ex = svc.get_execution(db, execution_id) + return ex.step_results + + +@router.post("/executions/{execution_id}/complete", response_model=ExecutionOut) +def complete_execution( + execution_id: UUID, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Mark execution as complete and compute kill-chain metrics.""" + return svc.complete_execution(db, execution_id, user.id) + + +@router.post("/executions/{execution_id}/abort", response_model=ExecutionOut) +def abort_execution( + execution_id: UUID, + db: Session = Depends(get_db), + user=Depends(require_any_role("admin", "red_lead", "blue_lead")), +): + return svc.abort_execution(db, execution_id, user.id) + + +# ── Timeline ────────────────────────────────────────────────────────────────── + +@router.post("/executions/{execution_id}/timeline", + response_model=TimelineEntryOut, status_code=201) +def add_timeline_entry( + execution_id: UUID, + body: TimelineEntryCreate, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return svc.add_timeline_entry(db, execution_id, body.model_dump(), user.id) + + +@router.get("/executions/{execution_id}/timeline", response_model=list[TimelineEntryOut]) +def get_timeline( + execution_id: UUID, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return svc.get_timeline(db, execution_id) + + +# ── Kill-Chain Metrics ──────────────────────────────────────────────────────── + +@router.get("/executions/{execution_id}/metrics") +def get_metrics( + execution_id: UUID, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Return full kill-chain metrics for a completed (or partial) execution.""" + return svc.get_kill_chain_metrics(db, execution_id) diff --git a/backend/app/schemas/attack_path_schema.py b/backend/app/schemas/attack_path_schema.py new file mode 100644 index 0000000..972d2d6 --- /dev/null +++ b/backend/app/schemas/attack_path_schema.py @@ -0,0 +1,230 @@ +"""Pydantic schemas for Phase 10: Attack Paths & Advanced Purple Team.""" + +from datetime import datetime +from typing import Optional +from uuid import UUID + +from pydantic import BaseModel, ConfigDict, field_validator + +VALID_KILL_CHAIN_PHASES = [ + "reconnaissance", "resource_development", "initial_access", "execution", + "persistence", "privilege_escalation", "defense_evasion", "credential_access", + "discovery", "lateral_movement", "collection", "command_and_control", + "exfiltration", "impact", +] + + +# ── Attack Path ─────────────────────────────────────────────────────────────── + +class AttackPathCreate(BaseModel): + name: str + description: Optional[str] = None + objective: Optional[str] = None + is_template: bool = False + threat_actor_id: Optional[UUID] = None + tags: Optional[list[str]] = None + + +class AttackPathUpdate(BaseModel): + name: Optional[str] = None + description: Optional[str] = None + objective: Optional[str] = None + is_template: Optional[bool] = None + threat_actor_id: Optional[UUID] = None + tags: Optional[list[str]] = None + is_active: Optional[bool] = None + + +class AttackPathOut(BaseModel): + model_config = ConfigDict(from_attributes=True) + + id: UUID + name: str + description: Optional[str] = None + objective: Optional[str] = None + is_template: bool + threat_actor_id: Optional[UUID] = None + created_by: Optional[UUID] = None + tags: Optional[list] = None + is_active: bool + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + step_count: Optional[int] = None # injected by service + + +# ── Attack Path Step ────────────────────────────────────────────────────────── + +class AttackPathStepCreate(BaseModel): + order_index: int = 0 + kill_chain_phase: Optional[str] = None + technique_id: Optional[UUID] = None + test_id: Optional[UUID] = None + name: Optional[str] = None + description: Optional[str] = None + expected_detection: bool = True + notes: Optional[str] = None + + @field_validator("kill_chain_phase") + @classmethod + def validate_phase(cls, v): + if v is not None and v not in VALID_KILL_CHAIN_PHASES: + raise ValueError(f"Invalid kill_chain_phase '{v}'. Valid: {VALID_KILL_CHAIN_PHASES}") + return v + + +class AttackPathStepUpdate(BaseModel): + order_index: Optional[int] = None + kill_chain_phase: Optional[str] = None + technique_id: Optional[UUID] = None + test_id: Optional[UUID] = None + name: Optional[str] = None + description: Optional[str] = None + expected_detection: Optional[bool] = None + notes: Optional[str] = None + + @field_validator("kill_chain_phase") + @classmethod + def validate_phase(cls, v): + if v is not None and v not in VALID_KILL_CHAIN_PHASES: + raise ValueError(f"Invalid kill_chain_phase '{v}'.") + return v + + +class AttackPathStepOut(BaseModel): + model_config = ConfigDict(from_attributes=True) + + id: UUID + attack_path_id: UUID + order_index: int + kill_chain_phase: Optional[str] = None + technique_id: Optional[UUID] = None + test_id: Optional[UUID] = None + name: Optional[str] = None + description: Optional[str] = None + expected_detection: bool + notes: Optional[str] = None + + +# ── Execution ───────────────────────────────────────────────────────────────── + +class ExecutionCreate(BaseModel): + environment: Optional[str] = None + red_team_lead: Optional[UUID] = None + blue_team_lead: Optional[UUID] = None + notes: Optional[str] = None + + +class ExecutionOut(BaseModel): + model_config = ConfigDict(from_attributes=True) + + id: UUID + attack_path_id: UUID + status: str + environment: Optional[str] = None + red_team_lead: Optional[UUID] = None + blue_team_lead: Optional[UUID] = None + started_by: Optional[UUID] = None + started_at: Optional[datetime] = None + completed_at: Optional[datetime] = None + notes: Optional[str] = None + created_at: Optional[datetime] = None + # metrics + total_steps: Optional[int] = None + detected_steps: Optional[int] = None + not_detected_steps: Optional[int] = None + skipped_steps: Optional[int] = None + detection_rate: Optional[float] = None + mttd_seconds: Optional[float] = None + furthest_undetected_step: Optional[int] = None + + +# ── Step Result ─────────────────────────────────────────────────────────────── + +class StepExecuteRequest(BaseModel): + status: str # detected / not_detected / skipped + executed_at: Optional[datetime] = None + detected_at: Optional[datetime] = None + detection_asset_id: Optional[UUID] = None + notes: Optional[str] = None + evidence_ids: Optional[list[UUID]] = None + + @field_validator("status") + @classmethod + def validate_status(cls, v): + valid = ("detected", "not_detected", "skipped", "executing") + if v not in valid: + raise ValueError(f"status must be one of {valid}") + return v + + +class StepResultOut(BaseModel): + model_config = ConfigDict(from_attributes=True) + + id: UUID + execution_id: UUID + step_id: UUID + step_order: int + status: str + executed_by: Optional[UUID] = None + executed_at: Optional[datetime] = None + detected_at: Optional[datetime] = None + time_to_detect_seconds: Optional[float] = None + detection_asset_id: Optional[UUID] = None + notes: Optional[str] = None + evidence_ids: Optional[list] = None + + +# ── Timeline ────────────────────────────────────────────────────────────────── + +class TimelineEntryCreate(BaseModel): + actor_side: str + entry_type: str + content: str + step_id: Optional[UUID] = None + timestamp: Optional[datetime] = None + extra: Optional[dict] = None + + @field_validator("actor_side") + @classmethod + def validate_side(cls, v): + if v not in ("red", "blue", "system"): + raise ValueError("actor_side must be red, blue or system") + return v + + @field_validator("entry_type") + @classmethod + def validate_type(cls, v): + valid = ("action", "detection", "note", "phase_transition", "flag") + if v not in valid: + raise ValueError(f"entry_type must be one of {valid}") + return v + + +class TimelineEntryOut(BaseModel): + model_config = ConfigDict(from_attributes=True) + + id: UUID + execution_id: UUID + step_id: Optional[UUID] = None + timestamp: datetime + actor_side: str + actor_id: Optional[UUID] = None + entry_type: str + content: str + extra: Optional[dict] = None + + +# ── Metrics ─────────────────────────────────────────────────────────────────── + +class KillChainMetrics(BaseModel): + execution_id: UUID + total_steps: int + detected_steps: int + not_detected_steps: int + skipped_steps: int + detection_rate: float # 0.0–1.0 + mttd_seconds: Optional[float] # mean time to detect + furthest_undetected_step: Optional[int] + furthest_undetected_phase: Optional[str] + step_breakdown: list[dict] # per-step detail + phase_summary: dict # detection rate per kill-chain phase diff --git a/backend/app/services/attack_path_service.py b/backend/app/services/attack_path_service.py new file mode 100644 index 0000000..64c5194 --- /dev/null +++ b/backend/app/services/attack_path_service.py @@ -0,0 +1,553 @@ +"""Phase 10: Attack Path CRUD service.""" + +import logging +from datetime import datetime +from typing import Optional +from uuid import UUID + +from sqlalchemy.orm import Session, joinedload + +from app.models.attack_path import ( + AttackPath, AttackPathStep, AttackPathExecution, + AttackPathStepResult, TimelineEntry, + ExecutionStatus, StepResultStatus, TimelineActorSide, TimelineEntryType, +) +from app.domain.exceptions import EntityNotFoundError +from app.services import audit_service + +logger = logging.getLogger(__name__) + + +def _now() -> datetime: + return datetime.utcnow() + + +# ── Attack Path CRUD ────────────────────────────────────────────────────────── + +def create_attack_path(db: Session, data: dict, user_id: UUID) -> AttackPath: + path = AttackPath( + name=data["name"], + description=data.get("description"), + objective=data.get("objective"), + is_template=data.get("is_template", False), + threat_actor_id=data.get("threat_actor_id"), + tags=data.get("tags") or [], + created_by=user_id, + ) + db.add(path) + db.commit() + db.refresh(path) + audit_service.log_action( + db, user_id, "ATTACK_PATH_CREATED", "attack_path", str(path.id), + details={"name": path.name, "is_template": path.is_template}, + ) + return path + + +def get_attack_path(db: Session, path_id: UUID) -> AttackPath: + path = ( + db.query(AttackPath) + .options(joinedload(AttackPath.steps)) + .filter(AttackPath.id == path_id) + .first() + ) + if not path: + raise EntityNotFoundError("AttackPath", str(path_id)) + return path + + +def list_attack_paths( + db: Session, + is_template: Optional[bool] = None, + technique_id: Optional[UUID] = None, + is_active: Optional[bool] = True, +) -> list[AttackPath]: + q = db.query(AttackPath) + if is_active is not None: + q = q.filter(AttackPath.is_active == is_active) + if is_template is not None: + q = q.filter(AttackPath.is_template == is_template) + if technique_id: + q = q.join(AttackPathStep).filter(AttackPathStep.technique_id == technique_id) + return q.order_by(AttackPath.created_at.desc()).all() + + +def update_attack_path(db: Session, path_id: UUID, data: dict, user_id: UUID) -> AttackPath: + path = db.query(AttackPath).filter(AttackPath.id == path_id).first() + if not path: + raise EntityNotFoundError("AttackPath", str(path_id)) + for k, v in data.items(): + if v is not None and hasattr(path, k): + setattr(path, k, v) + path.updated_at = _now() + db.commit() + db.refresh(path) + return path + + +def delete_attack_path(db: Session, path_id: UUID, user_id: UUID) -> None: + path = db.query(AttackPath).filter(AttackPath.id == path_id).first() + if not path: + raise EntityNotFoundError("AttackPath", str(path_id)) + path.is_active = False + path.updated_at = _now() + db.commit() + audit_service.log_action(db, user_id, "ATTACK_PATH_ARCHIVED", "attack_path", str(path_id)) + + +# ── Steps CRUD ──────────────────────────────────────────────────────────────── + +def add_step(db: Session, path_id: UUID, data: dict, user_id: UUID) -> AttackPathStep: + path = db.query(AttackPath).filter(AttackPath.id == path_id).first() + if not path: + raise EntityNotFoundError("AttackPath", str(path_id)) + + # Auto-assign order_index if not provided + if data.get("order_index") is None: + max_idx = db.query(AttackPathStep).filter( + AttackPathStep.attack_path_id == path_id + ).count() + data["order_index"] = max_idx + + step = AttackPathStep( + attack_path_id=path_id, + order_index=data.get("order_index", 0), + kill_chain_phase=data.get("kill_chain_phase"), + technique_id=data.get("technique_id"), + test_id=data.get("test_id"), + name=data.get("name"), + description=data.get("description"), + expected_detection=data.get("expected_detection", True), + notes=data.get("notes"), + ) + db.add(step) + path.updated_at = _now() + db.commit() + db.refresh(step) + return step + + +def update_step(db: Session, step_id: UUID, data: dict, user_id: UUID) -> AttackPathStep: + step = db.query(AttackPathStep).filter(AttackPathStep.id == step_id).first() + if not step: + raise EntityNotFoundError("AttackPathStep", str(step_id)) + for k, v in data.items(): + if v is not None and hasattr(step, k): + setattr(step, k, v) + db.commit() + db.refresh(step) + return step + + +def delete_step(db: Session, step_id: UUID, user_id: UUID) -> None: + step = db.query(AttackPathStep).filter(AttackPathStep.id == step_id).first() + if not step: + raise EntityNotFoundError("AttackPathStep", str(step_id)) + db.delete(step) + db.commit() + + +def reorder_steps(db: Session, path_id: UUID, step_ids: list[UUID], user_id: UUID) -> list[AttackPathStep]: + """Reorder steps by providing ordered list of step IDs.""" + path = db.query(AttackPath).filter(AttackPath.id == path_id).first() + if not path: + raise EntityNotFoundError("AttackPath", str(path_id)) + + for idx, step_id in enumerate(step_ids): + db.query(AttackPathStep).filter( + AttackPathStep.id == step_id, + AttackPathStep.attack_path_id == path_id, + ).update({"order_index": idx}) + + path.updated_at = _now() + db.commit() + + return ( + db.query(AttackPathStep) + .filter(AttackPathStep.attack_path_id == path_id) + .order_by(AttackPathStep.order_index) + .all() + ) + + +# ── Executions ──────────────────────────────────────────────────────────────── + +def create_execution( + db: Session, path_id: UUID, data: dict, user_id: UUID +) -> AttackPathExecution: + path = db.query(AttackPath).filter(AttackPath.id == path_id).first() + if not path: + raise EntityNotFoundError("AttackPath", str(path_id)) + + execution = AttackPathExecution( + attack_path_id=path_id, + status=ExecutionStatus.planned, + environment=data.get("environment"), + red_team_lead=data.get("red_team_lead"), + blue_team_lead=data.get("blue_team_lead"), + notes=data.get("notes"), + started_by=user_id, + ) + db.add(execution) + db.flush() + + # Pre-create pending step results for every step in the path + steps = ( + db.query(AttackPathStep) + .filter(AttackPathStep.attack_path_id == path_id) + .order_by(AttackPathStep.order_index) + .all() + ) + for step in steps: + result = AttackPathStepResult( + execution_id=execution.id, + step_id=step.id, + step_order=step.order_index, + status=StepResultStatus.pending, + ) + db.add(result) + + db.commit() + db.refresh(execution) + + # Auto-add system timeline entry + _add_system_entry( + db, execution.id, + entry_type=TimelineEntryType.phase_transition, + content=f"Execution created for '{path.name}' with {len(steps)} steps.", + ) + + audit_service.log_action( + db, user_id, "ATTACK_PATH_EXECUTION_STARTED", "attack_path_execution", + str(execution.id), + details={"path_id": str(path_id), "path_name": path.name, "steps": len(steps)}, + ) + return execution + + +def get_execution(db: Session, execution_id: UUID) -> AttackPathExecution: + ex = ( + db.query(AttackPathExecution) + .options( + joinedload(AttackPathExecution.step_results), + joinedload(AttackPathExecution.timeline), + ) + .filter(AttackPathExecution.id == execution_id) + .first() + ) + if not ex: + raise EntityNotFoundError("AttackPathExecution", str(execution_id)) + return ex + + +def list_executions(db: Session, path_id: UUID) -> list[AttackPathExecution]: + path = db.query(AttackPath).filter(AttackPath.id == path_id).first() + if not path: + raise EntityNotFoundError("AttackPath", str(path_id)) + return ( + db.query(AttackPathExecution) + .filter(AttackPathExecution.attack_path_id == path_id) + .order_by(AttackPathExecution.created_at.desc()) + .all() + ) + + +def start_execution(db: Session, execution_id: UUID, user_id: UUID) -> AttackPathExecution: + ex = db.query(AttackPathExecution).filter(AttackPathExecution.id == execution_id).first() + if not ex: + raise EntityNotFoundError("AttackPathExecution", str(execution_id)) + if ex.status not in (ExecutionStatus.planned,): + from fastapi import HTTPException + raise HTTPException(400, "Execution is not in 'planned' state") + ex.status = ExecutionStatus.in_progress + ex.started_at = _now() + db.commit() + db.refresh(ex) + _add_system_entry(db, execution_id, TimelineEntryType.phase_transition, + "Execution started.", actor_id=user_id, actor_side=TimelineActorSide.system) + return ex + + +# ── Step Execution ──────────────────────────────────────────────────────────── + +def execute_step( + db: Session, + execution_id: UUID, + step_id: UUID, + data: dict, + user_id: UUID, +) -> AttackPathStepResult: + """Record the result of executing one step.""" + ex = db.query(AttackPathExecution).filter(AttackPathExecution.id == execution_id).first() + if not ex: + raise EntityNotFoundError("AttackPathExecution", str(execution_id)) + if ex.status not in (ExecutionStatus.in_progress, ExecutionStatus.planned): + from fastapi import HTTPException + raise HTTPException(400, "Execution must be in_progress to record step results") + + # Auto-start if still planned + if ex.status == ExecutionStatus.planned: + ex.status = ExecutionStatus.in_progress + ex.started_at = _now() + + result = ( + db.query(AttackPathStepResult) + .filter( + AttackPathStepResult.execution_id == execution_id, + AttackPathStepResult.step_id == step_id, + ) + .first() + ) + if not result: + # Create on-the-fly if step was added after execution started + step = db.query(AttackPathStep).filter(AttackPathStep.id == step_id).first() + if not step: + raise EntityNotFoundError("AttackPathStep", str(step_id)) + result = AttackPathStepResult( + execution_id=execution_id, + step_id=step_id, + step_order=step.order_index, + ) + db.add(result) + + now = _now() + new_status = StepResultStatus(data["status"]) + result.status = new_status + result.executed_by = user_id + result.executed_at = data.get("executed_at") or now + result.notes = data.get("notes") + result.evidence_ids = [str(e) for e in (data.get("evidence_ids") or [])] + result.detection_asset_id = data.get("detection_asset_id") + + if new_status == StepResultStatus.detected: + result.detected_at = data.get("detected_at") or now + if result.executed_at: + delta = (result.detected_at - result.executed_at).total_seconds() + result.time_to_detect_seconds = max(0.0, delta) + + db.commit() + db.refresh(result) + + # Add timeline entry + step_obj = db.query(AttackPathStep).filter(AttackPathStep.id == step_id).first() + step_name = step_obj.name or (step_obj.kill_chain_phase or "Unknown step") + actor_side = TimelineActorSide.red if new_status != StepResultStatus.detected else TimelineActorSide.blue + entry_type = ( + TimelineEntryType.detection if new_status == StepResultStatus.detected + else TimelineEntryType.action + ) + content = ( + f"Step '{step_name}' marked as {new_status.value}." + + (f" Detected in {result.time_to_detect_seconds:.0f}s." if result.time_to_detect_seconds else "") + ) + _add_system_entry( + db, execution_id, entry_type, content, + actor_id=user_id, actor_side=actor_side, step_id=step_id, + ) + + return result + + +# ── Completion & Metrics ────────────────────────────────────────────────────── + +def complete_execution(db: Session, execution_id: UUID, user_id: UUID) -> AttackPathExecution: + """Mark execution complete and compute all kill-chain metrics.""" + ex = db.query(AttackPathExecution).filter(AttackPathExecution.id == execution_id).first() + if not ex: + raise EntityNotFoundError("AttackPathExecution", str(execution_id)) + + results = ( + db.query(AttackPathStepResult) + .filter(AttackPathStepResult.execution_id == execution_id) + .order_by(AttackPathStepResult.step_order) + .all() + ) + + total = len(results) + detected = sum(1 for r in results if r.status == StepResultStatus.detected) + not_detected = sum(1 for r in results if r.status == StepResultStatus.not_detected) + skipped = sum(1 for r in results if r.status == StepResultStatus.skipped) + + detection_rate = (detected / total) if total > 0 else 0.0 + + ttds = [r.time_to_detect_seconds for r in results + if r.time_to_detect_seconds is not None] + mttd = (sum(ttds) / len(ttds)) if ttds else None + + # Furthest undetected step (highest order_index with not_detected status) + undetected = [r for r in results if r.status == StepResultStatus.not_detected] + furthest = max((r.step_order for r in undetected), default=None) + + ex.status = ExecutionStatus.completed + ex.completed_at = _now() + ex.total_steps = total + ex.detected_steps = detected + ex.not_detected_steps = not_detected + ex.skipped_steps = skipped + ex.detection_rate = round(detection_rate, 4) + ex.mttd_seconds = round(mttd, 1) if mttd is not None else None + ex.furthest_undetected_step = furthest + + db.commit() + db.refresh(ex) + + _add_system_entry( + db, execution_id, TimelineEntryType.phase_transition, + f"Execution completed. Detection rate: {detection_rate:.0%}. " + f"Detected {detected}/{total} steps. " + + (f"MTTD: {mttd:.0f}s." if mttd else ""), + actor_id=user_id, actor_side=TimelineActorSide.system, + ) + + audit_service.log_action( + db, user_id, "ATTACK_PATH_EXECUTION_COMPLETED", "attack_path_execution", + str(execution_id), + details={"detection_rate": detection_rate, "mttd_seconds": mttd, + "detected": detected, "total": total}, + ) + return ex + + +def abort_execution(db: Session, execution_id: UUID, user_id: UUID) -> AttackPathExecution: + ex = db.query(AttackPathExecution).filter(AttackPathExecution.id == execution_id).first() + if not ex: + raise EntityNotFoundError("AttackPathExecution", str(execution_id)) + ex.status = ExecutionStatus.aborted + ex.completed_at = _now() + db.commit() + db.refresh(ex) + _add_system_entry(db, execution_id, TimelineEntryType.flag, "Execution aborted.", + actor_id=user_id, actor_side=TimelineActorSide.system) + return ex + + +# ── Timeline ────────────────────────────────────────────────────────────────── + +def add_timeline_entry( + db: Session, execution_id: UUID, data: dict, user_id: UUID +) -> TimelineEntry: + ex = db.query(AttackPathExecution).filter(AttackPathExecution.id == execution_id).first() + if not ex: + raise EntityNotFoundError("AttackPathExecution", str(execution_id)) + + entry = TimelineEntry( + execution_id=execution_id, + step_id=data.get("step_id"), + timestamp=data.get("timestamp") or _now(), + actor_side=TimelineActorSide(data["actor_side"]), + actor_id=user_id, + entry_type=TimelineEntryType(data["entry_type"]), + content=data["content"], + extra=data.get("extra"), + ) + db.add(entry) + db.commit() + db.refresh(entry) + return entry + + +def get_timeline(db: Session, execution_id: UUID) -> list[TimelineEntry]: + ex = db.query(AttackPathExecution).filter(AttackPathExecution.id == execution_id).first() + if not ex: + raise EntityNotFoundError("AttackPathExecution", str(execution_id)) + return ( + db.query(TimelineEntry) + .filter(TimelineEntry.execution_id == execution_id) + .order_by(TimelineEntry.timestamp.asc()) + .all() + ) + + +# ── Kill-Chain Metrics ──────────────────────────────────────────────────────── + +def get_kill_chain_metrics(db: Session, execution_id: UUID) -> dict: + ex = db.query(AttackPathExecution).filter(AttackPathExecution.id == execution_id).first() + if not ex: + raise EntityNotFoundError("AttackPathExecution", str(execution_id)) + + results = ( + db.query(AttackPathStepResult) + .filter(AttackPathStepResult.execution_id == execution_id) + .order_by(AttackPathStepResult.step_order) + .all() + ) + + step_breakdown = [] + phase_detected: dict[str, list] = {} + + for r in results: + step = db.query(AttackPathStep).filter(AttackPathStep.id == r.step_id).first() + phase = step.kill_chain_phase if step else None + entry = { + "step_id": str(r.step_id), + "step_order": r.step_order, + "step_name": step.name if step else None, + "kill_chain_phase": phase, + "status": r.status.value if hasattr(r.status, "value") else r.status, + "executed_at": r.executed_at.isoformat() if r.executed_at else None, + "detected_at": r.detected_at.isoformat() if r.detected_at else None, + "time_to_detect_seconds": r.time_to_detect_seconds, + "detection_asset_id": str(r.detection_asset_id) if r.detection_asset_id else None, + } + step_breakdown.append(entry) + if phase: + phase_detected.setdefault(phase, []).append( + r.status == StepResultStatus.detected + ) + + phase_summary = { + phase: { + "total": len(v), + "detected": sum(v), + "detection_rate": round(sum(v) / len(v), 3) if v else 0.0, + } + for phase, v in phase_detected.items() + } + + # Furthest undetected phase + furthest_undetected_phase = None + if ex.furthest_undetected_step is not None: + for r in reversed(results): + if r.step_order == ex.furthest_undetected_step: + step = db.query(AttackPathStep).filter(AttackPathStep.id == r.step_id).first() + if step: + furthest_undetected_phase = step.kill_chain_phase + break + + return { + "execution_id": str(execution_id), + "total_steps": ex.total_steps or len(results), + "detected_steps": ex.detected_steps or 0, + "not_detected_steps": ex.not_detected_steps or 0, + "skipped_steps": ex.skipped_steps or 0, + "detection_rate": ex.detection_rate or 0.0, + "mttd_seconds": ex.mttd_seconds, + "furthest_undetected_step": ex.furthest_undetected_step, + "furthest_undetected_phase": furthest_undetected_phase, + "step_breakdown": step_breakdown, + "phase_summary": phase_summary, + } + + +# ── Helper ──────────────────────────────────────────────────────────────────── + +def _add_system_entry( + db: Session, + execution_id: UUID, + entry_type: TimelineEntryType, + content: str, + actor_id: Optional[UUID] = None, + actor_side: TimelineActorSide = TimelineActorSide.system, + step_id: Optional[UUID] = None, +) -> None: + entry = TimelineEntry( + execution_id=execution_id, + step_id=step_id, + timestamp=_now(), + actor_side=actor_side, + actor_id=actor_id, + entry_type=entry_type, + content=content, + ) + db.add(entry) + db.commit()