From 9a020f97ef6fc04a2c7de0ae289f48a091304f13 Mon Sep 17 00:00:00 2001 From: kitos Date: Tue, 19 May 2026 16:29:04 +0200 Subject: [PATCH] fix(detection-lifecycle): fix timezone naive/aware mismatch and duplicate technique mapping - Replace datetime.now(timezone.utc) with datetime.utcnow() in _now() across all three Phase 8 files to match DB DateTime column type (naive UTC) - Guard POST /assets/{id}/techniques/{tid} against duplicate mappings: if mapping already exists, update coverage_type/confidence_level instead of inserting a duplicate row Co-Authored-By: Claude Sonnet 4.6 --- backend/app/routers/detection_lifecycle.py | 21 +++++++++++++++++-- backend/app/services/decay_engine_service.py | 4 ++-- .../app/services/detection_asset_service.py | 4 ++-- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/backend/app/routers/detection_lifecycle.py b/backend/app/routers/detection_lifecycle.py index 4f63ee5..8905ee2 100644 --- a/backend/app/routers/detection_lifecycle.py +++ b/backend/app/routers/detection_lifecycle.py @@ -1,7 +1,7 @@ """Detection Lifecycle Management router.""" import hashlib -from datetime import datetime, timezone, timedelta +from datetime import datetime, timedelta from typing import Optional from uuid import UUID @@ -28,7 +28,7 @@ router = APIRouter(prefix="/detection-lifecycle", tags=["detection-lifecycle"]) def _now() -> datetime: - return datetime.now(timezone.utc) + return datetime.utcnow() # ── Detection Assets ───────────────────────────────────────────────────────── @@ -81,6 +81,23 @@ def map_technique( db: Session = Depends(get_db), user=Depends(get_current_user), ): + # Validate asset exists + asset = db.query(DetectionAsset).filter(DetectionAsset.id == asset_id).first() + if not asset: + raise EntityNotFoundError("DetectionAsset", str(asset_id)) + + # Prevent duplicate mappings + existing = db.query(DetectionTechniqueMapping).filter( + DetectionTechniqueMapping.detection_asset_id == asset_id, + DetectionTechniqueMapping.technique_id == technique_id, + ).first() + if existing: + # Update coverage/confidence on existing mapping instead of duplicating + existing.coverage_type = coverage_type + existing.confidence_level = confidence_level + db.commit() + return {"message": "Technique mapping updated", "mapping_id": str(existing.id)} + mapping = DetectionTechniqueMapping( detection_asset_id=asset_id, technique_id=technique_id, coverage_type=coverage_type, confidence_level=confidence_level, diff --git a/backend/app/services/decay_engine_service.py b/backend/app/services/decay_engine_service.py index b8b52c7..4d9324f 100644 --- a/backend/app/services/decay_engine_service.py +++ b/backend/app/services/decay_engine_service.py @@ -1,7 +1,7 @@ """Decay Engine — calculates confidence scores and expires validations.""" import logging -from datetime import datetime, timezone +from datetime import datetime from typing import Optional from uuid import UUID @@ -20,7 +20,7 @@ logger = logging.getLogger(__name__) def _now() -> datetime: - return datetime.now(timezone.utc) + return datetime.utcnow() def get_applicable_policy(db: Session, platform: Optional[str] = None, asset_type: Optional[str] = None, tactic: Optional[str] = None) -> DecayPolicy: diff --git a/backend/app/services/detection_asset_service.py b/backend/app/services/detection_asset_service.py index 6d65c48..563bc68 100644 --- a/backend/app/services/detection_asset_service.py +++ b/backend/app/services/detection_asset_service.py @@ -2,7 +2,7 @@ import hashlib import logging -from datetime import datetime, timezone +from datetime import datetime from typing import Optional from uuid import UUID @@ -25,7 +25,7 @@ def _compute_rule_hash(content: str) -> str: def _now() -> datetime: - return datetime.now(timezone.utc) + return datetime.utcnow() def create_detection_asset(db: Session, data: dict, user_id: UUID) -> DetectionAsset: