Files
Aegis/backend/app/routers/operational_alerts.py
kitos d4b147da7c
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(alerts): Phase 13 — Operational Alert Engine
AlertRule + AlertInstance models (b041alerts migration), 8 pre-seeded system
rules (high_risk x2, stale_technique, coverage_regression, low_coverage,
expiry_wave, new_technique, orphan_spike), evaluation engine with per-rule
cooldown, full alert lifecycle (acknowledge/resolve/dismiss), custom rule CRUD,
and summary endpoint. Rules seeded at app startup.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-21 15:25:55 +02:00

192 lines
6.6 KiB
Python

"""Phase 13: Operational Alerts router."""
from typing import List, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user, require_any_role
from app.models.user import User
from app.schemas.operational_alert_schema import (
AlertRuleCreate, AlertRuleOut, AlertRuleUpdate,
AlertInstanceOut, EvaluationResult, AlertSummary,
)
import app.services.operational_alert_service as svc
# Router collecting every endpoint defined in this module. All routes are
# served under the "/alerts" prefix and tagged "Operational Alerts".
router = APIRouter(prefix="/alerts", tags=["Operational Alerts"])
# ── Evaluation ────────────────────────────────────────────────────────────────
@router.post("/evaluate", response_model=EvaluationResult, status_code=202)
def evaluate_rules(
    db: Session = Depends(get_db),
    user=Depends(require_any_role("admin", "red_lead", "blue_lead")),
):
    """
    Trigger an evaluation pass of the alert engine over all enabled rules.

    An AlertInstance is fired for every rule whose condition is met and which
    is not in cooldown. Restricted to the admin, red_lead and blue_lead roles.

    Returns an EvaluationResult carrying the number of rules evaluated, the
    number of alerts fired, the fired alerts, and the run duration in seconds.
    """
    outcome = svc.evaluate_all_rules(db)

    # Pull the fields out in a fixed order, converting each fired alert into
    # its response schema before assembling the result.
    rules_evaluated = outcome["rules_evaluated"]
    alerts_fired = outcome["alerts_fired"]
    fired_alerts = list(map(AlertInstanceOut.model_validate, outcome["alerts"]))
    duration_seconds = outcome["duration_seconds"]

    return EvaluationResult(
        rules_evaluated=rules_evaluated,
        alerts_fired=alerts_fired,
        alerts=fired_alerts,
        duration_seconds=duration_seconds,
    )
# ── Alert instances ───────────────────────────────────────────────────────────
@router.get("", response_model=List[AlertInstanceOut])
def list_alerts(
    status: Optional[str] = Query(None),
    severity: Optional[str] = Query(None),
    rule_type: Optional[str] = Query(None),
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
    db: Session = Depends(get_db),
    user=Depends(get_current_user),
):
    """
    Return a page of alert instances.

    The optional `status`, `severity` and `rule_type` query parameters are
    forwarded to the service as filters. `limit` (1-200, default 50) and
    `offset` (>= 0, default 0) control paging.
    """
    # Optional filters are forwarded as-is; None means the caller supplied none.
    filters = {
        "status": status,
        "severity": severity,
        "rule_type": rule_type,
    }
    return svc.list_instances(db, **filters, limit=limit, offset=offset)
@router.get("/summary", response_model=AlertSummary)
def alert_summary(
    db: Session = Depends(get_db),
    user=Depends(get_current_user),
):
    """
    Return aggregate alert counts by status, severity, and rule type,
    together with a list of recent alerts.
    """
    summary = svc.get_summary(db)

    # Read the fields in a fixed order; recent alerts are converted to the
    # response schema before the summary is assembled.
    total_open = summary["total_open"]
    total_acknowledged = summary["total_acknowledged"]
    total_resolved = summary["total_resolved"]
    by_severity = summary["by_severity"]
    by_rule_type = summary["by_rule_type"]
    recent = list(map(AlertInstanceOut.model_validate, summary["recent_alerts"]))

    return AlertSummary(
        total_open=total_open,
        total_acknowledged=total_acknowledged,
        total_resolved=total_resolved,
        by_severity=by_severity,
        by_rule_type=by_rule_type,
        recent_alerts=recent,
    )
@router.get("/{alert_id}", response_model=AlertInstanceOut)
def get_alert(
    alert_id: UUID,
    db: Session = Depends(get_db),
    user=Depends(get_current_user),
):
    """
    Fetch one alert instance by its UUID.

    Lookup is delegated to the service layer.
    """
    instance = svc.get_instance(db, alert_id)
    return instance
@router.post("/{alert_id}/acknowledge", response_model=AlertInstanceOut)
def acknowledge_alert(
    alert_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Acknowledge an open alert.

    The id of the authenticated user is passed to the service along with the
    alert id.
    """
    actor_id = current_user.id
    return svc.acknowledge(db, alert_id, actor_id)
@router.post("/{alert_id}/resolve", response_model=AlertInstanceOut)
def resolve_alert(
    alert_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Mark an alert as resolved.

    The id of the authenticated user is passed to the service along with the
    alert id.
    """
    actor_id = current_user.id
    return svc.resolve(db, alert_id, actor_id)
@router.post("/{alert_id}/dismiss", response_model=AlertInstanceOut)
def dismiss_alert(
    alert_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Dismiss an alert (it won't re-fire until the cooldown resets).

    The id of the authenticated user is passed to the service along with the
    alert id.
    """
    actor_id = current_user.id
    return svc.dismiss(db, alert_id, actor_id)
# ── Alert rules ───────────────────────────────────────────────────────────────
@router.get("/rules/list", response_model=List[AlertRuleOut])
def list_rules(
    rule_type: Optional[str] = Query(None),
    include_disabled: bool = Query(False),
    db: Session = Depends(get_db),
    user=Depends(get_current_user),
):
    """
    List alert rules. Any authenticated user may read them; managing rules
    is reserved for admins and leads.

    `rule_type` is an optional filter and `include_disabled` (default false)
    is forwarded to the service unchanged.
    """
    rules = svc.list_rules(
        db,
        rule_type=rule_type,
        include_disabled=include_disabled,
    )
    return rules
@router.post("/rules", response_model=AlertRuleOut, status_code=201)
def create_rule(
    body: AlertRuleCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_any_role("admin", "red_lead", "blue_lead")),
):
    """
    Create a custom alert rule.

    Restricted to the admin, red_lead and blue_lead roles. The authenticated
    user's id is recorded as `created_by`; every other value is taken from
    the request body.
    """
    creator_id = current_user.id

    # Fields copied one-to-one from the request body into the service call.
    body_fields = (
        "name",
        "description",
        "rule_type",
        "severity",
        "config",
        "notify_in_app",
        "notify_webhook",
        "webhook_id",
        "cooldown_hours",
    )
    rule_kwargs = {field: getattr(body, field) for field in body_fields}

    return svc.create_rule(db, created_by=creator_id, **rule_kwargs)
@router.get("/rules/{rule_id}", response_model=AlertRuleOut)
def get_rule(
    rule_id: UUID,
    db: Session = Depends(get_db),
    user=Depends(get_current_user),
):
    """
    Fetch one alert rule by its UUID.

    Lookup is delegated to the service layer.
    """
    rule = svc.get_rule(db, rule_id)
    return rule
@router.patch("/rules/{rule_id}", response_model=AlertRuleOut)
def update_rule(
    rule_id: UUID,
    body: AlertRuleUpdate,
    db: Session = Depends(get_db),
    user=Depends(require_any_role("admin", "red_lead", "blue_lead")),
):
    """
    Update an alert rule (enable/disable, thresholds, cooldown).

    Restricted to the admin, red_lead and blue_lead roles. Every updatable
    field of the request body is forwarded to the service as a keyword
    argument, whatever its value.
    """
    # Fields copied one-to-one from the request body into the service call.
    body_fields = (
        "name",
        "description",
        "severity",
        "is_enabled",
        "config",
        "notify_in_app",
        "notify_webhook",
        "webhook_id",
        "cooldown_hours",
    )
    changes = {field: getattr(body, field) for field in body_fields}

    return svc.update_rule(db, rule_id, **changes)
@router.delete("/rules/{rule_id}", status_code=204)
def delete_rule(
    rule_id: UUID,
    db: Session = Depends(get_db),
    user=Depends(require_any_role("admin")),
):
    """
    Delete a custom alert rule (system rules cannot be deleted).

    Admin only. Responds with 204 and no body.
    """
    # Deletion is delegated to the service; nothing is returned to the client.
    svc.delete_rule(db, rule_id)