# File listing metadata: 128 lines, 3.8 KiB, Python
"""Tests for OSINT enrichment (NVD CVE lookup per technique)."""
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from app.models.enums import TechniqueStatus
|
|
from app.models.osint_item import OsintItem
|
|
from app.models.technique import Technique
|
|
from app.services.osint_enrichment_service import (
|
|
NVD_RATE_LIMIT_BATCH,
|
|
NVD_RATE_LIMIT_WAIT,
|
|
enrich_all_techniques,
|
|
enrich_technique_with_cves,
|
|
)
|
|
|
|
# Canned NVD API payload returned by the mocked ``requests.get`` in the tests
# below. It carries two CVE entries: the first has a CVSS v3.1 metric block,
# the second has an empty ``metrics`` mapping.
SAMPLE_NVD_RESPONSE = {
    "vulnerabilities": [
        # Entry 1: CVE with CVSS v3.1 severity and score data.
        {
            "cve": {
                "id": "CVE-2024-12345",
                "descriptions": [
                    {"lang": "en", "value": "Remote code execution example."},
                ],
                "metrics": {
                    "cvssMetricV31": [
                        {
                            "cvssData": {
                                "baseSeverity": "HIGH",
                                "baseScore": 8.1,
                            },
                        },
                    ],
                },
            },
        },
        # Entry 2: CVE with no metrics at all.
        {
            "cve": {
                "id": "CVE-2024-99999",
                "descriptions": [
                    {"lang": "en", "value": "Another issue."},
                ],
                "metrics": {},
            },
        },
    ],
}
|
|
|
|
|
|
def _technique(db, *, mitre_id="T1059", name="Command and Scripting Interpreter"):
    """Create, persist and return a ``Technique`` row for use in a test.

    The row always uses the ``"execution"`` tactic and the
    ``TechniqueStatus.in_progress`` global status; only ``mitre_id`` and
    ``name`` can be overridden (keyword-only).

    Args:
        db: Database session fixture. Only ``add``, ``commit`` and
            ``refresh`` are called on it here (presumably a SQLAlchemy
            session; confirm against the fixture definition).
        mitre_id: Value stored in the technique's ``mitre_id`` field.
        name: Value stored in the technique's ``name`` field.

    Returns:
        The ``Technique`` instance after it has been committed and refreshed.
    """
    fields = {
        "mitre_id": mitre_id,
        "name": name,
        "tactic": "execution",
        "status_global": TechniqueStatus.in_progress,
    }
    record = Technique(**fields)

    # Persist first, then refresh so the returned object reflects the stored row.
    db.add(record)
    db.commit()
    db.refresh(record)
    return record
|
|
|
|
|
|
@patch("app.services.osint_enrichment_service.requests.get")
def test_enrich_technique_fetches_cves(mock_get, db):
    """A successful NVD lookup stores one OSINT item per returned CVE.

    The mocked ``requests.get`` answers with HTTP 200 and
    ``SAMPLE_NVD_RESPONSE`` (two CVEs). The test then checks that:

    * ``enrich_technique_with_cves`` reports two new items,
    * two ``OsintItem`` rows are linked to the technique, every one of them
      typed ``"cve"`` and carrying a CVE reference in ``source_url``,
    * the technique is flagged ``review_required``,
    * exactly one HTTP request was made, using the technique name as the
      ``keywordSearch`` parameter.
    """
    response = MagicMock(status_code=200)
    response.json.return_value = SAMPLE_NVD_RESPONSE
    mock_get.return_value = response

    tech = _technique(db)
    count = enrich_technique_with_cves(db, tech)

    assert count == 2

    items = db.query(OsintItem).filter(OsintItem.technique_id == tech.id).all()
    assert len(items) == 2
    # The query has no ORDER BY, so row order is unspecified: validate every
    # stored item rather than whichever row happens to come back first.
    assert all(item.source_type == "cve" for item in items)
    assert all("CVE-2024" in item.source_url for item in items)

    assert tech.review_required is True

    mock_get.assert_called_once()
    assert mock_get.call_args.kwargs["params"]["keywordSearch"] == tech.name
|
|
|
|
|
|
@patch("app.services.osint_enrichment_service.requests.get")
def test_enrich_technique_no_duplicates_on_rerun(mock_get, db):
    """Running enrichment twice with the same NVD payload adds nothing new.

    The first call must report two new items, the second call zero, and the
    technique must still have exactly two ``OsintItem`` rows afterwards.
    """
    nvd_reply = MagicMock()
    nvd_reply.status_code = 200
    nvd_reply.json.return_value = SAMPLE_NVD_RESPONSE
    mock_get.return_value = nvd_reply

    tech = _technique(db)

    # First pass stores both CVEs from the sample payload.
    first_run = enrich_technique_with_cves(db, tech)
    assert first_run == 2

    # Second pass sees the same payload and must not insert duplicates.
    second_run = enrich_technique_with_cves(db, tech)
    assert second_run == 0

    stored = db.query(OsintItem).filter(OsintItem.technique_id == tech.id).count()
    assert stored == 2
|
|
|
|
|
|
@patch("app.services.osint_enrichment_service.requests.get")
def test_enrich_technique_api_error_returns_zero(mock_get, db):
    """A non-200 NVD response (here 503) yields no items and no review flag.

    Checks that ``enrich_technique_with_cves`` returns 0, that no
    ``OsintItem`` row exists at all, and that the technique's
    ``review_required`` is ``False``.
    """
    failing_reply = MagicMock()
    failing_reply.status_code = 503
    mock_get.return_value = failing_reply

    tech = _technique(db)
    added = enrich_technique_with_cves(db, tech)

    assert added == 0
    total_items = db.query(OsintItem).count()
    assert total_items == 0
    assert tech.review_required is False
|
|
|
|
|
|
@patch("app.services.osint_enrichment_service.time.sleep")
@patch("app.services.osint_enrichment_service.requests.get")
def test_enrich_all_techniques_rate_limits_nvd(mock_get, mock_sleep, db):
    """Enriching one technique more than a full batch sleeps exactly once.

    ``NVD_RATE_LIMIT_BATCH + 1`` techniques are stored and the mocked NVD API
    returns an empty vulnerability list for each. The test expects a total of
    0 new items, one HTTP request per technique, and a single
    ``time.sleep(NVD_RATE_LIMIT_WAIT)`` call (presumably the pause taken once
    a full batch of requests has been issued; confirm against the service).
    ``time.sleep`` is patched, so no real waiting happens.
    """
    empty_reply = MagicMock()
    empty_reply.status_code = 200
    empty_reply.json.return_value = {"vulnerabilities": []}
    mock_get.return_value = empty_reply

    # One technique beyond the batch size, to cross the rate-limit boundary.
    technique_count = NVD_RATE_LIMIT_BATCH + 1
    for index in range(technique_count):
        row = Technique(
            mitre_id=f"T100{index}",
            name=f"Technique {index}",
            tactic="execution",
        )
        db.add(row)
    db.commit()

    total = enrich_all_techniques(db)

    assert total == 0
    assert mock_get.call_count == NVD_RATE_LIMIT_BATCH + 1
    mock_sleep.assert_called_once_with(NVD_RATE_LIMIT_WAIT)
|