"""CRUD router for security Tests — v2 with Red/Blue workflow. Endpoints --------- GET /tests — list with filters (state, technique_id) POST /tests — create (red_tech, admin) POST /tests/from-template — create from TestTemplate (red_tech, admin) GET /tests/{id} — detail with split red/blue evidences PATCH /tests/{id} — general update (draft/rejected only) PATCH /tests/{id}/red — Red Team updates (draft, red_executing) PATCH /tests/{id}/blue — Blue Team updates (blue_evaluating) POST /tests/{id}/start-execution — draft → red_executing POST /tests/{id}/submit-red — red_executing → blue_evaluating POST /tests/{id}/start-blue-work — blue tech picks up (sets Tempo timer) POST /tests/{id}/submit-blue — blue_evaluating → in_review POST /tests/{id}/validate-red — Red Lead validates POST /tests/{id}/validate-blue — Blue Lead validates POST /tests/{id}/reopen — rejected → draft GET /tests/{id}/timeline — audit-log history for this test """ import base64 import hashlib import os import uuid from datetime import datetime from typing import Any, Optional # Import APIRouter, Depends, HTTPException, Query, Reque... from fastapi from fastapi import APIRouter, Depends, HTTPException, Query, Request, status from pydantic import BaseModel from sqlalchemy.orm import Session # Import get_db from app.database from app.database import get_db # Import get_current_user, require_any_role, require_role from app.dependencies.auth from app.dependencies.auth import get_current_user, require_any_role, require_role # Import UnitOfWork from app.domain.unit_of_work from app.domain.unit_of_work import UnitOfWork # Import limiter from app.limiter from app.limiter import limiter from app.models.enums import TestState, TestResult, TeamSide from app.models.evidence import Evidence from app.storage import upload_file from app.models.technique import Technique from app.models.test import Test from app.models.user import User # Import from app.schemas.test from app.schemas.test import ( TestBlueUpdate, TestBlueValidate, TestClassificationUpdate, TestCreate, TestOut, TestRedUpdate, TestRedValidate, TestRemediationUpdate, TestUpdate, ) # Import TestTemplateInstantiate from app.schemas.test_template from app.schemas.test_template import TestTemplateInstantiate # Import log_action from app.services.audit_service from app.services.audit_service import log_action # Import recalculate_technique_status from app.services.status_service from app.services.status_service import recalculate_technique_status from app.services.webhook_service import dispatch_webhook from app.services.test_crud_service import ( create_test as crud_create_test, ) # Import from app.services.test_crud_service from app.services.test_crud_service import ( create_test_from_template as crud_create_from_template, ) # Import from app.services.test_crud_service from app.services.test_crud_service import ( get_test_detail as crud_get_test_detail, ) # Import from app.services.test_crud_service from app.services.test_crud_service import ( get_test_or_raise as crud_get_test_or_raise, ) # Import from app.services.test_crud_service from app.services.test_crud_service import ( get_test_timeline as crud_get_test_timeline, ) # Import from app.services.test_crud_service from app.services.test_crud_service import ( get_test_with_technique as crud_get_test_with_technique, ) # Import from app.services.test_crud_service from app.services.test_crud_service import ( list_tests as crud_list_tests, ) # Import from app.services.test_crud_service from app.services.test_crud_service import ( update_test as crud_update_test, ) # Import from app.services.test_crud_service from app.services.test_crud_service import ( update_test_blue as crud_update_test_blue, ) # Import from app.services.test_crud_service from app.services.test_crud_service import ( update_test_red as crud_update_test_red, ) # Import from app.services.test_workflow_service from app.services.test_workflow_service import ( start_execution as wf_start_execution, submit_red_evidence as wf_submit_red, submit_blue_evidence as wf_submit_blue, start_blue_work as wf_start_blue_work, validate_as_red_lead as wf_validate_red, validate_as_blue_lead as wf_validate_blue, reopen_test as wf_reopen, handle_remediation_completed as wf_handle_remediation, get_retest_chain as wf_get_retest_chain, pause_timer as wf_pause_timer, resume_timer as wf_resume_timer, ) # Assign router = APIRouter(prefix="/tests", tags=["tests"]) router = APIRouter(prefix="/tests", tags=["tests"]) # --------------------------------------------------------------------------- # GET /tests — list with filters # --------------------------------------------------------------------------- @router.get("", response_model=list[TestOut]) # Define function list_tests def list_tests( # Entry: state state: Optional[str] = Query(None, description="Filter by test state"), # Entry: technique_id technique_id: Optional[uuid.UUID] = Query(None, description="Filter by technique"), # Entry: platform platform: Optional[str] = Query(None, description="Filter by platform"), # Entry: created_by created_by: Optional[uuid.UUID] = Query(None, description="Filter by creator"), # Entry: pending_validation_side pending_validation_side: Optional[str] = Query( None, description="Filter in_review tests pending validation on 'red' or 'blue' side" ), not_in_any_campaign: bool = Query( False, description="Only return tests not linked to any campaign" ), offset: int = Query(0, ge=0), # Entry: limit limit: int = Query(50, ge=1, le=200), # Entry: db db: Session = Depends(get_db), # Entry: current_user current_user: User = Depends(get_current_user), ) -> list: """Return a paginated list of tests, optionally filtered by state, technique, platform or creator. Args: state (Optional[str]): Filter by test state (e.g. ``draft``, ``validated``). technique_id (Optional[uuid.UUID]): Filter tests belonging to a specific technique. platform (Optional[str]): Filter by target platform (e.g. ``windows``, ``linux``). created_by (Optional[uuid.UUID]): Filter by the UUID of the creator. pending_validation_side (Optional[str]): Filter ``in_review`` tests pending validation on ``'red'`` or ``'blue'`` side. offset (int): Number of records to skip for pagination. limit (int): Maximum number of records to return. db (Session): SQLAlchemy database session. current_user (User): Authenticated user making the request. Returns: list: Serialised list of :class:`TestOut` objects matching the filters. """ # Return crud_list_tests( return crud_list_tests( db, # Keyword argument: state state=state, # Keyword argument: technique_id technique_id=technique_id, # Keyword argument: platform platform=platform, # Keyword argument: created_by created_by=created_by, # Keyword argument: pending_validation_side pending_validation_side=pending_validation_side, not_in_any_campaign=not_in_any_campaign, offset=offset, # Keyword argument: limit limit=limit, ) # --------------------------------------------------------------------------- # POST /tests — create (red_tech or admin) # --------------------------------------------------------------------------- @router.post( # Literal argument value "", # Keyword argument: response_model response_model=TestOut, # Keyword argument: status_code status_code=status.HTTP_201_CREATED, ) # Apply the @limiter.limit decorator @limiter.limit("30/minute") # Define function create_test def create_test( # Entry: request request: Request, # Entry: payload payload: TestCreate, # Entry: db db: Session = Depends(get_db), # Entry: current_user current_user: User = Depends(require_any_role("red_lead", "blue_lead")), ) -> TestOut: """Create a new test linked to an existing technique. ``created_by`` is set automatically and ``state`` defaults to *draft*. Args: request (Request): FastAPI request object (used by the rate limiter). payload (TestCreate): Fields for the new test, including ``technique_id``. db (Session): SQLAlchemy database session. current_user (User): Authenticated red_lead or blue_lead creating the test. Returns: TestOut: The newly created test with all fields populated. """ # Open context manager with UnitOfWork(db) as uow: # Assign test = crud_create_test( test = crud_create_test( db, # Keyword argument: technique_id technique_id=payload.technique_id, # Keyword argument: creator_id creator_id=current_user.id, **payload.model_dump(exclude={"technique_id"}), ) # Call log_action() log_action( db, # Keyword argument: user_id user_id=current_user.id, # Keyword argument: action action="create_test", # Keyword argument: entity_type entity_type="test", # Keyword argument: entity_id entity_id=test.id, # Keyword argument: details details={"name": test.name, "technique_id": str(test.technique_id)}, ) # Call uow.commit() uow.commit() # Reload ORM object attributes from the database db.refresh(test) # Auto-create Jira ticket (non-fatal — any failure is logged, not raised) try: from app.services.jira_service import auto_create_test_issue auto_create_test_issue(db, test, current_user) db.commit() except Exception: pass # jira_service already logs warnings internally return test # --------------------------------------------------------------------------- # POST /tests/from-template — create from TestTemplate # --------------------------------------------------------------------------- @router.post( # Literal argument value "/from-template", # Keyword argument: response_model response_model=TestOut, # Keyword argument: status_code status_code=status.HTTP_201_CREATED, ) # Apply the @limiter.limit decorator @limiter.limit("30/minute") # Define function create_test_from_template def create_test_from_template( # Entry: request request: Request, # Entry: payload payload: TestTemplateInstantiate, # Entry: db db: Session = Depends(get_db), # Entry: current_user current_user: User = Depends(require_any_role("red_lead", "blue_lead")), ) -> TestOut: """Instantiate a real Test from an existing TestTemplate. The template's fields are copied into the new test as starting data. Args: request (Request): FastAPI request object (used by the rate limiter). payload (TestTemplateInstantiate): Contains ``template_id`` and target ``technique_id``. db (Session): SQLAlchemy database session. current_user (User): Authenticated red_lead or blue_lead creating the test. Returns: TestOut: The newly created test populated from the template. """ # Open context manager with UnitOfWork(db) as uow: # Assign test = crud_create_from_template( test = crud_create_from_template( db, # Keyword argument: template_id template_id=payload.template_id, # Keyword argument: technique_id_or_mitre technique_id_or_mitre=payload.technique_id, # Keyword argument: creator_id creator_id=current_user.id, name_override=payload.name, description_override=payload.description, platform_override=payload.platform, procedure_text_override=payload.procedure_text, tool_used_override=payload.tool_used, ) # Call log_action() log_action( db, # Keyword argument: user_id user_id=current_user.id, # Keyword argument: action action="create_test_from_template", # Keyword argument: entity_type entity_type="test", # Keyword argument: entity_id entity_id=test.id, # Keyword argument: details details={ # Literal argument value "name": test.name, # Literal argument value "template_id": str(payload.template_id), # Literal argument value "technique_id": str(test.technique_id), }, ) # Call uow.commit() uow.commit() # Reload ORM object attributes from the database db.refresh(test) # Auto-create Jira ticket (non-fatal) try: from app.services.jira_service import auto_create_test_issue auto_create_test_issue(db, test, current_user) db.commit() except Exception: pass return test # --------------------------------------------------------------------------- # GET /tests/{id} — detail with evidences split by team # --------------------------------------------------------------------------- @router.get("/{test_id}", response_model=TestOut) # Define function get_test def get_test( # Entry: test_id test_id: uuid.UUID, # Entry: db db: Session = Depends(get_db), # Entry: current_user current_user: User = Depends(get_current_user), ) -> TestOut: """Return full details for a single test, including its evidences. Args: test_id (uuid.UUID): Primary key of the test to retrieve. db (Session): SQLAlchemy database session. current_user (User): Authenticated user making the request. Returns: TestOut: Full test detail including split red/blue evidence lists. """ # Return crud_get_test_detail(db, test_id) return crud_get_test_detail(db, test_id) # --------------------------------------------------------------------------- # PATCH /tests/{id} — general update (draft / rejected) # --------------------------------------------------------------------------- @router.patch("/{test_id}", response_model=TestOut) # Define function update_test def update_test( # Entry: test_id test_id: uuid.UUID, # Entry: payload payload: TestUpdate, # Entry: db db: Session = Depends(get_db), # Entry: current_user current_user: User = Depends(require_any_role("red_lead", "blue_lead")), ) -> TestOut: """Update one or more fields of an existing test. Only leads or admins can update general test fields. The test must be in ``draft`` or ``rejected`` state. Args: test_id (uuid.UUID): Primary key of the test to update. payload (TestUpdate): Partial update payload; only set fields are applied. db (Session): SQLAlchemy database session. current_user (User): Authenticated red_lead or blue_lead performing the update. Returns: TestOut: The updated test with refreshed field values. """ # Assign update_data = payload.model_dump(exclude_unset=True) update_data = payload.model_dump(exclude_unset=True) # Open context manager with UnitOfWork(db) as uow: # Assign test = crud_update_test( test = crud_update_test( db, test_id, # Keyword argument: updater_id updater_id=current_user.id, # Keyword argument: updater_role updater_role=current_user.role, **update_data, ) # Call log_action() log_action( db, # Keyword argument: user_id user_id=current_user.id, # Keyword argument: action action="update_test", # Keyword argument: entity_type entity_type="test", # Keyword argument: entity_id entity_id=test.id, # Keyword argument: details details={"updated_fields": list(update_data.keys())}, ) # Call uow.commit() uow.commit() # Reload ORM object attributes from the database db.refresh(test) # Return test return test # --------------------------------------------------------------------------- # PATCH /tests/{id}/classification — admin data classification # --------------------------------------------------------------------------- @router.patch("/{test_id}/classification", response_model=TestOut) # Define function update_test_classification def update_test_classification( # Entry: test_id test_id: uuid.UUID, # Entry: payload payload: TestClassificationUpdate, # Entry: db db: Session = Depends(get_db), # Entry: current_user current_user: User = Depends(require_role("admin")), ) -> TestOut: """Update the data classification label for a test (admin only). Args: test_id (uuid.UUID): Primary key of the test to classify. payload (TestClassificationUpdate): Contains the new ``data_classification`` value. db (Session): SQLAlchemy database session. current_user (User): Authenticated admin user. Returns: TestOut: The test with the updated ``data_classification`` field. """ # Open context manager with UnitOfWork(db) as uow: # Assign test = crud_get_test_or_raise(db, test_id) test = crud_get_test_or_raise(db, test_id) # Assign test.data_classification = payload.data_classification.value test.data_classification = payload.data_classification.value # Flush changes to DB without committing the transaction db.flush() # Call log_action() log_action( db, # Keyword argument: user_id user_id=current_user.id, # Keyword argument: action action="update_test_classification", # Keyword argument: entity_type entity_type="test", # Keyword argument: entity_id entity_id=test.id, # Keyword argument: details details={"data_classification": payload.data_classification.value}, ) # Call uow.commit() uow.commit() # Reload ORM object attributes from the database db.refresh(test) # Return test return test # --------------------------------------------------------------------------- # PATCH /tests/{id}/red — Red Team update (draft, red_executing) # --------------------------------------------------------------------------- @router.patch("/{test_id}/red", response_model=TestOut) # Define function update_test_red def update_test_red( # Entry: test_id test_id: uuid.UUID, # Entry: payload payload: TestRedUpdate, # Entry: db db: Session = Depends(get_db), # Entry: current_user current_user: User = Depends(require_any_role("red_tech", "red_lead")), ) -> TestOut: """Red Team updates their fields (allowed in ``draft`` and ``red_executing``). Args: test_id (uuid.UUID): Primary key of the test to update. payload (TestRedUpdate): Red-team-specific fields to update. db (Session): SQLAlchemy database session. current_user (User): Authenticated red_tech or red_lead. Returns: TestOut: The updated test with refreshed red-team field values. """ # Assign update_data = payload.model_dump(exclude_unset=True) update_data = payload.model_dump(exclude_unset=True) # Open context manager with UnitOfWork(db) as uow: # Assign test = crud_update_test_red(db, test_id, **update_data) test = crud_update_test_red(db, test_id, **update_data) # Call log_action() log_action( db, # Keyword argument: user_id user_id=current_user.id, # Keyword argument: action action="update_test_red", # Keyword argument: entity_type entity_type="test", # Keyword argument: entity_id entity_id=test.id, # Keyword argument: details details={"updated_fields": list(update_data.keys())}, ) # Call uow.commit() uow.commit() # Reload ORM object attributes from the database db.refresh(test) # Return test return test # --------------------------------------------------------------------------- # PATCH /tests/{id}/blue — Blue Team update (blue_evaluating only) # --------------------------------------------------------------------------- @router.patch("/{test_id}/blue", response_model=TestOut) # Define function update_test_blue def update_test_blue( # Entry: test_id test_id: uuid.UUID, # Entry: payload payload: TestBlueUpdate, # Entry: db db: Session = Depends(get_db), # Entry: current_user current_user: User = Depends(require_any_role("blue_tech", "blue_lead")), ) -> TestOut: """Blue Team updates their fields (allowed only in ``blue_evaluating``). Args: test_id (uuid.UUID): Primary key of the test to update. payload (TestBlueUpdate): Blue-team-specific fields to update. db (Session): SQLAlchemy database session. current_user (User): Authenticated blue_tech or blue_lead. Returns: TestOut: The updated test with refreshed blue-team field values. """ # Assign update_data = payload.model_dump(exclude_unset=True) update_data = payload.model_dump(exclude_unset=True) # Open context manager with UnitOfWork(db) as uow: # Assign test = crud_update_test_blue(db, test_id, **update_data) test = crud_update_test_blue(db, test_id, **update_data) # Call log_action() log_action( db, # Keyword argument: user_id user_id=current_user.id, # Keyword argument: action action="update_test_blue", # Keyword argument: entity_type entity_type="test", # Keyword argument: entity_id entity_id=test.id, # Keyword argument: details details={"updated_fields": list(update_data.keys())}, ) # Call uow.commit() uow.commit() # Reload ORM object attributes from the database db.refresh(test) # Return test return test # --------------------------------------------------------------------------- # POST /tests/{id}/start-execution — draft → red_executing # --------------------------------------------------------------------------- @router.post("/{test_id}/start-execution", response_model=TestOut) # Define function start_execution def start_execution( # Entry: test_id test_id: uuid.UUID, # Entry: db db: Session = Depends(get_db), # Entry: current_user current_user: User = Depends(require_any_role("red_tech", "red_lead")), ) -> TestOut: """Move a test from ``draft`` to ``red_executing``. Args: test_id (uuid.UUID): Primary key of the test to start. db (Session): SQLAlchemy database session. current_user (User): Authenticated red_tech or red_lead initiating execution. Returns: TestOut: The updated test in ``red_executing`` state. """ # Assign test = crud_get_test_or_raise(db, test_id) test = crud_get_test_or_raise(db, test_id) # Open context manager with UnitOfWork(db) as uow: # Assign test = wf_start_execution(db, test, current_user) test = wf_start_execution(db, test, current_user) # Call uow.commit() uow.commit() # Reload ORM object attributes from the database db.refresh(test) # Return test return test # --------------------------------------------------------------------------- # POST /tests/{id}/submit-red — red_executing → blue_evaluating # --------------------------------------------------------------------------- @router.post("/{test_id}/submit-red", response_model=TestOut) # Define function submit_red def submit_red( # Entry: test_id test_id: uuid.UUID, # Entry: db db: Session = Depends(get_db), # Entry: current_user current_user: User = Depends(require_any_role("red_tech", "red_lead")), ) -> TestOut: """Red Team finalises — move from ``red_executing`` to ``blue_evaluating``. Args: test_id (uuid.UUID): Primary key of the test to submit. db (Session): SQLAlchemy database session. current_user (User): Authenticated red_tech or red_lead submitting red evidence. Returns: TestOut: The updated test in ``blue_evaluating`` state. """ # Assign test = crud_get_test_or_raise(db, test_id) test = crud_get_test_or_raise(db, test_id) # Open context manager with UnitOfWork(db) as uow: # Assign test = wf_submit_red(db, test, current_user) test = wf_submit_red(db, test, current_user) # Call uow.commit() uow.commit() # Reload ORM object attributes from the database db.refresh(test) # Return test return test # --------------------------------------------------------------------------- # POST /tests/{id}/submit-blue — blue_evaluating → in_review # --------------------------------------------------------------------------- @router.post("/{test_id}/submit-blue", response_model=TestOut) # Define function submit_blue def submit_blue( # Entry: test_id test_id: uuid.UUID, # Entry: db db: Session = Depends(get_db), # Entry: current_user current_user: User = Depends(require_any_role("blue_tech", "blue_lead")), ) -> TestOut: """Blue Team finalises — move from ``blue_evaluating`` to ``in_review``. Args: test_id (uuid.UUID): Primary key of the test to submit. db (Session): SQLAlchemy database session. current_user (User): Authenticated blue_tech or blue_lead submitting blue evidence. Returns: TestOut: The updated test in ``in_review`` state. """ # Assign test = crud_get_test_or_raise(db, test_id) test = crud_get_test_or_raise(db, test_id) # Open context manager with UnitOfWork(db) as uow: # Assign test = wf_submit_blue(db, test, current_user) test = wf_submit_blue(db, test, current_user) # Call uow.commit() uow.commit() # Reload ORM object attributes from the database db.refresh(test) # Return test return test # --------------------------------------------------------------------------- # POST /tests/{id}/start-blue-work — blue tech picks up test for evaluation # --------------------------------------------------------------------------- @router.post("/{test_id}/start-blue-work", response_model=TestOut) def start_blue_work( test_id: uuid.UUID, db: Session = Depends(get_db), current_user: User = Depends(require_any_role("blue_tech", "blue_lead")), ): """Blue tech picks up the test to start evaluating. Sets the Tempo timer start.""" test = crud_get_test_or_raise(db, test_id) with UnitOfWork(db) as uow: test = wf_start_blue_work(db, test, current_user) uow.commit() db.refresh(test) return test # --------------------------------------------------------------------------- # POST /tests/{id}/pause-timer — pause the active phase timer # --------------------------------------------------------------------------- @router.post("/{test_id}/pause-timer", response_model=TestOut) # Define function pause_timer def pause_timer( # Entry: test_id test_id: uuid.UUID, # Entry: db db: Session = Depends(get_db), # Entry: current_user current_user: User = Depends(require_any_role("red_tech", "blue_tech", "red_lead", "blue_lead")), ) -> TestOut: """Pause the running timer for the current phase (red_executing or blue_evaluating). Args: test_id (uuid.UUID): Primary key of the test whose timer should be paused. db (Session): SQLAlchemy database session. current_user (User): Authenticated team member in the active phase. Returns: TestOut: The updated test with the phase timer paused. """ # Assign test = crud_get_test_or_raise(db, test_id) test = crud_get_test_or_raise(db, test_id) # Open context manager with UnitOfWork(db) as uow: # Assign test = wf_pause_timer(db, test, current_user) test = wf_pause_timer(db, test, current_user) # Call uow.commit() uow.commit() # Reload ORM object attributes from the database db.refresh(test) # Return test return test # --------------------------------------------------------------------------- # POST /tests/{id}/resume-timer — resume a paused phase timer # --------------------------------------------------------------------------- @router.post("/{test_id}/resume-timer", response_model=TestOut) # Define function resume_timer def resume_timer( # Entry: test_id test_id: uuid.UUID, # Entry: db db: Session = Depends(get_db), # Entry: current_user current_user: User = Depends(require_any_role("red_tech", "blue_tech", "red_lead", "blue_lead")), ) -> TestOut: """Resume the paused timer for the current phase. Args: test_id (uuid.UUID): Primary key of the test whose timer should be resumed. db (Session): SQLAlchemy database session. current_user (User): Authenticated team member in the active phase. Returns: TestOut: The updated test with the phase timer running again. """ # Assign test = crud_get_test_or_raise(db, test_id) test = crud_get_test_or_raise(db, test_id) # Open context manager with UnitOfWork(db) as uow: # Assign test = wf_resume_timer(db, test, current_user) test = wf_resume_timer(db, test, current_user) # Call uow.commit() uow.commit() # Reload ORM object attributes from the database db.refresh(test) # Return test return test # --------------------------------------------------------------------------- # POST /tests/{id}/validate-red — Red Lead validates # --------------------------------------------------------------------------- @router.post("/{test_id}/validate-red", response_model=TestOut) # Define function validate_red def validate_red( # Entry: test_id test_id: uuid.UUID, # Entry: payload payload: TestRedValidate, # Entry: db db: Session = Depends(get_db), # Entry: current_user current_user: User = Depends(require_any_role("red_lead")), ) -> TestOut: """Red Lead approves or rejects the red side of a test. Args: test_id (uuid.UUID): Primary key of the test to validate. payload (TestRedValidate): Validation status and optional notes from the Red Lead. db (Session): SQLAlchemy database session. current_user (User): Authenticated red_lead performing the validation. Returns: TestOut: The updated test reflecting the red validation decision. """ # Assign test = crud_get_test_with_technique(db, test_id) test = crud_get_test_with_technique(db, test_id) # Open context manager with UnitOfWork(db) as uow: # Assign test = wf_validate_red( test = wf_validate_red( db, test, current_user, # Keyword argument: validation_status validation_status=payload.red_validation_status, # Keyword argument: notes notes=payload.red_validation_notes, ) # Check: test.state in (TestState.validated, TestState.rejected) if test.state in (TestState.validated, TestState.rejected): # Call recalculate_technique_status() recalculate_technique_status(db, test.technique) # Flag technique for review — coverage changed if test.technique: test.technique.review_required = True uow.commit() # Reload ORM object attributes from the database db.refresh(test) if test.state == TestState.validated: dispatch_webhook("test.validated", {"test_id": str(test.id), "technique_id": str(test.technique_id), "result": test.result.value if test.result else None}) elif test.state == TestState.rejected: dispatch_webhook("test.rejected", {"test_id": str(test.id), "technique_id": str(test.technique_id)}) return test # --------------------------------------------------------------------------- # POST /tests/{id}/validate-blue — Blue Lead validates # --------------------------------------------------------------------------- @router.post("/{test_id}/validate-blue", response_model=TestOut) # Define function validate_blue def validate_blue( # Entry: test_id test_id: uuid.UUID, # Entry: payload payload: TestBlueValidate, # Entry: db db: Session = Depends(get_db), # Entry: current_user current_user: User = Depends(require_any_role("blue_lead")), ) -> TestOut: """Blue Lead approves or rejects the blue side of a test. Args: test_id (uuid.UUID): Primary key of the test to validate. payload (TestBlueValidate): Validation status and optional notes from the Blue Lead. db (Session): SQLAlchemy database session. current_user (User): Authenticated blue_lead performing the validation. Returns: TestOut: The updated test reflecting the blue validation decision. """ # Assign test = crud_get_test_with_technique(db, test_id) test = crud_get_test_with_technique(db, test_id) # Open context manager with UnitOfWork(db) as uow: # Assign test = wf_validate_blue( test = wf_validate_blue( db, test, current_user, # Keyword argument: validation_status validation_status=payload.blue_validation_status, # Keyword argument: notes notes=payload.blue_validation_notes, ) # Check: test.state in (TestState.validated, TestState.rejected) if test.state in (TestState.validated, TestState.rejected): # Call recalculate_technique_status() recalculate_technique_status(db, test.technique) # Flag technique for review — coverage changed if test.technique: test.technique.review_required = True uow.commit() # Reload ORM object attributes from the database db.refresh(test) if test.state == TestState.validated: dispatch_webhook("test.validated", {"test_id": str(test.id), "technique_id": str(test.technique_id), "result": test.result.value if test.result else None}) elif test.state == TestState.rejected: dispatch_webhook("test.rejected", {"test_id": str(test.id), "technique_id": str(test.technique_id)}) return test # --------------------------------------------------------------------------- # POST /tests/{id}/reopen — rejected → draft # --------------------------------------------------------------------------- @router.post("/{test_id}/reopen", response_model=TestOut) # Define function reopen def reopen( # Entry: test_id test_id: uuid.UUID, # Entry: db db: Session = Depends(get_db), # Entry: current_user current_user: User = Depends(require_any_role("red_lead", "blue_lead")), ) -> TestOut: """Reopen a rejected test, moving it back to ``draft``. Args: test_id (uuid.UUID): Primary key of the rejected test to reopen. db (Session): SQLAlchemy database session. current_user (User): Authenticated red_lead or blue_lead reopening the test. Returns: TestOut: The updated test in ``draft`` state. """ # Assign test = crud_get_test_or_raise(db, test_id) test = crud_get_test_or_raise(db, test_id) # Open context manager with UnitOfWork(db) as uow: # Assign test = wf_reopen(db, test, current_user) test = wf_reopen(db, test, current_user) # Call uow.commit() uow.commit() # Reload ORM object attributes from the database db.refresh(test) # Return test return test # --------------------------------------------------------------------------- # PATCH /tests/{id}/remediation — update remediation fields # --------------------------------------------------------------------------- @router.patch("/{test_id}/remediation", response_model=TestOut) # Define function update_remediation def update_remediation( # Entry: test_id test_id: uuid.UUID, # Entry: payload payload: TestRemediationUpdate, # Entry: db db: Session = Depends(get_db), # Entry: current_user current_user: User = Depends(require_any_role("red_lead", "blue_lead")), ) -> TestOut: """Update remediation fields on a test. When ``remediation_status`` transitions to ``'completed'``, an automatic re-test is created (subject to ``MAX_RETEST_COUNT``). Args: test_id (uuid.UUID): Primary key of the test to update. payload (TestRemediationUpdate): Remediation fields to update (status, notes, etc.). db (Session): SQLAlchemy database session. current_user (User): Authenticated red_lead or blue_lead updating remediation. Returns: TestOut: The updated test with refreshed remediation fields. """ # Assign test = crud_get_test_or_raise(db, test_id) test = crud_get_test_or_raise(db, test_id) # Assign old_remediation_status = test.remediation_status old_remediation_status = test.remediation_status # Assign update_data = payload.model_dump(exclude_unset=True) update_data = payload.model_dump(exclude_unset=True) # Iterate over update_data.items() for field, value in update_data.items(): # Call setattr() setattr(test, field, value) # Open context manager with UnitOfWork(db) as uow: # Call log_action() log_action( db, # Keyword argument: user_id user_id=current_user.id, # Keyword argument: action action="update_remediation", # Keyword argument: entity_type entity_type="test", # Keyword argument: entity_id entity_id=test.id, # Keyword argument: details details={"updated_fields": list(update_data.keys())}, ) # Assign new_status = update_data.get("remediation_status") new_status = update_data.get("remediation_status") # Check: new_status == "completed" and old_remediation_status != "completed" if new_status == "completed" and old_remediation_status != "completed": # Call wf_handle_remediation() wf_handle_remediation(db, test, current_user) # Call uow.commit() uow.commit() # Reload ORM object attributes from the database db.refresh(test) # Return test return test # --------------------------------------------------------------------------- # GET /tests/{id}/timeline — audit history for this test # --------------------------------------------------------------------------- @router.get("/{test_id}/timeline") # Define function get_test_timeline def get_test_timeline( # Entry: test_id test_id: uuid.UUID, # Entry: db db: Session = Depends(get_db), # Entry: current_user current_user: User = Depends(get_current_user), ) -> list: """Return the chronological audit-log history for a test. Args: test_id (uuid.UUID): Primary key of the test whose timeline is requested. db (Session): SQLAlchemy database session. current_user (User): Authenticated user making the request. Returns: list: Chronological list of audit-log entries for the test. """ # Return crud_get_test_timeline(db, test_id) return crud_get_test_timeline(db, test_id) # --------------------------------------------------------------------------- # GET /tests/{id}/retest-chain — full retest chain # --------------------------------------------------------------------------- @router.get("/{test_id}/retest-chain") # Define function get_retest_chain def get_retest_chain( # Entry: test_id test_id: uuid.UUID, # Entry: db db: Session = Depends(get_db), # Entry: current_user current_user: User = Depends(get_current_user), ) -> list: """Return the full chain of retests (original + all retests) for a test. Args: test_id (uuid.UUID): Primary key of any test in the retest chain. db (Session): SQLAlchemy database session. current_user (User): Authenticated user making the request. Returns: list: Ordered list of dicts describing each test in the chain, including state, result, remediation status, and retest metadata. """ # Assign chain = wf_get_retest_chain(db, test_id) chain = wf_get_retest_chain(db, test_id) # Check: not chain if not chain: # Raise HTTPException raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Test not found") # Return [ return [ { # Literal argument value "id": str(t.id), # Literal argument value "name": t.name, # Literal argument value "state": t.state.value if t.state else None, # Literal argument value "retest_of": str(t.retest_of) if t.retest_of else None, # Literal argument value "retest_count": t.retest_count, # Literal argument value "result": t.result.value if t.result else None, # Literal argument value "detection_result": t.detection_result.value if t.detection_result else None, # Literal argument value "remediation_status": t.remediation_status, # Literal argument value "created_at": t.created_at.isoformat() if t.created_at else None, } for t in chain ] # --------------------------------------------------------------------------- # POST /tests/{id}/sync-tempo — manual Tempo sync for red execution worklog # --------------------------------------------------------------------------- @router.post("/{test_id}/sync-tempo") def sync_tempo( test_id: uuid.UUID, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Manually sync this test's red team execution worklog(s) to Tempo. Useful when the automatic sync failed at phase completion (e.g. Tempo was not yet configured). Only red_team_execution worklogs are eligible. Already-synced worklogs are skipped. Returns a summary of what happened. """ from datetime import datetime as _dt from app.models.worklog import Worklog from app.services.tempo_service import auto_log_test_worklog from app.services.test_crud_service import get_test_or_raise as _get test = _get(db, test_id) worklogs = ( db.query(Worklog) .filter( Worklog.entity_type == "test", Worklog.entity_id == test_id, Worklog.activity_type == "red_team_execution", ) .all() ) if not worklogs: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="No red team execution worklog found for this test.", ) results = [] for wl in worklogs: if wl.tempo_synced: results.append({"worklog_id": str(wl.id), "status": "already_synced"}) continue try: result = auto_log_test_worklog( db=db, test=test, user=current_user, activity_type=wl.activity_type, duration_seconds=wl.duration_seconds, ) if result and isinstance(result, dict): wl.tempo_synced = _dt.utcnow() wl.tempo_worklog_id = str(result.get("tempoWorklogId", "")) db.commit() results.append({"worklog_id": str(wl.id), "status": "synced"}) else: results.append({ "worklog_id": str(wl.id), "status": "skipped", "detail": "Tempo not configured or conditions not met.", }) except Exception as exc: results.append({ "worklog_id": str(wl.id), "status": "error", "detail": str(exc), }) return {"results": results} # --------------------------------------------------------------------------- # POST /tests/{id}/request-discussion — disputed: confirm vote + notify other lead # --------------------------------------------------------------------------- @router.post("/{test_id}/request-discussion") def request_discussion( test_id: uuid.UUID, db: Session = Depends(get_db), current_user: User = Depends(require_any_role("red_lead", "blue_lead", "admin")), ): """Called when the approving lead confirms their vote in a disputed test. Sends a notification to the other lead (who rejected) asking them to discuss and resolve the conflict. The test remains in 'disputed' state. """ from app.models.user import User as UserModel from app.services.notification_service import create_notification test = crud_get_test_or_raise(db, test_id) if test.state.value != "disputed": from app.domain.errors import BusinessRuleViolation raise BusinessRuleViolation("Test is not in disputed state") role = current_user.role # Identify who the "other lead" is (the one who rejected) if (role in ("red_lead", "admin")) and test.red_validation_status == "approved": # Red approved, Blue rejected → notify Blue Lead who rejected rejector_id = test.blue_validated_by rejector_label = "Blue Lead" requester_label = "Red Lead" elif (role in ("blue_lead", "admin")) and test.blue_validation_status == "approved": # Blue approved, Red rejected → notify Red Lead who rejected rejector_id = test.red_validated_by rejector_label = "Red Lead" requester_label = "Blue Lead" else: from app.domain.errors import BusinessRuleViolation raise BusinessRuleViolation( "The conflict state is inconsistent — no approving lead found" ) # Look up the rejecting lead's full info for the response rejector = ( db.query(UserModel).filter(UserModel.id == rejector_id).first() if rejector_id else None ) rejector_name = rejector.username if rejector else rejector_label rejector_email = getattr(rejector, "email", None) if rejector else None # Notify the rejecting lead if rejector_id: try: create_notification( db, user_id=rejector_id, type="validation_conflict", title="Discussion requested on disputed test", message=( f"{requester_label} ({current_user.username}) is confirming their approval " f"of test '{test.name}' and wants to discuss your rejection with you. " f"Please reach out to resolve the disagreement." ), entity_type="test", entity_id=str(test.id), ) except Exception as e: import logging logging.getLogger(__name__).warning( "Failed to send discussion notification: %s", e ) log_action( db, user_id=current_user.id, action="request_dispute_discussion", entity_type="test", entity_id=test.id, details={"test_name": test.name, "rejector": rejector_name}, ) db.commit() return { "status": "notification_sent", "message": f"Discussion request sent to {rejector_name}", "rejector_username": rejector_name, "rejector_email": rejector_email, "rejector_role": rejector_label, } # --------------------------------------------------------------------------- # POST /tests/import-rt — bulk import from a real Red Team engagement # --------------------------------------------------------------------------- _ALLOWED_IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp"} _MAX_EVIDENCE_BYTES = 10 * 1024 * 1024 # 10 MB decoded per image class RTEvidenceEntry(BaseModel): filename: str # e.g. "screenshot_edr.png" data: str # base64-encoded image content caption: Optional[str] = None # optional description shown as evidence notes class RTTechniqueEntry(BaseModel): mitre_id: str result: str # "detected" | "not_detected" | "partially_detected" attack_success: bool = True platform: Optional[str] = None notes: Optional[str] = None evidence: list[RTEvidenceEntry] # REQUIRED — at least one image per technique class RTImportPayload(BaseModel): name: str # engagement name, e.g. "Red Team Q1 2024" date: Optional[str] = None # ISO date string description: Optional[str] = None operator: Optional[str] = None # team / company that ran the RT techniques: list[RTTechniqueEntry] @router.post("/import-rt", status_code=status.HTTP_201_CREATED) def import_rt( payload: RTImportPayload, db: Session = Depends(get_db), current_user: User = Depends(require_any_role("red_lead")), ): """Import results from a real Red Team engagement. Creates one Test record per technique in ``validated`` state (bypassing the normal Red/Blue workflow) and immediately recalculates coverage metrics. Requires ``red_lead`` or ``admin`` role. """ # Pre-validate: every technique must include at least one evidence image for entry in payload.techniques: if not entry.evidence: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=( f"Technique {entry.mitre_id} is missing evidence. " "At least one screenshot or image is required per technique." ), ) # Execution date from payload or now exec_date_str = payload.date or datetime.utcnow().date().isoformat() # Result string → TestResult enum _result_map = { "detected": TestResult.detected, "not_detected": TestResult.not_detected, "partially_detected": TestResult.partially_detected, } created: list[dict[str, Any]] = [] skipped: list[dict[str, str]] = [] affected_technique_ids: set = set() with UnitOfWork(db) as uow: for entry in payload.techniques: # Find technique technique = ( db.query(Technique) .filter(Technique.mitre_id == entry.mitre_id.upper()) .first() ) if technique is None: skipped.append({"mitre_id": entry.mitre_id, "reason": "Technique not found"}) continue detection_result = _result_map.get(entry.result) if detection_result is None: skipped.append({"mitre_id": entry.mitre_id, "reason": f"Unknown result value '{entry.result}'"}) continue test_name = f"[RT] {payload.name} — {technique.name}" # Build red_summary from notes + engagement metadata parts = [] if payload.operator: parts.append(f"Operator: {payload.operator}") parts.append(f"Engagement date: {exec_date_str}") if entry.notes: parts.append(f"\n{entry.notes}") red_summary_text = "\n".join(parts) # RT pre-validates the Red side (they ran it), but Blue Lead # must still validate the detection result before it counts. # State = in_review so it appears in the Blue Lead's validation queue. test = Test( technique_id=technique.id, name=test_name, description=payload.description, platform=entry.platform, procedure_text=entry.notes, created_by=current_user.id, state=TestState.in_review, # Red team — approved by the RT operator attack_success=entry.attack_success, red_summary=red_summary_text, red_validation_status="approved", red_validated_by=current_user.id, red_validated_at=datetime.utcnow(), # Blue team — pre-fill the detection result but leave # validation_status pending so Blue Lead must confirm detection_result=detection_result, blue_validation_status=None, # Timing execution_date=exec_date_str, created_at=datetime.utcnow(), ) db.add(test) db.flush() # ── Store evidence images ────────────────────────────── evidence_count = 0 for ev in entry.evidence: safe_name = os.path.basename(ev.filename) or "evidence.png" ext = os.path.splitext(safe_name)[1].lower() if ext not in _ALLOWED_IMAGE_EXTS: # Skip non-image files silently (log warning) continue try: img_bytes = base64.b64decode(ev.data) except Exception: continue # malformed base64 — skip if len(img_bytes) > _MAX_EVIDENCE_BYTES: continue # over size limit — skip sha256 = hashlib.sha256(img_bytes).hexdigest() key = f"{test.id}/{uuid.uuid4()}_{safe_name}" try: upload_file(img_bytes, key) except Exception: continue # storage error — skip but don't abort evidence_obj = Evidence( test_id=test.id, file_name=safe_name, file_path=key, sha256_hash=sha256, uploaded_by=current_user.id, uploaded_at=datetime.utcnow(), team=TeamSide.red, notes=ev.caption, ) db.add(evidence_obj) evidence_count += 1 affected_technique_ids.add(technique.id) created.append({ "mitre_id": entry.mitre_id, "test_name": test_name, "result": entry.result, "attack_success": entry.attack_success, "evidence_attached": evidence_count, }) log_action( db, user_id=current_user.id, action="rt_import_test", entity_type="test", entity_id=test.id, details={"engagement": payload.name, "mitre_id": entry.mitre_id}, ) # Recalculate coverage for all affected techniques for tech_id in affected_technique_ids: tech = db.query(Technique).filter(Technique.id == tech_id).first() if tech: recalculate_technique_status(db, tech) uow.commit() return { "created": len(created), "skipped": len(skipped), "items": created, "warnings": skipped, "engagement": payload.name, }