Files
Aegis/backend/app/routers/evidence.py
Kitos a4a2adccee
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(phase-39): role-based access control overhaul + forced password change
- Add must_change_password field to User model with migration b023

- Add POST /auth/change-password endpoint with password policy validation

- Add require_password_changed dependency to block requests until password is changed

- Add ChangePasswordModal with live password policy checklist (forced on first login)

- Show password policy in CreateUserModal and EditUserModal

- Fix backend permissions: tests, campaigns, templates, reports, evidence, worklogs

- red_tech/blue_tech: execute only, cannot create tests/campaigns/templates

- red_lead/blue_lead: create/edit tests/campaigns/templates, generate reports, no system access

- viewer: read-only everywhere, can generate reports

- Fix frontend role checks across TestDetailPage, TestDetailHeader, TeamTabs, TestsPage, CampaignsPage, CampaignDetailPage, Sidebar
2026-02-18 10:37:02 +01:00

367 lines
12 KiB
Python

"""Evidence upload, download, listing and deletion router — v2 with Red/Blue separation.
Endpoints
---------
POST /tests/{test_id}/evidence — upload evidence (with team=red/blue)
GET /tests/{test_id}/evidence — list evidences (filterable by team)
GET /evidence/{id} — presigned download URL
DELETE /evidence/{id} — delete evidence (only in editable states)
Access Control
--------------
- Red Team (``red_tech``, ``red_lead``) can only upload ``team=red`` when the
test is in ``draft`` or ``red_executing``.
- Blue Team (``blue_tech``, ``blue_lead``) can only upload ``team=blue`` when the
test is in ``blue_evaluating``.
- Admin can upload any team in any state.
- DELETE is restricted: red evidence in ``draft``/``red_executing``,
blue evidence in ``blue_evaluating``. No deletions in ``in_review``,
``validated``, or ``rejected``.
"""
import hashlib
import os
import uuid as _uuid
from typing import Optional
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile, status
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user
from app.models.enums import TeamSide, TestState
from app.models.evidence import Evidence
from app.models.test import Test
from app.models.user import User
from app.schemas.evidence import EvidenceOut
from app.services.audit_service import log_action
from app.storage import get_presigned_url, upload_file
router = APIRouter(tags=["evidence"])

# Test states in which red-team evidence may be uploaded or deleted.
_RED_EDITABLE_STATES = (TestState.draft, TestState.red_executing)

# Test states in which blue-team evidence may be uploaded or deleted.
_BLUE_EDITABLE_STATES = (TestState.blue_evaluating,)

# ---------------------------------------------------------------------------
# Upload safety limits
# ---------------------------------------------------------------------------

# Largest accepted upload, in bytes (50 MB).
_MAX_UPLOAD_SIZE = 50 * 1024 * 1024

# Whitelist of accepted file extensions. Entries are lowercase and include the
# leading dot, so candidates must be lowercased before the membership test.
_ALLOWED_EXTENSIONS: set[str] = {
    # Images / screenshots
    ".png",
    ".jpg",
    ".jpeg",
    ".gif",
    ".bmp",
    ".webp",
    ".svg",
    # Documents
    ".pdf",
    ".doc",
    ".docx",
    ".xls",
    ".xlsx",
    ".csv",
    ".txt",
    ".md",
    ".rtf",
    ".odt",
    ".ods",
    # Logs & captures
    ".log",
    ".pcap",
    ".pcapng",
    ".evtx",
    ".json",
    ".xml",
    ".yaml",
    ".yml",
    ".toml",
    # Archives (for bundled evidence)
    ".zip",
    ".tar",
    ".gz",
    ".7z",
    # Other common evidence types
    ".har",
    ".eml",
    ".msg",
}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _evidence_to_out(evidence: Evidence) -> EvidenceOut:
    """Build the API representation of an ``Evidence`` row.

    The plain columns are copied across unchanged; ``download_url`` is
    obtained by passing the row's ``file_path`` to ``get_presigned_url``.
    """
    copied_fields = (
        "id",
        "test_id",
        "file_name",
        "sha256_hash",
        "uploaded_by",
        "uploaded_at",
        "team",
        "notes",
    )
    values = {name: getattr(evidence, name) for name in copied_fields}
    return EvidenceOut(
        **values,
        download_url=get_presigned_url(evidence.file_path),
    )
def _validate_upload_permission(
    test: Test,
    team: TeamSide,
    user: User,
) -> None:
    """Check that ``user`` may upload ``team`` evidence for ``test`` right now.

    Users whose role is ``"admin"`` are always allowed. For everyone else the
    role must belong to the requested team and the test must be in one of
    that team's editable states.

    Raises:
        HTTPException: 403 when the user's role is not allowed to upload for
            the requested team; 400 when the role is fine but the test state
            does not permit uploads for that team.
    """
    if user.role == "admin":
        return

    # Resolve the per-team rules once, then apply the same two guards.
    if team == TeamSide.red:
        permitted_roles = ("red_tech", "red_lead")
        editable_states = _RED_EDITABLE_STATES
        role_error = "Only red_tech, red_lead or admin can upload red evidence"
        side_label = "red"
        states_label = "draft, red_executing"
    elif team == TeamSide.blue:
        permitted_roles = ("blue_tech", "blue_lead")
        editable_states = _BLUE_EDITABLE_STATES
        role_error = "Only blue_tech, blue_lead or admin can upload blue evidence"
        side_label = "blue"
        states_label = "blue_evaluating"
    else:
        # Any other team value is not checked by this function.
        return

    if user.role not in permitted_roles:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=role_error,
        )

    if test.state not in editable_states:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=(
                f"Cannot upload {side_label} evidence in '{test.state.value}' state "
                f"(allowed in: {states_label})"
            ),
        )
def _validate_delete_permission(
    test: Test,
    evidence: Evidence,
    user: User,
) -> None:
    """Check that ``user`` may delete ``evidence`` given the state of ``test``.

    Rules, applied in order:

    1. Nobody (including admins) may delete while the test is ``in_review``,
       ``validated`` or ``rejected``.
    2. Users whose role is ``"admin"`` may delete in every other state.
    3. Red evidence may only be deleted in ``draft``/``red_executing`` and
       blue evidence only in ``blue_evaluating``; the caller must hold a role
       of the matching team or be the original uploader.
    4. Evidence whose ``team`` is neither red nor blue (for example ``None``)
       can only be deleted by an admin. Previously such rows fell through
       every check, letting any authenticated role delete them.

    Raises:
        HTTPException: 403 whenever one of the rules above is violated.
    """
    # No deletions in review / validated / rejected
    if test.state in (TestState.in_review, TestState.validated, TestState.rejected):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Cannot delete evidence when test is in '{test.state.value}' state",
        )

    # Admins are only subject to the locked-state check above.
    if user.role == "admin":
        return

    ev_team = evidence.team
    if ev_team == TeamSide.red:
        if test.state not in _RED_EDITABLE_STATES:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Cannot delete red evidence outside draft/red_executing",
            )
        # NOTE(review): the uploader is allowed regardless of role — confirm
        # this is intended for users whose role changed after uploading.
        if user.role not in ("red_tech", "red_lead") and evidence.uploaded_by != user.id:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not enough permissions to delete this evidence",
            )
    elif ev_team == TeamSide.blue:
        if test.state not in _BLUE_EDITABLE_STATES:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Cannot delete blue evidence outside blue_evaluating",
            )
        if user.role not in ("blue_tech", "blue_lead") and evidence.uploaded_by != user.id:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not enough permissions to delete this evidence",
            )
    else:
        # Fail closed: evidence without a recognised team has no team-specific
        # rule to satisfy, so only admins (handled above) may remove it.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions to delete this evidence",
        )
# ---------------------------------------------------------------------------
# POST /tests/{test_id}/evidence — upload with team
# ---------------------------------------------------------------------------
@router.post(
    "/tests/{test_id}/evidence",
    response_model=EvidenceOut,
    status_code=status.HTTP_201_CREATED,
)
async def upload_evidence(
    test_id: _uuid.UUID,
    file: UploadFile = File(...),
    team: TeamSide = Form(TeamSide.red),
    notes: Optional[str] = Form(None),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Store an uploaded file as evidence attached to a test.

    ``team`` arrives as form data and marks the evidence as Red Team or
    Blue Team; it defaults to red. The handler validates permissions, the
    file extension and the file size, uploads the bytes to object storage,
    persists an ``Evidence`` row and records an audit entry.

    Raises:
        HTTPException: 404 if the test does not exist; 400 if the file
            extension is not whitelisted; 413 if the file is larger than
            ``_MAX_UPLOAD_SIZE``; plus whatever
            ``_validate_upload_permission`` raises.
    """
    parent_test = db.query(Test).filter(Test.id == test_id).first()
    if parent_test is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Test not found",
        )

    # Role / state gate comes before any inspection of the file itself.
    _validate_upload_permission(parent_test, team, current_user)

    # Extension whitelist (compared case-insensitively).
    original_name = file.filename or "unnamed"
    extension = os.path.splitext(original_name)[1]
    if extension.lower() not in _ALLOWED_EXTENSIONS:
        permitted = ", ".join(sorted(_ALLOWED_EXTENSIONS))
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"File type '{extension}' is not allowed. Permitted types: {permitted}",
        )

    # Read one byte past the limit so an oversized file is detectable
    # without pulling the whole thing into memory.
    payload = await file.read(_MAX_UPLOAD_SIZE + 1)
    if len(payload) > _MAX_UPLOAD_SIZE:
        limit_mb = _MAX_UPLOAD_SIZE // (1024 * 1024)
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail=f"File exceeds maximum upload size of {limit_mb} MB",
        )

    digest = hashlib.sha256(payload).hexdigest()

    # Keep only the final path component so the client-supplied name cannot
    # introduce extra path segments into the object key.
    stored_name = os.path.basename(original_name)
    object_key = f"{test_id}/{_uuid.uuid4()}_{stored_name}"

    # NOTE(review): upload_file and the session calls are invoked
    # synchronously inside this async handler — confirm that is acceptable.
    upload_file(payload, object_key)

    record = Evidence(
        test_id=test_id,
        file_name=stored_name,
        file_path=object_key,
        sha256_hash=digest,
        uploaded_by=current_user.id,
        team=team,
        notes=notes,
    )
    db.add(record)
    db.commit()
    db.refresh(record)

    audit_details = {
        "file_name": stored_name,
        "sha256": digest,
        "test_id": str(test_id),
        "team": team.value,
    }
    log_action(
        db,
        user_id=current_user.id,
        action="upload_evidence",
        entity_type="evidence",
        entity_id=record.id,
        details=audit_details,
    )

    return _evidence_to_out(record)
# ---------------------------------------------------------------------------
# GET /tests/{test_id}/evidence — list (with optional team filter)
# ---------------------------------------------------------------------------
@router.get("/tests/{test_id}/evidence", response_model=list[EvidenceOut])
def list_evidence(
    test_id: _uuid.UUID,
    team: Optional[str] = Query(None, description="Filter by team: red or blue"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List the evidence attached to a test, newest first.

    When ``team`` is supplied it must match the name or value of a
    ``TeamSide`` member; the result is then restricted to that team's
    evidence. An empty or missing ``team`` returns everything.

    Raises:
        HTTPException: 404 if the test does not exist; 400 if ``team`` does
            not correspond to any ``TeamSide`` member.
    """
    test = db.query(Test).filter(Test.id == test_id).first()
    if test is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Test not found",
        )

    query = db.query(Evidence).filter(Evidence.test_id == test_id)

    if team:
        # Resolve the raw query string to an enum member before it reaches
        # the database, so an unknown value is reported as a client error
        # instead of being sent as an arbitrary literal for the enum column.
        team_filter = next(
            (side for side in TeamSide if team in (side.name, side.value)),
            None,
        )
        if team_filter is None:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid team filter '{team}'",
            )
        query = query.filter(Evidence.team == team_filter)

    evidences = query.order_by(Evidence.uploaded_at.desc()).all()
    return [_evidence_to_out(e) for e in evidences]
# ---------------------------------------------------------------------------
# GET /evidence/{id} — presigned download URL
# ---------------------------------------------------------------------------
@router.get("/evidence/{evidence_id}", response_model=EvidenceOut)
def get_evidence(
    evidence_id: _uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Look up a single evidence record and return it with a download URL.

    Raises:
        HTTPException: 404 if no evidence row has the given id.
    """
    record = db.query(Evidence).filter(Evidence.id == evidence_id).first()
    if record is not None:
        return _evidence_to_out(record)

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Evidence not found",
    )
# ---------------------------------------------------------------------------
# DELETE /evidence/{id} — delete evidence (editable states only)
# ---------------------------------------------------------------------------
@router.delete("/evidence/{evidence_id}", status_code=status.HTTP_200_OK)
def delete_evidence(
    evidence_id: _uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Remove an evidence row after permission checks and an audit entry.

    Whether the deletion is allowed is decided by
    ``_validate_delete_permission`` using the parent test's state, the
    evidence's team and the caller's role.

    Raises:
        HTTPException: 404 if the evidence or its parent test is missing;
            plus whatever ``_validate_delete_permission`` raises.
    """
    record = db.query(Evidence).filter(Evidence.id == evidence_id).first()
    if record is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Evidence not found",
        )

    parent_test = db.query(Test).filter(Test.id == record.test_id).first()
    if parent_test is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Parent test not found",
        )

    _validate_delete_permission(parent_test, record, current_user)

    # The team may be unset on a row, in which case the audit entry stores None.
    team_label = None
    if record.team:
        team_label = record.team.value

    audit_details = {
        "file_name": record.file_name,
        "test_id": str(record.test_id),
        "team": team_label,
    }

    # The audit entry is written while the row still exists.
    log_action(
        db,
        user_id=current_user.id,
        action="delete_evidence",
        entity_type="evidence",
        entity_id=record.id,
        details=audit_details,
    )

    db.delete(record)
    db.commit()

    return {"detail": "Evidence deleted"}