Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
Backend:
- TechniqueOwnership model: per-technique owner, backup owner, team
- RevalidationQueueItem model: prioritised analyst work queue
(critical/high/medium/low, reasons: validation_expired/infra_change/
osint_alert/mitre_update/rule_modified/low_confidence/manual)
- Migration b035ownerq: creates technique_ownerships and
revalidation_queue_items tables with full indexes
Services:
- ownership_service: set/get technique ownership, bulk assign by tactic
or platform, orphan reports for techniques and assets
- revalidation_queue_service: smart queue generation (scans expired
validations, low-confidence techniques, recent infra changes),
list/create/update queue items, analyst dashboard
Router /api/v1/ownership:
GET/PUT /ownership/techniques/{id} — technique ownership
PATCH /ownership/assets/{id} — asset ownership
GET /ownership/orphans/techniques — orphan report
GET /ownership/orphans/assets — orphan report
POST /ownership/bulk-assign — bulk by tactic/platform
GET/POST /ownership/queue — revalidation queue CRUD
PATCH /ownership/queue/{id} — update item status/assignee
POST /ownership/queue/generate — scan & generate items
GET /ownership/analyst-dashboard — personalised daily view
Scheduler: queue_generation job daily at 02:30 (after decay engine)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
216 lines
6.6 KiB
Python
216 lines
6.6 KiB
Python
"""Phase 9: Ownership service — techniques and detection assets."""
|
|
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.ownership_queue import TechniqueOwnership
|
|
from app.models.detection_lifecycle import DetectionAsset
|
|
from app.models.technique import Technique
|
|
from app.domain.exceptions import EntityNotFoundError
|
|
from app.services import audit_service
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _now() -> datetime:
|
|
return datetime.utcnow()
|
|
|
|
|
|
# ── Technique Ownership ───────────────────────────────────────────────────────
|
|
|
|
def get_technique_ownership(db: Session, technique_id: UUID) -> Optional[TechniqueOwnership]:
|
|
return db.query(TechniqueOwnership).filter(
|
|
TechniqueOwnership.technique_id == technique_id
|
|
).first()
|
|
|
|
|
|
def set_technique_ownership(
|
|
db: Session,
|
|
technique_id: UUID,
|
|
owner_id: Optional[UUID],
|
|
backup_owner_id: Optional[UUID],
|
|
team: Optional[str],
|
|
notes: Optional[str],
|
|
assigned_by: UUID,
|
|
) -> TechniqueOwnership:
|
|
technique = db.query(Technique).filter(Technique.id == technique_id).first()
|
|
if not technique:
|
|
raise EntityNotFoundError("Technique", str(technique_id))
|
|
|
|
ownership = db.query(TechniqueOwnership).filter(
|
|
TechniqueOwnership.technique_id == technique_id
|
|
).first()
|
|
|
|
if not ownership:
|
|
ownership = TechniqueOwnership(technique_id=technique_id)
|
|
db.add(ownership)
|
|
|
|
ownership.owner_id = owner_id
|
|
ownership.backup_owner_id = backup_owner_id
|
|
ownership.team = team
|
|
ownership.notes = notes
|
|
ownership.assigned_at = _now()
|
|
ownership.assigned_by = assigned_by
|
|
ownership.updated_at = _now()
|
|
|
|
db.commit()
|
|
db.refresh(ownership)
|
|
|
|
audit_service.log_action(
|
|
db, assigned_by, "TECHNIQUE_OWNERSHIP_SET", "technique_ownership", str(ownership.id),
|
|
details={"technique_id": str(technique_id), "owner_id": str(owner_id) if owner_id else None, "team": team},
|
|
)
|
|
return ownership
|
|
|
|
|
|
def get_orphan_techniques(db: Session) -> list[dict]:
|
|
"""Techniques with no ownership record or null owner_id."""
|
|
owned_ids = db.query(TechniqueOwnership.technique_id).filter(
|
|
TechniqueOwnership.owner_id.isnot(None)
|
|
).subquery()
|
|
|
|
orphans = db.query(Technique).filter(
|
|
~Technique.id.in_(owned_ids)
|
|
).order_by(Technique.tactic, Technique.mitre_id).all()
|
|
|
|
return [
|
|
{
|
|
"technique_id": str(t.id),
|
|
"mitre_id": t.mitre_id,
|
|
"name": t.name,
|
|
"tactic": t.tactic,
|
|
}
|
|
for t in orphans
|
|
]
|
|
|
|
|
|
# ── Detection Asset Ownership ─────────────────────────────────────────────────
|
|
|
|
def set_asset_ownership(
|
|
db: Session,
|
|
asset_id: UUID,
|
|
owner_id: Optional[UUID],
|
|
backup_owner_id: Optional[UUID],
|
|
team: Optional[str],
|
|
user_id: UUID,
|
|
) -> DetectionAsset:
|
|
asset = db.query(DetectionAsset).filter(DetectionAsset.id == asset_id).first()
|
|
if not asset:
|
|
raise EntityNotFoundError("DetectionAsset", str(asset_id))
|
|
|
|
asset.owner_id = owner_id
|
|
asset.backup_owner_id = backup_owner_id
|
|
asset.team = team
|
|
db.commit()
|
|
db.refresh(asset)
|
|
|
|
audit_service.log_action(
|
|
db, user_id, "ASSET_OWNERSHIP_SET", "detection_asset", str(asset_id),
|
|
details={"owner_id": str(owner_id) if owner_id else None, "team": team},
|
|
)
|
|
return asset
|
|
|
|
|
|
def get_orphan_assets(db: Session) -> list[dict]:
|
|
"""Active detection assets with no owner."""
|
|
orphans = db.query(DetectionAsset).filter(
|
|
DetectionAsset.is_active == True,
|
|
DetectionAsset.owner_id.is_(None),
|
|
).order_by(DetectionAsset.platform, DetectionAsset.name).all()
|
|
|
|
return [
|
|
{
|
|
"asset_id": str(a.id),
|
|
"name": a.name,
|
|
"platform": a.platform,
|
|
"asset_type": a.asset_type,
|
|
"health_status": a.health_status.value if a.health_status else None,
|
|
}
|
|
for a in orphans
|
|
]
|
|
|
|
|
|
# ── Bulk Assignment ───────────────────────────────────────────────────────────
|
|
|
|
def bulk_assign_techniques_by_tactic(
|
|
db: Session,
|
|
tactic: str,
|
|
owner_id: Optional[UUID],
|
|
backup_owner_id: Optional[UUID],
|
|
team: Optional[str],
|
|
overwrite: bool,
|
|
user_id: UUID,
|
|
) -> dict:
|
|
techniques = db.query(Technique).filter(Technique.tactic == tactic).all()
|
|
assigned = 0
|
|
skipped = 0
|
|
now = _now()
|
|
|
|
for technique in techniques:
|
|
existing = db.query(TechniqueOwnership).filter(
|
|
TechniqueOwnership.technique_id == technique.id
|
|
).first()
|
|
|
|
if existing and existing.owner_id and not overwrite:
|
|
skipped += 1
|
|
continue
|
|
|
|
if not existing:
|
|
existing = TechniqueOwnership(technique_id=technique.id)
|
|
db.add(existing)
|
|
|
|
existing.owner_id = owner_id
|
|
existing.backup_owner_id = backup_owner_id
|
|
existing.team = team
|
|
existing.assigned_at = now
|
|
existing.assigned_by = user_id
|
|
existing.updated_at = now
|
|
assigned += 1
|
|
|
|
db.commit()
|
|
|
|
audit_service.log_action(
|
|
db, user_id, "BULK_OWNERSHIP_ASSIGNED", "technique_ownership", None,
|
|
details={"tactic": tactic, "assigned": assigned, "skipped": skipped, "team": team},
|
|
)
|
|
logger.info("Bulk ownership: tactic=%s assigned=%d skipped=%d", tactic, assigned, skipped)
|
|
return {"assigned_count": assigned, "skipped_count": skipped, "target_type": "technique"}
|
|
|
|
|
|
def bulk_assign_assets_by_platform(
|
|
db: Session,
|
|
platform: str,
|
|
owner_id: Optional[UUID],
|
|
backup_owner_id: Optional[UUID],
|
|
team: Optional[str],
|
|
overwrite: bool,
|
|
user_id: UUID,
|
|
) -> dict:
|
|
assets = db.query(DetectionAsset).filter(
|
|
DetectionAsset.platform == platform,
|
|
DetectionAsset.is_active == True,
|
|
).all()
|
|
assigned = 0
|
|
skipped = 0
|
|
|
|
for asset in assets:
|
|
if asset.owner_id and not overwrite:
|
|
skipped += 1
|
|
continue
|
|
asset.owner_id = owner_id
|
|
asset.backup_owner_id = backup_owner_id
|
|
asset.team = team
|
|
assigned += 1
|
|
|
|
db.commit()
|
|
|
|
audit_service.log_action(
|
|
db, user_id, "BULK_ASSET_OWNERSHIP_ASSIGNED", "detection_asset", None,
|
|
details={"platform": platform, "assigned": assigned, "skipped": skipped},
|
|
)
|
|
return {"assigned_count": assigned, "skipped_count": skipped, "target_type": "detection_asset"}
|