feat(rt-import): import Red Team engagement results as validated tests
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled

Backend — POST /tests/import-rt (red_lead + admin):
  Accepts engagement JSON with name/date/description/operator and
  a list of techniques each with mitre_id, result, attack_success,
  platform, notes. Creates one Test per technique directly in
  'validated' state (red + blue validation = approved) bypassing
  the normal workflow. Recalculates technique.status_global for
  all affected techniques. Returns created/skipped summary.

Frontend — /tests/import-rt (new dedicated page):
  - Format reference panel (collapsible) with field descriptions
  - Download template JSON button (generates a filled example)
  - Paste JSON textarea + file upload (.json)
  - Live validation + preview table showing what will be imported
  - Import button with spinner
  - Success / warning / error result display
  Accessible to admin and red_lead only.
  Added to sidebar under Tests > Import RT Results.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
kitos
2026-05-29 16:15:35 +02:00
parent b39a4fec14
commit 2f1ef7545d
5 changed files with 542 additions and 2 deletions

View File

@@ -20,16 +20,20 @@ GET /tests/{id}/timeline — audit-log history for this test
"""
import uuid
from typing import Optional
from datetime import datetime
from typing import Any, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user, require_any_role, require_role
from app.domain.enums import DataClassification
from app.limiter import limiter
from app.models.enums import TestState
from app.models.enums import TestState, TestResult
from app.models.technique import Technique
from app.models.test import Test
from app.models.user import User
from app.schemas.test import (
TestCreate,
@@ -739,3 +743,140 @@ def sync_tempo(
})
return {"results": results}
# ---------------------------------------------------------------------------
# POST /tests/import-rt — bulk import from a real Red Team engagement
# ---------------------------------------------------------------------------
class RTTechniqueEntry(BaseModel):
mitre_id: str
result: str # "detected" | "not_detected" | "partially_detected"
attack_success: bool = True
platform: Optional[str] = None
notes: Optional[str] = None
class RTImportPayload(BaseModel):
name: str # engagement name, e.g. "Red Team Q1 2024"
date: Optional[str] = None # ISO date string
description: Optional[str] = None
operator: Optional[str] = None # team / company that ran the RT
techniques: list[RTTechniqueEntry]
@router.post("/import-rt", status_code=status.HTTP_201_CREATED)
def import_rt(
payload: RTImportPayload,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead")),
):
"""Import results from a real Red Team engagement.
Creates one Test record per technique in ``validated`` state (bypassing
the normal Red/Blue workflow) and immediately recalculates coverage metrics.
Requires ``red_lead`` or ``admin`` role.
"""
# Execution date from payload or now
exec_date_str = payload.date or datetime.utcnow().date().isoformat()
# Result string → TestResult enum
_result_map = {
"detected": TestResult.detected,
"not_detected": TestResult.not_detected,
"partially_detected": TestResult.partially_detected,
}
created: list[dict[str, Any]] = []
skipped: list[dict[str, str]] = []
affected_technique_ids: set = set()
with UnitOfWork(db) as uow:
for entry in payload.techniques:
# Find technique
technique = (
db.query(Technique)
.filter(Technique.mitre_id == entry.mitre_id.upper())
.first()
)
if technique is None:
skipped.append({"mitre_id": entry.mitre_id, "reason": "Technique not found"})
continue
detection_result = _result_map.get(entry.result)
if detection_result is None:
skipped.append({"mitre_id": entry.mitre_id, "reason": f"Unknown result value '{entry.result}'"})
continue
test_name = f"[RT] {payload.name}{technique.name}"
# Build red_summary from notes + engagement metadata
parts = []
if payload.operator:
parts.append(f"Operator: {payload.operator}")
parts.append(f"Engagement date: {exec_date_str}")
if entry.notes:
parts.append(f"\n{entry.notes}")
red_summary_text = "\n".join(parts)
# Create Test directly in validated state
test = Test(
technique_id=technique.id,
name=test_name,
description=payload.description,
platform=entry.platform,
procedure_text=entry.notes,
created_by=current_user.id,
state=TestState.validated,
# Red team fields
attack_success=entry.attack_success,
red_summary=red_summary_text,
red_validation_status="approved",
red_validated_by=current_user.id,
red_validated_at=datetime.utcnow(),
# Blue team fields
detection_result=detection_result,
blue_validation_status="approved",
blue_validated_by=current_user.id,
blue_validated_at=datetime.utcnow(),
# Timing
execution_date=exec_date_str,
created_at=datetime.utcnow(),
)
db.add(test)
db.flush()
affected_technique_ids.add(technique.id)
created.append({
"mitre_id": entry.mitre_id,
"test_name": test_name,
"result": entry.result,
"attack_success": entry.attack_success,
})
log_action(
db,
user_id=current_user.id,
action="rt_import_test",
entity_type="test",
entity_id=test.id,
details={"engagement": payload.name, "mitre_id": entry.mitre_id},
)
# Recalculate coverage for all affected techniques
for tech_id in affected_technique_ids:
tech = db.query(Technique).filter(Technique.id == tech_id).first()
if tech:
recalculate_technique_status(db, tech)
uow.commit()
return {
"created": len(created),
"skipped": len(skipped),
"items": created,
"warnings": skipped,
"engagement": payload.name,
}