feat: Phase 5 - Metrics and dashboard API (T-020)

- Add GET /metrics/summary endpoint with global coverage counts and percentage
- Add GET /metrics/by-tactic endpoint with per-tactic coverage breakdown
- Handle multi-tactic techniques (comma-separated) counting in each tactic
- Add CoverageSummary and TacticCoverage Pydantic schemas
- Update README with metrics endpoints and project structure
This commit is contained in:
2026-02-06 15:33:37 +01:00
parent b11854fdab
commit abdb23be33
4 changed files with 156 additions and 1 deletions

View File

@@ -0,0 +1,119 @@
"""Coverage-metrics endpoints.
Provides aggregated views of MITRE ATT&CK technique coverage for
dashboards and reporting.
"""
from collections import defaultdict
from fastapi import APIRouter, Depends
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user
from app.models.enums import TechniqueStatus
from app.models.technique import Technique
from app.models.user import User
from app.schemas.metrics import CoverageSummary, TacticCoverage
router = APIRouter(prefix="/metrics", tags=["metrics"])
# ---------------------------------------------------------------------------
# GET /metrics/summary
# ---------------------------------------------------------------------------
@router.get("/summary", response_model=CoverageSummary)
def coverage_summary(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return a global coverage summary across all techniques."""
rows = (
db.query(
Technique.status_global,
func.count(Technique.id).label("cnt"),
)
.group_by(Technique.status_global)
.all()
)
counts: dict[str, int] = {s.value: 0 for s in TechniqueStatus}
for status, cnt in rows:
counts[status.value] = cnt
total = sum(counts.values())
validated = counts["validated"]
partial = counts["partial"]
coverage_pct = (
round((validated + partial) / total * 100, 2) if total > 0 else 0.0
)
return CoverageSummary(
total_techniques=total,
validated=validated,
partial=partial,
not_covered=counts["not_covered"],
in_progress=counts["in_progress"],
not_evaluated=counts["not_evaluated"],
coverage_percentage=coverage_pct,
)
# ---------------------------------------------------------------------------
# GET /metrics/by-tactic
# ---------------------------------------------------------------------------
@router.get("/by-tactic", response_model=list[TacticCoverage])
def coverage_by_tactic(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return coverage breakdown grouped by tactic.
Since a technique can belong to multiple tactics (stored as a
comma-separated string), the technique is counted once per tactic
it belongs to.
"""
techniques = db.query(
Technique.tactic, Technique.status_global
).all()
# Accumulate per-tactic counters. A technique with tactic
# "persistence, privilege-escalation" is counted in both.
tactic_data: dict[str, dict[str, int]] = defaultdict(
lambda: {s.value: 0 for s in TechniqueStatus}
)
for tactic_str, status in techniques:
if not tactic_str:
tactics = ["unknown"]
else:
tactics = [t.strip() for t in tactic_str.split(",")]
for tactic in tactics:
tactic_data[tactic][status.value] += 1
result = []
for tactic in sorted(tactic_data):
counts = tactic_data[tactic]
total = sum(counts.values())
result.append(
TacticCoverage(
tactic=tactic,
total=total,
validated=counts["validated"],
partial=counts["partial"],
not_covered=counts["not_covered"],
not_evaluated=counts["not_evaluated"],
in_progress=counts["in_progress"],
)
)
return result