"""Analytics endpoints — flat JSON optimized for PowerBI / BI tools. Returns complete datasets without pagination so BI tools can ingest directly from URL. All endpoints require authentication. """ from fastapi import APIRouter, Depends, Query from sqlalchemy import func from sqlalchemy.orm import Session from app.database import get_db from app.dependencies.auth import get_current_user, require_any_role from app.models.coverage_snapshot import CoverageSnapshot from app.models.technique import Technique from app.models.test import Test from app.models.user import User router = APIRouter(prefix="/analytics", tags=["analytics"]) @router.get("/coverage") def analytics_coverage( db: Session = Depends(get_db), user: User = Depends(get_current_user), ): """Coverage per technique — flat format for BI dashboards.""" techniques = db.query(Technique).all() return [ { "mitre_id": t.mitre_id, "name": t.name, "tactic": t.tactic, "status": t.status_global.value if t.status_global else "not_evaluated", "is_subtechnique": t.is_subtechnique, "test_count": len(t.tests) if t.tests else 0, "review_required": t.review_required, "last_review_date": ( t.last_review_date.isoformat() if t.last_review_date else None ), } for t in techniques ] @router.get("/tests") def analytics_tests( date_from: str = Query(None, description="ISO date filter (>=)"), date_to: str = Query(None, description="ISO date filter (<=)"), db: Session = Depends(get_db), user: User = Depends(get_current_user), ): """All tests with timestamps — flat format for BI dashboards.""" query = db.query(Test) if date_from: query = query.filter(Test.created_at >= date_from) if date_to: query = query.filter(Test.created_at <= date_to) tests = query.all() return [ { "id": str(t.id), "technique_id": str(t.technique_id), "name": t.name, "state": t.state.value if t.state else None, "result": t.result.value if t.result else None, "detection_result": ( t.detection_result.value if t.detection_result else None ), "created_at": t.created_at.isoformat() if t.created_at else None, "execution_date": ( t.execution_date.isoformat() if t.execution_date else None ), "platform": t.platform, "tool_used": t.tool_used, "attack_success": t.attack_success, "remediation_status": t.remediation_status, } for t in tests ] @router.get("/trends") def analytics_trends( db: Session = Depends(get_db), user: User = Depends(get_current_user), ): """Historical coverage snapshots for trend visualization.""" snapshots = ( db.query(CoverageSnapshot) .order_by(CoverageSnapshot.created_at) .all() ) return [ { "date": s.created_at.isoformat() if s.created_at else None, "name": s.name, "total_techniques": s.total_techniques, "validated_count": s.validated_count, "partial_count": s.partial_count, "not_covered_count": s.not_covered_count, "organization_score": s.organization_score, } for s in snapshots ] @router.get("/operators") def analytics_operators( db: Session = Depends(get_db), user: User = Depends(require_any_role("red_lead", "blue_lead")), ): """Per-operator metrics — for workload management dashboards.""" results = ( db.query( User.username, User.role, func.count(Test.id).label("test_count"), ) .outerjoin(Test, Test.created_by == User.id) .group_by(User.id, User.username, User.role) .all() ) return [ {"username": r[0], "role": r[1], "test_count": r[2]} for r in results ]