diff --git a/backend/alembic/versions/b037_knowledge.py b/backend/alembic/versions/b037_knowledge.py new file mode 100644 index 0000000..bf6c89d --- /dev/null +++ b/backend/alembic/versions/b037_knowledge.py @@ -0,0 +1,106 @@ +"""Phase 11: Knowledge Management — Playbooks + Lessons Learned + +Revision ID: b037know +Revises: b036atk +Create Date: 2026-05-20 + +Uses raw SQL to bypass SQLAlchemy DDL hooks (no enum types — string columns +with Pydantic-layer validation instead, so no PostgreSQL enums needed). +""" + +from typing import Union +from alembic import op +import sqlalchemy as sa + +revision: str = "b037know" +down_revision: Union[str, None] = "b036atk" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + conn = op.get_bind() + + # ── playbooks ────────────────────────────────────────────────────────────── + conn.execute(sa.text(""" + CREATE TABLE IF NOT EXISTS playbooks ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + technique_id UUID NOT NULL REFERENCES techniques(id) ON DELETE CASCADE, + playbook_type VARCHAR(32) NOT NULL, + title VARCHAR(255) NOT NULL, + content TEXT NOT NULL DEFAULT '', + version INTEGER NOT NULL DEFAULT 1, + tools JSONB, + prerequisites JSONB, + created_by UUID REFERENCES users(id) ON DELETE SET NULL, + updated_by UUID REFERENCES users(id) ON DELETE SET NULL, + created_at TIMESTAMP DEFAULT now(), + updated_at TIMESTAMP DEFAULT now(), + is_active BOOLEAN DEFAULT TRUE, + CONSTRAINT uq_playbook_technique_type UNIQUE (technique_id, playbook_type) + ) + """)) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_playbooks_technique_id ON playbooks (technique_id)" + )) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_playbooks_type ON playbooks (playbook_type)" + )) + + # ── playbook_versions ────────────────────────────────────────────────────── + conn.execute(sa.text(""" + CREATE TABLE IF NOT EXISTS playbook_versions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + playbook_id UUID NOT NULL REFERENCES playbooks(id) ON DELETE CASCADE, + version INTEGER NOT NULL, + title VARCHAR(255) NOT NULL, + content TEXT NOT NULL DEFAULT '', + tools JSONB, + prerequisites JSONB, + changed_by UUID REFERENCES users(id) ON DELETE SET NULL, + change_note VARCHAR(500), + created_at TIMESTAMP DEFAULT now() + ) + """)) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_pb_versions_playbook_id ON playbook_versions (playbook_id)" + )) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_pb_versions_version ON playbook_versions (playbook_id, version)" + )) + + # ── lessons_learned ──────────────────────────────────────────────────────── + conn.execute(sa.text(""" + CREATE TABLE IF NOT EXISTS lessons_learned ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + title VARCHAR(255) NOT NULL, + what_happened TEXT NOT NULL DEFAULT '', + root_cause TEXT NOT NULL DEFAULT '', + fix_applied TEXT, + severity VARCHAR(16) NOT NULL DEFAULT 'medium', + entity_type VARCHAR(32) NOT NULL DEFAULT 'manual', + entity_id UUID, + technique_ids JSONB, + tags JSONB, + created_by UUID REFERENCES users(id) ON DELETE SET NULL, + created_at TIMESTAMP DEFAULT now(), + updated_at TIMESTAMP DEFAULT now(), + is_active BOOLEAN DEFAULT TRUE + ) + """)) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_ll_entity ON lessons_learned (entity_type, entity_id)" + )) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_ll_severity ON lessons_learned (severity)" + )) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_ll_created_by ON lessons_learned (created_by)" + )) + + +def downgrade() -> None: + conn = op.get_bind() + conn.execute(sa.text("DROP TABLE IF EXISTS lessons_learned")) + conn.execute(sa.text("DROP TABLE IF EXISTS playbook_versions")) + conn.execute(sa.text("DROP TABLE IF EXISTS playbooks")) diff --git a/backend/app/main.py b/backend/app/main.py index c2f6558..425c479 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -41,6 +41,7 @@ from app.routers import webhooks as webhooks_router from app.routers import detection_lifecycle as detection_lifecycle_router from app.routers import ownership as ownership_router from app.routers import attack_paths as attack_paths_router +from app.routers import knowledge as knowledge_router from app.domain.errors import DomainError from app.middleware.error_handler import domain_exception_handler from app.middleware.request_context import RequestContextMiddleware @@ -141,6 +142,7 @@ app.include_router(webhooks_router.router, prefix="/api/v1") app.include_router(detection_lifecycle_router.router, prefix="/api/v1") app.include_router(ownership_router.router, prefix="/api/v1") app.include_router(attack_paths_router.router, prefix="/api/v1") +app.include_router(knowledge_router.router, prefix="/api/v1") @app.get("/health", include_in_schema=False) diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index d52aacb..f42a367 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -38,6 +38,7 @@ from app.models.attack_path import ( AttackPathStepResult, TimelineEntry, ExecutionStatus, StepResultStatus, TimelineActorSide, TimelineEntryType, ) +from app.models.knowledge import Playbook, PlaybookVersion, LessonLearned __all__ = [ "User", "Technique", "Test", "TestTemplate", "Evidence", @@ -59,4 +60,5 @@ __all__ = [ "AttackPath", "AttackPathStep", "AttackPathExecution", "AttackPathStepResult", "TimelineEntry", "ExecutionStatus", "StepResultStatus", "TimelineActorSide", "TimelineEntryType", + "Playbook", "PlaybookVersion", "LessonLearned", ] diff --git a/backend/app/models/knowledge.py b/backend/app/models/knowledge.py new file mode 100644 index 0000000..f9b5dc0 --- /dev/null +++ b/backend/app/models/knowledge.py @@ -0,0 +1,129 @@ +"""Phase 11: Knowledge Management models — Playbooks + Lessons Learned.""" + +import uuid +from datetime import datetime + +from sqlalchemy import ( + Boolean, Column, DateTime, ForeignKey, + Index, Integer, String, Text, UniqueConstraint, +) +from sqlalchemy.dialects.postgresql import UUID, JSONB +from sqlalchemy.orm import relationship + +from app.database import Base + + +# ── Playbooks ────────────────────────────────────────────────────────────────── + +class Playbook(Base): + """ + Structured runbook for a specific technique and playbook type. + + playbook_type: attack | detect | investigate | respond | hunt + One playbook per (technique, type). Edits increment ``version`` + and save a snapshot to ``PlaybookVersion``. + """ + __tablename__ = "playbooks" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + technique_id = Column( + UUID(as_uuid=True), ForeignKey("techniques.id", ondelete="CASCADE"), nullable=False + ) + playbook_type = Column(String(32), nullable=False) # attack/detect/investigate/respond/hunt + title = Column(String(255), nullable=False) + content = Column(Text, nullable=False, default="") + version = Column(Integer, default=1, nullable=False) + tools = Column(JSONB, default=list) # list of tool name strings + prerequisites = Column(JSONB, default=list) # list of prerequisite strings + created_by = Column( + UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True + ) + updated_by = Column( + UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True + ) + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + is_active = Column(Boolean, default=True) + + # Relationships + technique = relationship("Technique", foreign_keys=[technique_id]) + creator = relationship("User", foreign_keys=[created_by]) + updater = relationship("User", foreign_keys=[updated_by]) + versions = relationship( + "PlaybookVersion", back_populates="playbook", + cascade="all, delete-orphan", + order_by="PlaybookVersion.version.desc()", + ) + + __table_args__ = ( + UniqueConstraint("technique_id", "playbook_type", name="uq_playbook_technique_type"), + Index("ix_playbooks_technique_id", "technique_id"), + Index("ix_playbooks_type", "playbook_type"), + ) + + +class PlaybookVersion(Base): + """Immutable snapshot of a playbook at a given version number.""" + + __tablename__ = "playbook_versions" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + playbook_id = Column( + UUID(as_uuid=True), ForeignKey("playbooks.id", ondelete="CASCADE"), nullable=False + ) + version = Column(Integer, nullable=False) + title = Column(String(255), nullable=False) + content = Column(Text, nullable=False, default="") + tools = Column(JSONB, default=list) + prerequisites = Column(JSONB, default=list) + changed_by = Column( + UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True + ) + change_note = Column(String(500), nullable=True) + created_at = Column(DateTime, default=datetime.utcnow) + + playbook = relationship("Playbook", back_populates="versions") + changer = relationship("User", foreign_keys=[changed_by]) + + __table_args__ = ( + Index("ix_pb_versions_playbook_id", "playbook_id"), + Index("ix_pb_versions_version", "playbook_id", "version"), + ) + + +# ── Lessons Learned ──────────────────────────────────────────────────────────── + +class LessonLearned(Base): + """ + Immutable post-mortem record linked to a test, campaign, attack-path or + created manually. + + severity: critical | high | medium | low | info + entity_type: test | campaign | attack_path | manual + """ + __tablename__ = "lessons_learned" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + title = Column(String(255), nullable=False) + what_happened = Column(Text, nullable=False, default="") + root_cause = Column(Text, nullable=False, default="") + fix_applied = Column(Text, nullable=True) + severity = Column(String(16), nullable=False, default="medium") + entity_type = Column(String(32), nullable=False, default="manual") + entity_id = Column(UUID(as_uuid=True), nullable=True) + technique_ids = Column(JSONB, default=list) # list of UUID strings + tags = Column(JSONB, default=list) + created_by = Column( + UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True + ) + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + is_active = Column(Boolean, default=True) # soft-delete (admin only) + + creator = relationship("User", foreign_keys=[created_by]) + + __table_args__ = ( + Index("ix_ll_entity", "entity_type", "entity_id"), + Index("ix_ll_severity", "severity"), + Index("ix_ll_created_by", "created_by"), + ) diff --git a/backend/app/routers/knowledge.py b/backend/app/routers/knowledge.py new file mode 100644 index 0000000..560eed4 --- /dev/null +++ b/backend/app/routers/knowledge.py @@ -0,0 +1,209 @@ +"""Phase 11: Knowledge Management router — Playbooks + Lessons Learned.""" + +from typing import List, Optional +from uuid import UUID + +from fastapi import APIRouter, Depends, Query +from sqlalchemy.orm import Session + +from app.database import get_db +from app.dependencies.auth import get_current_user, require_any_role +from app.schemas.knowledge_schema import ( + PlaybookCreate, PlaybookUpdate, PlaybookOut, PlaybookVersionOut, + LessonLearnedCreate, LessonLearnedUpdate, LessonLearnedOut, +) +from app.services import playbook_service as pb_svc +from app.services import lesson_learned_service as ll_svc + +router = APIRouter(prefix="/knowledge", tags=["knowledge"]) + + +# ══════════════════════════════════════════════════════════════════════════════ +# Playbooks +# ══════════════════════════════════════════════════════════════════════════════ + +@router.get("/playbooks", response_model=List[PlaybookOut]) +def list_playbooks( + technique_id: Optional[UUID] = None, + playbook_type: Optional[str] = None, + include_inactive: bool = False, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return pb_svc.list_playbooks( + db, + technique_id=technique_id, + playbook_type=playbook_type, + include_inactive=include_inactive, + ) + + +@router.post("/playbooks", response_model=PlaybookOut, status_code=201) +def create_playbook( + body: PlaybookCreate, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return pb_svc.create_playbook(db, body.model_dump(), user.id) + + +@router.get("/playbooks/{playbook_id}", response_model=PlaybookOut) +def get_playbook( + playbook_id: UUID, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return pb_svc.get_playbook(db, playbook_id) + + +@router.patch("/playbooks/{playbook_id}", response_model=PlaybookOut) +def update_playbook( + playbook_id: UUID, + body: PlaybookUpdate, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return pb_svc.update_playbook(db, playbook_id, body.model_dump(exclude_unset=True), user.id) + + +@router.delete("/playbooks/{playbook_id}", status_code=204) +def delete_playbook( + playbook_id: UUID, + db: Session = Depends(get_db), + user=Depends(require_any_role("admin", "red_lead", "blue_lead")), +): + pb_svc.delete_playbook(db, playbook_id, user.id) + + +# ── Versions ────────────────────────────────────────────────────────────────── + +@router.get("/playbooks/{playbook_id}/versions", response_model=List[PlaybookVersionOut]) +def list_versions( + playbook_id: UUID, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return pb_svc.get_playbook_versions(db, playbook_id) + + +@router.post("/playbooks/{playbook_id}/restore/{version}", response_model=PlaybookOut) +def restore_version( + playbook_id: UUID, + version: int, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Roll the playbook back to a specific historical version.""" + return pb_svc.restore_version(db, playbook_id, version, user.id) + + +# ── By technique (convenience) ──────────────────────────────────────────────── + +@router.get( + "/techniques/{technique_id}/playbooks", + response_model=List[PlaybookOut], +) +def playbooks_for_technique( + technique_id: UUID, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """List all active playbooks for a specific technique.""" + return pb_svc.list_playbooks(db, technique_id=technique_id) + + +@router.get( + "/techniques/{technique_id}/playbooks/{playbook_type}", + response_model=PlaybookOut, +) +def get_playbook_by_technique_type( + technique_id: UUID, + playbook_type: str, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + pb = pb_svc.get_playbook_by_technique_type(db, technique_id, playbook_type) + if not pb: + from app.domain.errors import DomainError + raise DomainError( + f"No '{playbook_type}' playbook for technique {technique_id}", + status_code=404, + ) + return pb + + +# ══════════════════════════════════════════════════════════════════════════════ +# Lessons Learned +# ══════════════════════════════════════════════════════════════════════════════ + +@router.get("/lessons", response_model=List[LessonLearnedOut]) +def list_lessons( + entity_type: Optional[str] = None, + entity_id: Optional[UUID] = None, + severity: Optional[str] = None, + tag: Optional[str] = None, + technique_id: Optional[str] = None, + include_inactive: bool = False, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return ll_svc.list_lessons_learned( + db, + entity_type=entity_type, + entity_id=entity_id, + severity=severity, + tag=tag, + technique_id=technique_id, + include_inactive=include_inactive, + ) + + +@router.post("/lessons", response_model=LessonLearnedOut, status_code=201) +def create_lesson( + body: LessonLearnedCreate, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return ll_svc.create_lesson_learned(db, body.model_dump(), user.id) + + +@router.get("/lessons/{lesson_id}", response_model=LessonLearnedOut) +def get_lesson( + lesson_id: UUID, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return ll_svc.get_lesson_learned(db, lesson_id) + + +@router.patch("/lessons/{lesson_id}", response_model=LessonLearnedOut) +def update_lesson( + lesson_id: UUID, + body: LessonLearnedUpdate, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + return ll_svc.update_lesson_learned( + db, lesson_id, body.model_dump(exclude_unset=True), user.id + ) + + +@router.delete("/lessons/{lesson_id}", status_code=204) +def delete_lesson( + lesson_id: UUID, + db: Session = Depends(get_db), + user=Depends(require_any_role("admin", "red_lead", "blue_lead")), +): + """Soft-delete a lesson (admin / lead only).""" + ll_svc.delete_lesson_learned(db, lesson_id, user.id) + + +# ── Stats ───────────────────────────────────────────────────────────────────── + +@router.get("/stats") +def knowledge_stats( + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Summary counts: total playbooks, lessons by severity, playbooks by type.""" + return ll_svc.get_knowledge_stats(db) diff --git a/backend/app/schemas/knowledge_schema.py b/backend/app/schemas/knowledge_schema.py new file mode 100644 index 0000000..7a9b96e --- /dev/null +++ b/backend/app/schemas/knowledge_schema.py @@ -0,0 +1,149 @@ +"""Phase 11: Knowledge Management schemas — Playbooks + Lessons Learned.""" + +from datetime import datetime +from typing import List, Optional +from uuid import UUID + +from pydantic import BaseModel, ConfigDict, field_validator + +# ── Constants ───────────────────────────────────────────────────────────────── + +VALID_PLAYBOOK_TYPES = ["attack", "detect", "investigate", "respond", "hunt"] +VALID_SEVERITIES = ["critical", "high", "medium", "low", "info"] +VALID_ENTITY_TYPES = ["test", "campaign", "attack_path", "manual"] + + +# ══════════════════════════════════════════════════════════════════════════════ +# Playbook schemas +# ══════════════════════════════════════════════════════════════════════════════ + +class PlaybookCreate(BaseModel): + technique_id: UUID + playbook_type: str + title: str + content: str = "" + tools: List[str] = [] + prerequisites: List[str] = [] + change_note: Optional[str] = None + + @field_validator("playbook_type") + @classmethod + def validate_playbook_type(cls, v: str) -> str: + if v not in VALID_PLAYBOOK_TYPES: + raise ValueError( + f"Invalid playbook_type '{v}'. Must be one of: {VALID_PLAYBOOK_TYPES}" + ) + return v + + +class PlaybookUpdate(BaseModel): + title: Optional[str] = None + content: Optional[str] = None + tools: Optional[List[str]] = None + prerequisites: Optional[List[str]] = None + change_note: Optional[str] = None + + +class PlaybookVersionOut(BaseModel): + model_config = ConfigDict(from_attributes=True) + + id: UUID + playbook_id: UUID + version: int + title: str + content: str + tools: List[str] = [] + prerequisites: List[str] = [] + changed_by: Optional[UUID] + change_note: Optional[str] + created_at: datetime + + +class PlaybookOut(BaseModel): + model_config = ConfigDict(from_attributes=True) + + id: UUID + technique_id: UUID + playbook_type: str + title: str + content: str + version: int + tools: List[str] = [] + prerequisites: List[str] = [] + created_by: Optional[UUID] + updated_by: Optional[UUID] + created_at: datetime + updated_at: datetime + is_active: bool + + +# ══════════════════════════════════════════════════════════════════════════════ +# Lesson Learned schemas +# ══════════════════════════════════════════════════════════════════════════════ + +class LessonLearnedCreate(BaseModel): + title: str + what_happened: str + root_cause: str + fix_applied: Optional[str] = None + severity: str = "medium" + entity_type: str = "manual" + entity_id: Optional[UUID] = None + technique_ids: List[str] = [] + tags: List[str] = [] + + @field_validator("severity") + @classmethod + def validate_severity(cls, v: str) -> str: + if v not in VALID_SEVERITIES: + raise ValueError( + f"Invalid severity '{v}'. Must be one of: {VALID_SEVERITIES}" + ) + return v + + @field_validator("entity_type") + @classmethod + def validate_entity_type(cls, v: str) -> str: + if v not in VALID_ENTITY_TYPES: + raise ValueError( + f"Invalid entity_type '{v}'. Must be one of: {VALID_ENTITY_TYPES}" + ) + return v + + +class LessonLearnedUpdate(BaseModel): + title: Optional[str] = None + what_happened: Optional[str] = None + root_cause: Optional[str] = None + fix_applied: Optional[str] = None + severity: Optional[str] = None + technique_ids: Optional[List[str]] = None + tags: Optional[List[str]] = None + + @field_validator("severity") + @classmethod + def validate_severity(cls, v: Optional[str]) -> Optional[str]: + if v is not None and v not in VALID_SEVERITIES: + raise ValueError( + f"Invalid severity '{v}'. Must be one of: {VALID_SEVERITIES}" + ) + return v + + +class LessonLearnedOut(BaseModel): + model_config = ConfigDict(from_attributes=True) + + id: UUID + title: str + what_happened: str + root_cause: str + fix_applied: Optional[str] + severity: str + entity_type: str + entity_id: Optional[UUID] + technique_ids: List[str] = [] + tags: List[str] = [] + created_by: Optional[UUID] + created_at: datetime + updated_at: datetime + is_active: bool diff --git a/backend/app/services/lesson_learned_service.py b/backend/app/services/lesson_learned_service.py new file mode 100644 index 0000000..236bef5 --- /dev/null +++ b/backend/app/services/lesson_learned_service.py @@ -0,0 +1,133 @@ +"""Phase 11: Lesson Learned service.""" + +from datetime import datetime +from typing import List, Optional +from uuid import UUID + +from sqlalchemy.orm import Session + +from app.domain.errors import DomainError +from app.models.knowledge import LessonLearned + + +def _get_or_404(db: Session, ll_id: UUID) -> LessonLearned: + ll = db.query(LessonLearned).filter( + LessonLearned.id == ll_id, + LessonLearned.is_active == True, + ).first() + if not ll: + raise DomainError(f"Lesson Learned {ll_id} not found", status_code=404) + return ll + + +# ── Read ────────────────────────────────────────────────────────────────────── + +def get_lesson_learned(db: Session, ll_id: UUID) -> LessonLearned: + return _get_or_404(db, ll_id) + + +def list_lessons_learned( + db: Session, + entity_type: Optional[str] = None, + entity_id: Optional[UUID] = None, + severity: Optional[str] = None, + tag: Optional[str] = None, + technique_id: Optional[str] = None, + include_inactive: bool = False, +) -> List[LessonLearned]: + q = db.query(LessonLearned) + if not include_inactive: + q = q.filter(LessonLearned.is_active == True) + if entity_type: + q = q.filter(LessonLearned.entity_type == entity_type) + if entity_id: + q = q.filter(LessonLearned.entity_id == entity_id) + if severity: + q = q.filter(LessonLearned.severity == severity) + if tag: + # JSONB contains operator — filter lessons that have this tag + q = q.filter(LessonLearned.tags.contains([tag])) + if technique_id: + q = q.filter(LessonLearned.technique_ids.contains([technique_id])) + return q.order_by(LessonLearned.created_at.desc()).all() + + +# ── Create ──────────────────────────────────────────────────────────────────── + +def create_lesson_learned(db: Session, data: dict, user_id: UUID) -> LessonLearned: + ll = LessonLearned( + title = data["title"], + what_happened = data.get("what_happened", ""), + root_cause = data.get("root_cause", ""), + fix_applied = data.get("fix_applied"), + severity = data.get("severity", "medium"), + entity_type = data.get("entity_type", "manual"), + entity_id = data.get("entity_id"), + technique_ids = data.get("technique_ids") or [], + tags = data.get("tags") or [], + created_by = user_id, + ) + db.add(ll) + db.commit() + db.refresh(ll) + return ll + + +# ── Update ──────────────────────────────────────────────────────────────────── + +def update_lesson_learned(db: Session, ll_id: UUID, data: dict, user_id: UUID) -> LessonLearned: + ll = _get_or_404(db, ll_id) + + for field in ("title", "what_happened", "root_cause", "fix_applied", + "severity", "technique_ids", "tags"): + if field in data and data[field] is not None: + setattr(ll, field, data[field]) + + ll.updated_at = datetime.utcnow() + db.commit() + db.refresh(ll) + return ll + + +# ── Delete (soft) ───────────────────────────────────────────────────────────── + +def delete_lesson_learned(db: Session, ll_id: UUID, user_id: UUID) -> None: + """Soft-delete — admin-only enforcement is done at the router level.""" + ll = _get_or_404(db, ll_id) + ll.is_active = False + ll.updated_at = datetime.utcnow() + db.commit() + + +# ── Stats ───────────────────────────────────────────────────────────────────── + +def get_knowledge_stats(db: Session) -> dict: + """Summary counts for the knowledge base dashboard.""" + from app.models.knowledge import Playbook + + total_playbooks = db.query(Playbook).filter(Playbook.is_active == True).count() + total_lessons = db.query(LessonLearned).filter(LessonLearned.is_active == True).count() + + severity_counts: dict = {} + for sev in ("critical", "high", "medium", "low", "info"): + severity_counts[sev] = ( + db.query(LessonLearned) + .filter(LessonLearned.is_active == True, LessonLearned.severity == sev) + .count() + ) + + playbook_type_counts: dict = {} + from app.schemas.knowledge_schema import VALID_PLAYBOOK_TYPES + for ptype in VALID_PLAYBOOK_TYPES: + playbook_type_counts[ptype] = ( + db.query(Playbook) + .filter(Playbook.is_active == True, Playbook.playbook_type == ptype) + .count() + ) + + return { + "total_playbooks": total_playbooks, + "total_lessons": total_lessons, + "lessons_by_severity": severity_counts, + "playbooks_by_type": playbook_type_counts, + } diff --git a/backend/app/services/playbook_service.py b/backend/app/services/playbook_service.py new file mode 100644 index 0000000..3f72600 --- /dev/null +++ b/backend/app/services/playbook_service.py @@ -0,0 +1,199 @@ +"""Phase 11: Playbook service — CRUD + versioning.""" + +from datetime import datetime +from typing import List, Optional +from uuid import UUID + +from sqlalchemy.orm import Session + +from app.domain.errors import DomainError +from app.models.knowledge import Playbook, PlaybookVersion +from app.models.technique import Technique + + +def _get_or_404(db: Session, playbook_id: UUID) -> Playbook: + pb = db.query(Playbook).filter( + Playbook.id == playbook_id, + Playbook.is_active == True, + ).first() + if not pb: + raise DomainError(f"Playbook {playbook_id} not found", status_code=404) + return pb + + +def _snapshot(db: Session, pb: Playbook, changed_by: Optional[UUID], change_note: Optional[str]): + """Save a version snapshot of the current playbook state.""" + snap = PlaybookVersion( + playbook_id=pb.id, + version=pb.version, + title=pb.title, + content=pb.content, + tools=list(pb.tools or []), + prerequisites=list(pb.prerequisites or []), + changed_by=changed_by, + change_note=change_note, + created_at=datetime.utcnow(), + ) + db.add(snap) + + +# ── Read ────────────────────────────────────────────────────────────────────── + +def get_playbook(db: Session, playbook_id: UUID) -> Playbook: + return _get_or_404(db, playbook_id) + + +def get_playbook_by_technique_type( + db: Session, technique_id: UUID, playbook_type: str +) -> Optional[Playbook]: + return db.query(Playbook).filter( + Playbook.technique_id == technique_id, + Playbook.playbook_type == playbook_type, + Playbook.is_active == True, + ).first() + + +def list_playbooks( + db: Session, + technique_id: Optional[UUID] = None, + playbook_type: Optional[str] = None, + include_inactive: bool = False, +) -> List[Playbook]: + q = db.query(Playbook) + if not include_inactive: + q = q.filter(Playbook.is_active == True) + if technique_id: + q = q.filter(Playbook.technique_id == technique_id) + if playbook_type: + q = q.filter(Playbook.playbook_type == playbook_type) + return q.order_by(Playbook.technique_id, Playbook.playbook_type).all() + + +def get_playbook_versions(db: Session, playbook_id: UUID) -> List[PlaybookVersion]: + _get_or_404(db, playbook_id) # 404 guard + return ( + db.query(PlaybookVersion) + .filter(PlaybookVersion.playbook_id == playbook_id) + .order_by(PlaybookVersion.version.desc()) + .all() + ) + + +# ── Create / Update (upsert) ────────────────────────────────────────────────── + +def create_playbook(db: Session, data: dict, user_id: UUID) -> Playbook: + technique_id = data["technique_id"] + playbook_type = data["playbook_type"] + + # Validate technique exists + tech = db.query(Technique).filter(Technique.id == technique_id).first() + if not tech: + raise DomainError(f"Technique {technique_id} not found", status_code=404) + + # Check for existing (even soft-deleted) to reactivate + existing = db.query(Playbook).filter( + Playbook.technique_id == technique_id, + Playbook.playbook_type == playbook_type, + ).first() + + if existing: + if existing.is_active: + raise DomainError( + f"Playbook ({playbook_type}) for technique {technique_id} already exists. " + "Use PATCH to update it.", + status_code=409, + ) + # Reactivate soft-deleted playbook + existing.is_active = True + existing.title = data.get("title", existing.title) + existing.content = data.get("content", existing.content) + existing.tools = data.get("tools", existing.tools) or [] + existing.prerequisites = data.get("prerequisites", existing.prerequisites) or [] + existing.updated_by = user_id + existing.updated_at = datetime.utcnow() + existing.version += 1 + change_note = data.get("change_note") + _snapshot(db, existing, user_id, change_note or "Reactivated") + db.commit() + db.refresh(existing) + return existing + + pb = Playbook( + technique_id = technique_id, + playbook_type = playbook_type, + title = data.get("title", f"{playbook_type.capitalize()} playbook"), + content = data.get("content", ""), + version = 1, + tools = data.get("tools") or [], + prerequisites = data.get("prerequisites") or [], + created_by = user_id, + updated_by = user_id, + ) + db.add(pb) + db.flush() # get id + _snapshot(db, pb, user_id, data.get("change_note") or "Initial version") + db.commit() + db.refresh(pb) + return pb + + +def update_playbook(db: Session, playbook_id: UUID, data: dict, user_id: UUID) -> Playbook: + pb = _get_or_404(db, playbook_id) + + # Save snapshot before mutating + _snapshot(db, pb, user_id, data.get("change_note")) + + if "title" in data and data["title"] is not None: + pb.title = data["title"] + if "content" in data and data["content"] is not None: + pb.content = data["content"] + if "tools" in data and data["tools"] is not None: + pb.tools = data["tools"] + if "prerequisites" in data and data["prerequisites"] is not None: + pb.prerequisites = data["prerequisites"] + + pb.version += 1 + pb.updated_by = user_id + pb.updated_at = datetime.utcnow() + + db.commit() + db.refresh(pb) + return pb + + +def restore_version(db: Session, playbook_id: UUID, version_number: int, user_id: UUID) -> Playbook: + """Roll back a playbook to a specific historical version.""" + pb = _get_or_404(db, playbook_id) + + snap = db.query(PlaybookVersion).filter( + PlaybookVersion.playbook_id == playbook_id, + PlaybookVersion.version == version_number, + ).first() + if not snap: + raise DomainError( + f"Version {version_number} not found for playbook {playbook_id}", status_code=404 + ) + + # Snapshot current state before restoring + _snapshot(db, pb, user_id, f"Auto-snapshot before restore to v{version_number}") + + pb.title = snap.title + pb.content = snap.content + pb.tools = list(snap.tools or []) + pb.prerequisites = list(snap.prerequisites or []) + pb.version += 1 + pb.updated_by = user_id + pb.updated_at = datetime.utcnow() + + _snapshot(db, pb, user_id, f"Restored from v{version_number}") + db.commit() + db.refresh(pb) + return pb + + +def delete_playbook(db: Session, playbook_id: UUID, user_id: UUID) -> None: + pb = _get_or_404(db, playbook_id) + pb.is_active = False + pb.updated_by = user_id + pb.updated_at = datetime.utcnow() + db.commit() diff --git a/scripts/qa_phase11.py b/scripts/qa_phase11.py new file mode 100644 index 0000000..cb45ab6 --- /dev/null +++ b/scripts/qa_phase11.py @@ -0,0 +1,400 @@ +""" +QA script for Phase 11 — Knowledge Management (Playbooks + Lessons Learned). +Run with: python -X utf8 scripts/qa_phase11.py +""" + +import sys, os +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import requests + +BASE = "http://localhost:8000/api/v1" +PASS = "\033[92m✓\033[0m" +FAIL = "\033[91m✗\033[0m" + +passed = 0 +failed = 0 + + +def check(label: str, condition: bool, detail: str = ""): + global passed, failed + if condition: + passed += 1 + print(f" {PASS} {label}") + else: + failed += 1 + print(f" {FAIL} {label}" + (f" — {detail}" for _ in [1] if detail).__next__() + if detail else f" {FAIL} {label}") + + +def get_token(username="admin", password="admin123"): + r = requests.post(f"{BASE}/auth/login", + json={"username": username, "password": password}) + if r.status_code == 200: + return r.json().get("access_token") or r.json().get("token") + # try form + r = requests.post(f"{BASE}/auth/token", + data={"username": username, "password": password}) + if r.status_code == 200: + return r.json().get("access_token") + raise RuntimeError(f"Login failed: {r.status_code} {r.text}") + + +def auth(token): + return {"Authorization": f"Bearer {token}"} + + +def get_first_technique(headers): + r = requests.get(f"{BASE}/techniques", headers=headers, params={"limit": 1}) + items = r.json() + if isinstance(items, dict): + items = items.get("items", []) + if not items: + raise RuntimeError("No techniques found — seed the DB first") + return items[0]["id"] + + +# ───────────────────────────────────────────────────────────────────────────── + +def main(): + print("\n====== Phase 11 QA — Knowledge Management ======\n") + + token = get_token() + h = auth(token) + tid = get_first_technique(h) + print(f" Using technique_id: {tid}\n") + + # ── Block 1: Playbook CRUD ──────────────────────────────────────────────── + print("── Block 1: Playbook CRUD ──") + + # Create detect playbook + r = requests.post(f"{BASE}/knowledge/playbooks", headers=h, json={ + "technique_id": tid, + "playbook_type": "detect", + "title": "Detection Playbook v1", + "content": "## Detection\nLook for suspicious PowerShell events.", + "tools": ["Splunk", "Sigma"], + "prerequisites": ["Log ingestion enabled"], + "change_note": "Initial version", + }) + check("POST /knowledge/playbooks → 201", r.status_code == 201, r.text[:120]) + pb1 = r.json() if r.status_code == 201 else {} + check("Playbook has id", bool(pb1.get("id"))) + check("Playbook version = 1", pb1.get("version") == 1) + check("Playbook tools list", pb1.get("tools") == ["Splunk", "Sigma"]) + + # Create attack playbook + r = requests.post(f"{BASE}/knowledge/playbooks", headers=h, json={ + "technique_id": tid, + "playbook_type": "attack", + "title": "Attack Playbook", + "content": "## Attack\nUse PowerShell to run encoded commands.", + "tools": ["Cobalt Strike"], + "prerequisites": ["Domain user access"], + }) + check("POST second playbook (attack type) → 201", r.status_code == 201, r.text[:120]) + pb2 = r.json() if r.status_code == 201 else {} + + # Duplicate → 409 + r = requests.post(f"{BASE}/knowledge/playbooks", headers=h, json={ + "technique_id": tid, + "playbook_type": "detect", + "title": "Duplicate", + "content": "dup", + }) + check("POST duplicate (same technique+type) → 409", r.status_code == 409, r.text[:120]) + + # Invalid playbook_type → 422 + r = requests.post(f"{BASE}/knowledge/playbooks", headers=h, json={ + "technique_id": tid, + "playbook_type": "badtype", + "title": "Bad", + "content": "x", + }) + check("POST invalid playbook_type → 422", r.status_code == 422, r.text[:120]) + + # GET list + r = requests.get(f"{BASE}/knowledge/playbooks", headers=h) + check("GET /knowledge/playbooks → 200", r.status_code == 200) + items = r.json() + check("List has ≥2 playbooks", len(items) >= 2) + + # GET filtered by technique + r = requests.get(f"{BASE}/knowledge/playbooks", headers=h, + params={"technique_id": tid}) + check("GET ?technique_id filter → 200", r.status_code == 200) + check("Filter returns only this technique's playbooks", + all(p["technique_id"] == tid for p in r.json())) + + # GET filtered by type + r = requests.get(f"{BASE}/knowledge/playbooks", headers=h, + params={"playbook_type": "detect"}) + check("GET ?playbook_type=detect filter → 200", r.status_code == 200) + check("Filter returns only detect playbooks", + all(p["playbook_type"] == "detect" for p in r.json())) + + # GET single + pb1_id = pb1.get("id") + r = requests.get(f"{BASE}/knowledge/playbooks/{pb1_id}", headers=h) + check("GET /knowledge/playbooks/{id} → 200", r.status_code == 200) + check("Correct playbook returned", r.json().get("id") == pb1_id) + + # GET 404 + r = requests.get(f"{BASE}/knowledge/playbooks/00000000-0000-0000-0000-000000000001", + headers=h) + check("GET non-existent playbook → 404", r.status_code == 404) + + print() + + # ── Block 2: Playbook Update + Versioning ───────────────────────────────── + print("── Block 2: Playbook Update + Versioning ──") + + r = requests.patch(f"{BASE}/knowledge/playbooks/{pb1_id}", headers=h, json={ + "content": "## Detection v2\nAlso check Sysmon event 4688.", + "tools": ["Splunk", "Sigma", "Sysmon"], + "change_note": "Added Sysmon detection rule", + }) + check("PATCH /knowledge/playbooks/{id} → 200", r.status_code == 200) + pb1_v2 = r.json() + check("Version incremented to 2", pb1_v2.get("version") == 2) + check("Content updated", "Sysmon" in pb1_v2.get("content", "")) + check("Tools updated (3 items)", len(pb1_v2.get("tools", [])) == 3) + + # PATCH again → version 3 + r = requests.patch(f"{BASE}/knowledge/playbooks/{pb1_id}", headers=h, json={ + "title": "Detection Playbook v3", + "change_note": "Renamed", + }) + check("PATCH again → version 3", r.json().get("version") == 3) + + # List versions + r = requests.get(f"{BASE}/knowledge/playbooks/{pb1_id}/versions", headers=h) + check("GET /playbooks/{id}/versions → 200", r.status_code == 200) + versions = r.json() + check("3 snapshots saved", len(versions) == 3, f"got {len(versions)}") + check("Versions in desc order", versions[0]["version"] >= versions[-1]["version"]) + + # Restore version 1 + r = requests.post(f"{BASE}/knowledge/playbooks/{pb1_id}/restore/1", headers=h) + check("POST /restore/1 → 200", r.status_code == 200) + restored = r.json() + check("Version incremented after restore", restored.get("version") == 4) + check("Content restored to v1 content", "Sysmon" not in restored.get("content", "")) + + # Restore non-existent version → 404 + r = requests.post(f"{BASE}/knowledge/playbooks/{pb1_id}/restore/999", headers=h) + check("POST /restore/999 (non-existent) → 404", r.status_code == 404) + + print() + + # ── Block 3: Technique convenience endpoints ────────────────────────────── + print("── Block 3: Technique convenience endpoints ──") + + r = requests.get(f"{BASE}/knowledge/techniques/{tid}/playbooks", headers=h) + check("GET /knowledge/techniques/{id}/playbooks → 200", r.status_code == 200) + check("Returns playbooks for this technique", len(r.json()) >= 1) + + r = requests.get(f"{BASE}/knowledge/techniques/{tid}/playbooks/detect", headers=h) + check("GET /knowledge/techniques/{id}/playbooks/detect → 200", r.status_code == 200) + check("Returns detect playbook", r.json().get("playbook_type") == "detect") + + r = requests.get(f"{BASE}/knowledge/techniques/{tid}/playbooks/respond", headers=h) + check("GET /knowledge/techniques/{id}/playbooks/respond → 404", r.status_code == 404) + + print() + + # ── Block 4: Lessons Learned CRUD ──────────────────────────────────────── + print("── Block 4: Lessons Learned CRUD ──") + + r = requests.post(f"{BASE}/knowledge/lessons", headers=h, json={ + "title": "T1059 — Detection gap in PowerShell logging", + "what_happened": "Red team ran encoded PS commands without triggering any alert.", + "root_cause": "ScriptBlock logging was disabled on 80% of endpoints.", + "fix_applied": "Enabled ScriptBlock logging via GPO. Validated in 48h.", + "severity": "high", + "entity_type": "manual", + "technique_ids": [tid], + "tags": ["powershell", "logging", "gpo"], + }) + check("POST /knowledge/lessons → 201", r.status_code == 201, r.text[:120]) + ll1 = r.json() if r.status_code == 201 else {} + check("Lesson has id", bool(ll1.get("id"))) + check("Severity correct", ll1.get("severity") == "high") + check("Tags saved", ll1.get("tags") == ["powershell", "logging", "gpo"]) + check("technique_ids saved", tid in (ll1.get("technique_ids") or [])) + + # Second lesson — critical + r = requests.post(f"{BASE}/knowledge/lessons", headers=h, json={ + "title": "Lateral movement undetected", + "what_happened": "PsExec used for lateral movement — zero detections.", + "root_cause": "SMB traffic not monitored in SIEM.", + "severity": "critical", + "entity_type": "campaign", + "tags": ["smb", "lateral-movement"], + }) + check("POST second lesson (critical) → 201", r.status_code == 201) + ll2 = r.json() if r.status_code == 201 else {} + + # Invalid severity → 422 + r = requests.post(f"{BASE}/knowledge/lessons", headers=h, json={ + "title": "Bad severity", + "what_happened": "x", + "root_cause": "y", + "severity": "extreme", + }) + check("POST invalid severity → 422", r.status_code == 422) + + # Invalid entity_type → 422 + r = requests.post(f"{BASE}/knowledge/lessons", headers=h, json={ + "title": "Bad entity_type", + "what_happened": "x", + "root_cause": "y", + "entity_type": "incident", + }) + check("POST invalid entity_type → 422", r.status_code == 422) + + # GET list + r = requests.get(f"{BASE}/knowledge/lessons", headers=h) + check("GET /knowledge/lessons → 200", r.status_code == 200) + check("List has ≥2 lessons", len(r.json()) >= 2) + + # GET single + ll1_id = ll1.get("id") + r = requests.get(f"{BASE}/knowledge/lessons/{ll1_id}", headers=h) + check("GET /knowledge/lessons/{id} → 200", r.status_code == 200) + check("Correct lesson returned", r.json().get("id") == ll1_id) + + print() + + # ── Block 5: Lessons filtering ──────────────────────────────────────────── + print("── Block 5: Lessons Learned filtering ──") + + r = requests.get(f"{BASE}/knowledge/lessons", headers=h, + params={"severity": "critical"}) + check("Filter by severity=critical → 200", r.status_code == 200) + results = r.json() + check("Only critical lessons returned", all(l["severity"] == "critical" for l in results)) + + r = requests.get(f"{BASE}/knowledge/lessons", headers=h, + params={"entity_type": "manual"}) + check("Filter by entity_type=manual → 200", r.status_code == 200) + check("Only manual lessons returned", + all(l["entity_type"] == "manual" for l in r.json())) + + r = requests.get(f"{BASE}/knowledge/lessons", headers=h, + params={"tag": "powershell"}) + check("Filter by tag=powershell → 200", r.status_code == 200) + check("Lesson with powershell tag returned", len(r.json()) >= 1) + + r = requests.get(f"{BASE}/knowledge/lessons", headers=h, + params={"technique_id": tid}) + check("Filter by technique_id → 200", r.status_code == 200) + check("Lesson linked to technique returned", len(r.json()) >= 1) + + print() + + # ── Block 6: Lesson Update ──────────────────────────────────────────────── + print("── Block 6: Lesson Learned update ──") + + r = requests.patch(f"{BASE}/knowledge/lessons/{ll1_id}", headers=h, json={ + "fix_applied": "Updated fix: also enabled Module logging.", + "tags": ["powershell", "logging", "gpo", "module-logging"], + }) + check("PATCH /knowledge/lessons/{id} → 200", r.status_code == 200) + updated = r.json() + check("Fix updated", "Module logging" in (updated.get("fix_applied") or "")) + check("Tags extended", len(updated.get("tags", [])) == 4) + + # Invalid severity via PATCH → 422 + r = requests.patch(f"{BASE}/knowledge/lessons/{ll1_id}", headers=h, json={ + "severity": "extreme", + }) + check("PATCH invalid severity → 422", r.status_code == 422) + + print() + + # ── Block 7: Knowledge Stats ────────────────────────────────────────────── + print("── Block 7: Knowledge Stats ──") + + r = requests.get(f"{BASE}/knowledge/stats", headers=h) + check("GET /knowledge/stats → 200", r.status_code == 200) + stats = r.json() + check("total_playbooks ≥ 2", stats.get("total_playbooks", 0) >= 2) + check("total_lessons ≥ 2", stats.get("total_lessons", 0) >= 2) + check("lessons_by_severity has 'high'", "high" in stats.get("lessons_by_severity", {})) + check("playbooks_by_type has 'detect'", "detect" in stats.get("playbooks_by_type", {})) + check("Stats detect count ≥ 1", stats.get("playbooks_by_type", {}).get("detect", 0) >= 1) + + print() + + # ── Block 8: Soft-Delete ────────────────────────────────────────────────── + print("── Block 8: Soft-delete ──") + + ll2_id = ll2.get("id") + r = requests.delete(f"{BASE}/knowledge/lessons/{ll2_id}", headers=h) + check("DELETE /knowledge/lessons/{id} → 204", r.status_code == 204) + + r = requests.get(f"{BASE}/knowledge/lessons/{ll2_id}", headers=h) + check("GET deleted lesson → 404", r.status_code == 404) + + r = requests.get(f"{BASE}/knowledge/lessons", headers=h, + params={"include_inactive": True}) + check("GET ?include_inactive=true includes deleted lesson", r.status_code == 200) + all_ids = [l["id"] for l in r.json()] + check("Deleted lesson visible with include_inactive", ll2_id in all_ids) + + # Playbook soft-delete + pb2_id = pb2.get("id") + r = requests.delete(f"{BASE}/knowledge/playbooks/{pb2_id}", headers=h) + check("DELETE playbook → 204", r.status_code == 204) + + r = requests.get(f"{BASE}/knowledge/playbooks/{pb2_id}", headers=h) + check("GET deleted playbook → 404", r.status_code == 404) + + print() + + # ── Block 9: Auth protection ────────────────────────────────────────────── + print("── Block 9: Auth protection (no token) ──") + + endpoints = [ + ("GET", f"{BASE}/knowledge/playbooks"), + ("POST", f"{BASE}/knowledge/playbooks"), + ("GET", f"{BASE}/knowledge/lessons"), + ("POST", f"{BASE}/knowledge/lessons"), + ("GET", f"{BASE}/knowledge/stats"), + ] + for method, url in endpoints: + r = requests.request(method, url, + json={"title": "x", "what_happened": "x", "root_cause": "x", + "technique_id": tid, "playbook_type": "hunt", + "content": "x"}) + check(f"{method} {url.split('/api/v1')[1]} without auth → 401", r.status_code == 401) + + print() + + # ── Block 10: Regression ───────────────────────────────────────────────── + print("── Block 10: Regression (Phase 10 still works) ──") + + r = requests.get(f"{BASE}/attack-paths", headers=h) + check("GET /attack-paths still works", r.status_code == 200) + + r = requests.get(f"{BASE}/ownership/analyst-dashboard", headers=h) + check("GET /ownership/analyst-dashboard still works", r.status_code == 200) + + r = requests.get(f"{BASE}/detection-lifecycle/assets", headers=h) + check("GET /detection-lifecycle/assets still works", r.status_code == 200) + + print() + + # ── Summary ─────────────────────────────────────────────────────────────── + total = passed + failed + print(f"====== Results: {passed}/{total} passed", end="") + if failed: + print(f" — \033[91m{failed} FAILED\033[0m ======\n") + sys.exit(1) + else: + print(" ✓ ALL PASSED ======\n") + + +if __name__ == "__main__": + main()