fix(evaluations): fix duplicate substeps and improve eval test format by scenario grouping

This commit is contained in:
kitos
2026-06-08 13:20:42 +02:00
parent e2861a08bc
commit 0c9f3051b4
+111 -57
View File
@@ -272,7 +272,7 @@ def fetch_results_for_adversary(adversary_name: str) -> list[dict[str, Any]]:
substeps: list[dict[str, Any]] = [] substeps: list[dict[str, Any]] = []
scenarios = target.get("Detections_By_Step", {}) scenarios = target.get("Detections_By_Step", {})
for _scenario_name, scenario_data in scenarios.items(): for scenario_name, scenario_data in scenarios.items():
for step in scenario_data.get("Steps", []): for step in scenario_data.get("Steps", []):
step_num = step.get("Step_Num", "") step_num = step.get("Step_Num", "")
step_name = step.get("Step_Name", "") step_name = step.get("Step_Name", "")
@@ -331,6 +331,7 @@ def fetch_results_for_adversary(adversary_name: str) -> list[dict[str, Any]]:
"detection_type": best_type, "detection_type": best_type,
"note": best_note, "note": best_note,
# Enrichment fields from the API # Enrichment fields from the API
"scenario_name": scenario_name,
"step_num": step_num, "step_num": step_num,
"step_name": step_name, "step_name": step_name,
"step_description": step_description, "step_description": step_description,
@@ -346,17 +347,34 @@ def fetch_results_for_adversary(adversary_name: str) -> list[dict[str, Any]]:
def _aggregate_by_technique(substeps: list[dict]) -> dict[str, dict]: def _aggregate_by_technique(substeps: list[dict]) -> dict[str, dict]:
"""Aggregate substep results per technique. """Aggregate substep results per technique.
Keeps the best detection score and accumulates ALL occurrences so that - Deduplicates substeps by (substep_ref, criteria) — prevents duplicates
the importer can build a rich attack-path narrative in procedure_text. that arise when adversaries with multiple scenarios (e.g. Wizard Spider +
Sandworm) repeat the same substep across a "combined" replay scenario.
- Groups unique occurrences by scenario_name so the narrative can show
"Wizard Spider scenario" vs "Sandworm scenario" separately.
- Tracks best detection score across all unique substeps.
""" """
by_technique: dict[str, dict] = {} by_technique: dict[str, dict] = {}
for sub in substeps: for sub in substeps:
tid = sub["technique_id"] tid = sub["technique_id"]
if tid not in by_technique: if tid not in by_technique:
by_technique[tid] = {**sub, "occurrences": []} by_technique[tid] = {
**sub,
"occurrences": [], # flat list of unique occurrences
"_seen_keys": set(), # (substep_ref, criteria) dedup set
}
# Always record this occurrence for the narrative agg = by_technique[tid]
by_technique[tid]["occurrences"].append({
# Deduplication key: same substep_ref + same criteria text = duplicate
dedup_key = (sub["substep_ref"], sub["criteria"])
if dedup_key in agg["_seen_keys"]:
continue
agg["_seen_keys"].add(dedup_key)
agg["occurrences"].append({
"scenario_name": sub["scenario_name"],
"substep_ref": sub["substep_ref"], "substep_ref": sub["substep_ref"],
"step_num": sub["step_num"], "step_num": sub["step_num"],
"step_name": sub["step_name"], "step_name": sub["step_name"],
@@ -368,19 +386,37 @@ def _aggregate_by_technique(substeps: list[dict]) -> dict[str, dict]:
"note": sub["note"], "note": sub["note"],
}) })
# Promote to best detection if this substep scored higher # Promote best detection score
if sub["best_score"] > by_technique[tid]["best_score"]: if sub["best_score"] > agg["best_score"]:
by_technique[tid]["best_score"] = sub["best_score"] agg["best_score"] = sub["best_score"]
by_technique[tid]["detection_type"] = sub["detection_type"] agg["detection_type"] = sub["detection_type"]
by_technique[tid]["note"] = sub["note"] agg["note"] = sub["note"]
by_technique[tid]["tactic_id"] = sub["tactic_id"] agg["tactic_id"] = sub["tactic_id"]
by_technique[tid]["tactic_name"] = sub["tactic_name"] agg["tactic_name"] = sub["tactic_name"]
# Clean up internal dedup sets before returning
for agg in by_technique.values():
agg.pop("_seen_keys", None)
return by_technique return by_technique
def _group_occurrences_by_scenario(occurrences: list[dict]) -> dict[str, list[dict]]:
"""Group a technique's occurrences by scenario, preserving insertion order."""
grouped: dict[str, list[dict]] = {}
for occ in occurrences:
sc = occ.get("scenario_name", "Scenario_1")
grouped.setdefault(sc, []).append(occ)
return grouped
def _build_procedure_text(agg: dict, adversary_display: str, eval_round: int) -> str: def _build_procedure_text(agg: dict, adversary_display: str, eval_round: int) -> str:
"""Build a rich attack-path narrative for the Test.procedure_text field.""" """Build a rich attack-path narrative for the Test.procedure_text field.
Groups substeps by scenario so adversaries with multiple threat groups
(e.g. Wizard Spider + Sandworm with 3 scenarios) are clearly separated.
Includes Step.Description narrative for context.
"""
occurrences = agg.get("occurrences", []) occurrences = agg.get("occurrences", [])
if not occurrences: if not occurrences:
return ( return (
@@ -388,42 +424,56 @@ def _build_procedure_text(agg: dict, adversary_display: str, eval_round: int) ->
f"See evaluation report at https://evals.mitre.org for full details." f"See evaluation report at https://evals.mitre.org for full details."
) )
lines: list[str] = [] lines: list[str] = [f"ATT&CK Evaluation R{eval_round}{adversary_display}", ""]
lines.append(f"ATT&CK Evaluation R{eval_round}{adversary_display}\n")
# Include step description(s) — deduplicated, one per step grouped = _group_occurrences_by_scenario(occurrences)
seen_steps: set = set() scenario_count = len(grouped)
for occ in occurrences:
step_key = str(occ.get("step_num", "")) for sc_name, sc_occs in grouped.items():
# Scenario header — only shown when there are multiple scenarios
if scenario_count > 1:
idx = sc_name.replace("Scenario_", "Scenario ")
lines.append(f"=== {idx} ===")
# Within each scenario, group by step to emit description once per step
seen_step_descs: set = set()
for occ in sc_occs:
step_num = occ.get("step_num", "")
step_name = occ.get("step_name", "") step_name = occ.get("step_name", "")
step_desc = occ.get("step_description", "") step_desc = occ.get("step_description", "")
if step_key and step_key not in seen_steps and step_desc: # Use (step_num or step_name) as dedup key for descriptions
seen_steps.add(step_key) step_key = str(step_num) if step_num else step_name
truncated = step_desc[:500] + ("..." if len(step_desc) > 500 else "")
lines.append(f"Step {step_key}{step_name}:") if step_key and step_key not in seen_step_descs:
lines.append(truncated) seen_step_descs.add(step_key)
lines.append("") header = f"Step {step_num}{step_name}:" if step_num else f"{step_name}:"
lines.append(header)
if step_desc:
truncated = step_desc[:450] + ("" if len(step_desc) > 450 else "")
lines.append(truncated)
# List all attack criteria for this technique
lines.append("Attack steps observed:")
for occ in occurrences:
ref = occ.get("substep_ref", "") ref = occ.get("substep_ref", "")
criteria = occ.get("criteria", "") criteria = occ.get("criteria", "")
step_name = occ.get("step_name", "") det = occ.get("detection_type", "")
if criteria: if criteria:
prefix = f"[{ref}]" if ref else "" tag = f" [{ref}]" if ref else " "
lines.append(f" {prefix} {criteria}") det_tag = f" [{det}]" if det and det.lower() not in ("none", "") else ""
if step_name: lines.append(f"{tag}{det_tag} {criteria}")
lines.append(f" ↳ Step: {step_name}")
return "\n".join(lines) lines.append("")
return "\n".join(lines).rstrip()
def _build_description(agg: dict, adversary_display: str, eval_round: int) -> str: def _build_description(agg: dict, adversary_display: str, eval_round: int) -> str:
"""Build the full Test.description with detection details and attack path.""" """Build Test.description with source metadata, detection guidance and warning.
The 'criteria' field from the MITRE API describes what each substep does AND
what should be detected, so it doubles as blue-team detection guidance.
"""
occurrences = agg.get("occurrences", []) occurrences = agg.get("occurrences", [])
# Collect all unique data sources across every occurrence of this technique # Collect all unique data sources across every unique occurrence
all_data_sources: list[str] = sorted({ all_data_sources: list[str] = sorted({
src src
for occ in occurrences for occ in occurrences
@@ -442,23 +492,25 @@ def _build_description(agg: dict, adversary_display: str, eval_round: int) -> st
f"{ds}" for ds in all_data_sources f"{ds}" for ds in all_data_sources
) )
# Attack path / substep criteria section # Detection guidance — what criteria were observed (blue team can use these as IOCs)
path_lines: list[str] = [] det_lines: list[str] = []
for occ in occurrences: grouped = _group_occurrences_by_scenario(occurrences)
for sc_name, sc_occs in grouped.items():
scenario_label = f"[{sc_name}] " if len(grouped) > 1 else ""
for occ in sc_occs:
ref = occ.get("substep_ref", "") ref = occ.get("substep_ref", "")
criteria = occ.get("criteria", "")
step_name = occ.get("step_name", "") step_name = occ.get("step_name", "")
criteria = occ.get("criteria", "")
det_type = occ.get("detection_type", "") det_type = occ.get("detection_type", "")
if criteria: if criteria:
label = f"[{ref}]" if ref else "" label = f"[{ref}]" if ref else ""
step_label = f" ({step_name})" if step_name else "" step_label = f" ({step_name})" if step_name else ""
det_label = f"{det_type}" if det_type and det_type.lower() != "none" else "" det_label = f"{det_type}" if det_type and det_type.lower() not in ("none", "") else ""
path_lines.append(f" {label}{step_label}{det_label}:") det_lines.append(f" {scenario_label}{label}{step_label}{det_label}: {criteria}")
path_lines.append(f" {criteria}")
path_section = "" det_section = ""
if path_lines: if det_lines:
path_section = "\n\nAttack path — substep criteria:\n" + "\n".join(path_lines) det_section = "\n\nDetection criteria (what to look for):\n" + "\n".join(det_lines)
warning = ( warning = (
f"\n\n⚠️ IMPORTANT: These results reflect CrowdStrike Falcon performance in a " f"\n\n⚠️ IMPORTANT: These results reflect CrowdStrike Falcon performance in a "
@@ -467,11 +519,9 @@ def _build_description(agg: dict, adversary_display: str, eval_round: int) -> st
f"capability. Validate in your own environment before approving." f"capability. Validate in your own environment before approving."
) )
note_section = "" note_section = f"\n\nMITRE note: {agg['note']}" if agg.get("note") else ""
if agg.get("note"):
note_section = f"\n\nMITRE note: {agg['note']}"
return header + ds_section + path_section + warning + note_section return header + ds_section + det_section + warning + note_section
def _build_red_summary(agg: dict, adversary_display: str, eval_round: int) -> str: def _build_red_summary(agg: dict, adversary_display: str, eval_round: int) -> str:
@@ -483,21 +533,25 @@ def _build_red_summary(agg: dict, adversary_display: str, eval_round: int) -> st
f"Vendor: CrowdStrike Falcon", f"Vendor: CrowdStrike Falcon",
f"Best detection level: {agg['detection_type']}", f"Best detection level: {agg['detection_type']}",
f"Tactic: {agg['tactic_name']} ({agg['tactic_id']})", f"Tactic: {agg['tactic_name']} ({agg['tactic_id']})",
f"Unique substeps: {len(occurrences)}",
] ]
if occurrences: if occurrences:
lines.append("") lines.append("")
lines.append("Substeps:") grouped = _group_occurrences_by_scenario(occurrences)
for occ in occurrences: for sc_name, sc_occs in grouped.items():
if len(grouped) > 1:
lines.append(f"{sc_name}:")
for occ in sc_occs:
ref = occ.get("substep_ref", "") ref = occ.get("substep_ref", "")
criteria = occ.get("criteria", "") criteria = occ.get("criteria", "")
step_name = occ.get("step_name", "") step_name = occ.get("step_name", "")
det = occ.get("detection_type", "") det = occ.get("detection_type", "")
if criteria: if criteria:
tag = f"[{ref}]" if ref else "" tag = f" [{ref}]" if ref else " "
step_tag = f" {step_name}:" if step_name else "" step_tag = f" {step_name}" if step_name else ""
det_tag = f" [{det}]" if det and det.lower() != "none" else "" det_tag = f" [{det}]" if det and det.lower() not in ("none", "") else ""
lines.append(f" {tag}{step_tag}{det_tag} {criteria}") lines.append(f"{tag}{step_tag}{det_tag} {criteria}")
return "\n".join(lines) return "\n".join(lines)