7703c36ed7
Aegis CI / lint-and-test (push) Has been cancelled
- Capture Step.Description (HTML stripped), step name/number, substep ref, criteria, and data sources from MITRE ATT&CK Evaluations API - _aggregate_by_technique() now accumulates ALL occurrences per technique (multiple substep refs, criteria, step contexts) instead of keeping only the best-scoring one - New helper functions _build_procedure_text(), _build_description(), _build_red_summary() generate rich narratives from accumulated occurrences - New re_enrich_evaluation_round() service function + POST endpoint /system/attck-evaluations/re-enrich to update already-imported tests without changing detection results or validation state - Frontend: Re-enrich button per imported round + result banner in SystemPage Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
745 lines
26 KiB
Python
745 lines
26 KiB
Python
"""ATT&CK Evaluations importer — fetches real CrowdStrike detection results
|
|
from MITRE Engenuity's public API and seeds the platform with validated tests.
|
|
|
|
Data source
|
|
-----------
|
|
https://evals.mitre.org/api/
|
|
- /participants/ → list of vendors + rounds they completed
|
|
- /results/?participant=crowdstrike&domain=ENTERPRISE
|
|
→ per-substep detection results per adversary
|
|
|
|
Detection level mapping (MITRE → Aegis)
|
|
---------------------------------------
|
|
Technique / Specific Behavior → detected (correctly identified ATT&CK technique)
|
|
Tactic → partially_detected (behavior noted but not categorized)
|
|
General / IOC / MSSP → partially_detected (anomaly detected, not ATT&CK-mapped)
|
|
Telemetry → partially_detected (raw data only — marginal detection)
|
|
None / N/A → not_detected
|
|
|
|
All imported tests are created in ``in_review`` state so Blue Leads must
|
|
confirm each result before it counts as real coverage for the organisation.
|
|
|
|
Important caveats stored in every test's description
|
|
------------------------------------------------------
|
|
"Source: MITRE ATT&CK Evaluation (Round N — Adversary). Results reflect
|
|
CrowdStrike Falcon in a controlled lab environment, NOT this organisation's
|
|
deployment. Validate detection in your own environment before approving."
|
|
"""
|
|
|
|
import logging
|
|
import re
|
|
import uuid
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
import requests
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.enums import TestState, TestResult
|
|
from app.models.evaluation_import import EvaluationImport
|
|
from app.models.technique import Technique
|
|
from app.models.test import Test
|
|
from app.models.user import User
|
|
from app.services.audit_service import log_action
|
|
from app.services.status_service import recalculate_technique_status
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Connection settings for the MITRE ATT&CK Evaluations site.
_BASE = "https://evals.mitre.org"
_TIMEOUT = 30  # seconds per HTTP call
_VENDOR = "crowdstrike"
_DOMAIN = "ENTERPRISE"

# Browser-like headers to bypass Cloudflare bot protection on evals.mitre.org.
# The User-Agent is assembled from its three product tokens, joined by spaces.
_HEADERS = {
    "User-Agent": " ".join(
        (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "AppleWebKit/537.36 (KHTML, like Gecko)",
            "Chrome/124.0.0.0 Safari/537.36",
        )
    ),
    "Accept": "application/json, text/plain, */*",
    "Accept-Language": "en-US,en;q=0.9",
    "Referer": "https://evals.mitre.org/",
    "Origin": "https://evals.mitre.org",
}
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fallback: hardcoded public CrowdStrike ENTERPRISE rounds
|
|
# Used when evals.mitre.org API is unreachable (Cloudflare 502, outage, etc.)
|
|
#
|
|
# Names use the EXACT slugs the live API returns (hyphens, not underscores).
|
|
# Verified from live API response on 2025-06-05.
|
|
# CrowdStrike did NOT participate in Round 6 (OilRig) — not included.
|
|
# ---------------------------------------------------------------------------
|
|
# Each row is (API slug, display name, evaluation round number). Every entry
# is expanded into the dict shape used for rounds elsewhere in this module,
# with the domain fixed to ENTERPRISE and the status fixed to PUBLIC.
_FALLBACK_ROUNDS: list[dict[str, Any]] = [
    {
        "name": slug,
        "display_name": label,
        "eval_round": number,
        "domain": "ENTERPRISE",
        "status": "PUBLIC",
    }
    for slug, label, number in (
        ("apt3", "APT3", 1),
        ("apt29", "APT29", 2),
        ("carbanak-fin7", "Carbanak+FIN7", 3),
        ("wizard-spider-sandworm", "Wizard Spider + Sandworm", 4),
        ("turla", "Turla", 5),
        ("er7", "Enterprise 2025", 7),
    )
]
|
|
|
|
# Detection type → quality score (higher = better)
_DETECTION_SCORE: dict[str, int] = {
    "none": 0,
    "n/a": 0,
    "telemetry": 1,
    "mssp": 2,
    "general": 2,
    "ioc": 2,
    "tactic": 3,
    "technique": 4,
    "specific behavior": 4,
}


def _score(detection_type: str) -> int:
    """Map a MITRE detection-type label to its numeric quality score.

    Matching is case-insensitive and substring-based against the keys of
    ``_DETECTION_SCORE``, so a label such as ``"General Behavior"`` matches
    the ``"general"`` pattern.

    When several patterns occur in the same label the highest score wins.
    Taking the first match instead would make the result depend on dict
    insertion order, and a compound label like ``"General, Technique"``
    would be under-scored.

    Args:
        detection_type: Label reported by the API. ``None``, empty strings
            and non-string values are treated as "no detection".

    Returns:
        The best matching score, or ``0`` when nothing matches.
    """
    if not isinstance(detection_type, str):
        return 0
    key = detection_type.lower().strip()
    return max(
        (score for pattern, score in _DETECTION_SCORE.items() if pattern in key),
        default=0,
    )
|
|
|
|
|
|
def _score_to_result(score: int) -> TestResult:
    """Translate a numeric detection score into a ``TestResult`` member.

    A score of 4 or more yields ``detected``, a score of at least 1 yields
    ``partially_detected`` and anything else yields ``not_detected``.
    """
    return (
        TestResult.detected
        if score >= 4
        else TestResult.partially_detected
        if score >= 1
        else TestResult.not_detected
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def fetch_rounds_with_status() -> dict[str, Any]:
    """Fetch CrowdStrike ENTERPRISE rounds and report whether the live API was reachable.

    The hardcoded ``_FALLBACK_ROUNDS`` list is returned whenever the live data
    cannot be used: the request fails, the payload is not a list, the vendor
    is missing, or the vendor has no PUBLIC ENTERPRISE rounds.

    Returns::

        {
            "rounds": [{"name": ..., "display_name": ..., "eval_round": ...}, ...],
            "api_reachable": True | False,
            "api_error": None | "<error message>",
        }

    Live rounds are sorted by ``eval_round`` ascending.
    """

    def _fallback(reachable: bool, error: Any) -> dict[str, Any]:
        # Copy each dict so callers cannot mutate the module-level fallback table.
        return {
            "rounds": [dict(entry) for entry in _FALLBACK_ROUNDS],
            "api_reachable": reachable,
            "api_error": error,
        }

    try:
        # The context manager closes the session when the request is done.
        with requests.Session() as session:
            session.headers.update(_HEADERS)
            resp = session.get(f"{_BASE}/api/participants/", timeout=_TIMEOUT)
            resp.raise_for_status()
            participants = resp.json()
    except Exception as exc:
        # Deliberate best-effort boundary: any failure degrades to the fallback list.
        logger.warning(
            "evals.mitre.org API unreachable (%s) — using hardcoded fallback round list.",
            exc,
        )
        return _fallback(False, str(exc))

    if not isinstance(participants, list):
        logger.warning(
            "Unexpected participants payload (%s) — using fallback.",
            type(participants).__name__,
        )
        return _fallback(
            True,
            f"Unexpected participants payload type: {type(participants).__name__}",
        )

    # ``or ""`` guards against keys that are present but null in the JSON.
    crowdstrike = next(
        (
            p
            for p in participants
            if isinstance(p, dict) and (p.get("name") or "").lower() == _VENDOR
        ),
        None,
    )
    if not crowdstrike:
        logger.warning("Vendor '%s' not found in live data — using fallback.", _VENDOR)
        # API was reachable, vendor just wasn't listed
        return _fallback(True, f"Vendor '{_VENDOR}' not found in participants list")

    rounds = [
        adv
        for adv in (crowdstrike.get("adversaries_completed") or [])
        if isinstance(adv, dict)
        and (adv.get("domain") or "").upper() == _DOMAIN
        and (adv.get("status") or "").upper() == "PUBLIC"
    ]
    if not rounds:
        logger.warning(
            "No PUBLIC %s rounds listed for vendor '%s' — using fallback.",
            _DOMAIN,
            _VENDOR,
        )
        return _fallback(True, None)

    # A null eval_round sorts as 0 instead of raising on None/int comparison.
    rounds.sort(key=lambda adv: adv.get("eval_round") or 0)
    return {
        "rounds": rounds,
        "api_reachable": True,
        "api_error": None,
    }
|
|
|
|
|
|
def fetch_available_rounds() -> list[dict[str, Any]]:
    """Return only the ``"rounds"`` list produced by ``fetch_rounds_with_status()``.

    This is a convenience wrapper for callers that do not need the
    ``api_reachable`` / ``api_error`` status fields. The list is either the
    live PUBLIC ENTERPRISE rounds for CrowdStrike (sorted by ``eval_round``
    ascending) or the ``_FALLBACK_ROUNDS`` substitute.
    """
    status_payload = fetch_rounds_with_status()
    return status_payload["rounds"]
|
|
|
|
|
|
def get_latest_round() -> dict[str, Any]:
    """Return the last entry of the list from ``fetch_available_rounds()``.

    That list is ordered by ``eval_round`` ascending, so the last entry is
    the most recent round.

    Raises:
        ValueError: If the list of rounds is empty.
    """
    rounds = fetch_available_rounds()
    if rounds:
        return rounds[-1]
    raise ValueError("No public Enterprise evaluation rounds found for CrowdStrike")
|
|
|
|
|
|
def _strip_html(raw: Any) -> str:
    """Return *raw* as plain text.

    Tags are replaced by spaces, HTML entities are decoded and runs of
    whitespace are collapsed. Non-string or empty input yields ``""``.
    """
    import html  # local import so the block stays self-contained

    if not isinstance(raw, str) or not raw:
        return ""
    without_tags = re.sub(r"<[^>]+>", " ", raw)
    return re.sub(r"\s+", " ", html.unescape(without_tags)).strip()


def _best_detection(detections: list[dict[str, Any]]) -> tuple[int, str, str]:
    """Return ``(score, detection_type, note)`` of the highest-scoring detection.

    Defaults to ``(0, "None", "")`` when the list is empty or nothing scores
    above zero. On ties the first detection encountered wins.
    """
    best_score, best_type, best_note = 0, "None", ""
    for det in detections:
        dtype = det.get("Detection_Type") or "None"
        current = _score(dtype)
        if current > best_score:
            best_score = current
            best_type = dtype
            best_note = det.get("Detection_Note") or ""
    return best_score, best_type, best_note


def fetch_results_for_adversary(adversary_name: str) -> list[dict[str, Any]]:
    """Fetch all per-substep detection results for a specific adversary round.

    Args:
        adversary_name: Adversary slug; compared case-insensitively against
            each entry's ``Adversary_Name``.

    Returns:
        A flat list with one dict per substep that carries a technique ID.
        Each dict contains ``technique_id``, ``technique_name``,
        ``tactic_id``, ``tactic_name``, ``best_score``, ``detection_type``,
        ``note``, ``step_num``, ``step_name``, ``step_description`` (plain
        text), ``substep_ref``, ``criteria`` and ``data_sources`` (sorted
        unique list).

    Raises:
        ValueError: If the payload has an unexpected shape, or the vendor or
            the adversary is not present in it.
        Exception: Any error raised by the HTTP request or JSON decoding is
            logged and re-raised unchanged.
    """
    url = f"{_BASE}/api/results/?participant={_VENDOR}&domain={_DOMAIN}"
    try:
        # The context manager closes the session when the request is done.
        with requests.Session() as session:
            session.headers.update(_HEADERS)
            resp = session.get(url, timeout=_TIMEOUT)
            resp.raise_for_status()
            data = resp.json()
    except Exception as exc:
        logger.error("Failed to fetch ATT&CK Evaluations results: %s", exc)
        raise

    # The results endpoint returns a LIST of vendor objects:
    # [{"name": "crowdstrike", "adversaries": [{"Adversary_Name": "apt3", ...}, ...]}, ...]
    # (not a dict — hence the explicit vendor lookup below)
    if isinstance(data, list):
        vendor_entry = next(
            (
                v
                for v in data
                if isinstance(v, dict) and (v.get("name") or "").lower() == _VENDOR
            ),
            None,
        )
        if not vendor_entry:
            raise ValueError(
                f"Vendor '{_VENDOR}' not found in results response. "
                f"Available vendors: {[v.get('name') for v in data if isinstance(v, dict)]}"
            )
        adversaries = vendor_entry.get("adversaries") or []
    elif isinstance(data, dict):
        # Fallback for legacy dict-shaped response (just in case API changes again)
        adversaries = data.get("adversaries") or []
    else:
        raise ValueError(
            f"Unexpected results payload type: {type(data).__name__}"
        )

    wanted = adversary_name.lower()
    target = next(
        (
            a
            for a in adversaries
            if isinstance(a, dict) and (a.get("Adversary_Name") or "").lower() == wanted
        ),
        None,
    )
    if not target:
        raise ValueError(
            f"Adversary '{adversary_name}' not found in results. "
            f"Available: {[a.get('Adversary_Name') for a in adversaries if isinstance(a, dict)]}"
        )

    substeps: list[dict[str, Any]] = []

    # ``x.get(key) or default`` is used throughout because a key may be
    # present with a null value, which ``get(key, default)`` would pass on.
    scenarios = target.get("Detections_By_Step") or {}
    for scenario_data in scenarios.values():
        for step in (scenario_data or {}).get("Steps") or []:
            step_num = step.get("Step_Num", "")
            step_name = step.get("Step_Name", "")
            step_description = _strip_html(step.get("Description"))

            for substep in step.get("Substeps") or []:
                # Prefer sub-technique over technique
                sub = substep.get("Subtechnique") or {}
                tech = substep.get("Technique") or {}
                tactic = substep.get("Tactic") or {}

                technique_id = (
                    sub.get("Subtechnique_Id") or tech.get("Technique_Id") or ""
                ).strip()
                if not technique_id:
                    # A substep without a technique cannot be mapped to a Test.
                    continue

                technique_name = (
                    sub.get("Subtechnique_Name")
                    or tech.get("Technique_Name")
                    or "Unknown"
                ).strip()

                detections = [
                    det
                    for det in (substep.get("Detections") or [])
                    if isinstance(det, dict)
                ]
                best_score, best_type, best_note = _best_detection(detections)

                # Collect all unique data sources from screenshots across all detections
                data_sources: list[str] = sorted(
                    {
                        src
                        for det in detections
                        for shot in (det.get("Screenshots") or [])
                        for src in (shot.get("Data_Sources") or [])
                        if isinstance(src, str) and src
                    }
                )

                substeps.append(
                    {
                        "technique_id": technique_id,
                        "technique_name": technique_name,
                        "tactic_id": tactic.get("Tactic_Id") or "",
                        "tactic_name": tactic.get("Tactic_Name") or "",
                        "best_score": best_score,
                        "detection_type": best_type,
                        "note": best_note,
                        # Enrichment fields from the API
                        "step_num": step_num,
                        "step_name": step_name,
                        "step_description": step_description,
                        "substep_ref": substep.get("Substep") or "",
                        "criteria": (substep.get("Criteria") or "").strip(),
                        "data_sources": data_sources,
                    }
                )

    return substeps
|
|
|
|
|
|
def _aggregate_by_technique(substeps: list[dict]) -> dict[str, dict]:
    """Aggregate substep results per technique.

    Keeps the best detection score and accumulates ALL occurrences so that
    the importer can build a rich attack-path narrative in procedure_text.

    Technique IDs are normalised to upper case before grouping. The importers
    look techniques up with ``mitre_id.upper()``, so IDs differing only in
    case must land in the same bucket or they would create duplicate tests.

    Args:
        substeps: Flat substep dicts. ``technique_id``, ``best_score``,
            ``detection_type`` and ``note`` are required on every entry;
            ``tactic_id`` and ``tactic_name`` are read when a substep is
            promoted to best detection. The enrichment keys (``substep_ref``,
            ``step_num``, ``step_name``, ``step_description``, ``criteria``,
            ``data_sources``) are optional and default to empty values.

    Returns:
        A dict keyed by upper-cased technique ID. Each value starts as a copy
        of the first substep seen for that technique, is updated with the
        best detection (first one wins on ties) and carries an
        ``"occurrences"`` list with one entry per contributing substep, in
        input order.
    """
    by_technique: dict[str, dict] = {}
    for sub in substeps:
        tid = str(sub["technique_id"]).strip().upper()
        entry = by_technique.get(tid)
        if entry is None:
            entry = {**sub, "technique_id": tid, "occurrences": []}
            by_technique[tid] = entry

        # Always record this occurrence for the narrative
        entry["occurrences"].append(
            {
                "substep_ref": sub.get("substep_ref", ""),
                "step_num": sub.get("step_num", ""),
                "step_name": sub.get("step_name", ""),
                "step_description": sub.get("step_description", ""),
                "criteria": sub.get("criteria", ""),
                # Copy so the aggregate never aliases the caller's list.
                "data_sources": list(sub.get("data_sources") or []),
                "detection_type": sub["detection_type"],
                "best_score": sub["best_score"],
                "note": sub["note"],
            }
        )

        # Promote to best detection if this substep scored higher
        if sub["best_score"] > entry["best_score"]:
            entry["best_score"] = sub["best_score"]
            entry["detection_type"] = sub["detection_type"]
            entry["note"] = sub["note"]
            entry["tactic_id"] = sub["tactic_id"]
            entry["tactic_name"] = sub["tactic_name"]

    return by_technique
|
|
|
|
|
|
def _build_procedure_text(agg: dict, adversary_display: str, eval_round: int) -> str:
    """Build a rich attack-path narrative for the Test.procedure_text field.

    The narrative consists of a title line, one block per distinct step that
    has a description (truncated to 500 characters), and an
    "Attack steps observed:" list with one entry per occurrence that has
    criteria.

    Section headers are emitted only when they have content. When there are
    no occurrences, or none of them has a step description or criteria, a
    generic one-sentence pointer to the evaluation report is returned.

    Args:
        agg: Aggregated technique dict; only its ``"occurrences"`` list is read.
        adversary_display: Human-readable adversary name.
        eval_round: Evaluation round number, shown in the title line.

    Returns:
        The narrative as a newline-joined string.
    """
    generic = (
        f"MITRE ATT&CK Evaluation simulation using {adversary_display} TTPs. "
        "See evaluation report at https://evals.mitre.org for full details."
    )
    occurrences = agg.get("occurrences") or []
    if not occurrences:
        return generic

    # Include step description(s) — deduplicated, one per step
    step_lines: list[str] = []
    seen_steps: set = set()
    for occ in occurrences:
        raw_num = occ.get("step_num")
        # A missing step number must not be rendered as the text "None".
        step_key = "" if raw_num is None else str(raw_num)
        step_desc = occ.get("step_description") or ""
        if not step_key or step_key in seen_steps or not step_desc:
            continue
        seen_steps.add(step_key)
        truncated = step_desc[:500] + ("..." if len(step_desc) > 500 else "")
        step_lines.append(f"Step {step_key} — {occ.get('step_name') or ''}:")
        step_lines.append(truncated)
        step_lines.append("")

    # List all attack criteria for this technique
    criteria_lines: list[str] = []
    for occ in occurrences:
        criteria = occ.get("criteria") or ""
        if not criteria:
            continue
        ref = occ.get("substep_ref") or ""
        prefix = f"[{ref}]" if ref else "•"
        criteria_lines.append(f" {prefix} {criteria}")
        step_name = occ.get("step_name") or ""
        if step_name:
            criteria_lines.append(f" ↳ Step: {step_name}")

    if not step_lines and not criteria_lines:
        # Occurrences exist but carry no narrative content.
        return generic

    lines: list[str] = [f"ATT&CK Evaluation R{eval_round} — {adversary_display}\n"]
    lines.extend(step_lines)
    if criteria_lines:
        lines.append("Attack steps observed:")
        lines.extend(criteria_lines)

    # Drop the blank separator left behind when no criteria section follows.
    while lines and lines[-1] == "":
        lines.pop()

    return "\n".join(lines)
|
|
|
|
|
|
def _build_description(agg: dict, adversary_display: str, eval_round: int) -> str:
    """Assemble the Test.description text for an aggregated technique.

    The text is built from up to five parts, in this order: a source/vendor
    header, the sorted unique data sources, the per-substep criteria, the
    lab-environment warning and the MITRE note. The data-source, criteria and
    note parts are included only when they have content.

    Args:
        agg: Aggregated technique dict. ``agg["detection_type"]`` is
            required; ``"occurrences"`` and ``"note"`` are optional.
        adversary_display: Human-readable adversary name.
        eval_round: Evaluation round number.

    Returns:
        The concatenated description string.
    """
    occurrences = agg.get("occurrences", [])

    # Gather every data source named by any occurrence, without duplicates.
    unique_sources: set = set()
    for occurrence in occurrences:
        unique_sources.update(occurrence.get("data_sources", []))
    ordered_sources: list[str] = sorted(unique_sources)

    parts: list[str] = [
        f"Source: MITRE ATT&CK Evaluation — Round {eval_round} ({adversary_display}).\n"
        f"Vendor: CrowdStrike Falcon.\n"
        f"Detection type achieved: {agg['detection_type']}."
    ]

    if ordered_sources:
        bullets = [f" • {source}" for source in ordered_sources]
        parts.append("\n\nData sources observed:\n" + "\n".join(bullets))

    # Two lines per occurrence that has criteria: a heading, then the criteria.
    criteria_block: list[str] = []
    for occurrence in occurrences:
        ref = occurrence.get("substep_ref", "")
        criteria = occurrence.get("criteria", "")
        step_name = occurrence.get("step_name", "")
        det_type = occurrence.get("detection_type", "")
        if not criteria:
            continue
        heading = f"[{ref}]" if ref else "•"
        if step_name:
            heading += f" ({step_name})"
        # A "none" detection type is not worth showing in the heading.
        if det_type and det_type.lower() != "none":
            heading += f" — {det_type}"
        criteria_block.append(f" {heading}:")
        criteria_block.append(f" {criteria}")

    if criteria_block:
        parts.append("\n\nAttack path — substep criteria:\n" + "\n".join(criteria_block))

    parts.append(
        f"\n\n⚠️ IMPORTANT: These results reflect CrowdStrike Falcon performance in a "
        f"controlled MITRE lab environment against a simulated {adversary_display} "
        f"adversary. They do NOT represent your organisation's actual detection "
        f"capability. Validate in your own environment before approving."
    )

    if agg.get("note"):
        parts.append(f"\n\nMITRE note: {agg['note']}")

    return "".join(parts)
|
|
|
|
|
|
def _build_red_summary(agg: dict, adversary_display: str, eval_round: int) -> str:
    """Build the Red Team summary for the Test.red_summary field.

    The summary starts with four header lines (evaluation, vendor, best
    detection level, tactic). A "Substeps:" section follows, with one line
    per occurrence that has criteria. The section, including its blank
    separator line, is left out when no occurrence has criteria, so the
    summary never ends in an empty heading.

    Args:
        agg: Aggregated technique dict. ``detection_type``, ``tactic_name``
            and ``tactic_id`` are required; ``occurrences`` is optional.
        adversary_display: Human-readable adversary name.
        eval_round: Evaluation round number.

    Returns:
        The summary as a newline-joined string.
    """
    lines = [
        f"MITRE ATT&CK Evaluation — Round {eval_round} ({adversary_display})",
        "Vendor: CrowdStrike Falcon",
        f"Best detection level: {agg['detection_type']}",
        f"Tactic: {agg['tactic_name']} ({agg['tactic_id']})",
    ]

    substep_lines: list[str] = []
    for occ in agg.get("occurrences") or []:
        criteria = occ.get("criteria") or ""
        if not criteria:
            continue
        ref = occ.get("substep_ref") or ""
        step_name = occ.get("step_name") or ""
        det = occ.get("detection_type") or ""
        tag = f"[{ref}]" if ref else "•"
        step_tag = f" {step_name}:" if step_name else ""
        # A "none" detection type adds nothing to the line, so it is omitted.
        det_tag = f" [{det}]" if det and det.lower() != "none" else ""
        substep_lines.append(f" {tag}{step_tag}{det_tag} {criteria}")

    if substep_lines:
        lines.append("")
        lines.append("Substeps:")
        lines.extend(substep_lines)

    return "\n".join(lines)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main import function
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def import_evaluation_round(
    db: Session,
    adversary_name: str,
    adversary_display: str,
    eval_round: int,
    current_user: User,
) -> dict[str, Any]:
    """Import a single ATT&CK Evaluation round for CrowdStrike into the platform.

    Creates one Test per unique technique with the best detection result
    observed across all substeps for that technique. All tests land in
    ``in_review`` state — Blue Leads must confirm before they count as coverage.

    All database writes happen in a single transaction. If anything fails
    after the first write, the session is rolled back and the error is
    re-raised, so a failed import leaves no partial set of tests behind.

    Args:
        db: Database session used for all reads and writes.
        adversary_name: Adversary slug used to fetch results; stored lower-cased.
        adversary_display: Human-readable adversary name used in test names and text.
        eval_round: Evaluation round number.
        current_user: User recorded as creator, Red validator and importer.

    Returns:
        A summary dict with ``created``, ``skipped``, ``techniques_covered``,
        ``adversary`` and ``eval_round``.

    Raises:
        ValueError: If a completed import already exists for this adversary
            (idempotency guard). Errors from fetching the results propagate
            unchanged.
    """
    # Idempotency — refuse duplicate imports
    existing = (
        db.query(EvaluationImport)
        .filter(
            EvaluationImport.adversary_name == adversary_name.lower(),
            EvaluationImport.status == "completed",
        )
        .first()
    )
    if existing:
        imported_on = (
            existing.imported_at.date() if existing.imported_at else "an unknown date"
        )
        raise ValueError(
            f"Round '{adversary_display}' (round {eval_round}) was already imported "
            f"on {imported_on}. Re-import is not allowed."
        )

    # Fetch and aggregate substep results (no DB writes have happened yet)
    substeps = fetch_results_for_adversary(adversary_name)
    by_technique = _aggregate_by_technique(substeps)

    created = 0
    skipped = 0
    # Keyed by technique primary key; the values are reused for the status
    # recalculation so each technique is not queried a second time.
    affected: dict[Any, Technique] = {}
    # One timestamp for the whole import keeps every row of the batch consistent.
    # NOTE(review): utcnow() yields a naive datetime; kept as in the original
    # call sites — confirm the column types before switching to aware datetimes.
    now = datetime.utcnow()

    try:
        for mitre_id, agg in by_technique.items():
            # Look up the technique in our DB
            technique = (
                db.query(Technique)
                .filter(Technique.mitre_id == mitre_id.upper())
                .first()
            )
            if technique is None:
                skipped += 1
                continue

            test = Test(
                technique_id=technique.id,
                name=f"[EVAL R{eval_round}] {adversary_display} — {technique.name}",
                description=_build_description(agg, adversary_display, eval_round),
                platform=None,
                procedure_text=_build_procedure_text(agg, adversary_display, eval_round),
                created_by=current_user.id,
                state=TestState.in_review,
                attack_success=True,
                red_summary=_build_red_summary(agg, adversary_display, eval_round),
                red_validation_status="approved",
                red_validated_by=current_user.id,
                red_validated_at=now,
                detection_result=_score_to_result(agg["best_score"]),
                blue_validation_status=None,
                execution_date=now,
                created_at=now,
            )
            db.add(test)
            # Flush so test.id is populated for the audit entry below.
            db.flush()

            log_action(
                db,
                user_id=current_user.id,
                action="eval_import_test",
                entity_type="test",
                entity_id=test.id,
                details={
                    "adversary": adversary_name,
                    "eval_round": eval_round,
                    "mitre_id": mitre_id,
                    "detection_type": agg["detection_type"],
                },
            )

            affected[technique.id] = technique
            created += 1

        # Recalculate coverage for all touched techniques
        for technique in affected.values():
            recalculate_technique_status(db, technique)

        # Record the import
        record = EvaluationImport(
            id=uuid.uuid4(),
            adversary_name=adversary_name.lower(),
            adversary_display=adversary_display,
            eval_round=eval_round,
            imported_at=now,
            imported_by=current_user.id,
            tests_created=created,
            techniques_covered=len(affected),
            status="completed",
            notes=f"Skipped {skipped} techniques not found in local DB.",
        )
        db.add(record)
        db.commit()
    except Exception:
        # Undo flushed-but-uncommitted tests so the session stays usable.
        db.rollback()
        raise

    logger.info(
        "ATT&CK Evaluation import complete — round %d (%s): %d tests created, %d skipped",
        eval_round,
        adversary_display,
        created,
        skipped,
    )
    return {
        "created": created,
        "skipped": skipped,
        "techniques_covered": len(affected),
        "adversary": adversary_display,
        "eval_round": eval_round,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# New-round check (used by the weekly scheduler)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def check_for_new_round(db: Session) -> dict[str, Any]:
    """Check if a new evaluation round is available that hasn't been imported yet.

    A round counts as imported when a ``completed`` EvaluationImport row
    exists for its lower-cased name.

    Returns:
        {"new_round_available": bool, "latest_round": dict | None, "already_imported": bool}

        If the latest round cannot be determined, ``new_round_available`` and
        ``already_imported`` are both ``False``, ``latest_round`` is ``None``
        and an additional ``"error"`` key carries the error message.
    """
    try:
        latest = get_latest_round()
    except Exception as exc:
        logger.warning("Could not check for new ATT&CK Evaluation round: %s", exc)
        # Keep the documented keys present on the error path so callers can
        # read "already_imported" without a KeyError.
        return {
            "new_round_available": False,
            "already_imported": False,
            "latest_round": None,
            "error": str(exc),
        }

    already = (
        db.query(EvaluationImport)
        .filter(
            EvaluationImport.adversary_name == latest["name"].lower(),
            EvaluationImport.status == "completed",
        )
        .first()
    )

    return {
        "new_round_available": already is None,
        "already_imported": already is not None,
        "latest_round": {
            "name": latest["name"],
            "display_name": latest.get("display_name", latest["name"]),
            "eval_round": latest["eval_round"],
        },
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Re-enrich existing tests with richer API data
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def re_enrich_evaluation_round(
    db: Session,
    adversary_name: str,
    adversary_display: str,
    eval_round: int,
    current_user: User,
) -> dict[str, Any]:
    """Update procedure_text / description / red_summary on already-imported tests
    for a given round using the enriched API data (attack path, criteria, data sources).

    This is non-destructive — it only updates the three narrative fields and does
    not change detection results, state, or validation status.

    Each updated test gets an audit entry attributed to ``current_user``, in
    the same way the import path audits test creation. All changes are
    committed together; on any failure the session is rolled back and the
    error is re-raised.

    Args:
        db: Database session used for all reads and writes.
        adversary_name: Adversary slug used to fetch results.
        adversary_display: Human-readable adversary name used in the rebuilt text.
        eval_round: Evaluation round number; tests are matched by the
            ``[EVAL R<round>]`` name prefix.
        current_user: User recorded in the audit entries.

    Returns:
        A dict with ``updated``, ``skipped``, ``adversary``, ``eval_round``
        and a human-readable ``message``. ``skipped`` counts both techniques
        missing from the local DB and techniques with no matching test.
    """
    # Fetch & aggregate (same flow as import)
    substeps = fetch_results_for_adversary(adversary_name)
    by_technique = _aggregate_by_technique(substeps)

    updated = 0
    skipped = 0

    try:
        for mitre_id, agg in by_technique.items():
            technique = (
                db.query(Technique)
                .filter(Technique.mitre_id == mitre_id.upper())
                .first()
            )
            if technique is None:
                skipped += 1
                continue

            # Find the existing test for this round + technique
            existing_test = (
                db.query(Test)
                .filter(
                    Test.technique_id == technique.id,
                    Test.name.like(f"[EVAL R{eval_round}]%"),
                )
                .first()
            )
            if not existing_test:
                skipped += 1
                continue

            existing_test.description = _build_description(agg, adversary_display, eval_round)
            existing_test.red_summary = _build_red_summary(agg, adversary_display, eval_round)
            existing_test.procedure_text = _build_procedure_text(agg, adversary_display, eval_round)

            # Audit the edit so the narrative change is attributable to a user.
            log_action(
                db,
                user_id=current_user.id,
                action="eval_re_enrich_test",
                entity_type="test",
                entity_id=existing_test.id,
                details={
                    "adversary": adversary_name,
                    "eval_round": eval_round,
                    "mitre_id": mitre_id,
                },
            )
            updated += 1

        db.commit()
    except Exception:
        # Discard partially applied edits so the session stays usable.
        db.rollback()
        raise

    logger.info(
        "Re-enrichment complete — round %d (%s): %d tests updated, %d skipped",
        eval_round,
        adversary_display,
        updated,
        skipped,
    )
    return {
        "updated": updated,
        "skipped": skipped,
        "adversary": adversary_display,
        "eval_round": eval_round,
        "message": (
            f"Re-enriched {updated} tests for {adversary_display} (Round {eval_round}) "
            f"with attack path, criteria and data sources from MITRE API."
        ),
    }
|