8f98bdd273
- ruff.toml: select E/W/F/I/N rules, line-length=120, drop legacy ignores - Auto-fix: sort 82 import blocks (isort), remove 29 unused imports, strip 6 trailing-whitespace blank lines in docstrings - main.py: move setup_logging and settings imports to top (E402) - errors.py: noqa N818 on DDD exception names (96 call sites, safe) - intel_service.py: noqa N817 for universal ET alias - atomic/elastic/sigma import services: move _MAX_UNCOMPRESSED_SIZE and _MAX_ENTRIES to module level (N806) - compliance_import_service.py: move SAMPLE_CONTROLS / CIS_CONTROLS to module level; wrap long description strings (N806 + E501) - snapshot_service.py: move STATUS_ORDER dict to module level (N806) - sigma_import_service.py: remove dead dedup_key expression (F841) - threat_actor_import_service.py: remove dead stix_to_actor expression (F841) - data_source.py, seed_demo.py, campaign_scheduler_service.py, lolbas_import_service.py: wrap lines exceeding 120 chars (E501) - d3fend_import_service.py: per-file E501 ignore (data file with long strings) All 439 unit tests pass. ruff check app/ → All checks passed!
163 lines
5.3 KiB
Python
163 lines
5.3 KiB
Python
"""Snapshot endpoints — coverage snapshots CRUD and comparison.
|
|
|
|
Provides periodic and manual snapshots of the organisation's coverage
|
|
state, plus temporal comparison between any two snapshots.
|
|
"""
|
|
|
|
import logging
|
|
import uuid
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, Query
|
|
from pydantic import BaseModel
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.database import get_db
|
|
from app.dependencies.auth import get_current_user, require_any_role, require_role
|
|
from app.domain.errors import BusinessRuleViolation
|
|
from app.domain.unit_of_work import UnitOfWork
|
|
from app.models.user import User
|
|
from app.services.audit_service import log_action
|
|
from app.services.snapshot_service import (
|
|
compare_snapshots,
|
|
create_snapshot,
|
|
delete_snapshot,
|
|
get_coverage_evolution,
|
|
get_snapshot_detail,
|
|
get_snapshot_or_raise,
|
|
serialize_snapshot_summary,
|
|
)
|
|
from app.services.snapshot_service import (
|
|
list_snapshots as list_snapshots_svc,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(prefix="/snapshots", tags=["snapshots"])
|
|
|
|
|
|
# ── Pydantic schemas ─────────────────────────────────────────────────
|
|
|
|
class SnapshotCreate(BaseModel):
|
|
name: Optional[str] = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /snapshots — List snapshots (paginated)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("")
|
|
def list_snapshots(
|
|
offset: int = Query(0, ge=0),
|
|
limit: int = Query(50, ge=1, le=200),
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""List coverage snapshots ordered by creation date (newest first)."""
|
|
return list_snapshots_svc(db, offset=offset, limit=limit)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /snapshots — Create snapshot manually
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.post("", status_code=201)
|
|
def create_snapshot_endpoint(
|
|
payload: SnapshotCreate,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_any_role("red_lead", "blue_lead", "admin")),
|
|
):
|
|
"""Create a manual coverage snapshot with an optional name."""
|
|
snapshot = create_snapshot(db, name=payload.name, user_id=current_user.id)
|
|
|
|
with UnitOfWork(db) as uow:
|
|
log_action(
|
|
db,
|
|
user_id=current_user.id,
|
|
action="create_snapshot",
|
|
entity_type="snapshot",
|
|
entity_id=snapshot.id,
|
|
details={"name": snapshot.name, "score": snapshot.organization_score},
|
|
)
|
|
uow.commit()
|
|
|
|
return serialize_snapshot_summary(snapshot)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /snapshots/evolution — Coverage trend over time
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/evolution")
|
|
def coverage_evolution(
|
|
months: int = Query(12, ge=1, le=36),
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Return coverage snapshots for trend charts (last *months* months)."""
|
|
return get_coverage_evolution(db, months=months)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /snapshots/compare — Compare two snapshots
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/compare")
|
|
def compare_snapshots_endpoint(
|
|
a: str = Query(..., description="Snapshot A ID"),
|
|
b: str = Query(..., description="Snapshot B ID"),
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Compare two snapshots showing improved, worsened, and unchanged techniques."""
|
|
try:
|
|
a_id = uuid.UUID(a)
|
|
b_id = uuid.UUID(b)
|
|
except ValueError:
|
|
raise BusinessRuleViolation("Invalid snapshot ID format")
|
|
|
|
return compare_snapshots(db, a_id, b_id)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /snapshots/{id} — Snapshot detail
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/{snapshot_id}")
|
|
def get_snapshot(
|
|
snapshot_id: str,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Get detailed snapshot information including per-technique states."""
|
|
return get_snapshot_detail(db, snapshot_id)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DELETE /snapshots/{id} — Delete snapshot (admin only)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.delete("/{snapshot_id}")
|
|
def delete_snapshot_endpoint(
|
|
snapshot_id: str,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_role("admin")),
|
|
):
|
|
"""Delete a snapshot (admin only)."""
|
|
snapshot = get_snapshot_or_raise(db, snapshot_id)
|
|
|
|
with UnitOfWork(db) as uow:
|
|
log_action(
|
|
db,
|
|
user_id=current_user.id,
|
|
action="delete_snapshot",
|
|
entity_type="snapshot",
|
|
entity_id=snapshot.id,
|
|
details={"name": snapshot.name},
|
|
)
|
|
delete_snapshot(db, snapshot_id)
|
|
uow.commit()
|
|
|
|
return {"detail": "Snapshot deleted"}
|