"""Analytics service — flat JSON optimized for PowerBI / BI tools."""

from __future__ import annotations

from sqlalchemy import func
from sqlalchemy.orm import Session

from app.models.coverage_snapshot import CoverageSnapshot
from app.models.technique import Technique
from app.models.test import Test
from app.models.user import User


def _iso_or_none(moment) -> str | None:
    """Return ``moment.isoformat()``, or ``None`` when *moment* is falsy."""
    return moment.isoformat() if moment else None


def _value_or(member, fallback=None):
    """Return ``member.value``, or *fallback* when *member* is falsy."""
    return member.value if member else fallback


def _size_or_zero(items) -> int:
    """Return ``len(items)``, or ``0`` when *items* is falsy (``None``/empty)."""
    return len(items) if items else 0


def _coverage_row(technique: Technique) -> dict:
    """Flatten one ``Technique`` into a single-level dict."""
    return {
        "mitre_id": technique.mitre_id,
        "name": technique.name,
        "tactic": technique.tactic,
        # A technique without a global status is reported as "not_evaluated".
        "status": _value_or(technique.status_global, "not_evaluated"),
        "is_subtechnique": technique.is_subtechnique,
        # NOTE(review): reading ``technique.tests`` here may trigger one extra
        # query per technique if the relationship is lazily loaded — confirm
        # the loader strategy on the Technique model.
        "test_count": _size_or_zero(technique.tests),
        "review_required": technique.review_required,
        "last_review_date": _iso_or_none(technique.last_review_date),
    }


def _test_row(test: Test) -> dict:
    """Flatten one ``Test`` into a single-level dict."""
    return {
        # Identifiers are stringified so the payload stays plain JSON.
        "id": str(test.id),
        "technique_id": str(test.technique_id),
        "name": test.name,
        "state": _value_or(test.state),
        "result": _value_or(test.result),
        "detection_result": _value_or(test.detection_result),
        "created_at": _iso_or_none(test.created_at),
        "execution_date": _iso_or_none(test.execution_date),
        "platform": test.platform,
        "tool_used": test.tool_used,
        "attack_success": test.attack_success,
        "remediation_status": test.remediation_status,
    }


def _snapshot_row(snapshot: CoverageSnapshot) -> dict:
    """Flatten one ``CoverageSnapshot`` into a single-level dict."""
    return {
        "date": _iso_or_none(snapshot.created_at),
        "name": snapshot.name,
        "total_techniques": snapshot.total_techniques,
        "validated_count": snapshot.validated_count,
        "partial_count": snapshot.partial_count,
        "not_covered_count": snapshot.not_covered_count,
        "organization_score": snapshot.organization_score,
    }


def get_coverage_analytics(db: Session) -> list[dict]:
    """Return one flat coverage record per technique.

    Args:
        db: Session used to load every ``Technique`` row.

    Returns:
        A list of dicts with the keys ``mitre_id``, ``name``, ``tactic``,
        ``status``, ``is_subtechnique``, ``test_count``, ``review_required``
        and ``last_review_date`` (ISO-8601 string or ``None``).
    """
    every_technique = db.query(Technique).all()
    return [_coverage_row(technique) for technique in every_technique]


def get_tests_analytics(
    db: Session,
    *,
    date_from: str | None = None,
    date_to: str | None = None,
) -> list[dict]:
    """Return one flat record per test, optionally limited by creation date.

    Args:
        db: Session used to query ``Test`` rows.
        date_from: Inclusive lower bound on ``Test.created_at``; ignored when
            falsy. The value is handed to the query as given.
        date_to: Inclusive upper bound on ``Test.created_at``; ignored when
            falsy. The value is handed to the query as given.

    Returns:
        A list of dicts, one per matching test, with enum-like fields reduced
        to their ``.value`` and timestamps rendered as ISO-8601 strings.
    """
    # NOTE(review): the bounds are compared as raw strings against
    # ``created_at``; how they are coerced depends on the column type and
    # database driver — confirm against the caller and the Test model.
    bounds = []
    if date_from:
        bounds.append(Test.created_at >= date_from)
    if date_to:
        bounds.append(Test.created_at <= date_to)

    selection = db.query(Test)
    if bounds:
        selection = selection.filter(*bounds)

    return [_test_row(test) for test in selection.all()]


def get_trends_analytics(db: Session) -> list[dict]:
    """Return stored coverage snapshots, ordered by ``created_at`` ascending.

    Args:
        db: Session used to load every ``CoverageSnapshot`` row.

    Returns:
        A list of dicts with the keys ``date``, ``name``,
        ``total_techniques``, ``validated_count``, ``partial_count``,
        ``not_covered_count`` and ``organization_score``.
    """
    timeline = db.query(CoverageSnapshot).order_by(CoverageSnapshot.created_at)
    return [_snapshot_row(snapshot) for snapshot in timeline.all()]


def get_operators_analytics(db: Session) -> list[dict]:
    """Return the number of tests created by each user.

    Users are outer-joined to tests on ``Test.created_by == User.id`` and the
    count is taken over ``Test.id``, so users without any test are still
    listed with a ``test_count`` of 0.

    Args:
        db: Session used to run the aggregate query.

    Returns:
        A list of dicts with the keys ``username``, ``role`` and
        ``test_count``.
    """
    per_user = (
        db.query(
            User.username,
            User.role,
            func.count(Test.id).label("test_count"),
        )
        .outerjoin(Test, Test.created_by == User.id)
        .group_by(User.id, User.username, User.role)
    )
    # NOTE(review): ``role`` is emitted exactly as the query returns it; other
    # enum-like fields in this module are unwrapped with ``.value`` — confirm
    # whether the same is wanted here.
    return [
        {"username": username, "role": role, "test_count": test_count}
        for username, role, test_count in per_user.all()
    ]