fix(analytics): restrict operators endpoint to admin [FASE-2.5]
Align with BI security spec and add flat JSON API tests for coverage, tests, and operators.
This commit is contained in:
@@ -8,7 +8,7 @@ from fastapi import APIRouter, Depends, Query
|
|||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
from app.database import get_db
|
from app.database import get_db
|
||||||
from app.dependencies.auth import get_current_user, require_any_role
|
from app.dependencies.auth import get_current_user, require_role
|
||||||
from app.models.user import User
|
from app.models.user import User
|
||||||
from app.services import analytics_service
|
from app.services import analytics_service
|
||||||
|
|
||||||
@@ -49,7 +49,7 @@ def analytics_trends(
|
|||||||
@router.get("/operators")
|
@router.get("/operators")
|
||||||
def analytics_operators(
|
def analytics_operators(
|
||||||
db: Session = Depends(get_db),
|
db: Session = Depends(get_db),
|
||||||
user: User = Depends(require_any_role("red_lead", "blue_lead")),
|
user: User = Depends(require_role("admin")),
|
||||||
):
|
):
|
||||||
"""Per-operator metrics — for workload management dashboards."""
|
"""Per-operator metrics — for workload management dashboards."""
|
||||||
return analytics_service.get_operators_analytics(db)
|
return analytics_service.get_operators_analytics(db)
|
||||||
|
|||||||
87
backend/tests/test_analytics_advanced_metrics.py
Normal file
87
backend/tests/test_analytics_advanced_metrics.py
Normal file
@@ -0,0 +1,87 @@
|
|||||||
|
"""Analytics and advanced metrics API tests (FASE-2.5, 2.6)."""
|
||||||
|
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from app.models.technique import Technique
|
||||||
|
from app.models.test import Test
|
||||||
|
from app.models.enums import TestState
|
||||||
|
|
||||||
|
|
||||||
|
def test_analytics_coverage_flat_json(client, auth_headers, db):
|
||||||
|
t = Technique(mitre_id="T1001", name="Test Tech", tactic="discovery")
|
||||||
|
db.add(t)
|
||||||
|
db.commit()
|
||||||
|
|
||||||
|
r = client.get("/api/v1/analytics/coverage", headers=auth_headers)
|
||||||
|
assert r.status_code == 200
|
||||||
|
data = r.json()
|
||||||
|
assert isinstance(data, list)
|
||||||
|
assert any(row["mitre_id"] == "T1001" for row in data)
|
||||||
|
assert "tactic" in data[0]
|
||||||
|
assert "status" in data[0]
|
||||||
|
|
||||||
|
|
||||||
|
def test_analytics_tests_date_filter(client, auth_headers, db, admin_user):
|
||||||
|
technique = Technique(mitre_id="T1002", name="Filter Tech", tactic="impact")
|
||||||
|
db.add(technique)
|
||||||
|
db.flush()
|
||||||
|
db.add(
|
||||||
|
Test(
|
||||||
|
name="dated test",
|
||||||
|
technique_id=technique.id,
|
||||||
|
state=TestState.draft,
|
||||||
|
created_by=admin_user.id,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
db.commit()
|
||||||
|
|
||||||
|
r = client.get(
|
||||||
|
"/api/v1/analytics/tests",
|
||||||
|
params={"date_from": "2020-01-01"},
|
||||||
|
headers=auth_headers,
|
||||||
|
)
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert isinstance(r.json(), list)
|
||||||
|
|
||||||
|
|
||||||
|
def test_analytics_operators_requires_admin(client, auth_headers, red_tech_headers):
|
||||||
|
client.cookies.clear()
|
||||||
|
denied = client.get("/api/v1/analytics/operators", headers=red_tech_headers)
|
||||||
|
assert denied.status_code == 403
|
||||||
|
|
||||||
|
client.cookies.clear()
|
||||||
|
ok = client.get("/api/v1/analytics/operators", headers=auth_headers)
|
||||||
|
assert ok.status_code == 200
|
||||||
|
assert isinstance(ok.json(), list)
|
||||||
|
|
||||||
|
|
||||||
|
def test_coverage_by_tactic(client, auth_headers, db):
|
||||||
|
db.add(Technique(mitre_id="T2001", name="A", tactic="initial-access"))
|
||||||
|
db.add(Technique(mitre_id="T2002", name="B", tactic="initial-access"))
|
||||||
|
db.commit()
|
||||||
|
|
||||||
|
r = client.get("/api/v1/metrics/advanced/coverage-by-tactic", headers=auth_headers)
|
||||||
|
assert r.status_code == 200
|
||||||
|
rows = r.json()
|
||||||
|
assert isinstance(rows, list)
|
||||||
|
ia = next((x for x in rows if x["tactic"] == "initial-access"), None)
|
||||||
|
assert ia is not None
|
||||||
|
assert ia["total"] >= 2
|
||||||
|
assert "coverage_pct" in ia
|
||||||
|
|
||||||
|
|
||||||
|
def test_never_tested_lists_untested_technique(client, auth_headers, db):
|
||||||
|
db.add(Technique(mitre_id="T9999", name="Never Run", tactic="collection"))
|
||||||
|
db.commit()
|
||||||
|
|
||||||
|
r = client.get("/api/v1/metrics/advanced/never-tested", headers=auth_headers)
|
||||||
|
assert r.status_code == 200
|
||||||
|
keys = {row["mitre_id"] for row in r.json()}
|
||||||
|
assert "T9999" in keys
|
||||||
|
|
||||||
|
|
||||||
|
def test_avg_validation_time_empty(client, auth_headers):
|
||||||
|
r = client.get("/api/v1/metrics/advanced/avg-validation-time", headers=auth_headers)
|
||||||
|
assert r.status_code == 200
|
||||||
|
body = r.json()
|
||||||
|
assert body["total_validated"] == 0
|
||||||
Reference in New Issue
Block a user