- Add Pydantic schemas for Technique, Test and Evidence - Add CRUD endpoints for Techniques (list with filters, detail, create, update, review) - Add CRUD endpoints for Tests (create, detail, update, validate, reject) - Add evidence upload with SHA-256 integrity and presigned download URLs - Add MinIO/S3 storage client with bucket auto-creation on startup - Add status_service to recalculate technique coverage from test results - Add require_any_role RBAC dependency for multi-role authorization - Update README with API endpoints reference and project structure
133 lines
3.9 KiB
Python
133 lines
3.9 KiB
Python
"""Evidence upload and download router."""
|
|
|
|
import hashlib
|
|
import uuid as _uuid
|
|
|
|
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.database import get_db
|
|
from app.dependencies.auth import get_current_user
|
|
from app.models.evidence import Evidence
|
|
from app.models.test import Test
|
|
from app.models.user import User
|
|
from app.schemas.evidence import EvidenceOut
|
|
from app.services.audit_service import log_action
|
|
from app.storage import get_presigned_url, upload_file
|
|
|
|
router = APIRouter(tags=["evidence"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /tests/{test_id}/evidence — upload
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.post(
|
|
"/tests/{test_id}/evidence",
|
|
response_model=EvidenceOut,
|
|
status_code=status.HTTP_201_CREATED,
|
|
)
|
|
async def upload_evidence(
|
|
test_id: _uuid.UUID,
|
|
file: UploadFile = File(...),
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Upload a file as evidence for the given test.
|
|
|
|
Steps:
|
|
1. Read file content and compute SHA-256.
|
|
2. Build an object key ``{test_id}/{uuid}_{filename}``.
|
|
3. Upload to MinIO.
|
|
4. Persist an :class:`Evidence` row in the database.
|
|
5. Write an audit-log entry.
|
|
"""
|
|
# Verify the parent test exists
|
|
test = db.query(Test).filter(Test.id == test_id).first()
|
|
if test is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Test not found",
|
|
)
|
|
|
|
# 1. Read content + hash
|
|
content = await file.read()
|
|
sha256 = hashlib.sha256(content).hexdigest()
|
|
|
|
# 2. Object key
|
|
file_name = file.filename or "unnamed"
|
|
key = f"{test_id}/{_uuid.uuid4()}_{file_name}"
|
|
|
|
# 3. Upload to MinIO
|
|
upload_file(content, key)
|
|
|
|
# 4. Persist metadata
|
|
evidence = Evidence(
|
|
test_id=test_id,
|
|
file_name=file_name,
|
|
file_path=key,
|
|
sha256_hash=sha256,
|
|
uploaded_by=current_user.id,
|
|
)
|
|
db.add(evidence)
|
|
db.commit()
|
|
db.refresh(evidence)
|
|
|
|
# 5. Audit
|
|
log_action(
|
|
db,
|
|
user_id=current_user.id,
|
|
action="upload_evidence",
|
|
entity_type="evidence",
|
|
entity_id=evidence.id,
|
|
details={
|
|
"file_name": file_name,
|
|
"sha256": sha256,
|
|
"test_id": str(test_id),
|
|
},
|
|
)
|
|
|
|
# Build response with download URL
|
|
return _evidence_to_out(evidence)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /evidence/{id} — presigned download URL
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/evidence/{evidence_id}", response_model=EvidenceOut)
|
|
def get_evidence(
|
|
evidence_id: _uuid.UUID,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Return evidence metadata together with a presigned download URL."""
|
|
evidence = db.query(Evidence).filter(Evidence.id == evidence_id).first()
|
|
if evidence is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Evidence not found",
|
|
)
|
|
|
|
return _evidence_to_out(evidence)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _evidence_to_out(evidence: Evidence) -> EvidenceOut:
|
|
"""Convert an ORM ``Evidence`` to the API schema, injecting a presigned URL."""
|
|
return EvidenceOut(
|
|
id=evidence.id,
|
|
test_id=evidence.test_id,
|
|
file_name=evidence.file_name,
|
|
sha256_hash=evidence.sha256_hash,
|
|
uploaded_by=evidence.uploaded_by,
|
|
uploaded_at=evidence.uploaded_at,
|
|
download_url=get_presigned_url(evidence.file_path),
|
|
)
|