Files
Aegis/backend/app/routers/evidence.py
kitos 7111debd8f
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
fix(evidence): proxy download + fix Jira attachment signature
Evidence download:
- Replace presigned MinIO URLs with backend proxy endpoint
  GET /api/v1/evidence/{id}/file streams the file through the backend
  so MinIO never needs to be publicly accessible from browsers
- Add download_file() helper to storage.py (internal boto3 get_object)
- download_url in EvidenceOut now points to the proxy endpoint

Jira attachment:
- Fix add_attachment call: use add_attachment_object(issue_key, BytesIO)
  instead of add_attachment(issue_key, filename=..., content=...) which
  had wrong keyword args for the installed atlassian-python-api version

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-28 11:26:01 +02:00

301 lines
10 KiB
Python

"""Evidence upload, download, listing and deletion router — v2 with Red/Blue separation.
Endpoints
---------
POST /tests/{test_id}/evidence — upload evidence (with team=red/blue)
GET /tests/{test_id}/evidence — list evidences (filterable by team)
GET /evidence/{id} — metadata + download_url
GET /evidence/{id}/file — proxy download (streams file through backend)
DELETE /evidence/{id} — delete evidence (only in editable states)
Access Control
--------------
- Red Team (``red_tech``) can only upload ``team=red`` when test is in
``draft`` or ``red_executing``.
- Blue Team (``blue_tech``) can only upload ``team=blue`` when test is in
``blue_evaluating``.
- Admin can upload any team in any state.
- DELETE is restricted: red evidence in ``draft``/``red_executing``,
blue evidence in ``blue_evaluating``. No deletions in ``in_review``,
``validated``, or ``rejected``.
"""
import hashlib
import logging
import os
import uuid as _uuid
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, File, Form, Query, Request, UploadFile, status
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from app.database import get_db
from app.domain.unit_of_work import UnitOfWork
from app.dependencies.auth import get_current_user
from app.models.enums import TeamSide
from app.models.evidence import Evidence
from app.models.user import User
from app.schemas.evidence import EvidenceOut
from app.services.audit_service import log_action
from app.services.evidence_service import (
get_evidence_or_raise,
get_test_or_raise,
list_evidence_for_test,
MAX_UPLOAD_SIZE,
validate_delete_permission,
validate_file,
validate_upload_permission,
)
from app.limiter import limiter
from app.storage import download_file, upload_file
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router that collects every endpoint declared below; all are tagged "evidence".
router = APIRouter(tags=["evidence"])
# ---------------------------------------------------------------------------
# Helpers (router-specific: infrastructure / HTTP concerns)
# ---------------------------------------------------------------------------
def _evidence_to_out(evidence: Evidence) -> EvidenceOut:
    """Build the ``EvidenceOut`` API schema for an ORM ``Evidence`` row.

    The metadata fields are copied across unchanged.  ``download_url`` is
    not stored on the row: it is derived from the evidence id and points at
    this backend's proxy endpoint, so the browser never needs direct access
    to MinIO.
    """
    proxy_url = f"/api/v1/evidence/{evidence.id}/file"
    copied_fields = (
        "id",
        "test_id",
        "file_name",
        "sha256_hash",
        "uploaded_by",
        "uploaded_at",
        "team",
        "notes",
    )
    payload = {field: getattr(evidence, field) for field in copied_fields}
    return EvidenceOut(download_url=proxy_url, **payload)
# ---------------------------------------------------------------------------
# POST /tests/{test_id}/evidence — upload with team
# ---------------------------------------------------------------------------
@router.post(
    "/tests/{test_id}/evidence",
    response_model=EvidenceOut,
    status_code=status.HTTP_201_CREATED,
)
@limiter.limit("10/minute")
async def upload_evidence(
    request: Request,
    test_id: _uuid.UUID,
    file: UploadFile = File(...),
    team: TeamSide = Form(TeamSide.red),
    notes: Optional[str] = Form(None),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Upload a file as evidence for the given test.

    The ``team`` field (sent as form data) determines whether this is
    Red Team (attack) or Blue Team (detection) evidence.

    Parameters
    ----------
    request:
        Incoming request; not used in the body (presumably required by the
        rate-limit decorator — confirm against ``app.limiter``).
    test_id:
        Id of the test the evidence belongs to.
    file:
        The uploaded file (multipart form field).
    team:
        Team side the evidence is filed under; defaults to red.
    notes:
        Optional free-text notes stored with the evidence.

    Returns
    -------
    The stored evidence as ``EvidenceOut`` (HTTP 201).

    Whatever ``get_test_or_raise``, ``validate_upload_permission`` and
    ``validate_file`` raise propagates to the caller unchanged.
    """
    # 1. Resolve the test and check the caller may upload for this team.
    test = get_test_or_raise(db, test_id)
    validate_upload_permission(test, team, current_user.role)

    # 2. Read at most one byte past the limit, then validate name and size
    #    (presumably so validate_file can detect an oversized upload without
    #    buffering the whole body — confirm in evidence_service).
    file_name = file.filename or "unnamed"
    content = await file.read(MAX_UPLOAD_SIZE + 1)
    validate_file(file_name, len(content))

    # 3. Hash
    sha256 = hashlib.sha256(content).hexdigest()

    # 4. Object key (sanitise filename to prevent path traversal in storage).
    #    The filename is client-controlled: normalise Windows separators so
    #    "..\\..\\x" is stripped like "../../x", and fall back to "unnamed"
    #    when nothing is left (e.g. the client sent "dir/").
    safe_name = os.path.basename(file_name.replace("\\", "/")) or "unnamed"
    key = f"{test_id}/{_uuid.uuid4()}_{safe_name}"

    # 5. Upload to MinIO
    upload_file(content, key)

    # 6. Persist metadata and audit
    with UnitOfWork(db) as uow:
        evidence = Evidence(
            test_id=test_id,
            file_name=safe_name,
            file_path=key,
            sha256_hash=sha256,
            uploaded_by=current_user.id,
            uploaded_at=datetime.utcnow(),  # set explicitly — DB column has no server default
            team=team,
            notes=notes,
        )
        db.add(evidence)
        db.flush()  # Get evidence.id for audit
        log_action(
            db,
            user_id=current_user.id,
            action="upload_evidence",
            entity_type="evidence",
            entity_id=evidence.id,
            details={
                "file_name": safe_name,
                "sha256": sha256,
                "test_id": str(test_id),
                "team": team.value,
            },
        )
        uow.commit()
    db.refresh(evidence)

    # 7. Attach to Jira ticket if one exists (non-fatal)
    _attach_evidence_to_jira(db, test_id, content, safe_name, current_user)

    return _evidence_to_out(evidence)
def _attach_evidence_to_jira(
    db,
    test_id: _uuid.UUID,
    content: bytes,
    file_name: str,
    actor,
) -> None:
    """Attach uploaded evidence to the linked Jira ticket (non-fatal).

    Does nothing when Jira is not configured for ``actor`` or when the test
    has no Jira issue key.  Any exception raised along the way is logged as
    a warning and swallowed, so a Jira problem never fails the upload.

    Parameters
    ----------
    db:
        Database session handed through to the Jira service helpers.
    test_id:
        Id of the test whose Jira ticket should receive the attachment.
    content:
        Raw bytes of the uploaded file.
    file_name:
        Name the attachment is given.
    actor:
        The user on whose behalf the Jira client is obtained.
    """
    import io

    try:
        # Imported inside the try so that a failing import is also treated
        # as non-fatal.
        from app.services.jira_service import (
            get_test_jira_key,
            get_user_jira_client,
            has_jira_configured,
        )

        if not has_jira_configured(actor, db):
            return
        issue_key = get_test_jira_key(db, test_id)
        if not issue_key:
            return

        jira = get_user_jira_client(actor, db)
        buf = io.BytesIO(content)
        buf.name = file_name  # requests uses .name as the multipart filename
        jira.add_attachment_object(issue_key, buf)
        logger.info("Attached evidence '%s' to Jira ticket %s", file_name, issue_key)
    except Exception as exc:
        # Deliberate best-effort boundary: log with traceback, never re-raise.
        logger.warning(
            "Failed to attach evidence '%s' to Jira: %s", file_name, exc, exc_info=True
        )
# ---------------------------------------------------------------------------
# GET /tests/{test_id}/evidence — list (with optional team filter)
# ---------------------------------------------------------------------------
@router.get("/tests/{test_id}/evidence", response_model=list[EvidenceOut])
def list_evidence(
    test_id: _uuid.UUID,
    team: Optional[str] = Query(None, description="Filter by team: red or blue"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the evidence attached to a test as a list of ``EvidenceOut``.

    ``team`` is an optional query parameter that is passed straight through
    to ``list_evidence_for_test`` as its ``team`` filter.  ``current_user``
    is not read in the body; it is declared so the auth dependency runs.
    """
    # Called only for its side effect (presumably raises when the test does
    # not exist); the returned test object is not needed here.
    get_test_or_raise(db, test_id)

    rows = list_evidence_for_test(db, test_id, team=team)
    return list(map(_evidence_to_out, rows))
# ---------------------------------------------------------------------------
# GET /evidence/{id} — metadata + proxy download URL
# ---------------------------------------------------------------------------
@router.get("/evidence/{evidence_id}", response_model=EvidenceOut)
def get_evidence(
    evidence_id: _uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Look up one evidence record and return its metadata.

    The ``download_url`` in the response is the backend proxy URL produced
    by ``_evidence_to_out``.  ``current_user`` is not read in the body; it
    is declared so the auth dependency runs.
    """
    return _evidence_to_out(get_evidence_or_raise(db, evidence_id))
# ---------------------------------------------------------------------------
# GET /evidence/{id}/file — proxy download (streams file via backend)
# ---------------------------------------------------------------------------
def _content_disposition(file_name: str, disposition: str = "inline") -> str:
    """Build a ``Content-Disposition`` header value that is safe for any name.

    The quoted ``filename`` parameter carries only printable ASCII, with
    backslashes and double quotes escaped.  When that differs from the real
    name (non-ASCII or control characters were dropped), an RFC 6266
    ``filename*=UTF-8''…`` parameter with the percent-encoded full name is
    appended.  For a plain ASCII name the result is exactly
    ``<disposition>; filename="<name>"``.
    """
    from urllib.parse import quote

    ascii_name = "".join(
        ch for ch in file_name if ch.isascii() and ch.isprintable()
    )
    # Escape the backslash first so the escapes added for quotes survive.
    quoted = ascii_name.replace("\\", "\\\\").replace('"', '\\"') or "download"
    value = f'{disposition}; filename="{quoted}"'
    if ascii_name != file_name:
        value += f"; filename*=UTF-8''{quote(file_name, safe='')}"
    return value


@router.get("/evidence/{evidence_id}/file")
def download_evidence_file(
    evidence_id: _uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Stream the evidence file through the backend.

    The browser calls this endpoint (authenticated via JWT cookie/header).
    The backend fetches the file from MinIO internally and streams it back,
    so MinIO never needs to be publicly accessible.

    The content type is guessed from the stored file name and falls back to
    ``application/octet-stream``.  The whole file is held in memory, so
    ``Content-Length`` is exact.
    """
    import mimetypes

    evidence = get_evidence_or_raise(db, evidence_id)
    content = download_file(evidence.file_path)

    guessed_type, _ = mimetypes.guess_type(evidence.file_name)
    mime_type = guessed_type or "application/octet-stream"

    # The file name is user-supplied.  Header values are encoded as latin-1
    # by the ASGI stack, so a raw name containing e.g. CJK characters would
    # fail to encode; the helper emits an ASCII-safe header instead.
    # NOTE(review): user uploads are served ``inline`` with a guessed MIME
    # type — confirm that uploaded HTML/SVG cannot be rendered same-origin.
    return StreamingResponse(
        iter([content]),
        media_type=mime_type,
        headers={
            "Content-Disposition": _content_disposition(evidence.file_name),
            "Content-Length": str(len(content)),
        },
    )
# ---------------------------------------------------------------------------
# DELETE /evidence/{id} — delete evidence (editable states only)
# ---------------------------------------------------------------------------
@router.delete("/evidence/{evidence_id}", status_code=status.HTTP_200_OK)
def delete_evidence(
    evidence_id: _uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Remove an evidence record from the database.

    Deletion is only permitted while the test is still editable:

    - Red evidence: ``draft``, ``red_executing``
    - Blue evidence: ``blue_evaluating``
    - Never in ``in_review``, ``validated``, ``rejected``

    The check itself is delegated to ``validate_delete_permission``.  An
    audit entry is written in the same unit of work, before the row is
    deleted.  Returns ``{"detail": "Evidence deleted"}`` on success.
    """
    evidence = get_evidence_or_raise(db, evidence_id)
    parent_test = get_test_or_raise(db, evidence.test_id)
    validate_delete_permission(
        parent_test,
        evidence,
        current_user.role,
        current_user.id,
    )

    with UnitOfWork(db) as uow:
        if evidence.team:
            team_label = evidence.team.value
        else:
            team_label = None
        audit_details = {
            "file_name": evidence.file_name,
            "test_id": str(evidence.test_id),
            "team": team_label,
        }
        log_action(
            db,
            user_id=current_user.id,
            action="delete_evidence",
            entity_type="evidence",
            entity_id=evidence.id,
            details=audit_details,
        )
        # NOTE(review): only the database row is deleted here; no storage
        # call is made, so the stored object appears to remain — confirm.
        db.delete(evidence)
        uow.commit()

    return {"detail": "Evidence deleted"}