Files
Aegis/backend/app/services/playbook_service.py
kitos 4f5370db89
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(knowledge): Phase 11 — Knowledge Management (Playbooks + Lessons Learned) [FASE-11]
- Playbooks: versioned Markdown runbooks per technique × type (attack/detect/investigate/respond/hunt)
- PlaybookVersion: immutable snapshots on every update; restore to any previous version
- LessonLearned: post-mortem records linked to tests/campaigns/attack-paths or manual
- Alembic migration b037know (raw SQL, idempotent, no PostgreSQL enums)
- Router /api/v1/knowledge: 14 endpoints for playbooks + lessons + stats
- Pydantic validators for playbook_type, severity, entity_type (422 on invalid)
- Knowledge stats endpoint: totals + breakdown by severity and playbook type
- Soft-delete on both resources; include_inactive filter for admin recovery
- QA script: 70+ tests across CRUD, versioning, filtering, auth, soft-delete, regression

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 13:39:05 +02:00

200 lines
6.8 KiB
Python

"""Phase 11: Playbook service — CRUD + versioning."""
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from sqlalchemy.orm import Session
from app.domain.errors import DomainError
from app.models.knowledge import Playbook, PlaybookVersion
from app.models.technique import Technique
def _get_or_404(db: Session, playbook_id: UUID) -> Playbook:
pb = db.query(Playbook).filter(
Playbook.id == playbook_id,
Playbook.is_active == True,
).first()
if not pb:
raise DomainError(f"Playbook {playbook_id} not found", status_code=404)
return pb
def _snapshot(db: Session, pb: Playbook, changed_by: Optional[UUID], change_note: Optional[str]):
"""Save a version snapshot of the current playbook state."""
snap = PlaybookVersion(
playbook_id=pb.id,
version=pb.version,
title=pb.title,
content=pb.content,
tools=list(pb.tools or []),
prerequisites=list(pb.prerequisites or []),
changed_by=changed_by,
change_note=change_note,
created_at=datetime.utcnow(),
)
db.add(snap)
# ── Read ──────────────────────────────────────────────────────────────────────
def get_playbook(db: Session, playbook_id: UUID) -> Playbook:
return _get_or_404(db, playbook_id)
def get_playbook_by_technique_type(
db: Session, technique_id: UUID, playbook_type: str
) -> Optional[Playbook]:
return db.query(Playbook).filter(
Playbook.technique_id == technique_id,
Playbook.playbook_type == playbook_type,
Playbook.is_active == True,
).first()
def list_playbooks(
db: Session,
technique_id: Optional[UUID] = None,
playbook_type: Optional[str] = None,
include_inactive: bool = False,
) -> List[Playbook]:
q = db.query(Playbook)
if not include_inactive:
q = q.filter(Playbook.is_active == True)
if technique_id:
q = q.filter(Playbook.technique_id == technique_id)
if playbook_type:
q = q.filter(Playbook.playbook_type == playbook_type)
return q.order_by(Playbook.technique_id, Playbook.playbook_type).all()
def get_playbook_versions(db: Session, playbook_id: UUID) -> List[PlaybookVersion]:
_get_or_404(db, playbook_id) # 404 guard
return (
db.query(PlaybookVersion)
.filter(PlaybookVersion.playbook_id == playbook_id)
.order_by(PlaybookVersion.version.desc())
.all()
)
# ── Create / Update (upsert) ──────────────────────────────────────────────────
def create_playbook(db: Session, data: dict, user_id: UUID) -> Playbook:
technique_id = data["technique_id"]
playbook_type = data["playbook_type"]
# Validate technique exists
tech = db.query(Technique).filter(Technique.id == technique_id).first()
if not tech:
raise DomainError(f"Technique {technique_id} not found", status_code=404)
# Check for existing (even soft-deleted) to reactivate
existing = db.query(Playbook).filter(
Playbook.technique_id == technique_id,
Playbook.playbook_type == playbook_type,
).first()
if existing:
if existing.is_active:
raise DomainError(
f"Playbook ({playbook_type}) for technique {technique_id} already exists. "
"Use PATCH to update it.",
status_code=409,
)
# Reactivate soft-deleted playbook
existing.is_active = True
existing.title = data.get("title", existing.title)
existing.content = data.get("content", existing.content)
existing.tools = data.get("tools", existing.tools) or []
existing.prerequisites = data.get("prerequisites", existing.prerequisites) or []
existing.updated_by = user_id
existing.updated_at = datetime.utcnow()
existing.version += 1
change_note = data.get("change_note")
_snapshot(db, existing, user_id, change_note or "Reactivated")
db.commit()
db.refresh(existing)
return existing
pb = Playbook(
technique_id = technique_id,
playbook_type = playbook_type,
title = data.get("title", f"{playbook_type.capitalize()} playbook"),
content = data.get("content", ""),
version = 1,
tools = data.get("tools") or [],
prerequisites = data.get("prerequisites") or [],
created_by = user_id,
updated_by = user_id,
)
db.add(pb)
db.flush() # get id
_snapshot(db, pb, user_id, data.get("change_note") or "Initial version")
db.commit()
db.refresh(pb)
return pb
def update_playbook(db: Session, playbook_id: UUID, data: dict, user_id: UUID) -> Playbook:
pb = _get_or_404(db, playbook_id)
# Save snapshot before mutating
_snapshot(db, pb, user_id, data.get("change_note"))
if "title" in data and data["title"] is not None:
pb.title = data["title"]
if "content" in data and data["content"] is not None:
pb.content = data["content"]
if "tools" in data and data["tools"] is not None:
pb.tools = data["tools"]
if "prerequisites" in data and data["prerequisites"] is not None:
pb.prerequisites = data["prerequisites"]
pb.version += 1
pb.updated_by = user_id
pb.updated_at = datetime.utcnow()
db.commit()
db.refresh(pb)
return pb
def restore_version(db: Session, playbook_id: UUID, version_number: int, user_id: UUID) -> Playbook:
"""Roll back a playbook to a specific historical version."""
pb = _get_or_404(db, playbook_id)
snap = db.query(PlaybookVersion).filter(
PlaybookVersion.playbook_id == playbook_id,
PlaybookVersion.version == version_number,
).first()
if not snap:
raise DomainError(
f"Version {version_number} not found for playbook {playbook_id}", status_code=404
)
# Snapshot current state before restoring
_snapshot(db, pb, user_id, f"Auto-snapshot before restore to v{version_number}")
pb.title = snap.title
pb.content = snap.content
pb.tools = list(snap.tools or [])
pb.prerequisites = list(snap.prerequisites or [])
pb.version += 1
pb.updated_by = user_id
pb.updated_at = datetime.utcnow()
_snapshot(db, pb, user_id, f"Restored from v{version_number}")
db.commit()
db.refresh(pb)
return pb
def delete_playbook(db: Session, playbook_id: UUID, user_id: UUID) -> None:
pb = _get_or_404(db, playbook_id)
pb.is_active = False
pb.updated_by = user_id
pb.updated_at = datetime.utcnow()
db.commit()