Critical (1-3): - Replace hardcoded admin credentials with secure auto-generation (seed.py) - Enforce SECRET_KEY configuration, fail in production if missing (config.py) - Add Zip Slip and Zip Bomb protection to all ZIP import services High/Medium (4-9): - Add 50MB file size limit and extension whitelist to evidence uploads - Configure CORS origins via environment variable instead of hardcoded - Migrate JWT storage from localStorage to HttpOnly cookies (frontend+backend) - Add rate limiting (5/min) on login endpoint via slowapi - Replace generic dict payloads with Pydantic schemas (mass assignment) Medium (10-17): - Check is_active on login to prevent disabled users from authenticating - Sanitize exception messages in API responses (system, data_sources) - Escape LIKE wildcards in all ilike search filters across 8 routers - Run Docker container as non-root user (appuser) - Make MINIO_SECURE configurable via environment variable - Add password complexity policy (12+ chars, upper/lower/digit/special) - Implement JWT token revocation via in-memory blacklist + reduce TTL to 15min - Replace xml.etree with defusedxml to prevent Billion Laughs attacks Low (18-20): - Add security headers to Nginx (CSP, X-Frame-Options, HSTS-ready, etc.) - Disable Swagger UI/ReDoc/OpenAPI in production - Restrict /health endpoint to internal networks via Nginx ACL Also: rewrite install.sh as interactive wizard for guided deployment, fix test-from-template validation error (technique_id UUID vs MITRE ID)
368 lines
12 KiB
Python
368 lines
12 KiB
Python
"""CRUD router for TestTemplates — predefined test catalog.
|
|
|
|
Endpoints
|
|
---------
|
|
GET /test-templates — list with filters + pagination
|
|
GET /test-templates/stats — catalog statistics (admin)
|
|
GET /test-templates/{id} — detail
|
|
POST /test-templates — create custom (admin)
|
|
PATCH /test-templates/{id} — update (admin)
|
|
PATCH /test-templates/{id}/toggle-active — toggle active/inactive (admin)
|
|
DELETE /test-templates/{id} — soft delete (admin)
|
|
GET /test-templates/by-technique/{mitre_id} — templates for a MITRE technique
|
|
|
|
Filters (GET /test-templates)
|
|
-----------------------------
|
|
- source: atomic_red_team | mitre | custom
|
|
- platform: windows | linux | macos
|
|
- severity: low | medium | high | critical
|
|
- mitre_technique_id: filter by specific technique
|
|
- search: full-text search across name and description
|
|
- is_active: true | false (default only active)
|
|
- offset / limit: pagination (default limit=50)
|
|
"""
|
|
|
|
import uuid
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
|
from sqlalchemy import func, or_
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.database import get_db
|
|
from app.dependencies.auth import get_current_user, require_role
|
|
from app.models.test_template import TestTemplate
|
|
from app.models.user import User
|
|
from app.schemas.test_template import (
|
|
TestTemplateCreate,
|
|
TestTemplateOut,
|
|
TestTemplateSummary,
|
|
)
|
|
from app.services.audit_service import log_action
|
|
|
|
router = APIRouter(prefix="/test-templates", tags=["test-templates"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /test-templates — list with filters + pagination
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("", response_model=list[TestTemplateSummary])
|
|
def list_templates(
|
|
source: Optional[str] = Query(None, description="Filter by source (atomic_red_team, mitre, custom)"),
|
|
platform: Optional[str] = Query(None, description="Filter by platform (windows, linux, macos)"),
|
|
severity: Optional[str] = Query(None, description="Filter by severity (low, medium, high, critical)"),
|
|
mitre_technique_id: Optional[str] = Query(None, description="Filter by MITRE technique ID"),
|
|
search: Optional[str] = Query(None, description="Search in name and description"),
|
|
is_active: Optional[bool] = Query(None, description="Filter by active status (true/false). Omit to return all."),
|
|
offset: int = Query(0, ge=0),
|
|
limit: int = Query(50, ge=1, le=200),
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Return a paginated, filterable list of test templates."""
|
|
query = db.query(TestTemplate)
|
|
if is_active is not None:
|
|
query = query.filter(TestTemplate.is_active == is_active) # noqa: E712
|
|
|
|
if source:
|
|
query = query.filter(TestTemplate.source == source)
|
|
if platform:
|
|
from app.utils import escape_like
|
|
query = query.filter(TestTemplate.platform.ilike(f"%{escape_like(platform)}%"))
|
|
if severity:
|
|
query = query.filter(TestTemplate.severity == severity)
|
|
if mitre_technique_id:
|
|
query = query.filter(TestTemplate.mitre_technique_id == mitre_technique_id)
|
|
if search:
|
|
from app.utils import escape_like
|
|
pattern = f"%{escape_like(search)}%"
|
|
query = query.filter(
|
|
or_(
|
|
TestTemplate.name.ilike(pattern),
|
|
TestTemplate.description.ilike(pattern),
|
|
)
|
|
)
|
|
|
|
templates = (
|
|
query
|
|
.order_by(TestTemplate.mitre_technique_id, TestTemplate.name)
|
|
.offset(offset)
|
|
.limit(limit)
|
|
.all()
|
|
)
|
|
return templates
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /test-templates/stats — catalog statistics (admin)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/stats")
|
|
def template_stats(
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_role("admin")),
|
|
):
|
|
"""Return catalog statistics: totals by source, platform, active/inactive."""
|
|
|
|
total = db.query(func.count(TestTemplate.id)).scalar() or 0
|
|
active = (
|
|
db.query(func.count(TestTemplate.id))
|
|
.filter(TestTemplate.is_active == True) # noqa: E712
|
|
.scalar()
|
|
) or 0
|
|
inactive = total - active
|
|
|
|
# By source
|
|
source_rows = (
|
|
db.query(TestTemplate.source, func.count(TestTemplate.id))
|
|
.filter(TestTemplate.is_active == True) # noqa: E712
|
|
.group_by(TestTemplate.source)
|
|
.all()
|
|
)
|
|
by_source = {source: cnt for source, cnt in source_rows}
|
|
|
|
# By platform
|
|
platform_rows = (
|
|
db.query(TestTemplate.platform, func.count(TestTemplate.id))
|
|
.filter(TestTemplate.is_active == True) # noqa: E712
|
|
.group_by(TestTemplate.platform)
|
|
.all()
|
|
)
|
|
by_platform = {(platform or "unspecified"): cnt for platform, cnt in platform_rows}
|
|
|
|
return {
|
|
"total": total,
|
|
"active": active,
|
|
"inactive": inactive,
|
|
"by_source": by_source,
|
|
"by_platform": by_platform,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PATCH /test-templates/bulk-activate — activate/deactivate all (admin)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.patch("/bulk-activate")
|
|
def bulk_activate_templates(
|
|
activate: bool = Query(True, description="True to activate all, False to deactivate all"),
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_role("admin")),
|
|
):
|
|
"""Set all templates to active or inactive. Admin only."""
|
|
count = (
|
|
db.query(TestTemplate)
|
|
.filter(TestTemplate.is_active != activate)
|
|
.update({TestTemplate.is_active: activate})
|
|
)
|
|
db.commit()
|
|
|
|
log_action(
|
|
db,
|
|
user_id=current_user.id,
|
|
action="bulk_activate_templates" if activate else "bulk_deactivate_templates",
|
|
entity_type="test_template",
|
|
entity_id=None,
|
|
details={"affected": count, "is_active": activate},
|
|
)
|
|
|
|
return {
|
|
"detail": f"{'Activated' if activate else 'Deactivated'} {count} templates",
|
|
"affected": count,
|
|
"is_active": activate,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /test-templates/by-technique/{mitre_id}
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/by-technique/{mitre_id}", response_model=list[TestTemplateSummary])
|
|
def templates_by_technique(
|
|
mitre_id: str,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Return all active templates mapped to a specific MITRE technique."""
|
|
templates = (
|
|
db.query(TestTemplate)
|
|
.filter(
|
|
TestTemplate.mitre_technique_id == mitre_id,
|
|
TestTemplate.is_active == True, # noqa: E712
|
|
)
|
|
.order_by(TestTemplate.name)
|
|
.all()
|
|
)
|
|
return templates
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /test-templates/{id} — detail
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.get("/{template_id}", response_model=TestTemplateOut)
|
|
def get_template(
|
|
template_id: uuid.UUID,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Return full details for a single test template."""
|
|
template = db.query(TestTemplate).filter(TestTemplate.id == template_id).first()
|
|
if template is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Test template not found",
|
|
)
|
|
return template
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /test-templates — create (admin only)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.post(
|
|
"",
|
|
response_model=TestTemplateOut,
|
|
status_code=status.HTTP_201_CREATED,
|
|
)
|
|
def create_template(
|
|
payload: TestTemplateCreate,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_role("admin")),
|
|
):
|
|
"""Create a custom test template. Admin only."""
|
|
template = TestTemplate(**payload.model_dump())
|
|
db.add(template)
|
|
db.commit()
|
|
db.refresh(template)
|
|
|
|
log_action(
|
|
db,
|
|
user_id=current_user.id,
|
|
action="create_test_template",
|
|
entity_type="test_template",
|
|
entity_id=template.id,
|
|
details={
|
|
"name": template.name,
|
|
"source": template.source,
|
|
"mitre_technique_id": template.mitre_technique_id,
|
|
},
|
|
)
|
|
|
|
return template
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PATCH /test-templates/{id} — update (admin only)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.patch("/{template_id}", response_model=TestTemplateOut)
|
|
def update_template(
|
|
template_id: uuid.UUID,
|
|
payload: TestTemplateCreate,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_role("admin")),
|
|
):
|
|
"""Update fields of an existing test template. Admin only."""
|
|
template = db.query(TestTemplate).filter(TestTemplate.id == template_id).first()
|
|
if template is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Test template not found",
|
|
)
|
|
|
|
update_data = payload.model_dump(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(template, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(template)
|
|
|
|
log_action(
|
|
db,
|
|
user_id=current_user.id,
|
|
action="update_test_template",
|
|
entity_type="test_template",
|
|
entity_id=template.id,
|
|
details={"updated_fields": list(update_data.keys())},
|
|
)
|
|
|
|
return template
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PATCH /test-templates/{id}/toggle-active — toggle active/inactive (admin)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.patch("/{template_id}/toggle-active", response_model=TestTemplateOut)
|
|
def toggle_template_active(
|
|
template_id: uuid.UUID,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_role("admin")),
|
|
):
|
|
"""Toggle a template between active and inactive. Admin only."""
|
|
template = db.query(TestTemplate).filter(TestTemplate.id == template_id).first()
|
|
if template is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Test template not found",
|
|
)
|
|
|
|
template.is_active = not template.is_active
|
|
db.commit()
|
|
db.refresh(template)
|
|
|
|
log_action(
|
|
db,
|
|
user_id=current_user.id,
|
|
action="toggle_test_template",
|
|
entity_type="test_template",
|
|
entity_id=template.id,
|
|
details={"name": template.name, "is_active": template.is_active},
|
|
)
|
|
|
|
return template
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DELETE /test-templates/{id} — soft delete (admin only)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.delete("/{template_id}", status_code=status.HTTP_200_OK)
|
|
def delete_template(
|
|
template_id: uuid.UUID,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_role("admin")),
|
|
):
|
|
"""Soft-delete a test template by setting ``is_active=False``. Admin only."""
|
|
template = db.query(TestTemplate).filter(TestTemplate.id == template_id).first()
|
|
if template is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Test template not found",
|
|
)
|
|
|
|
template.is_active = False
|
|
db.commit()
|
|
|
|
log_action(
|
|
db,
|
|
user_id=current_user.id,
|
|
action="delete_test_template",
|
|
entity_type="test_template",
|
|
entity_id=template.id,
|
|
details={"name": template.name},
|
|
)
|
|
|
|
return {"detail": "Test template deactivated"}
|