"""Notification service — create, read, and manage in-app notifications.

Provides helpers for generating notifications automatically when test state
changes occur, plus CRUD for the notifications API.

Functions in this module stage changes via ``db.add()`` / ``db.flush()`` but do
**not** commit. The caller is responsible for committing.
"""

import logging
import uuid
from collections.abc import Callable
from datetime import datetime, timedelta, timezone

from sqlalchemy import func
from sqlalchemy.orm import Session

from app.domain.errors import EntityNotFoundError
from app.models.notification import Notification
from app.models.test import Test
from app.models.user import User

logger = logging.getLogger(__name__)

# new_state -> (notification type, title, message template) for notifications
# addressed to the creator of the test. ``{name}`` is replaced by the test name.
_CREATOR_NOTIFICATIONS: dict[str, tuple[str, str, str]] = {
    "red_executing": (
        "test_state_changed",
        "Test execution started",
        'Your test "{name}" has moved to execution phase.',
    ),
    "rejected": (
        "test_rejected",
        "Test rejected",
        'Your test "{name}" has been rejected. Please review and resubmit.',
    ),
    "validated": (
        "test_validated",
        "Test validated",
        'Your test "{name}" has been validated successfully.',
    ),
}

# new_state -> (recipient roles, notification type, title, message template)
# for notifications broadcast to every active user holding one of the roles.
_ROLE_NOTIFICATIONS: dict[str, tuple[tuple[str, ...], str, str, str]] = {
    "blue_evaluating": (
        ("blue_tech",),
        "test_assigned",
        "New test ready for blue evaluation",
        'Test "{name}" needs blue team evaluation.',
    ),
    "in_review": (
        ("red_lead", "blue_lead"),
        "validation_needed",
        "Test ready for validation",
        'Test "{name}" is awaiting your review.',
    ),
}


def _active_users_with_roles(db: Session, *roles: str) -> list[User]:
    """Return all active users whose role is one of *roles*."""
    # A single role keeps the plain equality comparison; several roles use IN.
    if len(roles) == 1:
        role_filter = User.role == roles[0]
    else:
        role_filter = User.role.in_(list(roles))
    return (
        db.query(User)
        .filter(role_filter, User.is_active == True)  # noqa: E712
        .all()
    )


# ---------------------------------------------------------------------------
# Core CRUD
# ---------------------------------------------------------------------------


def list_notifications(
    db: Session,
    user_id: uuid.UUID,
    *,
    offset: int = 0,
    limit: int = 20,
) -> list[Notification]:
    """Return paginated notifications for a user, newest first.

    Args:
        db: Session used to run the query.
        user_id: Owner of the notifications.
        offset: Number of rows to skip.
        limit: Maximum number of rows to return.
    """
    return (
        db.query(Notification)
        .filter(Notification.user_id == user_id)
        .order_by(Notification.created_at.desc())
        .offset(offset)
        .limit(limit)
        .all()
    )


def get_notification_or_raise(
    db: Session,
    notification_id: uuid.UUID,
    user_id: uuid.UUID,
) -> Notification:
    """Fetch a notification by ID and user, or raise EntityNotFoundError.

    The lookup is scoped to *user_id*, so a notification that belongs to a
    different user is reported as not found.

    Raises:
        EntityNotFoundError: No notification matches both the ID and the user.
    """
    notif = (
        db.query(Notification)
        .filter(
            Notification.id == notification_id,
            Notification.user_id == user_id,
        )
        .first()
    )
    if notif is None:
        raise EntityNotFoundError("Notification", str(notification_id))
    return notif


def notify_role(
    db: Session,
    *,
    role: str,
    type: str,
    title: str,
    message: str,
    entity_type: str,
    entity_id: uuid.UUID,
) -> None:
    """Send notifications to all active users with a given role."""
    # Same fan-out as notify_role_with_email, just without the email hook.
    notify_role_with_email(
        db,
        role=role,
        type=type,
        title=title,
        message=message,
        entity_type=entity_type,
        entity_id=entity_id,
        email_fn=None,
    )


def create_notification(
    db: Session,
    user_id: uuid.UUID,
    type: str,
    title: str,
    message: str | None = None,
    entity_type: str | None = None,
    entity_id: uuid.UUID | None = None,
) -> Notification:
    """Create a single notification for a user.

    The new row is added to the session and flushed, but not committed.

    Returns:
        The newly created ``Notification`` instance.
    """
    notif = Notification(
        user_id=user_id,
        type=type,
        title=title,
        message=message,
        entity_type=entity_type,
        entity_id=entity_id,
    )
    db.add(notif)
    db.flush()
    return notif


def mark_as_read(
    db: Session, notification_id: uuid.UUID, user_id: uuid.UUID
) -> Notification:
    """Mark a single notification as read and return it.

    Raises:
        EntityNotFoundError: The notification does not exist for this user.
    """
    notif = get_notification_or_raise(db, notification_id, user_id)
    notif.read = True
    return notif


def mark_all_as_read(db: Session, user_id: uuid.UUID) -> int:
    """Mark all unread notifications for a user as read.

    Returns:
        The number of rows updated.
    """
    return (
        db.query(Notification)
        .filter(Notification.user_id == user_id, Notification.read == False)  # noqa: E712
        .update({"read": True})
    )


def get_unread_count(db: Session, user_id: uuid.UUID) -> int:
    """Return the number of unread notifications for a user."""
    unread = (
        db.query(func.count(Notification.id))
        .filter(Notification.user_id == user_id, Notification.read == False)  # noqa: E712
        .scalar()
    )
    # Normalise a missing scalar result to 0 so callers always get an int.
    return unread or 0


def cleanup_old_notifications(db: Session, days: int = 90) -> int:
    """Delete read notifications older than *days*.

    Returns:
        The number of rows deleted.
    """
    # Naive UTC timestamp, identical in value to the deprecated
    # ``datetime.utcnow()``.
    # NOTE(review): assumes ``created_at`` is stored as naive UTC — confirm.
    cutoff = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(days=days)
    return (
        db.query(Notification)
        .filter(
            Notification.read == True,  # noqa: E712
            Notification.created_at < cutoff,
        )
        .delete()
    )


# ---------------------------------------------------------------------------
# Automatic notification dispatchers
# ---------------------------------------------------------------------------


def notify_role_with_email(
    db: Session,
    *,
    role: str,
    type: str,
    title: str,
    message: str,
    entity_type: str,
    entity_id: uuid.UUID,
    email_fn: Callable[[str], bool] | None = None,
) -> None:
    """Send in-app notifications + optional email to all active users with a role.

    Args:
        db: Session used to query users and stage notifications.
        role: Role whose active users are notified.
        type: Notification type stored on each notification.
        title: Notification title.
        message: Notification body.
        entity_type: Kind of entity the notification refers to.
        entity_id: ID of the entity the notification refers to.
        email_fn: Optional callable invoked with each recipient's email
            address. Its return value is ignored.
    """
    for user in _active_users_with_roles(db, role):
        create_notification(
            db,
            user_id=user.id,
            type=type,
            title=title,
            message=message,
            entity_type=entity_type,
            entity_id=entity_id,
        )
        if email_fn and user.email:
            try:
                email_fn(user.email)
            except Exception:
                # Email is best-effort: a failure must never break the
                # notification flow, but it is logged rather than lost.
                logger.exception(
                    "Email notification failed for user %s", user.id
                )


def notify_test_state_change(db: Session, test: Test, new_state: str) -> None:
    """Dispatch notifications based on a test's new state.

    Called by the workflow service after each state transition.

    Rules:
        - red_executing   -> notify creator (confirmation)
        - blue_evaluating -> notify all blue_tech users
        - in_review       -> notify red_lead and blue_lead users
        - rejected        -> notify creator
        - validated       -> notify creator

    Any other state produces no notification. Creator notifications are
    skipped when the test has no creator.
    """
    test_name = test.name
    test_id = test.id
    creator_id = test.created_by

    creator_spec = _CREATOR_NOTIFICATIONS.get(new_state)
    if creator_spec is not None:
        if not creator_id:
            return
        notif_type, title, template = creator_spec
        create_notification(
            db,
            user_id=creator_id,
            type=notif_type,
            title=title,
            message=template.format(name=test_name),
            entity_type="test",
            entity_id=test_id,
        )
        return

    role_spec = _ROLE_NOTIFICATIONS.get(new_state)
    if role_spec is None:
        return
    roles, notif_type, title, template = role_spec
    message = template.format(name=test_name)
    for user in _active_users_with_roles(db, *roles):
        create_notification(
            db,
            user_id=user.id,
            type=notif_type,
            title=title,
            message=message,
            entity_type="test",
            entity_id=test_id,
        )