fix(evaluations): fix duplicate substeps and improve eval test format by scenario grouping
Aegis CI / lint-and-test (push) Has been cancelled
Aegis CI / lint-and-test (push) Has been cancelled
This commit is contained in:
@@ -272,7 +272,7 @@ def fetch_results_for_adversary(adversary_name: str) -> list[dict[str, Any]]:
|
||||
substeps: list[dict[str, Any]] = []
|
||||
|
||||
scenarios = target.get("Detections_By_Step", {})
|
||||
for _scenario_name, scenario_data in scenarios.items():
|
||||
for scenario_name, scenario_data in scenarios.items():
|
||||
for step in scenario_data.get("Steps", []):
|
||||
step_num = step.get("Step_Num", "")
|
||||
step_name = step.get("Step_Name", "")
|
||||
@@ -331,6 +331,7 @@ def fetch_results_for_adversary(adversary_name: str) -> list[dict[str, Any]]:
|
||||
"detection_type": best_type,
|
||||
"note": best_note,
|
||||
# Enrichment fields from the API
|
||||
"scenario_name": scenario_name,
|
||||
"step_num": step_num,
|
||||
"step_name": step_name,
|
||||
"step_description": step_description,
|
||||
@@ -346,17 +347,34 @@ def fetch_results_for_adversary(adversary_name: str) -> list[dict[str, Any]]:
|
||||
def _aggregate_by_technique(substeps: list[dict]) -> dict[str, dict]:
|
||||
"""Aggregate substep results per technique.
|
||||
|
||||
Keeps the best detection score and accumulates ALL occurrences so that
|
||||
the importer can build a rich attack-path narrative in procedure_text.
|
||||
- Deduplicates substeps by (substep_ref, criteria) — prevents duplicates
|
||||
that arise when adversaries with multiple scenarios (e.g. Wizard Spider +
|
||||
Sandworm) repeat the same substep across a "combined" replay scenario.
|
||||
- Groups unique occurrences by scenario_name so the narrative can show
|
||||
"Wizard Spider scenario" vs "Sandworm scenario" separately.
|
||||
- Tracks best detection score across all unique substeps.
|
||||
"""
|
||||
by_technique: dict[str, dict] = {}
|
||||
|
||||
for sub in substeps:
|
||||
tid = sub["technique_id"]
|
||||
if tid not in by_technique:
|
||||
by_technique[tid] = {**sub, "occurrences": []}
|
||||
by_technique[tid] = {
|
||||
**sub,
|
||||
"occurrences": [], # flat list of unique occurrences
|
||||
"_seen_keys": set(), # (substep_ref, criteria) dedup set
|
||||
}
|
||||
|
||||
# Always record this occurrence for the narrative
|
||||
by_technique[tid]["occurrences"].append({
|
||||
agg = by_technique[tid]
|
||||
|
||||
# Deduplication key: same substep_ref + same criteria text = duplicate
|
||||
dedup_key = (sub["substep_ref"], sub["criteria"])
|
||||
if dedup_key in agg["_seen_keys"]:
|
||||
continue
|
||||
agg["_seen_keys"].add(dedup_key)
|
||||
|
||||
agg["occurrences"].append({
|
||||
"scenario_name": sub["scenario_name"],
|
||||
"substep_ref": sub["substep_ref"],
|
||||
"step_num": sub["step_num"],
|
||||
"step_name": sub["step_name"],
|
||||
@@ -368,19 +386,37 @@ def _aggregate_by_technique(substeps: list[dict]) -> dict[str, dict]:
|
||||
"note": sub["note"],
|
||||
})
|
||||
|
||||
# Promote to best detection if this substep scored higher
|
||||
if sub["best_score"] > by_technique[tid]["best_score"]:
|
||||
by_technique[tid]["best_score"] = sub["best_score"]
|
||||
by_technique[tid]["detection_type"] = sub["detection_type"]
|
||||
by_technique[tid]["note"] = sub["note"]
|
||||
by_technique[tid]["tactic_id"] = sub["tactic_id"]
|
||||
by_technique[tid]["tactic_name"] = sub["tactic_name"]
|
||||
# Promote best detection score
|
||||
if sub["best_score"] > agg["best_score"]:
|
||||
agg["best_score"] = sub["best_score"]
|
||||
agg["detection_type"] = sub["detection_type"]
|
||||
agg["note"] = sub["note"]
|
||||
agg["tactic_id"] = sub["tactic_id"]
|
||||
agg["tactic_name"] = sub["tactic_name"]
|
||||
|
||||
# Clean up internal dedup sets before returning
|
||||
for agg in by_technique.values():
|
||||
agg.pop("_seen_keys", None)
|
||||
|
||||
return by_technique
|
||||
|
||||
|
||||
def _group_occurrences_by_scenario(occurrences: list[dict]) -> dict[str, list[dict]]:
|
||||
"""Group a technique's occurrences by scenario, preserving insertion order."""
|
||||
grouped: dict[str, list[dict]] = {}
|
||||
for occ in occurrences:
|
||||
sc = occ.get("scenario_name", "Scenario_1")
|
||||
grouped.setdefault(sc, []).append(occ)
|
||||
return grouped
|
||||
|
||||
|
||||
def _build_procedure_text(agg: dict, adversary_display: str, eval_round: int) -> str:
|
||||
"""Build a rich attack-path narrative for the Test.procedure_text field."""
|
||||
"""Build a rich attack-path narrative for the Test.procedure_text field.
|
||||
|
||||
Groups substeps by scenario so adversaries with multiple threat groups
|
||||
(e.g. Wizard Spider + Sandworm with 3 scenarios) are clearly separated.
|
||||
Includes Step.Description narrative for context.
|
||||
"""
|
||||
occurrences = agg.get("occurrences", [])
|
||||
if not occurrences:
|
||||
return (
|
||||
@@ -388,42 +424,56 @@ def _build_procedure_text(agg: dict, adversary_display: str, eval_round: int) ->
|
||||
f"See evaluation report at https://evals.mitre.org for full details."
|
||||
)
|
||||
|
||||
lines: list[str] = []
|
||||
lines.append(f"ATT&CK Evaluation R{eval_round} — {adversary_display}\n")
|
||||
lines: list[str] = [f"ATT&CK Evaluation R{eval_round} — {adversary_display}", ""]
|
||||
|
||||
# Include step description(s) — deduplicated, one per step
|
||||
seen_steps: set = set()
|
||||
for occ in occurrences:
|
||||
step_key = str(occ.get("step_num", ""))
|
||||
grouped = _group_occurrences_by_scenario(occurrences)
|
||||
scenario_count = len(grouped)
|
||||
|
||||
for sc_name, sc_occs in grouped.items():
|
||||
# Scenario header — only shown when there are multiple scenarios
|
||||
if scenario_count > 1:
|
||||
idx = sc_name.replace("Scenario_", "Scenario ")
|
||||
lines.append(f"=== {idx} ===")
|
||||
|
||||
# Within each scenario, group by step to emit description once per step
|
||||
seen_step_descs: set = set()
|
||||
for occ in sc_occs:
|
||||
step_num = occ.get("step_num", "")
|
||||
step_name = occ.get("step_name", "")
|
||||
step_desc = occ.get("step_description", "")
|
||||
if step_key and step_key not in seen_steps and step_desc:
|
||||
seen_steps.add(step_key)
|
||||
truncated = step_desc[:500] + ("..." if len(step_desc) > 500 else "")
|
||||
lines.append(f"Step {step_key} — {step_name}:")
|
||||
lines.append(truncated)
|
||||
lines.append("")
|
||||
# Use (step_num or step_name) as dedup key for descriptions
|
||||
step_key = str(step_num) if step_num else step_name
|
||||
|
||||
if step_key and step_key not in seen_step_descs:
|
||||
seen_step_descs.add(step_key)
|
||||
header = f"Step {step_num} — {step_name}:" if step_num else f"— {step_name}:"
|
||||
lines.append(header)
|
||||
if step_desc:
|
||||
truncated = step_desc[:450] + ("…" if len(step_desc) > 450 else "")
|
||||
lines.append(truncated)
|
||||
|
||||
# List all attack criteria for this technique
|
||||
lines.append("Attack steps observed:")
|
||||
for occ in occurrences:
|
||||
ref = occ.get("substep_ref", "")
|
||||
criteria = occ.get("criteria", "")
|
||||
step_name = occ.get("step_name", "")
|
||||
det = occ.get("detection_type", "")
|
||||
if criteria:
|
||||
prefix = f"[{ref}]" if ref else "•"
|
||||
lines.append(f" {prefix} {criteria}")
|
||||
if step_name:
|
||||
lines.append(f" ↳ Step: {step_name}")
|
||||
tag = f" [{ref}]" if ref else " •"
|
||||
det_tag = f" [{det}]" if det and det.lower() not in ("none", "") else ""
|
||||
lines.append(f"{tag}{det_tag} {criteria}")
|
||||
|
||||
return "\n".join(lines)
|
||||
lines.append("")
|
||||
|
||||
return "\n".join(lines).rstrip()
|
||||
|
||||
|
||||
def _build_description(agg: dict, adversary_display: str, eval_round: int) -> str:
|
||||
"""Build the full Test.description with detection details and attack path."""
|
||||
"""Build Test.description with source metadata, detection guidance and warning.
|
||||
|
||||
The 'criteria' field from the MITRE API describes what each substep does AND
|
||||
what should be detected, so it doubles as blue-team detection guidance.
|
||||
"""
|
||||
occurrences = agg.get("occurrences", [])
|
||||
|
||||
# Collect all unique data sources across every occurrence of this technique
|
||||
# Collect all unique data sources across every unique occurrence
|
||||
all_data_sources: list[str] = sorted({
|
||||
src
|
||||
for occ in occurrences
|
||||
@@ -442,23 +492,25 @@ def _build_description(agg: dict, adversary_display: str, eval_round: int) -> st
|
||||
f" • {ds}" for ds in all_data_sources
|
||||
)
|
||||
|
||||
# Attack path / substep criteria section
|
||||
path_lines: list[str] = []
|
||||
for occ in occurrences:
|
||||
# Detection guidance — what criteria were observed (blue team can use these as IOCs)
|
||||
det_lines: list[str] = []
|
||||
grouped = _group_occurrences_by_scenario(occurrences)
|
||||
for sc_name, sc_occs in grouped.items():
|
||||
scenario_label = f"[{sc_name}] " if len(grouped) > 1 else ""
|
||||
for occ in sc_occs:
|
||||
ref = occ.get("substep_ref", "")
|
||||
criteria = occ.get("criteria", "")
|
||||
step_name = occ.get("step_name", "")
|
||||
criteria = occ.get("criteria", "")
|
||||
det_type = occ.get("detection_type", "")
|
||||
if criteria:
|
||||
label = f"[{ref}]" if ref else "•"
|
||||
step_label = f" ({step_name})" if step_name else ""
|
||||
det_label = f" — {det_type}" if det_type and det_type.lower() != "none" else ""
|
||||
path_lines.append(f" {label}{step_label}{det_label}:")
|
||||
path_lines.append(f" {criteria}")
|
||||
det_label = f" — {det_type}" if det_type and det_type.lower() not in ("none", "") else ""
|
||||
det_lines.append(f" {scenario_label}{label}{step_label}{det_label}: {criteria}")
|
||||
|
||||
path_section = ""
|
||||
if path_lines:
|
||||
path_section = "\n\nAttack path — substep criteria:\n" + "\n".join(path_lines)
|
||||
det_section = ""
|
||||
if det_lines:
|
||||
det_section = "\n\nDetection criteria (what to look for):\n" + "\n".join(det_lines)
|
||||
|
||||
warning = (
|
||||
f"\n\n⚠️ IMPORTANT: These results reflect CrowdStrike Falcon performance in a "
|
||||
@@ -467,11 +519,9 @@ def _build_description(agg: dict, adversary_display: str, eval_round: int) -> st
|
||||
f"capability. Validate in your own environment before approving."
|
||||
)
|
||||
|
||||
note_section = ""
|
||||
if agg.get("note"):
|
||||
note_section = f"\n\nMITRE note: {agg['note']}"
|
||||
note_section = f"\n\nMITRE note: {agg['note']}" if agg.get("note") else ""
|
||||
|
||||
return header + ds_section + path_section + warning + note_section
|
||||
return header + ds_section + det_section + warning + note_section
|
||||
|
||||
|
||||
def _build_red_summary(agg: dict, adversary_display: str, eval_round: int) -> str:
|
||||
@@ -483,20 +533,24 @@ def _build_red_summary(agg: dict, adversary_display: str, eval_round: int) -> st
|
||||
f"Vendor: CrowdStrike Falcon",
|
||||
f"Best detection level: {agg['detection_type']}",
|
||||
f"Tactic: {agg['tactic_name']} ({agg['tactic_id']})",
|
||||
f"Unique substeps: {len(occurrences)}",
|
||||
]
|
||||
|
||||
if occurrences:
|
||||
lines.append("")
|
||||
lines.append("Substeps:")
|
||||
for occ in occurrences:
|
||||
grouped = _group_occurrences_by_scenario(occurrences)
|
||||
for sc_name, sc_occs in grouped.items():
|
||||
if len(grouped) > 1:
|
||||
lines.append(f"{sc_name}:")
|
||||
for occ in sc_occs:
|
||||
ref = occ.get("substep_ref", "")
|
||||
criteria = occ.get("criteria", "")
|
||||
step_name = occ.get("step_name", "")
|
||||
det = occ.get("detection_type", "")
|
||||
if criteria:
|
||||
tag = f" [{ref}]" if ref else " •"
|
||||
step_tag = f" {step_name}:" if step_name else ""
|
||||
det_tag = f" [{det}]" if det and det.lower() != "none" else ""
|
||||
step_tag = f" {step_name} —" if step_name else ""
|
||||
det_tag = f" [{det}]" if det and det.lower() not in ("none", "") else ""
|
||||
lines.append(f"{tag}{step_tag}{det_tag} {criteria}")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
Reference in New Issue
Block a user