Files
Aegis/backend/app/services/notification_service.py

241 lines
7.0 KiB
Python

"""Notification service — create, read, and manage in-app notifications.
Provides helpers for generating notifications automatically when test
state changes occur, plus CRUD for the notifications API.
Functions in this module stage changes via ``db.add()`` / ``db.flush()``
but do **not** commit. The caller is responsible for committing.
"""
import uuid
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from sqlalchemy import func
from app.domain.errors import EntityNotFoundError
from app.models.notification import Notification
from app.models.user import User
# ---------------------------------------------------------------------------
# Core CRUD
# ---------------------------------------------------------------------------
def list_notifications(
    db: Session,
    user_id: uuid.UUID,
    *,
    offset: int = 0,
    limit: int = 20,
) -> list[Notification]:
    """Return one page of a user's notifications, newest first.

    Args:
        db: Session used to run the query.
        user_id: Owner whose notifications are listed.
        offset: Number of leading rows to skip.
        limit: Maximum number of rows to return.

    Returns:
        The matching ``Notification`` rows ordered by ``created_at``
        descending, with ``id`` descending as a tie-breaker.
    """
    return (
        db.query(Notification)
        .filter(Notification.user_id == user_id)
        # ``created_at`` alone is not guaranteed to be unique. Without a
        # tie-breaker, rows sharing a timestamp may come back in a different
        # order on each query, so offset/limit pages could repeat or skip
        # rows. Ordering by ``id`` as well makes the page boundaries stable.
        .order_by(Notification.created_at.desc(), Notification.id.desc())
        .offset(offset)
        .limit(limit)
        .all()
    )
def get_notification_or_raise(
    db: Session,
    notification_id: uuid.UUID,
    user_id: uuid.UUID,
) -> Notification:
    """Look up a single notification that belongs to *user_id*.

    Both the notification ID and the owning user must match, so a user
    cannot fetch a notification addressed to someone else.

    Raises:
        EntityNotFoundError: If no row matches both criteria.
    """
    has_requested_id = Notification.id == notification_id
    owned_by_user = Notification.user_id == user_id

    match = db.query(Notification).filter(has_requested_id, owned_by_user).first()
    if match is not None:
        return match
    raise EntityNotFoundError("Notification", str(notification_id))
def notify_role(
    db: Session,
    *,
    role: str,
    type: str,
    title: str,
    message: str,
    entity_type: str,
    entity_id: uuid.UUID,
) -> None:
    """Create the same notification for every active user holding *role*.

    Each notification is staged through ``create_notification``; nothing is
    committed here.
    """
    recipients_query = db.query(User).filter(
        User.role == role,
        User.is_active == True,  # noqa: E712
    )

    # Everything except the recipient is identical across notifications.
    shared_fields = {
        "type": type,
        "title": title,
        "message": message,
        "entity_type": entity_type,
        "entity_id": entity_id,
    }
    for recipient in recipients_query.all():
        create_notification(db, user_id=recipient.id, **shared_fields)
def create_notification(
    db: Session,
    user_id: uuid.UUID,
    type: str,
    title: str,
    message: str | None = None,
    entity_type: str | None = None,
    entity_id: uuid.UUID | None = None,
) -> Notification:
    """Build a notification for one user and stage it in the session.

    The row is added and flushed but not committed; the caller owns the
    transaction.

    Returns:
        The newly staged ``Notification`` instance.
    """
    fields = dict(
        user_id=user_id,
        type=type,
        title=title,
        message=message,
        entity_type=entity_type,
        entity_id=entity_id,
    )
    record = Notification(**fields)
    db.add(record)
    db.flush()
    return record
def mark_as_read(
    db: Session, notification_id: uuid.UUID, user_id: uuid.UUID
) -> Notification:
    """Flag one of the user's notifications as read and return it.

    Raises:
        EntityNotFoundError: Propagated from the lookup when the notification
            does not exist for this user.
    """
    target = get_notification_or_raise(
        db,
        notification_id=notification_id,
        user_id=user_id,
    )
    target.read = True
    return target
def mark_all_as_read(db: Session, user_id: uuid.UUID) -> int:
    """Flag every unread notification of *user_id* as read.

    Returns:
        The row count reported by the bulk update.
    """
    unread_for_user = db.query(Notification).filter(
        Notification.user_id == user_id,
        Notification.read == False,  # noqa: E712
    )
    return unread_for_user.update({"read": True})
def get_unread_count(db: Session, user_id: uuid.UUID) -> int:
    """Count the notifications of *user_id* that are still unread.

    Returns:
        The count, or ``0`` when the query yields a falsy result.
    """
    unread_total = (
        db.query(func.count(Notification.id))
        .filter(
            Notification.user_id == user_id,
            Notification.read == False,  # noqa: E712
        )
        .scalar()
    )
    return unread_total if unread_total else 0
def cleanup_old_notifications(db: Session, days: int = 90) -> int:
    """Delete read notifications older than *days*. Returns count deleted.

    Only notifications already marked as read are removed; unread ones are
    kept regardless of age. The delete is staged on the session and not
    committed here.

    Args:
        db: Session used to run the bulk delete.
        days: Minimum age, in days, of the read notifications to delete.

    Returns:
        The row count reported by the bulk delete.

    Raises:
        ValueError: If *days* is negative. A negative value would move the
            cutoff into the future and delete every read notification.
    """
    if days < 0:
        raise ValueError(f"days must be non-negative, got {days}")

    # Imported locally because the module header only brings in ``datetime``
    # and ``timedelta``.
    from datetime import timezone

    # ``datetime.utcnow()`` is deprecated since Python 3.12. Taking an aware
    # UTC "now" and dropping tzinfo yields the same naive-UTC value, so the
    # comparison against ``created_at`` behaves as before.
    now_utc_naive = datetime.now(timezone.utc).replace(tzinfo=None)
    cutoff = now_utc_naive - timedelta(days=days)

    return (
        db.query(Notification)
        .filter(
            Notification.read == True,  # noqa: E712
            Notification.created_at < cutoff,
        )
        .delete()
    )
# ---------------------------------------------------------------------------
# Automatic notification dispatchers
# ---------------------------------------------------------------------------
def notify_test_state_change(db: Session, test, new_state: str) -> None:
    """Dispatch notifications based on a test's new state.

    Called by the workflow service after each state transition.

    Rules:
        - red_executing   -> notify creator (confirmation)
        - blue_evaluating -> notify all active blue_tech users
        - in_review       -> notify all active red_lead and blue_lead users
        - rejected        -> notify creator
        - validated       -> notify creator

    Any other state is ignored. Creator notifications are skipped when the
    test has no ``created_by`` value. Notifications are staged through
    ``create_notification`` / ``notify_role``; nothing is committed here.

    Args:
        db: Session used to look up recipients and stage notifications.
        test: Object exposing ``name``, ``id`` and ``created_by``.
        new_state: The state the test has just entered.
    """
    test_name = test.name
    test_id = test.id
    creator_id = test.created_by

    # State -> (type, title, message) for notifications sent to the creator.
    creator_rules = {
        "red_executing": (
            "test_state_changed",
            "Test execution started",
            f'Your test "{test_name}" has moved to execution phase.',
        ),
        "rejected": (
            "test_rejected",
            "Test rejected",
            f'Your test "{test_name}" has been rejected. Please review and resubmit.',
        ),
        "validated": (
            "test_validated",
            "Test validated",
            f'Your test "{test_name}" has been validated successfully.',
        ),
    }

    # State -> (roles, type, title, message) for notifications fanned out to
    # every active user holding one of the listed roles.
    role_rules = {
        "blue_evaluating": (
            ("blue_tech",),
            "test_assigned",
            "New test ready for blue evaluation",
            f'Test "{test_name}" needs blue team evaluation.',
        ),
        "in_review": (
            ("red_lead", "blue_lead"),
            "validation_needed",
            "Test ready for validation",
            f'Test "{test_name}" is awaiting your review.',
        ),
    }

    if new_state in creator_rules:
        if not creator_id:
            # No creator recorded on the test: nobody to notify.
            return
        notif_type, title, message = creator_rules[new_state]
        create_notification(
            db,
            user_id=creator_id,
            type=notif_type,
            title=title,
            message=message,
            entity_type="test",
            entity_id=test_id,
        )
        return

    if new_state in role_rules:
        roles, notif_type, title, message = role_rules[new_state]
        # Reuse the shared role fan-out helper instead of repeating its
        # "active users with role" query and creation loop inline.
        for role in roles:
            notify_role(
                db,
                role=role,
                type=notif_type,
                title=title,
                message=message,
                entity_type="test",
                entity_id=test_id,
            )