c0cecab797
The /api/results/ endpoint returns a LIST: [{name: crowdstrike, adversaries: [...]}]
Previous code called data.get() on the list → AttributeError crash on every import.
Fix: detect list vs dict response, extract the crowdstrike vendor entry first,
then get its adversaries list. Keeps legacy dict fallback just in case.
520 lines
18 KiB
Python
520 lines
18 KiB
Python
"""ATT&CK Evaluations importer — fetches real CrowdStrike detection results
from MITRE Engenuity's public API and seeds the platform with validated tests.

Data source
-----------
https://evals.mitre.org/api/
  - /participants/  → list of vendors + rounds they completed
  - /results/?participant=crowdstrike&domain=ENTERPRISE
                    → per-substep detection results per adversary

Detection level mapping (MITRE → Aegis)
---------------------------------------
  Technique / Specific Behavior → detected (correctly identified ATT&CK technique)
  Tactic                        → partially_detected (behavior noted but not categorized)
  General / IOC / MSSP          → partially_detected (anomaly detected, not ATT&CK-mapped)
  Telemetry                     → partially_detected (raw data only — marginal detection)
  None / N/A                    → not_detected

All imported tests are created in ``in_review`` state so Blue Leads must
confirm each result before it counts as real coverage for the organisation.

Important caveats stored in every test's description
------------------------------------------------------
"Source: MITRE ATT&CK Evaluation (Round N — Adversary). Results reflect
CrowdStrike Falcon in a controlled lab environment, NOT this organisation's
deployment. Validate detection in your own environment before approving."
"""
|
|
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
import requests
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.enums import TestState, TestResult
|
|
from app.models.evaluation_import import EvaluationImport
|
|
from app.models.technique import Technique
|
|
from app.models.test import Test
|
|
from app.models.user import User
|
|
from app.services.audit_service import log_action
|
|
from app.services.status_service import recalculate_technique_status
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Site root; the API paths used in this module are appended to it.
_BASE = "https://evals.mitre.org"
# Seconds allowed per HTTP call.
_TIMEOUT = 30
# Participant slug compared (lower-cased) against names in API responses.
_VENDOR = "crowdstrike"
# Evaluation domain used to filter rounds and to query results.
_DOMAIN = "ENTERPRISE"

# Browser-like headers to bypass Cloudflare bot protection on evals.mitre.org
_HEADERS = {
    "User-Agent": " ".join(
        (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "AppleWebKit/537.36 (KHTML, like Gecko)",
            "Chrome/124.0.0.0 Safari/537.36",
        )
    ),
    "Accept": "application/json, text/plain, */*",
    "Accept-Language": "en-US,en;q=0.9",
    "Referer": "https://evals.mitre.org/",
    "Origin": "https://evals.mitre.org",
}
|
|
|
|
# ---------------------------------------------------------------------------
# Fallback: hardcoded public CrowdStrike ENTERPRISE rounds
# Used when evals.mitre.org API is unreachable (Cloudflare 502, outage, etc.)
#
# Names use the EXACT slugs the live API returns (hyphens, not underscores).
# Verified from live API response on 2025-06-05.
# CrowdStrike did NOT participate in Round 6 (OilRig) — not included.
#
# Each row below is (slug, display name, round number); every entry shares the
# same domain and status, so those two fields are filled in once.
# ---------------------------------------------------------------------------
_FALLBACK_ROUNDS: list[dict[str, Any]] = [
    {
        "name": slug,
        "display_name": label,
        "eval_round": number,
        "domain": "ENTERPRISE",
        "status": "PUBLIC",
    }
    for slug, label, number in (
        ("apt3", "APT3", 1),
        ("apt29", "APT29", 2),
        ("carbanak-fin7", "Carbanak+FIN7", 3),
        ("wizard-spider-sandworm", "Wizard Spider + Sandworm", 4),
        ("turla", "Turla", 5),
        ("er7", "Enterprise 2025", 7),
    )
]
|
|
|
|
# Detection type → quality score (higher = better)
# Keys are lower-case fragments that are searched for inside the (lower-cased)
# detection label reported for a substep.
_DETECTION_SCORE: dict[str, int] = {
    "none": 0,
    "n/a": 0,
    "telemetry": 1,
    "mssp": 2,
    "general": 2,
    "ioc": 2,
    "tactic": 3,
    "technique": 4,
    "specific behavior": 4,
}


def _score(detection_type: str) -> int:
    """Return the quality score for a detection-type label.

    The label is lower-cased and stripped, then every key of
    ``_DETECTION_SCORE`` that occurs inside it is considered and the HIGHEST
    matching score is returned. Taking the maximum (rather than the first key
    in table order) means a compound label such as ``"Telemetry, Technique"``
    is credited with its best component instead of its weakest.

    Args:
        detection_type: Label such as ``"Technique"`` or ``"General Behavior"``.
            ``None``, an empty string, or a non-string value scores 0.

    Returns:
        An integer from the ``_DETECTION_SCORE`` table, or 0 when no key matches.
    """
    # Decoded JSON may hand us null or an unexpected type; treat it as "no detection".
    if not isinstance(detection_type, str):
        return 0
    key = detection_type.lower().strip()
    return max(
        (score for pattern, score in _DETECTION_SCORE.items() if pattern in key),
        default=0,
    )
|
|
|
|
|
|
def _score_to_result(score: int) -> TestResult:
    """Translate a numeric detection score into a ``TestResult`` member.

    A score of 4 or more maps to ``detected``, a score from 1 up to (but not
    including) 4 maps to ``partially_detected``, and anything lower maps to
    ``not_detected``.
    """
    return (
        TestResult.detected
        if score >= 4
        else TestResult.partially_detected
        if score >= 1
        else TestResult.not_detected
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def fetch_rounds_with_status() -> dict[str, Any]:
    """Fetch CrowdStrike ENTERPRISE rounds and report whether the live API was reachable.

    Requests ``/api/participants/``, picks the participant whose name matches
    ``_VENDOR`` and keeps its ``adversaries_completed`` entries whose domain is
    ``_DOMAIN`` and whose status is ``PUBLIC``, sorted by ``eval_round``
    ascending. ``_FALLBACK_ROUNDS`` is returned instead when the request
    fails, when the vendor is not listed, or when no round passes the filter.

    Returns::

        {
            "rounds": [{"name": ..., "display_name": ..., "eval_round": ...}, ...],
            "api_reachable": True | False,
            "api_error": None | "<error message>",
        }

    On the live path the round dicts are the API's own objects, unmodified.
    """
    try:
        # The context manager closes the session (and its pooled connections)
        # even when the request or JSON decoding raises.
        with requests.Session() as session:
            session.headers.update(_HEADERS)
            resp = session.get(f"{_BASE}/api/participants/", timeout=_TIMEOUT)
            resp.raise_for_status()
            participants = resp.json()
    except Exception as exc:
        logger.warning(
            "evals.mitre.org API unreachable (%s) — using hardcoded fallback round list.",
            exc,
        )
        return {
            "rounds": list(_FALLBACK_ROUNDS),
            "api_reachable": False,
            "api_error": str(exc),
        }

    # Tolerate an unexpected payload shape, non-dict entries and JSON null
    # names: any of these is treated as "vendor not listed" rather than a crash.
    entries = participants if isinstance(participants, list) else []
    vendor_entry = next(
        (
            p
            for p in entries
            if isinstance(p, dict) and str(p.get("name") or "").lower() == _VENDOR
        ),
        None,
    )
    if not vendor_entry:
        logger.warning("Vendor '%s' not found in live data — using fallback.", _VENDOR)
        return {
            "rounds": list(_FALLBACK_ROUNDS),
            "api_reachable": True,  # API was reachable, vendor just wasn't listed
            "api_error": f"Vendor '{_VENDOR}' not found in participants list",
        }

    rounds = [
        adv
        for adv in vendor_entry.get("adversaries_completed") or []
        if isinstance(adv, dict)
        and str(adv.get("domain") or "").upper() == _DOMAIN
        and str(adv.get("status") or "").upper() == "PUBLIC"
    ]
    # A missing or null round number sorts first instead of raising on comparison.
    rounds.sort(key=lambda adv: adv.get("eval_round") or 0)
    return {
        "rounds": rounds if rounds else list(_FALLBACK_ROUNDS),
        "api_reachable": True,
        "api_error": None,
    }
|
|
|
|
|
|
def fetch_available_rounds() -> list[dict[str, Any]]:
    """List the ENTERPRISE evaluation rounds CrowdStrike has completed.

    Thin convenience wrapper: calls ``fetch_rounds_with_status()`` and hands
    back only its ``"rounds"`` entry, dropping the reachability fields.

    Each dict has: name, display_name, eval_round.
    Sorted by eval_round ascending.

    Falls back to ``_FALLBACK_ROUNDS`` if the live API is unreachable.
    """
    status_report = fetch_rounds_with_status()
    return status_report["rounds"]
|
|
|
|
|
|
def get_latest_round() -> dict[str, Any]:
    """Return the last entry of ``fetch_available_rounds()``.

    That list is sorted by round number ascending, so the last entry is the
    most recent PUBLIC ENTERPRISE round CrowdStrike participated in.

    Raises:
        ValueError: If the list of rounds is empty.
    """
    available = fetch_available_rounds()
    if available:
        return available[-1]
    raise ValueError("No public Enterprise evaluation rounds found for CrowdStrike")
|
|
|
|
|
|
def _select_vendor_adversaries(data: Any) -> list[dict[str, Any]]:
    """Pull the ``_VENDOR`` adversary list out of a decoded results payload.

    The results endpoint returns a LIST of vendor objects:
        [{"name": "crowdstrike", "adversaries": [{"Adversary_Name": "apt3", ...}, ...]}, ...]
    A legacy dict-shaped payload (``{"adversaries": [...]}``) is still accepted.

    Raises:
        ValueError: If the vendor is missing from a list payload, or the
            payload is neither a list nor a dict.
    """
    if isinstance(data, list):
        vendors = [v for v in data if isinstance(v, dict)]
        vendor_entry = next(
            (v for v in vendors if str(v.get("name") or "").lower() == _VENDOR),
            None,
        )
        if not vendor_entry:
            raise ValueError(
                f"Vendor '{_VENDOR}' not found in results response. "
                f"Available vendors: {[v.get('name') for v in vendors]}"
            )
        return vendor_entry.get("adversaries") or []
    if isinstance(data, dict):
        # Fallback for legacy dict-shaped response (just in case API changes again)
        return data.get("adversaries") or []
    raise ValueError(
        f"Unexpected results response type: {type(data).__name__}"
    )


def _best_detection(detections: Any) -> tuple[int, str, str]:
    """Return ``(score, detection_type, note)`` for the best-scoring detection.

    The first detection with the highest score wins. When nothing scores above
    zero (or the list is empty/null) the result is ``(0, "None", "")``.
    """
    best_score, best_type, best_note = 0, "None", ""
    for det in detections or []:
        if not isinstance(det, dict):
            continue
        dtype = det.get("Detection_Type") or "None"
        if not isinstance(dtype, str):
            continue
        candidate = _score(dtype)
        if candidate > best_score:
            best_score = candidate
            best_type = dtype
            best_note = det.get("Detection_Note") or ""
    return best_score, best_type, best_note


def fetch_results_for_adversary(adversary_name: str) -> list[dict[str, Any]]:
    """Fetch all per-substep detection results for a specific adversary round.

    Args:
        adversary_name: Adversary slug (e.g. ``"apt29"``); compared
            case-insensitively against each entry's ``Adversary_Name``.

    Returns:
        A flat list of substep dicts, each containing: technique_id,
        technique_name, tactic_id, tactic_name, best_score, detection_type,
        note. Substeps without a technique or sub-technique id are omitted.

    Raises:
        ValueError: If the vendor or the adversary is not present in the
            response, or the response has an unexpected shape.
        Exception: Any error raised while requesting or decoding the response
            is logged and re-raised unchanged.
    """
    url = f"{_BASE}/api/results/?participant={_VENDOR}&domain={_DOMAIN}"
    try:
        # The context manager closes the session even when the request fails.
        with requests.Session() as session:
            session.headers.update(_HEADERS)
            resp = session.get(url, timeout=_TIMEOUT)
            resp.raise_for_status()
            data = resp.json()
    except Exception as exc:
        logger.error("Failed to fetch ATT&CK Evaluations results: %s", exc)
        raise

    adversaries = [a for a in _select_vendor_adversaries(data) if isinstance(a, dict)]

    wanted = adversary_name.lower()
    target = next(
        (a for a in adversaries if str(a.get("Adversary_Name") or "").lower() == wanted),
        None,
    )
    if not target:
        raise ValueError(
            f"Adversary '{adversary_name}' not found in results. "
            f"Available: {[a.get('Adversary_Name') for a in adversaries]}"
        )

    substeps: list[dict[str, Any]] = []

    # Every level uses ``or`` defaults so a JSON null is handled the same way
    # as a missing key, matching the handling of Technique/Tactic below.
    scenarios = target.get("Detections_By_Step") or {}
    for scenario_data in scenarios.values():
        for step in (scenario_data or {}).get("Steps") or []:
            for substep in (step or {}).get("Substeps") or []:
                # Prefer sub-technique over technique
                sub = substep.get("Subtechnique") or {}
                tech = substep.get("Technique") or {}
                tactic = substep.get("Tactic") or {}

                technique_id = (
                    sub.get("Subtechnique_Id")
                    or tech.get("Technique_Id")
                    or ""
                ).strip()
                technique_name = (
                    sub.get("Subtechnique_Name")
                    or tech.get("Technique_Name")
                    or "Unknown"
                ).strip()

                if not technique_id:
                    continue

                best_score, best_type, best_note = _best_detection(
                    substep.get("Detections")
                )

                substeps.append(
                    {
                        "technique_id": technique_id,
                        "technique_name": technique_name,
                        "tactic_id": tactic.get("Tactic_Id", ""),
                        "tactic_name": tactic.get("Tactic_Name", ""),
                        "best_score": best_score,
                        "detection_type": best_type,
                        "note": best_note,
                    }
                )

    return substeps
|
|
|
|
|
|
def _aggregate_by_technique(substeps: list[dict]) -> dict[str, dict]:
    """Collapse substep results to one entry per technique id.

    For each ``technique_id`` the substep dict with the highest ``best_score``
    is kept; on a tie the earliest one seen stays. The returned mapping holds
    the original substep dicts (not copies), keyed by technique id in order of
    first appearance.
    """
    winners: dict[str, dict] = {}
    for entry in substeps:
        key = entry["technique_id"]
        incumbent = winners.get(key)
        if incumbent is None or entry["best_score"] > incumbent["best_score"]:
            winners[key] = entry
    return winners
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main import function
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def import_evaluation_round(
    db: Session,
    adversary_name: str,
    adversary_display: str,
    eval_round: int,
    current_user: User,
) -> dict[str, Any]:
    """Import a single ATT&CK Evaluation round for CrowdStrike into the platform.

    Creates one Test per unique technique with the best detection result
    observed across all substeps for that technique. All tests land in
    ``in_review`` state — Blue Leads must confirm before they count as coverage.
    Techniques that are not present in the local database are counted as
    skipped. After the tests are created, the status of every touched
    technique is recalculated and an ``EvaluationImport`` row with status
    ``"completed"`` is stored, then the session is committed.

    Args:
        db: SQLAlchemy session used for all reads and writes.
        adversary_name: Adversary slug used to fetch results; stored lower-cased.
        adversary_display: Human-readable adversary name used in test text.
        eval_round: Round number used in test names and the import record.
        current_user: User recorded as creator, red validator and importer.

    Returns:
        A summary dict with keys: created, skipped, techniques_covered,
        adversary, eval_round.

    Raises:
        ValueError: If a completed import already exists for this adversary
            (idempotency guard), or the results could not be located.
        Exception: Any error from the write phase is re-raised after the
            session has been rolled back.
    """
    # Idempotency — refuse duplicate imports
    existing = (
        db.query(EvaluationImport)
        .filter(
            EvaluationImport.adversary_name == adversary_name.lower(),
            EvaluationImport.status == "completed",
        )
        .first()
    )
    if existing:
        raise ValueError(
            f"Round '{adversary_display}' (round {eval_round}) was already imported "
            f"on {existing.imported_at.date()}. Re-import is not allowed."
        )

    # Fetch and aggregate substep results
    substeps = fetch_results_for_adversary(adversary_name)
    by_technique = _aggregate_by_technique(substeps)

    created = 0
    skipped = 0
    # Technique objects already loaded in the loop, keyed by primary key, so
    # the recalculation pass below does not have to query each one again.
    touched: dict[Any, Technique] = {}

    try:
        for mitre_id, agg in by_technique.items():
            # Look up the technique in our DB
            technique = (
                db.query(Technique)
                .filter(Technique.mitre_id == mitre_id.upper())
                .first()
            )
            if technique is None:
                skipped += 1
                continue

            detection_result = _score_to_result(agg["best_score"])

            description = (
                f"Source: MITRE ATT&CK Evaluation — Round {eval_round} ({adversary_display}).\n"
                f"Vendor: CrowdStrike Falcon.\n"
                f"Detection type achieved: {agg['detection_type']}.\n\n"
                f"⚠️ IMPORTANT: These results reflect CrowdStrike Falcon performance in a "
                f"controlled MITRE lab environment against a simulated {adversary_display} "
                f"adversary. They do NOT represent your organisation's actual detection "
                f"capability. Validate in your own environment before approving."
            )
            if agg["note"]:
                description += f"\n\nMITRE note: {agg['note']}"

            red_summary = (
                f"MITRE ATT&CK Evaluation — Round {eval_round} ({adversary_display})\n"
                f"Vendor: CrowdStrike Falcon\n"
                f"Best detection level: {agg['detection_type']}\n"
                f"Tactic: {agg['tactic_name']} ({agg['tactic_id']})"
            )

            # One clock read per test so its three timestamps agree exactly.
            now = datetime.utcnow()
            test = Test(
                technique_id=technique.id,
                name=f"[EVAL R{eval_round}] {adversary_display} — {technique.name}",
                description=description,
                platform=None,
                procedure_text=(
                    f"MITRE ATT&CK Evaluation simulation using {adversary_display} TTPs. "
                    f"See evaluation report at https://evals.mitre.org for full details."
                ),
                created_by=current_user.id,
                state=TestState.in_review,
                attack_success=True,
                red_summary=red_summary,
                red_validation_status="approved",
                red_validated_by=current_user.id,
                red_validated_at=now,
                detection_result=detection_result,
                blue_validation_status=None,
                execution_date=now,
                created_at=now,
            )
            db.add(test)
            # Flush so test.id is populated before it is written to the audit log.
            db.flush()

            log_action(
                db,
                user_id=current_user.id,
                action="eval_import_test",
                entity_type="test",
                entity_id=test.id,
                details={
                    "adversary": adversary_name,
                    "eval_round": eval_round,
                    "mitre_id": mitre_id,
                    "detection_type": agg["detection_type"],
                },
            )

            touched[technique.id] = technique
            created += 1

        # Recalculate coverage for all touched techniques
        for technique in touched.values():
            recalculate_technique_status(db, technique)

        # Record the import
        record = EvaluationImport(
            id=uuid.uuid4(),
            adversary_name=adversary_name.lower(),
            adversary_display=adversary_display,
            eval_round=eval_round,
            imported_at=datetime.utcnow(),
            imported_by=current_user.id,
            tests_created=created,
            techniques_covered=len(touched),
            status="completed",
            notes=f"Skipped {skipped} techniques not found in local DB.",
        )
        db.add(record)
        db.commit()
    except Exception:
        # Discard any flushed-but-uncommitted rows so the caller's session is
        # not left holding a half-finished import, then surface the error.
        db.rollback()
        raise

    logger.info(
        "ATT&CK Evaluation import complete — round %d (%s): %d tests created, %d skipped",
        eval_round, adversary_display, created, skipped,
    )
    return {
        "created": created,
        "skipped": skipped,
        "techniques_covered": len(touched),
        "adversary": adversary_display,
        "eval_round": eval_round,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# New-round check (used by the weekly scheduler)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def check_for_new_round(db: Session) -> dict[str, Any]:
    """Check if a new evaluation round is available that hasn't been imported yet.

    Looks up the latest round via ``get_latest_round()`` and checks whether an
    ``EvaluationImport`` row with status ``"completed"`` exists for its
    (lower-cased) name.

    Returns:
        {"new_round_available": bool, "latest_round": dict | None, "already_imported": bool}

        When the latest round cannot be determined (lookup raised, or the
        entry has no usable name) the result is
        ``new_round_available=False``, ``already_imported=False``,
        ``latest_round=None`` plus an ``"error"`` key holding the message.
    """
    try:
        latest = get_latest_round()
        # Resolved inside the try so a malformed entry (missing or null name)
        # yields the error result instead of an unhandled exception.
        round_name = latest["name"]
        round_slug = round_name.lower()
    except Exception as exc:
        logger.warning("Could not check for new ATT&CK Evaluation round: %s", exc)
        return {
            "new_round_available": False,
            # Always present so callers can read the same keys on every path.
            "already_imported": False,
            "latest_round": None,
            "error": str(exc),
        }

    already = (
        db.query(EvaluationImport)
        .filter(
            EvaluationImport.adversary_name == round_slug,
            EvaluationImport.status == "completed",
        )
        .first()
    )

    return {
        "new_round_available": already is None,
        "already_imported": already is not None,
        "latest_round": {
            "name": round_name,
            "display_name": latest.get("display_name", round_name),
            # None when the entry carries no round number, rather than a KeyError.
            "eval_round": latest.get("eval_round"),
        },
    }
|