feat(dlm): Phase 8 — Detection Lifecycle Management [FASE-8]
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled

Tasks 8.1-8.5:

Models (8.1):
- DetectionAsset: SIEM/EDR/Sigma rule assets with auto-hash
- DetectionTechniqueMapping: N:M asset ↔ technique coverage
- DetectionValidation: immutable validation records with expiry
- TechniqueConfidenceScore: computed multi-factor confidence
- InfrastructureChangeLog: infra changes that invalidate detections
- DecayPolicy: configurable freshness thresholds per platform/tactic

Services (8.2, 8.3):
- detection_asset_service: CRUD + SHA-256 rule hashing + auto-
  invalidation on rule/infra changes
- decay_engine_service: daily decay engine — expires stale validations,
  recalculates confidence (recency/coverage/health/diversity factors),
  processes infrastructure change propagation

Router (8.4): 15 endpoints under /api/v1/detection-lifecycle:
  assets CRUD, technique mappings, validations, confidence scores,
  infrastructure changes, decay trigger, executive dashboard

Scheduler (8.3): decay engine runs daily at 02:00
Seed (8.5): default policy (90/180/365d) + strict initial-access policy
Migration: b034dlm (6 tables, 11 indexes)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
kitos
2026-05-19 15:45:16 +02:00
parent 0e1b8e2b39
commit 1fe150963c
11 changed files with 1369 additions and 0 deletions

View File

@@ -0,0 +1,140 @@
"""Pydantic schemas for Detection Lifecycle endpoints."""
from pydantic import BaseModel, Field, ConfigDict
from typing import Optional
from uuid import UUID
from datetime import datetime
from app.models.detection_lifecycle import (
DetectionConfidence, DetectionHealthStatus, InvalidationReason
)
class DetectionAssetCreate(BaseModel):
name: str = Field(..., min_length=3, max_length=500)
description: Optional[str] = None
asset_type: str = Field(..., pattern=r'^(siem_rule|edr_rule|sigma_rule|yara_rule|spl_query|kql_query|custom_script)$')
platform: Optional[str] = None
rule_content: Optional[str] = None
rule_language: Optional[str] = None
rule_repository_url: Optional[str] = None
rule_file_path: Optional[str] = None
rule_version: Optional[str] = None
log_source_name: Optional[str] = None
log_source_version: Optional[str] = None
log_source_config: Optional[dict] = Field(default_factory=dict)
infrastructure_details: Optional[dict] = Field(default_factory=dict)
expected_alert_frequency: Optional[str] = None
tags: Optional[list[str]] = Field(default_factory=list)
technique_ids: Optional[list[UUID]] = Field(default_factory=list)
class DetectionAssetUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
rule_content: Optional[str] = None
rule_version: Optional[str] = None
log_source_version: Optional[str] = None
infrastructure_details: Optional[dict] = None
expected_alert_frequency: Optional[str] = None
health_status: Optional[DetectionHealthStatus] = None
last_alert_at: Optional[datetime] = None
alert_count_30d: Optional[int] = None
false_positive_rate: Optional[float] = None
owner_id: Optional[UUID] = None
backup_owner_id: Optional[UUID] = None
team: Optional[str] = None
tags: Optional[list[str]] = None
is_active: Optional[bool] = None
class DetectionAssetOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: UUID
name: str
description: Optional[str] = None
asset_type: str
platform: Optional[str] = None
rule_language: Optional[str] = None
rule_version: Optional[str] = None
rule_hash: Optional[str] = None
health_status: DetectionHealthStatus
last_alert_at: Optional[datetime] = None
alert_count_30d: int
false_positive_rate: Optional[float] = None
expected_alert_frequency: Optional[str] = None
owner_id: Optional[UUID] = None
team: Optional[str] = None
is_active: bool
tags: list = Field(default_factory=list)
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class DetectionValidationCreate(BaseModel):
detection_asset_id: UUID
technique_id: Optional[UUID] = None
test_id: Optional[UUID] = None
validation_result: str = Field(..., pattern=r'^(detected|not_detected|partial|error)$')
validation_method: str
notes: Optional[str] = None
evidence_ids: Optional[list[UUID]] = Field(default_factory=list)
validity_days: int = Field(default=180, ge=30, le=730)
class DetectionValidationOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: UUID
detection_asset_id: UUID
technique_id: Optional[UUID] = None
validated_at: Optional[datetime] = None
expires_at: datetime
is_valid: bool
validation_result: Optional[str] = None
validation_method: Optional[str] = None
invalidated_at: Optional[datetime] = None
invalidation_reason: Optional[InvalidationReason] = None
validated_by: Optional[UUID] = None
notes: Optional[str] = None
class TechniqueConfidenceOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
technique_id: UUID
confidence_level: DetectionConfidence
confidence_score: float
detection_count: int
valid_detection_count: int
last_validated_at: Optional[datetime] = None
next_validation_due: Optional[datetime] = None
recency_factor: float
coverage_factor: float
health_factor: float
diversity_factor: float
risk_factors: list = Field(default_factory=list)
class InfrastructureChangeCreate(BaseModel):
change_type: str
description: str = Field(..., min_length=10)
affected_platforms: list[str] = Field(default_factory=list)
affected_log_sources: list[str] = Field(default_factory=list)
change_date: Optional[datetime] = None
auto_invalidate: bool = True
class InfrastructureChangeOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: UUID
change_type: str
description: str
affected_platforms: list = Field(default_factory=list)
affected_log_sources: list = Field(default_factory=list)
change_date: Optional[datetime] = None
auto_invalidate: bool
invalidated_count: int
reported_by: Optional[UUID] = None
created_at: Optional[datetime] = None