Files
Aegis/backend/app/schemas/ownership_queue_schema.py
kitos 4ece2293ec
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
fix(ownership): validate reason+priority in QueueItemCreate to return 422 not 500
POST /ownership/queue with an invalid reason or priority was silently
passing Pydantic and crashing at the DB layer (PostgreSQL enum type
mismatch → 500). Added @field_validator for both fields, matching the
existing validators in QueueItemPatch.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 17:57:34 +02:00

154 lines
5.2 KiB
Python

"""Pydantic schemas for Phase 9: Ownership & Revalidation Queue."""
from datetime import datetime
from typing import Optional
from uuid import UUID
from pydantic import BaseModel, ConfigDict, field_validator
# ── Technique Ownership ───────────────────────────────────────────────────────
class TechniqueOwnershipSet(BaseModel):
    """Set (create or replace) ownership for a technique."""

    # Every field is optional and defaults to None.
    owner_id: Optional[UUID] = None
    backup_owner_id: Optional[UUID] = None

    # Plain strings: no length or format constraint is declared on them here.
    team: Optional[str] = None
    notes: Optional[str] = None
class TechniqueOwnershipOut(BaseModel):
    # Serialised view of a technique-ownership record.
    # from_attributes=True allows building the model from an object's
    # attributes instead of only from a mapping.
    model_config = ConfigDict(from_attributes=True)

    # Required identifiers.
    id: UUID
    technique_id: UUID

    # Ownership details; each one may be absent.
    owner_id: Optional[UUID] = None
    backup_owner_id: Optional[UUID] = None
    team: Optional[str] = None
    notes: Optional[str] = None

    # Assignment and record bookkeeping.
    assigned_at: Optional[datetime] = None
    assigned_by: Optional[UUID] = None
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
class DetectionAssetOwnershipPatch(BaseModel):
    """Update ownership fields on a detection asset."""

    # All three fields are optional and default to None.
    # NOTE(review): this schema cannot distinguish "field omitted" from
    # "field explicitly null" on its own — confirm how the caller applies it.
    owner_id: Optional[UUID] = None
    backup_owner_id: Optional[UUID] = None
    team: Optional[str] = None
# ── Bulk Assignment ───────────────────────────────────────────────────────────
class BulkAssignRequest(BaseModel):
    """Bulk-assign ownership by tactic, platform, or team filter."""

    # Presumably the ownership values to apply to the matched targets; the
    # docstring also lists "team" as a filter — TODO confirm which role it has.
    owner_id: Optional[UUID] = None
    backup_owner_id: Optional[UUID] = None
    team: Optional[str] = None

    # Filters — at least one must be set
    # NOTE(review): nothing in this schema enforces that rule; presumably the
    # caller checks it — confirm.

    # assign all techniques with this tactic
    tactic: Optional[str] = None

    # assign all detection assets with this platform
    platform: Optional[str] = None

    # overwrite existing assignments
    overwrite: bool = False
class BulkAssignResult(BaseModel):
    # Outcome counters of a bulk assignment; all three fields are required.
    assigned_count: int
    skipped_count: int

    # "technique" or "detection_asset"
    # (declared as a plain str, so the schema itself accepts any string)
    target_type: str
# ── Revalidation Queue ────────────────────────────────────────────────────────
class QueueItemPatch(BaseModel):
    """Update a revalidation queue item."""

    # Every field is optional and defaults to None.
    status: Optional[str] = None
    assigned_to: Optional[UUID] = None
    priority: Optional[str] = None
    due_date: Optional[datetime] = None

    @field_validator("status")
    @classmethod
    def validate_status(cls, v: Optional[str]) -> Optional[str]:
        """Reject a status that ``QueueStatus`` does not accept.

        Args:
            v: The submitted status, or None.

        Returns:
            ``v`` unchanged (None is passed through without a lookup).

        Raises:
            ValueError: If ``v`` is not None and ``QueueStatus(v)`` fails.
        """
        # Imported inside the validator rather than at module level;
        # presumably to avoid an import cycle with the models — TODO confirm.
        from app.models.ownership_queue import QueueStatus

        if v is None:
            return v
        try:
            QueueStatus(v)
        except ValueError:
            # "from None" drops the enum's own lookup error from the exception
            # chain; the message below already says everything it would.
            raise ValueError(f"Invalid status: {v}") from None
        return v

    @field_validator("priority")
    @classmethod
    def validate_priority(cls, v: Optional[str]) -> Optional[str]:
        """Reject a priority that ``QueuePriority`` does not accept.

        Args:
            v: The submitted priority, or None.

        Returns:
            ``v`` unchanged (None is passed through without a lookup).

        Raises:
            ValueError: If ``v`` is not None and ``QueuePriority(v)`` fails.
        """
        # Imported inside the validator rather than at module level;
        # presumably to avoid an import cycle with the models — TODO confirm.
        from app.models.ownership_queue import QueuePriority

        if v is None:
            return v
        try:
            QueuePriority(v)
        except ValueError:
            # Suppress the redundant enum lookup error from the chain.
            raise ValueError(f"Invalid priority: {v}") from None
        return v
class QueueItemCreate(BaseModel):
    """Manually create a queue item."""

    # NOTE(review): both target ids are optional and nothing in this schema
    # requires one of them — confirm whether the caller enforces that.
    technique_id: Optional[UUID] = None
    detection_asset_id: Optional[UUID] = None

    # Plain strings checked against the model enums by the validators below.
    # NOTE(review): the defaults are presumably valid enum values — confirm
    # against QueuePriority / QueueReason.
    priority: str = "medium"
    reason: str = "manual"
    reason_detail: Optional[str] = None

    assigned_to: Optional[UUID] = None
    due_date: Optional[datetime] = None

    @field_validator("reason")
    @classmethod
    def validate_reason(cls, v: str) -> str:
        """Reject a reason that ``QueueReason`` does not accept.

        Args:
            v: The submitted reason.

        Returns:
            ``v`` unchanged when ``QueueReason(v)`` succeeds.

        Raises:
            ValueError: If ``QueueReason(v)`` fails; the message lists the
                enum's values.
        """
        # Imported inside the validator rather than at module level;
        # presumably to avoid an import cycle with the models — TODO confirm.
        from app.models.ownership_queue import QueueReason

        try:
            QueueReason(v)
        except ValueError:
            valid = [e.value for e in QueueReason]
            # "from None" drops the enum's own lookup error from the exception
            # chain; the message below already names the allowed values.
            raise ValueError(
                f"Invalid reason '{v}'. Must be one of: {valid}"
            ) from None
        return v

    @field_validator("priority")
    @classmethod
    def validate_priority(cls, v: str) -> str:
        """Reject a priority that ``QueuePriority`` does not accept.

        Args:
            v: The submitted priority.

        Returns:
            ``v`` unchanged when ``QueuePriority(v)`` succeeds.

        Raises:
            ValueError: If ``QueuePriority(v)`` fails; the message lists the
                enum's values.
        """
        # Imported inside the validator rather than at module level;
        # presumably to avoid an import cycle with the models — TODO confirm.
        from app.models.ownership_queue import QueuePriority

        try:
            QueuePriority(v)
        except ValueError:
            valid = [e.value for e in QueuePriority]
            # Suppress the redundant enum lookup error from the chain.
            raise ValueError(
                f"Invalid priority '{v}'. Must be one of: {valid}"
            ) from None
        return v
class QueueItemOut(BaseModel):
    # Serialised view of a revalidation queue item.
    # from_attributes=True allows building the model from an object's
    # attributes instead of only from a mapping.
    model_config = ConfigDict(from_attributes=True)

    id: UUID

    # Target of the item; both ids are optional in this schema.
    technique_id: Optional[UUID] = None
    detection_asset_id: Optional[UUID] = None

    # Required, and typed as plain strings here (no enum check on output).
    priority: str
    reason: str
    reason_detail: Optional[str] = None
    status: str

    assigned_to: Optional[UUID] = None
    due_date: Optional[datetime] = None

    # Lifecycle timestamps and the user recorded as completing the item.
    created_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    dismissed_at: Optional[datetime] = None
    completed_by: Optional[UUID] = None

    # Untyped dict; its keys are not described by this schema.
    extra: Optional[dict] = None
# ── Analyst Dashboard ─────────────────────────────────────────────────────────
class AnalystDashboard(BaseModel):
    """Personalised daily workday view for an analyst."""

    # All five fields are required (no defaults are declared).
    my_pending_items: list[QueueItemOut]

    # Lists of untyped dicts; their keys are not described by this schema.
    expiring_validations_7d: list[dict]
    recent_infra_changes: list[dict]
    my_low_confidence_techniques: list[dict]

    # Untyped dict; presumably aggregate counters — TODO confirm with the
    # code that builds it.
    summary: dict