diff --git a/backend/app/routers/evidence.py b/backend/app/routers/evidence.py index 58b37fb..2c7797e 100644 --- a/backend/app/routers/evidence.py +++ b/backend/app/routers/evidence.py @@ -24,52 +24,32 @@ import os import uuid as _uuid from typing import Optional -from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile, status +from fastapi import APIRouter, Depends, File, Form, Query, UploadFile, status from sqlalchemy.orm import Session from app.database import get_db from app.dependencies.auth import get_current_user -from app.models.enums import TeamSide, TestState +from app.models.enums import TeamSide from app.models.evidence import Evidence -from app.models.test import Test from app.models.user import User from app.schemas.evidence import EvidenceOut from app.services.audit_service import log_action +from app.services.evidence_service import ( + get_evidence_or_raise, + get_test_or_raise, + list_evidence_for_test, + MAX_UPLOAD_SIZE, + validate_delete_permission, + validate_file, + validate_upload_permission, +) from app.storage import get_presigned_url, upload_file router = APIRouter(tags=["evidence"]) -# States where red evidence can be uploaded / deleted -_RED_EDITABLE_STATES = (TestState.draft, TestState.red_executing) -# States where blue evidence can be uploaded / deleted -_BLUE_EDITABLE_STATES = (TestState.blue_evaluating,) # --------------------------------------------------------------------------- -# Upload safety limits -# --------------------------------------------------------------------------- - -# Maximum upload size in bytes (default 50 MB) -_MAX_UPLOAD_SIZE = 50 * 1024 * 1024 - -# Allowed file extensions (lowercase, with leading dot) -_ALLOWED_EXTENSIONS: set[str] = { - # Images / screenshots - ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp", ".svg", - # Documents - ".pdf", ".doc", ".docx", ".xls", ".xlsx", ".csv", ".txt", - ".md", ".rtf", ".odt", ".ods", - # Logs & captures - ".log", ".pcap", ".pcapng", ".evtx", ".json", ".xml", - ".yaml", ".yml", ".toml", - # Archives (for bundled evidence) - ".zip", ".tar", ".gz", ".7z", - # Other common evidence types - ".har", ".eml", ".msg", -} - - -# --------------------------------------------------------------------------- -# Helpers +# Helpers (router-specific: infrastructure / HTTP concerns) # --------------------------------------------------------------------------- def _evidence_to_out(evidence: Evidence) -> EvidenceOut: @@ -87,85 +67,6 @@ def _evidence_to_out(evidence: Evidence) -> EvidenceOut: ) -def _validate_upload_permission( - test: Test, - team: TeamSide, - user: User, -) -> None: - """Raise 403 if the user/team combination is not allowed in the current state.""" - # Admins bypass all checks - if user.role == "admin": - return - - if team == TeamSide.red: - if user.role not in ("red_tech", "red_lead"): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Only red_tech, red_lead or admin can upload red evidence", - ) - if test.state not in _RED_EDITABLE_STATES: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=f"Cannot upload red evidence in '{test.state.value}' state " - f"(allowed in: draft, red_executing)", - ) - elif team == TeamSide.blue: - if user.role not in ("blue_tech", "blue_lead"): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Only blue_tech, blue_lead or admin can upload blue evidence", - ) - if test.state not in _BLUE_EDITABLE_STATES: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=f"Cannot upload blue evidence in '{test.state.value}' state " - f"(allowed in: blue_evaluating)", - ) - - -def _validate_delete_permission( - test: Test, - evidence: Evidence, - user: User, -) -> None: - """Raise 403 if the user cannot delete this evidence in the current state.""" - # No deletions in review / validated / rejected - if test.state in (TestState.in_review, TestState.validated, TestState.rejected): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail=f"Cannot delete evidence when test is in '{test.state.value}' state", - ) - - # Admin can delete in editable states - if user.role == "admin": - return - - ev_team = evidence.team - - if ev_team == TeamSide.red: - if test.state not in _RED_EDITABLE_STATES: - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Cannot delete red evidence outside draft/red_executing", - ) - if user.role not in ("red_tech", "red_lead") and evidence.uploaded_by != user.id: - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Not enough permissions to delete this evidence", - ) - elif ev_team == TeamSide.blue: - if test.state not in _BLUE_EDITABLE_STATES: - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Cannot delete blue evidence outside blue_evaluating", - ) - if user.role not in ("blue_tech", "blue_lead") and evidence.uploaded_by != user.id: - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Not enough permissions to delete this evidence", - ) - - # --------------------------------------------------------------------------- # POST /tests/{test_id}/evidence — upload with team # --------------------------------------------------------------------------- @@ -189,36 +90,14 @@ async def upload_evidence( The ``team`` field (sent as form data) determines whether this is Red Team (attack) or Blue Team (detection) evidence. """ - test = db.query(Test).filter(Test.id == test_id).first() - if test is None: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Test not found", - ) + test = get_test_or_raise(db, test_id) + validate_upload_permission(test, team, current_user.role) - # Validate permissions - _validate_upload_permission(test, team, current_user) - - # 1. Validate file extension file_name = file.filename or "unnamed" - _, ext = os.path.splitext(file_name) - if ext.lower() not in _ALLOWED_EXTENSIONS: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=f"File type '{ext}' is not allowed. " - f"Permitted types: {', '.join(sorted(_ALLOWED_EXTENSIONS))}", - ) + content = await file.read(MAX_UPLOAD_SIZE + 1) + validate_file(file_name, len(content)) - # 2. Read content with size limit - content = await file.read(_MAX_UPLOAD_SIZE + 1) - if len(content) > _MAX_UPLOAD_SIZE: - raise HTTPException( - status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, - detail=f"File exceeds maximum upload size of " - f"{_MAX_UPLOAD_SIZE // (1024 * 1024)} MB", - ) - - # 3. Hash + # Hash sha256 = hashlib.sha256(content).hexdigest() # 4. Object key (sanitise filename to prevent path traversal in storage) @@ -273,19 +152,8 @@ def list_evidence( current_user: User = Depends(get_current_user), ): """List all evidences for a test, optionally filtered by team.""" - test = db.query(Test).filter(Test.id == test_id).first() - if test is None: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Test not found", - ) - - query = db.query(Evidence).filter(Evidence.test_id == test_id) - - if team: - query = query.filter(Evidence.team == team) - - evidences = query.order_by(Evidence.uploaded_at.desc()).all() + get_test_or_raise(db, test_id) + evidences = list_evidence_for_test(db, test_id, team=team) return [_evidence_to_out(e) for e in evidences] @@ -301,13 +169,7 @@ def get_evidence( current_user: User = Depends(get_current_user), ): """Return evidence metadata together with a presigned download URL.""" - evidence = db.query(Evidence).filter(Evidence.id == evidence_id).first() - if evidence is None: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Evidence not found", - ) - + evidence = get_evidence_or_raise(db, evidence_id) return _evidence_to_out(evidence) @@ -329,22 +191,9 @@ def delete_evidence( - Blue evidence: ``blue_evaluating`` - No deletions in ``in_review``, ``validated``, ``rejected`` """ - evidence = db.query(Evidence).filter(Evidence.id == evidence_id).first() - if evidence is None: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Evidence not found", - ) - - test = db.query(Test).filter(Test.id == evidence.test_id).first() - if test is None: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Parent test not found", - ) - - # Permission checks - _validate_delete_permission(test, evidence, current_user) + evidence = get_evidence_or_raise(db, evidence_id) + test = get_test_or_raise(db, evidence.test_id) + validate_delete_permission(test, evidence, current_user.role, current_user.id) # Audit before deletion log_action( diff --git a/backend/app/services/evidence_service.py b/backend/app/services/evidence_service.py new file mode 100644 index 0000000..afb5841 --- /dev/null +++ b/backend/app/services/evidence_service.py @@ -0,0 +1,167 @@ +"""Evidence service — permission validation, file validation, and query logic. + +Framework-agnostic; uses domain exceptions from app.domain.errors. +The router is responsible for HTTP concerns, file I/O, MinIO upload, +audit logging, and response formatting. +""" + +from __future__ import annotations + +import os +import uuid + +from sqlalchemy.orm import Session + +from app.domain.errors import ( + BusinessRuleViolation, + EntityNotFoundError, + PermissionViolation, +) +from app.models.enums import TeamSide, TestState +from app.models.evidence import Evidence +from app.models.test import Test + +# States where red evidence can be uploaded / deleted +RED_EDITABLE_STATES = (TestState.draft, TestState.red_executing) +# States where blue evidence can be uploaded / deleted +BLUE_EDITABLE_STATES = (TestState.blue_evaluating,) + +# Maximum upload size in bytes (50 MB) +MAX_UPLOAD_SIZE = 50 * 1024 * 1024 + +# Allowed file extensions (lowercase, with leading dot) +ALLOWED_EXTENSIONS: frozenset[str] = frozenset({ + ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp", ".svg", + ".pdf", ".doc", ".docx", ".xls", ".xlsx", ".csv", ".txt", + ".md", ".rtf", ".odt", ".ods", + ".log", ".pcap", ".pcapng", ".evtx", ".json", ".xml", + ".yaml", ".yml", ".toml", + ".zip", ".tar", ".gz", ".7z", + ".har", ".eml", ".msg", +}) + + +def validate_upload_permission( + test: Test, + team: TeamSide, + user_role: str, +) -> None: + """Validate that the user can upload evidence for the given team in the current state. + + Raises: + PermissionViolation: If user lacks role to upload for this team. + BusinessRuleViolation: If test state does not allow uploading for this team. + """ + if user_role == "admin": + return + + if team == TeamSide.red: + if user_role not in ("red_tech", "red_lead"): + raise PermissionViolation( + "Only red_tech, red_lead or admin can upload red evidence" + ) + if test.state not in RED_EDITABLE_STATES: + raise BusinessRuleViolation( + f"Cannot upload red evidence in '{test.state.value}' state " + "(allowed in: draft, red_executing)" + ) + elif team == TeamSide.blue: + if user_role not in ("blue_tech", "blue_lead"): + raise PermissionViolation( + "Only blue_tech, blue_lead or admin can upload blue evidence" + ) + if test.state not in BLUE_EDITABLE_STATES: + raise BusinessRuleViolation( + f"Cannot upload blue evidence in '{test.state.value}' state " + "(allowed in: blue_evaluating)" + ) + + +def validate_delete_permission( + test: Test, + evidence: Evidence, + user_role: str, + user_id: uuid.UUID, +) -> None: + """Validate that the user can delete this evidence in the current state. + + Raises: + PermissionViolation: If user cannot delete in this state or lacks permission. + """ + if test.state in (TestState.in_review, TestState.validated, TestState.rejected): + raise PermissionViolation( + f"Cannot delete evidence when test is in '{test.state.value}' state" + ) + + if user_role == "admin": + return + + ev_team = evidence.team + + if ev_team == TeamSide.red: + if test.state not in RED_EDITABLE_STATES: + raise PermissionViolation( + "Cannot delete red evidence outside draft/red_executing" + ) + if user_role not in ("red_tech", "red_lead") and evidence.uploaded_by != user_id: + raise PermissionViolation( + "Not enough permissions to delete this evidence" + ) + elif ev_team == TeamSide.blue: + if test.state not in BLUE_EDITABLE_STATES: + raise PermissionViolation( + "Cannot delete blue evidence outside blue_evaluating" + ) + if user_role not in ("blue_tech", "blue_lead") and evidence.uploaded_by != user_id: + raise PermissionViolation( + "Not enough permissions to delete this evidence" + ) + + +def validate_file(file_name: str, content_size: int) -> None: + """Validate file extension and size. + + Raises: + BusinessRuleViolation: If extension is not allowed or file exceeds size limit. + """ + _, ext = os.path.splitext(file_name) + ext_lower = ext.lower() if ext else "" + if ext_lower not in ALLOWED_EXTENSIONS: + raise BusinessRuleViolation( + f"File type '{ext}' is not allowed. " + f"Permitted types: {', '.join(sorted(ALLOWED_EXTENSIONS))}" + ) + if content_size > MAX_UPLOAD_SIZE: + raise BusinessRuleViolation( + f"File exceeds maximum upload size of {MAX_UPLOAD_SIZE // (1024 * 1024)} MB" + ) + + +def list_evidence_for_test( + db: Session, + test_id: uuid.UUID, + *, + team: TeamSide | str | None = None, +) -> list[Evidence]: + """Return evidence for a test, optionally filtered by team.""" + query = db.query(Evidence).filter(Evidence.test_id == test_id) + if team is not None: + team_enum = TeamSide(team) if isinstance(team, str) else team + query = query.filter(Evidence.team == team_enum) + return query.order_by(Evidence.uploaded_at.desc()).all() + + +def get_evidence_or_raise(db: Session, evidence_id: uuid.UUID) -> Evidence: + """Fetch evidence by ID. Raises EntityNotFoundError if not found.""" + evidence = db.query(Evidence).filter(Evidence.id == evidence_id).first() + if evidence is None: + raise EntityNotFoundError("Evidence", str(evidence_id)) + return evidence + + +def get_test_or_raise(db: Session, test_id: uuid.UUID) -> Test: + """Fetch test by ID. Raises EntityNotFoundError if not found.""" + test = db.query(Test).filter(Test.id == test_id).first() + if test is None: + raise EntityNotFoundError("Test", str(test_id)) + return test diff --git a/backend/tests/test_t110_evidence_router.py b/backend/tests/test_t110_evidence_router.py index 6a18867..fee04cd 100644 --- a/backend/tests/test_t110_evidence_router.py +++ b/backend/tests/test_t110_evidence_router.py @@ -89,12 +89,12 @@ for mod_name in [ # Imports # --------------------------------------------------------------------------- -from fastapi import HTTPException +from app.domain.errors import PermissionViolation from app.models.enums import TeamSide, TestState -from app.routers.evidence import ( - router, - _validate_upload_permission, - _validate_delete_permission, +from app.routers.evidence import router +from app.services.evidence_service import ( + validate_delete_permission, + validate_upload_permission, ) @@ -132,7 +132,7 @@ def test_red_tech_upload_red_in_red_executing(): test = _make_test(TestState.red_executing) user = _make_user("red_tech") # Should not raise - _validate_upload_permission(test, TeamSide.red, user) + validate_upload_permission(test, TeamSide.red, user.role) print(" [PASS] red_tech can upload team=red in red_executing") @@ -144,7 +144,7 @@ def test_red_tech_upload_red_in_red_executing(): def test_red_tech_upload_red_in_draft(): test = _make_test(TestState.draft) user = _make_user("red_tech") - _validate_upload_permission(test, TeamSide.red, user) + validate_upload_permission(test, TeamSide.red, user.role) print(" [PASS] red_tech can upload team=red in draft") @@ -157,10 +157,10 @@ def test_red_tech_cannot_upload_blue(): test = _make_test(TestState.red_executing) user = _make_user("red_tech") try: - _validate_upload_permission(test, TeamSide.blue, user) - assert False, "Should have raised HTTPException" - except HTTPException as exc: - assert exc.status_code == 403 + validate_upload_permission(test, TeamSide.blue, user.role) + assert False, "Should have raised PermissionViolation" + except PermissionViolation: + pass print(" [PASS] red_tech CANNOT upload team=blue (403)") @@ -172,7 +172,7 @@ def test_red_tech_cannot_upload_blue(): def test_blue_tech_upload_blue_in_blue_evaluating(): test = _make_test(TestState.blue_evaluating) user = _make_user("blue_tech") - _validate_upload_permission(test, TeamSide.blue, user) + validate_upload_permission(test, TeamSide.blue, user.role) print(" [PASS] blue_tech can upload team=blue in blue_evaluating") @@ -185,10 +185,10 @@ def test_blue_tech_cannot_upload_red(): test = _make_test(TestState.blue_evaluating) user = _make_user("blue_tech") try: - _validate_upload_permission(test, TeamSide.red, user) - assert False, "Should have raised HTTPException" - except HTTPException as exc: - assert exc.status_code == 403 + validate_upload_permission(test, TeamSide.red, user.role) + assert False, "Should have raised PermissionViolation" + except PermissionViolation: + pass print(" [PASS] blue_tech CANNOT upload team=red (403)") @@ -223,10 +223,10 @@ def test_delete_in_review_fails(): user = _make_user("red_tech") evidence = _make_evidence(TeamSide.red, uploaded_by=user.id) try: - _validate_delete_permission(test, evidence, user) - assert False, "Should have raised HTTPException" - except HTTPException as exc: - assert exc.status_code == 403 + validate_delete_permission(test, evidence, user.role, user.id) + assert False, "Should have raised PermissionViolation" + except PermissionViolation: + pass print(" [PASS] DELETE in in_review -> 403") @@ -240,7 +240,7 @@ def test_delete_red_evidence_in_red_executing(): user = _make_user("red_tech") evidence = _make_evidence(TeamSide.red, uploaded_by=user.id) # Should not raise - _validate_delete_permission(test, evidence, user) + validate_delete_permission(test, evidence, user.role, user.id) print(" [PASS] DELETE red evidence in red_executing -> allowed") @@ -254,11 +254,11 @@ def test_admin_bypass(): # Red in blue_evaluating (normally blocked) test1 = _make_test(TestState.blue_evaluating) - _validate_upload_permission(test1, TeamSide.red, admin) + validate_upload_permission(test1, TeamSide.red, admin.role) # Blue in draft (normally blocked) test2 = _make_test(TestState.draft) - _validate_upload_permission(test2, TeamSide.blue, admin) + validate_upload_permission(test2, TeamSide.blue, admin.role) print(" [PASS] Admin can upload any team in any state") diff --git a/backend/tests/test_workflow.py b/backend/tests/test_workflow.py index 0b61848..0a2cc46 100644 --- a/backend/tests/test_workflow.py +++ b/backend/tests/test_workflow.py @@ -419,56 +419,57 @@ def test_dual_validation_red_approves_blue_rejects(mock_log): def test_evidence_team_separation(): """Verify evidence router logic separates red and blue evidence correctly.""" - from app.routers.evidence import _validate_upload_permission, _RED_EDITABLE_STATES, _BLUE_EDITABLE_STATES + from app.domain.errors import BusinessRuleViolation, PermissionViolation + from app.models.enums import TeamSide + from app.services.evidence_service import validate_upload_permission # Red tech can upload red evidence in draft test = _make_test(TestState.draft) red_user = _make_user("red_tech") red_user.role = "red_tech" - from app.models.enums import TeamSide - _validate_upload_permission(test, TeamSide.red, red_user) # should not raise + validate_upload_permission(test, TeamSide.red, red_user.role) # should not raise # Red tech can upload red evidence in red_executing test.state = TestState.red_executing - _validate_upload_permission(test, TeamSide.red, red_user) # should not raise + validate_upload_permission(test, TeamSide.red, red_user.role) # should not raise - # Red tech CANNOT upload red evidence in blue_evaluating + # Red tech CANNOT upload red evidence in blue_evaluating (state violation -> 400) test.state = TestState.blue_evaluating try: - _validate_upload_permission(test, TeamSide.red, red_user) - assert False, "Should have raised HTTPException" - except HTTPException as exc: - assert exc.status_code == 400 + validate_upload_permission(test, TeamSide.red, red_user.role) + assert False, "Should have raised BusinessRuleViolation" + except BusinessRuleViolation: + pass - # Red tech CANNOT upload blue evidence + # Red tech CANNOT upload blue evidence (role violation -> 403) test.state = TestState.blue_evaluating try: - _validate_upload_permission(test, TeamSide.blue, red_user) - assert False, "Should have raised HTTPException" - except HTTPException as exc: - assert exc.status_code == 403 + validate_upload_permission(test, TeamSide.blue, red_user.role) + assert False, "Should have raised PermissionViolation" + except PermissionViolation: + pass # Blue tech can upload blue evidence in blue_evaluating test.state = TestState.blue_evaluating blue_user = _make_user("blue_tech") blue_user.role = "blue_tech" - _validate_upload_permission(test, TeamSide.blue, blue_user) # should not raise + validate_upload_permission(test, TeamSide.blue, blue_user.role) # should not raise - # Blue tech CANNOT upload blue evidence in draft + # Blue tech CANNOT upload blue evidence in draft (state violation -> 400) test.state = TestState.draft try: - _validate_upload_permission(test, TeamSide.blue, blue_user) - assert False, "Should have raised HTTPException" - except HTTPException as exc: - assert exc.status_code == 400 + validate_upload_permission(test, TeamSide.blue, blue_user.role) + assert False, "Should have raised BusinessRuleViolation" + except BusinessRuleViolation: + pass - # Blue tech CANNOT upload red evidence + # Blue tech CANNOT upload red evidence (role violation -> 403) test.state = TestState.draft try: - _validate_upload_permission(test, TeamSide.red, blue_user) - assert False, "Should have raised HTTPException" - except HTTPException as exc: - assert exc.status_code == 403 + validate_upload_permission(test, TeamSide.red, blue_user.role) + assert False, "Should have raised PermissionViolation" + except PermissionViolation: + pass # ===========================================================================