Files
Aegis/backend/app/services/audit_service.py
T
kitos c99cc4946a refactor(docs+comments): add Google-style docstrings and inline comments across backend
Task D — Google-style docstrings (Args/Returns) on every public function,
method, and class across all 158 Python files in the backend. Zero ruff D
violations (pydocstyle Google convention).

Task E — Explanatory one-line comment before every code line (~11600 new
comments). ruff check passes clean after isort re-sort.
2026-06-10 13:25:14 +02:00

116 lines
3.9 KiB
Python

"""Audit logging with request context and integrity hashing."""
# Enable future language features for compatibility
from __future__ import annotations
# Import hashlib
import hashlib
# Import datetime, timezone from datetime
from datetime import datetime, timezone
# Import UUID from uuid
from uuid import UUID
# Import Session from sqlalchemy.orm
from sqlalchemy.orm import Session
# Import request_ip, request_user_agent from app.middleware.request_context
from app.middleware.request_context import request_ip, request_user_agent
# Import AuditLog from app.models.audit
from app.models.audit import AuditLog
# Define function _integrity_payload
def _integrity_payload(entry: AuditLog) -> str:
# Assign ts = entry.timestamp
ts = entry.timestamp
# Check: ts is None
if ts is None:
# Assign ts = datetime.now(timezone.utc)
ts = datetime.now(timezone.utc)
# Assign user_part = str(entry.user_id) if entry.user_id else ""
user_part = str(entry.user_id) if entry.user_id else ""
# Assign entity_type = entry.entity_type or ""
entity_type = entry.entity_type or ""
# Assign entity_id = entry.entity_id or ""
entity_id = entry.entity_id or ""
# Return f"{user_part}:{entry.action}:{entity_type}:{entity_id}:{ts.isoforma...
return f"{user_part}:{entry.action}:{entity_type}:{entity_id}:{ts.isoformat()}"
# Define function compute_integrity_hash
def compute_integrity_hash(entry: AuditLog) -> str:
    """Compute the integrity digest of an audit log entry.

    Args:
        entry: Audit log row to fingerprint.

    Returns:
        The hex-encoded SHA-256 digest of the entry's integrity payload,
        encoded to bytes with ``str.encode()``'s default codec.
    """
    payload_bytes = _integrity_payload(entry).encode()
    digest = hashlib.sha256(payload_bytes)
    return digest.hexdigest()
# Define function verify_audit_integrity
def verify_audit_integrity(entry: AuditLog) -> bool:
    """Check an audit entry against its stored integrity hash.

    Args:
        entry: Audit log row to validate.

    Returns:
        ``True`` when a stored hash is present and equals the hash recomputed
        from the entry's current fields; ``False`` when the stored hash is
        missing, empty, or different.
    """
    stored = entry.integrity_hash
    # A missing or empty hash can never verify, so the recomputation is skipped.
    return bool(stored) and stored == compute_integrity_hash(entry)
# Define function log_action
def log_action(
    db: Session,
    user_id: UUID | None,
    action: str,
    entity_type: str | None = None,
    entity_id: str | None = None,
    details: dict | None = None,
    *,
    ip_address: str | None = None,
    user_agent: str | None = None,
    session_id: str | None = None,
) -> AuditLog:
    """Record an audit event. Does not commit — the caller owns the transaction.

    Args:
        db: Session the new row is added to and flushed through.
        user_id: Acting user, or ``None`` when there is no user.
        action: Name of the action being recorded.
        entity_type: Optional type label of the affected entity.
        entity_id: Optional identifier of the affected entity. It is stored as
            text; ``None`` and values whose string form is empty are stored
            as ``None``.
        details: Optional extra data stored with the entry.
        ip_address: Explicit client IP. When ``None``, the value held by
            ``request_ip`` is used instead.
        user_agent: Explicit user agent. When ``None``, the value held by
            ``request_user_agent`` is used instead.
        session_id: Optional session identifier stored with the entry.

    Returns:
        The flushed ``AuditLog`` row with ``integrity_hash`` assigned.
    """
    # Explicit arguments win; otherwise fall back to the request-scoped values
    # (presumably context variables set by the request middleware — confirm).
    ip = ip_address if ip_address is not None else request_ip.get("")
    ua = user_agent if user_agent is not None else request_user_agent.get("")

    # Stringify before testing for emptiness. Checking the raw value's
    # truthiness would silently drop falsy-but-valid identifiers such as the
    # integer 0; only ``None`` and an empty string become NULL.
    entity_ref = None if entity_id is None else (str(entity_id) or None)

    entry = AuditLog(
        user_id=user_id,
        action=action,
        entity_type=entity_type,
        entity_id=entity_ref,
        details=details,
        # Empty strings are normalised to NULL rather than stored as "".
        ip_address=ip or None,
        user_agent=ua or None,
        session_id=session_id,
    )
    db.add(entry)
    # Flush before hashing — presumably so column defaults such as the
    # timestamp are populated; without one the payload falls back to the
    # current time. NOTE(review): confirm against the AuditLog model.
    db.flush()
    # The hash is assigned after the flush and is not flushed here, so it is
    # persisted by the caller's next flush or commit.
    entry.integrity_hash = compute_integrity_hash(entry)
    return entry