Files
Aegis/backend/app/services/lesson_learned_service.py
kitos 4fba4152d9
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
fix(knowledge): use EntityNotFoundError/DuplicateEntityError instead of DomainError(status_code=)
2026-05-20 15:21:36 +02:00

134 lines
5.1 KiB
Python

"""Phase 11: Lesson Learned service."""
from datetime import datetime, timezone
from typing import List, Optional
from uuid import UUID

from sqlalchemy.orm import Session

from app.domain.errors import EntityNotFoundError
from app.models.knowledge import LessonLearned
def _get_or_404(db: Session, ll_id: UUID) -> LessonLearned:
    """Fetch a single active lesson learned by its id.

    Rows whose ``is_active`` flag is not true are treated as missing.

    Args:
        db: Session used to run the lookup.
        ll_id: Primary key of the lesson learned.

    Returns:
        The matching ``LessonLearned`` row.

    Raises:
        EntityNotFoundError: If no active row has the given id.
    """
    # ``== True`` is deliberate: it builds a SQL comparison, not a Python bool.
    active_match = db.query(LessonLearned).filter(
        LessonLearned.id == ll_id,
        LessonLearned.is_active == True,
    )
    record = active_match.first()
    if record:
        return record
    raise EntityNotFoundError("LessonLearned", str(ll_id))
# ── Read ──────────────────────────────────────────────────────────────────────
def get_lesson_learned(db: Session, ll_id: UUID) -> LessonLearned:
    """Return the active lesson learned identified by ``ll_id``.

    The lookup is delegated to ``_get_or_404``; its ``EntityNotFoundError``
    propagates unchanged when no active row matches.
    """
    lesson = _get_or_404(db, ll_id)
    return lesson
def list_lessons_learned(
    db: Session,
    entity_type: Optional[str] = None,
    entity_id: Optional[UUID] = None,
    severity: Optional[str] = None,
    tag: Optional[str] = None,
    technique_id: Optional[str] = None,
    include_inactive: bool = False,
) -> List[LessonLearned]:
    """List lessons learned, newest first, narrowed by optional filters.

    Any filter argument that is falsy (``None`` or an empty string) is
    ignored. All supplied filters are combined, so a row must satisfy
    every one of them.

    Args:
        db: Session used to run the query.
        entity_type: Keep only rows with this ``entity_type``.
        entity_id: Keep only rows with this ``entity_id``.
        severity: Keep only rows with this ``severity``.
        tag: Keep only rows whose ``tags`` contain this value.
        technique_id: Keep only rows whose ``technique_ids`` contain this
            value.
        include_inactive: When false (the default), rows that are not
            active are excluded.

    Returns:
        Matching rows ordered by ``created_at`` descending.
    """
    criteria = []
    if not include_inactive:
        criteria.append(LessonLearned.is_active == True)
    if entity_type:
        criteria.append(LessonLearned.entity_type == entity_type)
    if entity_id:
        criteria.append(LessonLearned.entity_id == entity_id)
    if severity:
        criteria.append(LessonLearned.severity == severity)
    if tag:
        # JSONB contains operator: match lessons that carry this tag.
        criteria.append(LessonLearned.tags.contains([tag]))
    if technique_id:
        criteria.append(LessonLearned.technique_ids.contains([technique_id]))

    query = db.query(LessonLearned)
    # Apply one filter() per criterion, in the order they were collected.
    for criterion in criteria:
        query = query.filter(criterion)
    newest_first = query.order_by(LessonLearned.created_at.desc())
    return newest_first.all()
# ── Create ────────────────────────────────────────────────────────────────────
def create_lesson_learned(db: Session, data: dict, user_id: UUID) -> LessonLearned:
    """Create and persist a new lesson learned from ``data``.

    ``title`` is mandatory; every other key is optional and falls back to
    the default shown below. ``technique_ids`` and ``tags`` become an empty
    list when missing or falsy.

    Args:
        db: Session used to persist the row.
        data: Field values for the new lesson learned.
        user_id: Stored as ``created_by`` on the new row.

    Returns:
        The committed and refreshed ``LessonLearned`` instance.

    Raises:
        KeyError: If ``data`` has no ``"title"`` key.
    """
    attributes = {
        "title": data["title"],
        "what_happened": data.get("what_happened", ""),
        "root_cause": data.get("root_cause", ""),
        "fix_applied": data.get("fix_applied"),
        "severity": data.get("severity", "medium"),
        "entity_type": data.get("entity_type", "manual"),
        "entity_id": data.get("entity_id"),
        "technique_ids": data.get("technique_ids") or [],
        "tags": data.get("tags") or [],
        "created_by": user_id,
    }
    lesson = LessonLearned(**attributes)
    db.add(lesson)
    db.commit()
    # Reload so the returned instance reflects the stored row.
    db.refresh(lesson)
    return lesson
# ── Update ────────────────────────────────────────────────────────────────────
def update_lesson_learned(db: Session, ll_id: UUID, data: dict, user_id: UUID) -> LessonLearned:
    """Apply a partial update to an active lesson learned.

    Only the editable fields listed below are considered. A field that is
    absent from ``data`` or whose value is ``None`` is left untouched.

    Args:
        db: Session used to load and persist the row.
        ll_id: Primary key of the lesson learned to update.
        data: New field values.
        user_id: Accepted for interface parity; not used by this function.

    Returns:
        The committed and refreshed ``LessonLearned`` instance.

    Raises:
        EntityNotFoundError: If no active row has the given id.
    """
    ll = _get_or_404(db, ll_id)
    for field in ("title", "what_happened", "root_cause", "fix_applied",
                  "severity", "technique_ids", "tags"):
        if field in data and data[field] is not None:
            setattr(ll, field, data[field])
    # datetime.utcnow() is deprecated since Python 3.12. This yields the same
    # naive UTC timestamp, so the stored value keeps its previous form.
    ll.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
    db.commit()
    db.refresh(ll)
    return ll
# ── Delete (soft) ─────────────────────────────────────────────────────────────
def delete_lesson_learned(db: Session, ll_id: UUID, user_id: UUID) -> None:
    """Soft-delete a lesson learned by clearing its ``is_active`` flag.

    Admin-only enforcement is done at the router level, not here.

    Args:
        db: Session used to load and persist the row.
        ll_id: Primary key of the lesson learned to deactivate.
        user_id: Accepted for interface parity; not used by this function.

    Raises:
        EntityNotFoundError: If no active row has the given id (including
            one that was already soft-deleted).
    """
    ll = _get_or_404(db, ll_id)
    ll.is_active = False
    # datetime.utcnow() is deprecated since Python 3.12. This yields the same
    # naive UTC timestamp, so the stored value keeps its previous form.
    ll.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
    db.commit()
# ── Stats ─────────────────────────────────────────────────────────────────────
def get_knowledge_stats(db: Session) -> dict:
    """Build the summary counts for the knowledge base dashboard.

    Only active playbooks and active lessons are counted.

    Returns:
        A dict with four keys: ``total_playbooks``, ``total_lessons``,
        ``lessons_by_severity`` (count per severity level) and
        ``playbooks_by_type`` (count per entry of ``VALID_PLAYBOOK_TYPES``).
    """
    from app.models.knowledge import Playbook

    # Base queries are reused below; filter() returns a new query each time.
    active_playbooks = db.query(Playbook).filter(Playbook.is_active == True)
    active_lessons = db.query(LessonLearned).filter(LessonLearned.is_active == True)

    playbook_total = active_playbooks.count()
    lesson_total = active_lessons.count()

    # One count query per fixed severity level; levels with no rows report 0.
    by_severity: dict = {
        level: active_lessons.filter(LessonLearned.severity == level).count()
        for level in ("critical", "high", "medium", "low", "info")
    }

    from app.schemas.knowledge_schema import VALID_PLAYBOOK_TYPES

    # One count query per known playbook type.
    by_type: dict = {
        kind: active_playbooks.filter(Playbook.playbook_type == kind).count()
        for kind in VALID_PLAYBOOK_TYPES
    }

    return {
        "total_playbooks": playbook_total,
        "total_lessons": lesson_total,
        "lessons_by_severity": by_severity,
        "playbooks_by_type": by_type,
    }