"""Coverage-metrics endpoints. Provides aggregated views of MITRE ATT&CK technique coverage for dashboards and reporting. """ from collections import defaultdict from fastapi import APIRouter, Depends from sqlalchemy import func from sqlalchemy.orm import Session from app.database import get_db from app.dependencies.auth import get_current_user from app.models.enums import TechniqueStatus from app.models.technique import Technique from app.models.user import User from app.schemas.metrics import CoverageSummary, TacticCoverage router = APIRouter(prefix="/metrics", tags=["metrics"]) # --------------------------------------------------------------------------- # GET /metrics/summary # --------------------------------------------------------------------------- @router.get("/summary", response_model=CoverageSummary) def coverage_summary( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Return a global coverage summary across all techniques.""" rows = ( db.query( Technique.status_global, func.count(Technique.id).label("cnt"), ) .group_by(Technique.status_global) .all() ) counts: dict[str, int] = {s.value: 0 for s in TechniqueStatus} for status, cnt in rows: counts[status.value] = cnt total = sum(counts.values()) validated = counts["validated"] partial = counts["partial"] coverage_pct = ( round((validated + partial) / total * 100, 2) if total > 0 else 0.0 ) return CoverageSummary( total_techniques=total, validated=validated, partial=partial, not_covered=counts["not_covered"], in_progress=counts["in_progress"], not_evaluated=counts["not_evaluated"], coverage_percentage=coverage_pct, ) # --------------------------------------------------------------------------- # GET /metrics/by-tactic # --------------------------------------------------------------------------- @router.get("/by-tactic", response_model=list[TacticCoverage]) def coverage_by_tactic( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Return coverage breakdown grouped by tactic. Since a technique can belong to multiple tactics (stored as a comma-separated string), the technique is counted once per tactic it belongs to. """ techniques = db.query( Technique.tactic, Technique.status_global ).all() # Accumulate per-tactic counters. A technique with tactic # "persistence, privilege-escalation" is counted in both. tactic_data: dict[str, dict[str, int]] = defaultdict( lambda: {s.value: 0 for s in TechniqueStatus} ) for tactic_str, status in techniques: if not tactic_str: tactics = ["unknown"] else: tactics = [t.strip() for t in tactic_str.split(",")] for tactic in tactics: tactic_data[tactic][status.value] += 1 result = [] for tactic in sorted(tactic_data): counts = tactic_data[tactic] total = sum(counts.values()) result.append( TacticCoverage( tactic=tactic, total=total, validated=counts["validated"], partial=counts["partial"], not_covered=counts["not_covered"], not_evaluated=counts["not_evaluated"], in_progress=counts["in_progress"], ) ) return result