refactor(evidence): extract permission validation and queries to evidence_service, use domain exceptions

This commit is contained in:
2026-02-19 19:02:36 +01:00
parent 20738d11b3
commit 50b70704ae
4 changed files with 239 additions and 222 deletions

View File

@@ -24,52 +24,32 @@ import os
import uuid as _uuid
from typing import Optional
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile, status
from fastapi import APIRouter, Depends, File, Form, Query, UploadFile, status
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user
from app.models.enums import TeamSide, TestState
from app.models.enums import TeamSide
from app.models.evidence import Evidence
from app.models.test import Test
from app.models.user import User
from app.schemas.evidence import EvidenceOut
from app.services.audit_service import log_action
from app.services.evidence_service import (
get_evidence_or_raise,
get_test_or_raise,
list_evidence_for_test,
MAX_UPLOAD_SIZE,
validate_delete_permission,
validate_file,
validate_upload_permission,
)
from app.storage import get_presigned_url, upload_file
router = APIRouter(tags=["evidence"])
# States where red evidence can be uploaded / deleted
_RED_EDITABLE_STATES = (TestState.draft, TestState.red_executing)
# States where blue evidence can be uploaded / deleted
_BLUE_EDITABLE_STATES = (TestState.blue_evaluating,)
# ---------------------------------------------------------------------------
# Upload safety limits
# ---------------------------------------------------------------------------
# Maximum upload size in bytes (default 50 MB)
_MAX_UPLOAD_SIZE = 50 * 1024 * 1024
# Allowed file extensions (lowercase, with leading dot)
_ALLOWED_EXTENSIONS: set[str] = {
# Images / screenshots
".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp", ".svg",
# Documents
".pdf", ".doc", ".docx", ".xls", ".xlsx", ".csv", ".txt",
".md", ".rtf", ".odt", ".ods",
# Logs & captures
".log", ".pcap", ".pcapng", ".evtx", ".json", ".xml",
".yaml", ".yml", ".toml",
# Archives (for bundled evidence)
".zip", ".tar", ".gz", ".7z",
# Other common evidence types
".har", ".eml", ".msg",
}
# ---------------------------------------------------------------------------
# Helpers
# Helpers (router-specific: infrastructure / HTTP concerns)
# ---------------------------------------------------------------------------
def _evidence_to_out(evidence: Evidence) -> EvidenceOut:
@@ -87,85 +67,6 @@ def _evidence_to_out(evidence: Evidence) -> EvidenceOut:
)
def _validate_upload_permission(
test: Test,
team: TeamSide,
user: User,
) -> None:
"""Raise 403 if the user/team combination is not allowed in the current state."""
# Admins bypass all checks
if user.role == "admin":
return
if team == TeamSide.red:
if user.role not in ("red_tech", "red_lead"):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only red_tech, red_lead or admin can upload red evidence",
)
if test.state not in _RED_EDITABLE_STATES:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Cannot upload red evidence in '{test.state.value}' state "
f"(allowed in: draft, red_executing)",
)
elif team == TeamSide.blue:
if user.role not in ("blue_tech", "blue_lead"):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only blue_tech, blue_lead or admin can upload blue evidence",
)
if test.state not in _BLUE_EDITABLE_STATES:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Cannot upload blue evidence in '{test.state.value}' state "
f"(allowed in: blue_evaluating)",
)
def _validate_delete_permission(
test: Test,
evidence: Evidence,
user: User,
) -> None:
"""Raise 403 if the user cannot delete this evidence in the current state."""
# No deletions in review / validated / rejected
if test.state in (TestState.in_review, TestState.validated, TestState.rejected):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Cannot delete evidence when test is in '{test.state.value}' state",
)
# Admin can delete in editable states
if user.role == "admin":
return
ev_team = evidence.team
if ev_team == TeamSide.red:
if test.state not in _RED_EDITABLE_STATES:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Cannot delete red evidence outside draft/red_executing",
)
if user.role not in ("red_tech", "red_lead") and evidence.uploaded_by != user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to delete this evidence",
)
elif ev_team == TeamSide.blue:
if test.state not in _BLUE_EDITABLE_STATES:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Cannot delete blue evidence outside blue_evaluating",
)
if user.role not in ("blue_tech", "blue_lead") and evidence.uploaded_by != user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to delete this evidence",
)
# ---------------------------------------------------------------------------
# POST /tests/{test_id}/evidence — upload with team
# ---------------------------------------------------------------------------
@@ -189,36 +90,14 @@ async def upload_evidence(
The ``team`` field (sent as form data) determines whether this is
Red Team (attack) or Blue Team (detection) evidence.
"""
test = db.query(Test).filter(Test.id == test_id).first()
if test is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Test not found",
)
test = get_test_or_raise(db, test_id)
validate_upload_permission(test, team, current_user.role)
# Validate permissions
_validate_upload_permission(test, team, current_user)
# 1. Validate file extension
file_name = file.filename or "unnamed"
_, ext = os.path.splitext(file_name)
if ext.lower() not in _ALLOWED_EXTENSIONS:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"File type '{ext}' is not allowed. "
f"Permitted types: {', '.join(sorted(_ALLOWED_EXTENSIONS))}",
)
content = await file.read(MAX_UPLOAD_SIZE + 1)
validate_file(file_name, len(content))
# 2. Read content with size limit
content = await file.read(_MAX_UPLOAD_SIZE + 1)
if len(content) > _MAX_UPLOAD_SIZE:
raise HTTPException(
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail=f"File exceeds maximum upload size of "
f"{_MAX_UPLOAD_SIZE // (1024 * 1024)} MB",
)
# 3. Hash
# Hash
sha256 = hashlib.sha256(content).hexdigest()
# 4. Object key (sanitise filename to prevent path traversal in storage)
@@ -273,19 +152,8 @@ def list_evidence(
current_user: User = Depends(get_current_user),
):
"""List all evidences for a test, optionally filtered by team."""
test = db.query(Test).filter(Test.id == test_id).first()
if test is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Test not found",
)
query = db.query(Evidence).filter(Evidence.test_id == test_id)
if team:
query = query.filter(Evidence.team == team)
evidences = query.order_by(Evidence.uploaded_at.desc()).all()
get_test_or_raise(db, test_id)
evidences = list_evidence_for_test(db, test_id, team=team)
return [_evidence_to_out(e) for e in evidences]
@@ -301,13 +169,7 @@ def get_evidence(
current_user: User = Depends(get_current_user),
):
"""Return evidence metadata together with a presigned download URL."""
evidence = db.query(Evidence).filter(Evidence.id == evidence_id).first()
if evidence is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Evidence not found",
)
evidence = get_evidence_or_raise(db, evidence_id)
return _evidence_to_out(evidence)
@@ -329,22 +191,9 @@ def delete_evidence(
- Blue evidence: ``blue_evaluating``
- No deletions in ``in_review``, ``validated``, ``rejected``
"""
evidence = db.query(Evidence).filter(Evidence.id == evidence_id).first()
if evidence is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Evidence not found",
)
test = db.query(Test).filter(Test.id == evidence.test_id).first()
if test is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Parent test not found",
)
# Permission checks
_validate_delete_permission(test, evidence, current_user)
evidence = get_evidence_or_raise(db, evidence_id)
test = get_test_or_raise(db, evidence.test_id)
validate_delete_permission(test, evidence, current_user.role, current_user.id)
# Audit before deletion
log_action(