"""Coverage-metrics endpoints. Provides aggregated views of MITRE ATT&CK technique coverage for dashboards and reporting. V2 adds pipeline, team-activity, and validation-rate endpoints for the Red/Blue workflow. """ from collections import defaultdict from fastapi import APIRouter, Depends from sqlalchemy import func from sqlalchemy.orm import Session, joinedload from app.database import get_db from app.dependencies.auth import get_current_user from app.models.enums import TechniqueStatus, TestState from app.models.technique import Technique from app.models.test import Test from app.models.user import User from app.schemas.metrics import ( CoverageSummary, RecentTestItem, TacticCoverage, TeamActivity, TestPipelineCounts, ValidationRate, ) router = APIRouter(prefix="/metrics", tags=["metrics"]) # --------------------------------------------------------------------------- # GET /metrics/summary # --------------------------------------------------------------------------- @router.get("/summary", response_model=CoverageSummary) def coverage_summary( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Return a global coverage summary across all techniques.""" rows = ( db.query( Technique.status_global, func.count(Technique.id).label("cnt"), ) .group_by(Technique.status_global) .all() ) counts: dict[str, int] = {s.value: 0 for s in TechniqueStatus} for status, cnt in rows: counts[status.value] = cnt total = sum(counts.values()) validated = counts["validated"] partial = counts["partial"] coverage_pct = ( round((validated + partial) / total * 100, 2) if total > 0 else 0.0 ) return CoverageSummary( total_techniques=total, validated=validated, partial=partial, not_covered=counts["not_covered"], in_progress=counts["in_progress"], not_evaluated=counts["not_evaluated"], coverage_percentage=coverage_pct, ) # --------------------------------------------------------------------------- # GET /metrics/by-tactic # --------------------------------------------------------------------------- @router.get("/by-tactic", response_model=list[TacticCoverage]) def coverage_by_tactic( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Return coverage breakdown grouped by tactic. Since a technique can belong to multiple tactics (stored as a comma-separated string), the technique is counted once per tactic it belongs to. """ techniques = db.query( Technique.tactic, Technique.status_global ).all() # Accumulate per-tactic counters. A technique with tactic # "persistence, privilege-escalation" is counted in both. tactic_data: dict[str, dict[str, int]] = defaultdict( lambda: {s.value: 0 for s in TechniqueStatus} ) for tactic_str, status in techniques: if not tactic_str: tactics = ["unknown"] else: tactics = [t.strip() for t in tactic_str.split(",")] for tactic in tactics: tactic_data[tactic][status.value] += 1 result = [] for tactic in sorted(tactic_data): counts = tactic_data[tactic] total = sum(counts.values()) result.append( TacticCoverage( tactic=tactic, total=total, validated=counts["validated"], partial=counts["partial"], not_covered=counts["not_covered"], not_evaluated=counts["not_evaluated"], in_progress=counts["in_progress"], ) ) return result # --------------------------------------------------------------------------- # GET /metrics/test-pipeline — counters per pipeline state # --------------------------------------------------------------------------- @router.get("/test-pipeline", response_model=TestPipelineCounts) def test_pipeline( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Return how many tests are in each pipeline state.""" rows = ( db.query(Test.state, func.count(Test.id).label("cnt")) .group_by(Test.state) .all() ) state_counts: dict[str, int] = {s.value: 0 for s in TestState} for state, cnt in rows: state_counts[state.value] = cnt total = sum(state_counts.values()) return TestPipelineCounts( draft=state_counts["draft"], red_executing=state_counts["red_executing"], blue_evaluating=state_counts["blue_evaluating"], in_review=state_counts["in_review"], validated=state_counts["validated"], rejected=state_counts["rejected"], total=total, ) # --------------------------------------------------------------------------- # GET /metrics/team-activity — activity per team # --------------------------------------------------------------------------- @router.get("/team-activity", response_model=list[TeamActivity]) def team_activity( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Return activity summary for Red and Blue teams.""" # Red Team: completed = tests past red_executing; pending = draft + red_executing red_completed = ( db.query(func.count(Test.id)) .filter(Test.state.in_([ TestState.blue_evaluating, TestState.in_review, TestState.validated, TestState.rejected, ])) .scalar() ) or 0 red_pending = ( db.query(func.count(Test.id)) .filter(Test.state.in_([TestState.draft, TestState.red_executing])) .scalar() ) or 0 # Blue Team: completed = tests past blue_evaluating; pending = blue_evaluating blue_completed = ( db.query(func.count(Test.id)) .filter(Test.state.in_([ TestState.in_review, TestState.validated, TestState.rejected, ])) .scalar() ) or 0 blue_pending = ( db.query(func.count(Test.id)) .filter(Test.state == TestState.blue_evaluating) .scalar() ) or 0 return [ TeamActivity( team="Red Team", tests_completed=red_completed, tests_pending=red_pending, ), TeamActivity( team="Blue Team", tests_completed=blue_completed, tests_pending=blue_pending, ), ] # --------------------------------------------------------------------------- # GET /metrics/validation-rate — approval / rejection rates # --------------------------------------------------------------------------- @router.get("/validation-rate", response_model=list[ValidationRate]) def validation_rate( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Return approval and rejection rates for Red Lead and Blue Lead.""" # Red Lead validations red_approved = ( db.query(func.count(Test.id)) .filter(Test.red_validation_status == "approved") .scalar() ) or 0 red_rejected = ( db.query(func.count(Test.id)) .filter(Test.red_validation_status == "rejected") .scalar() ) or 0 red_total = red_approved + red_rejected red_rate = round(red_approved / red_total * 100, 1) if red_total > 0 else 0.0 # Blue Lead validations blue_approved = ( db.query(func.count(Test.id)) .filter(Test.blue_validation_status == "approved") .scalar() ) or 0 blue_rejected = ( db.query(func.count(Test.id)) .filter(Test.blue_validation_status == "rejected") .scalar() ) or 0 blue_total = blue_approved + blue_rejected blue_rate = round(blue_approved / blue_total * 100, 1) if blue_total > 0 else 0.0 return [ ValidationRate( role="red_lead", total_reviewed=red_total, approved=red_approved, rejected=red_rejected, approval_rate=red_rate, ), ValidationRate( role="blue_lead", total_reviewed=blue_total, approved=blue_approved, rejected=blue_rejected, approval_rate=blue_rate, ), ] # --------------------------------------------------------------------------- # GET /metrics/recent-tests — latest 10 updated tests # --------------------------------------------------------------------------- @router.get("/recent-tests", response_model=list[RecentTestItem]) def recent_tests( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Return the 10 most recently created tests.""" tests = ( db.query(Test) .options(joinedload(Test.technique)) .order_by(Test.created_at.desc()) .limit(10) .all() ) return [ RecentTestItem( id=str(t.id), name=t.name, state=t.state.value, technique_mitre_id=t.technique.mitre_id if t.technique else None, technique_name=t.technique.name if t.technique else None, created_at=t.created_at, ) for t in tests ]