"""Evidence service — permission validation, file validation, and query logic. Framework-agnostic; uses domain exceptions from app.domain.errors. The router is responsible for HTTP concerns, file I/O, MinIO upload, audit logging, and response formatting. """ # Enable future language features for compatibility from __future__ import annotations # Import os import os # Import uuid import uuid # Import Session from sqlalchemy.orm from sqlalchemy.orm import Session # Import from app.domain.errors from app.domain.errors import ( BusinessRuleViolation, EntityNotFoundError, PermissionViolation, ) # Import TeamSide, TestState from app.models.enums from app.models.enums import TeamSide, TestState # Import Evidence from app.models.evidence from app.models.evidence import Evidence # Import Test from app.models.test from app.models.test import Test # States where red evidence can be uploaded / deleted RED_EDITABLE_STATES = (TestState.draft, TestState.red_executing) # States where blue evidence can be uploaded / deleted BLUE_EDITABLE_STATES = (TestState.blue_evaluating,) # Maximum upload size in bytes (50 MB) MAX_UPLOAD_SIZE = 50 * 1024 * 1024 # Allowed file extensions (lowercase, with leading dot) ALLOWED_EXTENSIONS: frozenset[str] = frozenset({ # Literal argument value ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp", ".svg", # Literal argument value ".pdf", ".doc", ".docx", ".xls", ".xlsx", ".csv", ".txt", # Literal argument value ".md", ".rtf", ".odt", ".ods", # Literal argument value ".log", ".pcap", ".pcapng", ".evtx", ".json", ".xml", # Literal argument value ".yaml", ".yml", ".toml", # Literal argument value ".zip", ".tar", ".gz", ".7z", # Literal argument value ".har", ".eml", ".msg", }) # Define function validate_upload_permission def validate_upload_permission( # Entry: test test: Test, # Entry: team team: TeamSide, # Entry: user_role user_role: str, ) -> None: """Validate that the user can upload evidence for the given team in the current state. Raises: PermissionViolation: If user lacks role to upload for this team. BusinessRuleViolation: If test state does not allow uploading for this team. """ # Check: user_role == "admin" if user_role == "admin": # Return control to caller return # Check: team == TeamSide.red if team == TeamSide.red: # Check: user_role not in ("red_tech", "red_lead") if user_role not in ("red_tech", "red_lead"): # Raise PermissionViolation raise PermissionViolation( # Literal argument value "Only red_tech, red_lead or admin can upload red evidence" ) # Check: test.state not in RED_EDITABLE_STATES if test.state not in RED_EDITABLE_STATES: # Raise BusinessRuleViolation raise BusinessRuleViolation( f"Cannot upload red evidence in '{test.state.value}' state " # Literal argument value "(allowed in: draft, red_executing)" ) # Alternative: team == TeamSide.blue elif team == TeamSide.blue: # Check: user_role not in ("blue_tech", "blue_lead") if user_role not in ("blue_tech", "blue_lead"): # Raise PermissionViolation raise PermissionViolation( # Literal argument value "Only blue_tech, blue_lead or admin can upload blue evidence" ) # Check: test.state not in BLUE_EDITABLE_STATES if test.state not in BLUE_EDITABLE_STATES: # Raise BusinessRuleViolation raise BusinessRuleViolation( f"Cannot upload blue evidence in '{test.state.value}' state " # Literal argument value "(allowed in: blue_evaluating)" ) # Define function validate_delete_permission def validate_delete_permission( # Entry: test test: Test, # Entry: evidence evidence: Evidence, # Entry: user_role user_role: str, # Entry: user_id user_id: uuid.UUID, ) -> None: """Validate that the user can delete this evidence in the current state. Raises: PermissionViolation: If user cannot delete in this state or lacks permission. """ # Check: test.state in (TestState.in_review, TestState.validated, TestState.... if test.state in (TestState.in_review, TestState.validated, TestState.rejected): # Raise PermissionViolation raise PermissionViolation( f"Cannot delete evidence when test is in '{test.state.value}' state" ) # Check: user_role == "admin" if user_role == "admin": # Return control to caller return # Assign ev_team = evidence.team ev_team = evidence.team # Check: ev_team == TeamSide.red if ev_team == TeamSide.red: # Check: test.state not in RED_EDITABLE_STATES if test.state not in RED_EDITABLE_STATES: # Raise PermissionViolation raise PermissionViolation( # Literal argument value "Cannot delete red evidence outside draft/red_executing" ) # Check: user_role not in ("red_tech", "red_lead") and evidence.uploaded_by ... if user_role not in ("red_tech", "red_lead") and evidence.uploaded_by != user_id: # Raise PermissionViolation raise PermissionViolation( # Literal argument value "Not enough permissions to delete this evidence" ) # Alternative: ev_team == TeamSide.blue elif ev_team == TeamSide.blue: # Check: test.state not in BLUE_EDITABLE_STATES if test.state not in BLUE_EDITABLE_STATES: # Raise PermissionViolation raise PermissionViolation( # Literal argument value "Cannot delete blue evidence outside blue_evaluating" ) # Check: user_role not in ("blue_tech", "blue_lead") and evidence.uploaded_b... if user_role not in ("blue_tech", "blue_lead") and evidence.uploaded_by != user_id: # Raise PermissionViolation raise PermissionViolation( # Literal argument value "Not enough permissions to delete this evidence" ) # Define function validate_file def validate_file(file_name: str, content_size: int) -> None: """Validate file extension and size. Raises: BusinessRuleViolation: If extension is not allowed or file exceeds size limit. """ # _, ext = os.path.splitext(file_name) _, ext = os.path.splitext(file_name) # Assign ext_lower = ext.lower() if ext else "" ext_lower = ext.lower() if ext else "" # Check: ext_lower not in ALLOWED_EXTENSIONS if ext_lower not in ALLOWED_EXTENSIONS: # Raise BusinessRuleViolation raise BusinessRuleViolation( f"File type '{ext}' is not allowed. " f"Permitted types: {', '.join(sorted(ALLOWED_EXTENSIONS))}" ) # Check: content_size > MAX_UPLOAD_SIZE if content_size > MAX_UPLOAD_SIZE: # Raise BusinessRuleViolation raise BusinessRuleViolation( f"File exceeds maximum upload size of {MAX_UPLOAD_SIZE // (1024 * 1024)} MB" ) # Define function list_evidence_for_test def list_evidence_for_test( # Entry: db db: Session, # Entry: test_id test_id: uuid.UUID, *, # Entry: team team: TeamSide | str | None = None, ) -> list[Evidence]: """Return evidence for a test, optionally filtered by team.""" # Assign query = db.query(Evidence).filter(Evidence.test_id == test_id) query = db.query(Evidence).filter(Evidence.test_id == test_id) # Check: team is not None if team is not None: # Assign team_enum = TeamSide(team) if isinstance(team, str) else team team_enum = TeamSide(team) if isinstance(team, str) else team # Assign query = query.filter(Evidence.team == team_enum) query = query.filter(Evidence.team == team_enum) # Return query.order_by(Evidence.uploaded_at.desc()).all() return query.order_by(Evidence.uploaded_at.desc()).all() # Define function get_evidence_or_raise def get_evidence_or_raise(db: Session, evidence_id: uuid.UUID) -> Evidence: """Fetch evidence by ID. Raises EntityNotFoundError if not found.""" # Assign evidence = db.query(Evidence).filter(Evidence.id == evidence_id).first() evidence = db.query(Evidence).filter(Evidence.id == evidence_id).first() # Check: evidence is None if evidence is None: # Raise EntityNotFoundError raise EntityNotFoundError("Evidence", str(evidence_id)) # Return evidence return evidence # Define function get_test_or_raise def get_test_or_raise(db: Session, test_id: uuid.UUID) -> Test: """Fetch test by ID. Raises EntityNotFoundError if not found.""" # Assign test = db.query(Test).filter(Test.id == test_id).first() test = db.query(Test).filter(Test.id == test_id).first() # Check: test is None if test is None: # Raise EntityNotFoundError raise EntityNotFoundError("Test", str(test_id)) # Return test return test