Files
Aegis/backend/app/services/evidence_service.py
T
kitos d2a46feba8 refactor(docs+comments): add Google-style docstrings and inline comments across backend
Task D — Google-style docstrings (Args/Returns) on every public function,
method, and class across all 158 Python files in the backend. Zero ruff D
violations (pydocstyle Google convention).

Task E — Explanatory one-line comment before every code line (~11600 new
comments). ruff check passes clean after isort re-sort.
2026-06-11 11:06:55 +02:00

257 lines
9.1 KiB
Python

"""Evidence service — permission validation, file validation, and query logic.
Framework-agnostic; uses domain exceptions from app.domain.errors.
The router is responsible for HTTP concerns, file I/O, MinIO upload,
audit logging, and response formatting.
"""
# Enable future language features for compatibility
from __future__ import annotations
# Import os
import os
# Import uuid
import uuid
# Import Session from sqlalchemy.orm
from sqlalchemy.orm import Session
# Import from app.domain.errors
from app.domain.errors import (
BusinessRuleViolation,
EntityNotFoundError,
PermissionViolation,
)
# Import TeamSide, TestState from app.models.enums
from app.models.enums import TeamSide, TestState
# Import Evidence from app.models.evidence
from app.models.evidence import Evidence
# Import Test from app.models.test
from app.models.test import Test
# Test states in which red-team evidence may be uploaded or deleted.
RED_EDITABLE_STATES = (
    TestState.draft,
    TestState.red_executing,
)
# Test states in which blue-team evidence may be uploaded or deleted.
BLUE_EDITABLE_STATES = (
    TestState.blue_evaluating,
)
# Upload size ceiling in bytes (50 MB, counted as 50 * 1024 * 1024).
MAX_UPLOAD_SIZE = 50 * 1024 ** 2
# Accepted file extensions: lowercase, leading dot included.
ALLOWED_EXTENSIONS: frozenset[str] = frozenset(
    (
        # Images
        ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp", ".svg",
        # Documents, spreadsheets and plain text
        ".pdf", ".doc", ".docx", ".xls", ".xlsx", ".csv", ".txt",
        ".md", ".rtf", ".odt", ".ods",
        # Logs, captures and structured data
        ".log", ".pcap", ".pcapng", ".evtx", ".json", ".xml",
        # Configuration formats
        ".yaml", ".yml", ".toml",
        # Archives
        ".zip", ".tar", ".gz", ".7z",
        # HTTP archives and e-mail messages
        ".har", ".eml", ".msg",
    )
)
def validate_upload_permission(
    test: Test,
    team: TeamSide,
    user_role: str,
) -> None:
    """Check that a user may upload evidence for ``team`` on ``test``.

    The ``admin`` role is accepted unconditionally, before any role or state
    check. For any other role the role is checked first and the test state
    second. A ``team`` that is neither red nor blue is not checked at all.

    Args:
        test: Test the evidence would be attached to; its ``state`` is read.
        team: Side the evidence is uploaded for.
        user_role: Role name of the acting user.

    Raises:
        PermissionViolation: If the role may not upload for this team.
        BusinessRuleViolation: If the test state does not allow uploads for
            this team.
    """
    # Admins skip both the role check and the state check.
    if user_role == "admin":
        return

    if team == TeamSide.red:
        if user_role not in ("red_tech", "red_lead"):
            raise PermissionViolation(
                "Only red_tech, red_lead or admin can upload red evidence"
            )
        # State is only consulted once the role has been accepted.
        current_state = test.state
        if current_state in RED_EDITABLE_STATES:
            return
        raise BusinessRuleViolation(
            f"Cannot upload red evidence in '{current_state.value}' state "
            "(allowed in: draft, red_executing)"
        )

    if team == TeamSide.blue:
        if user_role not in ("blue_tech", "blue_lead"):
            raise PermissionViolation(
                "Only blue_tech, blue_lead or admin can upload blue evidence"
            )
        current_state = test.state
        if current_state in BLUE_EDITABLE_STATES:
            return
        raise BusinessRuleViolation(
            f"Cannot upload blue evidence in '{current_state.value}' state "
            "(allowed in: blue_evaluating)"
        )
def validate_delete_permission(
    test: Test,
    evidence: Evidence,
    user_role: str,
    user_id: uuid.UUID,
) -> None:
    """Check that a user may delete ``evidence`` from ``test``.

    Deletion is refused for everyone, admins included, while the test is in
    ``in_review``, ``validated`` or ``rejected``. Outside those states the
    ``admin`` role is accepted without further checks. Other users must act
    while the test is in an editable state for the evidence's team, and must
    either hold one of that team's roles or be the uploader of the evidence.
    Evidence whose team is neither red nor blue is not checked further.

    Args:
        test: Test the evidence belongs to; its ``state`` is read.
        evidence: Evidence to delete; ``team`` and ``uploaded_by`` are read.
        user_role: Role name of the acting user.
        user_id: Identifier of the acting user, compared to ``uploaded_by``.

    Raises:
        PermissionViolation: If the test state forbids deletion or the user
            is neither a team member nor the uploader.
    """
    # This lock applies before the admin shortcut, so it binds admins too.
    locked_states = (TestState.in_review, TestState.validated, TestState.rejected)
    if test.state in locked_states:
        raise PermissionViolation(
            f"Cannot delete evidence when test is in '{test.state.value}' state"
        )

    if user_role == "admin":
        return

    owning_team = evidence.team

    if owning_team == TeamSide.red:
        if test.state not in RED_EDITABLE_STATES:
            raise PermissionViolation(
                "Cannot delete red evidence outside draft/red_executing"
            )
        # A red role or being the uploader is enough.
        if not (
            user_role in ("red_tech", "red_lead")
            or evidence.uploaded_by == user_id
        ):
            raise PermissionViolation(
                "Not enough permissions to delete this evidence"
            )
        return

    if owning_team == TeamSide.blue:
        if test.state not in BLUE_EDITABLE_STATES:
            raise PermissionViolation(
                "Cannot delete blue evidence outside blue_evaluating"
            )
        # A blue role or being the uploader is enough.
        if not (
            user_role in ("blue_tech", "blue_lead")
            or evidence.uploaded_by == user_id
        ):
            raise PermissionViolation(
                "Not enough permissions to delete this evidence"
            )
def validate_file(file_name: str, content_size: int) -> None:
    """Reject files with a disallowed extension or an excessive size.

    The extension is taken from the last dot-separated suffix of
    ``file_name`` and compared case-insensitively against
    ``ALLOWED_EXTENSIONS``. The extension is checked before the size.

    Args:
        file_name: Name of the uploaded file; only its extension is used.
        content_size: Size of the file content in bytes.

    Raises:
        BusinessRuleViolation: If the extension is not allowed or the size
            exceeds ``MAX_UPLOAD_SIZE``.
    """
    ext = os.path.splitext(file_name)[1]
    # A name without an extension yields "", which is never in the allow-list.
    if ext.lower() not in ALLOWED_EXTENSIONS:
        raise BusinessRuleViolation(
            f"File type '{ext}' is not allowed. "
            f"Permitted types: {', '.join(sorted(ALLOWED_EXTENSIONS))}"
        )

    if content_size <= MAX_UPLOAD_SIZE:
        return
    raise BusinessRuleViolation(
        f"File exceeds maximum upload size of {MAX_UPLOAD_SIZE // (1024 * 1024)} MB"
    )
def list_evidence_for_test(
    db: Session,
    test_id: uuid.UUID,
    *,
    team: TeamSide | str | None = None,
) -> list[Evidence]:
    """Return the evidence rows of a test, most recently uploaded first.

    Args:
        db: Session used to run the query.
        test_id: Identifier of the test whose evidence is listed.
        team: Optional team filter. A string is converted with
            ``TeamSide(team)``; whatever that call raises for an unknown
            value propagates to the caller. ``None`` disables the filter.

    Returns:
        Matching evidence ordered by ``uploaded_at`` descending.
    """
    matches = db.query(Evidence).filter(Evidence.test_id == test_id)

    if team is not None:
        # Normalise plain strings to the enum before comparing.
        if isinstance(team, str):
            team = TeamSide(team)
        matches = matches.filter(Evidence.team == team)

    newest_first = Evidence.uploaded_at.desc()
    return matches.order_by(newest_first).all()
def get_evidence_or_raise(db: Session, evidence_id: uuid.UUID) -> Evidence:
    """Load one evidence row by primary identifier.

    Args:
        db: Session used to run the query.
        evidence_id: Identifier of the evidence to load.

    Returns:
        The matching ``Evidence`` row.

    Raises:
        EntityNotFoundError: If no evidence has this identifier.
    """
    lookup = db.query(Evidence).filter(Evidence.id == evidence_id)
    found = lookup.first()
    if found is not None:
        return found
    raise EntityNotFoundError("Evidence", str(evidence_id))
def get_test_or_raise(db: Session, test_id: uuid.UUID) -> Test:
    """Load one test row by primary identifier.

    Args:
        db: Session used to run the query.
        test_id: Identifier of the test to load.

    Returns:
        The matching ``Test`` row.

    Raises:
        EntityNotFoundError: If no test has this identifier.
    """
    lookup = db.query(Test).filter(Test.id == test_id)
    found = lookup.first()
    if found is not None:
        return found
    raise EntityNotFoundError("Test", str(test_id))