test(intel): verify OSINT enrichment and stale coverage detection [FASE-4]

This commit is contained in:
2026-05-18 14:50:31 +02:00
parent bdeeed54e1
commit 2ee59d4e18
3 changed files with 218 additions and 6 deletions

View File

@@ -0,0 +1,127 @@
"""Tests for OSINT enrichment (NVD CVE lookup per technique)."""
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
import pytest
from app.models.enums import TechniqueStatus
from app.models.osint_item import OsintItem
from app.models.technique import Technique
from app.services.osint_enrichment_service import (
NVD_RATE_LIMIT_BATCH,
NVD_RATE_LIMIT_WAIT,
enrich_all_techniques,
enrich_technique_with_cves,
)
SAMPLE_NVD_RESPONSE = {
"vulnerabilities": [
{
"cve": {
"id": "CVE-2024-12345",
"descriptions": [{"lang": "en", "value": "Remote code execution example."}],
"metrics": {
"cvssMetricV31": [
{
"cvssData": {
"baseSeverity": "HIGH",
"baseScore": 8.1,
}
}
]
},
}
},
{
"cve": {
"id": "CVE-2024-99999",
"descriptions": [{"lang": "en", "value": "Another issue."}],
"metrics": {},
}
},
]
}
def _technique(db, *, mitre_id="T1059", name="Command and Scripting Interpreter"):
tech = Technique(
mitre_id=mitre_id,
name=name,
tactic="execution",
status_global=TechniqueStatus.in_progress,
)
db.add(tech)
db.commit()
db.refresh(tech)
return tech
@patch("app.services.osint_enrichment_service.requests.get")
def test_enrich_technique_fetches_cves(mock_get, db):
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = SAMPLE_NVD_RESPONSE
mock_get.return_value = mock_resp
tech = _technique(db)
count = enrich_technique_with_cves(db, tech)
assert count == 2
items = db.query(OsintItem).filter(OsintItem.technique_id == tech.id).all()
assert len(items) == 2
assert items[0].source_type == "cve"
assert "CVE-2024" in items[0].source_url
assert tech.review_required is True
mock_get.assert_called_once()
assert mock_get.call_args.kwargs["params"]["keywordSearch"] == tech.name
@patch("app.services.osint_enrichment_service.requests.get")
def test_enrich_technique_no_duplicates_on_rerun(mock_get, db):
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = SAMPLE_NVD_RESPONSE
mock_get.return_value = mock_resp
tech = _technique(db)
assert enrich_technique_with_cves(db, tech) == 2
assert enrich_technique_with_cves(db, tech) == 0
assert db.query(OsintItem).filter(OsintItem.technique_id == tech.id).count() == 2
@patch("app.services.osint_enrichment_service.requests.get")
def test_enrich_technique_api_error_returns_zero(mock_get, db):
mock_resp = MagicMock()
mock_resp.status_code = 503
mock_get.return_value = mock_resp
tech = _technique(db)
assert enrich_technique_with_cves(db, tech) == 0
assert db.query(OsintItem).count() == 0
assert tech.review_required is False
@patch("app.services.osint_enrichment_service.time.sleep")
@patch("app.services.osint_enrichment_service.requests.get")
def test_enrich_all_techniques_rate_limits_nvd(mock_get, mock_sleep, db):
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = {"vulnerabilities": []}
mock_get.return_value = mock_resp
for i in range(NVD_RATE_LIMIT_BATCH + 1):
db.add(
Technique(
mitre_id=f"T100{i}",
name=f"Technique {i}",
tactic="execution",
)
)
db.commit()
total = enrich_all_techniques(db)
assert total == 0
assert mock_get.call_count == NVD_RATE_LIMIT_BATCH + 1
mock_sleep.assert_called_once_with(NVD_RATE_LIMIT_WAIT)

View File

@@ -0,0 +1,78 @@
"""Tests for stale coverage detection."""
from datetime import datetime, timedelta, timezone
from app.models.enums import TechniqueStatus, TestState
from app.models.technique import Technique
from app.models.test import Test
from app.services.stale_detection_service import STALE_THRESHOLD_DAYS, detect_stale_coverage
def _technique(db, *, mitre_id="T1059", status=TechniqueStatus.validated):
tech = Technique(
mitre_id=mitre_id,
name="Command and Scripting Interpreter",
tactic="execution",
status_global=status,
review_required=False,
)
db.add(tech)
db.commit()
db.refresh(tech)
return tech
def _validated_test(db, technique, *, days_ago: int):
validated_at = datetime.now(timezone.utc) - timedelta(days=days_ago)
test = Test(
technique_id=technique.id,
name="Coverage test",
state=TestState.validated,
red_validated_at=validated_at,
blue_validated_at=validated_at,
created_at=validated_at,
)
db.add(test)
db.commit()
return test
def test_stale_technique_flagged_after_threshold(db):
tech = _technique(db)
_validated_test(db, tech, days_ago=STALE_THRESHOLD_DAYS + 30)
count = detect_stale_coverage(db)
db.refresh(tech)
assert count == 1
assert tech.review_required is True
def test_recent_validated_technique_not_flagged(db):
tech = _technique(db)
_validated_test(db, tech, days_ago=30)
count = detect_stale_coverage(db)
db.refresh(tech)
assert count == 0
assert tech.review_required is False
def test_not_evaluated_never_tested_not_flagged(db):
tech = _technique(db, status=TechniqueStatus.not_evaluated)
count = detect_stale_coverage(db)
db.refresh(tech)
assert count == 0
assert tech.review_required is False
def test_stale_detection_idempotent(db):
tech = _technique(db, mitre_id="T1204")
_validated_test(db, tech, days_ago=STALE_THRESHOLD_DAYS + 60)
tech.review_required = True
db.commit()
assert detect_stale_coverage(db) == 0