From d4b147da7cc1ef394ba98a2c191b37f5f1893297 Mon Sep 17 00:00:00 2001 From: kitos Date: Thu, 21 May 2026 15:25:55 +0200 Subject: [PATCH] =?UTF-8?q?feat(alerts):=20Phase=2013=20=E2=80=94=20Operat?= =?UTF-8?q?ional=20Alert=20Engine?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AlertRule + AlertInstance models (b041alerts migration), 8 pre-seeded system rules (high_risk x2, stale_technique, coverage_regression, low_coverage, expiry_wave, new_technique, orphan_spike), evaluation engine with per-rule cooldown, full alert lifecycle (acknowledge/resolve/dismiss), custom rule CRUD, and summary endpoint. Rules seeded at app startup. Co-Authored-By: Claude Sonnet 4.6 --- .../versions/b041_operational_alerts.py | 82 +++ backend/app/main.py | 11 + backend/app/models/__init__.py | 3 + backend/app/models/operational_alert.py | 144 +++++ backend/app/routers/operational_alerts.py | 191 +++++++ .../app/schemas/operational_alert_schema.py | 124 ++++ .../app/services/operational_alert_service.py | 530 ++++++++++++++++++ scripts/qa_phase13_alerts.py | 302 ++++++++++ 8 files changed, 1387 insertions(+) create mode 100644 backend/alembic/versions/b041_operational_alerts.py create mode 100644 backend/app/models/operational_alert.py create mode 100644 backend/app/routers/operational_alerts.py create mode 100644 backend/app/schemas/operational_alert_schema.py create mode 100644 backend/app/services/operational_alert_service.py create mode 100644 scripts/qa_phase13_alerts.py diff --git a/backend/alembic/versions/b041_operational_alerts.py b/backend/alembic/versions/b041_operational_alerts.py new file mode 100644 index 0000000..b2d28ab --- /dev/null +++ b/backend/alembic/versions/b041_operational_alerts.py @@ -0,0 +1,82 @@ +"""Phase 13: Operational Alerts — alert_rules and alert_instances tables. + +Revision ID: b041alerts +Revises: b040ent +Create Date: 2026-05-21 +""" + +from alembic import op +import sqlalchemy as sa + +revision = "b041alerts" +down_revision = "b040ent" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + conn = op.get_bind() + + # ── alert_rules ─────────────────────────────────────────────────────────── + conn.execute(sa.text(""" + CREATE TABLE IF NOT EXISTS alert_rules ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name VARCHAR(300) NOT NULL, + description TEXT, + rule_type VARCHAR(50) NOT NULL, + severity VARCHAR(20) NOT NULL DEFAULT 'medium', + is_enabled BOOLEAN NOT NULL DEFAULT TRUE, + is_system BOOLEAN NOT NULL DEFAULT FALSE, + config JSONB NOT NULL DEFAULT '{}', + notify_in_app BOOLEAN NOT NULL DEFAULT TRUE, + notify_webhook BOOLEAN NOT NULL DEFAULT FALSE, + webhook_id UUID REFERENCES webhook_configs(id) ON DELETE SET NULL, + cooldown_hours INTEGER NOT NULL DEFAULT 24, + created_by UUID REFERENCES users(id) ON DELETE SET NULL, + created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT now(), + last_fired_at TIMESTAMP WITHOUT TIME ZONE + ) + """)) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_alert_rules_type ON alert_rules (rule_type)" + )) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_alert_rules_enabled ON alert_rules (is_enabled)" + )) + + # ── alert_instances ─────────────────────────────────────────────────────── + conn.execute(sa.text(""" + CREATE TABLE IF NOT EXISTS alert_instances ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + rule_id UUID REFERENCES alert_rules(id) ON DELETE SET NULL, + rule_name VARCHAR(300) NOT NULL, + rule_type VARCHAR(50) NOT NULL, + severity VARCHAR(20) NOT NULL, + title VARCHAR(500) NOT NULL, + message TEXT NOT NULL, + details JSONB, + status VARCHAR(20) NOT NULL DEFAULT 'open', + acknowledged_by UUID REFERENCES users(id) ON DELETE SET NULL, + acknowledged_at TIMESTAMP WITHOUT TIME ZONE, + resolved_at TIMESTAMP WITHOUT TIME ZONE, + created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT now() + ) + """)) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_alert_instances_rule_id ON alert_instances (rule_id)" + )) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_alert_instances_status ON alert_instances (status)" + )) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_alert_instances_severity ON alert_instances (severity)" + )) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_alert_instances_created ON alert_instances (created_at)" + )) + + +def downgrade() -> None: + conn = op.get_bind() + conn.execute(sa.text("DROP TABLE IF EXISTS alert_instances CASCADE")) + conn.execute(sa.text("DROP TABLE IF EXISTS alert_rules CASCADE")) diff --git a/backend/app/main.py b/backend/app/main.py index b29e7b9..00bf549 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -46,6 +46,7 @@ from app.routers import risk_intelligence as risk_router from app.routers import executive_dashboard as dashboard_router from app.routers import api_keys as api_keys_router from app.routers import sso as sso_router +from app.routers import operational_alerts as alerts_router from app.domain.errors import DomainError from app.middleware.error_handler import domain_exception_handler from app.middleware.request_context import RequestContextMiddleware @@ -76,6 +77,15 @@ async def lifespan(app: FastAPI): pass finally: db.close() + # Seed operational alert system rules + db2 = SessionLocal() + try: + from app.services.operational_alert_service import seed_system_rules + seed_system_rules(db2) + except Exception: + pass + finally: + db2.close() yield # Graceful shutdown of the background scheduler scheduler.shutdown(wait=False) @@ -151,6 +161,7 @@ app.include_router(risk_router.router, prefix="/api/v1") app.include_router(dashboard_router.router, prefix="/api/v1") app.include_router(api_keys_router.router, prefix="/api/v1") app.include_router(sso_router.router, prefix="/api/v1") +app.include_router(alerts_router.router, prefix="/api/v1") @app.get("/health", include_in_schema=False) diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 9f7f314..e094e4c 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -43,6 +43,7 @@ from app.models.risk_intelligence import TechniqueRiskProfile from app.models.executive_dashboard import PostureSnapshot from app.models.api_key import ApiKey from app.models.sso_config import SsoConfig +from app.models.operational_alert import AlertRule, AlertInstance __all__ = [ "User", "Technique", "Test", "TestTemplate", "Evidence", @@ -69,4 +70,6 @@ __all__ = [ "PostureSnapshot", "ApiKey", "SsoConfig", + "AlertRule", + "AlertInstance", ] diff --git a/backend/app/models/operational_alert.py b/backend/app/models/operational_alert.py new file mode 100644 index 0000000..f507eb1 --- /dev/null +++ b/backend/app/models/operational_alert.py @@ -0,0 +1,144 @@ +"""Phase 13: Operational Alerts — AlertRule and AlertInstance models.""" + +import enum +import uuid +from datetime import datetime + +from sqlalchemy import ( + Boolean, Column, DateTime, ForeignKey, + Index, Integer, String, Text, +) +from sqlalchemy.dialects.postgresql import JSONB, UUID +from sqlalchemy.orm import relationship + +from app.database import Base + + +# ── Enumerations ────────────────────────────────────────────────────────────── + +class AlertSeverity(str, enum.Enum): + critical = "critical" + high = "high" + medium = "medium" + low = "low" + info = "info" + + +class AlertStatus(str, enum.Enum): + open = "open" + acknowledged = "acknowledged" + resolved = "resolved" + dismissed = "dismissed" + + +class AlertRuleType(str, enum.Enum): + high_risk = "high_risk" # risk_score >= threshold + stale_technique = "stale_technique" # not validated in N days + coverage_regression = "coverage_regression" # coverage_pct dropped + low_coverage = "low_coverage" # coverage below min + expiry_wave = "expiry_wave" # many pending queue items + new_technique = "new_technique" # new MITRE techniques added + orphan_spike = "orphan_spike" # many unowned techniques + custom = "custom" # future extension placeholder + + +# ── AlertRule ───────────────────────────────────────────────────────────────── + +class AlertRule(Base): + """ + Defines a condition that, when satisfied, fires an AlertInstance. + + System rules (is_system=True) are seeded at startup and cannot be deleted. + Custom rules (is_system=False) can be created by admins. + """ + + __tablename__ = "alert_rules" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + name = Column(String(300), nullable=False) + description = Column(Text, nullable=True) + rule_type = Column(String(50), nullable=False) + severity = Column(String(20), nullable=False, default=AlertSeverity.medium.value) + is_enabled = Column(Boolean, nullable=False, default=True) + is_system = Column(Boolean, nullable=False, default=False) # seeded, not deletable + + # Rule-specific thresholds/config (varies by rule_type) + config = Column(JSONB, nullable=False, default={}) + + # Delivery + notify_in_app = Column(Boolean, nullable=False, default=True) + notify_webhook = Column(Boolean, nullable=False, default=False) + webhook_id = Column( + UUID(as_uuid=True), + ForeignKey("webhook_configs.id", ondelete="SET NULL"), + nullable=True, + ) + + # Cooldown — don't re-fire within N hours of last firing + cooldown_hours = Column(Integer, nullable=False, default=24) + + # Meta + created_by = Column( + UUID(as_uuid=True), + ForeignKey("users.id", ondelete="SET NULL"), + nullable=True, + ) + created_at = Column(DateTime, default=datetime.utcnow) + last_fired_at = Column(DateTime, nullable=True) + + creator = relationship("User", foreign_keys=[created_by]) + instances = relationship("AlertInstance", back_populates="rule", + cascade="all, delete-orphan") + + __table_args__ = ( + Index("ix_alert_rules_type", "rule_type"), + Index("ix_alert_rules_enabled", "is_enabled"), + ) + + +# ── AlertInstance ───────────────────────────────────────────────────────────── + +class AlertInstance(Base): + """ + A single firing of an AlertRule. + + Transitions: open → acknowledged → resolved + open → dismissed + """ + + __tablename__ = "alert_instances" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + rule_id = Column( + UUID(as_uuid=True), + ForeignKey("alert_rules.id", ondelete="SET NULL"), + nullable=True, + ) + # Denormalised fields kept for history even after rule deletion + rule_name = Column(String(300), nullable=False) + rule_type = Column(String(50), nullable=False) + severity = Column(String(20), nullable=False) + + title = Column(String(500), nullable=False) + message = Column(Text, nullable=False) + details = Column(JSONB, nullable=True) # structured context + + status = Column(String(20), nullable=False, default=AlertStatus.open.value) + acknowledged_by = Column( + UUID(as_uuid=True), + ForeignKey("users.id", ondelete="SET NULL"), + nullable=True, + ) + acknowledged_at = Column(DateTime, nullable=True) + resolved_at = Column(DateTime, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow) + + rule = relationship("AlertRule", back_populates="instances") + acknowledger = relationship("User", foreign_keys=[acknowledged_by]) + + __table_args__ = ( + Index("ix_alert_instances_rule_id", "rule_id"), + Index("ix_alert_instances_status", "status"), + Index("ix_alert_instances_severity", "severity"), + Index("ix_alert_instances_created", "created_at"), + ) diff --git a/backend/app/routers/operational_alerts.py b/backend/app/routers/operational_alerts.py new file mode 100644 index 0000000..dc52f9b --- /dev/null +++ b/backend/app/routers/operational_alerts.py @@ -0,0 +1,191 @@ +"""Phase 13: Operational Alerts router.""" + +from typing import List, Optional +from uuid import UUID + +from fastapi import APIRouter, Depends, Query +from sqlalchemy.orm import Session + +from app.database import get_db +from app.dependencies.auth import get_current_user, require_any_role +from app.models.user import User +from app.schemas.operational_alert_schema import ( + AlertRuleCreate, AlertRuleOut, AlertRuleUpdate, + AlertInstanceOut, EvaluationResult, AlertSummary, +) +import app.services.operational_alert_service as svc + +router = APIRouter(prefix="/alerts", tags=["Operational Alerts"]) + + +# ── Evaluation ──────────────────────────────────────────────────────────────── + +@router.post("/evaluate", response_model=EvaluationResult, status_code=202) +def evaluate_rules( + db: Session = Depends(get_db), + user=Depends(require_any_role("admin", "red_lead", "blue_lead")), +): + """ + Run the alert evaluation engine against all enabled rules. + + Fires AlertInstances for rules whose conditions are met and are not in cooldown. + Admin / leads only. + """ + result = svc.evaluate_all_rules(db) + return EvaluationResult( + rules_evaluated = result["rules_evaluated"], + alerts_fired = result["alerts_fired"], + alerts = [AlertInstanceOut.model_validate(a) for a in result["alerts"]], + duration_seconds = result["duration_seconds"], + ) + + +# ── Alert instances ─────────────────────────────────────────────────────────── + +@router.get("", response_model=List[AlertInstanceOut]) +def list_alerts( + status: Optional[str] = Query(None), + severity: Optional[str] = Query(None), + rule_type: Optional[str] = Query(None), + limit: int = Query(50, ge=1, le=200), + offset: int = Query(0, ge=0), + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """List alert instances with optional filters.""" + return svc.list_instances(db, status=status, severity=severity, + rule_type=rule_type, limit=limit, offset=offset) + + +@router.get("/summary", response_model=AlertSummary) +def alert_summary( + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Aggregate counts by status, severity, and rule type.""" + data = svc.get_summary(db) + return AlertSummary( + total_open = data["total_open"], + total_acknowledged = data["total_acknowledged"], + total_resolved = data["total_resolved"], + by_severity = data["by_severity"], + by_rule_type = data["by_rule_type"], + recent_alerts = [AlertInstanceOut.model_validate(a) for a in data["recent_alerts"]], + ) + + +@router.get("/{alert_id}", response_model=AlertInstanceOut) +def get_alert( + alert_id: UUID, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Get a single alert instance.""" + return svc.get_instance(db, alert_id) + + +@router.post("/{alert_id}/acknowledge", response_model=AlertInstanceOut) +def acknowledge_alert( + alert_id: UUID, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Acknowledge an open alert.""" + return svc.acknowledge(db, alert_id, current_user.id) + + +@router.post("/{alert_id}/resolve", response_model=AlertInstanceOut) +def resolve_alert( + alert_id: UUID, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Mark an alert as resolved.""" + return svc.resolve(db, alert_id, current_user.id) + + +@router.post("/{alert_id}/dismiss", response_model=AlertInstanceOut) +def dismiss_alert( + alert_id: UUID, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Dismiss an alert (won't re-fire until cooldown resets).""" + return svc.dismiss(db, alert_id, current_user.id) + + +# ── Alert rules ─────────────────────────────────────────────────────────────── + +@router.get("/rules/list", response_model=List[AlertRuleOut]) +def list_rules( + rule_type: Optional[str] = Query(None), + include_disabled: bool = Query(False), + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """List alert rules (all users can read; admins/leads manage them).""" + return svc.list_rules(db, rule_type=rule_type, include_disabled=include_disabled) + + +@router.post("/rules", response_model=AlertRuleOut, status_code=201) +def create_rule( + body: AlertRuleCreate, + db: Session = Depends(get_db), + current_user: User = Depends(require_any_role("admin", "red_lead", "blue_lead")), +): + """Create a custom alert rule.""" + return svc.create_rule( + db, + created_by = current_user.id, + name = body.name, + description = body.description, + rule_type = body.rule_type, + severity = body.severity, + config = body.config, + notify_in_app = body.notify_in_app, + notify_webhook = body.notify_webhook, + webhook_id = body.webhook_id, + cooldown_hours = body.cooldown_hours, + ) + + +@router.get("/rules/{rule_id}", response_model=AlertRuleOut) +def get_rule( + rule_id: UUID, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Get a single alert rule.""" + return svc.get_rule(db, rule_id) + + +@router.patch("/rules/{rule_id}", response_model=AlertRuleOut) +def update_rule( + rule_id: UUID, + body: AlertRuleUpdate, + db: Session = Depends(get_db), + user=Depends(require_any_role("admin", "red_lead", "blue_lead")), +): + """Update an alert rule (enable/disable, thresholds, cooldown).""" + return svc.update_rule( + db, rule_id, + name = body.name, + description = body.description, + severity = body.severity, + is_enabled = body.is_enabled, + config = body.config, + notify_in_app = body.notify_in_app, + notify_webhook = body.notify_webhook, + webhook_id = body.webhook_id, + cooldown_hours = body.cooldown_hours, + ) + + +@router.delete("/rules/{rule_id}", status_code=204) +def delete_rule( + rule_id: UUID, + db: Session = Depends(get_db), + user=Depends(require_any_role("admin")), +): + """Delete a custom alert rule (system rules cannot be deleted).""" + svc.delete_rule(db, rule_id) diff --git a/backend/app/schemas/operational_alert_schema.py b/backend/app/schemas/operational_alert_schema.py new file mode 100644 index 0000000..6a8c919 --- /dev/null +++ b/backend/app/schemas/operational_alert_schema.py @@ -0,0 +1,124 @@ +"""Phase 13: Operational Alerts — Pydantic schemas.""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any, Dict, List, Optional +from uuid import UUID + +from pydantic import BaseModel, Field, field_validator + +from app.models.operational_alert import AlertRuleType, AlertSeverity, AlertStatus + +VALID_SEVERITIES = {s.value for s in AlertSeverity} +VALID_STATUSES = {s.value for s in AlertStatus} +VALID_RULE_TYPES = {r.value for r in AlertRuleType} + + +# ── AlertRule schemas ───────────────────────────────────────────────────────── + +class AlertRuleCreate(BaseModel): + name: str = Field(..., min_length=1, max_length=300) + description: Optional[str] = None + rule_type: str + severity: str = "medium" + config: Dict[str, Any] = Field(default_factory=dict) + notify_in_app: bool = True + notify_webhook: bool = False + webhook_id: Optional[UUID] = None + cooldown_hours: int = Field(24, ge=0, le=8760) + + @field_validator("rule_type") + @classmethod + def validate_rule_type(cls, v: str) -> str: + if v not in VALID_RULE_TYPES: + raise ValueError(f"Invalid rule_type. Valid: {VALID_RULE_TYPES}") + return v + + @field_validator("severity") + @classmethod + def validate_severity(cls, v: str) -> str: + if v not in VALID_SEVERITIES: + raise ValueError(f"Invalid severity. Valid: {VALID_SEVERITIES}") + return v + + +class AlertRuleUpdate(BaseModel): + name: Optional[str] = Field(None, min_length=1, max_length=300) + description: Optional[str] = None + severity: Optional[str] = None + is_enabled: Optional[bool] = None + config: Optional[Dict[str, Any]] = None + notify_in_app: Optional[bool] = None + notify_webhook: Optional[bool] = None + webhook_id: Optional[UUID] = None + cooldown_hours: Optional[int] = Field(None, ge=0, le=8760) + + @field_validator("severity") + @classmethod + def validate_severity(cls, v: Optional[str]) -> Optional[str]: + if v is not None and v not in VALID_SEVERITIES: + raise ValueError(f"Invalid severity. Valid: {VALID_SEVERITIES}") + return v + + +class AlertRuleOut(BaseModel): + id: UUID + name: str + description: Optional[str] = None + rule_type: str + severity: str + is_enabled: bool + is_system: bool + config: Dict[str, Any] + notify_in_app: bool + notify_webhook: bool + webhook_id: Optional[UUID] = None + cooldown_hours: int + created_by: Optional[UUID] = None + created_at: Optional[datetime] = None + last_fired_at: Optional[datetime] = None + + class Config: + from_attributes = True + + +# ── AlertInstance schemas ───────────────────────────────────────────────────── + +class AlertInstanceOut(BaseModel): + id: UUID + rule_id: Optional[UUID] = None + rule_name: str + rule_type: str + severity: str + title: str + message: str + details: Optional[Dict[str, Any]] = None + status: str + acknowledged_by: Optional[UUID] = None + acknowledged_at: Optional[datetime] = None + resolved_at: Optional[datetime] = None + created_at: Optional[datetime] = None + + class Config: + from_attributes = True + + +# ── Evaluation result ───────────────────────────────────────────────────────── + +class EvaluationResult(BaseModel): + rules_evaluated: int + alerts_fired: int + alerts: List[AlertInstanceOut] = Field(default_factory=list) + duration_seconds: float + + +# ── Summary ─────────────────────────────────────────────────────────────────── + +class AlertSummary(BaseModel): + total_open: int + total_acknowledged: int + total_resolved: int + by_severity: Dict[str, int] + by_rule_type: Dict[str, int] + recent_alerts: List[AlertInstanceOut] = Field(default_factory=list) diff --git a/backend/app/services/operational_alert_service.py b/backend/app/services/operational_alert_service.py new file mode 100644 index 0000000..7962837 --- /dev/null +++ b/backend/app/services/operational_alert_service.py @@ -0,0 +1,530 @@ +"""Phase 13: Operational Alert service — rule evaluation engine + CRUD.""" + +from __future__ import annotations + +import logging +import time +from datetime import datetime, timedelta +from typing import List, Optional +from uuid import UUID + +from sqlalchemy.orm import Session + +from app.domain.errors import EntityNotFoundError, BusinessRuleViolation +from app.models.operational_alert import ( + AlertInstance, AlertRule, AlertRuleType, AlertSeverity, AlertStatus, +) +from app.models.technique import Technique +from app.models.risk_intelligence import TechniqueRiskProfile +from app.models.ownership_queue import RevalidationQueueItem, QueueStatus +from app.models.ownership_queue import TechniqueOwnership +from app.models.executive_dashboard import PostureSnapshot +from app.models.enums import TechniqueStatus + +log = logging.getLogger(__name__) + +# ── Pre-configured system rules (seeded at startup) ─────────────────────────── + +SYSTEM_RULES = [ + { + "name": "Critical Risk Techniques", + "description": "Fires when 3 or more techniques reach critical risk level (score ≥ 75).", + "rule_type": AlertRuleType.high_risk.value, + "severity": AlertSeverity.critical.value, + "is_system": True, + "config": {"min_risk_score": 75.0, "min_count": 3}, + "cooldown_hours": 24, + }, + { + "name": "High-Risk Technique Spike", + "description": "Fires when 10 or more techniques reach high risk (score ≥ 50).", + "rule_type": AlertRuleType.high_risk.value, + "severity": AlertSeverity.high.value, + "is_system": True, + "config": {"min_risk_score": 50.0, "min_count": 10}, + "cooldown_hours": 24, + }, + { + "name": "Stale Technique Detection", + "description": "Fires when 5+ validated techniques have not been reviewed in 30+ days.", + "rule_type": AlertRuleType.stale_technique.value, + "severity": AlertSeverity.medium.value, + "is_system": True, + "config": {"days_stale": 30, "min_count": 5}, + "cooldown_hours": 48, + }, + { + "name": "Coverage Regression", + "description": "Fires when coverage drops by 5 or more percentage points between daily snapshots.", + "rule_type": AlertRuleType.coverage_regression.value, + "severity": AlertSeverity.high.value, + "is_system": True, + "config": {"min_drop_pct": 5.0}, + "cooldown_hours": 12, + }, + { + "name": "Low Coverage Warning", + "description": "Fires when overall coverage falls below 30%.", + "rule_type": AlertRuleType.low_coverage.value, + "severity": AlertSeverity.medium.value, + "is_system": True, + "config": {"max_coverage_pct": 30.0}, + "cooldown_hours": 72, + }, + { + "name": "Revalidation Queue Backlog", + "description": "Fires when 15+ techniques are waiting in the revalidation queue.", + "rule_type": AlertRuleType.expiry_wave.value, + "severity": AlertSeverity.medium.value, + "is_system": True, + "config": {"min_pending_count": 15}, + "cooldown_hours": 24, + }, + { + "name": "New MITRE Techniques Detected", + "description": "Fires when new ATT&CK techniques are added in the last 7 days.", + "rule_type": AlertRuleType.new_technique.value, + "severity": AlertSeverity.info.value, + "is_system": True, + "config": {"lookback_days": 7, "min_count": 1}, + "cooldown_hours": 168, # once a week + }, + { + "name": "Orphan Technique Spike", + "description": "Fires when 20+ techniques have no assigned owner.", + "rule_type": AlertRuleType.orphan_spike.value, + "severity": AlertSeverity.low.value, + "is_system": True, + "config": {"min_orphan_count": 20}, + "cooldown_hours": 48, + }, +] + + +def seed_system_rules(db: Session) -> int: + """Ensure all system rules exist (idempotent). Returns count created.""" + created = 0 + for rule_def in SYSTEM_RULES: + exists = db.query(AlertRule).filter( + AlertRule.name == rule_def["name"], + AlertRule.is_system == True, + ).first() + if not exists: + rule = AlertRule(**rule_def) + db.add(rule) + created += 1 + if created: + db.commit() + return created + + +# ── Rule evaluators (one per AlertRuleType) ─────────────────────────────────── + +def _eval_high_risk(db: Session, rule: AlertRule) -> Optional[dict]: + min_score = float(rule.config.get("min_risk_score", 75.0)) + min_count = int(rule.config.get("min_count", 1)) + + profiles = db.query(TechniqueRiskProfile).filter( + TechniqueRiskProfile.risk_score >= min_score, + ).all() + count = len(profiles) + if count < min_count: + return None + + top = sorted(profiles, key=lambda p: p.risk_score, reverse=True)[:5] + return { + "title": f"{count} technique(s) with risk score ≥ {min_score:.0f}", + "message": ( + f"{count} technique(s) have reached risk score ≥ {min_score:.0f}. " + f"Top: {', '.join(str(p.technique_id)[:8] + '…' for p in top[:3])}." + ), + "details": { + "count": count, + "threshold": min_score, + "top_ids": [str(p.technique_id) for p in top], + "top_scores": [p.risk_score for p in top], + }, + } + + +def _eval_stale_technique(db: Session, rule: AlertRule) -> Optional[dict]: + days_stale = int(rule.config.get("days_stale", 30)) + min_count = int(rule.config.get("min_count", 1)) + cutoff = datetime.utcnow() - timedelta(days=days_stale) + + stale = db.query(Technique).filter( + Technique.status_global == TechniqueStatus.validated, + Technique.last_review_date < cutoff, + ).all() + count = len(stale) + if count < min_count: + return None + + return { + "title": f"{count} validated technique(s) stale for {days_stale}+ days", + "message": ( + f"{count} technique(s) have been validated but not reviewed in over " + f"{days_stale} days. Re-validate to maintain confidence." + ), + "details": { + "count": count, + "days_stale": days_stale, + "example_ids": [str(t.id) for t in stale[:10]], + }, + } + + +def _eval_coverage_regression(db: Session, rule: AlertRule) -> Optional[dict]: + min_drop = float(rule.config.get("min_drop_pct", 5.0)) + + snaps = ( + db.query(PostureSnapshot) + .order_by(PostureSnapshot.snapshot_date.desc()) + .limit(2) + .all() + ) + if len(snaps) < 2: + return None + + latest, previous = snaps[0], snaps[1] + drop = previous.coverage_pct - latest.coverage_pct + if drop < min_drop: + return None + + return { + "title": f"Coverage dropped {drop:.1f}% ({previous.coverage_pct:.1f}% → {latest.coverage_pct:.1f}%)", + "message": ( + f"Overall coverage fell by {drop:.1f} percentage points " + f"between {previous.snapshot_date} and {latest.snapshot_date}. " + f"Investigate recent technique status changes." + ), + "details": { + "previous_pct": previous.coverage_pct, + "current_pct": latest.coverage_pct, + "drop_pct": round(drop, 2), + "previous_date": str(previous.snapshot_date), + "current_date": str(latest.snapshot_date), + }, + } + + +def _eval_low_coverage(db: Session, rule: AlertRule) -> Optional[dict]: + max_pct = float(rule.config.get("max_coverage_pct", 30.0)) + techniques = db.query(Technique).all() + total = len(techniques) + if total == 0: + return None + + validated = sum(1 for t in techniques if t.status_global == TechniqueStatus.validated) + partial = sum(1 for t in techniques if t.status_global == TechniqueStatus.partial) + coverage = (validated + partial * 0.5) / total * 100.0 + + if coverage > max_pct: + return None + + return { + "title": f"Coverage is critically low: {coverage:.1f}%", + "message": ( + f"Current detection coverage is {coverage:.1f}%, below the minimum " + f"threshold of {max_pct:.0f}%. Prioritise coverage improvements." + ), + "details": { + "coverage_pct": round(coverage, 2), + "threshold": max_pct, + "validated": validated, + "partial": partial, + "total": total, + }, + } + + +def _eval_expiry_wave(db: Session, rule: AlertRule) -> Optional[dict]: + min_pending = int(rule.config.get("min_pending_count", 15)) + + pending_count = db.query(RevalidationQueueItem).filter( + RevalidationQueueItem.status.in_([ + QueueStatus.pending, QueueStatus.in_progress, + ]), + ).count() + + if pending_count < min_pending: + return None + + return { + "title": f"Revalidation queue backlog: {pending_count} items pending", + "message": ( + f"{pending_count} technique(s) are waiting in the revalidation queue " + f"(threshold: {min_pending}). Assign analysts to clear the backlog." + ), + "details": { + "pending_count": pending_count, + "threshold": min_pending, + }, + } + + +def _eval_new_technique(db: Session, rule: AlertRule) -> Optional[dict]: + lookback_days = int(rule.config.get("lookback_days", 7)) + min_count = int(rule.config.get("min_count", 1)) + cutoff = datetime.utcnow() - timedelta(days=lookback_days) + + new_techs = db.query(Technique).filter( + Technique.mitre_last_modified >= cutoff, + ).all() + count = len(new_techs) + if count < min_count: + return None + + return { + "title": f"{count} new/updated MITRE technique(s) in last {lookback_days} days", + "message": ( + f"{count} ATT&CK technique(s) have been added or updated in the last " + f"{lookback_days} days. Review and assign coverage." + ), + "details": { + "count": count, + "lookback_days": lookback_days, + "technique_ids": [str(t.id) for t in new_techs[:20]], + "mitre_ids": [t.mitre_id for t in new_techs[:20]], + }, + } + + +def _eval_orphan_spike(db: Session, rule: AlertRule) -> Optional[dict]: + min_orphans = int(rule.config.get("min_orphan_count", 20)) + + total = db.query(Technique).count() + owned = db.query(TechniqueOwnership).filter( + TechniqueOwnership.owner_id.isnot(None), + ).count() + orphans = max(total - owned, 0) + + if orphans < min_orphans: + return None + + return { + "title": f"{orphans} unowned techniques detected", + "message": ( + f"{orphans} out of {total} technique(s) have no assigned owner. " + f"Assign ownership to ensure accountability." + ), + "details": { + "orphan_count": orphans, + "total": total, + "threshold": min_orphans, + }, + } + + +_EVALUATORS = { + AlertRuleType.high_risk.value: _eval_high_risk, + AlertRuleType.stale_technique.value: _eval_stale_technique, + AlertRuleType.coverage_regression.value: _eval_coverage_regression, + AlertRuleType.low_coverage.value: _eval_low_coverage, + AlertRuleType.expiry_wave.value: _eval_expiry_wave, + AlertRuleType.new_technique.value: _eval_new_technique, + AlertRuleType.orphan_spike.value: _eval_orphan_spike, +} + + +# ── Core evaluation engine ──────────────────────────────────────────────────── + +def _in_cooldown(rule: AlertRule) -> bool: + if rule.last_fired_at is None: + return False + if rule.cooldown_hours <= 0: + return False + return datetime.utcnow() < rule.last_fired_at + timedelta(hours=rule.cooldown_hours) + + +def evaluate_all_rules(db: Session) -> dict: + """Evaluate every enabled rule; create AlertInstances for those that fire.""" + t0 = time.monotonic() + rules = db.query(AlertRule).filter(AlertRule.is_enabled == True).all() + + fired: List[AlertInstance] = [] + for rule in rules: + if _in_cooldown(rule): + continue + evaluator = _EVALUATORS.get(rule.rule_type) + if not evaluator: + continue + try: + result = evaluator(db, rule) + except Exception: + log.exception("Error evaluating rule %s (%s)", rule.id, rule.name) + continue + + if result is None: + continue # condition not met + + instance = AlertInstance( + rule_id = rule.id, + rule_name = rule.name, + rule_type = rule.rule_type, + severity = rule.severity, + title = result["title"], + message = result["message"], + details = result.get("details"), + status = AlertStatus.open.value, + ) + db.add(instance) + rule.last_fired_at = datetime.utcnow() + fired.append(instance) + + db.commit() + for inst in fired: + db.refresh(inst) + + return { + "rules_evaluated": len(rules), + "alerts_fired": len(fired), + "alerts": fired, + "duration_seconds": round(time.monotonic() - t0, 3), + } + + +# ── AlertRule CRUD ──────────────────────────────────────────────────────────── + +def list_rules( + db: Session, + rule_type: Optional[str] = None, + include_disabled: bool = False, +) -> List[AlertRule]: + q = db.query(AlertRule) + if rule_type: + q = q.filter(AlertRule.rule_type == rule_type) + if not include_disabled: + q = q.filter(AlertRule.is_enabled == True) + return q.order_by(AlertRule.created_at.asc()).all() + + +def get_rule(db: Session, rule_id: UUID) -> AlertRule: + rule = db.query(AlertRule).filter(AlertRule.id == rule_id).first() + if not rule: + raise EntityNotFoundError("AlertRule", str(rule_id)) + return rule + + +def create_rule(db: Session, created_by: UUID, **kwargs) -> AlertRule: + kwargs["is_system"] = False + kwargs["created_by"] = created_by + rule = AlertRule(**kwargs) + db.add(rule) + db.commit() + db.refresh(rule) + return rule + + +def update_rule(db: Session, rule_id: UUID, **kwargs) -> AlertRule: + rule = get_rule(db, rule_id) + for k, v in kwargs.items(): + if v is not None: + setattr(rule, k, v) + db.commit() + db.refresh(rule) + return rule + + +def delete_rule(db: Session, rule_id: UUID) -> None: + rule = get_rule(db, rule_id) + if rule.is_system: + raise BusinessRuleViolation("System rules cannot be deleted. Disable them instead.") + db.delete(rule) + db.commit() + + +# ── AlertInstance CRUD ──────────────────────────────────────────────────────── + +def list_instances( + db: Session, + status: Optional[str] = None, + severity: Optional[str] = None, + rule_type: Optional[str] = None, + limit: int = 50, + offset: int = 0, +) -> List[AlertInstance]: + q = db.query(AlertInstance) + if status: + q = q.filter(AlertInstance.status == status) + if severity: + q = q.filter(AlertInstance.severity == severity) + if rule_type: + q = q.filter(AlertInstance.rule_type == rule_type) + return q.order_by(AlertInstance.created_at.desc()).offset(offset).limit(limit).all() + + +def get_instance(db: Session, instance_id: UUID) -> AlertInstance: + inst = db.query(AlertInstance).filter(AlertInstance.id == instance_id).first() + if not inst: + raise EntityNotFoundError("AlertInstance", str(instance_id)) + return inst + + +def _transition( + db: Session, + instance_id: UUID, + new_status: str, + user_id: Optional[UUID] = None, +) -> AlertInstance: + inst = get_instance(db, instance_id) + inst.status = new_status + if new_status == AlertStatus.acknowledged.value: + inst.acknowledged_by = user_id + inst.acknowledged_at = datetime.utcnow() + elif new_status == AlertStatus.resolved.value: + inst.resolved_at = datetime.utcnow() + db.commit() + db.refresh(inst) + return inst + + +def acknowledge(db: Session, instance_id: UUID, user_id: UUID) -> AlertInstance: + inst = get_instance(db, instance_id) + if inst.status != AlertStatus.open.value: + raise BusinessRuleViolation(f"Cannot acknowledge alert in status '{inst.status}'.") + return _transition(db, instance_id, AlertStatus.acknowledged.value, user_id) + + +def resolve(db: Session, instance_id: UUID, user_id: UUID) -> AlertInstance: + inst = get_instance(db, instance_id) + if inst.status == AlertStatus.resolved.value: + raise BusinessRuleViolation("Alert is already resolved.") + return _transition(db, instance_id, AlertStatus.resolved.value, user_id) + + +def dismiss(db: Session, instance_id: UUID, user_id: UUID) -> AlertInstance: + inst = get_instance(db, instance_id) + if inst.status in (AlertStatus.resolved.value, AlertStatus.dismissed.value): + raise BusinessRuleViolation(f"Cannot dismiss alert in status '{inst.status}'.") + return _transition(db, instance_id, AlertStatus.dismissed.value, user_id) + + +def get_summary(db: Session) -> dict: + instances = db.query(AlertInstance).all() + + by_status = {s.value: 0 for s in AlertStatus} + by_severity = {s.value: 0 for s in AlertSeverity} + by_type = {} + + for i in instances: + by_status[i.status] = by_status.get(i.status, 0) + 1 + by_severity[i.severity] = by_severity.get(i.severity, 0) + 1 + by_type[i.rule_type] = by_type.get(i.rule_type, 0) + 1 + + recent = ( + db.query(AlertInstance) + .filter(AlertInstance.status == AlertStatus.open.value) + .order_by(AlertInstance.created_at.desc()) + .limit(5) + .all() + ) + + return { + "total_open": by_status.get(AlertStatus.open.value, 0), + "total_acknowledged": by_status.get(AlertStatus.acknowledged.value, 0), + "total_resolved": by_status.get(AlertStatus.resolved.value, 0), + "by_severity": by_severity, + "by_rule_type": by_type, + "recent_alerts": recent, + } diff --git a/scripts/qa_phase13_alerts.py b/scripts/qa_phase13_alerts.py new file mode 100644 index 0000000..0cb8c55 --- /dev/null +++ b/scripts/qa_phase13_alerts.py @@ -0,0 +1,302 @@ +""" +QA script for Phase 13 — Operational Alerts. +Run with: python -X utf8 scripts/qa_phase13_alerts.py +""" + +import sys, os +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import requests + +BASE = "http://localhost:8000/api/v1" +PASS = "\033[92m✓\033[0m" +FAIL = "\033[91m✗\033[0m" + +passed = 0 +failed = 0 +VALID_SEVERITIES = {"critical", "high", "medium", "low", "info"} +VALID_STATUSES = {"open", "acknowledged", "resolved", "dismissed"} + + +def check(label: str, condition: bool, detail: str = ""): + global passed, failed + if condition: + passed += 1 + print(f" {PASS} {label}") + else: + failed += 1 + msg = f" {FAIL} {label}" + if detail: + msg += f" — {detail}" + print(msg) + + +def get_token(username="administrator", password="admin123"): + r = requests.post(f"{BASE}/auth/login", + data={"username": username, "password": password}) + if r.status_code == 200: + return r.json().get("access_token") or r.json().get("token") + raise RuntimeError(f"Login failed: {r.status_code} {r.text[:200]}") + + +def auth(token): + return {"Authorization": f"Bearer {token}"} + + +# ───────────────────────────────────────────────────────────────────────────── + +def main(): + print("\n====== Phase 13 QA — Operational Alerts ======\n") + + token = get_token() + h = auth(token) + + # ── Block 1: System rules seeded ───────────────────────────────────────── + print("── Block 1: System rules seeded at startup ──") + + r = requests.get(f"{BASE}/alerts/rules/list", headers=h, + params={"include_disabled": True}) + check("GET /alerts/rules/list → 200", r.status_code == 200, r.text[:150]) + rules = r.json() if r.status_code == 200 else [] + check("At least 8 system rules seeded", len(rules) >= 8, + f"got {len(rules)}") + system_rules = [r for r in rules if r.get("is_system")] + check("All seeded rules are system rules", len(system_rules) == len(rules)) + check("Rules have valid severities", + all(r["severity"] in VALID_SEVERITIES for r in rules)) + check("Rules have config dict", + all(isinstance(r.get("config"), dict) for r in rules)) + + rule_types = {r["rule_type"] for r in rules} + for rt in ("high_risk", "stale_technique", "coverage_regression", + "expiry_wave", "new_technique", "orphan_spike"): + check(f"Rule type '{rt}' present", rt in rule_types) + + print() + + # ── Block 2: Evaluate rules ─────────────────────────────────────────────── + print("── Block 2: Evaluate alert rules ──") + + r = requests.post(f"{BASE}/alerts/evaluate", headers=h) + check("POST /alerts/evaluate → 202", r.status_code == 202, r.text[:200]) + result = r.json() if r.status_code == 202 else {} + check("rules_evaluated > 0", result.get("rules_evaluated", 0) > 0, + f"got {result.get('rules_evaluated')}") + check("alerts_fired is int", isinstance(result.get("alerts_fired"), int)) + check("duration_seconds present", "duration_seconds" in result) + check("alerts list present", isinstance(result.get("alerts"), list)) + print(f" fired={result.get('alerts_fired')} alerts in " + f"{result.get('duration_seconds', '?')}s") + + if result.get("alerts"): + a0 = result["alerts"][0] + check("Alert has title", bool(a0.get("title"))) + check("Alert has message", bool(a0.get("message"))) + check("Alert has severity", a0.get("severity") in VALID_SEVERITIES) + check("Alert status = open", a0.get("status") == "open") + + print() + + # ── Block 3: List & Get alerts ──────────────────────────────────────────── + print("── Block 3: List & Get alert instances ──") + + r = requests.get(f"{BASE}/alerts", headers=h) + check("GET /alerts → 200", r.status_code == 200, r.text[:150]) + alerts = r.json() if r.status_code == 200 else [] + check("Alerts is list", isinstance(alerts, list)) + + # Filter by status=open + r = requests.get(f"{BASE}/alerts", headers=h, params={"status": "open"}) + check("GET /alerts?status=open → 200", r.status_code == 200) + open_alerts = r.json() if r.status_code == 200 else [] + if open_alerts: + check("All filtered alerts have status=open", + all(a["status"] == "open" for a in open_alerts)) + + # Filter by severity + r = requests.get(f"{BASE}/alerts", headers=h, params={"severity": "critical"}) + check("GET /alerts?severity=critical → 200", r.status_code == 200) + + # Pagination + r = requests.get(f"{BASE}/alerts", headers=h, params={"limit": 3, "offset": 0}) + check("GET /alerts?limit=3 returns ≤3", r.status_code == 200 and len(r.json()) <= 3) + + # Get single (use first from open_alerts if available) + if open_alerts: + aid = open_alerts[0]["id"] + r = requests.get(f"{BASE}/alerts/{aid}", headers=h) + check("GET /alerts/{id} → 200", r.status_code == 200) + check("Correct id returned", r.json().get("id") == aid) + + # 404 for non-existent + r = requests.get( + f"{BASE}/alerts/00000000-0000-0000-0000-000000000001", headers=h + ) + check("GET non-existent alert → 404", r.status_code == 404) + + print() + + # ── Block 4: Alert lifecycle (acknowledge → resolve) ────────────────────── + print("── Block 4: Alert lifecycle ──") + + # Re-evaluate to ensure we have open alerts (reset cooldowns first by creating custom rule) + r_open = requests.get(f"{BASE}/alerts", headers=h, params={"status": "open", "limit": 1}) + open_now = r_open.json() if r_open.status_code == 200 else [] + + if open_now: + aid = open_now[0]["id"] + + # Acknowledge + r = requests.post(f"{BASE}/alerts/{aid}/acknowledge", headers=h) + check("POST /alerts/{id}/acknowledge → 200", r.status_code == 200, + r.text[:150]) + ack = r.json() if r.status_code == 200 else {} + check("Status = acknowledged after ack", ack.get("status") == "acknowledged") + check("acknowledged_at set", bool(ack.get("acknowledged_at"))) + check("acknowledged_by set", bool(ack.get("acknowledged_by"))) + + # Can't acknowledge again + r = requests.post(f"{BASE}/alerts/{aid}/acknowledge", headers=h) + check("Double-acknowledge → 400", r.status_code == 400) + + # Resolve + r = requests.post(f"{BASE}/alerts/{aid}/resolve", headers=h) + check("POST /alerts/{id}/resolve → 200", r.status_code == 200, + r.text[:150]) + res = r.json() if r.status_code == 200 else {} + check("Status = resolved after resolve", res.get("status") == "resolved") + check("resolved_at set", bool(res.get("resolved_at"))) + + # Can't resolve again + r = requests.post(f"{BASE}/alerts/{aid}/resolve", headers=h) + check("Double-resolve → 400", r.status_code == 400) + else: + print(" (no open alerts to test lifecycle — skipping acknowledge/resolve)") + + # Test dismiss on a different alert + r_open2 = requests.get(f"{BASE}/alerts", headers=h, params={"status": "open", "limit": 1}) + open_now2 = r_open2.json() if r_open2.status_code == 200 else [] + if open_now2: + aid2 = open_now2[0]["id"] + r = requests.post(f"{BASE}/alerts/{aid2}/dismiss", headers=h) + check("POST /alerts/{id}/dismiss → 200", r.status_code == 200, r.text[:100]) + dis = r.json() if r.status_code == 200 else {} + check("Status = dismissed", dis.get("status") == "dismissed") + else: + print(" (no open alerts to dismiss — skipping)") + + print() + + # ── Block 5: Summary ────────────────────────────────────────────────────── + print("── Block 5: Alert summary ──") + + r = requests.get(f"{BASE}/alerts/summary", headers=h) + check("GET /alerts/summary → 200", r.status_code == 200, r.text[:150]) + summary = r.json() if r.status_code == 200 else {} + check("total_open is int", isinstance(summary.get("total_open"), int)) + check("total_acknowledged is int", isinstance(summary.get("total_acknowledged"), int)) + check("total_resolved is int", isinstance(summary.get("total_resolved"), int)) + check("by_severity has all levels", + all(k in summary.get("by_severity", {}) + for k in ("critical", "high", "medium", "low", "info"))) + check("by_rule_type is dict", isinstance(summary.get("by_rule_type"), dict)) + check("recent_alerts is list", isinstance(summary.get("recent_alerts"), list)) + + print() + + # ── Block 6: Custom rule CRUD ───────────────────────────────────────────── + print("── Block 6: Custom rule CRUD ──") + + custom_rule_body = { + "name": "QA Custom Rule", + "description": "Temporary rule for QA testing", + "rule_type": "high_risk", + "severity": "low", + "config": {"min_risk_score": 99.9, "min_count": 9999}, # won't fire + "cooldown_hours": 0, + } + r = requests.post(f"{BASE}/alerts/rules", headers=h, json=custom_rule_body) + check("POST /alerts/rules → 201", r.status_code == 201, r.text[:200]) + custom = r.json() if r.status_code == 201 else {} + check("Custom rule is_system=False", custom.get("is_system") == False) + custom_id = custom.get("id", "") + + # Get it + if custom_id: + r = requests.get(f"{BASE}/alerts/rules/{custom_id}", headers=h) + check("GET /alerts/rules/{id} → 200", r.status_code == 200) + + # Update: disable it + if custom_id: + r = requests.patch(f"{BASE}/alerts/rules/{custom_id}", headers=h, + json={"is_enabled": False, "severity": "info"}) + check("PATCH /alerts/rules/{id} → 200", r.status_code == 200, r.text[:100]) + upd = r.json() if r.status_code == 200 else {} + check("is_enabled = False after patch", upd.get("is_enabled") == False) + check("severity = info after patch", upd.get("severity") == "info") + + # System rule cannot be deleted + system_rule_id = next((r["id"] for r in rules if r.get("is_system")), None) + if system_rule_id: + r = requests.delete(f"{BASE}/alerts/rules/{system_rule_id}", headers=h) + check("DELETE system rule → 400", r.status_code == 400) + + # Delete custom rule + if custom_id: + r = requests.delete(f"{BASE}/alerts/rules/{custom_id}", headers=h) + check("DELETE custom rule → 204", r.status_code == 204) + + # Invalid rule_type → 422 + r = requests.post(f"{BASE}/alerts/rules", headers=h, json={ + "name": "Bad", "rule_type": "nonexistent", "severity": "low", "config": {}, + }) + check("POST rule with invalid rule_type → 422", r.status_code == 422) + + print() + + # ── Block 7: Auth protection ────────────────────────────────────────────── + print("── Block 7: Auth protection ──") + + for method, url in [ + ("GET", f"{BASE}/alerts"), + ("GET", f"{BASE}/alerts/summary"), + ("GET", f"{BASE}/alerts/rules/list"), + ]: + r = requests.request(method, url) + ep = url.split("/api/v1")[1] + check(f"{method} {ep} without auth → 401", r.status_code == 401) + + r = requests.post(f"{BASE}/alerts/evaluate") + check("POST /alerts/evaluate without auth → 401", r.status_code == 401) + + print() + + # ── Block 8: Regression ─────────────────────────────────────────────────── + print("── Block 8: Regression ──") + + r = requests.get(f"{BASE}/dashboard/kpis", headers=h) + check("GET /dashboard/kpis still works", r.status_code == 200) + + r = requests.get(f"{BASE}/risk/summary", headers=h) + check("GET /risk/summary still works", r.status_code == 200) + + r = requests.get(f"{BASE}/api-keys", headers=h) + check("GET /api-keys still works", r.status_code == 200) + + r = requests.get(f"{BASE}/techniques", headers=h) + check("GET /techniques still works", r.status_code == 200) + + print() + + # ── Summary ─────────────────────────────────────────────────────────────── + total = passed + failed + print(f"====== Results: {passed}/{total} passed", end="") + if failed: + print(f" — \033[91m{failed} FAILED\033[0m ======\n") + sys.exit(1) + else: + print(" ✓ ALL PASSED ======\n") + + +if __name__ == "__main__": + main()