"""Phase 11: Playbook service — CRUD + versioning.""" from datetime import datetime from typing import List, Optional from uuid import UUID from sqlalchemy.orm import Session from app.domain.errors import DomainError from app.models.knowledge import Playbook, PlaybookVersion from app.models.technique import Technique def _get_or_404(db: Session, playbook_id: UUID) -> Playbook: pb = db.query(Playbook).filter( Playbook.id == playbook_id, Playbook.is_active == True, ).first() if not pb: raise DomainError(f"Playbook {playbook_id} not found", status_code=404) return pb def _snapshot(db: Session, pb: Playbook, changed_by: Optional[UUID], change_note: Optional[str]): """Save a version snapshot of the current playbook state.""" snap = PlaybookVersion( playbook_id=pb.id, version=pb.version, title=pb.title, content=pb.content, tools=list(pb.tools or []), prerequisites=list(pb.prerequisites or []), changed_by=changed_by, change_note=change_note, created_at=datetime.utcnow(), ) db.add(snap) # ── Read ────────────────────────────────────────────────────────────────────── def get_playbook(db: Session, playbook_id: UUID) -> Playbook: return _get_or_404(db, playbook_id) def get_playbook_by_technique_type( db: Session, technique_id: UUID, playbook_type: str ) -> Optional[Playbook]: return db.query(Playbook).filter( Playbook.technique_id == technique_id, Playbook.playbook_type == playbook_type, Playbook.is_active == True, ).first() def list_playbooks( db: Session, technique_id: Optional[UUID] = None, playbook_type: Optional[str] = None, include_inactive: bool = False, ) -> List[Playbook]: q = db.query(Playbook) if not include_inactive: q = q.filter(Playbook.is_active == True) if technique_id: q = q.filter(Playbook.technique_id == technique_id) if playbook_type: q = q.filter(Playbook.playbook_type == playbook_type) return q.order_by(Playbook.technique_id, Playbook.playbook_type).all() def get_playbook_versions(db: Session, playbook_id: UUID) -> List[PlaybookVersion]: _get_or_404(db, playbook_id) # 404 guard return ( db.query(PlaybookVersion) .filter(PlaybookVersion.playbook_id == playbook_id) .order_by(PlaybookVersion.version.desc()) .all() ) # ── Create / Update (upsert) ────────────────────────────────────────────────── def create_playbook(db: Session, data: dict, user_id: UUID) -> Playbook: technique_id = data["technique_id"] playbook_type = data["playbook_type"] # Validate technique exists tech = db.query(Technique).filter(Technique.id == technique_id).first() if not tech: raise DomainError(f"Technique {technique_id} not found", status_code=404) # Check for existing (even soft-deleted) to reactivate existing = db.query(Playbook).filter( Playbook.technique_id == technique_id, Playbook.playbook_type == playbook_type, ).first() if existing: if existing.is_active: raise DomainError( f"Playbook ({playbook_type}) for technique {technique_id} already exists. " "Use PATCH to update it.", status_code=409, ) # Reactivate soft-deleted playbook existing.is_active = True existing.title = data.get("title", existing.title) existing.content = data.get("content", existing.content) existing.tools = data.get("tools", existing.tools) or [] existing.prerequisites = data.get("prerequisites", existing.prerequisites) or [] existing.updated_by = user_id existing.updated_at = datetime.utcnow() existing.version += 1 change_note = data.get("change_note") _snapshot(db, existing, user_id, change_note or "Reactivated") db.commit() db.refresh(existing) return existing pb = Playbook( technique_id = technique_id, playbook_type = playbook_type, title = data.get("title", f"{playbook_type.capitalize()} playbook"), content = data.get("content", ""), version = 1, tools = data.get("tools") or [], prerequisites = data.get("prerequisites") or [], created_by = user_id, updated_by = user_id, ) db.add(pb) db.flush() # get id _snapshot(db, pb, user_id, data.get("change_note") or "Initial version") db.commit() db.refresh(pb) return pb def update_playbook(db: Session, playbook_id: UUID, data: dict, user_id: UUID) -> Playbook: pb = _get_or_404(db, playbook_id) # Save snapshot before mutating _snapshot(db, pb, user_id, data.get("change_note")) if "title" in data and data["title"] is not None: pb.title = data["title"] if "content" in data and data["content"] is not None: pb.content = data["content"] if "tools" in data and data["tools"] is not None: pb.tools = data["tools"] if "prerequisites" in data and data["prerequisites"] is not None: pb.prerequisites = data["prerequisites"] pb.version += 1 pb.updated_by = user_id pb.updated_at = datetime.utcnow() db.commit() db.refresh(pb) return pb def restore_version(db: Session, playbook_id: UUID, version_number: int, user_id: UUID) -> Playbook: """Roll back a playbook to a specific historical version.""" pb = _get_or_404(db, playbook_id) snap = db.query(PlaybookVersion).filter( PlaybookVersion.playbook_id == playbook_id, PlaybookVersion.version == version_number, ).first() if not snap: raise DomainError( f"Version {version_number} not found for playbook {playbook_id}", status_code=404 ) # Snapshot current state before restoring _snapshot(db, pb, user_id, f"Auto-snapshot before restore to v{version_number}") pb.title = snap.title pb.content = snap.content pb.tools = list(snap.tools or []) pb.prerequisites = list(snap.prerequisites or []) pb.version += 1 pb.updated_by = user_id pb.updated_at = datetime.utcnow() _snapshot(db, pb, user_id, f"Restored from v{version_number}") db.commit() db.refresh(pb) return pb def delete_playbook(db: Session, playbook_id: UUID, user_id: UUID) -> None: pb = _get_or_404(db, playbook_id) pb.is_active = False pb.updated_by = user_id pb.updated_at = datetime.utcnow() db.commit()