Files
Aegis/backend/app/routers/operational_alerts.py
kitos 6f4901b611
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
security: fix 6 vulnerabilities identified in SDLC audit
- fix(auth): enforce API key scopes in require_role/require_any_role;
  attach _api_key_scopes to user on API key auth; add require_scope()
  dependency — scopes were stored but never enforced (CWE-285)

- fix(sso): read SECURE_COOKIES env var for SSO cookie instead of
  hardcoded secure=False — SAML sessions now respect HTTPS config (CWE-614)

- fix(webhooks): SSRF prevention — validate webhook URLs against private
  and reserved CIDRs at creation/update time (CWE-918)

- fix(knowledge): restrict playbook/lesson create, update and restore
  to admin/red_lead/blue_lead roles — was open to any authenticated user (CWE-284)

- fix(alerts): restrict alert acknowledge/resolve/dismiss to admin/lead
  roles — any user could silence security alerts (CWE-284)

- security: delete get_admin_creds.py, check_auth.py, deploy.py scripts
  containing hardcoded root SSH credentials and production DB access;
  add scripts/.gitignore to prevent reintroduction (CWE-798)
2026-05-22 09:46:29 +02:00

192 lines
6.8 KiB
Python

"""Phase 13: Operational Alerts router."""
from typing import List, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user, require_any_role
from app.models.user import User
from app.schemas.operational_alert_schema import (
AlertRuleCreate, AlertRuleOut, AlertRuleUpdate,
AlertInstanceOut, EvaluationResult, AlertSummary,
)
import app.services.operational_alert_service as svc
# Router for every endpoint in this module: paths are mounted under the
# "/alerts" prefix and carry the "Operational Alerts" tag.
router = APIRouter(prefix="/alerts", tags=["Operational Alerts"])
# ── Evaluation ────────────────────────────────────────────────────────────────
@router.post("/evaluate", response_model=EvaluationResult, status_code=202)
def evaluate_rules(
    db: Session = Depends(get_db),
    user=Depends(require_any_role("admin", "red_lead", "blue_lead")),
):
    """
    Trigger one pass of the alert evaluation engine over all enabled rules.

    Rules whose conditions hold and that are not in cooldown produce new
    AlertInstances; those are returned together with the run statistics.
    Restricted to the admin, red_lead and blue_lead roles.
    """
    # `user` is not read here; the dependency exists only to enforce the role check.
    outcome = svc.evaluate_all_rules(db)

    # Keys are read in the same order as the response fields are declared below.
    payload = {
        "rules_evaluated": outcome["rules_evaluated"],
        "alerts_fired": outcome["alerts_fired"],
        "alerts": list(map(AlertInstanceOut.model_validate, outcome["alerts"])),
        "duration_seconds": outcome["duration_seconds"],
    }
    return EvaluationResult(**payload)
# ── Alert instances ───────────────────────────────────────────────────────────
@router.get("", response_model=List[AlertInstanceOut])
def list_alerts(
    status: Optional[str] = Query(None),
    severity: Optional[str] = Query(None),
    rule_type: Optional[str] = Query(None),
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
    db: Session = Depends(get_db),
    user=Depends(get_current_user),
):
    """
    Return a page of alert instances, optionally filtered.

    `status`, `severity` and `rule_type` are optional filters passed straight
    to the service layer. `limit` (1-200, default 50) and `offset` (>= 0)
    control pagination. Any authenticated user may call this endpoint.
    """
    page = svc.list_instances(
        db,
        status=status,
        severity=severity,
        rule_type=rule_type,
        limit=limit,
        offset=offset,
    )
    return page
@router.get("/summary", response_model=AlertSummary)
def alert_summary(
    db: Session = Depends(get_db),
    user=Depends(get_current_user),
):
    """
    Return aggregate alert counts plus the most recent alerts.

    Counts are broken down by status (open / acknowledged / resolved),
    by severity and by rule type, as computed by the service layer.
    """
    stats = svc.get_summary(db)

    # Scalar and mapping fields are copied through unchanged.
    payload = {
        "total_open": stats["total_open"],
        "total_acknowledged": stats["total_acknowledged"],
        "total_resolved": stats["total_resolved"],
        "by_severity": stats["by_severity"],
        "by_rule_type": stats["by_rule_type"],
    }
    # Recent alerts are converted to their response schema one by one.
    payload["recent_alerts"] = list(
        map(AlertInstanceOut.model_validate, stats["recent_alerts"])
    )
    return AlertSummary(**payload)
@router.get("/{alert_id}", response_model=AlertInstanceOut)
def get_alert(
    alert_id: UUID,
    db: Session = Depends(get_db),
    user=Depends(get_current_user),
):
    """
    Fetch one alert instance by its UUID.

    Lookup is delegated to the service layer; any authenticated user may read.
    """
    instance = svc.get_instance(db, alert_id)
    return instance
@router.post("/{alert_id}/acknowledge", response_model=AlertInstanceOut)
def acknowledge_alert(
    alert_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_any_role("admin", "red_lead", "blue_lead")),
):
    """
    Acknowledge an alert on behalf of the calling user.

    Restricted to the admin, red_lead and blue_lead roles. The caller's id
    is passed to the service together with the alert id.
    """
    actor_id = current_user.id
    return svc.acknowledge(db, alert_id, actor_id)
@router.post("/{alert_id}/resolve", response_model=AlertInstanceOut)
def resolve_alert(
    alert_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_any_role("admin", "red_lead", "blue_lead")),
):
    """
    Resolve an alert on behalf of the calling user.

    Restricted to the admin, red_lead and blue_lead roles. The caller's id
    is passed to the service together with the alert id.
    """
    actor_id = current_user.id
    return svc.resolve(db, alert_id, actor_id)
@router.post("/{alert_id}/dismiss", response_model=AlertInstanceOut)
def dismiss_alert(
    alert_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_any_role("admin", "red_lead", "blue_lead")),
):
    """
    Dismiss an alert on behalf of the calling user.

    Restricted to the admin, red_lead and blue_lead roles. A dismissed alert
    is not expected to fire again until its cooldown resets.
    """
    actor_id = current_user.id
    return svc.dismiss(db, alert_id, actor_id)
# ── Alert rules ───────────────────────────────────────────────────────────────
@router.get("/rules/list", response_model=List[AlertRuleOut])
def list_rules(
    rule_type: Optional[str] = Query(None),
    include_disabled: bool = Query(False),
    db: Session = Depends(get_db),
    user=Depends(get_current_user),
):
    """
    Return alert rules, optionally narrowed by rule type.

    Disabled rules are left out unless `include_disabled` is true. Reading is
    open to any authenticated user; managing rules needs an admin or lead role.
    """
    rules = svc.list_rules(
        db,
        rule_type=rule_type,
        include_disabled=include_disabled,
    )
    return rules
@router.post("/rules", response_model=AlertRuleOut, status_code=201)
def create_rule(
    body: AlertRuleCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_any_role("admin", "red_lead", "blue_lead")),
):
    """
    Create a custom alert rule owned by the calling user.

    Restricted to the admin, red_lead and blue_lead roles. Every field of the
    request body is forwarded to the service, with the caller recorded as
    `created_by`.
    """
    author_id = current_user.id

    # Request-body attributes passed through to the service under the same names.
    forwarded = (
        "name",
        "description",
        "rule_type",
        "severity",
        "config",
        "notify_in_app",
        "notify_webhook",
        "webhook_id",
        "cooldown_hours",
    )
    rule_fields = {field: getattr(body, field) for field in forwarded}
    return svc.create_rule(db, created_by=author_id, **rule_fields)
@router.get("/rules/{rule_id}", response_model=AlertRuleOut)
def get_rule(
    rule_id: UUID,
    db: Session = Depends(get_db),
    user=Depends(get_current_user),
):
    """
    Fetch one alert rule by its UUID.

    Lookup is delegated to the service layer; any authenticated user may read.
    """
    rule = svc.get_rule(db, rule_id)
    return rule
@router.patch("/rules/{rule_id}", response_model=AlertRuleOut)
def update_rule(
    rule_id: UUID,
    body: AlertRuleUpdate,
    db: Session = Depends(get_db),
    user=Depends(require_any_role("admin", "red_lead", "blue_lead")),
):
    """
    Modify an existing alert rule (enable/disable, thresholds, cooldown, ...).

    Restricted to the admin, red_lead and blue_lead roles.
    """
    # NOTE(review): every attribute is forwarded, including ones the client
    # did not send; presumably the service ignores unset values -- confirm.
    forwarded = (
        "name",
        "description",
        "severity",
        "is_enabled",
        "config",
        "notify_in_app",
        "notify_webhook",
        "webhook_id",
        "cooldown_hours",
    )
    changes = {field: getattr(body, field) for field in forwarded}
    return svc.update_rule(db, rule_id, **changes)
@router.delete("/rules/{rule_id}", status_code=204)
def delete_rule(
    rule_id: UUID,
    db: Session = Depends(get_db),
    user=Depends(require_any_role("admin")),
):
    """
    Remove a custom alert rule (admin only).

    System rules are not deletable; that restriction is expected to be
    enforced by the service layer. Responds with 204 and no body.
    """
    svc.delete_rule(db, rule_id)
    return None