Files
Aegis/backend/app/services/stale_detection_service.py

86 lines
2.6 KiB
Python

"""Stale coverage detection — marks techniques whose last validated test
is older than a configurable threshold.
This is the simple version. The full Decay Engine (Fase 8) will replace
this with a multi-factor, configurable decay model with confidence scores.
"""
import logging
from datetime import datetime, timedelta, timezone
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.config import settings
from app.models.enums import TechniqueStatus, TestState
from app.models.technique import Technique
from app.models.test import Test
logger = logging.getLogger(__name__)
STALE_THRESHOLD_DAYS = settings.STALE_THRESHOLD_DAYS
def detect_stale_coverage(db: Session) -> int:
"""Scan all techniques and flag those with stale coverage.
A technique is considered stale when:
- It has a status other than ``not_evaluated``, AND
- Its most recent *validated* test is older than *STALE_THRESHOLD_DAYS*, OR
- It has never had a validated test (but has been manually marked as
covered/partial).
Returns the number of newly-flagged techniques.
"""
cutoff = datetime.now(timezone.utc) - timedelta(days=STALE_THRESHOLD_DAYS)
last_validated = func.coalesce(
Test.blue_validated_at,
Test.red_validated_at,
Test.created_at,
)
# Subquery: latest validated test date per technique
latest_test = (
db.query(
Test.technique_id,
func.max(last_validated).label("last_tested"),
)
.filter(Test.state == TestState.validated)
.group_by(Test.technique_id)
.subquery()
)
# Find techniques that are stale
stale_techniques = (
db.query(Technique)
.outerjoin(latest_test, Technique.id == latest_test.c.technique_id)
.filter(
# Either tested before cutoff, or never tested at all
(latest_test.c.last_tested < cutoff)
| (latest_test.c.last_tested.is_(None))
)
.filter(
# Only flag techniques that have a real status (not never-evaluated ones)
Technique.status_global != TechniqueStatus.not_evaluated
)
.all()
)
count = 0
for tech in stale_techniques:
if not tech.review_required:
tech.review_required = True
count += 1
logger.info("Marked %s as stale coverage", tech.mitre_id)
if count > 0:
db.commit()
logger.info(
"Stale coverage detection complete — %d techniques flagged", count
)
else:
logger.info("Stale coverage detection complete — no new stale techniques")
return count