Files
Aegis/backend/app/schemas/detection_lifecycle_schema.py
kitos 1fe150963c
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(dlm): Phase 8 — Detection Lifecycle Management [FASE-8]
Tasks 8.1-8.5:

Models (8.1):
- DetectionAsset: SIEM/EDR/Sigma rule assets with auto-hash
- DetectionTechniqueMapping: N:M asset ↔ technique coverage
- DetectionValidation: immutable validation records with expiry
- TechniqueConfidenceScore: computed multi-factor confidence
- InfrastructureChangeLog: infra changes that invalidate detections
- DecayPolicy: configurable freshness thresholds per platform/tactic

Services (8.2, 8.3):
- detection_asset_service: CRUD + SHA-256 rule hashing + auto-
  invalidation on rule/infra changes
- decay_engine_service: daily decay engine — expires stale validations,
  recalculates confidence (recency/coverage/health/diversity factors),
  processes infrastructure change propagation

Router (8.4): 15 endpoints under /api/v1/detection-lifecycle:
  assets CRUD, technique mappings, validations, confidence scores,
  infrastructure changes, decay trigger, executive dashboard

Scheduler (8.3): decay engine runs daily at 02:00
Seed (8.5): default policy (90/180/365d) + strict initial-access policy
Migration: b034dlm (6 tables, 11 indexes)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 15:45:16 +02:00

141 lines
4.8 KiB
Python

"""Pydantic schemas for Detection Lifecycle endpoints."""
from pydantic import BaseModel, Field, ConfigDict
from typing import Optional
from uuid import UUID
from datetime import datetime
from app.models.detection_lifecycle import (
DetectionConfidence, DetectionHealthStatus, InvalidationReason
)
class DetectionAssetCreate(BaseModel):
name: str = Field(..., min_length=3, max_length=500)
description: Optional[str] = None
asset_type: str = Field(..., pattern=r'^(siem_rule|edr_rule|sigma_rule|yara_rule|spl_query|kql_query|custom_script)$')
platform: Optional[str] = None
rule_content: Optional[str] = None
rule_language: Optional[str] = None
rule_repository_url: Optional[str] = None
rule_file_path: Optional[str] = None
rule_version: Optional[str] = None
log_source_name: Optional[str] = None
log_source_version: Optional[str] = None
log_source_config: Optional[dict] = Field(default_factory=dict)
infrastructure_details: Optional[dict] = Field(default_factory=dict)
expected_alert_frequency: Optional[str] = None
tags: Optional[list[str]] = Field(default_factory=list)
technique_ids: Optional[list[UUID]] = Field(default_factory=list)
class DetectionAssetUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
rule_content: Optional[str] = None
rule_version: Optional[str] = None
log_source_version: Optional[str] = None
infrastructure_details: Optional[dict] = None
expected_alert_frequency: Optional[str] = None
health_status: Optional[DetectionHealthStatus] = None
last_alert_at: Optional[datetime] = None
alert_count_30d: Optional[int] = None
false_positive_rate: Optional[float] = None
owner_id: Optional[UUID] = None
backup_owner_id: Optional[UUID] = None
team: Optional[str] = None
tags: Optional[list[str]] = None
is_active: Optional[bool] = None
class DetectionAssetOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: UUID
name: str
description: Optional[str] = None
asset_type: str
platform: Optional[str] = None
rule_language: Optional[str] = None
rule_version: Optional[str] = None
rule_hash: Optional[str] = None
health_status: DetectionHealthStatus
last_alert_at: Optional[datetime] = None
alert_count_30d: int
false_positive_rate: Optional[float] = None
expected_alert_frequency: Optional[str] = None
owner_id: Optional[UUID] = None
team: Optional[str] = None
is_active: bool
tags: list = Field(default_factory=list)
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class DetectionValidationCreate(BaseModel):
detection_asset_id: UUID
technique_id: Optional[UUID] = None
test_id: Optional[UUID] = None
validation_result: str = Field(..., pattern=r'^(detected|not_detected|partial|error)$')
validation_method: str
notes: Optional[str] = None
evidence_ids: Optional[list[UUID]] = Field(default_factory=list)
validity_days: int = Field(default=180, ge=30, le=730)
class DetectionValidationOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: UUID
detection_asset_id: UUID
technique_id: Optional[UUID] = None
validated_at: Optional[datetime] = None
expires_at: datetime
is_valid: bool
validation_result: Optional[str] = None
validation_method: Optional[str] = None
invalidated_at: Optional[datetime] = None
invalidation_reason: Optional[InvalidationReason] = None
validated_by: Optional[UUID] = None
notes: Optional[str] = None
class TechniqueConfidenceOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
technique_id: UUID
confidence_level: DetectionConfidence
confidence_score: float
detection_count: int
valid_detection_count: int
last_validated_at: Optional[datetime] = None
next_validation_due: Optional[datetime] = None
recency_factor: float
coverage_factor: float
health_factor: float
diversity_factor: float
risk_factors: list = Field(default_factory=list)
class InfrastructureChangeCreate(BaseModel):
change_type: str
description: str = Field(..., min_length=10)
affected_platforms: list[str] = Field(default_factory=list)
affected_log_sources: list[str] = Field(default_factory=list)
change_date: Optional[datetime] = None
auto_invalidate: bool = True
class InfrastructureChangeOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: UUID
change_type: str
description: str
affected_platforms: list = Field(default_factory=list)
affected_log_sources: list = Field(default_factory=list)
change_date: Optional[datetime] = None
auto_invalidate: bool
invalidated_count: int
reported_by: Optional[UUID] = None
created_at: Optional[datetime] = None