Files
Aegis/backend/alembic/versions/b034_detection_lifecycle.py
kitos 1fe150963c
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(dlm): Phase 8 — Detection Lifecycle Management [FASE-8]
Tasks 8.1-8.5:

Models (8.1):
- DetectionAsset: SIEM/EDR/Sigma rule assets with auto-hash
- DetectionTechniqueMapping: N:M asset ↔ technique coverage
- DetectionValidation: immutable validation records with expiry
- TechniqueConfidenceScore: computed multi-factor confidence
- InfrastructureChangeLog: infra changes that invalidate detections
- DecayPolicy: configurable freshness thresholds per platform/tactic

Services (8.2, 8.3):
- detection_asset_service: CRUD + SHA-256 rule hashing + auto-
  invalidation on rule/infra changes
- decay_engine_service: daily decay engine — expires stale validations,
  recalculates confidence (recency/coverage/health/diversity factors),
  processes infrastructure change propagation

Router (8.4): 15 endpoints under /api/v1/detection-lifecycle:
  assets CRUD, technique mappings, validations, confidence scores,
  infrastructure changes, decay trigger, executive dashboard

Scheduler (8.3): decay engine runs daily at 02:00
Seed (8.5): default policy (90/180/365d) + strict initial-access policy
Migration: b034dlm (6 tables, 11 indexes)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 15:45:16 +02:00

175 lines
10 KiB
Python

"""Phase 8: Detection Lifecycle Management tables.
Revision ID: b034dlm
Revises: b033syscfg
"""
from typing import Sequence, Union
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from alembic import op
# Alembic revision identifiers: this revision and the one it upgrades from.
revision: str = "b034dlm"
down_revision: Union[str, None] = "b033syscfg"
# This migration declares no branch label and no cross-revision dependency.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def _table_exists(name: str) -> bool:
    """Report whether a table called *name* is present in the target database.

    The lookup inspects the connection Alembic is currently bound to, so the
    answer comes from the database itself, not from ORM metadata.

    Args:
        name: Table name to look for.

    Returns:
        ``True`` if the inspector lists a table with that name.
    """
    inspector = sa.inspect(op.get_bind())
    existing_tables = inspector.get_table_names()
    return name in existing_tables
def upgrade() -> None:
    """Create the Detection Lifecycle Management tables and their indexes.

    Six tables are created, in an order that puts ``detection_assets`` before
    the tables whose foreign keys point at it:

    * ``detection_assets``
    * ``detection_technique_mappings``
    * ``detection_validations``
    * ``technique_confidence_scores``
    * ``infrastructure_change_logs``
    * ``decay_policies``

    Every table is guarded by ``_table_exists``: a table that is already
    present is skipped, together with its indexes. The ``users``,
    ``techniques`` and ``tests`` tables targeted by foreign keys are not
    created here (presumably they come from earlier revisions).
    """
    # --- detection_assets: one row per detection rule/asset ----------------
    if not _table_exists("detection_assets"):
        op.create_table(
            "detection_assets",
            sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
            sa.Column("name", sa.String(500), nullable=False),
            sa.Column("description", sa.Text),
            sa.Column("asset_type", sa.String(50), nullable=False),
            sa.Column("platform", sa.String(100)),
            sa.Column("rule_content", sa.Text),
            sa.Column("rule_language", sa.String(50)),
            sa.Column("rule_repository_url", sa.Text),
            sa.Column("rule_file_path", sa.String(500)),
            sa.Column("rule_version", sa.String(50)),
            # 64 characters: sized for a hex digest such as SHA-256.
            sa.Column("rule_hash", sa.String(64)),
            sa.Column("last_rule_change_at", sa.DateTime),
            sa.Column("log_source_name", sa.String(200)),
            sa.Column("log_source_version", sa.String(50)),
            sa.Column("log_source_config", postgresql.JSONB, server_default="{}"),
            sa.Column("infrastructure_hash", sa.String(64)),
            sa.Column("infrastructure_details", postgresql.JSONB, server_default="{}"),
            sa.Column("health_status", sa.String(20), server_default="untested", nullable=False),
            sa.Column("last_alert_at", sa.DateTime),
            sa.Column("alert_count_30d", sa.Integer, server_default="0"),
            sa.Column("false_positive_rate", sa.Float),
            sa.Column("expected_alert_frequency", sa.String(50)),
            # User references are cleared, not cascaded, when the user goes away.
            sa.Column(
                "owner_id",
                postgresql.UUID(as_uuid=True),
                sa.ForeignKey("users.id", ondelete="SET NULL"),
            ),
            sa.Column(
                "backup_owner_id",
                postgresql.UUID(as_uuid=True),
                sa.ForeignKey("users.id", ondelete="SET NULL"),
            ),
            sa.Column("team", sa.String(100)),
            sa.Column("is_active", sa.Boolean, server_default="true", nullable=False),
            sa.Column("tags", postgresql.JSONB, server_default="[]"),
            sa.Column("asset_metadata", postgresql.JSONB, server_default="{}"),
            sa.Column(
                "created_by",
                postgresql.UUID(as_uuid=True),
                sa.ForeignKey("users.id", ondelete="SET NULL"),
            ),
            sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.text("now()")),
            sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.text("now()")),
        )
        op.create_index("ix_detection_assets_platform", "detection_assets", ["platform"])
        op.create_index("ix_detection_assets_health_status", "detection_assets", ["health_status"])
        op.create_index("ix_detection_assets_owner_id", "detection_assets", ["owner_id"])

    # --- detection_technique_mappings: asset <-> technique link rows -------
    if not _table_exists("detection_technique_mappings"):
        op.create_table(
            "detection_technique_mappings",
            sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
            # A mapping is removed together with either side of the link.
            sa.Column(
                "detection_asset_id",
                postgresql.UUID(as_uuid=True),
                sa.ForeignKey("detection_assets.id", ondelete="CASCADE"),
                nullable=False,
            ),
            sa.Column(
                "technique_id",
                postgresql.UUID(as_uuid=True),
                sa.ForeignKey("techniques.id", ondelete="CASCADE"),
                nullable=False,
            ),
            sa.Column("coverage_type", sa.String(50), server_default="detect"),
            sa.Column("confidence_level", sa.String(20), server_default="medium"),
            sa.Column("notes", sa.Text),
            sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.text("now()")),
        )
        op.create_index(
            "ix_detection_technique_mappings_technique_id",
            "detection_technique_mappings",
            ["technique_id"],
        )
        op.create_index(
            "ix_detection_technique_mappings_asset_id",
            "detection_technique_mappings",
            ["detection_asset_id"],
        )

    # --- detection_validations: validation records with an expiry ----------
    if not _table_exists("detection_validations"):
        op.create_table(
            "detection_validations",
            sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
            sa.Column(
                "detection_asset_id",
                postgresql.UUID(as_uuid=True),
                sa.ForeignKey("detection_assets.id", ondelete="CASCADE"),
                nullable=False,
            ),
            sa.Column(
                "technique_id",
                postgresql.UUID(as_uuid=True),
                sa.ForeignKey("techniques.id", ondelete="SET NULL"),
            ),
            sa.Column(
                "test_id",
                postgresql.UUID(as_uuid=True),
                sa.ForeignKey("tests.id", ondelete="SET NULL"),
            ),
            sa.Column("validated_at", sa.DateTime),
            sa.Column("expires_at", sa.DateTime, nullable=False),
            sa.Column("is_valid", sa.Boolean, server_default="true", nullable=False),
            sa.Column("validation_result", sa.String(50)),
            sa.Column("validation_method", sa.String(100)),
            # Snapshot of the asset's hashes/version at validation time.
            sa.Column("rule_hash_at_validation", sa.String(64)),
            sa.Column("log_source_version_at_validation", sa.String(50)),
            sa.Column("infrastructure_hash_at_validation", sa.String(64)),
            sa.Column("environment_snapshot", postgresql.JSONB, server_default="{}"),
            sa.Column("invalidated_at", sa.DateTime),
            sa.Column("invalidation_reason", sa.String(50)),
            sa.Column("invalidation_details", sa.Text),
            sa.Column(
                "invalidated_by",
                postgresql.UUID(as_uuid=True),
                sa.ForeignKey("users.id", ondelete="SET NULL"),
            ),
            # Deliberately nullable. ON DELETE SET NULL cannot act on a
            # NOT NULL column: PostgreSQL would reject the deletion of the
            # referenced user with a not-null violation instead of clearing
            # the reference. This matches every other users.id foreign key in
            # this migration.
            # NOTE(review): the ORM model for this column should be nullable
            # as well -- confirm against the model definition.
            sa.Column(
                "validated_by",
                postgresql.UUID(as_uuid=True),
                sa.ForeignKey("users.id", ondelete="SET NULL"),
            ),
            sa.Column("integrity_hash", sa.String(64)),
            sa.Column("notes", sa.Text),
            sa.Column("evidence_ids", postgresql.JSONB, server_default="[]"),
        )
        # Composite index: asset first, then validity flag.
        op.create_index(
            "ix_detection_validations_asset_id_valid",
            "detection_validations",
            ["detection_asset_id", "is_valid"],
        )
        op.create_index(
            "ix_detection_validations_expires_at",
            "detection_validations",
            ["expires_at"],
        )

    # --- technique_confidence_scores: one score row per technique ----------
    if not _table_exists("technique_confidence_scores"):
        op.create_table(
            "technique_confidence_scores",
            sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
            # unique=True limits the table to a single row per technique.
            sa.Column(
                "technique_id",
                postgresql.UUID(as_uuid=True),
                sa.ForeignKey("techniques.id", ondelete="CASCADE"),
                nullable=False,
                unique=True,
            ),
            sa.Column("confidence_level", sa.String(20), server_default="unknown"),
            sa.Column("confidence_score", sa.Float, server_default="0.0"),
            sa.Column("detection_count", sa.Integer, server_default="0"),
            sa.Column("valid_detection_count", sa.Integer, server_default="0"),
            sa.Column("last_validated_at", sa.DateTime),
            sa.Column("next_validation_due", sa.DateTime),
            sa.Column("last_recalculated_at", sa.DateTime),
            sa.Column("recency_factor", sa.Float, server_default="0.0"),
            sa.Column("coverage_factor", sa.Float, server_default="0.0"),
            sa.Column("health_factor", sa.Float, server_default="0.0"),
            sa.Column("diversity_factor", sa.Float, server_default="0.0"),
            sa.Column("score_breakdown", postgresql.JSONB, server_default="{}"),
            sa.Column("risk_factors", postgresql.JSONB, server_default="[]"),
            sa.Column("updated_at", sa.DateTime),
        )
        op.create_index(
            "ix_technique_confidence_scores_technique_id",
            "technique_confidence_scores",
            ["technique_id"],
        )
        op.create_index(
            "ix_technique_confidence_scores_confidence_level",
            "technique_confidence_scores",
            ["confidence_level"],
        )

    # --- infrastructure_change_logs: recorded infrastructure changes -------
    if not _table_exists("infrastructure_change_logs"):
        op.create_table(
            "infrastructure_change_logs",
            sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
            sa.Column("change_type", sa.String(100), nullable=False),
            sa.Column("description", sa.Text, nullable=False),
            sa.Column("affected_platforms", postgresql.JSONB, server_default="[]"),
            sa.Column("affected_log_sources", postgresql.JSONB, server_default="[]"),
            sa.Column("change_date", sa.DateTime),
            sa.Column(
                "reported_by",
                postgresql.UUID(as_uuid=True),
                sa.ForeignKey("users.id", ondelete="SET NULL"),
            ),
            sa.Column("auto_invalidate", sa.Boolean, server_default="true"),
            sa.Column("invalidated_count", sa.Integer, server_default="0"),
            sa.Column("change_metadata", postgresql.JSONB, server_default="{}"),
            # No server default here: the value has to be supplied on insert.
            sa.Column("created_at", sa.DateTime),
        )
        op.create_index(
            "ix_infrastructure_change_logs_change_date",
            "infrastructure_change_logs",
            ["change_date"],
        )

    # --- decay_policies: thresholds and weights; no secondary indexes ------
    if not _table_exists("decay_policies"):
        op.create_table(
            "decay_policies",
            sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
            sa.Column("name", sa.String(200), nullable=False),
            sa.Column("description", sa.Text),
            sa.Column("applies_to_platform", sa.String(100)),
            sa.Column("applies_to_asset_type", sa.String(50)),
            sa.Column("applies_to_tactic", sa.String(100)),
            sa.Column("fresh_days", sa.Integer, server_default="90"),
            sa.Column("aging_days", sa.Integer, server_default="180"),
            sa.Column("stale_days", sa.Integer, server_default="365"),
            sa.Column("default_validity_days", sa.Integer, server_default="180"),
            sa.Column("silent_threshold_days", sa.Integer, server_default="30"),
            sa.Column("noisy_threshold_daily", sa.Integer, server_default="100"),
            # The four default weights add up to 1.0 (0.3 + 0.3 + 0.25 + 0.15).
            sa.Column("recency_weight", sa.Float, server_default="0.3"),
            sa.Column("coverage_weight", sa.Float, server_default="0.3"),
            sa.Column("health_weight", sa.Float, server_default="0.25"),
            sa.Column("diversity_weight", sa.Float, server_default="0.15"),
            sa.Column("is_default", sa.Boolean, server_default="false"),
            sa.Column("is_active", sa.Boolean, server_default="true"),
            sa.Column("created_at", sa.DateTime),
            sa.Column("updated_at", sa.DateTime),
        )
def downgrade() -> None:
    """Drop the Detection Lifecycle Management tables that are present.

    Tables are visited in a fixed order that ends with ``detection_assets``.
    A table that does not exist is skipped rather than raising.
    """
    drop_order = (
        "decay_policies",
        "infrastructure_change_logs",
        "technique_confidence_scores",
        "detection_validations",
        "detection_technique_mappings",
        "detection_assets",
    )
    for table_name in drop_order:
        if not _table_exists(table_name):
            continue
        op.drop_table(table_name)