Files
Aegis/backend/app/routers/tests.py
kitos 1b513b050e
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
fix: 4 improvements — campaign test deletion, review queue triggers, technique link, Jira read-only
1. Campaign test deletion: removing a test from a campaign now also
   deletes the underlying Test record and recalculates technique status.

2. Review Queue triggers: review_required=True is now also set when
   - Sigma/Elastic detection rules are imported for a technique
   - A test is validated (coverage status changes)

3. Test detail — Technique link: 'Technique' entry added at the top of
   the Details sidebar showing MITRE ID + name as a clickable link to
   /techniques/{mitre_id}.

4. Jira panel — read-only on test page: added readOnly + label props to
   JiraLinkPanel. TestDetailPage now passes readOnly=true and the test
   name as label, hiding Link Issue / Sync / Unlink controls (automatic
   Jira creation only — no manual management).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 11:18:55 +02:00

742 lines
26 KiB
Python

"""CRUD router for security Tests — v2 with Red/Blue workflow.
Endpoints
---------
GET /tests — list with filters (state, technique_id)
POST /tests — create (red_tech, admin)
POST /tests/from-template — create from TestTemplate (red_tech, admin)
GET /tests/{id} — detail with split red/blue evidences
PATCH /tests/{id} — general update (draft/rejected only)
PATCH /tests/{id}/red — Red Team updates (draft, red_executing)
PATCH /tests/{id}/blue — Blue Team updates (blue_evaluating)
POST /tests/{id}/start-execution — draft → red_executing
POST /tests/{id}/submit-red — red_executing → blue_evaluating
POST /tests/{id}/start-blue-work — blue tech picks up (sets Tempo timer)
POST /tests/{id}/submit-blue — blue_evaluating → in_review
POST /tests/{id}/validate-red — Red Lead validates
POST /tests/{id}/validate-blue — Blue Lead validates
POST /tests/{id}/reopen — rejected → draft
GET /tests/{id}/timeline — audit-log history for this test
"""
import uuid
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user, require_any_role, require_role
from app.domain.enums import DataClassification
from app.limiter import limiter
from app.models.enums import TestState
from app.models.user import User
from app.schemas.test import (
TestCreate,
TestOut,
TestUpdate,
TestRedUpdate,
TestBlueUpdate,
TestRedValidate,
TestBlueValidate,
TestRemediationUpdate,
TestClassificationUpdate,
)
from app.schemas.test_template import TestTemplateInstantiate
from app.domain.unit_of_work import UnitOfWork
from app.services.audit_service import log_action
from app.services.status_service import recalculate_technique_status
from app.services.webhook_service import dispatch_webhook
from app.services.test_crud_service import (
create_test as crud_create_test,
create_test_from_template as crud_create_from_template,
get_test_detail as crud_get_test_detail,
get_test_or_raise as crud_get_test_or_raise,
get_test_timeline as crud_get_test_timeline,
get_test_with_technique as crud_get_test_with_technique,
list_tests as crud_list_tests,
update_test as crud_update_test,
update_test_blue as crud_update_test_blue,
update_test_red as crud_update_test_red,
)
from app.services.test_workflow_service import (
start_execution as wf_start_execution,
submit_red_evidence as wf_submit_red,
submit_blue_evidence as wf_submit_blue,
start_blue_work as wf_start_blue_work,
validate_as_red_lead as wf_validate_red,
validate_as_blue_lead as wf_validate_blue,
reopen_test as wf_reopen,
handle_remediation_completed as wf_handle_remediation,
get_retest_chain as wf_get_retest_chain,
pause_timer as wf_pause_timer,
resume_timer as wf_resume_timer,
)
router = APIRouter(prefix="/tests", tags=["tests"])
# ---------------------------------------------------------------------------
# GET /tests — list with filters
# ---------------------------------------------------------------------------
@router.get("", response_model=list[TestOut])
def list_tests(
state: Optional[str] = Query(None, description="Filter by test state"),
technique_id: Optional[uuid.UUID] = Query(None, description="Filter by technique"),
platform: Optional[str] = Query(None, description="Filter by platform"),
created_by: Optional[uuid.UUID] = Query(None, description="Filter by creator"),
pending_validation_side: Optional[str] = Query(
None, description="Filter in_review tests pending validation on 'red' or 'blue' side"
),
not_in_any_campaign: bool = Query(
False, description="Only return tests not linked to any campaign"
),
offset: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return a paginated list of tests, optionally filtered by state, technique, platform or creator."""
return crud_list_tests(
db,
state=state,
technique_id=technique_id,
platform=platform,
created_by=created_by,
pending_validation_side=pending_validation_side,
not_in_any_campaign=not_in_any_campaign,
offset=offset,
limit=limit,
)
# ---------------------------------------------------------------------------
# POST /tests — create (red_tech or admin)
# ---------------------------------------------------------------------------
@router.post(
"",
response_model=TestOut,
status_code=status.HTTP_201_CREATED,
)
@limiter.limit("30/minute")
def create_test(
request: Request,
payload: TestCreate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Create a new test linked to an existing technique.
``created_by`` is set automatically and ``state`` defaults to *draft*.
"""
with UnitOfWork(db) as uow:
test = crud_create_test(
db,
technique_id=payload.technique_id,
creator_id=current_user.id,
**payload.model_dump(exclude={"technique_id"}),
)
log_action(
db,
user_id=current_user.id,
action="create_test",
entity_type="test",
entity_id=test.id,
details={"name": test.name, "technique_id": str(test.technique_id)},
)
uow.commit()
db.refresh(test)
# Auto-create Jira ticket (non-fatal — any failure is logged, not raised)
try:
from app.services.jira_service import auto_create_test_issue
auto_create_test_issue(db, test, current_user)
db.commit()
except Exception:
pass # jira_service already logs warnings internally
return test
# ---------------------------------------------------------------------------
# POST /tests/from-template — create from TestTemplate
# ---------------------------------------------------------------------------
@router.post(
"/from-template",
response_model=TestOut,
status_code=status.HTTP_201_CREATED,
)
@limiter.limit("30/minute")
def create_test_from_template(
request: Request,
payload: TestTemplateInstantiate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Instantiate a real Test from an existing TestTemplate.
The template's fields are copied into the new test as starting data.
"""
with UnitOfWork(db) as uow:
test = crud_create_from_template(
db,
template_id=payload.template_id,
technique_id_or_mitre=payload.technique_id,
creator_id=current_user.id,
name_override=payload.name,
description_override=payload.description,
platform_override=payload.platform,
procedure_text_override=payload.procedure_text,
tool_used_override=payload.tool_used,
)
log_action(
db,
user_id=current_user.id,
action="create_test_from_template",
entity_type="test",
entity_id=test.id,
details={
"name": test.name,
"template_id": str(payload.template_id),
"technique_id": str(test.technique_id),
},
)
uow.commit()
db.refresh(test)
# Auto-create Jira ticket (non-fatal)
try:
from app.services.jira_service import auto_create_test_issue
auto_create_test_issue(db, test, current_user)
db.commit()
except Exception:
pass
return test
# ---------------------------------------------------------------------------
# GET /tests/{id} — detail with evidences split by team
# ---------------------------------------------------------------------------
@router.get("/{test_id}", response_model=TestOut)
def get_test(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return full details for a single test, including its evidences."""
return crud_get_test_detail(db, test_id)
# ---------------------------------------------------------------------------
# PATCH /tests/{id} — general update (draft / rejected)
# ---------------------------------------------------------------------------
@router.patch("/{test_id}", response_model=TestOut)
def update_test(
test_id: uuid.UUID,
payload: TestUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Update one or more fields of an existing test.
Only leads or admins can update general test fields.
The test must be in ``draft`` or ``rejected`` state.
"""
update_data = payload.model_dump(exclude_unset=True)
with UnitOfWork(db) as uow:
test = crud_update_test(
db,
test_id,
updater_id=current_user.id,
updater_role=current_user.role,
**update_data,
)
log_action(
db,
user_id=current_user.id,
action="update_test",
entity_type="test",
entity_id=test.id,
details={"updated_fields": list(update_data.keys())},
)
uow.commit()
db.refresh(test)
return test
# ---------------------------------------------------------------------------
# PATCH /tests/{id}/classification — admin data classification
# ---------------------------------------------------------------------------
@router.patch("/{test_id}/classification", response_model=TestOut)
def update_test_classification(
test_id: uuid.UUID,
payload: TestClassificationUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Update the data classification label for a test (admin only)."""
with UnitOfWork(db) as uow:
test = crud_get_test_or_raise(db, test_id)
test.data_classification = payload.data_classification.value
db.flush()
log_action(
db,
user_id=current_user.id,
action="update_test_classification",
entity_type="test",
entity_id=test.id,
details={"data_classification": payload.data_classification.value},
)
uow.commit()
db.refresh(test)
return test
# ---------------------------------------------------------------------------
# PATCH /tests/{id}/red — Red Team update (draft, red_executing)
# ---------------------------------------------------------------------------
@router.patch("/{test_id}/red", response_model=TestOut)
def update_test_red(
test_id: uuid.UUID,
payload: TestRedUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_tech", "red_lead")),
):
"""Red Team updates their fields (allowed in ``draft`` and ``red_executing``)."""
update_data = payload.model_dump(exclude_unset=True)
with UnitOfWork(db) as uow:
test = crud_update_test_red(db, test_id, **update_data)
log_action(
db,
user_id=current_user.id,
action="update_test_red",
entity_type="test",
entity_id=test.id,
details={"updated_fields": list(update_data.keys())},
)
uow.commit()
db.refresh(test)
return test
# ---------------------------------------------------------------------------
# PATCH /tests/{id}/blue — Blue Team update (blue_evaluating only)
# ---------------------------------------------------------------------------
@router.patch("/{test_id}/blue", response_model=TestOut)
def update_test_blue(
test_id: uuid.UUID,
payload: TestBlueUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("blue_tech", "blue_lead")),
):
"""Blue Team updates their fields (allowed only in ``blue_evaluating``)."""
update_data = payload.model_dump(exclude_unset=True)
with UnitOfWork(db) as uow:
test = crud_update_test_blue(db, test_id, **update_data)
log_action(
db,
user_id=current_user.id,
action="update_test_blue",
entity_type="test",
entity_id=test.id,
details={"updated_fields": list(update_data.keys())},
)
uow.commit()
db.refresh(test)
return test
# ---------------------------------------------------------------------------
# POST /tests/{id}/start-execution — draft → red_executing
# ---------------------------------------------------------------------------
@router.post("/{test_id}/start-execution", response_model=TestOut)
def start_execution(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_tech", "red_lead")),
):
"""Move a test from ``draft`` to ``red_executing``."""
test = crud_get_test_or_raise(db, test_id)
with UnitOfWork(db) as uow:
test = wf_start_execution(db, test, current_user)
uow.commit()
db.refresh(test)
return test
# ---------------------------------------------------------------------------
# POST /tests/{id}/submit-red — red_executing → blue_evaluating
# ---------------------------------------------------------------------------
@router.post("/{test_id}/submit-red", response_model=TestOut)
def submit_red(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_tech", "red_lead")),
):
"""Red Team finalises — move from ``red_executing`` to ``blue_evaluating``."""
test = crud_get_test_or_raise(db, test_id)
with UnitOfWork(db) as uow:
test = wf_submit_red(db, test, current_user)
uow.commit()
db.refresh(test)
return test
# ---------------------------------------------------------------------------
# POST /tests/{id}/submit-blue — blue_evaluating → in_review
# ---------------------------------------------------------------------------
@router.post("/{test_id}/submit-blue", response_model=TestOut)
def submit_blue(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("blue_tech", "blue_lead")),
):
"""Blue Team finalises — move from ``blue_evaluating`` to ``in_review``."""
test = crud_get_test_or_raise(db, test_id)
with UnitOfWork(db) as uow:
test = wf_submit_blue(db, test, current_user)
uow.commit()
db.refresh(test)
return test
# ---------------------------------------------------------------------------
# POST /tests/{id}/start-blue-work — blue tech picks up test for evaluation
# ---------------------------------------------------------------------------
@router.post("/{test_id}/start-blue-work", response_model=TestOut)
def start_blue_work(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("blue_tech", "blue_lead")),
):
"""Blue tech picks up the test to start evaluating. Sets the Tempo timer start."""
test = crud_get_test_or_raise(db, test_id)
with UnitOfWork(db) as uow:
test = wf_start_blue_work(db, test, current_user)
uow.commit()
db.refresh(test)
return test
# ---------------------------------------------------------------------------
# POST /tests/{id}/pause-timer — pause the active phase timer
# ---------------------------------------------------------------------------
@router.post("/{test_id}/pause-timer", response_model=TestOut)
def pause_timer(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_tech", "blue_tech", "red_lead", "blue_lead")),
):
"""Pause the running timer for the current phase (red_executing or blue_evaluating)."""
test = crud_get_test_or_raise(db, test_id)
with UnitOfWork(db) as uow:
test = wf_pause_timer(db, test, current_user)
uow.commit()
db.refresh(test)
return test
# ---------------------------------------------------------------------------
# POST /tests/{id}/resume-timer — resume a paused phase timer
# ---------------------------------------------------------------------------
@router.post("/{test_id}/resume-timer", response_model=TestOut)
def resume_timer(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_tech", "blue_tech", "red_lead", "blue_lead")),
):
"""Resume the paused timer for the current phase."""
test = crud_get_test_or_raise(db, test_id)
with UnitOfWork(db) as uow:
test = wf_resume_timer(db, test, current_user)
uow.commit()
db.refresh(test)
return test
# ---------------------------------------------------------------------------
# POST /tests/{id}/validate-red — Red Lead validates
# ---------------------------------------------------------------------------
@router.post("/{test_id}/validate-red", response_model=TestOut)
def validate_red(
test_id: uuid.UUID,
payload: TestRedValidate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead")),
):
"""Red Lead approves or rejects the red side of a test."""
test = crud_get_test_with_technique(db, test_id)
with UnitOfWork(db) as uow:
test = wf_validate_red(
db, test, current_user,
validation_status=payload.red_validation_status,
notes=payload.red_validation_notes,
)
if test.state in (TestState.validated, TestState.rejected):
recalculate_technique_status(db, test.technique)
# Flag technique for review — coverage changed
if test.technique:
test.technique.review_required = True
uow.commit()
db.refresh(test)
if test.state == TestState.validated:
dispatch_webhook("test.validated", {"test_id": str(test.id), "technique_id": str(test.technique_id), "result": test.result.value if test.result else None})
elif test.state == TestState.rejected:
dispatch_webhook("test.rejected", {"test_id": str(test.id), "technique_id": str(test.technique_id)})
return test
# ---------------------------------------------------------------------------
# POST /tests/{id}/validate-blue — Blue Lead validates
# ---------------------------------------------------------------------------
@router.post("/{test_id}/validate-blue", response_model=TestOut)
def validate_blue(
test_id: uuid.UUID,
payload: TestBlueValidate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("blue_lead")),
):
"""Blue Lead approves or rejects the blue side of a test."""
test = crud_get_test_with_technique(db, test_id)
with UnitOfWork(db) as uow:
test = wf_validate_blue(
db, test, current_user,
validation_status=payload.blue_validation_status,
notes=payload.blue_validation_notes,
)
if test.state in (TestState.validated, TestState.rejected):
recalculate_technique_status(db, test.technique)
# Flag technique for review — coverage changed
if test.technique:
test.technique.review_required = True
uow.commit()
db.refresh(test)
if test.state == TestState.validated:
dispatch_webhook("test.validated", {"test_id": str(test.id), "technique_id": str(test.technique_id), "result": test.result.value if test.result else None})
elif test.state == TestState.rejected:
dispatch_webhook("test.rejected", {"test_id": str(test.id), "technique_id": str(test.technique_id)})
return test
# ---------------------------------------------------------------------------
# POST /tests/{id}/reopen — rejected → draft
# ---------------------------------------------------------------------------
@router.post("/{test_id}/reopen", response_model=TestOut)
def reopen(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Reopen a rejected test, moving it back to ``draft``."""
test = crud_get_test_or_raise(db, test_id)
with UnitOfWork(db) as uow:
test = wf_reopen(db, test, current_user)
uow.commit()
db.refresh(test)
return test
# ---------------------------------------------------------------------------
# PATCH /tests/{id}/remediation — update remediation fields
# ---------------------------------------------------------------------------
@router.patch("/{test_id}/remediation", response_model=TestOut)
def update_remediation(
test_id: uuid.UUID,
payload: TestRemediationUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Update remediation fields on a test.
When ``remediation_status`` transitions to ``'completed'``, an automatic
re-test is created (subject to ``MAX_RETEST_COUNT``).
"""
test = crud_get_test_or_raise(db, test_id)
old_remediation_status = test.remediation_status
update_data = payload.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(test, field, value)
with UnitOfWork(db) as uow:
log_action(
db,
user_id=current_user.id,
action="update_remediation",
entity_type="test",
entity_id=test.id,
details={"updated_fields": list(update_data.keys())},
)
new_status = update_data.get("remediation_status")
if new_status == "completed" and old_remediation_status != "completed":
wf_handle_remediation(db, test, current_user)
uow.commit()
db.refresh(test)
return test
# ---------------------------------------------------------------------------
# GET /tests/{id}/timeline — audit history for this test
# ---------------------------------------------------------------------------
@router.get("/{test_id}/timeline")
def get_test_timeline(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return the chronological audit-log history for a test."""
return crud_get_test_timeline(db, test_id)
# ---------------------------------------------------------------------------
# GET /tests/{id}/retest-chain — full retest chain
# ---------------------------------------------------------------------------
@router.get("/{test_id}/retest-chain")
def get_retest_chain(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return the full chain of retests (original + all retests) for a test."""
chain = wf_get_retest_chain(db, test_id)
if not chain:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Test not found")
return [
{
"id": str(t.id),
"name": t.name,
"state": t.state.value if t.state else None,
"retest_of": str(t.retest_of) if t.retest_of else None,
"retest_count": t.retest_count,
"result": t.result.value if t.result else None,
"detection_result": t.detection_result.value if t.detection_result else None,
"remediation_status": t.remediation_status,
"created_at": t.created_at.isoformat() if t.created_at else None,
}
for t in chain
]
# ---------------------------------------------------------------------------
# POST /tests/{id}/sync-tempo — manual Tempo sync for red execution worklog
# ---------------------------------------------------------------------------
@router.post("/{test_id}/sync-tempo")
def sync_tempo(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Manually sync this test's red team execution worklog(s) to Tempo.
Useful when the automatic sync failed at phase completion (e.g. Tempo
was not yet configured). Only red_team_execution worklogs are eligible.
Already-synced worklogs are skipped. Returns a summary of what happened.
"""
from datetime import datetime as _dt
from app.models.worklog import Worklog
from app.services.tempo_service import auto_log_test_worklog
from app.services.test_crud_service import get_test_or_raise as _get
test = _get(db, test_id)
worklogs = (
db.query(Worklog)
.filter(
Worklog.entity_type == "test",
Worklog.entity_id == test_id,
Worklog.activity_type == "red_team_execution",
)
.all()
)
if not worklogs:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No red team execution worklog found for this test.",
)
results = []
for wl in worklogs:
if wl.tempo_synced:
results.append({"worklog_id": str(wl.id), "status": "already_synced"})
continue
try:
result = auto_log_test_worklog(
db=db,
test=test,
user=current_user,
activity_type=wl.activity_type,
duration_seconds=wl.duration_seconds,
)
if result and isinstance(result, dict):
wl.tempo_synced = _dt.utcnow()
wl.tempo_worklog_id = str(result.get("tempoWorklogId", ""))
db.commit()
results.append({"worklog_id": str(wl.id), "status": "synced"})
else:
results.append({
"worklog_id": str(wl.id),
"status": "skipped",
"detail": "Tempo not configured or conditions not met.",
})
except Exception as exc:
results.append({
"worklog_id": str(wl.id),
"status": "error",
"detail": str(exc),
})
return {"results": results}