diff --git a/backend/app/services/attck_evaluations_service.py b/backend/app/services/attck_evaluations_service.py index 1f4fa5a..c41c2c8 100644 --- a/backend/app/services/attck_evaluations_service.py +++ b/backend/app/services/attck_evaluations_service.py @@ -272,7 +272,7 @@ def fetch_results_for_adversary(adversary_name: str) -> list[dict[str, Any]]: substeps: list[dict[str, Any]] = [] scenarios = target.get("Detections_By_Step", {}) - for _scenario_name, scenario_data in scenarios.items(): + for scenario_name, scenario_data in scenarios.items(): for step in scenario_data.get("Steps", []): step_num = step.get("Step_Num", "") step_name = step.get("Step_Name", "") @@ -331,6 +331,7 @@ def fetch_results_for_adversary(adversary_name: str) -> list[dict[str, Any]]: "detection_type": best_type, "note": best_note, # Enrichment fields from the API + "scenario_name": scenario_name, "step_num": step_num, "step_name": step_name, "step_description": step_description, @@ -346,17 +347,34 @@ def fetch_results_for_adversary(adversary_name: str) -> list[dict[str, Any]]: def _aggregate_by_technique(substeps: list[dict]) -> dict[str, dict]: """Aggregate substep results per technique. - Keeps the best detection score and accumulates ALL occurrences so that - the importer can build a rich attack-path narrative in procedure_text. + - Deduplicates substeps by (substep_ref, criteria) — prevents duplicates + that arise when adversaries with multiple scenarios (e.g. Wizard Spider + + Sandworm) repeat the same substep across a "combined" replay scenario. + - Groups unique occurrences by scenario_name so the narrative can show + "Wizard Spider scenario" vs "Sandworm scenario" separately. + - Tracks best detection score across all unique substeps. """ by_technique: dict[str, dict] = {} + for sub in substeps: tid = sub["technique_id"] if tid not in by_technique: - by_technique[tid] = {**sub, "occurrences": []} + by_technique[tid] = { + **sub, + "occurrences": [], # flat list of unique occurrences + "_seen_keys": set(), # (substep_ref, criteria) dedup set + } - # Always record this occurrence for the narrative - by_technique[tid]["occurrences"].append({ + agg = by_technique[tid] + + # Deduplication key: same substep_ref + same criteria text = duplicate + dedup_key = (sub["substep_ref"], sub["criteria"]) + if dedup_key in agg["_seen_keys"]: + continue + agg["_seen_keys"].add(dedup_key) + + agg["occurrences"].append({ + "scenario_name": sub["scenario_name"], "substep_ref": sub["substep_ref"], "step_num": sub["step_num"], "step_name": sub["step_name"], @@ -368,19 +386,37 @@ def _aggregate_by_technique(substeps: list[dict]) -> dict[str, dict]: "note": sub["note"], }) - # Promote to best detection if this substep scored higher - if sub["best_score"] > by_technique[tid]["best_score"]: - by_technique[tid]["best_score"] = sub["best_score"] - by_technique[tid]["detection_type"] = sub["detection_type"] - by_technique[tid]["note"] = sub["note"] - by_technique[tid]["tactic_id"] = sub["tactic_id"] - by_technique[tid]["tactic_name"] = sub["tactic_name"] + # Promote best detection score + if sub["best_score"] > agg["best_score"]: + agg["best_score"] = sub["best_score"] + agg["detection_type"] = sub["detection_type"] + agg["note"] = sub["note"] + agg["tactic_id"] = sub["tactic_id"] + agg["tactic_name"] = sub["tactic_name"] + + # Clean up internal dedup sets before returning + for agg in by_technique.values(): + agg.pop("_seen_keys", None) return by_technique +def _group_occurrences_by_scenario(occurrences: list[dict]) -> dict[str, list[dict]]: + """Group a technique's occurrences by scenario, preserving insertion order.""" + grouped: dict[str, list[dict]] = {} + for occ in occurrences: + sc = occ.get("scenario_name", "Scenario_1") + grouped.setdefault(sc, []).append(occ) + return grouped + + def _build_procedure_text(agg: dict, adversary_display: str, eval_round: int) -> str: - """Build a rich attack-path narrative for the Test.procedure_text field.""" + """Build a rich attack-path narrative for the Test.procedure_text field. + + Groups substeps by scenario so adversaries with multiple threat groups + (e.g. Wizard Spider + Sandworm with 3 scenarios) are clearly separated. + Includes Step.Description narrative for context. + """ occurrences = agg.get("occurrences", []) if not occurrences: return ( @@ -388,42 +424,56 @@ def _build_procedure_text(agg: dict, adversary_display: str, eval_round: int) -> f"See evaluation report at https://evals.mitre.org for full details." ) - lines: list[str] = [] - lines.append(f"ATT&CK Evaluation R{eval_round} — {adversary_display}\n") + lines: list[str] = [f"ATT&CK Evaluation R{eval_round} — {adversary_display}", ""] - # Include step description(s) — deduplicated, one per step - seen_steps: set = set() - for occ in occurrences: - step_key = str(occ.get("step_num", "")) - step_name = occ.get("step_name", "") - step_desc = occ.get("step_description", "") - if step_key and step_key not in seen_steps and step_desc: - seen_steps.add(step_key) - truncated = step_desc[:500] + ("..." if len(step_desc) > 500 else "") - lines.append(f"Step {step_key} — {step_name}:") - lines.append(truncated) - lines.append("") + grouped = _group_occurrences_by_scenario(occurrences) + scenario_count = len(grouped) - # List all attack criteria for this technique - lines.append("Attack steps observed:") - for occ in occurrences: - ref = occ.get("substep_ref", "") - criteria = occ.get("criteria", "") - step_name = occ.get("step_name", "") - if criteria: - prefix = f"[{ref}]" if ref else "•" - lines.append(f" {prefix} {criteria}") - if step_name: - lines.append(f" ↳ Step: {step_name}") + for sc_name, sc_occs in grouped.items(): + # Scenario header — only shown when there are multiple scenarios + if scenario_count > 1: + idx = sc_name.replace("Scenario_", "Scenario ") + lines.append(f"=== {idx} ===") - return "\n".join(lines) + # Within each scenario, group by step to emit description once per step + seen_step_descs: set = set() + for occ in sc_occs: + step_num = occ.get("step_num", "") + step_name = occ.get("step_name", "") + step_desc = occ.get("step_description", "") + # Use (step_num or step_name) as dedup key for descriptions + step_key = str(step_num) if step_num else step_name + + if step_key and step_key not in seen_step_descs: + seen_step_descs.add(step_key) + header = f"Step {step_num} — {step_name}:" if step_num else f"— {step_name}:" + lines.append(header) + if step_desc: + truncated = step_desc[:450] + ("…" if len(step_desc) > 450 else "") + lines.append(truncated) + + ref = occ.get("substep_ref", "") + criteria = occ.get("criteria", "") + det = occ.get("detection_type", "") + if criteria: + tag = f" [{ref}]" if ref else " •" + det_tag = f" [{det}]" if det and det.lower() not in ("none", "") else "" + lines.append(f"{tag}{det_tag} {criteria}") + + lines.append("") + + return "\n".join(lines).rstrip() def _build_description(agg: dict, adversary_display: str, eval_round: int) -> str: - """Build the full Test.description with detection details and attack path.""" + """Build Test.description with source metadata, detection guidance and warning. + + The 'criteria' field from the MITRE API describes what each substep does AND + what should be detected, so it doubles as blue-team detection guidance. + """ occurrences = agg.get("occurrences", []) - # Collect all unique data sources across every occurrence of this technique + # Collect all unique data sources across every unique occurrence all_data_sources: list[str] = sorted({ src for occ in occurrences @@ -442,23 +492,25 @@ def _build_description(agg: dict, adversary_display: str, eval_round: int) -> st f" • {ds}" for ds in all_data_sources ) - # Attack path / substep criteria section - path_lines: list[str] = [] - for occ in occurrences: - ref = occ.get("substep_ref", "") - criteria = occ.get("criteria", "") - step_name = occ.get("step_name", "") - det_type = occ.get("detection_type", "") - if criteria: - label = f"[{ref}]" if ref else "•" - step_label = f" ({step_name})" if step_name else "" - det_label = f" — {det_type}" if det_type and det_type.lower() != "none" else "" - path_lines.append(f" {label}{step_label}{det_label}:") - path_lines.append(f" {criteria}") + # Detection guidance — what criteria were observed (blue team can use these as IOCs) + det_lines: list[str] = [] + grouped = _group_occurrences_by_scenario(occurrences) + for sc_name, sc_occs in grouped.items(): + scenario_label = f"[{sc_name}] " if len(grouped) > 1 else "" + for occ in sc_occs: + ref = occ.get("substep_ref", "") + step_name = occ.get("step_name", "") + criteria = occ.get("criteria", "") + det_type = occ.get("detection_type", "") + if criteria: + label = f"[{ref}]" if ref else "•" + step_label = f" ({step_name})" if step_name else "" + det_label = f" — {det_type}" if det_type and det_type.lower() not in ("none", "") else "" + det_lines.append(f" {scenario_label}{label}{step_label}{det_label}: {criteria}") - path_section = "" - if path_lines: - path_section = "\n\nAttack path — substep criteria:\n" + "\n".join(path_lines) + det_section = "" + if det_lines: + det_section = "\n\nDetection criteria (what to look for):\n" + "\n".join(det_lines) warning = ( f"\n\n⚠️ IMPORTANT: These results reflect CrowdStrike Falcon performance in a " @@ -467,11 +519,9 @@ def _build_description(agg: dict, adversary_display: str, eval_round: int) -> st f"capability. Validate in your own environment before approving." ) - note_section = "" - if agg.get("note"): - note_section = f"\n\nMITRE note: {agg['note']}" + note_section = f"\n\nMITRE note: {agg['note']}" if agg.get("note") else "" - return header + ds_section + path_section + warning + note_section + return header + ds_section + det_section + warning + note_section def _build_red_summary(agg: dict, adversary_display: str, eval_round: int) -> str: @@ -483,21 +533,25 @@ def _build_red_summary(agg: dict, adversary_display: str, eval_round: int) -> st f"Vendor: CrowdStrike Falcon", f"Best detection level: {agg['detection_type']}", f"Tactic: {agg['tactic_name']} ({agg['tactic_id']})", + f"Unique substeps: {len(occurrences)}", ] if occurrences: lines.append("") - lines.append("Substeps:") - for occ in occurrences: - ref = occ.get("substep_ref", "") - criteria = occ.get("criteria", "") - step_name = occ.get("step_name", "") - det = occ.get("detection_type", "") - if criteria: - tag = f"[{ref}]" if ref else "•" - step_tag = f" {step_name}:" if step_name else "" - det_tag = f" [{det}]" if det and det.lower() != "none" else "" - lines.append(f" {tag}{step_tag}{det_tag} {criteria}") + grouped = _group_occurrences_by_scenario(occurrences) + for sc_name, sc_occs in grouped.items(): + if len(grouped) > 1: + lines.append(f"{sc_name}:") + for occ in sc_occs: + ref = occ.get("substep_ref", "") + criteria = occ.get("criteria", "") + step_name = occ.get("step_name", "") + det = occ.get("detection_type", "") + if criteria: + tag = f" [{ref}]" if ref else " •" + step_tag = f" {step_name} —" if step_name else "" + det_tag = f" [{det}]" if det and det.lower() not in ("none", "") else "" + lines.append(f"{tag}{step_tag}{det_tag} {criteria}") return "\n".join(lines)