"""Phase 8: Detection Lifecycle Management tables. Revision ID: b034dlm Revises: b033syscfg """ from typing import Sequence, Union import sqlalchemy as sa from sqlalchemy.dialects import postgresql from alembic import op revision: str = "b034dlm" down_revision: Union[str, None] = "b033syscfg" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None def _table_exists(name: str) -> bool: bind = op.get_bind() insp = sa.inspect(bind) return name in insp.get_table_names() def upgrade() -> None: if not _table_exists("detection_assets"): op.create_table( "detection_assets", sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True), sa.Column("name", sa.String(500), nullable=False), sa.Column("description", sa.Text), sa.Column("asset_type", sa.String(50), nullable=False), sa.Column("platform", sa.String(100)), sa.Column("rule_content", sa.Text), sa.Column("rule_language", sa.String(50)), sa.Column("rule_repository_url", sa.Text), sa.Column("rule_file_path", sa.String(500)), sa.Column("rule_version", sa.String(50)), sa.Column("rule_hash", sa.String(64)), sa.Column("last_rule_change_at", sa.DateTime), sa.Column("log_source_name", sa.String(200)), sa.Column("log_source_version", sa.String(50)), sa.Column("log_source_config", postgresql.JSONB, server_default="{}"), sa.Column("infrastructure_hash", sa.String(64)), sa.Column("infrastructure_details", postgresql.JSONB, server_default="{}"), sa.Column("health_status", sa.String(20), server_default="untested", nullable=False), sa.Column("last_alert_at", sa.DateTime), sa.Column("alert_count_30d", sa.Integer, server_default="0"), sa.Column("false_positive_rate", sa.Float), sa.Column("expected_alert_frequency", sa.String(50)), sa.Column("owner_id", postgresql.UUID(as_uuid=True), sa.ForeignKey("users.id", ondelete="SET NULL")), sa.Column("backup_owner_id", postgresql.UUID(as_uuid=True), sa.ForeignKey("users.id", ondelete="SET NULL")), sa.Column("team", sa.String(100)), sa.Column("is_active", sa.Boolean, server_default="true", nullable=False), sa.Column("tags", postgresql.JSONB, server_default="[]"), sa.Column("asset_metadata", postgresql.JSONB, server_default="{}"), sa.Column("created_by", postgresql.UUID(as_uuid=True), sa.ForeignKey("users.id", ondelete="SET NULL")), sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.text("now()")), sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.text("now()")), ) op.create_index("ix_detection_assets_platform", "detection_assets", ["platform"]) op.create_index("ix_detection_assets_health_status", "detection_assets", ["health_status"]) op.create_index("ix_detection_assets_owner_id", "detection_assets", ["owner_id"]) if not _table_exists("detection_technique_mappings"): op.create_table( "detection_technique_mappings", sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True), sa.Column("detection_asset_id", postgresql.UUID(as_uuid=True), sa.ForeignKey("detection_assets.id", ondelete="CASCADE"), nullable=False), sa.Column("technique_id", postgresql.UUID(as_uuid=True), sa.ForeignKey("techniques.id", ondelete="CASCADE"), nullable=False), sa.Column("coverage_type", sa.String(50), server_default="detect"), sa.Column("confidence_level", sa.String(20), server_default="medium"), sa.Column("notes", sa.Text), sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.text("now()")), ) op.create_index("ix_detection_technique_mappings_technique_id", "detection_technique_mappings", ["technique_id"]) op.create_index("ix_detection_technique_mappings_asset_id", "detection_technique_mappings", ["detection_asset_id"]) if not _table_exists("detection_validations"): op.create_table( "detection_validations", sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True), sa.Column("detection_asset_id", postgresql.UUID(as_uuid=True), sa.ForeignKey("detection_assets.id", ondelete="CASCADE"), nullable=False), sa.Column("technique_id", postgresql.UUID(as_uuid=True), sa.ForeignKey("techniques.id", ondelete="SET NULL")), sa.Column("test_id", postgresql.UUID(as_uuid=True), sa.ForeignKey("tests.id", ondelete="SET NULL")), sa.Column("validated_at", sa.DateTime), sa.Column("expires_at", sa.DateTime, nullable=False), sa.Column("is_valid", sa.Boolean, server_default="true", nullable=False), sa.Column("validation_result", sa.String(50)), sa.Column("validation_method", sa.String(100)), sa.Column("rule_hash_at_validation", sa.String(64)), sa.Column("log_source_version_at_validation", sa.String(50)), sa.Column("infrastructure_hash_at_validation", sa.String(64)), sa.Column("environment_snapshot", postgresql.JSONB, server_default="{}"), sa.Column("invalidated_at", sa.DateTime), sa.Column("invalidation_reason", sa.String(50)), sa.Column("invalidation_details", sa.Text), sa.Column("invalidated_by", postgresql.UUID(as_uuid=True), sa.ForeignKey("users.id", ondelete="SET NULL")), sa.Column("validated_by", postgresql.UUID(as_uuid=True), sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=False), sa.Column("integrity_hash", sa.String(64)), sa.Column("notes", sa.Text), sa.Column("evidence_ids", postgresql.JSONB, server_default="[]"), ) op.create_index("ix_detection_validations_asset_id_valid", "detection_validations", ["detection_asset_id", "is_valid"]) op.create_index("ix_detection_validations_expires_at", "detection_validations", ["expires_at"]) if not _table_exists("technique_confidence_scores"): op.create_table( "technique_confidence_scores", sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True), sa.Column("technique_id", postgresql.UUID(as_uuid=True), sa.ForeignKey("techniques.id", ondelete="CASCADE"), nullable=False, unique=True), sa.Column("confidence_level", sa.String(20), server_default="unknown"), sa.Column("confidence_score", sa.Float, server_default="0.0"), sa.Column("detection_count", sa.Integer, server_default="0"), sa.Column("valid_detection_count", sa.Integer, server_default="0"), sa.Column("last_validated_at", sa.DateTime), sa.Column("next_validation_due", sa.DateTime), sa.Column("last_recalculated_at", sa.DateTime), sa.Column("recency_factor", sa.Float, server_default="0.0"), sa.Column("coverage_factor", sa.Float, server_default="0.0"), sa.Column("health_factor", sa.Float, server_default="0.0"), sa.Column("diversity_factor", sa.Float, server_default="0.0"), sa.Column("score_breakdown", postgresql.JSONB, server_default="{}"), sa.Column("risk_factors", postgresql.JSONB, server_default="[]"), sa.Column("updated_at", sa.DateTime), ) op.create_index("ix_technique_confidence_scores_technique_id", "technique_confidence_scores", ["technique_id"]) op.create_index("ix_technique_confidence_scores_confidence_level", "technique_confidence_scores", ["confidence_level"]) if not _table_exists("infrastructure_change_logs"): op.create_table( "infrastructure_change_logs", sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True), sa.Column("change_type", sa.String(100), nullable=False), sa.Column("description", sa.Text, nullable=False), sa.Column("affected_platforms", postgresql.JSONB, server_default="[]"), sa.Column("affected_log_sources", postgresql.JSONB, server_default="[]"), sa.Column("change_date", sa.DateTime), sa.Column("reported_by", postgresql.UUID(as_uuid=True), sa.ForeignKey("users.id", ondelete="SET NULL")), sa.Column("auto_invalidate", sa.Boolean, server_default="true"), sa.Column("invalidated_count", sa.Integer, server_default="0"), sa.Column("change_metadata", postgresql.JSONB, server_default="{}"), sa.Column("created_at", sa.DateTime), ) op.create_index("ix_infrastructure_change_logs_change_date", "infrastructure_change_logs", ["change_date"]) if not _table_exists("decay_policies"): op.create_table( "decay_policies", sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True), sa.Column("name", sa.String(200), nullable=False), sa.Column("description", sa.Text), sa.Column("applies_to_platform", sa.String(100)), sa.Column("applies_to_asset_type", sa.String(50)), sa.Column("applies_to_tactic", sa.String(100)), sa.Column("fresh_days", sa.Integer, server_default="90"), sa.Column("aging_days", sa.Integer, server_default="180"), sa.Column("stale_days", sa.Integer, server_default="365"), sa.Column("default_validity_days", sa.Integer, server_default="180"), sa.Column("silent_threshold_days", sa.Integer, server_default="30"), sa.Column("noisy_threshold_daily", sa.Integer, server_default="100"), sa.Column("recency_weight", sa.Float, server_default="0.3"), sa.Column("coverage_weight", sa.Float, server_default="0.3"), sa.Column("health_weight", sa.Float, server_default="0.25"), sa.Column("diversity_weight", sa.Float, server_default="0.15"), sa.Column("is_default", sa.Boolean, server_default="false"), sa.Column("is_active", sa.Boolean, server_default="true"), sa.Column("created_at", sa.DateTime), sa.Column("updated_at", sa.DateTime), ) def downgrade() -> None: for table in ["decay_policies", "infrastructure_change_logs", "technique_confidence_scores", "detection_validations", "detection_technique_mappings", "detection_assets"]: if _table_exists(table): op.drop_table(table)