"""Phase 10: Attack Paths & Advanced Purple Team Revision ID: b036atk Revises: b035ownerq Create Date: 2026-05-19 Uses raw SQL to avoid SQLAlchemy DDL hook issues with enum types. """ from typing import Union from alembic import op import sqlalchemy as sa revision: str = "b036atk" down_revision: Union[str, None] = "b035ownerq" branch_labels = None depends_on = None def upgrade() -> None: conn = op.get_bind() # ── Enums ───────────────────────────────────────────────────────────────── conn.execute(sa.text(""" DO $$ BEGIN CREATE TYPE execution_status AS ENUM ('planned','in_progress','completed','aborted'); EXCEPTION WHEN duplicate_object THEN NULL; END $$ """)) conn.execute(sa.text(""" DO $$ BEGIN CREATE TYPE step_result_status AS ENUM ('pending','executing','detected','not_detected','skipped'); EXCEPTION WHEN duplicate_object THEN NULL; END $$ """)) conn.execute(sa.text(""" DO $$ BEGIN CREATE TYPE timeline_actor_side AS ENUM ('red','blue','system'); EXCEPTION WHEN duplicate_object THEN NULL; END $$ """)) conn.execute(sa.text(""" DO $$ BEGIN CREATE TYPE timeline_entry_type AS ENUM ('action','detection','note','phase_transition','flag'); EXCEPTION WHEN duplicate_object THEN NULL; END $$ """)) # ── attack_paths ────────────────────────────────────────────────────────── conn.execute(sa.text(""" CREATE TABLE IF NOT EXISTS attack_paths ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), name VARCHAR(300) NOT NULL, description TEXT, objective TEXT, is_template BOOLEAN DEFAULT FALSE, threat_actor_id UUID REFERENCES threat_actors(id) ON DELETE SET NULL, created_by UUID REFERENCES users(id) ON DELETE SET NULL, tags JSONB, is_active BOOLEAN DEFAULT TRUE, created_at TIMESTAMP DEFAULT now(), updated_at TIMESTAMP DEFAULT now() ) """)) conn.execute(sa.text( "CREATE INDEX IF NOT EXISTS ix_attack_paths_created_by ON attack_paths (created_by)" )) conn.execute(sa.text( "CREATE INDEX IF NOT EXISTS ix_attack_paths_is_template ON attack_paths (is_template)" )) # ── attack_path_steps ───────────────────────────────────────────────────── conn.execute(sa.text(""" CREATE TABLE IF NOT EXISTS attack_path_steps ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), attack_path_id UUID NOT NULL REFERENCES attack_paths(id) ON DELETE CASCADE, order_index INTEGER NOT NULL DEFAULT 0, kill_chain_phase VARCHAR(60), technique_id UUID REFERENCES techniques(id) ON DELETE SET NULL, test_id UUID REFERENCES tests(id) ON DELETE SET NULL, name VARCHAR(300), description TEXT, expected_detection BOOLEAN DEFAULT TRUE, notes TEXT ) """)) conn.execute(sa.text( "CREATE INDEX IF NOT EXISTS ix_ap_steps_path_id ON attack_path_steps (attack_path_id)" )) conn.execute(sa.text( "CREATE INDEX IF NOT EXISTS ix_ap_steps_technique_id ON attack_path_steps (technique_id)" )) # ── attack_path_executions ──────────────────────────────────────────────── conn.execute(sa.text(""" CREATE TABLE IF NOT EXISTS attack_path_executions ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), attack_path_id UUID NOT NULL REFERENCES attack_paths(id) ON DELETE CASCADE, status execution_status NOT NULL DEFAULT 'planned', environment VARCHAR(100), red_team_lead UUID REFERENCES users(id) ON DELETE SET NULL, blue_team_lead UUID REFERENCES users(id) ON DELETE SET NULL, started_by UUID REFERENCES users(id) ON DELETE SET NULL, started_at TIMESTAMP, completed_at TIMESTAMP, notes TEXT, created_at TIMESTAMP DEFAULT now(), -- kill-chain metrics (populated on completion) total_steps INTEGER, detected_steps INTEGER, not_detected_steps INTEGER, skipped_steps INTEGER, detection_rate FLOAT, mttd_seconds FLOAT, furthest_undetected_step INTEGER ) """)) conn.execute(sa.text( "CREATE INDEX IF NOT EXISTS ix_ap_exec_path_id ON attack_path_executions (attack_path_id)" )) conn.execute(sa.text( "CREATE INDEX IF NOT EXISTS ix_ap_exec_status ON attack_path_executions (status)" )) # ── attack_path_step_results ────────────────────────────────────────────── conn.execute(sa.text(""" CREATE TABLE IF NOT EXISTS attack_path_step_results ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), execution_id UUID NOT NULL REFERENCES attack_path_executions(id) ON DELETE CASCADE, step_id UUID NOT NULL REFERENCES attack_path_steps(id) ON DELETE CASCADE, step_order INTEGER NOT NULL DEFAULT 0, status step_result_status NOT NULL DEFAULT 'pending', executed_by UUID REFERENCES users(id) ON DELETE SET NULL, executed_at TIMESTAMP, detected_at TIMESTAMP, time_to_detect_seconds FLOAT, detection_asset_id UUID REFERENCES detection_assets(id) ON DELETE SET NULL, notes TEXT, evidence_ids JSONB ) """)) conn.execute(sa.text( "CREATE INDEX IF NOT EXISTS ix_ap_stepres_exec ON attack_path_step_results (execution_id)" )) conn.execute(sa.text( "CREATE INDEX IF NOT EXISTS ix_ap_stepres_step ON attack_path_step_results (step_id)" )) # ── attack_path_timeline ────────────────────────────────────────────────── conn.execute(sa.text(""" CREATE TABLE IF NOT EXISTS attack_path_timeline ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), execution_id UUID NOT NULL REFERENCES attack_path_executions(id) ON DELETE CASCADE, step_id UUID REFERENCES attack_path_steps(id) ON DELETE SET NULL, timestamp TIMESTAMP NOT NULL DEFAULT now(), actor_side timeline_actor_side NOT NULL, actor_id UUID REFERENCES users(id) ON DELETE SET NULL, entry_type timeline_entry_type NOT NULL, content TEXT NOT NULL, extra JSONB ) """)) conn.execute(sa.text( "CREATE INDEX IF NOT EXISTS ix_timeline_execution_id ON attack_path_timeline (execution_id)" )) conn.execute(sa.text( "CREATE INDEX IF NOT EXISTS ix_timeline_timestamp ON attack_path_timeline (timestamp)" )) def downgrade() -> None: conn = op.get_bind() conn.execute(sa.text("DROP TABLE IF EXISTS attack_path_timeline")) conn.execute(sa.text("DROP TABLE IF EXISTS attack_path_step_results")) conn.execute(sa.text("DROP TABLE IF EXISTS attack_path_executions")) conn.execute(sa.text("DROP TABLE IF EXISTS attack_path_steps")) conn.execute(sa.text("DROP TABLE IF EXISTS attack_paths")) conn.execute(sa.text("DROP TYPE IF EXISTS timeline_entry_type")) conn.execute(sa.text("DROP TYPE IF EXISTS timeline_actor_side")) conn.execute(sa.text("DROP TYPE IF EXISTS step_result_status")) conn.execute(sa.text("DROP TYPE IF EXISTS execution_status"))