Files
Aegis/backend/app/routers/operational_metrics.py
T
kitos bb8b9a6a72
Aegis CI / lint-and-test (push) Waiting to run
Snyk Security Scan / Python vulnerabilities (backend) (push) Waiting to run
Snyk Security Scan / npm vulnerabilities (frontend) (push) Waiting to run
Snyk Security Scan / Docker image vulnerabilities (backend) (push) Waiting to run
feat(dashboard): time range filter for operational metrics (30d/90d/6m/1y/all)
2026-06-19 10:41:22 +02:00

91 lines
3.1 KiB
Python

"""Operational metrics endpoints — MTTD, MTTR, Detection Efficacy, and more.
Provides operational KPIs for security teams with trend analysis
and team-level breakdowns.
"""
# Import APIRouter, Depends, Query from fastapi
from fastapi import APIRouter, Depends, Query
# Import Session from sqlalchemy.orm
from sqlalchemy.orm import Session
# Import get_db from app.database
from app.database import get_db
# Import get_current_user from app.dependencies.auth
from app.dependencies.auth import get_current_user
# Import User from app.models.user
from app.models.user import User
from datetime import datetime, date
# Import from app.services.operational_metrics_service
from app.services.operational_metrics_service import (
get_all_operational_metrics,
get_metrics_by_team,
get_operational_trend,
)
# Router for the operational-metrics endpoints: routes registered on it are
# served under the "/metrics/operational" prefix and tagged "operational-metrics".
router = APIRouter(prefix="/metrics/operational", tags=["operational-metrics"])
# ── GET /metrics/operational ──────────────────────────────────────────
@router.get("")
def operational_metrics(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
    since: str | None = Query(None, description="ISO date YYYY-MM-DD — filter metrics to tests on or after this date"),
) -> dict:
    """Return all operational metrics (MTTD, MTTR, etc.).

    Args:
        db: Database session injected via ``get_db``.
        current_user: Injected via ``get_current_user``; not read in the body,
            declared so the auth dependency runs for this route.
        since: Optional ISO date (``YYYY-MM-DD``). When it parses, metrics are
            computed with that date at midnight as the lower bound.

    Returns:
        The metrics dict from ``get_all_operational_metrics`` when a valid
        ``since`` is given; otherwise the result of
        ``get_operational_metrics_cached``.

    Parsing is best-effort: a missing, empty, or unparseable ``since`` is
    treated as "no time filter". In that case the cached path is used, so a
    malformed value no longer forces an uncached, unfiltered recomputation.
    """
    since_dt = None
    if since:
        try:
            since_dt = datetime.combine(date.fromisoformat(since), datetime.min.time())
        except ValueError:
            # Deliberately lenient: fall back to the unfiltered view.
            since_dt = None

    if since_dt is not None:
        return get_all_operational_metrics(db, since_dt)

    # No effective time filter: serve the cached result.
    # NOTE(review): the import is function-local in the original, presumably to
    # avoid an import cycle with app.services.score_cache — confirm before hoisting.
    from app.services.score_cache import get_operational_metrics_cached
    return get_operational_metrics_cached(db)
# ── GET /metrics/operational/trend ────────────────────────────────────
@router.get("/trend")
def operational_trend(
    period: str = Query("90d", pattern="^(30d|90d|1y)$"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> dict:
    """Return weekly trend data for the operational metrics.

    Args:
        period: Look-back window; the query pattern restricts it to ``30d``,
            ``90d`` or ``1y`` and it defaults to ``90d``.
        db: Database session injected via ``get_db``.
        current_user: Injected via ``get_current_user``; not read in the body,
            declared so the auth dependency runs for this route.

    Returns:
        Whatever ``get_operational_trend`` produces for ``period``.
    """
    trend = get_operational_trend(db, period)
    return trend
# ── GET /metrics/operational/by-team ──────────────────────────────────
@router.get("/by-team")
def metrics_by_team(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
    since: str | None = Query(None, description="ISO date YYYY-MM-DD — filter to tests on or after this date"),
) -> dict:
    """Return metrics broken down by Red Team vs Blue Team.

    Args:
        db: Database session injected via ``get_db``.
        current_user: Injected via ``get_current_user``; not read in the body,
            declared so the auth dependency runs for this route.
        since: Optional ISO date (``YYYY-MM-DD``). When it parses, that date at
            midnight is passed on as the lower bound; a missing, empty, or
            unparseable value results in ``None`` being passed instead.

    Returns:
        Whatever ``get_metrics_by_team`` produces for the resolved cutoff.
    """
    # Nothing to parse: request the unfiltered breakdown.
    if not since:
        return get_metrics_by_team(db, None)

    try:
        day = date.fromisoformat(since)
    except ValueError:
        # Best-effort parsing: a malformed date means "no filter".
        cutoff = None
    else:
        cutoff = datetime.combine(day, datetime.min.time())
    return get_metrics_by_team(db, cutoff)