fix(migration): rewrite b035 with raw SQL to avoid SQLAlchemy DDL hook
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled

SQLAlchemy fires before_create for ALL known enum types when any table
is created via op.create_table, causing DuplicateObject even with
create_type=False. Rewrite both CREATE TABLE statements as raw SQL via
conn.execute(sa.text(...)) and use CREATE TABLE IF NOT EXISTS / CREATE
INDEX IF NOT EXISTS for full idempotency.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
kitos
2026-05-19 16:54:32 +02:00
parent 36fe4aa250
commit f97b9e96b7

View File

@@ -3,12 +3,14 @@
Revision ID: b035ownerq Revision ID: b035ownerq
Revises: b034dlm Revises: b034dlm
Create Date: 2026-05-19 Create Date: 2026-05-19
Uses raw SQL for all DDL to avoid SQLAlchemy before_create hook issues
with existing enum types.
""" """
from typing import Union from typing import Union
from alembic import op from alembic import op
import sqlalchemy as sa import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
revision: str = "b035ownerq" revision: str = "b035ownerq"
down_revision: Union[str, None] = "b034dlm" down_revision: Union[str, None] = "b034dlm"
@@ -16,96 +18,101 @@ branch_labels = None
depends_on = None depends_on = None
def _table_exists(table_name: str) -> bool:
conn = op.get_bind()
return conn.dialect.has_table(conn, table_name)
def upgrade() -> None: def upgrade() -> None:
# ── Enums (idempotent via DO/EXCEPTION) ────────────────────────────────── conn = op.get_bind()
op.execute("""
# ── Enums (idempotent) ────────────────────────────────────────────────────
conn.execute(sa.text("""
DO $$ BEGIN DO $$ BEGIN
CREATE TYPE queue_priority AS ENUM ('critical', 'high', 'medium', 'low'); CREATE TYPE queue_priority AS ENUM ('critical', 'high', 'medium', 'low');
EXCEPTION WHEN duplicate_object THEN NULL; EXCEPTION WHEN duplicate_object THEN NULL;
END $$; END $$
""") """))
op.execute(""" conn.execute(sa.text("""
DO $$ BEGIN DO $$ BEGIN
CREATE TYPE queue_status AS ENUM ('pending', 'in_progress', 'completed', 'dismissed'); CREATE TYPE queue_status AS ENUM ('pending', 'in_progress', 'completed', 'dismissed');
EXCEPTION WHEN duplicate_object THEN NULL; EXCEPTION WHEN duplicate_object THEN NULL;
END $$; END $$
""") """))
op.execute(""" conn.execute(sa.text("""
DO $$ BEGIN DO $$ BEGIN
CREATE TYPE queue_reason AS ENUM ( CREATE TYPE queue_reason AS ENUM (
'validation_expired', 'infra_change', 'osint_alert', 'validation_expired', 'infra_change', 'osint_alert',
'mitre_update', 'rule_modified', 'low_confidence', 'manual'); 'mitre_update', 'rule_modified', 'low_confidence', 'manual');
EXCEPTION WHEN duplicate_object THEN NULL; EXCEPTION WHEN duplicate_object THEN NULL;
END $$; END $$
""") """))
# ── technique_ownerships ───────────────────────────────────────────────── # ── technique_ownerships ─────────────────────────────────────────────────
if not _table_exists("technique_ownerships"): conn.execute(sa.text("""
op.create_table( CREATE TABLE IF NOT EXISTS technique_ownerships (
"technique_ownerships", id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True), technique_id UUID NOT NULL UNIQUE
sa.Column("technique_id", postgresql.UUID(as_uuid=True), REFERENCES techniques(id) ON DELETE CASCADE,
sa.ForeignKey("techniques.id", ondelete="CASCADE"), nullable=False, unique=True), owner_id UUID
sa.Column("owner_id", postgresql.UUID(as_uuid=True), REFERENCES users(id) ON DELETE SET NULL,
sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=True), backup_owner_id UUID
sa.Column("backup_owner_id", postgresql.UUID(as_uuid=True), REFERENCES users(id) ON DELETE SET NULL,
sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=True), team VARCHAR(200),
sa.Column("team", sa.String(200), nullable=True), notes TEXT,
sa.Column("notes", sa.Text, nullable=True), assigned_at TIMESTAMP,
sa.Column("assigned_at", sa.DateTime, nullable=True), assigned_by UUID
sa.Column("assigned_by", postgresql.UUID(as_uuid=True), REFERENCES users(id) ON DELETE SET NULL,
sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=True), created_at TIMESTAMP DEFAULT now(),
sa.Column("created_at", sa.DateTime, server_default=sa.func.now()), updated_at TIMESTAMP DEFAULT now()
sa.Column("updated_at", sa.DateTime, server_default=sa.func.now(), onupdate=sa.func.now()),
) )
op.create_index("ix_techown_owner_id", "technique_ownerships", ["owner_id"]) """))
op.create_index("ix_techown_technique_id", "technique_ownerships", ["technique_id"]) conn.execute(sa.text(
"CREATE INDEX IF NOT EXISTS ix_techown_owner_id ON technique_ownerships (owner_id)"
))
conn.execute(sa.text(
"CREATE INDEX IF NOT EXISTS ix_techown_technique_id ON technique_ownerships (technique_id)"
))
# ── revalidation_queue_items ────────────────────────────────────────────── # ── revalidation_queue_items ──────────────────────────────────────────────
if not _table_exists("revalidation_queue_items"): conn.execute(sa.text("""
op.create_table( CREATE TABLE IF NOT EXISTS revalidation_queue_items (
"revalidation_queue_items", id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True), technique_id UUID
sa.Column("technique_id", postgresql.UUID(as_uuid=True), REFERENCES techniques(id) ON DELETE CASCADE,
sa.ForeignKey("techniques.id", ondelete="CASCADE"), nullable=True), detection_asset_id UUID
sa.Column("detection_asset_id", postgresql.UUID(as_uuid=True), REFERENCES detection_assets(id) ON DELETE CASCADE,
sa.ForeignKey("detection_assets.id", ondelete="CASCADE"), nullable=True), priority queue_priority NOT NULL DEFAULT 'medium',
sa.Column("priority", sa.Enum("critical", "high", "medium", "low", reason queue_reason NOT NULL,
name="queue_priority", create_type=False), reason_detail TEXT,
nullable=False, server_default="medium"), status queue_status NOT NULL DEFAULT 'pending',
sa.Column("reason", sa.Enum("validation_expired", "infra_change", "osint_alert", assigned_to UUID
"mitre_update", "rule_modified", "low_confidence", "manual", REFERENCES users(id) ON DELETE SET NULL,
name="queue_reason", create_type=False), due_date TIMESTAMP,
nullable=False), created_at TIMESTAMP DEFAULT now(),
sa.Column("reason_detail", sa.Text, nullable=True), completed_at TIMESTAMP,
sa.Column("status", sa.Enum("pending", "in_progress", "completed", "dismissed", dismissed_at TIMESTAMP,
name="queue_status", create_type=False), completed_by UUID
nullable=False, server_default="pending"), REFERENCES users(id) ON DELETE SET NULL,
sa.Column("assigned_to", postgresql.UUID(as_uuid=True), extra JSONB
sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=True),
sa.Column("due_date", sa.DateTime, nullable=True),
sa.Column("created_at", sa.DateTime, server_default=sa.func.now()),
sa.Column("completed_at", sa.DateTime, nullable=True),
sa.Column("dismissed_at", sa.DateTime, nullable=True),
sa.Column("completed_by", postgresql.UUID(as_uuid=True),
sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=True),
sa.Column("extra", postgresql.JSONB, nullable=True),
) )
op.create_index("ix_rqueue_status", "revalidation_queue_items", ["status"]) """))
op.create_index("ix_rqueue_priority", "revalidation_queue_items", ["priority"]) conn.execute(sa.text(
op.create_index("ix_rqueue_assigned_to", "revalidation_queue_items", ["assigned_to"]) "CREATE INDEX IF NOT EXISTS ix_rqueue_status ON revalidation_queue_items (status)"
op.create_index("ix_rqueue_technique_id", "revalidation_queue_items", ["technique_id"]) ))
op.create_index("ix_rqueue_asset_id", "revalidation_queue_items", ["detection_asset_id"]) conn.execute(sa.text(
"CREATE INDEX IF NOT EXISTS ix_rqueue_priority ON revalidation_queue_items (priority)"
))
conn.execute(sa.text(
"CREATE INDEX IF NOT EXISTS ix_rqueue_assigned_to ON revalidation_queue_items (assigned_to)"
))
conn.execute(sa.text(
"CREATE INDEX IF NOT EXISTS ix_rqueue_technique_id ON revalidation_queue_items (technique_id)"
))
conn.execute(sa.text(
"CREATE INDEX IF NOT EXISTS ix_rqueue_asset_id ON revalidation_queue_items (detection_asset_id)"
))
def downgrade() -> None: def downgrade() -> None:
op.drop_table("revalidation_queue_items") conn = op.get_bind()
op.drop_table("technique_ownerships") conn.execute(sa.text("DROP TABLE IF EXISTS revalidation_queue_items"))
op.execute("DROP TYPE IF EXISTS queue_reason") conn.execute(sa.text("DROP TABLE IF EXISTS technique_ownerships"))
op.execute("DROP TYPE IF EXISTS queue_status") conn.execute(sa.text("DROP TYPE IF EXISTS queue_reason"))
op.execute("DROP TYPE IF EXISTS queue_priority") conn.execute(sa.text("DROP TYPE IF EXISTS queue_status"))
conn.execute(sa.text("DROP TYPE IF EXISTS queue_priority"))