Files
Aegis/backend/app/services/playbook_service.py
kitos 4fba4152d9
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
fix(knowledge): use EntityNotFoundError/DuplicateEntityError instead of DomainError(status_code=)
2026-05-20 15:21:36 +02:00

200 lines
6.7 KiB
Python

"""Phase 11: Playbook service — CRUD + versioning."""
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from sqlalchemy.orm import Session
from app.domain.errors import (
DomainError, EntityNotFoundError, DuplicateEntityError, BusinessRuleViolation
)
from app.models.knowledge import Playbook, PlaybookVersion
from app.models.technique import Technique
def _get_or_404(db: Session, playbook_id: UUID) -> Playbook:
pb = db.query(Playbook).filter(
Playbook.id == playbook_id,
Playbook.is_active == True,
).first()
if not pb:
raise EntityNotFoundError("Playbook", str(playbook_id))
return pb
def _snapshot(db: Session, pb: Playbook, changed_by: Optional[UUID], change_note: Optional[str]):
"""Save a version snapshot of the current playbook state."""
snap = PlaybookVersion(
playbook_id=pb.id,
version=pb.version,
title=pb.title,
content=pb.content,
tools=list(pb.tools or []),
prerequisites=list(pb.prerequisites or []),
changed_by=changed_by,
change_note=change_note,
created_at=datetime.utcnow(),
)
db.add(snap)
# ── Read ──────────────────────────────────────────────────────────────────────
def get_playbook(db: Session, playbook_id: UUID) -> Playbook:
return _get_or_404(db, playbook_id)
def get_playbook_by_technique_type(
db: Session, technique_id: UUID, playbook_type: str
) -> Optional[Playbook]:
return db.query(Playbook).filter(
Playbook.technique_id == technique_id,
Playbook.playbook_type == playbook_type,
Playbook.is_active == True,
).first()
def list_playbooks(
db: Session,
technique_id: Optional[UUID] = None,
playbook_type: Optional[str] = None,
include_inactive: bool = False,
) -> List[Playbook]:
q = db.query(Playbook)
if not include_inactive:
q = q.filter(Playbook.is_active == True)
if technique_id:
q = q.filter(Playbook.technique_id == technique_id)
if playbook_type:
q = q.filter(Playbook.playbook_type == playbook_type)
return q.order_by(Playbook.technique_id, Playbook.playbook_type).all()
def get_playbook_versions(db: Session, playbook_id: UUID) -> List[PlaybookVersion]:
_get_or_404(db, playbook_id) # 404 guard
return (
db.query(PlaybookVersion)
.filter(PlaybookVersion.playbook_id == playbook_id)
.order_by(PlaybookVersion.version.desc())
.all()
)
# ── Create / Update (upsert) ──────────────────────────────────────────────────
def create_playbook(db: Session, data: dict, user_id: UUID) -> Playbook:
technique_id = data["technique_id"]
playbook_type = data["playbook_type"]
# Validate technique exists
tech = db.query(Technique).filter(Technique.id == technique_id).first()
if not tech:
raise EntityNotFoundError("Technique", str(technique_id))
# Check for existing (even soft-deleted) to reactivate
existing = db.query(Playbook).filter(
Playbook.technique_id == technique_id,
Playbook.playbook_type == playbook_type,
).first()
if existing:
if existing.is_active:
raise DuplicateEntityError(
"Playbook",
"technique_id+playbook_type",
f"{technique_id}/{playbook_type}",
)
# Reactivate soft-deleted playbook
existing.is_active = True
existing.title = data.get("title", existing.title)
existing.content = data.get("content", existing.content)
existing.tools = data.get("tools", existing.tools) or []
existing.prerequisites = data.get("prerequisites", existing.prerequisites) or []
existing.updated_by = user_id
existing.updated_at = datetime.utcnow()
existing.version += 1
change_note = data.get("change_note")
_snapshot(db, existing, user_id, change_note or "Reactivated")
db.commit()
db.refresh(existing)
return existing
pb = Playbook(
technique_id = technique_id,
playbook_type = playbook_type,
title = data.get("title", f"{playbook_type.capitalize()} playbook"),
content = data.get("content", ""),
version = 1,
tools = data.get("tools") or [],
prerequisites = data.get("prerequisites") or [],
created_by = user_id,
updated_by = user_id,
)
db.add(pb)
db.flush() # get id
_snapshot(db, pb, user_id, data.get("change_note") or "Initial version")
db.commit()
db.refresh(pb)
return pb
def update_playbook(db: Session, playbook_id: UUID, data: dict, user_id: UUID) -> Playbook:
pb = _get_or_404(db, playbook_id)
# Save snapshot before mutating
_snapshot(db, pb, user_id, data.get("change_note"))
if "title" in data and data["title"] is not None:
pb.title = data["title"]
if "content" in data and data["content"] is not None:
pb.content = data["content"]
if "tools" in data and data["tools"] is not None:
pb.tools = data["tools"]
if "prerequisites" in data and data["prerequisites"] is not None:
pb.prerequisites = data["prerequisites"]
pb.version += 1
pb.updated_by = user_id
pb.updated_at = datetime.utcnow()
db.commit()
db.refresh(pb)
return pb
def restore_version(db: Session, playbook_id: UUID, version_number: int, user_id: UUID) -> Playbook:
"""Roll back a playbook to a specific historical version."""
pb = _get_or_404(db, playbook_id)
snap = db.query(PlaybookVersion).filter(
PlaybookVersion.playbook_id == playbook_id,
PlaybookVersion.version == version_number,
).first()
if not snap:
raise EntityNotFoundError("PlaybookVersion", f"{playbook_id}/v{version_number}")
# Snapshot current state before restoring
_snapshot(db, pb, user_id, f"Auto-snapshot before restore to v{version_number}")
pb.title = snap.title
pb.content = snap.content
pb.tools = list(snap.tools or [])
pb.prerequisites = list(snap.prerequisites or [])
pb.version += 1
pb.updated_by = user_id
pb.updated_at = datetime.utcnow()
_snapshot(db, pb, user_id, f"Restored from v{version_number}")
db.commit()
db.refresh(pb)
return pb
def delete_playbook(db: Session, playbook_id: UUID, user_id: UUID) -> None:
pb = _get_or_404(db, playbook_id)
pb.is_active = False
pb.updated_by = user_id
pb.updated_at = datetime.utcnow()
db.commit()