feat(intel): major intel scan improvements + Review Queue integration
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
Backend:
- intel_service: remove 50-technique limit (scan all techniques), improve
pattern matching with word boundaries (\bT1059\b), raise min name length
to 8 chars to reduce false positives, skip entries with empty titles
- technique_query_service: add intel_items to get_technique_detail() so
the technique page now shows recent threat intel articles (last 20)
- New GET /intel/items endpoint with optional technique_id filter
Frontend:
- New api/intel.ts with listIntelItems()
- ReviewQueuePage: complete redesign
* Expandable rows — click a technique to see its intel articles inline
* IntelPanel component fetches articles per technique on expand
* 'Create Template from Intel' button opens pre-filled modal:
name (from article title), source_url (article link), technique_id
User reads the article and fills the attack procedure
* Updated explanation text: lists all 3 reasons a technique can be flagged
(MITRE update / intel scan / new template or detection rule)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -57,8 +57,9 @@ RSS_FEEDS: list[dict[str, str]] = [
|
||||
# Timeout for each feed request (seconds)
|
||||
_FEED_TIMEOUT = 15
|
||||
|
||||
# Maximum number of techniques to scan (to keep MVP fast)
|
||||
_MAX_TECHNIQUES = 50
|
||||
# Minimum technique name length for name-based matching
|
||||
# Short names ("Kill", "BITS") produce too many false positives
|
||||
_MIN_NAME_LEN = 8
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -118,19 +119,36 @@ def _fetch_feed(url: str) -> list[dict[str, str]]:
|
||||
return entries
|
||||
|
||||
|
||||
def _build_patterns(technique: Technique) -> list[re.Pattern]:
|
||||
"""Build regex patterns to search feed content for a given technique."""
|
||||
patterns: list[re.Pattern] = []
|
||||
def _build_patterns(technique: Technique) -> tuple[list[re.Pattern], list[re.Pattern]]:
|
||||
"""Build regex patterns for a technique.
|
||||
|
||||
mitre_id = re.escape(technique.mitre_id)
|
||||
patterns.append(re.compile(mitre_id, re.IGNORECASE))
|
||||
Returns two lists:
|
||||
- ``id_patterns``: MITRE ID patterns (high confidence, word-boundary matched)
|
||||
- ``name_patterns``: technique name patterns (lower confidence, long names only)
|
||||
"""
|
||||
id_patterns: list[re.Pattern] = []
|
||||
name_patterns: list[re.Pattern] = []
|
||||
|
||||
# Technique name — match if the full name appears
|
||||
if technique.name and len(technique.name) > 4:
|
||||
# MITRE ID with word boundaries so T1059 doesn't partially match T1059.001
|
||||
mitre_id_escaped = re.escape(technique.mitre_id)
|
||||
id_patterns.append(re.compile(rf"\b{mitre_id_escaped}\b", re.IGNORECASE))
|
||||
|
||||
# Technique name — only for distinctly long names to reduce false positives
|
||||
if technique.name and len(technique.name) >= _MIN_NAME_LEN:
|
||||
name_escaped = re.escape(technique.name)
|
||||
patterns.append(re.compile(name_escaped, re.IGNORECASE))
|
||||
name_patterns.append(re.compile(rf"\b{name_escaped}\b", re.IGNORECASE))
|
||||
|
||||
return patterns
|
||||
return id_patterns, name_patterns
|
||||
|
||||
|
||||
def _entry_matches(
|
||||
entry: dict[str, str],
|
||||
id_patterns: list[re.Pattern],
|
||||
name_patterns: list[re.Pattern],
|
||||
) -> bool:
|
||||
"""Return True if any pattern matches the entry's title or description."""
|
||||
text = f"{entry.get('title', '')} {entry.get('description', '')}"
|
||||
return any(p.search(text) for p in id_patterns + name_patterns)
|
||||
|
||||
|
||||
def _entry_matches(entry: dict[str, str], patterns: list[re.Pattern]) -> bool:
|
||||
@@ -160,11 +178,10 @@ def scan_intel(db: Session) -> dict:
|
||||
"""
|
||||
logger.info("Intel scan starting...")
|
||||
|
||||
# 1. Load techniques (limit for MVP speed)
|
||||
# 1. Load all active techniques
|
||||
techniques = (
|
||||
db.query(Technique)
|
||||
.order_by(Technique.mitre_id)
|
||||
.limit(_MAX_TECHNIQUES)
|
||||
.all()
|
||||
)
|
||||
logger.info("Scanning %d techniques against %d feeds", len(techniques), len(RSS_FEEDS))
|
||||
@@ -192,10 +209,14 @@ def scan_intel(db: Session) -> dict:
|
||||
techniques_flagged: set[str] = set()
|
||||
|
||||
for technique in techniques:
|
||||
patterns = _build_patterns(technique)
|
||||
id_patterns, name_patterns = _build_patterns(technique)
|
||||
|
||||
for feed_name, entry in all_entries:
|
||||
if not _entry_matches(entry, patterns):
|
||||
if not _entry_matches(entry, id_patterns, name_patterns):
|
||||
continue
|
||||
|
||||
# Skip entries with no title (low-quality)
|
||||
if not entry.get("title", "").strip():
|
||||
continue
|
||||
|
||||
url = entry.get("link", "").strip()
|
||||
|
||||
Reference in New Issue
Block a user