fix(webhooks): auto-detect platform format for Teams/Slack/generic
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
Root cause: Microsoft Teams Incoming Webhooks require MessageCard JSON
format. The service was sending generic Aegis JSON which Teams rejected
with a 400, incrementing failure_count on every dispatch.
Fix: _send_webhook() now auto-detects the target from the URL:
- webhook.office.com / teams.microsoft.com → Teams MessageCard
(colored card with event title + key/value facts table)
- hooks.slack.com → Slack attachments format
- everything else → current generic Aegis JSON
Also resets failure_count=0 in production so the webhook starts fresh.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -78,19 +78,94 @@ def dispatch_webhook(event_type: str, payload: dict) -> None:
|
||||
db.close()
|
||||
|
||||
|
||||
_EVENT_COLORS: dict[str, str] = {
|
||||
"test.validated": "28a745", # green
|
||||
"test.rejected": "dc3545", # red
|
||||
"campaign.completed": "17a2b8", # teal
|
||||
"campaign.started": "007bff", # blue
|
||||
"mitre.synced": "6f42c1", # purple
|
||||
"technique.status_changed":"fd7e14", # orange
|
||||
"alert.fired": "dc3545", # red
|
||||
"webhook.test": "6c757d", # gray
|
||||
}
|
||||
|
||||
|
||||
def _build_teams_card(event_type: str, payload: dict, timestamp: str) -> dict:
|
||||
"""Format payload as a Microsoft Teams Incoming Webhook MessageCard."""
|
||||
color = _EVENT_COLORS.get(event_type, "0078D4")
|
||||
|
||||
# Build facts from payload key/value pairs (simple types only)
|
||||
facts = [{"name": "Event", "value": event_type},
|
||||
{"name": "Time", "value": timestamp}]
|
||||
for k, v in (payload or {}).items():
|
||||
if isinstance(v, (str, int, float, bool)) and v is not None:
|
||||
facts.append({"name": str(k).replace("_", " ").title(), "value": str(v)})
|
||||
|
||||
return {
|
||||
"@type": "MessageCard",
|
||||
"@context": "http://schema.org/extensions",
|
||||
"themeColor": color,
|
||||
"summary": f"Aegis · {event_type}",
|
||||
"sections": [{
|
||||
"activityTitle": f"**{event_type}**",
|
||||
"activitySubtitle": "Aegis Platform Notification",
|
||||
"activityImage": "https://raw.githubusercontent.com/microsoft/fluentui-system-icons/main/assets/Shield/SVG/ic_fluent_shield_24_filled.svg",
|
||||
"facts": facts,
|
||||
"markdown": True,
|
||||
}],
|
||||
}
|
||||
|
||||
|
||||
def _build_slack_body(event_type: str, payload: dict, timestamp: str) -> dict:
|
||||
"""Format payload as a Slack Incoming Webhook message."""
|
||||
color = "#" + _EVENT_COLORS.get(event_type, "0078D4")
|
||||
fields = [{"title": "Event", "value": event_type, "short": True},
|
||||
{"title": "Time", "value": timestamp, "short": True}]
|
||||
for k, v in (payload or {}).items():
|
||||
if isinstance(v, (str, int, float, bool)) and v is not None:
|
||||
fields.append({"title": str(k).replace("_", " ").title(),
|
||||
"value": str(v), "short": True})
|
||||
return {
|
||||
"attachments": [{
|
||||
"color": color,
|
||||
"title": f"Aegis · {event_type}",
|
||||
"fields": fields,
|
||||
"footer": "Aegis Platform",
|
||||
"ts": int(datetime.utcnow().timestamp()),
|
||||
}]
|
||||
}
|
||||
|
||||
|
||||
def _send_webhook(db, wh: WebhookConfig, event_type: str, payload: dict) -> None:
|
||||
"""Send a single webhook POST and update its metadata."""
|
||||
"""Send a single webhook POST and update its metadata.
|
||||
|
||||
Auto-detects the target platform from the URL and formats accordingly:
|
||||
- webhook.office.com / teams.microsoft.com → Teams MessageCard
|
||||
- hooks.slack.com → Slack attachments
|
||||
- everything else → generic Aegis JSON
|
||||
"""
|
||||
import json
|
||||
timestamp = datetime.utcnow().isoformat() + "Z"
|
||||
url_lower = wh.url.lower()
|
||||
|
||||
if "webhook.office.com" in url_lower or "teams.microsoft.com" in url_lower:
|
||||
body = _build_teams_card(event_type, payload, timestamp)
|
||||
headers = {"Content-Type": "application/json"}
|
||||
elif "hooks.slack.com" in url_lower:
|
||||
body = _build_slack_body(event_type, payload, timestamp)
|
||||
headers = {"Content-Type": "application/json"}
|
||||
else:
|
||||
body = {
|
||||
"event": event_type,
|
||||
"data": payload,
|
||||
"timestamp": datetime.utcnow().isoformat() + "Z",
|
||||
"timestamp": timestamp,
|
||||
}
|
||||
headers = {"Content-Type": "application/json"}
|
||||
if wh.secret:
|
||||
import json
|
||||
raw = json.dumps(body, separators=(",", ":"), sort_keys=True).encode()
|
||||
sig = hmac.new(wh.secret.encode(), raw, hashlib.sha256).hexdigest()
|
||||
headers["X-Aegis-Signature"] = f"sha256={sig}"
|
||||
|
||||
try:
|
||||
resp = requests.post(wh.url, json=body, headers=headers, timeout=10)
|
||||
resp.raise_for_status()
|
||||
|
||||
Reference in New Issue
Block a user