Files
Aegis/backend/alembic/versions/b012_add_detection_rule_associations.py

67 lines
3.0 KiB
Python

"""add_detection_rule_associations
Revision ID: b012detectionassoc
Revises: b011defensive
Create Date: 2026-02-09 17:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID
# Revision identifiers, used by Alembic.
# `revision` is this migration's own id; `down_revision` names the migration
# it is applied on top of.
revision: str = 'b012detectionassoc'
down_revision: Union[str, Sequence[str], None] = 'b011defensive'
# No branch labels or cross-revision dependencies are declared here.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Create test_template_detection_rules and test_detection_results tables.

    Every foreign key below uses ON DELETE CASCADE, except ``evaluated_by``,
    which uses ON DELETE SET NULL and is therefore nullable.
    """
    # test_template_detection_rules (template <-> detection rule association)
    op.create_table(
        'test_template_detection_rules',
        sa.Column('id', UUID(as_uuid=True), primary_key=True),
        # Both sides of the association are required. A nullable template id
        # would also defeat uq_template_detection_rule, because PostgreSQL
        # treats NULLs as distinct in unique constraints.
        # NOTE: a database that already applied this revision with the column
        # nullable needs a follow-up migration to pick up NOT NULL.
        sa.Column('test_template_id', UUID(as_uuid=True),
                  sa.ForeignKey('test_templates.id', ondelete='CASCADE'),
                  nullable=False),
        sa.Column('detection_rule_id', UUID(as_uuid=True),
                  sa.ForeignKey('detection_rules.id', ondelete='CASCADE'),
                  nullable=False),
        # NOTE(review): is_primary has a server default but is still nullable,
        # so an explicit NULL can be stored -- confirm that is intended.
        sa.Column('is_primary', sa.Boolean(), server_default='false'),
    )
    op.create_index('ix_ttdr_template', 'test_template_detection_rules',
                    ['test_template_id'])
    op.create_index('ix_ttdr_rule', 'test_template_detection_rules',
                    ['detection_rule_id'])
    # A given rule may be attached to a given template at most once.
    op.create_unique_constraint('uq_template_detection_rule',
                                'test_template_detection_rules',
                                ['test_template_id', 'detection_rule_id'])

    # test_detection_results (per-test, per-rule evaluation results)
    op.create_table(
        'test_detection_results',
        sa.Column('id', UUID(as_uuid=True), primary_key=True),
        sa.Column('test_id', UUID(as_uuid=True),
                  sa.ForeignKey('tests.id', ondelete='CASCADE'),
                  nullable=False),
        sa.Column('detection_rule_id', UUID(as_uuid=True),
                  sa.ForeignKey('detection_rules.id', ondelete='CASCADE'),
                  nullable=False),
        # Nullable boolean; presumably NULL means "not yet evaluated" --
        # TODO confirm against the code that writes this column.
        sa.Column('triggered', sa.Boolean(), nullable=True),
        sa.Column('notes', sa.Text(), nullable=True),
        # Must stay nullable: ON DELETE SET NULL clears it when the user row
        # is deleted.
        sa.Column('evaluated_by', UUID(as_uuid=True),
                  sa.ForeignKey('users.id', ondelete='SET NULL'),
                  nullable=True),
        # NOTE(review): DateTime() without timezone=True stores naive
        # timestamps -- confirm this matches the project's convention.
        sa.Column('evaluated_at', sa.DateTime(), nullable=True),
    )
    op.create_index('ix_tdr_test', 'test_detection_results', ['test_id'])
    op.create_index('ix_tdr_rule', 'test_detection_results',
                    ['detection_rule_id'])
def downgrade() -> None:
    """Drop test_detection_results and test_template_detection_rules tables.

    Indexes and the unique constraint are dropped explicitly before the table
    that owns them.
    """
    results_table = 'test_detection_results'
    for index_name in ('ix_tdr_rule', 'ix_tdr_test'):
        op.drop_index(index_name, table_name=results_table)
    op.drop_table(results_table)

    association_table = 'test_template_detection_rules'
    op.drop_constraint('uq_template_detection_rule', association_table,
                       type_='unique')
    for index_name in ('ix_ttdr_rule', 'ix_ttdr_template'):
        op.drop_index(index_name, table_name=association_table)
    op.drop_table(association_table)