Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
Tasks 8.1-8.5: Models (8.1): - DetectionAsset: SIEM/EDR/Sigma rule assets with auto-hash - DetectionTechniqueMapping: N:M asset ↔ technique coverage - DetectionValidation: immutable validation records with expiry - TechniqueConfidenceScore: computed multi-factor confidence - InfrastructureChangeLog: infra changes that invalidate detections - DecayPolicy: configurable freshness thresholds per platform/tactic Services (8.2, 8.3): - detection_asset_service: CRUD + SHA-256 rule hashing + auto-invalidation on rule/infra changes - decay_engine_service: daily decay engine — expires stale validations, recalculates confidence (recency/coverage/health/diversity factors), processes infrastructure change propagation Router (8.4): 15 endpoints under /api/v1/detection-lifecycle: assets CRUD, technique mappings, validations, confidence scores, infrastructure changes, decay trigger, executive dashboard Scheduler (8.3): decay engine runs daily at 02:00 Seed (8.5): default policy (90/180/365d) + strict initial-access policy Migration: b034dlm (6 tables, 11 indexes) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
169 lines
7.5 KiB
Python
169 lines
7.5 KiB
Python
"""Detection Lifecycle Management models."""
|
|
|
|
import enum
import uuid
from datetime import datetime

from sqlalchemy import (
    Boolean,
    Column,
    DateTime,
    Enum as SQLEnum,
    Float,
    ForeignKey,
    Integer,
    String,
    Text,
    func,
)
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import relationship

from app.database import Base
|
|
|
|
|
|
@enum.unique
class DetectionConfidence(str, enum.Enum):
    """Confidence bucket recorded for a technique's detection coverage.

    Used as the column type of ``TechniqueConfidenceScore.confidence_level``,
    where ``unknown`` is the default. Members mix in ``str``, so each one
    compares equal to its plain string value.

    Member order is significant: it is the order the values are handed to
    the SQL enum type, so do not reorder.
    """

    fresh = "fresh"
    aging = "aging"
    stale = "stale"
    broken = "broken"
    unknown = "unknown"  # default for a score row that has not been computed
|
|
|
|
|
|
@enum.unique
class DetectionHealthStatus(str, enum.Enum):
    """Operational health state of a detection asset.

    Used as the column type of ``DetectionAsset.health_status``, where
    ``untested`` is both the Python-side and server-side default. Members
    mix in ``str``, so each one compares equal to its plain string value.

    Member order is significant: it is the order the values are handed to
    the SQL enum type, so do not reorder.
    """

    healthy = "healthy"
    silent = "silent"
    noisy = "noisy"
    orphan = "orphan"
    deprecated = "deprecated"
    untested = "untested"  # default for newly created assets
|
|
|
|
|
|
@enum.unique
class InvalidationReason(str, enum.Enum):
    """Why a detection validation stopped being trusted.

    Used as the (nullable) column type of
    ``DetectionValidation.invalidation_reason``. Members mix in ``str``, so
    each one compares equal to its plain string value.

    Member order is significant: it is the order the values are handed to
    the SQL enum type, so do not reorder.
    """

    time_decay = "time_decay"
    mitre_update = "mitre_update"
    log_source_change = "log_source_change"
    siem_update = "siem_update"
    edr_update = "edr_update"
    infrastructure_change = "infrastructure_change"
    parser_change = "parser_change"
    manual = "manual"
    rule_modified = "rule_modified"
|
|
|
|
|
|
class DetectionAsset(Base):
    """A detection rule/analytic tracked by the detection lifecycle.

    A row stores the rule itself, the log source and infrastructure it
    depends on, alerting/health counters, and ownership. Technique mappings
    and validations belong to the asset and are removed together with it
    (``cascade="all, delete-orphan"`` plus ``ON DELETE CASCADE`` on the
    child foreign keys).

    NOTE(review): ``last_rule_change_at`` and ``last_alert_at`` are naive
    ``DateTime`` columns while ``created_at``/``updated_at`` are
    timezone-aware; callers must not compare the two kinds directly.
    """

    __tablename__ = "detection_assets"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(500), nullable=False)
    description = Column(Text)
    asset_type = Column(String(50), nullable=False)
    platform = Column(String(100))

    # --- Rule definition and provenance ---------------------------------
    rule_content = Column(Text)
    rule_language = Column(String(50))
    rule_repository_url = Column(Text)
    rule_file_path = Column(String(500))
    rule_version = Column(String(50))
    # 64 chars: sized for a hex SHA-256 digest (hash is computed outside
    # this model; nothing here derives it from rule_content).
    rule_hash = Column(String(64))
    last_rule_change_at = Column(DateTime)

    # --- Log source / infrastructure the rule depends on -----------------
    log_source_name = Column(String(200))
    log_source_version = Column(String(50))
    log_source_config = Column(JSONB, server_default='{}')
    infrastructure_hash = Column(String(64))
    infrastructure_details = Column(JSONB, server_default='{}')

    # --- Operational health ----------------------------------------------
    health_status = Column(
        SQLEnum(DetectionHealthStatus, name="detectionhealthstatus"),
        default=DetectionHealthStatus.untested,
        nullable=False,
        server_default="untested",
    )
    last_alert_at = Column(DateTime)
    alert_count_30d = Column(Integer, default=0, server_default='0')
    false_positive_rate = Column(Float)
    expected_alert_frequency = Column(String(50))

    # --- Ownership (user FKs are nulled when the user is deleted) --------
    owner_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True)
    backup_owner_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True)
    team = Column(String(100))

    is_active = Column(Boolean, default=True, nullable=False, server_default='true')
    tags = Column(JSONB, server_default='[]')
    asset_metadata = Column(JSONB, server_default='{}')
    created_by = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True)

    # --- Audit timestamps --------------------------------------------------
    # func.now() renders as the SQL expression ``now()``. A plain string
    # server_default would be emitted as the quoted literal 'now()' instead
    # of a function call.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    # onupdate keeps the column current on every UPDATE that does not set
    # it explicitly; an explicit assignment still takes precedence.
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

    technique_mappings = relationship("DetectionTechniqueMapping", back_populates="detection_asset", cascade="all, delete-orphan")
    validations = relationship("DetectionValidation", back_populates="detection_asset", cascade="all, delete-orphan")
|
|
|
|
|
|
class DetectionTechniqueMapping(Base):
    """Association row linking one detection asset to one technique.

    Both foreign keys cascade on delete, so a mapping disappears when
    either the asset or the technique is removed.

    NOTE(review): there is no unique constraint on
    ``(detection_asset_id, technique_id)`` in this model, so duplicate
    mappings are possible unless the migration or service layer prevents
    them — confirm.
    """

    __tablename__ = "detection_technique_mappings"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    detection_asset_id = Column(UUID(as_uuid=True), ForeignKey("detection_assets.id", ondelete="CASCADE"), nullable=False)
    technique_id = Column(UUID(as_uuid=True), ForeignKey("techniques.id", ondelete="CASCADE"), nullable=False)
    coverage_type = Column(String(50), default="detect", server_default="detect")
    confidence_level = Column(String(20), default="medium", server_default="medium")
    notes = Column(Text)
    # func.now() renders as the SQL expression ``now()``. A plain string
    # server_default would be emitted as the quoted literal 'now()' instead
    # of a function call.
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    detection_asset = relationship("DetectionAsset", back_populates="technique_mappings")
|
|
|
|
|
|
class DetectionValidation(Base):
    """Record that a detection asset was validated, with its expiry.

    Besides the outcome, each row snapshots the rule hash, log-source
    version and infrastructure hash seen at validation time, plus the
    fields describing a later invalidation (when, why, by whom).

    The commit notes describe these records as immutable; nothing in this
    model enforces that, so immutability is up to the writers.

    All timestamps here are naive ``DateTime`` columns; ``validated_at``
    defaults to ``datetime.utcnow``, i.e. naive UTC.
    """

    __tablename__ = "detection_validations"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    detection_asset_id = Column(UUID(as_uuid=True), ForeignKey("detection_assets.id", ondelete="CASCADE"), nullable=False)
    technique_id = Column(UUID(as_uuid=True), ForeignKey("techniques.id", ondelete="SET NULL"), nullable=True)
    test_id = Column(UUID(as_uuid=True), ForeignKey("tests.id", ondelete="SET NULL"), nullable=True)

    # --- Validation outcome -------------------------------------------------
    validated_at = Column(DateTime, default=datetime.utcnow)
    # Required and has no default: the writer must compute the expiry.
    expires_at = Column(DateTime, nullable=False)
    is_valid = Column(Boolean, default=True, nullable=False, server_default='true')
    validation_result = Column(String(50))
    validation_method = Column(String(100))

    # --- Snapshot of the environment at validation time --------------------
    rule_hash_at_validation = Column(String(64))
    log_source_version_at_validation = Column(String(50))
    infrastructure_hash_at_validation = Column(String(64))
    environment_snapshot = Column(JSONB, server_default='{}')

    # --- Invalidation details (all empty while the validation stands) ------
    invalidated_at = Column(DateTime)
    invalidation_reason = Column(SQLEnum(InvalidationReason, name="invalidationreason"), nullable=True)
    invalidation_details = Column(Text)
    invalidated_by = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True)

    # Must be nullable: the FK uses ON DELETE SET NULL, and a NOT NULL column
    # would make the database reject the delete of any user who has ever
    # validated a detection. Require the validator at creation time in the
    # service layer instead. The existing table needs a matching migration
    # (ALTER COLUMN ... DROP NOT NULL) if it was created as NOT NULL.
    validated_by = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True)

    # 64 chars — presumably a hex SHA-256 over the record; computed outside
    # this model. TODO confirm against the service that writes it.
    integrity_hash = Column(String(64))
    notes = Column(Text)
    evidence_ids = Column(JSONB, server_default='[]')

    detection_asset = relationship("DetectionAsset", back_populates="validations")
|
|
|
|
|
|
class TechniqueConfidenceScore(Base):
    """Computed detection-confidence score for a single technique.

    ``technique_id`` is unique, so there is at most one score row per
    technique, and the row is deleted with its technique
    (``ON DELETE CASCADE``). The overall ``confidence_score`` is stored
    alongside four component factors (recency, coverage, health,
    diversity); how they are combined is not defined in this model.

    All timestamps here are naive ``DateTime`` columns populated with
    ``datetime.utcnow``, i.e. naive UTC.
    """

    __tablename__ = "technique_confidence_scores"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    technique_id = Column(UUID(as_uuid=True), ForeignKey("techniques.id", ondelete="CASCADE"), nullable=False, unique=True)

    # --- Headline result ----------------------------------------------------
    confidence_level = Column(
        SQLEnum(DetectionConfidence, name="detectionconfidence"),
        default=DetectionConfidence.unknown,
        server_default="unknown",
    )
    confidence_score = Column(Float, default=0.0, server_default='0.0')
    detection_count = Column(Integer, default=0, server_default='0')
    valid_detection_count = Column(Integer, default=0, server_default='0')

    # --- Validation schedule ------------------------------------------------
    last_validated_at = Column(DateTime)
    next_validation_due = Column(DateTime)
    # Set on INSERT only; the recalculation job is expected to refresh it.
    last_recalculated_at = Column(DateTime, default=datetime.utcnow)

    # --- Component factors and explanation ----------------------------------
    recency_factor = Column(Float, default=0.0, server_default='0.0')
    coverage_factor = Column(Float, default=0.0, server_default='0.0')
    health_factor = Column(Float, default=0.0, server_default='0.0')
    diversity_factor = Column(Float, default=0.0, server_default='0.0')
    score_breakdown = Column(JSONB, server_default='{}')
    risk_factors = Column(JSONB, server_default='[]')

    # onupdate keeps the column current on every UPDATE that does not set
    # it explicitly; previously it was populated on INSERT only and went
    # stale unless each writer remembered to touch it.
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
|
|
|
|
|
|
class InfrastructureChangeLog(Base):
    """Log entry describing a change to the monitored infrastructure.

    Each entry names the kind of change, the platforms and log sources it
    touches, and whether dependent validations should be invalidated
    automatically. The invalidation itself is not implemented here; this
    model only stores the flag and the resulting count.

    Timestamps are naive ``DateTime`` columns defaulting to
    ``datetime.utcnow``, i.e. naive UTC.
    """

    __tablename__ = "infrastructure_change_logs"

    id = Column(
        UUID(as_uuid=True),
        primary_key=True,
        default=uuid.uuid4,
    )

    # What changed.
    change_type = Column(String(100), nullable=False)
    description = Column(Text, nullable=False)

    # Scope of the change; both default to an empty JSON array server-side.
    affected_platforms = Column(JSONB, server_default="[]")
    affected_log_sources = Column(JSONB, server_default="[]")

    # When the change happened (distinct from created_at, the time the row
    # was written) and who reported it.
    change_date = Column(DateTime, default=datetime.utcnow)
    reported_by = Column(
        UUID(as_uuid=True),
        ForeignKey("users.id", ondelete="SET NULL"),
        nullable=True,
    )

    # Invalidation bookkeeping. invalidated_count is presumably filled in by
    # whatever processes the change — verify against the service layer.
    auto_invalidate = Column(Boolean, default=True, server_default="true")
    invalidated_count = Column(Integer, default=0, server_default="0")

    change_metadata = Column(JSONB, server_default="{}")
    created_at = Column(DateTime, default=datetime.utcnow)
|