T-109: Rewrite tests router with full Red/Blue workflow endpoints - list with filters, create from template, Red/Blue team updates with state guards, start-execution, submit-red, submit-blue, validate-red, validate-blue, reopen, and timeline. All using workflow service from Phase 11. T-110: Rewrite evidence router with Red/Blue separation - upload with team field, list with team filter, delete with state-based permissions. Red Team edits in draft/red_executing, Blue Team in blue_evaluating, admin bypasses all. T-111: Create test_templates router with full CRUD - paginated list with source/platform/severity/search filters, by-technique lookup, admin-only create/update, and soft delete. Registered in main.py. T-112: Add POST /system/import-atomic-tests endpoint to system router - admin-only trigger for Atomic Red Team import with error handling and statistics response. Includes validation tests for all four tasks (35 checks total).
539 lines
17 KiB
Python
539 lines
17 KiB
Python
"""CRUD router for security Tests — v2 with Red/Blue workflow.
|
|
|
|
Endpoints
|
|
---------
|
|
GET /tests — list with filters (state, technique_id)
|
|
POST /tests — create (red_tech, admin)
|
|
POST /tests/from-template — create from TestTemplate (red_tech, admin)
|
|
GET /tests/{id} — detail with split red/blue evidences
|
|
PATCH /tests/{id} — general update (draft/rejected only)
|
|
PATCH /tests/{id}/red — Red Team updates (draft, red_executing)
|
|
PATCH /tests/{id}/blue — Blue Team updates (blue_evaluating)
|
|
POST /tests/{id}/start-execution — draft → red_executing
|
|
POST /tests/{id}/submit-red — red_executing → blue_evaluating
|
|
POST /tests/{id}/submit-blue — blue_evaluating → in_review
|
|
POST /tests/{id}/validate-red — Red Lead validates
|
|
POST /tests/{id}/validate-blue — Blue Lead validates
|
|
POST /tests/{id}/reopen — rejected → draft
|
|
GET /tests/{id}/timeline — audit-log history for this test
|
|
"""
|
|
|
|
import uuid
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
|
from sqlalchemy.orm import Session, joinedload
|
|
|
|
from app.database import get_db
|
|
from app.dependencies.auth import get_current_user, require_any_role
|
|
from app.models.audit import AuditLog
|
|
from app.models.enums import TestState, TeamSide
|
|
from app.models.technique import Technique
|
|
from app.models.test import Test
|
|
from app.models.test_template import TestTemplate
|
|
from app.models.user import User
|
|
from app.schemas.test import (
|
|
TestCreate,
|
|
TestOut,
|
|
TestUpdate,
|
|
TestRedUpdate,
|
|
TestBlueUpdate,
|
|
TestRedValidate,
|
|
TestBlueValidate,
|
|
)
|
|
from app.schemas.test_template import TestTemplateInstantiate
|
|
from app.services.audit_service import log_action
|
|
from app.services.status_service import recalculate_technique_status
|
|
from app.services.test_workflow_service import (
|
|
start_execution as wf_start_execution,
|
|
submit_red_evidence as wf_submit_red,
|
|
submit_blue_evidence as wf_submit_blue,
|
|
validate_as_red_lead as wf_validate_red,
|
|
validate_as_blue_lead as wf_validate_blue,
|
|
reopen_test as wf_reopen,
|
|
)
|
|
|
|
router = APIRouter(prefix="/tests", tags=["tests"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _get_test_or_404(db: Session, test_id: uuid.UUID) -> Test:
|
|
test = db.query(Test).filter(Test.id == test_id).first()
|
|
if test is None:
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Test not found")
|
|
return test
|
|
|
|
|
|
def _get_test_with_technique(db: Session, test_id: uuid.UUID) -> Test:
|
|
test = (
|
|
db.query(Test)
|
|
.options(joinedload(Test.technique))
|
|
.filter(Test.id == test_id)
|
|
.first()
|
|
)
|
|
if test is None:
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Test not found")
|
|
return test
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /tests — list with filters
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("", response_model=list[TestOut])
|
|
def list_tests(
|
|
state: Optional[str] = Query(None, description="Filter by test state"),
|
|
technique_id: Optional[uuid.UUID] = Query(None, description="Filter by technique"),
|
|
offset: int = Query(0, ge=0),
|
|
limit: int = Query(50, ge=1, le=200),
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Return a paginated list of tests, optionally filtered by state or technique."""
|
|
query = db.query(Test)
|
|
|
|
if state:
|
|
query = query.filter(Test.state == state)
|
|
if technique_id:
|
|
query = query.filter(Test.technique_id == technique_id)
|
|
|
|
tests = query.order_by(Test.created_at.desc()).offset(offset).limit(limit).all()
|
|
return tests
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /tests — create (red_tech or admin)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.post(
|
|
"",
|
|
response_model=TestOut,
|
|
status_code=status.HTTP_201_CREATED,
|
|
)
|
|
def create_test(
|
|
payload: TestCreate,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_any_role("red_tech")),
|
|
):
|
|
"""Create a new test linked to an existing technique.
|
|
|
|
``created_by`` is set automatically and ``state`` defaults to *draft*.
|
|
"""
|
|
technique = db.query(Technique).filter(Technique.id == payload.technique_id).first()
|
|
if technique is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Technique with id '{payload.technique_id}' not found",
|
|
)
|
|
|
|
test = Test(
|
|
**payload.model_dump(),
|
|
created_by=current_user.id,
|
|
state=TestState.draft,
|
|
)
|
|
db.add(test)
|
|
db.commit()
|
|
db.refresh(test)
|
|
|
|
log_action(
|
|
db,
|
|
user_id=current_user.id,
|
|
action="create_test",
|
|
entity_type="test",
|
|
entity_id=test.id,
|
|
details={"name": test.name, "technique_id": str(test.technique_id)},
|
|
)
|
|
|
|
return test
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /tests/from-template — create from TestTemplate
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.post(
|
|
"/from-template",
|
|
response_model=TestOut,
|
|
status_code=status.HTTP_201_CREATED,
|
|
)
|
|
def create_test_from_template(
|
|
payload: TestTemplateInstantiate,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_any_role("red_tech")),
|
|
):
|
|
"""Instantiate a real Test from an existing TestTemplate.
|
|
|
|
The template's fields are copied into the new test as starting data.
|
|
"""
|
|
template = db.query(TestTemplate).filter(TestTemplate.id == payload.template_id).first()
|
|
if template is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"TestTemplate with id '{payload.template_id}' not found",
|
|
)
|
|
|
|
technique = db.query(Technique).filter(Technique.id == payload.technique_id).first()
|
|
if technique is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Technique with id '{payload.technique_id}' not found",
|
|
)
|
|
|
|
test = Test(
|
|
technique_id=payload.technique_id,
|
|
name=template.name,
|
|
description=template.description,
|
|
platform=template.platform,
|
|
procedure_text=template.attack_procedure,
|
|
tool_used=template.tool_suggested,
|
|
created_by=current_user.id,
|
|
state=TestState.draft,
|
|
)
|
|
db.add(test)
|
|
db.commit()
|
|
db.refresh(test)
|
|
|
|
log_action(
|
|
db,
|
|
user_id=current_user.id,
|
|
action="create_test_from_template",
|
|
entity_type="test",
|
|
entity_id=test.id,
|
|
details={
|
|
"name": test.name,
|
|
"template_id": str(template.id),
|
|
"technique_id": str(test.technique_id),
|
|
},
|
|
)
|
|
|
|
return test
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /tests/{id} — detail with evidences split by team
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/{test_id}", response_model=TestOut)
|
|
def get_test(
|
|
test_id: uuid.UUID,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Return full details for a single test, including its evidences."""
|
|
test = (
|
|
db.query(Test)
|
|
.options(joinedload(Test.evidences))
|
|
.filter(Test.id == test_id)
|
|
.first()
|
|
)
|
|
|
|
if test is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Test not found",
|
|
)
|
|
|
|
return test
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PATCH /tests/{id} — general update (draft / rejected)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.patch("/{test_id}", response_model=TestOut)
|
|
def update_test(
|
|
test_id: uuid.UUID,
|
|
payload: TestUpdate,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Update one or more fields of an existing test.
|
|
|
|
Only the original creator or an admin can update.
|
|
The test must be in ``draft`` or ``rejected`` state.
|
|
"""
|
|
test = _get_test_or_404(db, test_id)
|
|
|
|
if current_user.role != "admin" and test.created_by != current_user.id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Not enough permissions",
|
|
)
|
|
|
|
if test.state not in (TestState.draft, TestState.rejected):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Cannot update a test in '{test.state.value}' state (must be draft or rejected)",
|
|
)
|
|
|
|
update_data = payload.model_dump(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(test, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(test)
|
|
|
|
log_action(
|
|
db,
|
|
user_id=current_user.id,
|
|
action="update_test",
|
|
entity_type="test",
|
|
entity_id=test.id,
|
|
details={"updated_fields": list(update_data.keys())},
|
|
)
|
|
|
|
return test
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PATCH /tests/{id}/red — Red Team update (draft, red_executing)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.patch("/{test_id}/red", response_model=TestOut)
|
|
def update_test_red(
|
|
test_id: uuid.UUID,
|
|
payload: TestRedUpdate,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_any_role("red_tech")),
|
|
):
|
|
"""Red Team updates their fields (allowed in ``draft`` and ``red_executing``)."""
|
|
test = _get_test_or_404(db, test_id)
|
|
|
|
if test.state not in (TestState.draft, TestState.red_executing):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Cannot update red fields in '{test.state.value}' state (must be draft or red_executing)",
|
|
)
|
|
|
|
update_data = payload.model_dump(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(test, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(test)
|
|
|
|
log_action(
|
|
db,
|
|
user_id=current_user.id,
|
|
action="update_test_red",
|
|
entity_type="test",
|
|
entity_id=test.id,
|
|
details={"updated_fields": list(update_data.keys())},
|
|
)
|
|
|
|
return test
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PATCH /tests/{id}/blue — Blue Team update (blue_evaluating only)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.patch("/{test_id}/blue", response_model=TestOut)
|
|
def update_test_blue(
|
|
test_id: uuid.UUID,
|
|
payload: TestBlueUpdate,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_any_role("blue_tech")),
|
|
):
|
|
"""Blue Team updates their fields (allowed only in ``blue_evaluating``)."""
|
|
test = _get_test_or_404(db, test_id)
|
|
|
|
if test.state != TestState.blue_evaluating:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Cannot update blue fields in '{test.state.value}' state (must be blue_evaluating)",
|
|
)
|
|
|
|
update_data = payload.model_dump(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(test, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(test)
|
|
|
|
log_action(
|
|
db,
|
|
user_id=current_user.id,
|
|
action="update_test_blue",
|
|
entity_type="test",
|
|
entity_id=test.id,
|
|
details={"updated_fields": list(update_data.keys())},
|
|
)
|
|
|
|
return test
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /tests/{id}/start-execution — draft → red_executing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.post("/{test_id}/start-execution", response_model=TestOut)
|
|
def start_execution(
|
|
test_id: uuid.UUID,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_any_role("red_tech")),
|
|
):
|
|
"""Move a test from ``draft`` to ``red_executing``."""
|
|
test = _get_test_or_404(db, test_id)
|
|
test = wf_start_execution(db, test, current_user)
|
|
db.refresh(test)
|
|
return test
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /tests/{id}/submit-red — red_executing → blue_evaluating
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.post("/{test_id}/submit-red", response_model=TestOut)
|
|
def submit_red(
|
|
test_id: uuid.UUID,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_any_role("red_tech")),
|
|
):
|
|
"""Red Team finalises — move from ``red_executing`` to ``blue_evaluating``."""
|
|
test = _get_test_or_404(db, test_id)
|
|
test = wf_submit_red(db, test, current_user)
|
|
db.refresh(test)
|
|
return test
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /tests/{id}/submit-blue — blue_evaluating → in_review
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.post("/{test_id}/submit-blue", response_model=TestOut)
|
|
def submit_blue(
|
|
test_id: uuid.UUID,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_any_role("blue_tech")),
|
|
):
|
|
"""Blue Team finalises — move from ``blue_evaluating`` to ``in_review``."""
|
|
test = _get_test_or_404(db, test_id)
|
|
test = wf_submit_blue(db, test, current_user)
|
|
db.refresh(test)
|
|
return test
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /tests/{id}/validate-red — Red Lead validates
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.post("/{test_id}/validate-red", response_model=TestOut)
|
|
def validate_red(
|
|
test_id: uuid.UUID,
|
|
payload: TestRedValidate,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_any_role("red_lead")),
|
|
):
|
|
"""Red Lead approves or rejects the red side of a test."""
|
|
test = _get_test_with_technique(db, test_id)
|
|
test = wf_validate_red(
|
|
db, test, current_user,
|
|
validation_status=payload.red_validation_status,
|
|
notes=payload.red_validation_notes,
|
|
)
|
|
|
|
# Recalculate technique status if test reached a terminal state
|
|
if test.state in (TestState.validated, TestState.rejected):
|
|
recalculate_technique_status(db, test.technique)
|
|
|
|
db.refresh(test)
|
|
return test
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /tests/{id}/validate-blue — Blue Lead validates
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.post("/{test_id}/validate-blue", response_model=TestOut)
|
|
def validate_blue(
|
|
test_id: uuid.UUID,
|
|
payload: TestBlueValidate,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_any_role("blue_lead")),
|
|
):
|
|
"""Blue Lead approves or rejects the blue side of a test."""
|
|
test = _get_test_with_technique(db, test_id)
|
|
test = wf_validate_blue(
|
|
db, test, current_user,
|
|
validation_status=payload.blue_validation_status,
|
|
notes=payload.blue_validation_notes,
|
|
)
|
|
|
|
# Recalculate technique status if test reached a terminal state
|
|
if test.state in (TestState.validated, TestState.rejected):
|
|
recalculate_technique_status(db, test.technique)
|
|
|
|
db.refresh(test)
|
|
return test
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /tests/{id}/reopen — rejected → draft
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.post("/{test_id}/reopen", response_model=TestOut)
|
|
def reopen(
|
|
test_id: uuid.UUID,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
|
|
):
|
|
"""Reopen a rejected test, moving it back to ``draft``."""
|
|
test = _get_test_or_404(db, test_id)
|
|
test = wf_reopen(db, test, current_user)
|
|
db.refresh(test)
|
|
return test
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /tests/{id}/timeline — audit history for this test
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/{test_id}/timeline")
|
|
def get_test_timeline(
|
|
test_id: uuid.UUID,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Return the chronological audit-log history for a test."""
|
|
# Verify the test exists
|
|
_get_test_or_404(db, test_id)
|
|
|
|
logs = (
|
|
db.query(AuditLog)
|
|
.filter(
|
|
AuditLog.entity_type == "test",
|
|
AuditLog.entity_id == str(test_id),
|
|
)
|
|
.order_by(AuditLog.timestamp.asc())
|
|
.all()
|
|
)
|
|
|
|
return [
|
|
{
|
|
"id": str(log.id),
|
|
"action": log.action,
|
|
"user_id": str(log.user_id) if log.user_id else None,
|
|
"timestamp": log.timestamp.isoformat() if log.timestamp else None,
|
|
"details": log.details,
|
|
}
|
|
for log in logs
|
|
]
|