From 2ee59d4e18cf9b86ed7bafeb7ead859581fc00bd Mon Sep 17 00:00:00 2001 From: Kitos Date: Mon, 18 May 2026 14:50:31 +0200 Subject: [PATCH] test(intel): verify OSINT enrichment and stale coverage detection [FASE-4] --- .../app/services/stale_detection_service.py | 19 ++- .../tests/test_osint_enrichment_service.py | 127 ++++++++++++++++++ backend/tests/test_stale_detection_service.py | 78 +++++++++++ 3 files changed, 218 insertions(+), 6 deletions(-) create mode 100644 backend/tests/test_osint_enrichment_service.py create mode 100644 backend/tests/test_stale_detection_service.py diff --git a/backend/app/services/stale_detection_service.py b/backend/app/services/stale_detection_service.py index 950edb3..bbd8374 100644 --- a/backend/app/services/stale_detection_service.py +++ b/backend/app/services/stale_detection_service.py @@ -6,18 +6,19 @@ this with a multi-factor, configurable decay model with confidence scores. """ import logging -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from sqlalchemy import func from sqlalchemy.orm import Session from app.config import settings +from app.models.enums import TechniqueStatus, TestState from app.models.technique import Technique from app.models.test import Test logger = logging.getLogger(__name__) -STALE_THRESHOLD_DAYS = getattr(settings, "STALE_THRESHOLD_DAYS", 365) +STALE_THRESHOLD_DAYS = settings.STALE_THRESHOLD_DAYS def detect_stale_coverage(db: Session) -> int: @@ -31,15 +32,21 @@ def detect_stale_coverage(db: Session) -> int: Returns the number of newly-flagged techniques. """ - cutoff = datetime.utcnow() - timedelta(days=STALE_THRESHOLD_DAYS) + cutoff = datetime.now(timezone.utc) - timedelta(days=STALE_THRESHOLD_DAYS) + + last_validated = func.coalesce( + Test.blue_validated_at, + Test.red_validated_at, + Test.created_at, + ) # Subquery: latest validated test date per technique latest_test = ( db.query( Test.technique_id, - func.max(Test.created_at).label("last_tested"), + func.max(last_validated).label("last_tested"), ) - .filter(Test.state == "validated") + .filter(Test.state == TestState.validated) .group_by(Test.technique_id) .subquery() ) @@ -55,7 +62,7 @@ def detect_stale_coverage(db: Session) -> int: ) .filter( # Only flag techniques that have a real status (not never-evaluated ones) - Technique.status_global != "not_evaluated" + Technique.status_global != TechniqueStatus.not_evaluated ) .all() ) diff --git a/backend/tests/test_osint_enrichment_service.py b/backend/tests/test_osint_enrichment_service.py new file mode 100644 index 0000000..bfe858d --- /dev/null +++ b/backend/tests/test_osint_enrichment_service.py @@ -0,0 +1,127 @@ +"""Tests for OSINT enrichment (NVD CVE lookup per technique).""" + +from datetime import datetime, timedelta, timezone +from unittest.mock import MagicMock, patch + +import pytest + +from app.models.enums import TechniqueStatus +from app.models.osint_item import OsintItem +from app.models.technique import Technique +from app.services.osint_enrichment_service import ( + NVD_RATE_LIMIT_BATCH, + NVD_RATE_LIMIT_WAIT, + enrich_all_techniques, + enrich_technique_with_cves, +) + +SAMPLE_NVD_RESPONSE = { + "vulnerabilities": [ + { + "cve": { + "id": "CVE-2024-12345", + "descriptions": [{"lang": "en", "value": "Remote code execution example."}], + "metrics": { + "cvssMetricV31": [ + { + "cvssData": { + "baseSeverity": "HIGH", + "baseScore": 8.1, + } + } + ] + }, + } + }, + { + "cve": { + "id": "CVE-2024-99999", + "descriptions": [{"lang": "en", "value": "Another issue."}], + "metrics": {}, + } + }, + ] +} + + +def _technique(db, *, mitre_id="T1059", name="Command and Scripting Interpreter"): + tech = Technique( + mitre_id=mitre_id, + name=name, + tactic="execution", + status_global=TechniqueStatus.in_progress, + ) + db.add(tech) + db.commit() + db.refresh(tech) + return tech + + +@patch("app.services.osint_enrichment_service.requests.get") +def test_enrich_technique_fetches_cves(mock_get, db): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = SAMPLE_NVD_RESPONSE + mock_get.return_value = mock_resp + + tech = _technique(db) + count = enrich_technique_with_cves(db, tech) + + assert count == 2 + items = db.query(OsintItem).filter(OsintItem.technique_id == tech.id).all() + assert len(items) == 2 + assert items[0].source_type == "cve" + assert "CVE-2024" in items[0].source_url + assert tech.review_required is True + mock_get.assert_called_once() + assert mock_get.call_args.kwargs["params"]["keywordSearch"] == tech.name + + +@patch("app.services.osint_enrichment_service.requests.get") +def test_enrich_technique_no_duplicates_on_rerun(mock_get, db): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = SAMPLE_NVD_RESPONSE + mock_get.return_value = mock_resp + + tech = _technique(db) + assert enrich_technique_with_cves(db, tech) == 2 + assert enrich_technique_with_cves(db, tech) == 0 + assert db.query(OsintItem).filter(OsintItem.technique_id == tech.id).count() == 2 + + +@patch("app.services.osint_enrichment_service.requests.get") +def test_enrich_technique_api_error_returns_zero(mock_get, db): + mock_resp = MagicMock() + mock_resp.status_code = 503 + mock_get.return_value = mock_resp + + tech = _technique(db) + assert enrich_technique_with_cves(db, tech) == 0 + assert db.query(OsintItem).count() == 0 + assert tech.review_required is False + + +@patch("app.services.osint_enrichment_service.time.sleep") +@patch("app.services.osint_enrichment_service.requests.get") +def test_enrich_all_techniques_rate_limits_nvd(mock_get, mock_sleep, db): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {"vulnerabilities": []} + mock_get.return_value = mock_resp + + for i in range(NVD_RATE_LIMIT_BATCH + 1): + db.add( + Technique( + mitre_id=f"T100{i}", + name=f"Technique {i}", + tactic="execution", + ) + ) + db.commit() + + total = enrich_all_techniques(db) + + assert total == 0 + assert mock_get.call_count == NVD_RATE_LIMIT_BATCH + 1 + mock_sleep.assert_called_once_with(NVD_RATE_LIMIT_WAIT) diff --git a/backend/tests/test_stale_detection_service.py b/backend/tests/test_stale_detection_service.py new file mode 100644 index 0000000..d15395d --- /dev/null +++ b/backend/tests/test_stale_detection_service.py @@ -0,0 +1,78 @@ +"""Tests for stale coverage detection.""" + +from datetime import datetime, timedelta, timezone + +from app.models.enums import TechniqueStatus, TestState +from app.models.technique import Technique +from app.models.test import Test +from app.services.stale_detection_service import STALE_THRESHOLD_DAYS, detect_stale_coverage + + +def _technique(db, *, mitre_id="T1059", status=TechniqueStatus.validated): + tech = Technique( + mitre_id=mitre_id, + name="Command and Scripting Interpreter", + tactic="execution", + status_global=status, + review_required=False, + ) + db.add(tech) + db.commit() + db.refresh(tech) + return tech + + +def _validated_test(db, technique, *, days_ago: int): + validated_at = datetime.now(timezone.utc) - timedelta(days=days_ago) + test = Test( + technique_id=technique.id, + name="Coverage test", + state=TestState.validated, + red_validated_at=validated_at, + blue_validated_at=validated_at, + created_at=validated_at, + ) + db.add(test) + db.commit() + return test + + +def test_stale_technique_flagged_after_threshold(db): + tech = _technique(db) + _validated_test(db, tech, days_ago=STALE_THRESHOLD_DAYS + 30) + + count = detect_stale_coverage(db) + + db.refresh(tech) + assert count == 1 + assert tech.review_required is True + + +def test_recent_validated_technique_not_flagged(db): + tech = _technique(db) + _validated_test(db, tech, days_ago=30) + + count = detect_stale_coverage(db) + + db.refresh(tech) + assert count == 0 + assert tech.review_required is False + + +def test_not_evaluated_never_tested_not_flagged(db): + tech = _technique(db, status=TechniqueStatus.not_evaluated) + + count = detect_stale_coverage(db) + + db.refresh(tech) + assert count == 0 + assert tech.review_required is False + + +def test_stale_detection_idempotent(db): + tech = _technique(db, mitre_id="T1204") + _validated_test(db, tech, days_ago=STALE_THRESHOLD_DAYS + 60) + tech.review_required = True + db.commit() + + assert detect_stale_coverage(db) == 0