feat(rt-import): require base64 evidence images per technique
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled

Each technique in the RT import JSON now requires at least one evidence
image (PNG/JPG/GIF/WebP/BMP, max 10 MB decoded) embedded as base64.

Backend:
- RTEvidenceEntry model: filename, data (base64), caption (optional)
- RTTechniqueEntry.evidence is now required
- Pre-validation raises 422 if any technique is missing evidence
- After test creation, images are decoded and stored in MinIO as
  Evidence records (team=red) linked to the test

Frontend:
- RTEvidenceEntry type added to api/tests.ts
- parseJson() validates evidence presence and structure per technique
- Preview table shows base64 thumbnails (up to 3 + overflow count)
- Format reference updated: evidence fields moved to Required section
- Import result shows total evidence images attached

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
kitos
2026-06-05 12:57:22 +02:00
parent 20cdb70f57
commit 14a56a6057
3 changed files with 157 additions and 3 deletions

View File

@@ -19,6 +19,9 @@ POST /tests/{id}/reopen — rejected → draft
GET /tests/{id}/timeline — audit-log history for this test
"""
import base64
import hashlib
import os
import uuid
from datetime import datetime
from typing import Any, Optional
@@ -31,7 +34,9 @@ from app.database import get_db
from app.dependencies.auth import get_current_user, require_any_role, require_role
from app.domain.enums import DataClassification
from app.limiter import limiter
from app.models.enums import TestState, TestResult
from app.models.enums import TestState, TestResult, TeamSide
from app.models.evidence import Evidence
from app.storage import upload_file
from app.models.technique import Technique
from app.models.test import Test
from app.models.user import User
@@ -844,12 +849,23 @@ def request_discussion(
# ---------------------------------------------------------------------------
_ALLOWED_IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp"}
_MAX_EVIDENCE_BYTES = 10 * 1024 * 1024 # 10 MB decoded per image
class RTEvidenceEntry(BaseModel):
filename: str # e.g. "screenshot_edr.png"
data: str # base64-encoded image content
caption: Optional[str] = None # optional description shown as evidence notes
class RTTechniqueEntry(BaseModel):
mitre_id: str
result: str # "detected" | "not_detected" | "partially_detected"
attack_success: bool = True
platform: Optional[str] = None
notes: Optional[str] = None
evidence: list[RTEvidenceEntry] # REQUIRED — at least one image per technique
class RTImportPayload(BaseModel):
@@ -872,6 +888,17 @@ def import_rt(
the normal Red/Blue workflow) and immediately recalculates coverage metrics.
Requires ``red_lead`` or ``admin`` role.
"""
# Pre-validate: every technique must include at least one evidence image
for entry in payload.techniques:
if not entry.evidence:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
f"Technique {entry.mitre_id} is missing evidence. "
"At least one screenshot or image is required per technique."
),
)
# Execution date from payload or now
exec_date_str = payload.date or datetime.utcnow().date().isoformat()
@@ -942,12 +969,46 @@ def import_rt(
db.add(test)
db.flush()
# ── Store evidence images ──────────────────────────────
evidence_count = 0
for ev in entry.evidence:
safe_name = os.path.basename(ev.filename) or "evidence.png"
ext = os.path.splitext(safe_name)[1].lower()
if ext not in _ALLOWED_IMAGE_EXTS:
# Skip non-image files silently (log warning)
continue
try:
img_bytes = base64.b64decode(ev.data)
except Exception:
continue # malformed base64 — skip
if len(img_bytes) > _MAX_EVIDENCE_BYTES:
continue # over size limit — skip
sha256 = hashlib.sha256(img_bytes).hexdigest()
key = f"{test.id}/{uuid.uuid4()}_{safe_name}"
try:
upload_file(img_bytes, key)
except Exception:
continue # storage error — skip but don't abort
evidence_obj = Evidence(
test_id=test.id,
file_name=safe_name,
file_path=key,
sha256_hash=sha256,
uploaded_by=current_user.id,
uploaded_at=datetime.utcnow(),
team=TeamSide.red,
notes=ev.caption,
)
db.add(evidence_obj)
evidence_count += 1
affected_technique_ids.add(technique.id)
created.append({
"mitre_id": entry.mitre_id,
"test_name": test_name,
"result": entry.result,
"attack_success": entry.attack_success,
"evidence_attached": evidence_count,
})
log_action(

View File

@@ -284,12 +284,19 @@ export interface TempoSyncResult {
// ── RT Import ──────────────────────────────────────────────────────
export interface RTEvidenceEntry {
filename: string; // e.g. "screenshot_edr.png"
data: string; // base64-encoded image (PNG / JPG / GIF / WebP / BMP)
caption?: string; // optional description
}
export interface RTTechniqueEntry {
mitre_id: string;
result: "detected" | "not_detected" | "partially_detected";
attack_success: boolean;
platform?: string;
notes?: string;
evidence: RTEvidenceEntry[]; // required — at least 1 image
}
export interface RTImportPayload {
@@ -303,7 +310,7 @@ export interface RTImportPayload {
export interface RTImportResult {
created: number;
skipped: number;
items: { mitre_id: string; test_name: string; result: string; attack_success: boolean }[];
items: { mitre_id: string; test_name: string; result: string; attack_success: boolean; evidence_attached: number }[];
warnings: { mitre_id: string; reason: string }[];
engagement: string;
}

View File

@@ -11,10 +11,15 @@ import {
ChevronDown,
ChevronUp,
} from "lucide-react";
import { importRT, type RTImportPayload, type RTTechniqueEntry } from "../api/tests";
import { importRT, type RTImportPayload, type RTTechniqueEntry, type RTEvidenceEntry } from "../api/tests";
/* ── Template JSON ─────────────────────────────────────────────────── */
// Tiny 1×1 pixel red PNG — used only as placeholder in the downloadable template.
// Operators must replace this with real base64-encoded screenshots.
const PLACEHOLDER_B64 =
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAIAAACQd1PeAAAADklEQVQI12P4z8BQDwADhQGAWjR9awAAAABJRU5ErkJggg==";
const TEMPLATE_JSON: RTImportPayload = {
name: "Red Team Q1 2024",
date: new Date().toISOString().slice(0, 10),
@@ -27,6 +32,13 @@ const TEMPLATE_JSON: RTImportPayload = {
attack_success: true,
platform: "windows",
notes: "PowerShell execution caught by Defender for Endpoint within 2 min",
evidence: [
{
filename: "edr_alert_powershell.png",
data: PLACEHOLDER_B64,
caption: "EDR alert showing PowerShell execution blocked",
},
],
},
{
mitre_id: "T1078",
@@ -34,6 +46,18 @@ const TEMPLATE_JSON: RTImportPayload = {
attack_success: true,
platform: "windows",
notes: "Credential reuse via stolen credentials — undetected for 48h",
evidence: [
{
filename: "logon_event_4624.png",
data: PLACEHOLDER_B64,
caption: "Windows event log showing lateral movement logon",
},
{
filename: "timeline_activity.png",
data: PLACEHOLDER_B64,
caption: "48h activity timeline without any alert",
},
],
},
{
mitre_id: "T1486",
@@ -41,6 +65,13 @@ const TEMPLATE_JSON: RTImportPayload = {
attack_success: false,
platform: "windows",
notes: "Ransomware blocked by AppLocker before execution",
evidence: [
{
filename: "applocker_block.png",
data: PLACEHOLDER_B64,
caption: "AppLocker event log blocking ransomware binary",
},
],
},
{
mitre_id: "T1190",
@@ -48,6 +79,13 @@ const TEMPLATE_JSON: RTImportPayload = {
attack_success: true,
platform: "linux",
notes: "Exploit worked but only a partial alert fired — no full incident created",
evidence: [
{
filename: "partial_alert.png",
data: PLACEHOLDER_B64,
caption: "SIEM alert showing partial detection — no incident created",
},
],
},
],
};
@@ -79,6 +117,18 @@ function parseJson(raw: string): { payload: RTImportPayload | null; error: strin
if (!["detected", "not_detected", "partially_detected"].includes(t.result)) {
return { payload: null, error: `Invalid result '${t.result}' for ${t.mitre_id}. Must be: detected | not_detected | partially_detected` };
}
// Evidence is mandatory
if (!Array.isArray(t.evidence) || t.evidence.length === 0) {
return { payload: null, error: `Technique ${t.mitre_id} is missing evidence. At least one image is required per technique (evidence[].filename + evidence[].data in base64).` };
}
for (const ev of t.evidence as RTEvidenceEntry[]) {
if (!ev.filename || typeof ev.filename !== "string") {
return { payload: null, error: `Evidence in ${t.mitre_id} is missing filename.` };
}
if (!ev.data || typeof ev.data !== "string") {
return { payload: null, error: `Evidence '${ev.filename}' in ${t.mitre_id} is missing base64 data.` };
}
}
}
return { payload: parsed as RTImportPayload, error: null };
} catch (e) {
@@ -177,6 +227,9 @@ export default function ImportRTPage() {
["techniques[].mitre_id", "string", "e.g. T1059.001"],
["techniques[].result", "enum", "detected | not_detected | partially_detected"],
["techniques[].attack_success", "boolean", "Was the attack successful?"],
["techniques[].evidence", "array", "Min 1 image required per technique"],
["techniques[].evidence[].filename", "string", "e.g. screenshot_edr.png"],
["techniques[].evidence[].data", "string", "Base64-encoded image (PNG/JPG/GIF/WebP/BMP, max 10 MB)"],
].map(([field, type, desc]) => (
<tr key={field}>
<td className="py-1.5 pr-3 font-mono text-cyan-400">{field}</td>
@@ -198,6 +251,7 @@ export default function ImportRTPage() {
["operator", "string", "Team or company"],
["techniques[].platform", "string", "windows | linux | macos"],
["techniques[].notes", "string", "What happened, how it was used"],
["techniques[].evidence[].caption", "string", "Description of what the image shows"],
].map(([field, type, desc]) => (
<tr key={field}>
<td className="py-1.5 pr-3 font-mono text-gray-400">{field}</td>
@@ -289,6 +343,7 @@ export default function ImportRTPage() {
<th className="px-4 py-2.5 text-left">Attack</th>
<th className="px-4 py-2.5 text-left">Platform</th>
<th className="px-4 py-2.5 text-left">Notes</th>
<th className="px-4 py-2.5 text-left">Evidence</th>
</tr>
</thead>
<tbody className="divide-y divide-gray-800/50">
@@ -308,6 +363,34 @@ export default function ImportRTPage() {
</td>
<td className="px-4 py-2.5 text-gray-400 capitalize">{t.platform ?? "—"}</td>
<td className="px-4 py-2.5 text-gray-500 max-w-xs truncate">{t.notes ?? "—"}</td>
<td className="px-4 py-2.5">
{t.evidence && t.evidence.length > 0 ? (
<div className="flex items-center gap-1.5">
<div className="flex gap-1">
{t.evidence.slice(0, 3).map((ev: RTEvidenceEntry, ei: number) => {
const ext = ev.filename.split(".").pop()?.toLowerCase() ?? "png";
const mime = ext === "jpg" || ext === "jpeg" ? "image/jpeg" : `image/${ext}`;
return (
<img
key={ei}
src={`data:${mime};base64,${ev.data}`}
title={ev.caption ?? ev.filename}
className="h-7 w-7 rounded object-cover border border-gray-700"
/>
);
})}
{t.evidence.length > 3 && (
<span className="flex h-7 w-7 items-center justify-center rounded border border-gray-700 bg-gray-800 text-xs text-gray-400">
+{t.evidence.length - 3}
</span>
)}
</div>
<span className="text-xs text-gray-500">{t.evidence.length} img</span>
</div>
) : (
<span className="text-xs text-red-400"> Missing</span>
)}
</td>
</tr>
))}
</tbody>
@@ -328,6 +411,9 @@ export default function ImportRTPage() {
<p className="text-xs text-gray-400">
Engagement: <span className="text-white">{result.engagement}</span>
{" · "}Created: <span className="text-green-400">{result.created}</span>
{" · "}Evidence: <span className="text-cyan-400">
{result.items.reduce((sum, it) => sum + (it.evidence_attached ?? 0), 0)} images attached
</span>
{result.skipped > 0 && <>{" · "}<span className="text-yellow-400">{result.skipped} skipped</span></>}
</p>