85 lines
2.6 KiB
Python
85 lines
2.6 KiB
Python
"""Tests for enhanced audit trail (IP, user-agent, integrity hash)."""
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import pytest
|
|
|
|
from app.middleware.request_context import request_ip, request_user_agent
|
|
from app.models.audit import AuditLog
|
|
from app.services.audit_service import (
|
|
compute_integrity_hash,
|
|
log_action,
|
|
verify_audit_integrity,
|
|
)
|
|
from app.jobs.retention_job import apply_retention_policies
|
|
|
|
|
|
class TestAuditIntegrity:
|
|
def test_integrity_hash_set_on_log(self, db):
|
|
entry = log_action(
|
|
db,
|
|
user_id=None,
|
|
action="test_action",
|
|
entity_type="test",
|
|
entity_id="abc",
|
|
ip_address="10.0.0.1",
|
|
user_agent="pytest",
|
|
)
|
|
db.commit()
|
|
db.refresh(entry)
|
|
assert entry.integrity_hash
|
|
assert len(entry.integrity_hash) == 64
|
|
assert verify_audit_integrity(entry)
|
|
|
|
def test_tampered_entry_fails_integrity(self, db):
|
|
entry = log_action(
|
|
db,
|
|
user_id=None,
|
|
action="test_action",
|
|
entity_type="test",
|
|
entity_id="abc",
|
|
)
|
|
db.commit()
|
|
entry.entity_id = "tampered"
|
|
assert not verify_audit_integrity(entry)
|
|
|
|
def test_recomputed_hash_matches_stored(self, db):
|
|
entry = log_action(db, None, "update", "user", "1")
|
|
db.commit()
|
|
assert entry.integrity_hash == compute_integrity_hash(entry)
|
|
|
|
|
|
class TestRequestContext:
|
|
def test_context_vars_used_by_log_action(self, db):
|
|
token_ip = request_ip.set("203.0.113.42")
|
|
token_ua = request_user_agent.set("AegisTestClient/1.0")
|
|
try:
|
|
entry = log_action(db, None, "ctx_action", "system", None)
|
|
db.commit()
|
|
assert entry.ip_address == "203.0.113.42"
|
|
assert entry.user_agent == "AegisTestClient/1.0"
|
|
finally:
|
|
request_ip.reset(token_ip)
|
|
request_user_agent.reset(token_ua)
|
|
|
|
|
|
class TestRetentionJob:
|
|
def test_deletes_old_audit_logs(self, db):
|
|
old = AuditLog(
|
|
action="old",
|
|
entity_type="system",
|
|
timestamp=datetime.now(timezone.utc) - timedelta(days=800),
|
|
)
|
|
recent = AuditLog(
|
|
action="recent",
|
|
entity_type="system",
|
|
timestamp=datetime.now(timezone.utc) - timedelta(days=1),
|
|
)
|
|
db.add_all([old, recent])
|
|
db.commit()
|
|
|
|
summary = apply_retention_policies(db)
|
|
assert summary["audit_logs_deleted"] >= 1
|
|
remaining = db.query(AuditLog).all()
|
|
assert all(log.action != "old" for log in remaining)
|