Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
Backend:
- TechniqueOwnership model: per-technique owner, backup owner, team
- RevalidationQueueItem model: prioritised analyst work queue
(critical/high/medium/low, reasons: validation_expired/infra_change/
osint_alert/mitre_update/rule_modified/low_confidence/manual)
- Migration b035ownerq: creates technique_ownerships and
revalidation_queue_items tables with full indexes
Services:
- ownership_service: set/get technique ownership, bulk assign by tactic
or platform, orphan reports for techniques and assets
- revalidation_queue_service: smart queue generation (scans expired
validations, low-confidence techniques, recent infra changes),
list/create/update queue items, analyst dashboard
Router /api/v1/ownership:
GET/PUT /ownership/techniques/{id} — technique ownership
PATCH /ownership/assets/{id} — asset ownership
GET /ownership/orphans/techniques — orphan report
GET /ownership/orphans/assets — orphan report
POST /ownership/bulk-assign — bulk by tactic/platform
GET/POST /ownership/queue — revalidation queue CRUD
PATCH /ownership/queue/{id} — update item status/assignee
POST /ownership/queue/generate — scan & generate items
GET /ownership/analyst-dashboard — personalised daily view
Scheduler: queue_generation job daily at 02:30 (after decay engine)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
132 lines
4.5 KiB
Python
132 lines
4.5 KiB
Python
"""Pydantic schemas for Phase 9: Ownership & Revalidation Queue."""
|
|
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
from uuid import UUID
|
|
|
|
from pydantic import BaseModel, ConfigDict, field_validator
|
|
|
|
|
|
# ── Technique Ownership ───────────────────────────────────────────────────────
|
|
|
|
class TechniqueOwnershipSet(BaseModel):
|
|
"""Set (create or replace) ownership for a technique."""
|
|
owner_id: Optional[UUID] = None
|
|
backup_owner_id: Optional[UUID] = None
|
|
team: Optional[str] = None
|
|
notes: Optional[str] = None
|
|
|
|
|
|
class TechniqueOwnershipOut(BaseModel):
|
|
model_config = ConfigDict(from_attributes=True)
|
|
|
|
id: UUID
|
|
technique_id: UUID
|
|
owner_id: Optional[UUID] = None
|
|
backup_owner_id: Optional[UUID] = None
|
|
team: Optional[str] = None
|
|
notes: Optional[str] = None
|
|
assigned_at: Optional[datetime] = None
|
|
assigned_by: Optional[UUID] = None
|
|
created_at: Optional[datetime] = None
|
|
updated_at: Optional[datetime] = None
|
|
|
|
|
|
class DetectionAssetOwnershipPatch(BaseModel):
|
|
"""Update ownership fields on a detection asset."""
|
|
owner_id: Optional[UUID] = None
|
|
backup_owner_id: Optional[UUID] = None
|
|
team: Optional[str] = None
|
|
|
|
|
|
# ── Bulk Assignment ───────────────────────────────────────────────────────────
|
|
|
|
class BulkAssignRequest(BaseModel):
|
|
"""Bulk-assign ownership by tactic, platform, or team filter."""
|
|
owner_id: Optional[UUID] = None
|
|
backup_owner_id: Optional[UUID] = None
|
|
team: Optional[str] = None
|
|
# Filters — at least one must be set
|
|
tactic: Optional[str] = None # assign all techniques with this tactic
|
|
platform: Optional[str] = None # assign all detection assets with this platform
|
|
overwrite: bool = False # overwrite existing assignments
|
|
|
|
|
|
class BulkAssignResult(BaseModel):
|
|
assigned_count: int
|
|
skipped_count: int
|
|
target_type: str # "technique" or "detection_asset"
|
|
|
|
|
|
# ── Revalidation Queue ────────────────────────────────────────────────────────
|
|
|
|
class QueueItemPatch(BaseModel):
|
|
"""Update a revalidation queue item."""
|
|
status: Optional[str] = None
|
|
assigned_to: Optional[UUID] = None
|
|
priority: Optional[str] = None
|
|
due_date: Optional[datetime] = None
|
|
|
|
@field_validator("status")
|
|
@classmethod
|
|
def validate_status(cls, v):
|
|
from app.models.ownership_queue import QueueStatus
|
|
if v is not None:
|
|
try:
|
|
QueueStatus(v)
|
|
except ValueError:
|
|
raise ValueError(f"Invalid status: {v}")
|
|
return v
|
|
|
|
@field_validator("priority")
|
|
@classmethod
|
|
def validate_priority(cls, v):
|
|
from app.models.ownership_queue import QueuePriority
|
|
if v is not None:
|
|
try:
|
|
QueuePriority(v)
|
|
except ValueError:
|
|
raise ValueError(f"Invalid priority: {v}")
|
|
return v
|
|
|
|
|
|
class QueueItemCreate(BaseModel):
|
|
"""Manually create a queue item."""
|
|
technique_id: Optional[UUID] = None
|
|
detection_asset_id: Optional[UUID] = None
|
|
priority: str = "medium"
|
|
reason: str = "manual"
|
|
reason_detail: Optional[str] = None
|
|
assigned_to: Optional[UUID] = None
|
|
due_date: Optional[datetime] = None
|
|
|
|
|
|
class QueueItemOut(BaseModel):
|
|
model_config = ConfigDict(from_attributes=True)
|
|
|
|
id: UUID
|
|
technique_id: Optional[UUID] = None
|
|
detection_asset_id: Optional[UUID] = None
|
|
priority: str
|
|
reason: str
|
|
reason_detail: Optional[str] = None
|
|
status: str
|
|
assigned_to: Optional[UUID] = None
|
|
due_date: Optional[datetime] = None
|
|
created_at: Optional[datetime] = None
|
|
completed_at: Optional[datetime] = None
|
|
dismissed_at: Optional[datetime] = None
|
|
completed_by: Optional[UUID] = None
|
|
extra: Optional[dict] = None
|
|
|
|
|
|
# ── Analyst Dashboard ─────────────────────────────────────────────────────────
|
|
|
|
class AnalystDashboard(BaseModel):
|
|
"""Personalised daily workday view for an analyst."""
|
|
my_pending_items: list[QueueItemOut]
|
|
expiring_validations_7d: list[dict]
|
|
recent_infra_changes: list[dict]
|
|
my_low_confidence_techniques: list[dict]
|
|
summary: dict
|