282 lines
9.9 KiB
Python
282 lines
9.9 KiB
Python
"""CRUD router for TestTemplates — predefined test catalog.
|
|
|
|
Endpoints
|
|
---------
|
|
GET /test-templates — list with filters + pagination
|
|
GET /test-templates/stats — catalog statistics (admin)
|
|
PATCH /test-templates/bulk-activate — activate/deactivate all templates (admin)
|
|
GET /test-templates/{id} — detail
|
|
POST /test-templates — create custom (admin)
|
|
PATCH /test-templates/{id} — update (admin)
|
|
PATCH /test-templates/{id}/toggle-active — toggle active/inactive (admin)
|
|
DELETE /test-templates/{id} — soft delete (admin)
|
|
GET /test-templates/by-technique/{mitre_id} — templates for a MITRE technique
|
|
|
|
Filters (GET /test-templates)
|
|
-----------------------------
|
|
- source: atomic_red_team | mitre | custom
|
|
- platform: windows | linux | macos
|
|
- severity: low | medium | high | critical
|
|
- mitre_technique_id: filter by specific technique
|
|
- search: full-text search across name and description
|
|
- is_active: true | false (omit to return both active and inactive)
|
|
- offset / limit: pagination (default limit=50)
|
|
"""
|
|
|
|
import uuid
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, Query, status
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.database import get_db
|
|
from app.dependencies.auth import get_current_user, require_any_role
|
|
from app.domain.unit_of_work import UnitOfWork
|
|
from app.models.user import User
|
|
from app.schemas.test_template import (
|
|
TestTemplateCreate,
|
|
TestTemplateOut,
|
|
TestTemplateSummary,
|
|
)
|
|
from app.services.audit_service import log_action
|
|
from app.services.test_template_service import (
|
|
bulk_activate,
|
|
create_template as create_template_svc,
|
|
get_template_or_raise,
|
|
get_template_stats,
|
|
get_templates_by_technique as templates_by_technique,
|
|
list_templates,
|
|
soft_delete_template,
|
|
toggle_template_active as toggle_template_active_svc,
|
|
update_template as update_template_svc,
|
|
)
|
|
|
|
# Router that groups every endpoint in this module under the
# "/test-templates" prefix and the "test-templates" tag.
router = APIRouter(prefix="/test-templates", tags=["test-templates"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /test-templates — list with filters + pagination
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("", response_model=list[TestTemplateSummary])
def _list_templates_handler(
    source: Optional[str] = Query(None, description="Filter by source (atomic_red_team, mitre, custom)"),
    platform: Optional[str] = Query(None, description="Filter by platform (windows, linux, macos)"),
    severity: Optional[str] = Query(None, description="Filter by severity (low, medium, high, critical)"),
    mitre_technique_id: Optional[str] = Query(None, description="Filter by MITRE technique ID"),
    search: Optional[str] = Query(None, description="Search in name and description"),
    is_active: Optional[bool] = Query(None, description="Filter by active status (true/false). Omit to return all."),
    offset: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List test templates, narrowed by optional filters and paginated.

    Each filter defaults to ``None`` and is handed to the service layer
    unchanged, together with ``offset`` and ``limit``.
    """
    # ``current_user`` is not read below; it is declared only so the
    # ``get_current_user`` dependency runs for this route.
    filters = {
        "source": source,
        "platform": platform,
        "severity": severity,
        "mitre_technique_id": mitre_technique_id,
        "search": search,
        "is_active": is_active,
    }
    page = {"offset": offset, "limit": limit}
    return list_templates(db, **filters, **page)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /test-templates/stats — catalog statistics (admin)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/stats")
def template_stats(
    db: Session = Depends(get_db),
    current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
    """Return statistics about the test-template catalog.

    The response body is whatever ``get_template_stats`` produces.
    """
    # NOTE(review): earlier documentation listed the keys as active,
    # by_source and by_platform — confirm against the service.
    stats = get_template_stats(db)
    return stats
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PATCH /test-templates/bulk-activate — activate/deactivate all (admin)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.patch("/bulk-activate")
def bulk_activate_templates(
    activate: bool = Query(True, description="True to activate all, False to deactivate all"),
    db: Session = Depends(get_db),
    current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
    """Activate or deactivate every template and audit the operation.

    Returns a summary dict holding a human-readable ``detail`` message,
    the ``affected`` value returned by ``bulk_activate`` and the
    ``is_active`` flag that was requested.
    """
    count = bulk_activate(db, activate=activate)

    with UnitOfWork(db) as unit:
        audit_entry = {
            "user_id": current_user.id,
            "action": "bulk_activate_templates" if activate else "bulk_deactivate_templates",
            "entity_type": "test_template",
            # A bulk change does not target one specific template.
            "entity_id": None,
            "details": {"affected": count, "is_active": activate},
        }
        log_action(db, **audit_entry)
        unit.commit()

    summary = {
        "detail": f"{'Activated' if activate else 'Deactivated'} {count} templates",
        "affected": count,
        "is_active": activate,
    }
    return summary
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /test-templates/by-technique/{mitre_id}
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/by-technique/{mitre_id}", response_model=list[TestTemplateSummary])
def _templates_by_technique_handler(
    mitre_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the templates mapped to the MITRE technique ``mitre_id``."""
    # NOTE(review): this endpoint was previously described as returning
    # only *active* templates; any such filtering happens in the service —
    # confirm there.
    matches = templates_by_technique(db, mitre_id)
    return matches
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /test-templates/{id} — detail
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/{template_id}", response_model=TestTemplateOut)
def get_template(
    template_id: uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the full record of one test template, looked up by its UUID."""
    # Lookup (and any failure for an unknown id) is delegated to the service
    # helper; presumably it raises when nothing matches — confirm.
    template = get_template_or_raise(db, template_id)
    return template
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /test-templates — create (admin only)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.post("", response_model=TestTemplateOut, status_code=status.HTTP_201_CREATED)
def create_template(
    payload: TestTemplateCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
    """Create a custom test template and record the creation in the audit log.

    Every field of ``payload`` is forwarded to the service as a keyword
    argument; the created template is returned.
    """
    fields = payload.model_dump()
    template = create_template_svc(db, **fields)

    with UnitOfWork(db) as unit:
        audit_entry = {
            "user_id": current_user.id,
            "action": "create_test_template",
            "entity_type": "test_template",
            "entity_id": template.id,
            "details": {
                "name": template.name,
                "source": template.source,
                "mitre_technique_id": template.mitre_technique_id,
            },
        }
        log_action(db, **audit_entry)
        unit.commit()
        # Reload the instance after the commit so the response is built from
        # current database state.
        db.refresh(template)

    return template
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PATCH /test-templates/{id} — update (admin only)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.patch("/{template_id}", response_model=TestTemplateOut)
def update_template(
    template_id: uuid.UUID,
    payload: TestTemplateCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
    """Update an existing test template and audit which fields were sent.

    Only fields explicitly present in the request body are forwarded to
    the service; the audit entry lists the names of those same fields.
    """
    # NOTE(review): the body is validated with the *create* schema, so any
    # field that schema marks as required must still be supplied on PATCH.
    # Confirm whether a dedicated partial-update schema was intended.
    changes = payload.model_dump(exclude_unset=True)
    template = update_template_svc(db, template_id, **changes)

    with UnitOfWork(db) as unit:
        audit_entry = {
            "user_id": current_user.id,
            "action": "update_test_template",
            "entity_type": "test_template",
            "entity_id": template.id,
            "details": {"updated_fields": list(changes)},
        }
        log_action(db, **audit_entry)
        unit.commit()
        # Reload the instance after the commit so the response is built from
        # current database state.
        db.refresh(template)

    return template
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PATCH /test-templates/{id}/toggle-active — toggle active/inactive (admin)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.patch("/{template_id}/toggle-active", response_model=TestTemplateOut)
def toggle_template_active(
    template_id: uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
    """Flip a template between active and inactive and audit the new state.

    The flip itself is performed by the service; the audit entry records
    the template name and the ``is_active`` value it ended up with.
    """
    template = toggle_template_active_svc(db, template_id)

    with UnitOfWork(db) as unit:
        audit_entry = {
            "user_id": current_user.id,
            "action": "toggle_test_template",
            "entity_type": "test_template",
            "entity_id": template.id,
            "details": {"name": template.name, "is_active": template.is_active},
        }
        log_action(db, **audit_entry)
        unit.commit()
        # Reload the instance after the commit so the response is built from
        # current database state.
        db.refresh(template)

    return template
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DELETE /test-templates/{id} — soft delete (admin only)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.delete("/{template_id}", status_code=status.HTTP_200_OK)
def delete_template(
    template_id: uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
    """Soft-delete a test template and record the action in the audit log.

    The template is fetched first so its id and name can be audited, then
    the service performs the soft delete. Responds with a confirmation
    message rather than the template itself.
    """
    # NOTE(review): the soft delete is expected to set ``is_active=False``
    # (the response says "deactivated"); the actual change lives in the
    # service — confirm there.
    template = get_template_or_raise(db, template_id)
    soft_delete_template(db, template_id)

    with UnitOfWork(db) as unit:
        audit_entry = {
            "user_id": current_user.id,
            "action": "delete_test_template",
            "entity_type": "test_template",
            "entity_id": template.id,
            "details": {"name": template.name},
        }
        log_action(db, **audit_entry)
        unit.commit()

    confirmation = {"detail": "Test template deactivated"}
    return confirmation
|