feat(phase-16): enhanced Tests view, Red/Blue dashboard metrics, and Template admin panel (T-122, T-123, T-124)

This commit is contained in:
2026-02-09 13:00:07 +01:00
parent fd7f855008
commit a95defcee4
12 changed files with 1769 additions and 159 deletions

View File

@@ -1,21 +1,30 @@
"""Coverage-metrics endpoints.
Provides aggregated views of MITRE ATT&CK technique coverage for
dashboards and reporting.
dashboards and reporting. V2 adds pipeline, team-activity, and
validation-rate endpoints for the Red/Blue workflow.
"""
from collections import defaultdict
from fastapi import APIRouter, Depends
from sqlalchemy import func
from sqlalchemy.orm import Session
from sqlalchemy.orm import Session, joinedload
from app.database import get_db
from app.dependencies.auth import get_current_user
from app.models.enums import TechniqueStatus
from app.models.enums import TechniqueStatus, TestState
from app.models.technique import Technique
from app.models.test import Test
from app.models.user import User
from app.schemas.metrics import CoverageSummary, TacticCoverage
from app.schemas.metrics import (
CoverageSummary,
RecentTestItem,
TacticCoverage,
TeamActivity,
TestPipelineCounts,
ValidationRate,
)
router = APIRouter(prefix="/metrics", tags=["metrics"])
@@ -117,3 +126,190 @@ def coverage_by_tactic(
)
return result
# ---------------------------------------------------------------------------
# GET /metrics/test-pipeline — counters per pipeline state
# ---------------------------------------------------------------------------
@router.get("/test-pipeline", response_model=TestPipelineCounts)
def test_pipeline(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return how many tests are in each pipeline state."""
rows = (
db.query(Test.state, func.count(Test.id).label("cnt"))
.group_by(Test.state)
.all()
)
state_counts: dict[str, int] = {s.value: 0 for s in TestState}
for state, cnt in rows:
state_counts[state.value] = cnt
total = sum(state_counts.values())
return TestPipelineCounts(
draft=state_counts["draft"],
red_executing=state_counts["red_executing"],
blue_evaluating=state_counts["blue_evaluating"],
in_review=state_counts["in_review"],
validated=state_counts["validated"],
rejected=state_counts["rejected"],
total=total,
)
# ---------------------------------------------------------------------------
# GET /metrics/team-activity — activity per team
# ---------------------------------------------------------------------------
@router.get("/team-activity", response_model=list[TeamActivity])
def team_activity(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return activity summary for Red and Blue teams."""
# Red Team: completed = tests past red_executing; pending = draft + red_executing
red_completed = (
db.query(func.count(Test.id))
.filter(Test.state.in_([
TestState.blue_evaluating,
TestState.in_review,
TestState.validated,
TestState.rejected,
]))
.scalar()
) or 0
red_pending = (
db.query(func.count(Test.id))
.filter(Test.state.in_([TestState.draft, TestState.red_executing]))
.scalar()
) or 0
# Blue Team: completed = tests past blue_evaluating; pending = blue_evaluating
blue_completed = (
db.query(func.count(Test.id))
.filter(Test.state.in_([
TestState.in_review,
TestState.validated,
TestState.rejected,
]))
.scalar()
) or 0
blue_pending = (
db.query(func.count(Test.id))
.filter(Test.state == TestState.blue_evaluating)
.scalar()
) or 0
return [
TeamActivity(
team="Red Team",
tests_completed=red_completed,
tests_pending=red_pending,
),
TeamActivity(
team="Blue Team",
tests_completed=blue_completed,
tests_pending=blue_pending,
),
]
# ---------------------------------------------------------------------------
# GET /metrics/validation-rate — approval / rejection rates
# ---------------------------------------------------------------------------
@router.get("/validation-rate", response_model=list[ValidationRate])
def validation_rate(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return approval and rejection rates for Red Lead and Blue Lead."""
# Red Lead validations
red_approved = (
db.query(func.count(Test.id))
.filter(Test.red_validation_status == "approved")
.scalar()
) or 0
red_rejected = (
db.query(func.count(Test.id))
.filter(Test.red_validation_status == "rejected")
.scalar()
) or 0
red_total = red_approved + red_rejected
red_rate = round(red_approved / red_total * 100, 1) if red_total > 0 else 0.0
# Blue Lead validations
blue_approved = (
db.query(func.count(Test.id))
.filter(Test.blue_validation_status == "approved")
.scalar()
) or 0
blue_rejected = (
db.query(func.count(Test.id))
.filter(Test.blue_validation_status == "rejected")
.scalar()
) or 0
blue_total = blue_approved + blue_rejected
blue_rate = round(blue_approved / blue_total * 100, 1) if blue_total > 0 else 0.0
return [
ValidationRate(
role="red_lead",
total_reviewed=red_total,
approved=red_approved,
rejected=red_rejected,
approval_rate=red_rate,
),
ValidationRate(
role="blue_lead",
total_reviewed=blue_total,
approved=blue_approved,
rejected=blue_rejected,
approval_rate=blue_rate,
),
]
# ---------------------------------------------------------------------------
# GET /metrics/recent-tests — latest 10 updated tests
# ---------------------------------------------------------------------------
@router.get("/recent-tests", response_model=list[RecentTestItem])
def recent_tests(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return the 10 most recently created tests."""
tests = (
db.query(Test)
.options(joinedload(Test.technique))
.order_by(Test.created_at.desc())
.limit(10)
.all()
)
return [
RecentTestItem(
id=str(t.id),
name=t.name,
state=t.state.value,
technique_mitre_id=t.technique.mitre_id if t.technique else None,
technique_name=t.technique.name if t.technique else None,
created_at=t.created_at,
)
for t in tests
]

View File

@@ -3,9 +3,11 @@
Endpoints
---------
GET /test-templates — list with filters + pagination
GET /test-templates/stats — catalog statistics (admin)
GET /test-templates/{id} — detail
POST /test-templates — create custom (admin)
PATCH /test-templates/{id} — update (admin)
PATCH /test-templates/{id}/toggle-active — toggle active/inactive (admin)
DELETE /test-templates/{id} — soft delete (admin)
GET /test-templates/by-technique/{mitre_id} — templates for a MITRE technique
@@ -16,6 +18,7 @@ Filters (GET /test-templates)
- severity: low | medium | high | critical
- mitre_technique_id: filter by specific technique
- search: full-text search across name and description
- is_active: true | false (default only active)
- offset / limit: pagination (default limit=50)
"""
@@ -23,7 +26,7 @@ import uuid
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import or_
from sqlalchemy import func, or_
from sqlalchemy.orm import Session
from app.database import get_db
@@ -57,7 +60,7 @@ def list_templates(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return a paginated, filterable list of active test templates."""
"""Return a paginated, filterable list of test templates."""
query = db.query(TestTemplate).filter(TestTemplate.is_active == True) # noqa: E712
if source:
@@ -87,6 +90,53 @@ def list_templates(
return templates
# ---------------------------------------------------------------------------
# GET /test-templates/stats — catalog statistics (admin)
# ---------------------------------------------------------------------------
@router.get("/stats")
def template_stats(
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Return catalog statistics: totals by source, platform, active/inactive."""
total = db.query(func.count(TestTemplate.id)).scalar() or 0
active = (
db.query(func.count(TestTemplate.id))
.filter(TestTemplate.is_active == True) # noqa: E712
.scalar()
) or 0
inactive = total - active
# By source
source_rows = (
db.query(TestTemplate.source, func.count(TestTemplate.id))
.filter(TestTemplate.is_active == True) # noqa: E712
.group_by(TestTemplate.source)
.all()
)
by_source = {source: cnt for source, cnt in source_rows}
# By platform
platform_rows = (
db.query(TestTemplate.platform, func.count(TestTemplate.id))
.filter(TestTemplate.is_active == True) # noqa: E712
.group_by(TestTemplate.platform)
.all()
)
by_platform = {(platform or "unspecified"): cnt for platform, cnt in platform_rows}
return {
"total": total,
"active": active,
"inactive": inactive,
"by_source": by_source,
"by_platform": by_platform,
}
# ---------------------------------------------------------------------------
# GET /test-templates/by-technique/{mitre_id}
# ---------------------------------------------------------------------------
@@ -208,6 +258,41 @@ def update_template(
return template
# ---------------------------------------------------------------------------
# PATCH /test-templates/{id}/toggle-active — toggle active/inactive (admin)
# ---------------------------------------------------------------------------
@router.patch("/{template_id}/toggle-active", response_model=TestTemplateOut)
def toggle_template_active(
template_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Toggle a template between active and inactive. Admin only."""
template = db.query(TestTemplate).filter(TestTemplate.id == template_id).first()
if template is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Test template not found",
)
template.is_active = not template.is_active
db.commit()
db.refresh(template)
log_action(
db,
user_id=current_user.id,
action="toggle_test_template",
entity_type="test_template",
entity_id=template.id,
details={"name": template.name, "is_active": template.is_active},
)
return template
# ---------------------------------------------------------------------------
# DELETE /test-templates/{id} — soft delete (admin only)
# ---------------------------------------------------------------------------

View File

@@ -88,18 +88,37 @@ def _get_test_with_technique(db: Session, test_id: uuid.UUID) -> Test:
def list_tests(
state: Optional[str] = Query(None, description="Filter by test state"),
technique_id: Optional[uuid.UUID] = Query(None, description="Filter by technique"),
platform: Optional[str] = Query(None, description="Filter by platform"),
created_by: Optional[uuid.UUID] = Query(None, description="Filter by creator"),
pending_validation_side: Optional[str] = Query(
None, description="Filter in_review tests pending validation on 'red' or 'blue' side"
),
offset: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return a paginated list of tests, optionally filtered by state or technique."""
query = db.query(Test)
"""Return a paginated list of tests, optionally filtered by state, technique, platform or creator."""
query = db.query(Test).options(joinedload(Test.technique))
if state:
query = query.filter(Test.state == state)
if technique_id:
query = query.filter(Test.technique_id == technique_id)
if platform:
query = query.filter(Test.platform.ilike(f"%{platform}%"))
if created_by:
query = query.filter(Test.created_by == created_by)
if pending_validation_side == "red":
query = query.filter(
Test.state == TestState.in_review,
Test.red_validation_status.in_(["pending", None]),
)
elif pending_validation_side == "blue":
query = query.filter(
Test.state == TestState.in_review,
Test.blue_validation_status.in_(["pending", None]),
)
tests = query.order_by(Test.created_at.desc()).offset(offset).limit(limit).all()
return tests

View File

@@ -1,6 +1,8 @@
"""Pydantic schemas for coverage-metrics endpoints."""
from pydantic import BaseModel
from datetime import datetime
from pydantic import BaseModel, ConfigDict
class CoverageSummary(BaseModel):
@@ -25,3 +27,59 @@ class TacticCoverage(BaseModel):
not_covered: int
not_evaluated: int
in_progress: int
# ── V2 — Test Pipeline ────────────────────────────────────────────────
class TestPipelineCounts(BaseModel):
"""Counters per state in the test pipeline."""
draft: int = 0
red_executing: int = 0
blue_evaluating: int = 0
in_review: int = 0
validated: int = 0
rejected: int = 0
total: int = 0
# ── V2 — Team Activity ───────────────────────────────────────────────
class TeamActivity(BaseModel):
"""Activity summary for a team (Red or Blue)."""
team: str
tests_completed: int = 0
tests_pending: int = 0
avg_completion_hours: float | None = None
# ── V2 — Validation Rate ─────────────────────────────────────────────
class ValidationRate(BaseModel):
"""Approval / rejection rate for a manager role."""
role: str # "red_lead" or "blue_lead"
total_reviewed: int = 0
approved: int = 0
rejected: int = 0
approval_rate: float = 0.0 # percentage
# ── V2 — Recent Test ─────────────────────────────────────────────────
class RecentTestItem(BaseModel):
"""Lightweight test entry for the recent-tests widget."""
id: str
name: str
state: str
technique_mitre_id: str | None = None
technique_name: str | None = None
created_at: datetime | None = None
model_config = ConfigDict(from_attributes=True)

View File

@@ -126,4 +126,16 @@ class TestOut(BaseModel):
blue_validation_status: str | None = None
blue_validation_notes: str | None = None
# Technique info (populated when joined)
technique_mitre_id: str | None = None
technique_name: str | None = None
model_config = ConfigDict(from_attributes=True)
@classmethod
def model_validate(cls, obj, **kwargs):
"""Override to populate technique fields from the relationship."""
if hasattr(obj, "technique") and obj.technique is not None:
obj.__dict__["technique_mitre_id"] = obj.technique.mitre_id
obj.__dict__["technique_name"] = obj.technique.name
return super().model_validate(obj, **kwargs)