diff --git a/backend/app/services/webhook_service.py b/backend/app/services/webhook_service.py index df38dde..bf2afeb 100644 --- a/backend/app/services/webhook_service.py +++ b/backend/app/services/webhook_service.py @@ -78,19 +78,94 @@ def dispatch_webhook(event_type: str, payload: dict) -> None: db.close() -def _send_webhook(db, wh: WebhookConfig, event_type: str, payload: dict) -> None: - """Send a single webhook POST and update its metadata.""" - body = { - "event": event_type, - "data": payload, - "timestamp": datetime.utcnow().isoformat() + "Z", +_EVENT_COLORS: dict[str, str] = { + "test.validated": "28a745", # green + "test.rejected": "dc3545", # red + "campaign.completed": "17a2b8", # teal + "campaign.started": "007bff", # blue + "mitre.synced": "6f42c1", # purple + "technique.status_changed":"fd7e14", # orange + "alert.fired": "dc3545", # red + "webhook.test": "6c757d", # gray +} + + +def _build_teams_card(event_type: str, payload: dict, timestamp: str) -> dict: + """Format payload as a Microsoft Teams Incoming Webhook MessageCard.""" + color = _EVENT_COLORS.get(event_type, "0078D4") + + # Build facts from payload key/value pairs (simple types only) + facts = [{"name": "Event", "value": event_type}, + {"name": "Time", "value": timestamp}] + for k, v in (payload or {}).items(): + if isinstance(v, (str, int, float, bool)) and v is not None: + facts.append({"name": str(k).replace("_", " ").title(), "value": str(v)}) + + return { + "@type": "MessageCard", + "@context": "http://schema.org/extensions", + "themeColor": color, + "summary": f"Aegis · {event_type}", + "sections": [{ + "activityTitle": f"**{event_type}**", + "activitySubtitle": "Aegis Platform Notification", + "activityImage": "https://raw.githubusercontent.com/microsoft/fluentui-system-icons/main/assets/Shield/SVG/ic_fluent_shield_24_filled.svg", + "facts": facts, + "markdown": True, + }], } - headers = {"Content-Type": "application/json"} - if wh.secret: - import json - raw = json.dumps(body, separators=(",", ":"), sort_keys=True).encode() - sig = hmac.new(wh.secret.encode(), raw, hashlib.sha256).hexdigest() - headers["X-Aegis-Signature"] = f"sha256={sig}" + + +def _build_slack_body(event_type: str, payload: dict, timestamp: str) -> dict: + """Format payload as a Slack Incoming Webhook message.""" + color = "#" + _EVENT_COLORS.get(event_type, "0078D4") + fields = [{"title": "Event", "value": event_type, "short": True}, + {"title": "Time", "value": timestamp, "short": True}] + for k, v in (payload or {}).items(): + if isinstance(v, (str, int, float, bool)) and v is not None: + fields.append({"title": str(k).replace("_", " ").title(), + "value": str(v), "short": True}) + return { + "attachments": [{ + "color": color, + "title": f"Aegis · {event_type}", + "fields": fields, + "footer": "Aegis Platform", + "ts": int(datetime.utcnow().timestamp()), + }] + } + + +def _send_webhook(db, wh: WebhookConfig, event_type: str, payload: dict) -> None: + """Send a single webhook POST and update its metadata. + + Auto-detects the target platform from the URL and formats accordingly: + - webhook.office.com / teams.microsoft.com → Teams MessageCard + - hooks.slack.com → Slack attachments + - everything else → generic Aegis JSON + """ + import json + timestamp = datetime.utcnow().isoformat() + "Z" + url_lower = wh.url.lower() + + if "webhook.office.com" in url_lower or "teams.microsoft.com" in url_lower: + body = _build_teams_card(event_type, payload, timestamp) + headers = {"Content-Type": "application/json"} + elif "hooks.slack.com" in url_lower: + body = _build_slack_body(event_type, payload, timestamp) + headers = {"Content-Type": "application/json"} + else: + body = { + "event": event_type, + "data": payload, + "timestamp": timestamp, + } + headers = {"Content-Type": "application/json"} + if wh.secret: + raw = json.dumps(body, separators=(",", ":"), sort_keys=True).encode() + sig = hmac.new(wh.secret.encode(), raw, hashlib.sha256).hexdigest() + headers["X-Aegis-Signature"] = f"sha256={sig}" + try: resp = requests.post(wh.url, json=body, headers=headers, timeout=10) resp.raise_for_status()