"""Detection Lifecycle Management models.""" import uuid import enum from datetime import datetime from sqlalchemy import ( Column, String, Integer, Float, Boolean, DateTime, ForeignKey, Text, Enum as SQLEnum ) from sqlalchemy.dialects.postgresql import UUID, JSONB from sqlalchemy.orm import relationship from app.database import Base class DetectionConfidence(str, enum.Enum): fresh = "fresh" aging = "aging" stale = "stale" broken = "broken" unknown = "unknown" class DetectionHealthStatus(str, enum.Enum): healthy = "healthy" silent = "silent" noisy = "noisy" orphan = "orphan" deprecated = "deprecated" untested = "untested" class InvalidationReason(str, enum.Enum): time_decay = "time_decay" mitre_update = "mitre_update" log_source_change = "log_source_change" siem_update = "siem_update" edr_update = "edr_update" infrastructure_change = "infrastructure_change" parser_change = "parser_change" manual = "manual" rule_modified = "rule_modified" class DetectionAsset(Base): __tablename__ = "detection_assets" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) name = Column(String(500), nullable=False) description = Column(Text) asset_type = Column(String(50), nullable=False) platform = Column(String(100)) rule_content = Column(Text) rule_language = Column(String(50)) rule_repository_url = Column(Text) rule_file_path = Column(String(500)) rule_version = Column(String(50)) rule_hash = Column(String(64)) last_rule_change_at = Column(DateTime) log_source_name = Column(String(200)) log_source_version = Column(String(50)) log_source_config = Column(JSONB, server_default='{}') infrastructure_hash = Column(String(64)) infrastructure_details = Column(JSONB, server_default='{}') health_status = Column( SQLEnum(DetectionHealthStatus, name="detectionhealthstatus"), default=DetectionHealthStatus.untested, nullable=False, server_default="untested", ) last_alert_at = Column(DateTime) alert_count_30d = Column(Integer, default=0, server_default='0') false_positive_rate = Column(Float) expected_alert_frequency = Column(String(50)) owner_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True) backup_owner_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True) team = Column(String(100)) is_active = Column(Boolean, default=True, nullable=False, server_default='true') tags = Column(JSONB, server_default='[]') asset_metadata = Column(JSONB, server_default='{}') created_by = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True) created_at = Column(DateTime(timezone=True), server_default='now()') updated_at = Column(DateTime(timezone=True), server_default='now()') technique_mappings = relationship("DetectionTechniqueMapping", back_populates="detection_asset", cascade="all, delete-orphan") validations = relationship("DetectionValidation", back_populates="detection_asset", cascade="all, delete-orphan") class DetectionTechniqueMapping(Base): __tablename__ = "detection_technique_mappings" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) detection_asset_id = Column(UUID(as_uuid=True), ForeignKey("detection_assets.id", ondelete="CASCADE"), nullable=False) technique_id = Column(UUID(as_uuid=True), ForeignKey("techniques.id", ondelete="CASCADE"), nullable=False) coverage_type = Column(String(50), default="detect", server_default="detect") confidence_level = Column(String(20), default="medium", server_default="medium") notes = Column(Text) created_at = Column(DateTime(timezone=True), server_default='now()') detection_asset = relationship("DetectionAsset", back_populates="technique_mappings") class DetectionValidation(Base): __tablename__ = "detection_validations" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) detection_asset_id = Column(UUID(as_uuid=True), ForeignKey("detection_assets.id", ondelete="CASCADE"), nullable=False) technique_id = Column(UUID(as_uuid=True), ForeignKey("techniques.id", ondelete="SET NULL"), nullable=True) test_id = Column(UUID(as_uuid=True), ForeignKey("tests.id", ondelete="SET NULL"), nullable=True) validated_at = Column(DateTime, default=datetime.utcnow) expires_at = Column(DateTime, nullable=False) is_valid = Column(Boolean, default=True, nullable=False, server_default='true') validation_result = Column(String(50)) validation_method = Column(String(100)) rule_hash_at_validation = Column(String(64)) log_source_version_at_validation = Column(String(50)) infrastructure_hash_at_validation = Column(String(64)) environment_snapshot = Column(JSONB, server_default='{}') invalidated_at = Column(DateTime) invalidation_reason = Column(SQLEnum(InvalidationReason, name="invalidationreason"), nullable=True) invalidation_details = Column(Text) invalidated_by = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True) validated_by = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=False) integrity_hash = Column(String(64)) notes = Column(Text) evidence_ids = Column(JSONB, server_default='[]') detection_asset = relationship("DetectionAsset", back_populates="validations") class TechniqueConfidenceScore(Base): __tablename__ = "technique_confidence_scores" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) technique_id = Column(UUID(as_uuid=True), ForeignKey("techniques.id", ondelete="CASCADE"), nullable=False, unique=True) confidence_level = Column( SQLEnum(DetectionConfidence, name="detectionconfidence"), default=DetectionConfidence.unknown, server_default="unknown", ) confidence_score = Column(Float, default=0.0, server_default='0.0') detection_count = Column(Integer, default=0, server_default='0') valid_detection_count = Column(Integer, default=0, server_default='0') last_validated_at = Column(DateTime) next_validation_due = Column(DateTime) last_recalculated_at = Column(DateTime, default=datetime.utcnow) recency_factor = Column(Float, default=0.0, server_default='0.0') coverage_factor = Column(Float, default=0.0, server_default='0.0') health_factor = Column(Float, default=0.0, server_default='0.0') diversity_factor = Column(Float, default=0.0, server_default='0.0') score_breakdown = Column(JSONB, server_default='{}') risk_factors = Column(JSONB, server_default='[]') updated_at = Column(DateTime, default=datetime.utcnow) class InfrastructureChangeLog(Base): __tablename__ = "infrastructure_change_logs" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) change_type = Column(String(100), nullable=False) description = Column(Text, nullable=False) affected_platforms = Column(JSONB, server_default='[]') affected_log_sources = Column(JSONB, server_default='[]') change_date = Column(DateTime, default=datetime.utcnow) reported_by = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True) auto_invalidate = Column(Boolean, default=True, server_default='true') invalidated_count = Column(Integer, default=0, server_default='0') change_metadata = Column(JSONB, server_default='{}') created_at = Column(DateTime, default=datetime.utcnow)