"""Tests for scoring, operational metrics, and compliance — T-236. Uses the in-memory SQLite test database from conftest.py to verify calculations with known data. """ import uuid from datetime import datetime, timedelta import pytest from app.models.technique import Technique from app.models.test import Test from app.models.test_template import TestTemplate from app.models.detection_rule import DetectionRule from app.models.test_detection_result import TestDetectionResult from app.models.defensive_technique import DefensiveTechnique, DefensiveTechniqueMapping from app.models.compliance import ComplianceFramework, ComplianceControl, ComplianceControlMapping from app.models.audit import AuditLog from app.models.enums import TestState, TestResult, TechniqueStatus from app.services.scoring_service import ( calculate_technique_score, calculate_tactic_score, calculate_organization_score, ) from app.services.operational_metrics_service import ( calculate_mttd, calculate_mttr, calculate_detection_efficacy, ) # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def sample_technique(db): """Create a technique with known data.""" tech = Technique( mitre_id="T1059", name="Command and Scripting Interpreter", tactic="execution", platforms=["windows", "linux", "macos"], status_global=TechniqueStatus.validated, ) db.add(tech) db.commit() db.refresh(tech) return tech @pytest.fixture def sample_technique_no_tests(db): """Create a technique with no tests.""" tech = Technique( mitre_id="T9999", name="No Tests Technique", tactic="discovery", platforms=["windows"], status_global=TechniqueStatus.not_evaluated, ) db.add(tech) db.commit() db.refresh(tech) return tech @pytest.fixture def validated_tests(db, sample_technique, admin_user): """Create multiple validated tests with detection results.""" now = datetime.utcnow() tests = [] for i, result in enumerate([TestResult.detected, TestResult.detected, TestResult.not_detected]): test = Test( technique_id=sample_technique.id, name=f"Test {i+1} for T1059", state=TestState.validated, detection_result=result, created_by=admin_user.id, platform=["windows", "linux", "macos"][i % 3], red_validated_at=now - timedelta(days=i * 30), blue_validated_at=now - timedelta(days=i * 30), created_at=now - timedelta(days=i * 30 + 5), ) db.add(test) tests.append(test) db.commit() for t in tests: db.refresh(t) return tests @pytest.fixture def compliance_setup(db, sample_technique, sample_technique_no_tests): """Create a compliance framework with controls mapped to techniques.""" framework = ComplianceFramework( name="NIST 800-53", version="5.0", description="NIST Special Publication 800-53", ) db.add(framework) db.flush() # Control 1: mapped to validated technique control1 = ComplianceControl( framework_id=framework.id, control_id="AC-2", title="Account Management", category="Access Control", ) db.add(control1) db.flush() mapping1 = ComplianceControlMapping( compliance_control_id=control1.id, technique_id=sample_technique.id, ) db.add(mapping1) # Control 2: mapped to technique with no tests control2 = ComplianceControl( framework_id=framework.id, control_id="SI-4", title="Information System Monitoring", category="System and Information Integrity", ) db.add(control2) db.flush() mapping2 = ComplianceControlMapping( compliance_control_id=control2.id, technique_id=sample_technique_no_tests.id, ) db.add(mapping2) db.commit() return { "framework": framework, "control_covered": control1, "control_not_covered": control2, } # ═══════════════════════════════════════════════════════════════════════ # Scoring Tests # ═══════════════════════════════════════════════════════════════════════ class TestScoring: def test_technique_score_all_detected(self, db, sample_technique, admin_user): """Técnica con todos los tests detected → score alto.""" now = datetime.utcnow() for i in range(3): test = Test( technique_id=sample_technique.id, name=f"All Detected {i}", state=TestState.validated, detection_result=TestResult.detected, created_by=admin_user.id, platform=["windows", "linux", "macos"][i], red_validated_at=now - timedelta(days=10), ) db.add(test) db.commit() result = calculate_technique_score(sample_technique, db) assert result["total_score"] > 0 # Test component should be maxed out (all detected) assert result["breakdown"]["tests_validated"]["score"] > 0 def test_technique_score_no_tests(self, db, sample_technique_no_tests): """Técnica sin tests → score 0.""" result = calculate_technique_score(sample_technique_no_tests, db) assert result["total_score"] == 0 def test_technique_score_partial_detection(self, db, sample_technique, validated_tests): """Técnica con detección parcial → score intermedio.""" result = calculate_technique_score(sample_technique, db) # 2 detected out of 3 validated → partial score assert 0 < result["total_score"] < 100 breakdown = result["breakdown"] assert "2/3" in breakdown["tests_validated"]["detail"] def test_technique_score_freshness_penalty(self, db, sample_technique, admin_user): """Tests > 180 días → penalización en freshness.""" old_date = datetime.utcnow() - timedelta(days=200) test = Test( technique_id=sample_technique.id, name="Old Test", state=TestState.validated, detection_result=TestResult.detected, created_by=admin_user.id, platform="windows", red_validated_at=old_date, ) db.add(test) db.commit() result = calculate_technique_score(sample_technique, db) # Freshness should be 0 for tests > 180 days old assert result["breakdown"]["freshness"]["score"] == 0 assert "200" in result["breakdown"]["freshness"]["detail"] def test_scoring_weights_configurable(self, db, sample_technique, validated_tests): """Cambiar pesos cambia el score resultante.""" from app.config import settings original_weight = settings.SCORING_WEIGHT_TESTS score1 = calculate_technique_score(sample_technique, db) # Change weight settings.SCORING_WEIGHT_TESTS = 80 score2 = calculate_technique_score(sample_technique, db) # Restore settings.SCORING_WEIGHT_TESTS = original_weight # Different weights should produce different scores assert score1["total_score"] != score2["total_score"] def test_organization_score_aggregation(self, db, sample_technique, validated_tests): """Score global agrega correctamente los scores de técnicas.""" result = calculate_organization_score(db) assert result["techniques_total"] >= 1 assert result["overall_score"] >= 0 assert result["techniques_evaluated"] >= 0 # ═══════════════════════════════════════════════════════════════════════ # Operational Metrics Tests # ═══════════════════════════════════════════════════════════════════════ class TestOperationalMetrics: def test_mttd_calculation(self, db, sample_technique, admin_user): """MTTD se calcula desde timestamps del audit_log.""" now = datetime.utcnow() test = Test( technique_id=sample_technique.id, name="MTTD Test", state=TestState.validated, created_by=admin_user.id, ) db.add(test) db.flush() # Create audit log entries for state transitions start_log = AuditLog( user_id=admin_user.id, action="start_execution", entity_type="test", entity_id=str(test.id), timestamp=now - timedelta(hours=5), ) submit_log = AuditLog( user_id=admin_user.id, action="submit_red", entity_type="test", entity_id=str(test.id), timestamp=now - timedelta(hours=2), ) db.add(start_log) db.add(submit_log) db.commit() result = calculate_mttd(db) # Should have data (3 hours between start and submit) if result is not None: assert result["sample_size"] >= 1 assert result["mean_hours"] >= 0 def test_mttr_calculation(self, db, sample_technique, admin_user): """MTTR incluye tiempo de remediación.""" now = datetime.utcnow() test = Test( technique_id=sample_technique.id, name="MTTR Test", state=TestState.validated, remediation_status="completed", blue_validated_at=now - timedelta(hours=48), created_by=admin_user.id, ) db.add(test) db.flush() # Audit log for remediation completion log = AuditLog( user_id=admin_user.id, action="update_remediation", entity_type="test", entity_id=str(test.id), timestamp=now - timedelta(hours=24), ) db.add(log) db.commit() result = calculate_mttr(db) if result is not None: assert result["sample_size"] >= 1 assert result["mean_hours"] > 0 def test_detection_efficacy(self, db, sample_technique, validated_tests): """Detection efficacy con datos de prueba conocidos.""" result = calculate_detection_efficacy(db) assert result["total"] == 3 assert result["detected"] == 2 assert result["not_detected"] == 1 expected_pct = round((2 / 3) * 100, 1) assert result["percentage"] == expected_pct def test_metrics_with_no_data(self, db): """Métricas retornan null/cero cuando no hay datos suficientes.""" mttd = calculate_mttd(db) mttr = calculate_mttr(db) efficacy = calculate_detection_efficacy(db) assert mttd is None assert mttr is None assert efficacy["total"] == 0 assert efficacy["percentage"] == 0 # ═══════════════════════════════════════════════════════════════════════ # Compliance Tests # ═══════════════════════════════════════════════════════════════════════ class TestCompliance: def test_control_fully_covered(self, db, sample_technique, validated_tests, compliance_setup): """Control con todas las técnicas validated → covered.""" control = compliance_setup["control_covered"] mappings = ( db.query(ComplianceControlMapping) .filter(ComplianceControlMapping.compliance_control_id == control.id) .all() ) assert len(mappings) == 1 # The mapped technique has validated tests technique = mappings[0].technique assert technique.status_global == TechniqueStatus.validated def test_control_not_covered(self, db, compliance_setup): """Control con todas las técnicas sin tests → not_covered.""" control = compliance_setup["control_not_covered"] mappings = ( db.query(ComplianceControlMapping) .filter(ComplianceControlMapping.compliance_control_id == control.id) .all() ) assert len(mappings) == 1 technique = mappings[0].technique assert technique.status_global == TechniqueStatus.not_evaluated def test_control_partially_covered(self, db, sample_technique, sample_technique_no_tests, admin_user, compliance_setup): """Control con técnicas mixtas → partially_covered.""" control = compliance_setup["control_covered"] # Add second mapping to the not-evaluated technique mapping = ComplianceControlMapping( compliance_control_id=control.id, technique_id=sample_technique_no_tests.id, ) db.add(mapping) db.commit() # Now this control has two techniques: one validated, one not_evaluated mappings = ( db.query(ComplianceControlMapping) .filter(ComplianceControlMapping.compliance_control_id == control.id) .all() ) assert len(mappings) == 2 statuses = [m.technique.status_global for m in mappings] assert TechniqueStatus.validated in statuses assert TechniqueStatus.not_evaluated in statuses def test_compliance_percentage(self, db, sample_technique, validated_tests, compliance_setup): """Porcentaje global de compliance calculado correctamente.""" framework = compliance_setup["framework"] controls = ( db.query(ComplianceControl) .filter(ComplianceControl.framework_id == framework.id) .all() ) assert len(controls) == 2 covered = 0 total = len(controls) for control in controls: mappings = control.technique_mappings if all( m.technique.status_global in (TechniqueStatus.validated, TechniqueStatus.partial) for m in mappings ): covered += 1 pct = round((covered / total) * 100, 1) assert pct == 50.0 # 1 out of 2 controls covered def test_compliance_gaps(self, db, compliance_setup): """Gaps retorna solo controles no cubiertos con sus técnicas.""" framework = compliance_setup["framework"] controls = ( db.query(ComplianceControl) .filter(ComplianceControl.framework_id == framework.id) .all() ) gaps = [] for control in controls: mappings = control.technique_mappings uncovered_techniques = [ m.technique for m in mappings if m.technique.status_global in (TechniqueStatus.not_evaluated, TechniqueStatus.not_covered) ] if uncovered_techniques: gaps.append({ "control_id": control.control_id, "title": control.title, "uncovered_techniques": [t.mitre_id for t in uncovered_techniques], }) assert len(gaps) >= 1 si4_gap = next((g for g in gaps if g["control_id"] == "SI-4"), None) assert si4_gap is not None assert "T9999" in si4_gap["uncovered_techniques"]