feat(security): extend rate limits on sync, tests, evidence and reports [FASE-3.4]

This commit is contained in:
2026-05-18 14:16:53 +02:00
parent 5b29c2fc56
commit 3e854b7b79
7 changed files with 94 additions and 9 deletions

6
backend/app/limiter.py Normal file
View File

@@ -0,0 +1,6 @@
"""Shared SlowAPI rate limiter for all routers."""
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)

View File

@@ -24,7 +24,7 @@ import os
import uuid as _uuid
from typing import Optional
from fastapi import APIRouter, Depends, File, Form, Query, UploadFile, status
from fastapi import APIRouter, Depends, File, Form, Query, Request, UploadFile, status
from sqlalchemy.orm import Session
from app.database import get_db
@@ -44,6 +44,7 @@ from app.services.evidence_service import (
validate_file,
validate_upload_permission,
)
from app.limiter import limiter
from app.storage import get_presigned_url, upload_file
router = APIRouter(tags=["evidence"])
@@ -78,7 +79,9 @@ def _evidence_to_out(evidence: Evidence) -> EvidenceOut:
response_model=EvidenceOut,
status_code=status.HTTP_201_CREATED,
)
@limiter.limit("10/minute")
async def upload_evidence(
request: Request,
test_id: _uuid.UUID,
file: UploadFile = File(...),
team: TeamSide = Form(TeamSide.red),

View File

@@ -2,13 +2,14 @@
from uuid import UUID
from fastapi import APIRouter, Depends, Query
from fastapi import APIRouter, Depends, Query, Request
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user, require_any_role
from app.models.user import User
from app.limiter import limiter
from app.services import report_generation_service
router = APIRouter(prefix="/reports/generate", tags=["professional-reports"])
@@ -21,7 +22,9 @@ _MEDIA_TYPES = {
@router.get("/purple-campaign/{campaign_id}")
@limiter.limit("5/minute")
def generate_purple_report(
request: Request,
campaign_id: UUID,
format: str = Query("pdf", pattern="^(pdf|docx|html)$"),
db: Session = Depends(get_db),
@@ -39,7 +42,9 @@ def generate_purple_report(
@router.get("/coverage-summary")
@limiter.limit("5/minute")
def generate_coverage_report(
request: Request,
format: str = Query("pdf", pattern="^(pdf|docx|html)$"),
db: Session = Depends(get_db),
user: User = Depends(require_any_role("red_lead", "blue_lead", "viewer")),
@@ -56,7 +61,9 @@ def generate_coverage_report(
@router.get("/executive-summary")
@limiter.limit("5/minute")
def generate_executive_report(
request: Request,
format: str = Query("pdf", pattern="^(pdf|docx|html)$"),
db: Session = Depends(get_db),
user: User = Depends(require_any_role("red_lead", "blue_lead", "viewer")),
@@ -73,7 +80,9 @@ def generate_executive_report(
@router.get("/quarterly-summary")
@limiter.limit("5/minute")
def generate_quarterly_report(
request: Request,
format: str = Query("pdf", pattern="^(pdf|docx|html)$"),
db: Session = Depends(get_db),
user: User = Depends(require_any_role("red_lead", "blue_lead", "viewer")),
@@ -90,7 +99,9 @@ def generate_quarterly_report(
@router.get("/technique/{technique_id}")
@limiter.limit("5/minute")
def generate_technique_report(
request: Request,
technique_id: UUID,
format: str = Query("pdf", pattern="^(pdf|docx|html)$"),
db: Session = Depends(get_db),

View File

@@ -7,7 +7,7 @@ scheduler health introspection.
import logging
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, Request
from sqlalchemy.orm import Session
from app.database import get_db
@@ -17,6 +17,7 @@ from app.services.mitre_sync_service import sync_mitre
from app.services.intel_service import scan_intel
from app.services.atomic_import_service import import_atomic_red_team
from app.jobs.mitre_sync_job import scheduler
from app.limiter import limiter
logger = logging.getLogger(__name__)
@@ -24,7 +25,9 @@ router = APIRouter(prefix="/system", tags=["system"])
@router.post("/sync-mitre")
@limiter.limit("2/hour")
def trigger_mitre_sync(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
@@ -63,7 +66,9 @@ def trigger_intel_scan(
@router.post("/import-atomic-tests")
@limiter.limit("2/hour")
def trigger_atomic_import(
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):

View File

@@ -21,11 +21,13 @@ GET /tests/{id}/timeline — audit-log history for this test
import uuid
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user, require_any_role
from app.dependencies.auth import get_current_user, require_any_role, require_role
from app.domain.enums import DataClassification
from app.limiter import limiter
from app.models.enums import TestState
from app.models.user import User
from app.schemas.test import (
@@ -37,6 +39,7 @@ from app.schemas.test import (
TestRedValidate,
TestBlueValidate,
TestRemediationUpdate,
TestClassificationUpdate,
)
from app.schemas.test_template import TestTemplateInstantiate
from app.domain.unit_of_work import UnitOfWork
@@ -112,7 +115,9 @@ def list_tests(
response_model=TestOut,
status_code=status.HTTP_201_CREATED,
)
@limiter.limit("30/minute")
def create_test(
request: Request,
payload: TestCreate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
@@ -152,7 +157,9 @@ def create_test(
response_model=TestOut,
status_code=status.HTTP_201_CREATED,
)
@limiter.limit("30/minute")
def create_test_from_template(
request: Request,
payload: TestTemplateInstantiate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
@@ -241,6 +248,36 @@ def update_test(
return test
# ---------------------------------------------------------------------------
# PATCH /tests/{id}/classification — admin data classification
# ---------------------------------------------------------------------------
@router.patch("/{test_id}/classification", response_model=TestOut)
def update_test_classification(
test_id: uuid.UUID,
payload: TestClassificationUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Update the data classification label for a test (admin only)."""
with UnitOfWork(db) as uow:
test = crud_get_test_or_raise(db, test_id)
test.data_classification = payload.data_classification.value
db.flush()
log_action(
db,
user_id=current_user.id,
action="update_test_classification",
entity_type="test",
entity_id=test.id,
details={"data_classification": payload.data_classification.value},
)
uow.commit()
db.refresh(test)
return test
# ---------------------------------------------------------------------------
# PATCH /tests/{id}/red — Red Team update (draft, red_executing)
# ---------------------------------------------------------------------------