"""Automated threat-intelligence scan service.

Searches public security RSS feeds for mentions of MITRE ATT&CK technique
IDs and names.  New findings are stored as :class:`IntelItem` records and the
related technique is flagged for review.

This is an **MVP** implementation — it queries a small set of well-known RSS
feeds and parses them with :mod:`defusedxml.ElementTree`, a hardened drop-in
for the standard-library :mod:`xml.etree` parser.  No LLMs or paid APIs are
used.
"""

import logging
import re
from datetime import datetime, timezone

import defusedxml.ElementTree as ET
import requests as _requests
from sqlalchemy.orm import Session

from app.models.intel import IntelItem
from app.models.technique import Technique
from app.services.audit_service import log_action

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Public security RSS feeds
# ---------------------------------------------------------------------------

RSS_FEEDS: list[dict[str, str]] = [
    {
        "name": "CISA Alerts",
        "url": "https://www.cisa.gov/cybersecurity-advisories/all.xml",
    },
    {
        "name": "SecurityWeek",
        "url": "https://feeds.feedburner.com/Securityweek",
    },
    {
        "name": "SANS ISC",
        "url": "https://isc.sans.edu/rssfeed.xml",
    },
    {
        "name": "BleepingComputer",
        "url": "https://www.bleepingcomputer.com/feed/",
    },
    {
        "name": "The Hacker News",
        "url": "https://feeds.feedburner.com/TheHackersNews",
    },
    {
        "name": "Krebs on Security",
        "url": "https://krebsonsecurity.com/feed/",
    },
]

# Timeout for each feed request (seconds)
_FEED_TIMEOUT = 15

# Minimum technique name length for name-based matching
# Short names ("Kill", "BITS") produce too many false positives
_MIN_NAME_LEN = 8


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _element_text(element) -> str:
    """Return the stripped text of *element*, or ``""`` if missing or empty."""
    if element is None or not element.text:
        return ""
    return element.text.strip()


def _fetch_feed(url: str) -> list[dict[str, str]]:
    """Download and parse an RSS/Atom feed, returning a list of entries.

    Each entry is a dict with keys ``title``, ``link``, and ``description``.
    Returns an empty list on any fetch or parse error so the scan can
    continue with the remaining feeds.
    """
    try:
        resp = _requests.get(
            url,
            timeout=_FEED_TIMEOUT,
            headers={"User-Agent": "AegisPlatform/1.0 IntelScan"},
        )
        resp.raise_for_status()
    except Exception as exc:
        # Deliberately broad: one unreachable feed must never abort the scan.
        logger.warning("Failed to fetch feed %s: %s", url, exc)
        return []

    try:
        root = ET.fromstring(resp.content)
    except (ET.ParseError, ValueError) as exc:
        # defusedxml rejects entity/DTD/external-reference declarations with
        # exceptions derived from ValueError, not ParseError; treat those as
        # an unparseable feed as well instead of letting them propagate.
        logger.warning("Failed to parse feed %s: %s", url, exc)
        return []

    entries: list[dict[str, str]] = []

    # RSS 2.0 format: un-namespaced <item> elements
    for item in root.iter("item"):
        entries.append({
            "title": _element_text(item.find("title")),
            "link": _element_text(item.find("link")),
            "description": _element_text(item.find("description")),
        })

    # Atom format: namespaced <entry> elements; the link lives in @href
    ns = {"atom": "http://www.w3.org/2005/Atom"}
    for entry in root.iter("{http://www.w3.org/2005/Atom}entry"):
        link_el = entry.find("atom:link", ns)
        link_href = link_el.get("href", "") if link_el is not None else ""
        entries.append({
            "title": _element_text(entry.find("atom:title", ns)),
            "link": link_href.strip(),
            "description": _element_text(entry.find("atom:summary", ns)),
        })

    return entries


def _build_patterns(technique: Technique) -> tuple[list[re.Pattern], list[re.Pattern]]:
    """Build regex patterns for a technique.

    Returns two lists:
    - ``id_patterns``: MITRE ID patterns (high confidence, word-boundary matched)
    - ``name_patterns``: technique name patterns (lower confidence, long names only)
    """
    id_patterns: list[re.Pattern] = []
    name_patterns: list[re.Pattern] = []

    # MITRE ID with word boundaries.  ``\b`` alone is not enough to keep
    # T1059 from matching inside T1059.001 ("." is a non-word character, so a
    # boundary exists right after the 9); the lookahead rejects a following
    # ".<digit>" while still accepting a sentence-ending "T1059.".
    mitre_id_escaped = re.escape(technique.mitre_id)
    id_patterns.append(
        re.compile(rf"\b{mitre_id_escaped}\b(?!\.\d)", re.IGNORECASE)
    )

    # Technique name — only for distinctly long names to reduce false positives
    if technique.name and len(technique.name) >= _MIN_NAME_LEN:
        name_escaped = re.escape(technique.name)
        name_patterns.append(re.compile(rf"\b{name_escaped}\b", re.IGNORECASE))

    return id_patterns, name_patterns


def _entry_matches(
    entry: dict[str, str],
    id_patterns: list[re.Pattern],
    name_patterns: list[re.Pattern],
) -> bool:
    """Return True if any pattern matches the entry's title or description."""
    text = f"{entry.get('title', '')} {entry.get('description', '')}"
    return any(p.search(text) for p in (*id_patterns, *name_patterns))


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def scan_intel(db: Session) -> dict:
    """Run the intel scan across RSS feeds for known techniques.

    Parameters
    ----------
    db : Session
        Active SQLAlchemy database session.  The function commits on it
        twice: once after storing new items, once after the audit entry.

    Returns
    -------
    dict
        Summary with keys ``new_items``, ``duplicates_skipped``,
        ``techniques_flagged``, ``feeds_checked``.  ``feeds_checked`` counts
        only the feeds that yielded at least one entry.
    """
    logger.info("Intel scan starting...")

    # 1. Load all techniques (the query applies no filter), ordered by MITRE ID
    techniques = (
        db.query(Technique)
        .order_by(Technique.mitre_id)
        .all()
    )
    logger.info("Scanning %d techniques against %d feeds", len(techniques), len(RSS_FEEDS))

    # 2. Pre-load all existing intel URLs for dedup
    existing_urls: set[str] = {
        row[0] for row in db.query(IntelItem.url).all()
    }

    # 3. Fetch all feeds once
    all_entries: list[tuple[str, dict[str, str]]] = []  # (feed_name, entry)
    feeds_ok = 0
    for feed in RSS_FEEDS:
        entries = _fetch_feed(feed["url"])
        if entries:
            feeds_ok += 1
        all_entries.extend((feed["name"], entry) for entry in entries)

    logger.info("Fetched %d entries from %d/%d feeds", len(all_entries), feeds_ok, len(RSS_FEEDS))

    # Entries without a title (low-quality) or without a link can never be
    # stored, so drop them once here instead of re-checking per technique.
    candidates: list[tuple[str, str, dict[str, str]]] = []  # (feed_name, url, entry)
    for feed_name, entry in all_entries:
        if not entry.get("title", "").strip():
            continue
        url = entry.get("link", "").strip()
        if not url:
            continue
        candidates.append((feed_name, url, entry))

    # 4. Match entries to techniques
    new_items = 0
    duplicates_skipped = 0
    techniques_flagged: set[str] = set()

    # Naive UTC timestamp, same value datetime.utcnow() produced, without
    # relying on the deprecated call.  Taken once so every item found by this
    # scan shares one detection time.
    detected_at = datetime.now(timezone.utc).replace(tzinfo=None)

    for technique in techniques:
        id_patterns, name_patterns = _build_patterns(technique)

        for feed_name, url, entry in candidates:
            if not _entry_matches(entry, id_patterns, name_patterns):
                continue

            # Dedup on URL alone.
            # NOTE(review): an article that matches several techniques is
            # stored only for the first one (by MITRE ID order) and counted
            # as a duplicate for the rest — confirm this is intended given
            # the IntelItem uniqueness constraints.
            if url in existing_urls:
                duplicates_skipped += 1
                continue

            # Create IntelItem
            intel_item = IntelItem(
                technique_id=technique.id,
                url=url,
                title=entry.get("title", "")[:500],
                source=feed_name,
                detected_at=detected_at,
                reviewed=False,
            )
            db.add(intel_item)
            existing_urls.add(url)
            new_items += 1

            # Flag technique for review
            if not technique.review_required:
                technique.review_required = True
                techniques_flagged.add(technique.mitre_id)

    # 5. Single commit
    db.commit()

    summary = {
        "new_items": new_items,
        "duplicates_skipped": duplicates_skipped,
        "techniques_flagged": len(techniques_flagged),
        "feeds_checked": feeds_ok,
    }

    logger.info(
        "Intel scan complete — new=%d, duplicates_skipped=%d, "
        "techniques_flagged=%d, feeds_checked=%d",
        new_items,
        duplicates_skipped,
        len(techniques_flagged),
        feeds_ok,
    )

    # 6. Audit log
    log_action(
        db,
        user_id=None,
        action="intel_scan",
        entity_type="intel_item",
        entity_id=None,
        details=summary,
    )
    db.commit()

    return summary