Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
Evidence download:
- Replace presigned MinIO URLs with backend proxy endpoint
GET /api/v1/evidence/{id}/file streams the file through the backend
so MinIO never needs to be publicly accessible from browsers
- Add download_file() helper to storage.py (internal boto3 get_object)
- download_url in EvidenceOut now points to the proxy endpoint
Jira attachment:
- Fix add_attachment call: use add_attachment_object(issue_key, BytesIO)
instead of add_attachment(issue_key, filename=..., content=...) which
had wrong keyword args for the installed atlassian-python-api version
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
301 lines
10 KiB
Python
301 lines
10 KiB
Python
"""Evidence upload, download, listing and deletion router — v2 with Red/Blue separation.
|
|
|
|
Endpoints
|
|
---------
|
|
POST /tests/{test_id}/evidence — upload evidence (with team=red/blue)
|
|
GET /tests/{test_id}/evidence — list evidences (filterable by team)
|
|
GET /evidence/{id} — metadata + download_url
|
|
GET /evidence/{id}/file — proxy download (streams file through backend)
|
|
DELETE /evidence/{id} — delete evidence (only in editable states)
|
|
|
|
Access Control
|
|
--------------
|
|
- Red Team (``red_tech``) can only upload ``team=red`` when test is in
|
|
``draft`` or ``red_executing``.
|
|
- Blue Team (``blue_tech``) can only upload ``team=blue`` when test is in
|
|
``blue_evaluating``.
|
|
- Admin can upload any team in any state.
|
|
- DELETE is restricted: red evidence in ``draft``/``red_executing``,
|
|
blue evidence in ``blue_evaluating``. No deletions in ``in_review``,
|
|
``validated``, or ``rejected``.
|
|
"""
|
|
|
|
import hashlib
|
|
import logging
|
|
import os
|
|
import uuid as _uuid
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, File, Form, Query, Request, UploadFile, status
|
|
from fastapi.responses import StreamingResponse
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.database import get_db
|
|
from app.domain.unit_of_work import UnitOfWork
|
|
from app.dependencies.auth import get_current_user
|
|
from app.models.enums import TeamSide
|
|
from app.models.evidence import Evidence
|
|
from app.models.user import User
|
|
from app.schemas.evidence import EvidenceOut
|
|
from app.services.audit_service import log_action
|
|
from app.services.evidence_service import (
|
|
get_evidence_or_raise,
|
|
get_test_or_raise,
|
|
list_evidence_for_test,
|
|
MAX_UPLOAD_SIZE,
|
|
validate_delete_permission,
|
|
validate_file,
|
|
validate_upload_permission,
|
|
)
|
|
from app.limiter import limiter
|
|
from app.storage import download_file, upload_file
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Router that collects every endpoint declared in this module; all of them
# are tagged "evidence".
router = APIRouter(tags=["evidence"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers (router-specific: infrastructure / HTTP concerns)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _evidence_to_out(evidence: Evidence) -> EvidenceOut:
    """Build the ``EvidenceOut`` API schema from an ORM ``Evidence`` row.

    Every metadata field is copied across unchanged.  ``download_url`` is
    not read from the row: it is always the backend proxy route for this
    evidence id, so the browser never needs direct access to MinIO.
    """
    copied_fields = (
        "id",
        "test_id",
        "file_name",
        "sha256_hash",
        "uploaded_by",
        "uploaded_at",
        "team",
        "notes",
    )
    payload = {name: getattr(evidence, name) for name in copied_fields}
    payload["download_url"] = f"/api/v1/evidence/{evidence.id}/file"
    return EvidenceOut(**payload)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /tests/{test_id}/evidence — upload with team
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.post(
    "/tests/{test_id}/evidence",
    response_model=EvidenceOut,
    status_code=status.HTTP_201_CREATED,
)
@limiter.limit("10/minute")
async def upload_evidence(
    request: Request,
    test_id: _uuid.UUID,
    file: UploadFile = File(...),
    team: TeamSide = Form(TeamSide.red),
    notes: Optional[str] = Form(None),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Upload a file as evidence for the given test.

    The ``team`` field (sent as form data) determines whether this is
    Red Team (attack) or Blue Team (detection) evidence.

    Args:
        request: Not used in the body; presumably required by the
            ``limiter.limit`` decorator — confirm against ``app.limiter``.
        test_id: Test the evidence belongs to.
        file: The uploaded file (multipart).
        team: Team side of the evidence; defaults to ``TeamSide.red``.
        notes: Optional free-text notes stored with the evidence.
        db: Database session.
        current_user: Authenticated user performing the upload.

    Returns:
        The stored evidence as ``EvidenceOut``, including the proxy
        ``download_url``.

    Raises:
        Whatever ``get_test_or_raise``, ``validate_upload_permission`` and
        ``validate_file`` raise for an unknown test, a disallowed upload or
        an unacceptable file.
    """
    from datetime import timezone

    # 1. Resolve the test and check that this role may upload for this team.
    test = get_test_or_raise(db, test_id)
    validate_upload_permission(test, team, current_user.role)

    # 2. Read at most one byte more than the limit, so an oversized upload is
    #    detectable by its length without buffering the whole body.
    file_name = file.filename or "unnamed"
    content = await file.read(MAX_UPLOAD_SIZE + 1)
    validate_file(file_name, len(content))

    # 3. Hash
    sha256 = hashlib.sha256(content).hexdigest()

    # 4. Object key (sanitise filename to prevent path traversal in storage).
    #    os.path.basename only splits on the host separator, so Windows-style
    #    "\" components are normalised first; a name that reduces to nothing
    #    (e.g. "dir/") falls back to the same placeholder used for a missing
    #    filename instead of producing an empty stored name.
    safe_name = os.path.basename(file_name.replace("\\", "/")) or "unnamed"
    key = f"{test_id}/{_uuid.uuid4()}_{safe_name}"

    # 5. Upload to MinIO
    upload_file(content, key)

    # 6. Persist metadata and audit
    with UnitOfWork(db) as uow:
        evidence = Evidence(
            test_id=test_id,
            file_name=safe_name,
            file_path=key,
            sha256_hash=sha256,
            uploaded_by=current_user.id,
            # Set explicitly — DB column has no server default.  This is the
            # same naive-UTC value datetime.utcnow() produced, without the
            # deprecated call.
            uploaded_at=datetime.now(timezone.utc).replace(tzinfo=None),
            team=team,
            notes=notes,
        )
        db.add(evidence)
        db.flush()  # Get evidence.id for audit
        log_action(
            db,
            user_id=current_user.id,
            action="upload_evidence",
            entity_type="evidence",
            entity_id=evidence.id,
            details={
                "file_name": safe_name,
                "sha256": sha256,
                "test_id": str(test_id),
                "team": team.value,
            },
        )
        uow.commit()
    db.refresh(evidence)

    # 7. Attach to Jira ticket if one exists (non-fatal)
    _attach_evidence_to_jira(db, test_id, content, safe_name, current_user)

    return _evidence_to_out(evidence)
|
|
|
|
|
|
def _attach_evidence_to_jira(
|
|
db,
|
|
test_id: _uuid.UUID,
|
|
content: bytes,
|
|
file_name: str,
|
|
actor,
|
|
) -> None:
|
|
"""Attach uploaded evidence to the linked Jira ticket (non-fatal)."""
|
|
try:
|
|
from app.services.jira_service import get_test_jira_key, get_user_jira_client, has_jira_configured
|
|
if not has_jira_configured(actor, db):
|
|
return
|
|
issue_key = get_test_jira_key(db, test_id)
|
|
if not issue_key:
|
|
return
|
|
import io
|
|
jira = get_user_jira_client(actor, db)
|
|
buf = io.BytesIO(content)
|
|
buf.name = file_name # requests uses .name as the multipart filename
|
|
jira.add_attachment_object(issue_key, buf)
|
|
import logging
|
|
logging.getLogger(__name__).info(
|
|
"Attached evidence '%s' to Jira ticket %s", file_name, issue_key
|
|
)
|
|
except Exception as exc:
|
|
import logging
|
|
logging.getLogger(__name__).warning(
|
|
"Failed to attach evidence '%s' to Jira: %s", file_name, exc, exc_info=True
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /tests/{test_id}/evidence — list (with optional team filter)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/tests/{test_id}/evidence", response_model=list[EvidenceOut])
def list_evidence(
    test_id: _uuid.UUID,
    team: Optional[str] = Query(None, description="Filter by team: red or blue"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the evidence attached to a test as ``EvidenceOut`` items.

    ``team`` is forwarded unchanged to ``list_evidence_for_test``; when it
    is omitted no team value is passed on (``None``).
    """
    # Result is discarded: the call is made only for the lookup itself
    # (presumably it raises when the test does not exist).
    get_test_or_raise(db, test_id)

    rows = list_evidence_for_test(db, test_id, team=team)
    return list(map(_evidence_to_out, rows))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /evidence/{id} — metadata + proxy download URL
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/evidence/{evidence_id}", response_model=EvidenceOut)
def get_evidence(
    evidence_id: _uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Look up one evidence record and return its metadata.

    The ``download_url`` in the response is the backend proxy URL, not a
    direct storage link.
    """
    return _evidence_to_out(get_evidence_or_raise(db, evidence_id))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /evidence/{id}/file — proxy download (streams file via backend)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# MIME types a browser can execute as active content when rendered inline.
# Evidence files are user uploads, so these are always sent as downloads.
_ACTIVE_CONTENT_TYPES = frozenset(
    {
        "text/html",
        "application/xhtml+xml",
        "image/svg+xml",
        "text/xml",
        "application/xml",
        "text/javascript",
        "application/javascript",
    }
)


def _content_disposition(file_name: str, disposition: str) -> str:
    """Build a header-safe ``Content-Disposition`` value for ``file_name``.

    The quoted ``filename`` parameter only ever contains printable ASCII
    without ``"`` or ``\\``; every other character is replaced by ``_``.
    When that changes the name, the exact original is added as an RFC 5987
    ``filename*`` parameter (UTF-8, percent-encoded), which clients prefer
    when they support it.
    """
    from urllib.parse import quote

    ascii_name = "".join(
        ch if " " <= ch <= "~" and ch not in '"\\' else "_" for ch in file_name
    )
    value = f'{disposition}; filename="{ascii_name}"'
    if ascii_name != file_name:
        value += f"; filename*=UTF-8''{quote(file_name, safe='')}"
    return value


@router.get("/evidence/{evidence_id}/file")
def download_evidence_file(
    evidence_id: _uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Stream the evidence file through the backend.

    The browser calls this endpoint (authenticated via JWT cookie/header).
    The backend fetches the file from MinIO internally and streams it back,
    so MinIO never needs to be publicly accessible.

    The media type is guessed from the stored file name and falls back to
    ``application/octet-stream``.  Files are served ``inline`` unless the
    guessed type is active content (HTML, SVG, XML, JavaScript), in which
    case they are sent as ``attachment`` so an uploaded payload cannot run
    in the application's origin.  ``X-Content-Type-Options: nosniff`` stops
    browsers from overriding the declared type.

    Returns:
        A ``StreamingResponse`` carrying the complete file content.
    """
    import mimetypes

    evidence = get_evidence_or_raise(db, evidence_id)
    # The whole object is held in memory (presumably bytes — its length is
    # taken below and it is sent as a single chunk).
    content = download_file(evidence.file_path)

    mime_type = mimetypes.guess_type(evidence.file_name)[0] or "application/octet-stream"
    disposition = "attachment" if mime_type in _ACTIVE_CONTENT_TYPES else "inline"

    return StreamingResponse(
        iter([content]),
        media_type=mime_type,
        headers={
            "Content-Disposition": _content_disposition(evidence.file_name, disposition),
            "Content-Length": str(len(content)),
            "X-Content-Type-Options": "nosniff",
        },
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DELETE /evidence/{id} — delete evidence (editable states only)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.delete("/evidence/{evidence_id}", status_code=status.HTTP_200_OK)
def delete_evidence(
    evidence_id: _uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Remove an evidence record after checking that deletion is permitted.

    Deletion is only possible while the test is still editable:

    - Red evidence: ``draft``, ``red_executing``
    - Blue evidence: ``blue_evaluating``
    - Never in ``in_review``, ``validated``, ``rejected``

    The audit entry and the row deletion are committed together in one
    unit of work.

    NOTE(review): only the database row is deleted here; nothing in this
    function removes the stored object at ``file_path`` — confirm whether
    storage cleanup happens elsewhere.
    """
    record = get_evidence_or_raise(db, evidence_id)
    parent_test = get_test_or_raise(db, record.test_id)
    validate_delete_permission(parent_test, record, current_user.role, current_user.id)

    with UnitOfWork(db) as uow:
        # Capture the audit details before the row is marked for deletion.
        if record.team:
            team_value = record.team.value
        else:
            team_value = None
        audit_details = {
            "file_name": record.file_name,
            "test_id": str(record.test_id),
            "team": team_value,
        }
        log_action(
            db,
            user_id=current_user.id,
            action="delete_evidence",
            entity_type="evidence",
            entity_id=record.id,
            details=audit_details,
        )
        db.delete(record)
        uow.commit()

    return {"detail": "Evidence deleted"}
|