Files
Aegis/backend/app/routers/test_templates.py
Kitos a4a2adccee
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(phase-39): role-based access control overhaul + forced password change
- Add must_change_password field to User model with migration b023

- Add POST /auth/change-password endpoint with password policy validation

- Add require_password_changed dependency to block requests until password is changed

- Add ChangePasswordModal with live password policy checklist (forced on first login)

- Show password policy in CreateUserModal and EditUserModal

- Fix backend permissions: tests, campaigns, templates, reports, evidence, worklogs

- red_tech/blue_tech: execute only, cannot create tests/campaigns/templates

- red_lead/blue_lead: create/edit tests/campaigns/templates, generate reports, no system access

- viewer: read-only everywhere, can generate reports

- Fix frontend role checks across TestDetailPage, TestDetailHeader, TeamTabs, TestsPage, CampaignsPage, CampaignDetailPage, Sidebar
2026-02-18 10:37:02 +01:00

368 lines
12 KiB
Python

"""CRUD router for TestTemplates — predefined test catalog.
Endpoints
---------
GET /test-templates — list with filters + pagination
GET /test-templates/stats — catalog statistics (admin)
GET /test-templates/{id} — detail
POST /test-templates — create custom (admin)
PATCH /test-templates/{id} — update (admin)
PATCH /test-templates/{id}/toggle-active — toggle active/inactive (admin)
DELETE /test-templates/{id} — soft delete (admin)
GET /test-templates/by-technique/{mitre_id} — templates for a MITRE technique
Filters (GET /test-templates)
-----------------------------
- source: atomic_red_team | mitre | custom
- platform: windows | linux | macos
- severity: low | medium | high | critical
- mitre_technique_id: filter by specific technique
- search: full-text search across name and description
- is_active: true | false (default only active)
- offset / limit: pagination (default limit=50)
"""
import uuid
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import func, or_
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user, require_role, require_any_role
from app.models.test_template import TestTemplate
from app.models.user import User
from app.schemas.test_template import (
TestTemplateCreate,
TestTemplateOut,
TestTemplateSummary,
)
from app.services.audit_service import log_action
router = APIRouter(prefix="/test-templates", tags=["test-templates"])
# ---------------------------------------------------------------------------
# GET /test-templates — list with filters + pagination
# ---------------------------------------------------------------------------
@router.get("", response_model=list[TestTemplateSummary])
def list_templates(
source: Optional[str] = Query(None, description="Filter by source (atomic_red_team, mitre, custom)"),
platform: Optional[str] = Query(None, description="Filter by platform (windows, linux, macos)"),
severity: Optional[str] = Query(None, description="Filter by severity (low, medium, high, critical)"),
mitre_technique_id: Optional[str] = Query(None, description="Filter by MITRE technique ID"),
search: Optional[str] = Query(None, description="Search in name and description"),
is_active: Optional[bool] = Query(None, description="Filter by active status (true/false). Omit to return all."),
offset: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return a paginated, filterable list of test templates."""
query = db.query(TestTemplate)
if is_active is not None:
query = query.filter(TestTemplate.is_active == is_active) # noqa: E712
if source:
query = query.filter(TestTemplate.source == source)
if platform:
from app.utils import escape_like
query = query.filter(TestTemplate.platform.ilike(f"%{escape_like(platform)}%"))
if severity:
query = query.filter(TestTemplate.severity == severity)
if mitre_technique_id:
query = query.filter(TestTemplate.mitre_technique_id == mitre_technique_id)
if search:
from app.utils import escape_like
pattern = f"%{escape_like(search)}%"
query = query.filter(
or_(
TestTemplate.name.ilike(pattern),
TestTemplate.description.ilike(pattern),
)
)
templates = (
query
.order_by(TestTemplate.mitre_technique_id, TestTemplate.name)
.offset(offset)
.limit(limit)
.all()
)
return templates
# ---------------------------------------------------------------------------
# GET /test-templates/stats — catalog statistics (admin)
# ---------------------------------------------------------------------------
@router.get("/stats")
def template_stats(
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Return catalog statistics: totals by source, platform, active/inactive."""
total = db.query(func.count(TestTemplate.id)).scalar() or 0
active = (
db.query(func.count(TestTemplate.id))
.filter(TestTemplate.is_active == True) # noqa: E712
.scalar()
) or 0
inactive = total - active
# By source
source_rows = (
db.query(TestTemplate.source, func.count(TestTemplate.id))
.filter(TestTemplate.is_active == True) # noqa: E712
.group_by(TestTemplate.source)
.all()
)
by_source = {source: cnt for source, cnt in source_rows}
# By platform
platform_rows = (
db.query(TestTemplate.platform, func.count(TestTemplate.id))
.filter(TestTemplate.is_active == True) # noqa: E712
.group_by(TestTemplate.platform)
.all()
)
by_platform = {(platform or "unspecified"): cnt for platform, cnt in platform_rows}
return {
"total": total,
"active": active,
"inactive": inactive,
"by_source": by_source,
"by_platform": by_platform,
}
# ---------------------------------------------------------------------------
# PATCH /test-templates/bulk-activate — activate/deactivate all (admin)
# ---------------------------------------------------------------------------
@router.patch("/bulk-activate")
def bulk_activate_templates(
activate: bool = Query(True, description="True to activate all, False to deactivate all"),
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Set all templates to active or inactive."""
count = (
db.query(TestTemplate)
.filter(TestTemplate.is_active != activate)
.update({TestTemplate.is_active: activate})
)
db.commit()
log_action(
db,
user_id=current_user.id,
action="bulk_activate_templates" if activate else "bulk_deactivate_templates",
entity_type="test_template",
entity_id=None,
details={"affected": count, "is_active": activate},
)
return {
"detail": f"{'Activated' if activate else 'Deactivated'} {count} templates",
"affected": count,
"is_active": activate,
}
# ---------------------------------------------------------------------------
# GET /test-templates/by-technique/{mitre_id}
# ---------------------------------------------------------------------------
@router.get("/by-technique/{mitre_id}", response_model=list[TestTemplateSummary])
def templates_by_technique(
mitre_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return all active templates mapped to a specific MITRE technique."""
templates = (
db.query(TestTemplate)
.filter(
TestTemplate.mitre_technique_id == mitre_id,
TestTemplate.is_active == True, # noqa: E712
)
.order_by(TestTemplate.name)
.all()
)
return templates
# ---------------------------------------------------------------------------
# GET /test-templates/{id} — detail
# ---------------------------------------------------------------------------
@router.get("/{template_id}", response_model=TestTemplateOut)
def get_template(
template_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return full details for a single test template."""
template = db.query(TestTemplate).filter(TestTemplate.id == template_id).first()
if template is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Test template not found",
)
return template
# ---------------------------------------------------------------------------
# POST /test-templates — create (admin only)
# ---------------------------------------------------------------------------
@router.post(
"",
response_model=TestTemplateOut,
status_code=status.HTTP_201_CREATED,
)
def create_template(
payload: TestTemplateCreate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Create a custom test template."""
template = TestTemplate(**payload.model_dump())
db.add(template)
db.commit()
db.refresh(template)
log_action(
db,
user_id=current_user.id,
action="create_test_template",
entity_type="test_template",
entity_id=template.id,
details={
"name": template.name,
"source": template.source,
"mitre_technique_id": template.mitre_technique_id,
},
)
return template
# ---------------------------------------------------------------------------
# PATCH /test-templates/{id} — update (admin only)
# ---------------------------------------------------------------------------
@router.patch("/{template_id}", response_model=TestTemplateOut)
def update_template(
template_id: uuid.UUID,
payload: TestTemplateCreate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Update fields of an existing test template."""
template = db.query(TestTemplate).filter(TestTemplate.id == template_id).first()
if template is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Test template not found",
)
update_data = payload.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(template, field, value)
db.commit()
db.refresh(template)
log_action(
db,
user_id=current_user.id,
action="update_test_template",
entity_type="test_template",
entity_id=template.id,
details={"updated_fields": list(update_data.keys())},
)
return template
# ---------------------------------------------------------------------------
# PATCH /test-templates/{id}/toggle-active — toggle active/inactive (admin)
# ---------------------------------------------------------------------------
@router.patch("/{template_id}/toggle-active", response_model=TestTemplateOut)
def toggle_template_active(
template_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Toggle a template between active and inactive."""
template = db.query(TestTemplate).filter(TestTemplate.id == template_id).first()
if template is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Test template not found",
)
template.is_active = not template.is_active
db.commit()
db.refresh(template)
log_action(
db,
user_id=current_user.id,
action="toggle_test_template",
entity_type="test_template",
entity_id=template.id,
details={"name": template.name, "is_active": template.is_active},
)
return template
# ---------------------------------------------------------------------------
# DELETE /test-templates/{id} — soft delete (admin only)
# ---------------------------------------------------------------------------
@router.delete("/{template_id}", status_code=status.HTTP_200_OK)
def delete_template(
template_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Soft-delete a test template by setting ``is_active=False``."""
template = db.query(TestTemplate).filter(TestTemplate.id == template_id).first()
if template is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Test template not found",
)
template.is_active = False
db.commit()
log_action(
db,
user_id=current_user.id,
action="delete_test_template",
entity_type="test_template",
entity_id=template.id,
details={"name": template.name},
)
return {"detail": "Test template deactivated"}