fix(detection-lifecycle): fix timezone naive/aware mismatch and duplicate technique mapping

- Replace datetime.now(timezone.utc) with datetime.utcnow() in _now() across
  all three Phase 8 files to match DB DateTime column type (naive UTC)
- Guard POST /assets/{id}/techniques/{tid} against duplicate mappings:
  if mapping already exists, update coverage_type/confidence_level instead
  of inserting a duplicate row
This commit is contained in:
kitos
2026-05-19 16:29:04 +02:00
parent 634abc289b
commit 61c26ddd0f
3 changed files with 23 additions and 6 deletions
+19 -2
View File
@@ -1,7 +1,7 @@
"""Detection Lifecycle Management router.""" """Detection Lifecycle Management router."""
import hashlib import hashlib
from datetime import datetime, timezone, timedelta from datetime import datetime, timedelta
from typing import Optional from typing import Optional
from uuid import UUID from uuid import UUID
@@ -28,7 +28,7 @@ router = APIRouter(prefix="/detection-lifecycle", tags=["detection-lifecycle"])
def _now() -> datetime: def _now() -> datetime:
return datetime.now(timezone.utc) return datetime.utcnow()
# ── Detection Assets ───────────────────────────────────────────────────────── # ── Detection Assets ─────────────────────────────────────────────────────────
@@ -81,6 +81,23 @@ def map_technique(
db: Session = Depends(get_db), db: Session = Depends(get_db),
user=Depends(get_current_user), user=Depends(get_current_user),
): ):
# Validate asset exists
asset = db.query(DetectionAsset).filter(DetectionAsset.id == asset_id).first()
if not asset:
raise EntityNotFoundError("DetectionAsset", str(asset_id))
# Prevent duplicate mappings
existing = db.query(DetectionTechniqueMapping).filter(
DetectionTechniqueMapping.detection_asset_id == asset_id,
DetectionTechniqueMapping.technique_id == technique_id,
).first()
if existing:
# Update coverage/confidence on existing mapping instead of duplicating
existing.coverage_type = coverage_type
existing.confidence_level = confidence_level
db.commit()
return {"message": "Technique mapping updated", "mapping_id": str(existing.id)}
mapping = DetectionTechniqueMapping( mapping = DetectionTechniqueMapping(
detection_asset_id=asset_id, technique_id=technique_id, detection_asset_id=asset_id, technique_id=technique_id,
coverage_type=coverage_type, confidence_level=confidence_level, coverage_type=coverage_type, confidence_level=confidence_level,
+2 -2
View File
@@ -1,7 +1,7 @@
"""Decay Engine — calculates confidence scores and expires validations.""" """Decay Engine — calculates confidence scores and expires validations."""
import logging import logging
from datetime import datetime, timezone from datetime import datetime
from typing import Optional from typing import Optional
from uuid import UUID from uuid import UUID
@@ -20,7 +20,7 @@ logger = logging.getLogger(__name__)
def _now() -> datetime: def _now() -> datetime:
return datetime.now(timezone.utc) return datetime.utcnow()
def get_applicable_policy(db: Session, platform: Optional[str] = None, asset_type: Optional[str] = None, tactic: Optional[str] = None) -> DecayPolicy: def get_applicable_policy(db: Session, platform: Optional[str] = None, asset_type: Optional[str] = None, tactic: Optional[str] = None) -> DecayPolicy:
@@ -2,7 +2,7 @@
import hashlib import hashlib
import logging import logging
from datetime import datetime, timezone from datetime import datetime
from typing import Optional from typing import Optional
from uuid import UUID from uuid import UUID
@@ -25,7 +25,7 @@ def _compute_rule_hash(content: str) -> str:
def _now() -> datetime: def _now() -> datetime:
return datetime.now(timezone.utc) return datetime.utcnow()
def create_detection_asset(db: Session, data: dict, user_id: UUID) -> DetectionAsset: def create_detection_asset(db: Session, data: dict, user_id: UUID) -> DetectionAsset: