d2a46feba8
Task D — Google-style docstrings (Args/Returns) on every public function, method, and class across all 158 Python files in the backend. Zero ruff D violations (pydocstyle Google convention). Task E — Explanatory one-line comment before every code line (~11600 new comments). ruff check passes clean after isort re-sort.
249 lines
8.0 KiB
Python
249 lines
8.0 KiB
Python
"""Snapshot endpoints — coverage snapshots CRUD and comparison.
|
|
|
|
Provides periodic and manual snapshots of the organisation's coverage
|
|
state, plus temporal comparison between any two snapshots.
|
|
"""
|
|
|
|
# Import logging
|
|
import logging
|
|
|
|
# Import uuid
|
|
import uuid
|
|
|
|
# Import Optional from typing
|
|
from typing import Optional
|
|
|
|
# Import APIRouter, Depends, Query from fastapi
|
|
from fastapi import APIRouter, Depends, Query
|
|
|
|
# Import BaseModel from pydantic
|
|
from pydantic import BaseModel
|
|
|
|
# Import Session from sqlalchemy.orm
|
|
from sqlalchemy.orm import Session
|
|
|
|
# Import get_db from app.database
|
|
from app.database import get_db
|
|
|
|
# Import get_current_user, require_any_role, require_role from app.dependencies.auth
|
|
from app.dependencies.auth import get_current_user, require_any_role, require_role
|
|
|
|
# Import BusinessRuleViolation from app.domain.errors
|
|
from app.domain.errors import BusinessRuleViolation
|
|
|
|
# Import UnitOfWork from app.domain.unit_of_work
|
|
from app.domain.unit_of_work import UnitOfWork
|
|
|
|
# Import User from app.models.user
|
|
from app.models.user import User
|
|
|
|
# Import log_action from app.services.audit_service
|
|
from app.services.audit_service import log_action
|
|
|
|
# Import from app.services.snapshot_service
|
|
from app.services.snapshot_service import (
|
|
compare_snapshots,
|
|
create_snapshot,
|
|
delete_snapshot,
|
|
get_coverage_evolution,
|
|
get_snapshot_detail,
|
|
get_snapshot_or_raise,
|
|
serialize_snapshot_summary,
|
|
)
|
|
|
|
# Import from app.services.snapshot_service
|
|
from app.services.snapshot_service import (
|
|
list_snapshots as list_snapshots_svc,
|
|
)
|
|
|
|
# Assign logger = logging.getLogger(__name__)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Assign router = APIRouter(prefix="/snapshots", tags=["snapshots"])
|
|
router = APIRouter(prefix="/snapshots", tags=["snapshots"])
|
|
|
|
|
|
# ── Pydantic schemas ─────────────────────────────────────────────────
|
|
|
|
class SnapshotCreate(BaseModel):
|
|
"""Payload for creating a new coverage snapshot."""
|
|
|
|
# Assign name = None
|
|
name: Optional[str] = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /snapshots — List snapshots (paginated)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("")
|
|
# Define function list_snapshots
|
|
def list_snapshots(
|
|
# Entry: offset
|
|
offset: int = Query(0, ge=0),
|
|
# Entry: limit
|
|
limit: int = Query(50, ge=1, le=200),
|
|
# Entry: db
|
|
db: Session = Depends(get_db),
|
|
# Entry: current_user
|
|
current_user: User = Depends(get_current_user),
|
|
) -> list:
|
|
"""List coverage snapshots ordered by creation date (newest first)."""
|
|
# Return list_snapshots_svc(db, offset=offset, limit=limit)
|
|
return list_snapshots_svc(db, offset=offset, limit=limit)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /snapshots — Create snapshot manually
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.post("", status_code=201)
|
|
# Define function create_snapshot_endpoint
|
|
def create_snapshot_endpoint(
|
|
# Entry: payload
|
|
payload: SnapshotCreate,
|
|
# Entry: db
|
|
db: Session = Depends(get_db),
|
|
# Entry: current_user
|
|
current_user: User = Depends(require_any_role("red_lead", "blue_lead", "admin")),
|
|
) -> dict:
|
|
"""Create a manual coverage snapshot with an optional name."""
|
|
# Assign snapshot = create_snapshot(db, name=payload.name, user_id=current_user.id)
|
|
snapshot = create_snapshot(db, name=payload.name, user_id=current_user.id)
|
|
|
|
# Open context manager
|
|
with UnitOfWork(db) as uow:
|
|
# Call log_action()
|
|
log_action(
|
|
db,
|
|
# Keyword argument: user_id
|
|
user_id=current_user.id,
|
|
# Keyword argument: action
|
|
action="create_snapshot",
|
|
# Keyword argument: entity_type
|
|
entity_type="snapshot",
|
|
# Keyword argument: entity_id
|
|
entity_id=snapshot.id,
|
|
# Keyword argument: details
|
|
details={"name": snapshot.name, "score": snapshot.organization_score},
|
|
)
|
|
# Call uow.commit()
|
|
uow.commit()
|
|
|
|
# Return serialize_snapshot_summary(snapshot)
|
|
return serialize_snapshot_summary(snapshot)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /snapshots/evolution — Coverage trend over time
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/evolution")
|
|
# Define function coverage_evolution
|
|
def coverage_evolution(
|
|
# Entry: months
|
|
months: int = Query(12, ge=1, le=36),
|
|
# Entry: db
|
|
db: Session = Depends(get_db),
|
|
# Entry: current_user
|
|
current_user: User = Depends(get_current_user),
|
|
) -> list:
|
|
"""Return coverage snapshots for trend charts (last *months* months)."""
|
|
# Return get_coverage_evolution(db, months=months)
|
|
return get_coverage_evolution(db, months=months)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /snapshots/compare — Compare two snapshots
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/compare")
|
|
# Define function compare_snapshots_endpoint
|
|
def compare_snapshots_endpoint(
|
|
# Entry: a
|
|
a: str = Query(..., description="Snapshot A ID"),
|
|
# Entry: b
|
|
b: str = Query(..., description="Snapshot B ID"),
|
|
# Entry: db
|
|
db: Session = Depends(get_db),
|
|
# Entry: current_user
|
|
current_user: User = Depends(get_current_user),
|
|
) -> dict:
|
|
"""Compare two snapshots showing improved, worsened, and unchanged techniques."""
|
|
# Attempt the following; catch errors below
|
|
try:
|
|
# Assign a_id = uuid.UUID(a)
|
|
a_id = uuid.UUID(a)
|
|
# Assign b_id = uuid.UUID(b)
|
|
b_id = uuid.UUID(b)
|
|
# Handle ValueError
|
|
except ValueError:
|
|
# Raise BusinessRuleViolation
|
|
raise BusinessRuleViolation("Invalid snapshot ID format")
|
|
|
|
# Return compare_snapshots(db, a_id, b_id)
|
|
return compare_snapshots(db, a_id, b_id)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /snapshots/{id} — Snapshot detail
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/{snapshot_id}")
|
|
# Define function get_snapshot
|
|
def get_snapshot(
|
|
# Entry: snapshot_id
|
|
snapshot_id: str,
|
|
# Entry: db
|
|
db: Session = Depends(get_db),
|
|
# Entry: current_user
|
|
current_user: User = Depends(get_current_user),
|
|
) -> dict:
|
|
"""Get detailed snapshot information including per-technique states."""
|
|
# Return get_snapshot_detail(db, snapshot_id)
|
|
return get_snapshot_detail(db, snapshot_id)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DELETE /snapshots/{id} — Delete snapshot (admin only)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.delete("/{snapshot_id}")
|
|
# Define function delete_snapshot_endpoint
|
|
def delete_snapshot_endpoint(
|
|
# Entry: snapshot_id
|
|
snapshot_id: str,
|
|
# Entry: db
|
|
db: Session = Depends(get_db),
|
|
# Entry: current_user
|
|
current_user: User = Depends(require_role("admin")),
|
|
) -> dict:
|
|
"""Delete a snapshot (admin only)."""
|
|
# Assign snapshot = get_snapshot_or_raise(db, snapshot_id)
|
|
snapshot = get_snapshot_or_raise(db, snapshot_id)
|
|
|
|
# Open context manager
|
|
with UnitOfWork(db) as uow:
|
|
# Call log_action()
|
|
log_action(
|
|
db,
|
|
# Keyword argument: user_id
|
|
user_id=current_user.id,
|
|
# Keyword argument: action
|
|
action="delete_snapshot",
|
|
# Keyword argument: entity_type
|
|
entity_type="snapshot",
|
|
# Keyword argument: entity_id
|
|
entity_id=snapshot.id,
|
|
# Keyword argument: details
|
|
details={"name": snapshot.name},
|
|
)
|
|
# Call delete_snapshot()
|
|
delete_snapshot(db, snapshot_id)
|
|
# Call uow.commit()
|
|
uow.commit()
|
|
|
|
# Return {"detail": "Snapshot deleted"}
|
|
return {"detail": "Snapshot deleted"}
|