Files
Aegis/backend/app/services/analytics_service.py

108 lines
3.3 KiB
Python

"""Analytics service — flat JSON optimized for PowerBI / BI tools."""
from __future__ import annotations
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.models.coverage_snapshot import CoverageSnapshot
from app.models.technique import Technique
from app.models.test import Test
from app.models.user import User
def get_coverage_analytics(db: Session) -> list[dict]:
"""Coverage per technique — flat format for BI dashboards."""
techniques = db.query(Technique).all()
return [
{
"mitre_id": t.mitre_id,
"name": t.name,
"tactic": t.tactic,
"status": t.status_global.value if t.status_global else "not_evaluated",
"is_subtechnique": t.is_subtechnique,
"test_count": len(t.tests) if t.tests else 0,
"review_required": t.review_required,
"last_review_date": (
t.last_review_date.isoformat() if t.last_review_date else None
),
}
for t in techniques
]
def get_tests_analytics(
db: Session,
*,
date_from: str | None = None,
date_to: str | None = None,
) -> list[dict]:
"""All tests with timestamps — flat format for BI dashboards."""
query = db.query(Test)
if date_from:
query = query.filter(Test.created_at >= date_from)
if date_to:
query = query.filter(Test.created_at <= date_to)
tests = query.all()
return [
{
"id": str(t.id),
"technique_id": str(t.technique_id),
"name": t.name,
"state": t.state.value if t.state else None,
"result": t.result.value if t.result else None,
"detection_result": (
t.detection_result.value if t.detection_result else None
),
"created_at": t.created_at.isoformat() if t.created_at else None,
"execution_date": (
t.execution_date.isoformat() if t.execution_date else None
),
"platform": t.platform,
"tool_used": t.tool_used,
"attack_success": t.attack_success,
"remediation_status": t.remediation_status,
}
for t in tests
]
def get_trends_analytics(db: Session) -> list[dict]:
"""Historical coverage snapshots for trend visualization."""
snapshots = (
db.query(CoverageSnapshot)
.order_by(CoverageSnapshot.created_at)
.all()
)
return [
{
"date": s.created_at.isoformat() if s.created_at else None,
"name": s.name,
"total_techniques": s.total_techniques,
"validated_count": s.validated_count,
"partial_count": s.partial_count,
"not_covered_count": s.not_covered_count,
"organization_score": s.organization_score,
}
for s in snapshots
]
def get_operators_analytics(db: Session) -> list[dict]:
"""Per-operator metrics — for workload management dashboards."""
results = (
db.query(
User.username,
User.role,
func.count(Test.id).label("test_count"),
)
.outerjoin(Test, Test.created_by == User.id)
.group_by(User.id, User.username, User.role)
.all()
)
return [
{"username": r[0], "role": r[1], "test_count": r[2]}
for r in results
]