Files
Aegis/backend/app/services/stale_detection_service.py
Kitos 222979574a feat(phase-38): automatic intelligence — OSINT enrichment + stale coverage detection
Tarea 4.1 — OSINT Enrichment:
- Add OsintItem model with source_type, severity, CVSS metadata, review flag
- Add Alembic migration b022 with osint_items table and optimized indexes
- Add osint_enrichment_service with NVD API integration, deduplication, rate limiting
- Add OSINT router: GET /osint/items, /osint/summary, /osint/technique/{id}
- Add POST /osint/items/{id}/review to mark items as reviewed
- Add POST /osint/enrich/{technique_id} for manual single-technique enrichment
- Techniques with new CVEs are automatically flagged review_required=True
- Register weekly enrichment job in APScheduler
- Add NVD_API_KEY config setting for optional increased rate limits

Tarea 4.2 — Stale Coverage Detection:
- Add stale_detection_service that flags techniques with no validated test
  in the last N days, or never-validated but with a coverage status
- Configurable threshold via STALE_THRESHOLD_DAYS setting (default 365)
- Register daily stale detection job in APScheduler
- Only flags techniques not already marked review_required
2026-02-17 17:47:47 +01:00

79 lines
2.4 KiB
Python

"""Stale coverage detection — marks techniques whose last validated test
is older than a configurable threshold.
This is the simple version. The full Decay Engine (Phase 8) will replace
this with a multi-factor, configurable decay model with confidence scores.
"""
import logging
from datetime import datetime, timedelta, timezone

from sqlalchemy import func
from sqlalchemy.orm import Session

from app.config import settings
from app.models.technique import Technique
from app.models.test import Test
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Staleness threshold in days. Read once at import time from ``settings``;
# falls back to 365 when ``settings`` has no STALE_THRESHOLD_DAYS attribute.
STALE_THRESHOLD_DAYS = getattr(settings, "STALE_THRESHOLD_DAYS", 365)
def detect_stale_coverage(db: Session) -> int:
    """Scan all techniques and flag those with stale coverage.

    A technique is considered stale when its ``status_global`` is anything
    other than ``not_evaluated`` AND either:

    - its most recent test in state ``validated`` was created more than
      ``STALE_THRESHOLD_DAYS`` days ago, or
    - it has no test in state ``validated`` at all.

    Every stale technique that is not already flagged gets
    ``review_required = True``. All changes are committed together; nothing
    is committed when no technique needed flagging.

    Args:
        db: SQLAlchemy session used for the query and for the commit.

    Returns:
        The number of newly flagged techniques.

    Raises:
        Exception: Any error raised by ``db.commit()`` is re-raised after
            the session has been rolled back.
    """
    # Naive UTC timestamp, equivalent to the deprecated ``datetime.utcnow()``.
    # NOTE(review): assumes Test.created_at is stored as naive UTC — confirm.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    cutoff = now_utc - timedelta(days=STALE_THRESHOLD_DAYS)

    # Subquery: creation date of the latest validated test per technique.
    latest_test = (
        db.query(
            Test.technique_id,
            func.max(Test.created_at).label("last_tested"),
        )
        .filter(Test.state == "validated")
        .group_by(Test.technique_id)
        .subquery()
    )

    # The outer join keeps techniques without any validated test; for those
    # rows ``last_tested`` is NULL.
    stale_techniques = (
        db.query(Technique)
        .outerjoin(latest_test, Technique.id == latest_test.c.technique_id)
        .filter(
            # Either last validated before the cutoff, or never validated.
            (latest_test.c.last_tested < cutoff)
            | (latest_test.c.last_tested.is_(None))
        )
        .filter(
            # Only flag techniques that have a real status.
            Technique.status_global != "not_evaluated"
        )
        .all()
    )

    count = 0
    for tech in stale_techniques:
        # Skip techniques that are already awaiting review so the returned
        # count reflects only newly flagged ones.
        if not tech.review_required:
            tech.review_required = True
            count += 1
            logger.info("Marked %s as stale coverage", tech.mitre_id)

    if count == 0:
        logger.info("Stale coverage detection complete — no new stale techniques")
        return count

    try:
        db.commit()
    except Exception:
        # Roll back so the session is left usable for the caller, then let
        # the original error propagate.
        db.rollback()
        logger.exception("Stale coverage detection failed to commit; rolled back")
        raise

    logger.info(
        "Stale coverage detection complete — %d techniques flagged", count
    )
    return count