Files
Aegis/backend/app/routers/snapshots.py
T
kitos 9ff0f04ba3 refactor(types): add comprehensive type annotations across backend Python codebase
Enable ANN rules in ruff.toml (flake8-annotations) and resolve all 221 violations:

ANN201/ANN202 — return types on 168 public/private functions:
- All 28 FastAPI routers: endpoints annotated with dict/list/specific schema/
  StreamingResponse/FileResponse/JSONResponse as appropriate
- main.py: lifespan→AsyncGenerator[None,None], exception handlers→JSONResponse
- database.py: get_db→Generator[Session,None,None], proxy methods→correct types
- middleware/request_context.py: dispatch→Response with Callable call_next type

ANN001/ANN002/ANN003 — 32 missing argument types:
- seed_demo.py: all db parameters typed as Session
- domain/unit_of_work.py: __aexit__ exc_type/exc_val/exc_tb typed with TracebackType
- services: audit_service user_id→UUID|None, heatmap_service query/model/builder,
  notification_service test→Test, tempo_service test→Test/user→User,
  test_workflow_service test_id→UUID, campaign_crud **fields→object,
  test_crud **fields→object (4 sites)

ANN401 — 16 Any usages resolved:
- Domain entities (campaign/technique/threat_actor/test_entity): replaced Any with
  actual ORM types via TYPE_CHECKING guards to avoid circular imports
- detection_rule_service: test_id/detection_rule_id/evaluator_id→UUID
- score_cache: kept Any with # noqa: ANN401 (genuinely generic cache)
- jira_service/tempo_service: kept Any with # noqa: ANN401 (lazy optional deps)
- d3fend_import_service: _to_str(v: Any) kept with # noqa: ANN401

ANN204/ANN205/ANN206 — special/static/class methods:
- database.py proxy __call__/__getattr__: *args: object/**kwargs: object
- schemas/test.py model_validate: obj→object, **kwargs→object
- sa_technique_repository._int_type→type

All 439 unit tests pass. ruff check app/ → All checks passed!
2026-06-11 11:06:54 +02:00

163 lines
5.4 KiB
Python

"""Snapshot endpoints — coverage snapshots CRUD and comparison.
Provides periodic and manual snapshots of the organisation's coverage
state, plus temporal comparison between any two snapshots.
"""
import logging
import uuid
from typing import Optional
from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user, require_any_role, require_role
from app.domain.errors import BusinessRuleViolation
from app.domain.unit_of_work import UnitOfWork
from app.models.user import User
from app.services.audit_service import log_action
from app.services.snapshot_service import (
compare_snapshots,
create_snapshot,
delete_snapshot,
get_coverage_evolution,
get_snapshot_detail,
get_snapshot_or_raise,
serialize_snapshot_summary,
)
from app.services.snapshot_service import (
list_snapshots as list_snapshots_svc,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/snapshots", tags=["snapshots"])
# ── Pydantic schemas ─────────────────────────────────────────────────
class SnapshotCreate(BaseModel):
name: Optional[str] = None
# ---------------------------------------------------------------------------
# GET /snapshots — List snapshots (paginated)
# ---------------------------------------------------------------------------
@router.get("")
def list_snapshots(
offset: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> list:
"""List coverage snapshots ordered by creation date (newest first)."""
return list_snapshots_svc(db, offset=offset, limit=limit)
# ---------------------------------------------------------------------------
# POST /snapshots — Create snapshot manually
# ---------------------------------------------------------------------------
@router.post("", status_code=201)
def create_snapshot_endpoint(
payload: SnapshotCreate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead", "admin")),
) -> dict:
"""Create a manual coverage snapshot with an optional name."""
snapshot = create_snapshot(db, name=payload.name, user_id=current_user.id)
with UnitOfWork(db) as uow:
log_action(
db,
user_id=current_user.id,
action="create_snapshot",
entity_type="snapshot",
entity_id=snapshot.id,
details={"name": snapshot.name, "score": snapshot.organization_score},
)
uow.commit()
return serialize_snapshot_summary(snapshot)
# ---------------------------------------------------------------------------
# GET /snapshots/evolution — Coverage trend over time
# ---------------------------------------------------------------------------
@router.get("/evolution")
def coverage_evolution(
months: int = Query(12, ge=1, le=36),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> list:
"""Return coverage snapshots for trend charts (last *months* months)."""
return get_coverage_evolution(db, months=months)
# ---------------------------------------------------------------------------
# GET /snapshots/compare — Compare two snapshots
# ---------------------------------------------------------------------------
@router.get("/compare")
def compare_snapshots_endpoint(
a: str = Query(..., description="Snapshot A ID"),
b: str = Query(..., description="Snapshot B ID"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> dict:
"""Compare two snapshots showing improved, worsened, and unchanged techniques."""
try:
a_id = uuid.UUID(a)
b_id = uuid.UUID(b)
except ValueError:
raise BusinessRuleViolation("Invalid snapshot ID format")
return compare_snapshots(db, a_id, b_id)
# ---------------------------------------------------------------------------
# GET /snapshots/{id} — Snapshot detail
# ---------------------------------------------------------------------------
@router.get("/{snapshot_id}")
def get_snapshot(
snapshot_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> dict:
"""Get detailed snapshot information including per-technique states."""
return get_snapshot_detail(db, snapshot_id)
# ---------------------------------------------------------------------------
# DELETE /snapshots/{id} — Delete snapshot (admin only)
# ---------------------------------------------------------------------------
@router.delete("/{snapshot_id}")
def delete_snapshot_endpoint(
snapshot_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
) -> dict:
"""Delete a snapshot (admin only)."""
snapshot = get_snapshot_or_raise(db, snapshot_id)
with UnitOfWork(db) as uow:
log_action(
db,
user_id=current_user.id,
action="delete_snapshot",
entity_type="snapshot",
entity_id=snapshot.id,
details={"name": snapshot.name},
)
delete_snapshot(db, snapshot_id)
uow.commit()
return {"detail": "Snapshot deleted"}