Files
Aegis/backend/tests/test_scoring_and_compliance.py

440 lines
16 KiB
Python

"""Tests for scoring, operational metrics, and compliance — T-236.
Uses the in-memory SQLite test database from conftest.py to verify
calculations with known data.
"""
import uuid
from datetime import datetime, timedelta
import pytest
from app.models.technique import Technique
from app.models.test import Test
from app.models.test_template import TestTemplate
from app.models.detection_rule import DetectionRule
from app.models.test_detection_result import TestDetectionResult
from app.models.defensive_technique import DefensiveTechnique, DefensiveTechniqueMapping
from app.models.compliance import ComplianceFramework, ComplianceControl, ComplianceControlMapping
from app.models.audit import AuditLog
from app.models.enums import TestState, TestResult, TechniqueStatus
from app.services.scoring_service import (
calculate_technique_score,
calculate_tactic_score,
calculate_organization_score,
)
from app.services.operational_metrics_service import (
calculate_mttd,
calculate_mttr,
calculate_detection_efficacy,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def sample_technique(db):
    """Persist and return the T1059 technique used as the validated sample.

    The row is added, committed and refreshed through ``db`` before it is
    handed to the test.
    """
    fields = {
        "mitre_id": "T1059",
        "name": "Command and Scripting Interpreter",
        "tactic": "execution",
        "platforms": ["windows", "linux", "macos"],
        "status_global": TechniqueStatus.validated,
    }
    technique = Technique(**fields)
    db.add(technique)
    db.commit()
    db.refresh(technique)
    return technique
@pytest.fixture
def sample_technique_no_tests(db):
    """Persist and return the T9999 technique, which has no tests attached.

    Its global status is ``not_evaluated``; the row is added, committed and
    refreshed through ``db`` before it is returned.
    """
    fields = {
        "mitre_id": "T9999",
        "name": "No Tests Technique",
        "tactic": "discovery",
        "platforms": ["windows"],
        "status_global": TechniqueStatus.not_evaluated,
    }
    technique = Technique(**fields)
    db.add(technique)
    db.commit()
    db.refresh(technique)
    return technique
@pytest.fixture
def validated_tests(db, sample_technique, admin_user):
    """Persist three validated tests for the sample technique and return them.

    The outcomes are detected, detected and not_detected, one per platform.
    Test ``i`` is validated ``i * 30`` days before "now" and created five
    days before its validation date.
    """
    reference = datetime.utcnow()
    outcomes = (TestResult.detected, TestResult.detected, TestResult.not_detected)
    platforms = ("windows", "linux", "macos")

    created = []
    for index, outcome in enumerate(outcomes):
        # Red and blue validation share the same timestamp for each test.
        validated_at = reference - timedelta(days=index * 30)
        record = Test(
            technique_id=sample_technique.id,
            name=f"Test {index + 1} for T1059",
            state=TestState.validated,
            detection_result=outcome,
            created_by=admin_user.id,
            platform=platforms[index % 3],
            red_validated_at=validated_at,
            blue_validated_at=validated_at,
            created_at=reference - timedelta(days=index * 30 + 5),
        )
        db.add(record)
        created.append(record)

    db.commit()
    for record in created:
        db.refresh(record)
    return created
@pytest.fixture
def compliance_setup(db, sample_technique, sample_technique_no_tests):
    """Persist a framework with two controls, each mapped to one technique.

    Returns a dict with the framework, the control mapped to the validated
    technique (``control_covered``) and the control mapped to the technique
    without tests (``control_not_covered``).
    """
    nist = ComplianceFramework(
        name="NIST 800-53",
        version="5.0",
        description="NIST Special Publication 800-53",
    )
    db.add(nist)
    # Flush before the framework id is read by the controls below.
    db.flush()

    def _control_mapped_to(control_id, title, category, technique):
        """Add one control to the framework and map it to ``technique``."""
        control = ComplianceControl(
            framework_id=nist.id,
            control_id=control_id,
            title=title,
            category=category,
        )
        db.add(control)
        # Flush before the control id is read by the mapping.
        db.flush()
        db.add(
            ComplianceControlMapping(
                compliance_control_id=control.id,
                technique_id=technique.id,
            )
        )
        return control

    # First control points at the validated technique.
    covered = _control_mapped_to(
        "AC-2",
        "Account Management",
        "Access Control",
        sample_technique,
    )
    # Second control points at the technique that has no tests.
    not_covered = _control_mapped_to(
        "SI-4",
        "Information System Monitoring",
        "System and Information Integrity",
        sample_technique_no_tests,
    )
    db.commit()

    return {
        "framework": nist,
        "control_covered": covered,
        "control_not_covered": not_covered,
    }
# ═══════════════════════════════════════════════════════════════════════
# Scoring Tests
# ═══════════════════════════════════════════════════════════════════════
class TestScoring:
    """Tests for the technique-level and organization-level scoring functions."""

    def test_technique_score_all_detected(self, db, sample_technique, admin_user):
        """A technique whose validated tests were all detected scores above zero."""
        now = datetime.utcnow()
        for i in range(3):
            test = Test(
                technique_id=sample_technique.id,
                name=f"All Detected {i}",
                state=TestState.validated,
                detection_result=TestResult.detected,
                created_by=admin_user.id,
                platform=["windows", "linux", "macos"][i],
                red_validated_at=now - timedelta(days=10),
            )
            db.add(test)
        db.commit()

        result = calculate_technique_score(sample_technique, db)
        assert result["total_score"] > 0
        # All three tests were detected, so the tests_validated component must
        # contribute. Only positivity is asserted here, not the exact maximum.
        assert result["breakdown"]["tests_validated"]["score"] > 0

    def test_technique_score_no_tests(self, db, sample_technique_no_tests):
        """A technique without tests scores exactly zero."""
        result = calculate_technique_score(sample_technique_no_tests, db)
        assert result["total_score"] == 0

    def test_technique_score_partial_detection(self, db, sample_technique, validated_tests):
        """Partial detection yields a score strictly between 0 and 100."""
        result = calculate_technique_score(sample_technique, db)
        # 2 detected out of 3 validated -> partial score.
        assert 0 < result["total_score"] < 100
        breakdown = result["breakdown"]
        assert "2/3" in breakdown["tests_validated"]["detail"]

    def test_technique_score_freshness_penalty(self, db, sample_technique, admin_user):
        """A test validated 200 days ago gets a freshness score of zero."""
        old_date = datetime.utcnow() - timedelta(days=200)
        test = Test(
            technique_id=sample_technique.id,
            name="Old Test",
            state=TestState.validated,
            detection_result=TestResult.detected,
            created_by=admin_user.id,
            platform="windows",
            red_validated_at=old_date,
        )
        db.add(test)
        db.commit()

        result = calculate_technique_score(sample_technique, db)
        # Freshness should be 0 for tests > 180 days old, and the detail text
        # is expected to mention the age in days.
        assert result["breakdown"]["freshness"]["score"] == 0
        assert "200" in result["breakdown"]["freshness"]["detail"]

    def test_scoring_weights_configurable(self, db, sample_technique, validated_tests):
        """Changing the tests weight changes the resulting technique score."""
        from app.config import settings

        original_weight = settings.SCORING_WEIGHT_TESTS
        score1 = calculate_technique_score(sample_technique, db)

        settings.SCORING_WEIGHT_TESTS = 80
        try:
            score2 = calculate_technique_score(sample_technique, db)
        finally:
            # Restore the imported settings object even if scoring raises, so
            # the overridden weight cannot leak into tests that run afterwards.
            settings.SCORING_WEIGHT_TESTS = original_weight

        # Different weights should produce different scores.
        assert score1["total_score"] != score2["total_score"]

    def test_organization_score_aggregation(self, db, sample_technique, validated_tests):
        """The organization score reports totals for the seeded technique."""
        result = calculate_organization_score(db)
        assert result["techniques_total"] >= 1
        assert result["overall_score"] >= 0
        assert result["techniques_evaluated"] >= 0
# ═══════════════════════════════════════════════════════════════════════
# Operational Metrics Tests
# ═══════════════════════════════════════════════════════════════════════
class TestOperationalMetrics:
    """Tests for MTTD, MTTR and detection-efficacy calculations."""

    def test_mttd_calculation(self, db, sample_technique, admin_user):
        """MTTD is computed from audit-log timestamps of a validated test."""
        now = datetime.utcnow()
        test = Test(
            technique_id=sample_technique.id,
            name="MTTD Test",
            state=TestState.validated,
            created_by=admin_user.id,
        )
        db.add(test)
        # Flush before test.id is read by the audit entries below.
        db.flush()

        # Audit entries for the state transitions: start and submit are
        # three hours apart.
        start_log = AuditLog(
            user_id=admin_user.id,
            action="start_execution",
            entity_type="test",
            entity_id=str(test.id),
            timestamp=now - timedelta(hours=5),
        )
        submit_log = AuditLog(
            user_id=admin_user.id,
            action="submit_red",
            entity_type="test",
            entity_id=str(test.id),
            timestamp=now - timedelta(hours=2),
        )
        db.add(start_log)
        db.add(submit_log)
        db.commit()

        result = calculate_mttd(db)
        if result is None:
            # Report the missing result as a skip instead of letting the test
            # pass without having asserted anything.
            pytest.skip("calculate_mttd returned None for the seeded audit log")
        assert result["sample_size"] >= 1
        assert result["mean_hours"] >= 0

    def test_mttr_calculation(self, db, sample_technique, admin_user):
        """MTTR is computed for a test whose remediation is completed."""
        now = datetime.utcnow()
        test = Test(
            technique_id=sample_technique.id,
            name="MTTR Test",
            state=TestState.validated,
            remediation_status="completed",
            blue_validated_at=now - timedelta(hours=48),
            created_by=admin_user.id,
        )
        db.add(test)
        # Flush before test.id is read by the audit entry below.
        db.flush()

        # Audit entry for the remediation update, 24 hours after blue validation.
        log = AuditLog(
            user_id=admin_user.id,
            action="update_remediation",
            entity_type="test",
            entity_id=str(test.id),
            timestamp=now - timedelta(hours=24),
        )
        db.add(log)
        db.commit()

        result = calculate_mttr(db)
        if result is None:
            # Report the missing result as a skip instead of letting the test
            # pass without having asserted anything.
            pytest.skip("calculate_mttr returned None for the seeded remediation data")
        assert result["sample_size"] >= 1
        assert result["mean_hours"] > 0

    def test_detection_efficacy(self, db, sample_technique, validated_tests):
        """Detection efficacy matches the seeded 2-detected / 1-missed data."""
        result = calculate_detection_efficacy(db)
        assert result["total"] == 3
        assert result["detected"] == 2
        assert result["not_detected"] == 1
        expected_pct = round((2 / 3) * 100, 1)
        assert result["percentage"] == expected_pct

    def test_metrics_with_no_data(self, db):
        """With no data, MTTD/MTTR are None and efficacy reports zeros."""
        mttd = calculate_mttd(db)
        mttr = calculate_mttr(db)
        efficacy = calculate_detection_efficacy(db)
        assert mttd is None
        assert mttr is None
        assert efficacy["total"] == 0
        assert efficacy["percentage"] == 0
# ═══════════════════════════════════════════════════════════════════════
# Compliance Tests
# ═══════════════════════════════════════════════════════════════════════
class TestCompliance:
    """Compliance coverage checks derived from control-to-technique mappings."""

    def test_control_fully_covered(self, db, sample_technique, validated_tests, compliance_setup):
        """The covered control maps to exactly one technique, which is validated."""
        covered_control = compliance_setup["control_covered"]
        rows = (
            db.query(ComplianceControlMapping)
            .filter(ComplianceControlMapping.compliance_control_id == covered_control.id)
            .all()
        )
        assert len(rows) == 1
        # The single mapped technique carries the validated status.
        mapped_technique = rows[0].technique
        assert mapped_technique.status_global == TechniqueStatus.validated

    def test_control_not_covered(self, db, compliance_setup):
        """The uncovered control maps to exactly one not-evaluated technique."""
        uncovered_control = compliance_setup["control_not_covered"]
        rows = (
            db.query(ComplianceControlMapping)
            .filter(ComplianceControlMapping.compliance_control_id == uncovered_control.id)
            .all()
        )
        assert len(rows) == 1
        mapped_technique = rows[0].technique
        assert mapped_technique.status_global == TechniqueStatus.not_evaluated

    def test_control_partially_covered(self, db, sample_technique, sample_technique_no_tests, admin_user, compliance_setup):
        """A control mapped to mixed techniques exposes both statuses."""
        target_control = compliance_setup["control_covered"]
        # Attach the not-evaluated technique as a second mapping.
        db.add(
            ComplianceControlMapping(
                compliance_control_id=target_control.id,
                technique_id=sample_technique_no_tests.id,
            )
        )
        db.commit()

        # The control now has one validated and one not-evaluated technique.
        rows = (
            db.query(ComplianceControlMapping)
            .filter(ComplianceControlMapping.compliance_control_id == target_control.id)
            .all()
        )
        assert len(rows) == 2
        seen_statuses = [row.technique.status_global for row in rows]
        for expected in (TechniqueStatus.validated, TechniqueStatus.not_evaluated):
            assert expected in seen_statuses

    def test_compliance_percentage(self, db, sample_technique, validated_tests, compliance_setup):
        """One of the two framework controls is covered, giving 50 percent."""
        nist = compliance_setup["framework"]
        framework_controls = (
            db.query(ComplianceControl)
            .filter(ComplianceControl.framework_id == nist.id)
            .all()
        )
        assert len(framework_controls) == 2

        acceptable = (TechniqueStatus.validated, TechniqueStatus.partial)
        # A control counts as covered when every mapped technique is acceptable.
        covered_count = sum(
            1
            for control in framework_controls
            if all(
                link.technique.status_global in acceptable
                for link in control.technique_mappings
            )
        )
        percentage = round((covered_count / len(framework_controls)) * 100, 1)
        assert percentage == 50.0  # 1 out of 2 controls covered

    def test_compliance_gaps(self, db, compliance_setup):
        """Gaps list only controls with uncovered techniques, by MITRE id."""
        nist = compliance_setup["framework"]
        framework_controls = (
            db.query(ComplianceControl)
            .filter(ComplianceControl.framework_id == nist.id)
            .all()
        )

        uncovered_states = (TechniqueStatus.not_evaluated, TechniqueStatus.not_covered)
        gaps = []
        for control in framework_controls:
            missing_ids = [
                link.technique.mitre_id
                for link in control.technique_mappings
                if link.technique.status_global in uncovered_states
            ]
            if not missing_ids:
                continue
            gaps.append({
                "control_id": control.control_id,
                "title": control.title,
                "uncovered_techniques": missing_ids,
            })

        assert len(gaps) >= 1
        si4_entry = next((gap for gap in gaps if gap["control_id"] == "SI-4"), None)
        assert si4_entry is not None
        assert "T9999" in si4_entry["uncovered_techniques"]