From 97349a1d1301e038ce57273c8d8336a0d17e51a5 Mon Sep 17 00:00:00 2001 From: kitos Date: Thu, 21 May 2026 15:57:41 +0200 Subject: [PATCH] =?UTF-8?q?feat(alerts):=20close=20Phase=2013=20gaps=20?= =?UTF-8?q?=E2=80=94=20hourly=20job=20+=20webhook=20+=20in-app=20notificat?= =?UTF-8?q?ions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add dispatch_webhook_targeted() to webhook_service for rule-specific delivery - evaluate_all_rules() now dispatches in-app notifications (admins/leads) and webhooks after each alert fires (targeted + global alert.fired broadcast) - APScheduler: _run_alert_evaluation() job registered hourly alongside existing jobs Co-Authored-By: Claude Sonnet 4.6 --- backend/app/jobs/mitre_sync_job.py | 30 ++++++- .../app/services/operational_alert_service.py | 88 +++++++++++++++++-- backend/app/services/webhook_service.py | 35 ++++++-- 3 files changed, 141 insertions(+), 12 deletions(-) diff --git a/backend/app/jobs/mitre_sync_job.py b/backend/app/jobs/mitre_sync_job.py index 5d8f314..0dbb844 100644 --- a/backend/app/jobs/mitre_sync_job.py +++ b/backend/app/jobs/mitre_sync_job.py @@ -223,6 +223,25 @@ def _run_queue_generation() -> None: db.close() +def _run_alert_evaluation() -> None: + """Evaluate all enabled operational alert rules (hourly).""" + logger.info("Scheduled alert evaluation job starting...") + db = SessionLocal() + try: + from app.services.operational_alert_service import evaluate_all_rules + result = evaluate_all_rules(db) + logger.info( + "Alert evaluation finished — %d rules, %d alerts fired in %.3fs", + result["rules_evaluated"], + result["alerts_fired"], + result["duration_seconds"], + ) + except Exception: + logger.exception("Alert evaluation job failed") + finally: + db.close() + + # --------------------------------------------------------------------------- # Scheduler bootstrap # --------------------------------------------------------------------------- @@ -338,11 +357,20 @@ def start_scheduler() -> None: name="Revalidation queue generation (daily 02:30)", replace_existing=True, ) + scheduler.add_job( + _run_alert_evaluation, + trigger="interval", + hours=1, + id="alert_evaluation", + name="Operational alert evaluation (hourly)", + replace_existing=True, + ) scheduler.start() logger.info( "Background scheduler started — mitre_sync (24h), intel_scan (7d), " "notification_cleanup (24h), weekly_snapshot (Sundays 00:00), " "recurring_campaigns (daily), jira_sync (1h), " "osint_enrichment (weekly), stale_detection (daily), " - "retention_policies (daily), data_sources_sync (6h)" + "retention_policies (daily), data_sources_sync (6h), " + "alert_evaluation (1h)" ) diff --git a/backend/app/services/operational_alert_service.py b/backend/app/services/operational_alert_service.py index 7962837..dc50a4b 100644 --- a/backend/app/services/operational_alert_service.py +++ b/backend/app/services/operational_alert_service.py @@ -23,6 +23,53 @@ from app.models.enums import TechniqueStatus log = logging.getLogger(__name__) + +# ── Notification & webhook dispatch helpers ─────────────────────────────────── + +def _dispatch_inapp_notifications(db: Session, rule: AlertRule, instance: AlertInstance) -> None: + """Create in-app Notification rows for all admins and leads.""" + from app.services.notification_service import create_notification + + admin_roles = {"admin", "red_lead", "blue_lead"} + users = db.query(User).filter( + User.role.in_(admin_roles), + User.is_active == True, # noqa: E712 + ).all() + for user in users: + create_notification( + db, + user_id = user.id, + type = "alert_fired", + title = instance.title, + message = instance.message, + entity_type = "alert_instance", + entity_id = instance.id, + ) + + +def _dispatch_webhooks(rule: AlertRule, instance: AlertInstance) -> None: + """Fire webhook(s) for a triggered alert (all exceptions caught).""" + from app.services.webhook_service import dispatch_webhook, dispatch_webhook_targeted + + payload = { + "alert_id": str(instance.id), + "rule_id": str(rule.id) if rule.id else None, + "rule_name": instance.rule_name, + "rule_type": instance.rule_type, + "severity": instance.severity, + "title": instance.title, + "message": instance.message, + "details": instance.details, + } + + # 1. Targeted webhook configured on the rule + if rule.notify_webhook and rule.webhook_id: + dispatch_webhook_targeted(str(rule.webhook_id), "alert.fired", payload) + + # 2. Broadcast to all global "alert.fired" subscribers + dispatch_webhook("alert.fired", payload) + + # ── Pre-configured system rules (seeded at startup) ─────────────────────────── SYSTEM_RULES = [ @@ -338,11 +385,19 @@ def _in_cooldown(rule: AlertRule) -> bool: def evaluate_all_rules(db: Session) -> dict: - """Evaluate every enabled rule; create AlertInstances for those that fire.""" + """Evaluate every enabled rule; create AlertInstances for those that fire. + + After persisting each alert, dispatches: + - In-app notifications to all admins/leads (if rule.notify_in_app) + - Webhooks to the rule's targeted webhook + global "alert.fired" subscribers + (if rule.notify_webhook) + """ t0 = time.monotonic() rules = db.query(AlertRule).filter(AlertRule.is_enabled == True).all() - fired: List[AlertInstance] = [] + # (rule, instance) pairs so we can dispatch after commit + fired_pairs: List[tuple] = [] + for rule in rules: if _in_cooldown(rule): continue @@ -370,12 +425,35 @@ def evaluate_all_rules(db: Session) -> dict: ) db.add(instance) rule.last_fired_at = datetime.utcnow() - fired.append(instance) + fired_pairs.append((rule, instance)) + # ── Persist alerts ──────────────────────────────────────────────────────── db.commit() - for inst in fired: - db.refresh(inst) + for _rule, inst in fired_pairs: + db.refresh(inst) # populate id + created_at from DB + # ── In-app notifications (need instance.id, so must be after refresh) ──── + for rule, inst in fired_pairs: + if rule.notify_in_app: + try: + _dispatch_inapp_notifications(db, rule, inst) + except Exception: + log.exception("In-app notification failed for alert %s", inst.id) + if fired_pairs: + try: + db.commit() # commit notifications + except Exception: + log.exception("Failed to commit in-app notifications") + db.rollback() + + # ── Webhooks (fire-and-forget, own sessions) ────────────────────────────── + for rule, inst in fired_pairs: + try: + _dispatch_webhooks(rule, inst) + except Exception: + log.exception("Webhook dispatch failed for alert %s", inst.id) + + fired = [inst for _, inst in fired_pairs] return { "rules_evaluated": len(rules), "alerts_fired": len(fired), diff --git a/backend/app/services/webhook_service.py b/backend/app/services/webhook_service.py index 8e199b6..df38dde 100644 --- a/backend/app/services/webhook_service.py +++ b/backend/app/services/webhook_service.py @@ -1,13 +1,14 @@ """Webhook dispatch service — outbound HTTP notifications. Supported event types: -- test.validated — fired when a test reaches "validated" state -- test.rejected — fired when a test reaches "rejected" state -- campaign.completed — fired when a campaign is completed -- campaign.started — fired when a campaign is activated -- mitre.synced — fired after MITRE ATT&CK sync completes +- test.validated — fired when a test reaches "validated" state +- test.rejected — fired when a test reaches "rejected" state +- campaign.completed — fired when a campaign is completed +- campaign.started — fired when a campaign is activated +- mitre.synced — fired after MITRE ATT&CK sync completes - technique.status_changed — fired when a technique's status changes -- webhook.test — manual test ping from the admin UI +- webhook.test — manual test ping from the admin UI +- alert.fired — fired when an operational alert is triggered """ import hashlib @@ -29,6 +30,28 @@ logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- +def dispatch_webhook_targeted(webhook_id: str, event_type: str, payload: dict) -> None: + """Send to a single specific WebhookConfig by ID (used by alert rules). + + Opens its own DB session. All exceptions are caught. + """ + db = SessionLocal() + try: + wh = db.query(WebhookConfig).filter( + WebhookConfig.id == webhook_id, + WebhookConfig.is_active == True, # noqa: E712 + ).first() + if wh: + _send_webhook(db, wh, event_type, payload) + except Exception: + logger.exception( + "dispatch_webhook_targeted: error for webhook_id=%s event=%s", + webhook_id, event_type, + ) + finally: + db.close() + + def dispatch_webhook(event_type: str, payload: dict) -> None: """Send an outbound webhook to all active subscribers for *event_type*.