"""Phase 9: Ownership & Daily Operations router.""" from typing import Optional from uuid import UUID from fastapi import APIRouter, Depends, Query from sqlalchemy.orm import Session from app.database import get_db from app.dependencies.auth import get_current_user, require_any_role from app.domain.exceptions import EntityNotFoundError from app.schemas.ownership_queue_schema import ( TechniqueOwnershipSet, TechniqueOwnershipOut, DetectionAssetOwnershipPatch, BulkAssignRequest, BulkAssignResult, QueueItemCreate, QueueItemPatch, QueueItemOut, AnalystDashboard, ) from app.services import ownership_service, revalidation_queue_service from app.models.ownership_queue import RevalidationQueueItem router = APIRouter(prefix="/ownership", tags=["ownership"]) # ── Technique Ownership ─────────────────────────────────────────────────────── @router.get("/techniques/{technique_id}", response_model=TechniqueOwnershipOut) def get_technique_ownership( technique_id: UUID, db: Session = Depends(get_db), user=Depends(get_current_user), ): ownership = ownership_service.get_technique_ownership(db, technique_id) if not ownership: raise EntityNotFoundError("TechniqueOwnership", str(technique_id)) return ownership @router.put("/techniques/{technique_id}", response_model=TechniqueOwnershipOut) def set_technique_ownership( technique_id: UUID, body: TechniqueOwnershipSet, db: Session = Depends(get_db), user=Depends(require_any_role("admin", "blue_lead", "red_lead")), ): return ownership_service.set_technique_ownership( db, technique_id, owner_id=body.owner_id, backup_owner_id=body.backup_owner_id, team=body.team, notes=body.notes, assigned_by=user.id, ) # ── Detection Asset Ownership ───────────────────────────────────────────────── @router.patch("/assets/{asset_id}", response_model=dict) def set_asset_ownership( asset_id: UUID, body: DetectionAssetOwnershipPatch, db: Session = Depends(get_db), user=Depends(require_any_role("admin", "blue_lead")), ): ownership_service.set_asset_ownership( db, asset_id, owner_id=body.owner_id, backup_owner_id=body.backup_owner_id, team=body.team, user_id=user.id, ) return {"message": "Asset ownership updated"} # ── Orphan Reports ──────────────────────────────────────────────────────────── @router.get("/orphans/techniques", response_model=list[dict]) def orphan_techniques( db: Session = Depends(get_db), user=Depends(get_current_user), ): """Return techniques with no assigned owner.""" return ownership_service.get_orphan_techniques(db) @router.get("/orphans/assets", response_model=list[dict]) def orphan_assets( db: Session = Depends(get_db), user=Depends(get_current_user), ): """Return active detection assets with no assigned owner.""" return ownership_service.get_orphan_assets(db) # ── Bulk Assignment ─────────────────────────────────────────────────────────── @router.post("/bulk-assign", response_model=BulkAssignResult) def bulk_assign( body: BulkAssignRequest, db: Session = Depends(get_db), user=Depends(require_any_role("admin", "blue_lead", "red_lead")), ): """ Bulk-assign ownership. - If `tactic` is set → assigns technique ownership for all techniques of that tactic. - If `platform` is set → assigns asset ownership for all assets on that platform. At least one of tactic/platform must be provided. """ if not body.tactic and not body.platform: from fastapi import HTTPException raise HTTPException(status_code=422, detail="Provide at least one of: tactic, platform") if body.tactic: result = ownership_service.bulk_assign_techniques_by_tactic( db, body.tactic, owner_id=body.owner_id, backup_owner_id=body.backup_owner_id, team=body.team, overwrite=body.overwrite, user_id=user.id, ) else: result = ownership_service.bulk_assign_assets_by_platform( db, body.platform, owner_id=body.owner_id, backup_owner_id=body.backup_owner_id, team=body.team, overwrite=body.overwrite, user_id=user.id, ) return BulkAssignResult(**result) # ── Revalidation Queue ──────────────────────────────────────────────────────── @router.get("/queue", response_model=list[QueueItemOut]) def list_queue( status: Optional[str] = Query(None), priority: Optional[str] = Query(None), reason: Optional[str] = Query(None), assigned_to: Optional[UUID] = Query(None), technique_id: Optional[UUID] = Query(None), detection_asset_id: Optional[UUID] = Query(None), limit: int = Query(100, ge=1, le=500), offset: int = Query(0, ge=0), db: Session = Depends(get_db), user=Depends(get_current_user), ): return revalidation_queue_service.list_queue( db, status=status, priority=priority, reason=reason, assigned_to=assigned_to, technique_id=technique_id, detection_asset_id=detection_asset_id, limit=limit, offset=offset, ) @router.post("/queue", response_model=QueueItemOut, status_code=201) def create_queue_item( body: QueueItemCreate, db: Session = Depends(get_db), user=Depends(get_current_user), ): return revalidation_queue_service.create_queue_item(db, body.model_dump(), user.id) @router.patch("/queue/{item_id}", response_model=QueueItemOut) def update_queue_item( item_id: UUID, body: QueueItemPatch, db: Session = Depends(get_db), user=Depends(get_current_user), ): return revalidation_queue_service.update_queue_item(db, item_id, body.model_dump(exclude_unset=True), user.id) @router.post("/queue/generate", response_model=dict) def generate_queue( db: Session = Depends(get_db), user=Depends(require_any_role("admin", "blue_lead")), ): """Scan the system and create new revalidation queue items.""" return revalidation_queue_service.generate_queue_items(db) # ── Analyst Dashboard ───────────────────────────────────────────────────────── @router.get("/analyst-dashboard") def analyst_dashboard( db: Session = Depends(get_db), user=Depends(get_current_user), ): """Personalised daily workday view: my queue, expiring validations, infra changes, low-confidence techniques.""" dashboard = revalidation_queue_service.get_analyst_dashboard(db, user.id) # Serialize queue items to dicts (ORM objects → plain dicts) def _item_to_dict(item: RevalidationQueueItem) -> dict: return { "id": str(item.id), "technique_id": str(item.technique_id) if item.technique_id else None, "detection_asset_id": str(item.detection_asset_id) if item.detection_asset_id else None, "priority": item.priority.value if hasattr(item.priority, "value") else item.priority, "reason": item.reason.value if hasattr(item.reason, "value") else item.reason, "reason_detail": item.reason_detail, "status": item.status.value if hasattr(item.status, "value") else item.status, "assigned_to": str(item.assigned_to) if item.assigned_to else None, "due_date": item.due_date.isoformat() if item.due_date else None, "created_at": item.created_at.isoformat() if item.created_at else None, } return { "my_pending_items": [_item_to_dict(i) for i in dashboard["my_pending_items"]], "expiring_validations_7d": dashboard["expiring_validations_7d"], "recent_infra_changes": dashboard["recent_infra_changes"], "my_low_confidence_techniques": dashboard["my_low_confidence_techniques"], "summary": dashboard["summary"], }