fix(detection-lifecycle): fix timezone naive/aware mismatch and duplicate technique mapping
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled

- Replace datetime.now(timezone.utc) with datetime.utcnow() in _now() across
  all three Phase 8 files to match DB DateTime column type (naive UTC)
- Guard POST /assets/{id}/techniques/{tid} against duplicate mappings:
  if mapping already exists, update coverage_type/confidence_level instead
  of inserting a duplicate row

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
kitos
2026-05-19 16:29:04 +02:00
parent 1fe150963c
commit 9a020f97ef
3 changed files with 23 additions and 6 deletions

View File

@@ -1,7 +1,7 @@
"""Detection Lifecycle Management router.""" """Detection Lifecycle Management router."""
import hashlib import hashlib
from datetime import datetime, timezone, timedelta from datetime import datetime, timedelta
from typing import Optional from typing import Optional
from uuid import UUID from uuid import UUID
@@ -28,7 +28,7 @@ router = APIRouter(prefix="/detection-lifecycle", tags=["detection-lifecycle"])
def _now() -> datetime: def _now() -> datetime:
return datetime.now(timezone.utc) return datetime.utcnow()
# ── Detection Assets ───────────────────────────────────────────────────────── # ── Detection Assets ─────────────────────────────────────────────────────────
@@ -81,6 +81,23 @@ def map_technique(
db: Session = Depends(get_db), db: Session = Depends(get_db),
user=Depends(get_current_user), user=Depends(get_current_user),
): ):
# Validate asset exists
asset = db.query(DetectionAsset).filter(DetectionAsset.id == asset_id).first()
if not asset:
raise EntityNotFoundError("DetectionAsset", str(asset_id))
# Prevent duplicate mappings
existing = db.query(DetectionTechniqueMapping).filter(
DetectionTechniqueMapping.detection_asset_id == asset_id,
DetectionTechniqueMapping.technique_id == technique_id,
).first()
if existing:
# Update coverage/confidence on existing mapping instead of duplicating
existing.coverage_type = coverage_type
existing.confidence_level = confidence_level
db.commit()
return {"message": "Technique mapping updated", "mapping_id": str(existing.id)}
mapping = DetectionTechniqueMapping( mapping = DetectionTechniqueMapping(
detection_asset_id=asset_id, technique_id=technique_id, detection_asset_id=asset_id, technique_id=technique_id,
coverage_type=coverage_type, confidence_level=confidence_level, coverage_type=coverage_type, confidence_level=confidence_level,

View File

@@ -1,7 +1,7 @@
"""Decay Engine — calculates confidence scores and expires validations.""" """Decay Engine — calculates confidence scores and expires validations."""
import logging import logging
from datetime import datetime, timezone from datetime import datetime
from typing import Optional from typing import Optional
from uuid import UUID from uuid import UUID
@@ -20,7 +20,7 @@ logger = logging.getLogger(__name__)
def _now() -> datetime: def _now() -> datetime:
return datetime.now(timezone.utc) return datetime.utcnow()
def get_applicable_policy(db: Session, platform: Optional[str] = None, asset_type: Optional[str] = None, tactic: Optional[str] = None) -> DecayPolicy: def get_applicable_policy(db: Session, platform: Optional[str] = None, asset_type: Optional[str] = None, tactic: Optional[str] = None) -> DecayPolicy:

View File

@@ -2,7 +2,7 @@
import hashlib import hashlib
import logging import logging
from datetime import datetime, timezone from datetime import datetime
from typing import Optional from typing import Optional
from uuid import UUID from uuid import UUID
@@ -25,7 +25,7 @@ def _compute_rule_hash(content: str) -> str:
def _now() -> datetime: def _now() -> datetime:
return datetime.now(timezone.utc) return datetime.utcnow()
def create_detection_asset(db: Session, data: dict, user_id: UUID) -> DetectionAsset: def create_detection_asset(db: Session, data: dict, user_id: UUID) -> DetectionAsset: