"""Tests for OSINT enrichment (NVD CVE lookup per technique).""" from datetime import datetime, timedelta, timezone from unittest.mock import MagicMock, patch import pytest from app.models.enums import TechniqueStatus from app.models.osint_item import OsintItem from app.models.technique import Technique from app.services.osint_enrichment_service import ( NVD_RATE_LIMIT_BATCH, NVD_RATE_LIMIT_WAIT, enrich_all_techniques, enrich_technique_with_cves, ) SAMPLE_NVD_RESPONSE = { "vulnerabilities": [ { "cve": { "id": "CVE-2024-12345", "descriptions": [{"lang": "en", "value": "Remote code execution example."}], "metrics": { "cvssMetricV31": [ { "cvssData": { "baseSeverity": "HIGH", "baseScore": 8.1, } } ] }, } }, { "cve": { "id": "CVE-2024-99999", "descriptions": [{"lang": "en", "value": "Another issue."}], "metrics": {}, } }, ] } def _technique(db, *, mitre_id="T1059", name="Command and Scripting Interpreter"): tech = Technique( mitre_id=mitre_id, name=name, tactic="execution", status_global=TechniqueStatus.in_progress, ) db.add(tech) db.commit() db.refresh(tech) return tech @patch("app.services.osint_enrichment_service.requests.get") def test_enrich_technique_fetches_cves(mock_get, db): mock_resp = MagicMock() mock_resp.status_code = 200 mock_resp.json.return_value = SAMPLE_NVD_RESPONSE mock_get.return_value = mock_resp tech = _technique(db) count = enrich_technique_with_cves(db, tech) assert count == 2 items = db.query(OsintItem).filter(OsintItem.technique_id == tech.id).all() assert len(items) == 2 assert items[0].source_type == "cve" assert "CVE-2024" in items[0].source_url assert tech.review_required is True mock_get.assert_called_once() assert mock_get.call_args.kwargs["params"]["keywordSearch"] == tech.name @patch("app.services.osint_enrichment_service.requests.get") def test_enrich_technique_no_duplicates_on_rerun(mock_get, db): mock_resp = MagicMock() mock_resp.status_code = 200 mock_resp.json.return_value = SAMPLE_NVD_RESPONSE mock_get.return_value = mock_resp tech = _technique(db) assert enrich_technique_with_cves(db, tech) == 2 assert enrich_technique_with_cves(db, tech) == 0 assert db.query(OsintItem).filter(OsintItem.technique_id == tech.id).count() == 2 @patch("app.services.osint_enrichment_service.requests.get") def test_enrich_technique_api_error_returns_zero(mock_get, db): mock_resp = MagicMock() mock_resp.status_code = 503 mock_get.return_value = mock_resp tech = _technique(db) assert enrich_technique_with_cves(db, tech) == 0 assert db.query(OsintItem).count() == 0 assert tech.review_required is False @patch("app.services.osint_enrichment_service.time.sleep") @patch("app.services.osint_enrichment_service.requests.get") def test_enrich_all_techniques_rate_limits_nvd(mock_get, mock_sleep, db): mock_resp = MagicMock() mock_resp.status_code = 200 mock_resp.json.return_value = {"vulnerabilities": []} mock_get.return_value = mock_resp for i in range(NVD_RATE_LIMIT_BATCH + 1): db.add( Technique( mitre_id=f"T100{i}", name=f"Technique {i}", tactic="execution", ) ) db.commit() total = enrich_all_techniques(db) assert total == 0 assert mock_get.call_count == NVD_RATE_LIMIT_BATCH + 1 mock_sleep.assert_called_once_with(NVD_RATE_LIMIT_WAIT)