94 lines
2.5 KiB
Python
94 lines
2.5 KiB
Python
"""Audit log query service — framework-agnostic query logic for audit logs.
|
|
|
|
Provides paginated logs and distinct action/entity-type lists.
|
|
No FastAPI imports.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
|
|
from sqlalchemy.orm import Session, joinedload
|
|
|
|
from app.models.audit import AuditLog
|
|
|
|
|
|
def list_logs(
|
|
db: Session,
|
|
*,
|
|
user_id: str | None = None,
|
|
action: str | None = None,
|
|
entity_type: str | None = None,
|
|
start_date: datetime | None = None,
|
|
end_date: datetime | None = None,
|
|
offset: int = 0,
|
|
limit: int = 50,
|
|
) -> dict:
|
|
"""Return paginated audit logs with optional filters.
|
|
|
|
Returns a dict with keys: items, total, offset, limit.
|
|
Each item is a dict with: id, user_id, username, action, entity_type,
|
|
entity_id, timestamp, details.
|
|
"""
|
|
query = db.query(AuditLog).options(joinedload(AuditLog.user))
|
|
|
|
if user_id:
|
|
query = query.filter(AuditLog.user_id == user_id)
|
|
if action:
|
|
query = query.filter(AuditLog.action == action)
|
|
if entity_type:
|
|
query = query.filter(AuditLog.entity_type == entity_type)
|
|
if start_date:
|
|
query = query.filter(AuditLog.timestamp >= start_date)
|
|
if end_date:
|
|
query = query.filter(AuditLog.timestamp <= end_date)
|
|
|
|
total = query.count()
|
|
|
|
logs = (
|
|
query
|
|
.order_by(AuditLog.timestamp.desc())
|
|
.offset(offset)
|
|
.limit(limit)
|
|
.all()
|
|
)
|
|
|
|
items = [
|
|
{
|
|
"id": log.id,
|
|
"user_id": log.user_id,
|
|
"username": log.user.username if log.user else None,
|
|
"action": log.action,
|
|
"entity_type": log.entity_type,
|
|
"entity_id": log.entity_id,
|
|
"timestamp": log.timestamp,
|
|
"details": log.details,
|
|
}
|
|
for log in logs
|
|
]
|
|
|
|
return {"items": items, "total": total, "offset": offset, "limit": limit}
|
|
|
|
|
|
def list_distinct_actions(db: Session) -> list[str]:
|
|
"""Return a list of distinct action types in the audit log."""
|
|
actions = (
|
|
db.query(AuditLog.action)
|
|
.distinct()
|
|
.order_by(AuditLog.action)
|
|
.all()
|
|
)
|
|
return [a[0] for a in actions]
|
|
|
|
|
|
def list_distinct_entity_types(db: Session) -> list[str]:
|
|
"""Return a list of distinct entity types in the audit log."""
|
|
types = (
|
|
db.query(AuditLog.entity_type)
|
|
.filter(AuditLog.entity_type.isnot(None))
|
|
.distinct()
|
|
.order_by(AuditLog.entity_type)
|
|
.all()
|
|
)
|
|
return [t[0] for t in types]
|