T-109: Rewrite tests router with full Red/Blue workflow endpoints - list with filters, create from template, Red/Blue team updates with state guards, start-execution, submit-red, submit-blue, validate-red, validate-blue, reopen, and timeline. All using workflow service from Phase 11. T-110: Rewrite evidence router with Red/Blue separation - upload with team field, list with team filter, delete with state-based permissions. Red Team edits in draft/red_executing, Blue Team in blue_evaluating, admin bypasses all. T-111: Create test_templates router with full CRUD - paginated list with source/platform/severity/search filters, by-technique lookup, admin-only create/update, and soft delete. Registered in main.py. T-112: Add POST /system/import-atomic-tests endpoint to system router - admin-only trigger for Atomic Red Team import with error handling and statistics response. Includes validation tests for all four tasks (35 checks total).
243 lines
7.9 KiB
Python
243 lines
7.9 KiB
Python
"""CRUD router for TestTemplates — predefined test catalog.
|
|
|
|
Endpoints
|
|
---------
|
|
GET /test-templates — list with filters + pagination
|
|
GET /test-templates/{id} — detail
|
|
POST /test-templates — create custom (admin)
|
|
PATCH /test-templates/{id} — update (admin)
|
|
DELETE /test-templates/{id} — soft delete (admin)
|
|
GET /test-templates/by-technique/{mitre_id} — templates for a MITRE technique
|
|
|
|
Filters (GET /test-templates)
|
|
-----------------------------
|
|
- source: atomic_red_team | mitre | custom
|
|
- platform: windows | linux | macos
|
|
- severity: low | medium | high | critical
|
|
- mitre_technique_id: filter by specific technique
|
|
- search: full-text search across name and description
|
|
- offset / limit: pagination (default limit=50)
|
|
"""
|
|
|
|
import uuid
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
|
from sqlalchemy import or_
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.database import get_db
|
|
from app.dependencies.auth import get_current_user, require_role
|
|
from app.models.test_template import TestTemplate
|
|
from app.models.user import User
|
|
from app.schemas.test_template import (
|
|
TestTemplateCreate,
|
|
TestTemplateOut,
|
|
TestTemplateSummary,
|
|
)
|
|
from app.services.audit_service import log_action
|
|
|
|
router = APIRouter(prefix="/test-templates", tags=["test-templates"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /test-templates — list with filters + pagination
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("", response_model=list[TestTemplateSummary])
|
|
def list_templates(
|
|
source: Optional[str] = Query(None, description="Filter by source (atomic_red_team, mitre, custom)"),
|
|
platform: Optional[str] = Query(None, description="Filter by platform (windows, linux, macos)"),
|
|
severity: Optional[str] = Query(None, description="Filter by severity (low, medium, high, critical)"),
|
|
mitre_technique_id: Optional[str] = Query(None, description="Filter by MITRE technique ID"),
|
|
search: Optional[str] = Query(None, description="Search in name and description"),
|
|
offset: int = Query(0, ge=0),
|
|
limit: int = Query(50, ge=1, le=200),
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Return a paginated, filterable list of active test templates."""
|
|
query = db.query(TestTemplate).filter(TestTemplate.is_active == True) # noqa: E712
|
|
|
|
if source:
|
|
query = query.filter(TestTemplate.source == source)
|
|
if platform:
|
|
query = query.filter(TestTemplate.platform.ilike(f"%{platform}%"))
|
|
if severity:
|
|
query = query.filter(TestTemplate.severity == severity)
|
|
if mitre_technique_id:
|
|
query = query.filter(TestTemplate.mitre_technique_id == mitre_technique_id)
|
|
if search:
|
|
pattern = f"%{search}%"
|
|
query = query.filter(
|
|
or_(
|
|
TestTemplate.name.ilike(pattern),
|
|
TestTemplate.description.ilike(pattern),
|
|
)
|
|
)
|
|
|
|
templates = (
|
|
query
|
|
.order_by(TestTemplate.mitre_technique_id, TestTemplate.name)
|
|
.offset(offset)
|
|
.limit(limit)
|
|
.all()
|
|
)
|
|
return templates
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /test-templates/by-technique/{mitre_id}
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/by-technique/{mitre_id}", response_model=list[TestTemplateSummary])
|
|
def templates_by_technique(
|
|
mitre_id: str,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Return all active templates mapped to a specific MITRE technique."""
|
|
templates = (
|
|
db.query(TestTemplate)
|
|
.filter(
|
|
TestTemplate.mitre_technique_id == mitre_id,
|
|
TestTemplate.is_active == True, # noqa: E712
|
|
)
|
|
.order_by(TestTemplate.name)
|
|
.all()
|
|
)
|
|
return templates
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /test-templates/{id} — detail
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/{template_id}", response_model=TestTemplateOut)
|
|
def get_template(
|
|
template_id: uuid.UUID,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Return full details for a single test template."""
|
|
template = db.query(TestTemplate).filter(TestTemplate.id == template_id).first()
|
|
if template is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Test template not found",
|
|
)
|
|
return template
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /test-templates — create (admin only)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.post(
|
|
"",
|
|
response_model=TestTemplateOut,
|
|
status_code=status.HTTP_201_CREATED,
|
|
)
|
|
def create_template(
|
|
payload: TestTemplateCreate,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_role("admin")),
|
|
):
|
|
"""Create a custom test template. Admin only."""
|
|
template = TestTemplate(**payload.model_dump())
|
|
db.add(template)
|
|
db.commit()
|
|
db.refresh(template)
|
|
|
|
log_action(
|
|
db,
|
|
user_id=current_user.id,
|
|
action="create_test_template",
|
|
entity_type="test_template",
|
|
entity_id=template.id,
|
|
details={
|
|
"name": template.name,
|
|
"source": template.source,
|
|
"mitre_technique_id": template.mitre_technique_id,
|
|
},
|
|
)
|
|
|
|
return template
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PATCH /test-templates/{id} — update (admin only)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.patch("/{template_id}", response_model=TestTemplateOut)
|
|
def update_template(
|
|
template_id: uuid.UUID,
|
|
payload: TestTemplateCreate,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_role("admin")),
|
|
):
|
|
"""Update fields of an existing test template. Admin only."""
|
|
template = db.query(TestTemplate).filter(TestTemplate.id == template_id).first()
|
|
if template is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Test template not found",
|
|
)
|
|
|
|
update_data = payload.model_dump(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(template, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(template)
|
|
|
|
log_action(
|
|
db,
|
|
user_id=current_user.id,
|
|
action="update_test_template",
|
|
entity_type="test_template",
|
|
entity_id=template.id,
|
|
details={"updated_fields": list(update_data.keys())},
|
|
)
|
|
|
|
return template
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DELETE /test-templates/{id} — soft delete (admin only)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.delete("/{template_id}", status_code=status.HTTP_200_OK)
|
|
def delete_template(
|
|
template_id: uuid.UUID,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_role("admin")),
|
|
):
|
|
"""Soft-delete a test template by setting ``is_active=False``. Admin only."""
|
|
template = db.query(TestTemplate).filter(TestTemplate.id == template_id).first()
|
|
if template is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Test template not found",
|
|
)
|
|
|
|
template.is_active = False
|
|
db.commit()
|
|
|
|
log_action(
|
|
db,
|
|
user_id=current_user.id,
|
|
action="delete_test_template",
|
|
entity_type="test_template",
|
|
entity_id=template.id,
|
|
details={"name": template.name},
|
|
)
|
|
|
|
return {"detail": "Test template deactivated"}
|