Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
- SystemConfig model + migration b033 for runtime key-value config - GET/PATCH /system/email-config + POST /system/email-test (admin only) - email_service reads SMTP config from DB (overrides .env) - Webhooks now accessible to red_lead/blue_lead + admin - GET /users/me already existed; /users/me/preferences already working - SettingsPage with 4 role-aware tabs: * Profile & Jira: jira_account_id, user info * Notifications: role-specific email/in-app toggles (12 prefs) * Webhooks: full CRUD + test ping (leads + admin) * Email/SMTP: enable toggle, server config, test email (admin only) - Added /settings route (all authenticated users) - Settings link added to Sidebar Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
169 lines
6.7 KiB
Python
169 lines
6.7 KiB
Python
"""Email notification service using SMTP.
|
|
|
|
Sending is silently skipped when SMTP_ENABLED=False (default) and no
|
|
DB config overrides it. All errors are caught and logged — email
|
|
failures never crash the caller.
|
|
|
|
Config priority:
|
|
1. system_configs table (key ``smtp.*``) — managed via the Settings UI
|
|
2. .env / environment variables (app.config.settings)
|
|
"""
|
|
import logging
|
|
import smtplib
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from typing import Optional
|
|
|
|
from app.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers — read effective SMTP config (DB first, env fallback)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _get_smtp_config(db=None) -> dict:
|
|
"""Return a dict with resolved SMTP settings.
|
|
|
|
When *db* is provided the function looks up ``system_configs`` rows
|
|
whose key starts with ``smtp.`` and overrides the .env values.
|
|
"""
|
|
cfg = {
|
|
"enabled": settings.SMTP_ENABLED,
|
|
"host": settings.SMTP_HOST,
|
|
"port": settings.SMTP_PORT,
|
|
"username": settings.SMTP_USERNAME,
|
|
"password": settings.SMTP_PASSWORD,
|
|
"from_email": settings.SMTP_FROM_EMAIL,
|
|
"use_tls": settings.SMTP_USE_TLS,
|
|
}
|
|
|
|
if db is not None:
|
|
try:
|
|
from app.models.system_config import SystemConfig # avoid circular
|
|
|
|
rows = db.query(SystemConfig).filter(
|
|
SystemConfig.key.like("smtp.%")
|
|
).all()
|
|
for row in rows:
|
|
k = row.key # e.g. "smtp.host"
|
|
v = row.value
|
|
if v is None:
|
|
continue
|
|
short = k[len("smtp."):] # "host"
|
|
if short == "enabled":
|
|
cfg["enabled"] = v.lower() in ("true", "1", "yes")
|
|
elif short == "host":
|
|
cfg["host"] = v
|
|
elif short == "port":
|
|
try:
|
|
cfg["port"] = int(v)
|
|
except ValueError:
|
|
pass
|
|
elif short == "username":
|
|
cfg["username"] = v
|
|
elif short == "password":
|
|
cfg["password"] = v
|
|
elif short == "from_email":
|
|
cfg["from_email"] = v
|
|
elif short == "use_tls":
|
|
cfg["use_tls"] = v.lower() in ("true", "1", "yes")
|
|
except Exception:
|
|
logger.exception("Failed to read SMTP config from DB — falling back to env")
|
|
|
|
return cfg
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Core send
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def send_email(to: str, subject: str, html_body: str, db=None) -> bool:
|
|
"""Send an HTML email. Returns True on success, False on skip/error.
|
|
|
|
Pass *db* to allow runtime config override from system_configs table.
|
|
"""
|
|
cfg = _get_smtp_config(db)
|
|
|
|
if not cfg["enabled"]:
|
|
logger.debug("SMTP disabled — skipping email to %s: %s", to, subject)
|
|
return False
|
|
if not to:
|
|
return False
|
|
try:
|
|
msg = MIMEMultipart("alternative")
|
|
msg["Subject"] = f"[Aegis] {subject}"
|
|
msg["From"] = cfg["from_email"]
|
|
msg["To"] = to
|
|
msg.attach(MIMEText(html_body, "html"))
|
|
with smtplib.SMTP(cfg["host"], cfg["port"], timeout=10) as server:
|
|
if cfg["use_tls"]:
|
|
server.starttls()
|
|
if cfg["username"]:
|
|
server.login(cfg["username"], cfg["password"])
|
|
server.send_message(msg)
|
|
logger.info("Email sent to %s: %s", to, subject)
|
|
return True
|
|
except Exception:
|
|
logger.exception("Failed to send email to %s: %s", to, subject)
|
|
return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Typed senders
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def send_test_validated_email(to: str, test_name: str, technique_id: str, test_id: str, db=None) -> bool:
|
|
"""Notify that a test was validated."""
|
|
url = f"{settings.PLATFORM_URL}/tests/{test_id}"
|
|
html = f"""
|
|
<html><body style="font-family:sans-serif;color:#1a1a2e">
|
|
<h2 style="color:#22d3ee">✅ Test Validated</h2>
|
|
<p>Test <strong>{test_name}</strong> for technique <code>{technique_id}</code> has been validated.</p>
|
|
<p><a href="{url}" style="background:#22d3ee;color:#000;padding:8px 16px;border-radius:4px;text-decoration:none">View Test</a></p>
|
|
<p style="color:#666;font-size:12px">Aegis ATT&CK Coverage Platform</p>
|
|
</body></html>"""
|
|
return send_email(to, f"Test Validated: {test_name}", html, db=db)
|
|
|
|
|
|
def send_campaign_completed_email(to: str, campaign_name: str, campaign_id: str, db=None) -> bool:
|
|
"""Notify that a campaign was completed."""
|
|
url = f"{settings.PLATFORM_URL}/campaigns/{campaign_id}"
|
|
html = f"""
|
|
<html><body style="font-family:sans-serif;color:#1a1a2e">
|
|
<h2 style="color:#22d3ee">🎯 Campaign Completed</h2>
|
|
<p>Campaign <strong>{campaign_name}</strong> has been completed.</p>
|
|
<p><a href="{url}" style="background:#22d3ee;color:#000;padding:8px 16px;border-radius:4px;text-decoration:none">View Campaign</a></p>
|
|
<p style="color:#666;font-size:12px">Aegis ATT&CK Coverage Platform</p>
|
|
</body></html>"""
|
|
return send_email(to, f"Campaign Completed: {campaign_name}", html, db=db)
|
|
|
|
|
|
def send_new_mitre_techniques_email(to: str, created: int, updated: int, db=None) -> bool:
|
|
"""Notify of new MITRE techniques after sync."""
|
|
if created == 0:
|
|
return False
|
|
html = f"""
|
|
<html><body style="font-family:sans-serif;color:#1a1a2e">
|
|
<h2 style="color:#22d3ee">🔄 MITRE ATT&CK Updated</h2>
|
|
<p><strong>{created}</strong> new techniques added, <strong>{updated}</strong> updated.</p>
|
|
<p><a href="{settings.PLATFORM_URL}/techniques" style="background:#22d3ee;color:#000;padding:8px 16px;border-radius:4px;text-decoration:none">View Techniques</a></p>
|
|
<p style="color:#666;font-size:12px">Aegis ATT&CK Coverage Platform</p>
|
|
</body></html>"""
|
|
return send_email(to, f"MITRE ATT&CK Updated: {created} new techniques", html, db=db)
|
|
|
|
|
|
def send_test_email(to: str, db=None) -> bool:
|
|
"""Send a test/ping email to verify SMTP config."""
|
|
html = """
|
|
<html><body style="font-family:sans-serif;color:#1a1a2e">
|
|
<h2 style="color:#22d3ee">✅ Email Configuration Test</h2>
|
|
<p>This is a test email from Aegis. If you received this, your SMTP configuration is working correctly.</p>
|
|
<p style="color:#666;font-size:12px">Aegis ATT&CK Coverage Platform</p>
|
|
</body></html>"""
|
|
return send_email(to, "Email Configuration Test", html, db=db)
|