Files
Aegis/backend/tests/test_audit_trail.py

85 lines
2.6 KiB
Python

"""Tests for enhanced audit trail (IP, user-agent, integrity hash)."""
from datetime import datetime, timedelta, timezone
import pytest
from app.middleware.request_context import request_ip, request_user_agent
from app.models.audit import AuditLog
from app.services.audit_service import (
compute_integrity_hash,
log_action,
verify_audit_integrity,
)
from app.jobs.retention_job import apply_retention_policies
class TestAuditIntegrity:
    """Integrity-hash checks for audit entries created via ``log_action``."""

    def test_integrity_hash_set_on_log(self, db):
        """A newly logged entry has a 64-character hash that verifies."""
        fields = {
            "user_id": None,
            "action": "test_action",
            "entity_type": "test",
            "entity_id": "abc",
            "ip_address": "10.0.0.1",
            "user_agent": "pytest",
        }
        record = log_action(db, **fields)
        db.commit()
        # Refresh the instance before inspecting its attributes.
        db.refresh(record)

        digest = record.integrity_hash
        assert digest
        assert len(digest) == 64
        assert verify_audit_integrity(record)

    def test_tampered_entry_fails_integrity(self, db):
        """Altering a field after logging makes verification fail."""
        record = log_action(
            db,
            user_id=None,
            action="test_action",
            entity_type="test",
            entity_id="abc",
        )
        db.commit()

        # The mutation is applied to the in-memory object and is not
        # committed by this test.
        record.entity_id = "tampered"

        still_valid = verify_audit_integrity(record)
        assert not still_valid

    def test_recomputed_hash_matches_stored(self, db):
        """The stored hash equals the value ``compute_integrity_hash`` returns."""
        record = log_action(db, None, "update", "user", "1")
        db.commit()

        stored = record.integrity_hash
        recomputed = compute_integrity_hash(record)
        assert stored == recomputed
class TestRequestContext:
    """Checks that ``log_action`` picks up request-scoped IP and user agent."""

    def test_context_vars_used_by_log_action(self, db):
        """Values set on the request variables end up on the logged entry.

        No IP or user agent is passed to ``log_action`` here, so the values
        asserted below can only come from ``request_ip`` and
        ``request_user_agent``.
        """
        expected_ip = "203.0.113.42"
        expected_agent = "AegisTestClient/1.0"

        ip_token = request_ip.set(expected_ip)
        agent_token = request_user_agent.set(expected_agent)
        try:
            logged = log_action(db, None, "ctx_action", "system", None)
            db.commit()
            assert logged.ip_address == expected_ip
            assert logged.user_agent == expected_agent
        finally:
            # Always restore the previous values so later tests are unaffected.
            request_ip.reset(ip_token)
            request_user_agent.reset(agent_token)
class TestRetentionJob:
    """Checks ``apply_retention_policies`` against audit-log rows."""

    def test_deletes_old_audit_logs(self, db):
        """Old audit logs are purged and recent ones are kept.

        Two rows are inserted: one timestamped 800 days ago and one
        timestamped 1 day ago. After the retention job runs, the summary
        must report at least one deleted audit log, the old row must be
        gone, and the recent row must still be present.

        NOTE(review): assumes the configured audit-log retention window is
        longer than 1 day and shorter than 800 days — confirm against the
        retention policy settings.
        """
        # Use a single reference instant so both timestamps are relative to
        # the same "now".
        now = datetime.now(timezone.utc)
        old = AuditLog(
            action="old",
            entity_type="system",
            timestamp=now - timedelta(days=800),
        )
        recent = AuditLog(
            action="recent",
            entity_type="system",
            timestamp=now - timedelta(days=1),
        )
        db.add_all([old, recent])
        db.commit()

        summary = apply_retention_policies(db)

        assert summary["audit_logs_deleted"] >= 1

        remaining_actions = {log.action for log in db.query(AuditLog).all()}
        assert "old" not in remaining_actions
        # Guard against over-deletion: without this check the test would
        # still pass if the job removed every audit log.
        assert "recent" in remaining_actions