feat(scoring): composite recency decay and severity weights persisted in DB [FASE-5.1]

This commit is contained in:
2026-05-18 15:07:12 +02:00
parent 2ee59d4e18
commit 05b221a22d
13 changed files with 588 additions and 154 deletions

View File

@@ -0,0 +1,102 @@
"""Tests for Phase 5 snapshot evolution and breakdown fields."""
from datetime import datetime, timedelta, timezone
from app.models.coverage_snapshot import CoverageSnapshot
from app.models.enums import TechniqueStatus
from app.models.technique import Technique
from app.services.snapshot_service import create_snapshot, get_coverage_evolution
def test_create_snapshot_includes_tactic_breakdown(db, admin_user):
tech = Technique(
mitre_id="T1059",
name="Command and Scripting Interpreter",
tactic="execution",
status_global=TechniqueStatus.validated,
)
db.add(tech)
db.commit()
snap = create_snapshot(db, name="Phase5 test", user_id=admin_user.id)
assert snap.by_tactic
assert "execution" in snap.by_tactic
assert snap.by_status
assert snap.coverage_percentage >= 0
assert snap.never_tested_count >= 0
def test_coverage_evolution_filters_by_months(db, admin_user):
now = datetime.now(timezone.utc)
old = CoverageSnapshot(
name="old",
organization_score=50.0,
total_techniques=10,
validated_count=5,
partial_count=1,
not_covered_count=1,
in_progress_count=1,
not_evaluated_count=2,
coverage_percentage=50.0,
by_tactic={"execution": {"total": 10, "validated": 5}},
by_status={"validated": 5},
stale_count=0,
never_tested_count=2,
created_by=admin_user.id,
)
old.created_at = now - timedelta(days=400)
recent = CoverageSnapshot(
name="recent",
organization_score=70.0,
total_techniques=10,
validated_count=7,
partial_count=1,
not_covered_count=0,
in_progress_count=1,
not_evaluated_count=1,
coverage_percentage=70.0,
by_tactic={"execution": {"total": 10, "validated": 7}},
by_status={"validated": 7},
stale_count=1,
never_tested_count=1,
created_by=admin_user.id,
)
db.add_all([old, recent])
db.commit()
evolution = get_coverage_evolution(db, months=6)
assert len(evolution) == 1
assert evolution[0]["org_score"] == 70.0
assert evolution[0]["coverage_pct"] == 70.0
assert evolution[0]["stale_count"] == 1
def test_evolution_endpoint(client, db, admin_user, admin_token):
snap = CoverageSnapshot(
name="api-evolution",
organization_score=60.0,
total_techniques=5,
validated_count=3,
partial_count=0,
not_covered_count=0,
in_progress_count=0,
not_evaluated_count=2,
coverage_percentage=60.0,
by_tactic={},
by_status={"validated": 3},
stale_count=0,
never_tested_count=2,
)
db.add(snap)
db.commit()
response = client.get(
"/api/v1/snapshots/evolution?months=12",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
assert len(data) >= 1
assert "org_score" in data[0]

View File

@@ -18,7 +18,10 @@ from app.models.defensive_technique import DefensiveTechnique, DefensiveTechniqu
from app.models.compliance import ComplianceFramework, ComplianceControl, ComplianceControlMapping
from app.models.audit import AuditLog
from app.models.enums import TestState, TestResult, TechniqueStatus
from app.models.scoring_config import ScoringConfig
from app.services.scoring_config_service import get_scoring_weights, update_scoring_weights
from app.services.scoring_service import (
_recency_factor,
calculate_technique_score,
calculate_tactic_score,
calculate_organization_score,
@@ -175,9 +178,11 @@ class TestScoring:
assert result["breakdown"]["tests_validated"]["score"] > 0
def test_technique_score_no_tests(self, db, sample_technique_no_tests):
"""Técnica sin tests → score 0."""
"""Técnica sin tests → solo puede puntuar severidad por defecto."""
result = calculate_technique_score(sample_technique_no_tests, db)
assert result["total_score"] == 0
assert result["breakdown"]["tests_validated"]["score"] == 0
assert result["breakdown"]["recency"]["score"] == 0
assert result["total_score"] == result["breakdown"]["severity"]["score"]
def test_technique_score_partial_detection(self, db, sample_technique, validated_tests):
"""Técnica con detección parcial → score intermedio."""
@@ -187,8 +192,8 @@ class TestScoring:
breakdown = result["breakdown"]
assert "2/3" in breakdown["tests_validated"]["detail"]
def test_technique_score_freshness_penalty(self, db, sample_technique, admin_user):
"""Tests > 180 días → penalización en freshness."""
def test_technique_score_recency_penalty(self, db, sample_technique, admin_user):
"""Tests > 180 días → factor de recencia reducido."""
old_date = datetime.utcnow() - timedelta(days=200)
test = Test(
technique_id=sample_technique.id,
@@ -198,14 +203,53 @@ class TestScoring:
created_by=admin_user.id,
platform="windows",
red_validated_at=old_date,
blue_validated_at=old_date,
)
db.add(test)
db.commit()
result = calculate_technique_score(sample_technique, db)
# Freshness should be 0 for tests > 180 days old
assert result["breakdown"]["freshness"]["score"] == 0
assert "200" in result["breakdown"]["freshness"]["detail"]
assert result["breakdown"]["recency"]["score"] == 5.0 # 0.5 * 10 (181365 días)
assert "200" in result["breakdown"]["recency"]["detail"]
def test_recency_recent_scores_higher_than_old(self, db, sample_technique, admin_user):
"""Mismo resultado de detección: test reciente puntúa más que uno de hace 1 año."""
now = datetime.utcnow()
for days_ago in (1, 400):
db.add(
Test(
technique_id=sample_technique.id,
name=f"Recency {days_ago}",
state=TestState.validated,
detection_result=TestResult.detected,
created_by=admin_user.id,
platform="windows",
red_validated_at=now - timedelta(days=days_ago),
blue_validated_at=now - timedelta(days=days_ago),
)
)
db.commit()
result = calculate_technique_score(sample_technique, db)
assert _recency_factor(now - timedelta(days=1)) == 1.0
assert _recency_factor(now - timedelta(days=400)) == 0.2
assert result["breakdown"]["recency"]["score"] == 10.0
def test_scoring_weights_persist_in_database(self, db, sample_technique, validated_tests):
"""Cambiar pesos en BD se refleja en el breakdown."""
update_scoring_weights(db, tests=50, detection_rules=20, d3fend=10, recency=10, severity=10)
db.commit()
score = calculate_technique_score(sample_technique, db)
assert score["breakdown"]["tests_validated"]["max"] == 50
row = db.query(ScoringConfig).first()
assert row is not None
assert row.weight_tests == 50
update_scoring_weights(db, tests=40, detection_rules=25, d3fend=15, recency=10, severity=10)
db.commit()
assert get_scoring_weights(db).tests == 40
def test_scoring_weights_configurable(self, db, sample_technique, validated_tests):
"""Scoring weights are reflected in the breakdown max values."""

View File

@@ -84,13 +84,13 @@ class TestScoringWeights:
assert w.tests == 40.0
assert w.detection_rules == 25.0
assert w.d3fend == 15.0
assert w.freshness == 10.0
assert w.platform_diversity == 10.0
assert w.recency == 10.0
assert w.severity == 10.0
def test_valid_custom(self):
w = ScoringWeights(
tests=50, detection_rules=20, d3fend=10,
freshness=10, platform_diversity=10,
recency=10, severity=10,
)
assert w.tests == 50
@@ -98,14 +98,14 @@ class TestScoringWeights:
with pytest.raises(ValueError, match="sum to 100"):
ScoringWeights(
tests=50, detection_rules=20, d3fend=10,
freshness=10, platform_diversity=5,
recency=10, severity=5,
)
def test_invalid_negative_weight(self):
with pytest.raises(ValueError, match="non-negative"):
ScoringWeights(
tests=-10, detection_rules=40, d3fend=30,
freshness=20, platform_diversity=20,
recency=20, severity=20,
)
def test_immutable(self):