diff --git a/backend/alembic/versions/b035_ownership_queue.py b/backend/alembic/versions/b035_ownership_queue.py new file mode 100644 index 0000000..fd518da --- /dev/null +++ b/backend/alembic/versions/b035_ownership_queue.py @@ -0,0 +1,116 @@ +"""Phase 9: Ownership & Revalidation Queue + +Revision ID: b035ownerq +Revises: b034dlm +Create Date: 2026-05-19 +""" + +from typing import Union +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +revision: str = "b035ownerq" +down_revision: Union[str, None] = "b034dlm" +branch_labels = None +depends_on = None + + +def _table_exists(table_name: str) -> bool: + conn = op.get_bind() + return conn.dialect.has_table(conn, table_name) + + +def _column_exists(table_name: str, column_name: str) -> bool: + conn = op.get_bind() + insp = sa.inspect(conn) + cols = [c["name"] for c in insp.get_columns(table_name)] + return column_name in cols + + +def _enum_exists(enum_name: str) -> bool: + conn = op.get_bind() + result = conn.execute( + sa.text("SELECT 1 FROM pg_type WHERE typname = :name"), {"name": enum_name} + ).fetchone() + return result is not None + + +def upgrade() -> None: + # ── Enums ──────────────────────────────────────────────────────────────── + if not _enum_exists("queue_priority"): + op.execute("CREATE TYPE queue_priority AS ENUM ('critical', 'high', 'medium', 'low')") + if not _enum_exists("queue_status"): + op.execute("CREATE TYPE queue_status AS ENUM ('pending', 'in_progress', 'completed', 'dismissed')") + if not _enum_exists("queue_reason"): + op.execute( + "CREATE TYPE queue_reason AS ENUM (" + "'validation_expired', 'infra_change', 'osint_alert', " + "'mitre_update', 'rule_modified', 'low_confidence', 'manual')" + ) + + # ── technique_ownerships ───────────────────────────────────────────────── + if not _table_exists("technique_ownerships"): + op.create_table( + "technique_ownerships", + sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True), + sa.Column("technique_id", postgresql.UUID(as_uuid=True), + sa.ForeignKey("techniques.id", ondelete="CASCADE"), nullable=False, unique=True), + sa.Column("owner_id", postgresql.UUID(as_uuid=True), + sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=True), + sa.Column("backup_owner_id", postgresql.UUID(as_uuid=True), + sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=True), + sa.Column("team", sa.String(200), nullable=True), + sa.Column("notes", sa.Text, nullable=True), + sa.Column("assigned_at", sa.DateTime, nullable=True), + sa.Column("assigned_by", postgresql.UUID(as_uuid=True), + sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=True), + sa.Column("created_at", sa.DateTime, server_default=sa.func.now()), + sa.Column("updated_at", sa.DateTime, server_default=sa.func.now(), onupdate=sa.func.now()), + ) + op.create_index("ix_techown_owner_id", "technique_ownerships", ["owner_id"]) + op.create_index("ix_techown_technique_id", "technique_ownerships", ["technique_id"]) + + # ── revalidation_queue_items ────────────────────────────────────────────── + if not _table_exists("revalidation_queue_items"): + op.create_table( + "revalidation_queue_items", + sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True), + sa.Column("technique_id", postgresql.UUID(as_uuid=True), + sa.ForeignKey("techniques.id", ondelete="CASCADE"), nullable=True), + sa.Column("detection_asset_id", postgresql.UUID(as_uuid=True), + sa.ForeignKey("detection_assets.id", ondelete="CASCADE"), nullable=True), + sa.Column("priority", sa.Enum("critical", "high", "medium", "low", + name="queue_priority", create_type=False), + nullable=False, server_default="medium"), + sa.Column("reason", sa.Enum("validation_expired", "infra_change", "osint_alert", + "mitre_update", "rule_modified", "low_confidence", "manual", + name="queue_reason", create_type=False), + nullable=False), + sa.Column("reason_detail", sa.Text, nullable=True), + sa.Column("status", sa.Enum("pending", "in_progress", "completed", "dismissed", + name="queue_status", create_type=False), + nullable=False, server_default="pending"), + sa.Column("assigned_to", postgresql.UUID(as_uuid=True), + sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=True), + sa.Column("due_date", sa.DateTime, nullable=True), + sa.Column("created_at", sa.DateTime, server_default=sa.func.now()), + sa.Column("completed_at", sa.DateTime, nullable=True), + sa.Column("dismissed_at", sa.DateTime, nullable=True), + sa.Column("completed_by", postgresql.UUID(as_uuid=True), + sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=True), + sa.Column("extra", postgresql.JSONB, nullable=True), + ) + op.create_index("ix_rqueue_status", "revalidation_queue_items", ["status"]) + op.create_index("ix_rqueue_priority", "revalidation_queue_items", ["priority"]) + op.create_index("ix_rqueue_assigned_to", "revalidation_queue_items", ["assigned_to"]) + op.create_index("ix_rqueue_technique_id", "revalidation_queue_items", ["technique_id"]) + op.create_index("ix_rqueue_asset_id", "revalidation_queue_items", ["detection_asset_id"]) + + +def downgrade() -> None: + op.drop_table("revalidation_queue_items") + op.drop_table("technique_ownerships") + op.execute("DROP TYPE IF EXISTS queue_reason") + op.execute("DROP TYPE IF EXISTS queue_status") + op.execute("DROP TYPE IF EXISTS queue_priority") diff --git a/backend/app/jobs/mitre_sync_job.py b/backend/app/jobs/mitre_sync_job.py index b5049d2..5d8f314 100644 --- a/backend/app/jobs/mitre_sync_job.py +++ b/backend/app/jobs/mitre_sync_job.py @@ -209,6 +209,20 @@ def _run_decay_engine() -> None: db.close() +def _run_queue_generation() -> None: + """Generate revalidation queue items for analysts — runs after decay engine.""" + logger.info("Scheduled revalidation queue generation starting...") + db = SessionLocal() + try: + from app.services.revalidation_queue_service import generate_queue_items + results = generate_queue_items(db) + logger.info("Queue generation finished — %s", results) + except Exception: + logger.exception("Queue generation job failed") + finally: + db.close() + + # --------------------------------------------------------------------------- # Scheduler bootstrap # --------------------------------------------------------------------------- @@ -315,6 +329,15 @@ def start_scheduler() -> None: name="Detection decay engine (daily 02:00)", replace_existing=True, ) + scheduler.add_job( + _run_queue_generation, + trigger="cron", + hour=2, + minute=30, + id="queue_generation", + name="Revalidation queue generation (daily 02:30)", + replace_existing=True, + ) scheduler.start() logger.info( "Background scheduler started — mitre_sync (24h), intel_scan (7d), " diff --git a/backend/app/main.py b/backend/app/main.py index ff6fb00..582980b 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -39,6 +39,7 @@ from app.routers import advanced_metrics as advanced_metrics_router from app.routers import osint as osint_router from app.routers import webhooks as webhooks_router from app.routers import detection_lifecycle as detection_lifecycle_router +from app.routers import ownership as ownership_router from app.domain.errors import DomainError from app.middleware.error_handler import domain_exception_handler from app.middleware.request_context import RequestContextMiddleware @@ -137,6 +138,7 @@ app.include_router(advanced_metrics_router.router, prefix="/api/v1") app.include_router(osint_router.router, prefix="/api/v1") app.include_router(webhooks_router.router, prefix="/api/v1") app.include_router(detection_lifecycle_router.router, prefix="/api/v1") +app.include_router(ownership_router.router, prefix="/api/v1") @app.get("/health", include_in_schema=False) diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 1ae717e..c031561 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -29,6 +29,10 @@ from app.models.detection_lifecycle import ( DetectionConfidence, DetectionHealthStatus, InvalidationReason, ) from app.models.decay_policy import DecayPolicy +from app.models.ownership_queue import ( + TechniqueOwnership, RevalidationQueueItem, + QueuePriority, QueueStatus, QueueReason, +) __all__ = [ "User", "Technique", "Test", "TestTemplate", "Evidence", @@ -45,4 +49,6 @@ __all__ = [ "WebhookConfig", "SystemConfig", "DetectionAsset", "DetectionTechniqueMapping", "DetectionValidation", "TechniqueConfidenceScore", "InfrastructureChangeLog", "DecayPolicy", + "TechniqueOwnership", "RevalidationQueueItem", + "QueuePriority", "QueueStatus", "QueueReason", ] diff --git a/backend/app/models/ownership_queue.py b/backend/app/models/ownership_queue.py new file mode 100644 index 0000000..ea07072 --- /dev/null +++ b/backend/app/models/ownership_queue.py @@ -0,0 +1,136 @@ +"""Phase 9: Ownership & Revalidation Queue models.""" + +import enum +import uuid +from datetime import datetime + +from sqlalchemy import Column, DateTime, Enum, ForeignKey, Index, String, Text +from sqlalchemy.dialects.postgresql import UUID, JSONB +from sqlalchemy.orm import relationship + +from app.database import Base + + +class QueuePriority(str, enum.Enum): + critical = "critical" + high = "high" + medium = "medium" + low = "low" + + +class QueueStatus(str, enum.Enum): + pending = "pending" + in_progress = "in_progress" + completed = "completed" + dismissed = "dismissed" + + +class QueueReason(str, enum.Enum): + validation_expired = "validation_expired" + infra_change = "infra_change" + osint_alert = "osint_alert" + mitre_update = "mitre_update" + rule_modified = "rule_modified" + low_confidence = "low_confidence" + manual = "manual" + + +class TechniqueOwnership(Base): + """Ownership assignment for a MITRE technique.""" + + __tablename__ = "technique_ownerships" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + technique_id = Column( + UUID(as_uuid=True), + ForeignKey("techniques.id", ondelete="CASCADE"), + nullable=False, + unique=True, + ) + owner_id = Column( + UUID(as_uuid=True), + ForeignKey("users.id", ondelete="SET NULL"), + nullable=True, + ) + backup_owner_id = Column( + UUID(as_uuid=True), + ForeignKey("users.id", ondelete="SET NULL"), + nullable=True, + ) + team = Column(String(200), nullable=True) + notes = Column(Text, nullable=True) + assigned_at = Column(DateTime, nullable=True) + assigned_by = Column( + UUID(as_uuid=True), + ForeignKey("users.id", ondelete="SET NULL"), + nullable=True, + ) + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + technique = relationship("Technique", foreign_keys=[technique_id]) + owner = relationship("User", foreign_keys=[owner_id]) + backup_owner = relationship("User", foreign_keys=[backup_owner_id]) + + +class RevalidationQueueItem(Base): + """A prioritised work item for the analyst's daily queue.""" + + __tablename__ = "revalidation_queue_items" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + technique_id = Column( + UUID(as_uuid=True), + ForeignKey("techniques.id", ondelete="CASCADE"), + nullable=True, + ) + detection_asset_id = Column( + UUID(as_uuid=True), + ForeignKey("detection_assets.id", ondelete="CASCADE"), + nullable=True, + ) + + priority = Column( + Enum(QueuePriority, name="queue_priority"), + nullable=False, + default=QueuePriority.medium, + ) + reason = Column( + Enum(QueueReason, name="queue_reason"), + nullable=False, + ) + reason_detail = Column(Text, nullable=True) + status = Column( + Enum(QueueStatus, name="queue_status"), + nullable=False, + default=QueueStatus.pending, + ) + + assigned_to = Column( + UUID(as_uuid=True), + ForeignKey("users.id", ondelete="SET NULL"), + nullable=True, + ) + due_date = Column(DateTime, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow) + completed_at = Column(DateTime, nullable=True) + dismissed_at = Column(DateTime, nullable=True) + completed_by = Column( + UUID(as_uuid=True), + ForeignKey("users.id", ondelete="SET NULL"), + nullable=True, + ) + extra = Column(JSONB, nullable=True) # arbitrary metadata + + technique = relationship("Technique", foreign_keys=[technique_id]) + detection_asset = relationship("DetectionAsset", foreign_keys=[detection_asset_id]) + assignee = relationship("User", foreign_keys=[assigned_to]) + + +# Indexes +Index("ix_rqueue_status", RevalidationQueueItem.status) +Index("ix_rqueue_priority", RevalidationQueueItem.priority) +Index("ix_rqueue_assigned_to", RevalidationQueueItem.assigned_to) +Index("ix_rqueue_technique_id", RevalidationQueueItem.technique_id) +Index("ix_rqueue_asset_id", RevalidationQueueItem.detection_asset_id) +Index("ix_techown_owner_id", TechniqueOwnership.owner_id) diff --git a/backend/app/routers/ownership.py b/backend/app/routers/ownership.py new file mode 100644 index 0000000..aea2652 --- /dev/null +++ b/backend/app/routers/ownership.py @@ -0,0 +1,216 @@ +"""Phase 9: Ownership & Daily Operations router.""" + +from typing import Optional +from uuid import UUID + +from fastapi import APIRouter, Depends, Query +from sqlalchemy.orm import Session + +from app.database import get_db +from app.dependencies.auth import get_current_user, require_any_role +from app.domain.exceptions import EntityNotFoundError +from app.schemas.ownership_queue_schema import ( + TechniqueOwnershipSet, TechniqueOwnershipOut, + DetectionAssetOwnershipPatch, + BulkAssignRequest, BulkAssignResult, + QueueItemCreate, QueueItemPatch, QueueItemOut, + AnalystDashboard, +) +from app.services import ownership_service, revalidation_queue_service +from app.models.ownership_queue import RevalidationQueueItem + +router = APIRouter(prefix="/ownership", tags=["ownership"]) + + +# ── Technique Ownership ─────────────────────────────────────────────────────── + +@router.get("/techniques/{technique_id}", response_model=TechniqueOwnershipOut) +def get_technique_ownership( + technique_id: UUID, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + ownership = ownership_service.get_technique_ownership(db, technique_id) + if not ownership: + raise EntityNotFoundError("TechniqueOwnership", str(technique_id)) + return ownership + + +@router.put("/techniques/{technique_id}", response_model=TechniqueOwnershipOut) +def set_technique_ownership( + technique_id: UUID, + body: TechniqueOwnershipSet, + db: Session = Depends(get_db), + user=Depends(require_any_role("admin", "blue_lead", "red_lead")), +): + return ownership_service.set_technique_ownership( + db, technique_id, + owner_id=body.owner_id, + backup_owner_id=body.backup_owner_id, + team=body.team, + notes=body.notes, + assigned_by=user.id, + ) + + +# ── Detection Asset Ownership ───────────────────────────────────────────────── + +@router.patch("/assets/{asset_id}", response_model=dict) +def set_asset_ownership( + asset_id: UUID, + body: DetectionAssetOwnershipPatch, + db: Session = Depends(get_db), + user=Depends(require_any_role("admin", "blue_lead")), +): + ownership_service.set_asset_ownership( + db, asset_id, + owner_id=body.owner_id, + backup_owner_id=body.backup_owner_id, + team=body.team, + user_id=user.id, + ) + return {"message": "Asset ownership updated"} + + +# ── Orphan Reports ──────────────────────────────────────────────────────────── + +@router.get("/orphans/techniques", response_model=list[dict]) +def orphan_techniques( + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Return techniques with no assigned owner.""" + return ownership_service.get_orphan_techniques(db) + + +@router.get("/orphans/assets", response_model=list[dict]) +def orphan_assets( + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Return active detection assets with no assigned owner.""" + return ownership_service.get_orphan_assets(db) + + +# ── Bulk Assignment ─────────────────────────────────────────────────────────── + +@router.post("/bulk-assign", response_model=BulkAssignResult) +def bulk_assign( + body: BulkAssignRequest, + db: Session = Depends(get_db), + user=Depends(require_any_role("admin", "blue_lead", "red_lead")), +): + """ + Bulk-assign ownership. + - If `tactic` is set → assigns technique ownership for all techniques of that tactic. + - If `platform` is set → assigns asset ownership for all assets on that platform. + At least one of tactic/platform must be provided. + """ + if not body.tactic and not body.platform: + from fastapi import HTTPException + raise HTTPException(status_code=422, detail="Provide at least one of: tactic, platform") + + if body.tactic: + result = ownership_service.bulk_assign_techniques_by_tactic( + db, body.tactic, + owner_id=body.owner_id, + backup_owner_id=body.backup_owner_id, + team=body.team, + overwrite=body.overwrite, + user_id=user.id, + ) + else: + result = ownership_service.bulk_assign_assets_by_platform( + db, body.platform, + owner_id=body.owner_id, + backup_owner_id=body.backup_owner_id, + team=body.team, + overwrite=body.overwrite, + user_id=user.id, + ) + + return BulkAssignResult(**result) + + +# ── Revalidation Queue ──────────────────────────────────────────────────────── + +@router.get("/queue", response_model=list[QueueItemOut]) +def list_queue( + status: Optional[str] = Query(None), + priority: Optional[str] = Query(None), + reason: Optional[str] = Query(None), + assigned_to: Optional[UUID] = Query(None), + technique_id: Optional[UUID] = Query(None), + detection_asset_id: Optional[UUID] = Query(None), + limit: int = Query(100, ge=1, le=500), + offset: int = Query(0, ge=0), + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return revalidation_queue_service.list_queue( + db, status=status, priority=priority, reason=reason, + assigned_to=assigned_to, technique_id=technique_id, + detection_asset_id=detection_asset_id, limit=limit, offset=offset, + ) + + +@router.post("/queue", response_model=QueueItemOut, status_code=201) +def create_queue_item( + body: QueueItemCreate, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return revalidation_queue_service.create_queue_item(db, body.model_dump(), user.id) + + +@router.patch("/queue/{item_id}", response_model=QueueItemOut) +def update_queue_item( + item_id: UUID, + body: QueueItemPatch, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return revalidation_queue_service.update_queue_item(db, item_id, body.model_dump(exclude_unset=True), user.id) + + +@router.post("/queue/generate", response_model=dict) +def generate_queue( + db: Session = Depends(get_db), + user=Depends(require_any_role("admin", "blue_lead")), +): + """Scan the system and create new revalidation queue items.""" + return revalidation_queue_service.generate_queue_items(db) + + +# ── Analyst Dashboard ───────────────────────────────────────────────────────── + +@router.get("/analyst-dashboard") +def analyst_dashboard( + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Personalised daily workday view: my queue, expiring validations, infra changes, low-confidence techniques.""" + dashboard = revalidation_queue_service.get_analyst_dashboard(db, user.id) + + # Serialize queue items to dicts (ORM objects → plain dicts) + def _item_to_dict(item: RevalidationQueueItem) -> dict: + return { + "id": str(item.id), + "technique_id": str(item.technique_id) if item.technique_id else None, + "detection_asset_id": str(item.detection_asset_id) if item.detection_asset_id else None, + "priority": item.priority.value if hasattr(item.priority, "value") else item.priority, + "reason": item.reason.value if hasattr(item.reason, "value") else item.reason, + "reason_detail": item.reason_detail, + "status": item.status.value if hasattr(item.status, "value") else item.status, + "assigned_to": str(item.assigned_to) if item.assigned_to else None, + "due_date": item.due_date.isoformat() if item.due_date else None, + "created_at": item.created_at.isoformat() if item.created_at else None, + } + + return { + "my_pending_items": [_item_to_dict(i) for i in dashboard["my_pending_items"]], + "expiring_validations_7d": dashboard["expiring_validations_7d"], + "recent_infra_changes": dashboard["recent_infra_changes"], + "my_low_confidence_techniques": dashboard["my_low_confidence_techniques"], + "summary": dashboard["summary"], + } diff --git a/backend/app/schemas/ownership_queue_schema.py b/backend/app/schemas/ownership_queue_schema.py new file mode 100644 index 0000000..a336f0f --- /dev/null +++ b/backend/app/schemas/ownership_queue_schema.py @@ -0,0 +1,131 @@ +"""Pydantic schemas for Phase 9: Ownership & Revalidation Queue.""" + +from datetime import datetime +from typing import Optional +from uuid import UUID + +from pydantic import BaseModel, ConfigDict, field_validator + + +# ── Technique Ownership ─────────────────────────────────────────────────────── + +class TechniqueOwnershipSet(BaseModel): + """Set (create or replace) ownership for a technique.""" + owner_id: Optional[UUID] = None + backup_owner_id: Optional[UUID] = None + team: Optional[str] = None + notes: Optional[str] = None + + +class TechniqueOwnershipOut(BaseModel): + model_config = ConfigDict(from_attributes=True) + + id: UUID + technique_id: UUID + owner_id: Optional[UUID] = None + backup_owner_id: Optional[UUID] = None + team: Optional[str] = None + notes: Optional[str] = None + assigned_at: Optional[datetime] = None + assigned_by: Optional[UUID] = None + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + + +class DetectionAssetOwnershipPatch(BaseModel): + """Update ownership fields on a detection asset.""" + owner_id: Optional[UUID] = None + backup_owner_id: Optional[UUID] = None + team: Optional[str] = None + + +# ── Bulk Assignment ─────────────────────────────────────────────────────────── + +class BulkAssignRequest(BaseModel): + """Bulk-assign ownership by tactic, platform, or team filter.""" + owner_id: Optional[UUID] = None + backup_owner_id: Optional[UUID] = None + team: Optional[str] = None + # Filters — at least one must be set + tactic: Optional[str] = None # assign all techniques with this tactic + platform: Optional[str] = None # assign all detection assets with this platform + overwrite: bool = False # overwrite existing assignments + + +class BulkAssignResult(BaseModel): + assigned_count: int + skipped_count: int + target_type: str # "technique" or "detection_asset" + + +# ── Revalidation Queue ──────────────────────────────────────────────────────── + +class QueueItemPatch(BaseModel): + """Update a revalidation queue item.""" + status: Optional[str] = None + assigned_to: Optional[UUID] = None + priority: Optional[str] = None + due_date: Optional[datetime] = None + + @field_validator("status") + @classmethod + def validate_status(cls, v): + from app.models.ownership_queue import QueueStatus + if v is not None: + try: + QueueStatus(v) + except ValueError: + raise ValueError(f"Invalid status: {v}") + return v + + @field_validator("priority") + @classmethod + def validate_priority(cls, v): + from app.models.ownership_queue import QueuePriority + if v is not None: + try: + QueuePriority(v) + except ValueError: + raise ValueError(f"Invalid priority: {v}") + return v + + +class QueueItemCreate(BaseModel): + """Manually create a queue item.""" + technique_id: Optional[UUID] = None + detection_asset_id: Optional[UUID] = None + priority: str = "medium" + reason: str = "manual" + reason_detail: Optional[str] = None + assigned_to: Optional[UUID] = None + due_date: Optional[datetime] = None + + +class QueueItemOut(BaseModel): + model_config = ConfigDict(from_attributes=True) + + id: UUID + technique_id: Optional[UUID] = None + detection_asset_id: Optional[UUID] = None + priority: str + reason: str + reason_detail: Optional[str] = None + status: str + assigned_to: Optional[UUID] = None + due_date: Optional[datetime] = None + created_at: Optional[datetime] = None + completed_at: Optional[datetime] = None + dismissed_at: Optional[datetime] = None + completed_by: Optional[UUID] = None + extra: Optional[dict] = None + + +# ── Analyst Dashboard ───────────────────────────────────────────────────────── + +class AnalystDashboard(BaseModel): + """Personalised daily workday view for an analyst.""" + my_pending_items: list[QueueItemOut] + expiring_validations_7d: list[dict] + recent_infra_changes: list[dict] + my_low_confidence_techniques: list[dict] + summary: dict diff --git a/backend/app/services/ownership_service.py b/backend/app/services/ownership_service.py new file mode 100644 index 0000000..5bd643a --- /dev/null +++ b/backend/app/services/ownership_service.py @@ -0,0 +1,215 @@ +"""Phase 9: Ownership service — techniques and detection assets.""" + +import logging +from datetime import datetime +from typing import Optional +from uuid import UUID + +from sqlalchemy.orm import Session + +from app.models.ownership_queue import TechniqueOwnership +from app.models.detection_lifecycle import DetectionAsset +from app.models.technique import Technique +from app.domain.exceptions import EntityNotFoundError +from app.services import audit_service + +logger = logging.getLogger(__name__) + + +def _now() -> datetime: + return datetime.utcnow() + + +# ── Technique Ownership ─────────────────────────────────────────────────────── + +def get_technique_ownership(db: Session, technique_id: UUID) -> Optional[TechniqueOwnership]: + return db.query(TechniqueOwnership).filter( + TechniqueOwnership.technique_id == technique_id + ).first() + + +def set_technique_ownership( + db: Session, + technique_id: UUID, + owner_id: Optional[UUID], + backup_owner_id: Optional[UUID], + team: Optional[str], + notes: Optional[str], + assigned_by: UUID, +) -> TechniqueOwnership: + technique = db.query(Technique).filter(Technique.id == technique_id).first() + if not technique: + raise EntityNotFoundError("Technique", str(technique_id)) + + ownership = db.query(TechniqueOwnership).filter( + TechniqueOwnership.technique_id == technique_id + ).first() + + if not ownership: + ownership = TechniqueOwnership(technique_id=technique_id) + db.add(ownership) + + ownership.owner_id = owner_id + ownership.backup_owner_id = backup_owner_id + ownership.team = team + ownership.notes = notes + ownership.assigned_at = _now() + ownership.assigned_by = assigned_by + ownership.updated_at = _now() + + db.commit() + db.refresh(ownership) + + audit_service.log_action( + db, assigned_by, "TECHNIQUE_OWNERSHIP_SET", "technique_ownership", str(ownership.id), + details={"technique_id": str(technique_id), "owner_id": str(owner_id) if owner_id else None, "team": team}, + ) + return ownership + + +def get_orphan_techniques(db: Session) -> list[dict]: + """Techniques with no ownership record or null owner_id.""" + owned_ids = db.query(TechniqueOwnership.technique_id).filter( + TechniqueOwnership.owner_id.isnot(None) + ).subquery() + + orphans = db.query(Technique).filter( + ~Technique.id.in_(owned_ids) + ).order_by(Technique.tactic, Technique.mitre_id).all() + + return [ + { + "technique_id": str(t.id), + "mitre_id": t.mitre_id, + "name": t.name, + "tactic": t.tactic, + } + for t in orphans + ] + + +# ── Detection Asset Ownership ───────────────────────────────────────────────── + +def set_asset_ownership( + db: Session, + asset_id: UUID, + owner_id: Optional[UUID], + backup_owner_id: Optional[UUID], + team: Optional[str], + user_id: UUID, +) -> DetectionAsset: + asset = db.query(DetectionAsset).filter(DetectionAsset.id == asset_id).first() + if not asset: + raise EntityNotFoundError("DetectionAsset", str(asset_id)) + + asset.owner_id = owner_id + asset.backup_owner_id = backup_owner_id + asset.team = team + db.commit() + db.refresh(asset) + + audit_service.log_action( + db, user_id, "ASSET_OWNERSHIP_SET", "detection_asset", str(asset_id), + details={"owner_id": str(owner_id) if owner_id else None, "team": team}, + ) + return asset + + +def get_orphan_assets(db: Session) -> list[dict]: + """Active detection assets with no owner.""" + orphans = db.query(DetectionAsset).filter( + DetectionAsset.is_active == True, + DetectionAsset.owner_id.is_(None), + ).order_by(DetectionAsset.platform, DetectionAsset.name).all() + + return [ + { + "asset_id": str(a.id), + "name": a.name, + "platform": a.platform, + "asset_type": a.asset_type, + "health_status": a.health_status.value if a.health_status else None, + } + for a in orphans + ] + + +# ── Bulk Assignment ─────────────────────────────────────────────────────────── + +def bulk_assign_techniques_by_tactic( + db: Session, + tactic: str, + owner_id: Optional[UUID], + backup_owner_id: Optional[UUID], + team: Optional[str], + overwrite: bool, + user_id: UUID, +) -> dict: + techniques = db.query(Technique).filter(Technique.tactic == tactic).all() + assigned = 0 + skipped = 0 + now = _now() + + for technique in techniques: + existing = db.query(TechniqueOwnership).filter( + TechniqueOwnership.technique_id == technique.id + ).first() + + if existing and existing.owner_id and not overwrite: + skipped += 1 + continue + + if not existing: + existing = TechniqueOwnership(technique_id=technique.id) + db.add(existing) + + existing.owner_id = owner_id + existing.backup_owner_id = backup_owner_id + existing.team = team + existing.assigned_at = now + existing.assigned_by = user_id + existing.updated_at = now + assigned += 1 + + db.commit() + + audit_service.log_action( + db, user_id, "BULK_OWNERSHIP_ASSIGNED", "technique_ownership", None, + details={"tactic": tactic, "assigned": assigned, "skipped": skipped, "team": team}, + ) + logger.info("Bulk ownership: tactic=%s assigned=%d skipped=%d", tactic, assigned, skipped) + return {"assigned_count": assigned, "skipped_count": skipped, "target_type": "technique"} + + +def bulk_assign_assets_by_platform( + db: Session, + platform: str, + owner_id: Optional[UUID], + backup_owner_id: Optional[UUID], + team: Optional[str], + overwrite: bool, + user_id: UUID, +) -> dict: + assets = db.query(DetectionAsset).filter( + DetectionAsset.platform == platform, + DetectionAsset.is_active == True, + ).all() + assigned = 0 + skipped = 0 + + for asset in assets: + if asset.owner_id and not overwrite: + skipped += 1 + continue + asset.owner_id = owner_id + asset.backup_owner_id = backup_owner_id + asset.team = team + assigned += 1 + + db.commit() + + audit_service.log_action( + db, user_id, "BULK_ASSET_OWNERSHIP_ASSIGNED", "detection_asset", None, + details={"platform": platform, "assigned": assigned, "skipped": skipped}, + ) + return {"assigned_count": assigned, "skipped_count": skipped, "target_type": "detection_asset"} diff --git a/backend/app/services/revalidation_queue_service.py b/backend/app/services/revalidation_queue_service.py new file mode 100644 index 0000000..75b9650 --- /dev/null +++ b/backend/app/services/revalidation_queue_service.py @@ -0,0 +1,388 @@ +"""Phase 9: Revalidation Queue service — generation, management, analyst dashboard.""" + +import logging +from datetime import datetime, timedelta +from typing import Optional +from uuid import UUID + +from sqlalchemy.orm import Session + +from app.models.ownership_queue import ( + RevalidationQueueItem, QueuePriority, QueueStatus, QueueReason, + TechniqueOwnership, +) +from app.models.detection_lifecycle import ( + DetectionAsset, DetectionValidation, TechniqueConfidenceScore, + InfrastructureChangeLog, DetectionConfidence, +) +from app.models.technique import Technique +from app.domain.exceptions import EntityNotFoundError + +logger = logging.getLogger(__name__) + + +def _now() -> datetime: + return datetime.utcnow() + + +# ── Queue Generation ────────────────────────────────────────────────────────── + +def generate_queue_items(db: Session) -> dict: + """ + Scan the system and create new revalidation queue items for: + - Assets with no valid detection validation (validation_expired) + - Low-confidence techniques (low_confidence) + - Recent infrastructure changes that affected assets (infra_change) + + Skips already-pending/in_progress items for the same asset+reason. + Returns counts of created and skipped items. + """ + created = 0 + skipped = 0 + now = _now() + + def _has_active_item(technique_id=None, asset_id=None, reason=None) -> bool: + q = db.query(RevalidationQueueItem).filter( + RevalidationQueueItem.status.in_(["pending", "in_progress"]) + ) + if technique_id: + q = q.filter(RevalidationQueueItem.technique_id == technique_id) + if asset_id: + q = q.filter(RevalidationQueueItem.detection_asset_id == asset_id) + if reason: + q = q.filter(RevalidationQueueItem.reason == reason) + return q.first() is not None + + # 1) Assets with no active valid validation + active_assets = db.query(DetectionAsset).filter(DetectionAsset.is_active == True).all() + for asset in active_assets: + has_valid = db.query(DetectionValidation).filter( + DetectionValidation.detection_asset_id == asset.id, + DetectionValidation.is_valid == True, + DetectionValidation.expires_at > now, + ).first() + + if has_valid: + continue + + if _has_active_item(asset_id=asset.id, reason=QueueReason.validation_expired): + skipped += 1 + continue + + # Determine priority: assets not validated for >30 days are high + last_val = db.query(DetectionValidation).filter( + DetectionValidation.detection_asset_id == asset.id, + ).order_by(DetectionValidation.validated_at.desc()).first() + + if last_val is None: + priority = QueuePriority.high + detail = "Asset has never been validated" + else: + days_stale = (now - last_val.validated_at).days + if days_stale > 60: + priority = QueuePriority.critical + elif days_stale > 30: + priority = QueuePriority.high + else: + priority = QueuePriority.medium + detail = f"Last validation expired {days_stale} days ago" + + item = RevalidationQueueItem( + detection_asset_id=asset.id, + priority=priority, + reason=QueueReason.validation_expired, + reason_detail=detail, + due_date=now + timedelta(days=7), + ) + db.add(item) + created += 1 + + # 2) Low-confidence techniques (stale or broken) + low_scores = db.query(TechniqueConfidenceScore).filter( + TechniqueConfidenceScore.confidence_level.in_([ + DetectionConfidence.stale, DetectionConfidence.broken, + ]) + ).all() + + for score in low_scores: + if _has_active_item(technique_id=score.technique_id, reason=QueueReason.low_confidence): + skipped += 1 + continue + + priority = ( + QueuePriority.critical + if score.confidence_level == DetectionConfidence.broken + else QueuePriority.high + ) + detail = ( + f"Technique confidence {score.confidence_level.value} " + f"(score={score.confidence_score}). " + f"Risk: {', '.join(score.risk_factors or [])}" + ) + item = RevalidationQueueItem( + technique_id=score.technique_id, + priority=priority, + reason=QueueReason.low_confidence, + reason_detail=detail, + due_date=now + timedelta(days=14), + ) + db.add(item) + created += 1 + + # 3) Recent infrastructure changes (last 7 days) → one item per change + recent_changes = db.query(InfrastructureChangeLog).filter( + InfrastructureChangeLog.change_date >= now - timedelta(days=7), + InfrastructureChangeLog.auto_invalidate == True, + ).all() + + for change in recent_changes: + if _has_active_item(reason=QueueReason.infra_change) and False: + # Don't deduplicate infra changes — each change is separate + pass + existing = db.query(RevalidationQueueItem).filter( + RevalidationQueueItem.reason == QueueReason.infra_change, + RevalidationQueueItem.status.in_(["pending", "in_progress"]), + RevalidationQueueItem.extra["change_id"].astext == str(change.id), + ).first() + if existing: + skipped += 1 + continue + + item = RevalidationQueueItem( + priority=QueuePriority.high, + reason=QueueReason.infra_change, + reason_detail=( + f"{change.change_type}: {change.description[:120]} " + f"({change.invalidated_count or 0} validations invalidated)" + ), + due_date=now + timedelta(days=3), + extra={"change_id": str(change.id), "change_type": change.change_type, + "affected_platforms": change.affected_platforms or []}, + ) + db.add(item) + created += 1 + + if created > 0: + db.commit() + + logger.info("Queue generation: created=%d skipped=%d", created, skipped) + return {"created": created, "skipped": skipped} + + +# ── Queue CRUD ──────────────────────────────────────────────────────────────── + +def list_queue( + db: Session, + status: Optional[str] = None, + priority: Optional[str] = None, + reason: Optional[str] = None, + assigned_to: Optional[UUID] = None, + technique_id: Optional[UUID] = None, + detection_asset_id: Optional[UUID] = None, + limit: int = 100, + offset: int = 0, +) -> list[RevalidationQueueItem]: + q = db.query(RevalidationQueueItem) + if status: + q = q.filter(RevalidationQueueItem.status == status) + if priority: + q = q.filter(RevalidationQueueItem.priority == priority) + if reason: + q = q.filter(RevalidationQueueItem.reason == reason) + if assigned_to: + q = q.filter(RevalidationQueueItem.assigned_to == assigned_to) + if technique_id: + q = q.filter(RevalidationQueueItem.technique_id == technique_id) + if detection_asset_id: + q = q.filter(RevalidationQueueItem.detection_asset_id == detection_asset_id) + + # Priority order: critical > high > medium > low + priority_order = { + "critical": 0, "high": 1, "medium": 2, "low": 3, + } + from sqlalchemy import case + q = q.order_by( + case( + {"critical": 0, "high": 1, "medium": 2, "low": 3}, + value=RevalidationQueueItem.priority, + ), + RevalidationQueueItem.due_date.asc().nullslast(), + RevalidationQueueItem.created_at.asc(), + ) + return q.offset(offset).limit(limit).all() + + +def create_queue_item(db: Session, data: dict, user_id: UUID) -> RevalidationQueueItem: + item = RevalidationQueueItem( + technique_id=data.get("technique_id"), + detection_asset_id=data.get("detection_asset_id"), + priority=data.get("priority", "medium"), + reason=data.get("reason", "manual"), + reason_detail=data.get("reason_detail"), + assigned_to=data.get("assigned_to"), + due_date=data.get("due_date"), + extra={"created_by": str(user_id)}, + ) + db.add(item) + db.commit() + db.refresh(item) + return item + + +def update_queue_item(db: Session, item_id: UUID, data: dict, user_id: UUID) -> RevalidationQueueItem: + item = db.query(RevalidationQueueItem).filter(RevalidationQueueItem.id == item_id).first() + if not item: + raise EntityNotFoundError("RevalidationQueueItem", str(item_id)) + + now = _now() + + if "status" in data and data["status"] is not None: + new_status = data["status"] + item.status = new_status + if new_status == "completed": + item.completed_at = now + item.completed_by = user_id + elif new_status == "dismissed": + item.dismissed_at = now + + if "assigned_to" in data: + item.assigned_to = data["assigned_to"] + + if "priority" in data and data["priority"] is not None: + item.priority = data["priority"] + + if "due_date" in data: + item.due_date = data["due_date"] + + db.commit() + db.refresh(item) + return item + + +# ── Analyst Dashboard ───────────────────────────────────────────────────────── + +def get_analyst_dashboard(db: Session, user_id: UUID) -> dict: + """Return a personalised daily workday view for the logged-in analyst.""" + now = _now() + + # 1) My pending/in_progress queue items (assigned to me) + my_items = list_queue(db, status=None, assigned_to=user_id, limit=50) + my_items = [i for i in my_items if i.status in ("pending", "in_progress")] + + # Also include unassigned items where I'm the technique/asset owner + owned_tech_ids = [ + row.technique_id + for row in db.query(TechniqueOwnership.technique_id).filter( + (TechniqueOwnership.owner_id == user_id) | + (TechniqueOwnership.backup_owner_id == user_id) + ).all() + ] + owned_asset_ids = [ + row.id + for row in db.query(DetectionAsset.id).filter( + DetectionAsset.is_active == True, + (DetectionAsset.owner_id == user_id) | + (DetectionAsset.backup_owner_id == user_id), + ).all() + ] + + # Unassigned items for my techniques/assets + if owned_tech_ids or owned_asset_ids: + from sqlalchemy import or_ + unassigned_q = db.query(RevalidationQueueItem).filter( + RevalidationQueueItem.status.in_(["pending", "in_progress"]), + RevalidationQueueItem.assigned_to.is_(None), + ) + filters = [] + if owned_tech_ids: + filters.append(RevalidationQueueItem.technique_id.in_(owned_tech_ids)) + if owned_asset_ids: + filters.append(RevalidationQueueItem.detection_asset_id.in_(owned_asset_ids)) + unassigned_q = unassigned_q.filter(or_(*filters)) + unassigned_items = unassigned_q.limit(20).all() + # Merge, deduplicate by id + seen = {i.id for i in my_items} + for item in unassigned_items: + if item.id not in seen: + my_items.append(item) + seen.add(item.id) + + # 2) Validations expiring in next 7 days on assets I own + expiring = [] + if owned_asset_ids: + expiring_vals = db.query(DetectionValidation).filter( + DetectionValidation.detection_asset_id.in_(owned_asset_ids), + DetectionValidation.is_valid == True, + DetectionValidation.expires_at <= now + timedelta(days=7), + DetectionValidation.expires_at > now, + ).order_by(DetectionValidation.expires_at.asc()).limit(20).all() + + for v in expiring_vals: + asset = db.query(DetectionAsset).filter(DetectionAsset.id == v.detection_asset_id).first() + expiring.append({ + "validation_id": str(v.id), + "asset_id": str(v.detection_asset_id), + "asset_name": asset.name if asset else None, + "expires_at": v.expires_at.isoformat() if v.expires_at else None, + "days_until_expiry": (v.expires_at - now).days if v.expires_at else None, + }) + + # 3) Recent infra changes (last 7 days) + recent_changes = db.query(InfrastructureChangeLog).filter( + InfrastructureChangeLog.change_date >= now - timedelta(days=7), + ).order_by(InfrastructureChangeLog.change_date.desc()).limit(5).all() + + infra_list = [ + { + "change_id": str(c.id), + "change_type": c.change_type, + "description": c.description, + "affected_platforms": c.affected_platforms or [], + "invalidated_count": c.invalidated_count or 0, + "change_date": c.change_date.isoformat() if c.change_date else None, + } + for c in recent_changes + ] + + # 4) My techniques with low confidence + low_confidence = [] + if owned_tech_ids: + low_scores = db.query(TechniqueConfidenceScore).filter( + TechniqueConfidenceScore.technique_id.in_(owned_tech_ids), + TechniqueConfidenceScore.confidence_score < 50, + ).order_by(TechniqueConfidenceScore.confidence_score.asc()).limit(10).all() + + for score in low_scores: + tech = db.query(Technique).filter(Technique.id == score.technique_id).first() + low_confidence.append({ + "technique_id": str(score.technique_id), + "mitre_id": tech.mitre_id if tech else None, + "name": tech.name if tech else None, + "confidence_level": score.confidence_level.value if score.confidence_level else None, + "confidence_score": score.confidence_score, + "risk_factors": score.risk_factors or [], + }) + + # Summary + total_pending = db.query(RevalidationQueueItem).filter( + RevalidationQueueItem.status == QueueStatus.pending + ).count() + total_critical = db.query(RevalidationQueueItem).filter( + RevalidationQueueItem.status.in_(["pending", "in_progress"]), + RevalidationQueueItem.priority == QueuePriority.critical, + ).count() + + return { + "my_pending_items": my_items, + "expiring_validations_7d": expiring, + "recent_infra_changes": infra_list, + "my_low_confidence_techniques": low_confidence, + "summary": { + "my_assigned_items": len([i for i in my_items if i.assigned_to == user_id]), + "my_owned_techniques": len(owned_tech_ids), + "my_owned_assets": len(owned_asset_ids), + "total_pending_system": total_pending, + "critical_system": total_critical, + "expiring_soon": len(expiring), + }, + }