T-032: User management admin panel - backend users router with CRUD, frontend UsersPage with modals T-033: Audit log viewer - backend audit router with filters/pagination, frontend AuditLogPage T-034: Global error handling - ErrorBoundary, LoadingSpinner, ErrorMessage, Toast components T-035: Backend tests - pytest setup with SQLite, tests for health/auth/techniques/tests T-036: Documentation - Updated README with testing section, created docs/API.md
119 lines
3.4 KiB
Python
119 lines
3.4 KiB
Python
"""Audit log viewer router (admin only)."""
|
|
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, Query
|
|
from sqlalchemy import func
|
|
from sqlalchemy.orm import Session, joinedload
|
|
|
|
from app.database import get_db
|
|
from app.dependencies.auth import require_role
|
|
from app.models.audit import AuditLog
|
|
from app.models.user import User
|
|
from app.schemas.audit import AuditLogOut, AuditLogPage
|
|
|
|
router = APIRouter(prefix="/audit-logs", tags=["audit"])
|
|
|
|
|
|
@router.get("", response_model=AuditLogPage)
|
|
def list_audit_logs(
|
|
user_id: Optional[str] = Query(None, description="Filter by user ID"),
|
|
action: Optional[str] = Query(None, description="Filter by action type"),
|
|
entity_type: Optional[str] = Query(None, description="Filter by entity type"),
|
|
start_date: Optional[datetime] = Query(None, description="Filter by start date"),
|
|
end_date: Optional[datetime] = Query(None, description="Filter by end date"),
|
|
offset: int = Query(0, ge=0, description="Number of records to skip"),
|
|
limit: int = Query(50, ge=1, le=100, description="Max records to return"),
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_role("admin")),
|
|
):
|
|
"""Return paginated audit logs with optional filters.
|
|
|
|
**Requires admin role.**
|
|
"""
|
|
query = db.query(AuditLog).options(joinedload(AuditLog.user))
|
|
|
|
# Apply filters
|
|
if user_id:
|
|
query = query.filter(AuditLog.user_id == user_id)
|
|
if action:
|
|
query = query.filter(AuditLog.action == action)
|
|
if entity_type:
|
|
query = query.filter(AuditLog.entity_type == entity_type)
|
|
if start_date:
|
|
query = query.filter(AuditLog.timestamp >= start_date)
|
|
if end_date:
|
|
query = query.filter(AuditLog.timestamp <= end_date)
|
|
|
|
# Get total count
|
|
total = query.count()
|
|
|
|
# Get paginated results
|
|
logs = (
|
|
query
|
|
.order_by(AuditLog.timestamp.desc())
|
|
.offset(offset)
|
|
.limit(limit)
|
|
.all()
|
|
)
|
|
|
|
# Convert to response format with username
|
|
items = []
|
|
for log in logs:
|
|
item = AuditLogOut(
|
|
id=log.id,
|
|
user_id=log.user_id,
|
|
username=log.user.username if log.user else None,
|
|
action=log.action,
|
|
entity_type=log.entity_type,
|
|
entity_id=log.entity_id,
|
|
timestamp=log.timestamp,
|
|
details=log.details,
|
|
)
|
|
items.append(item)
|
|
|
|
return AuditLogPage(
|
|
items=items,
|
|
total=total,
|
|
offset=offset,
|
|
limit=limit,
|
|
)
|
|
|
|
|
|
@router.get("/actions", response_model=list[str])
|
|
def list_actions(
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_role("admin")),
|
|
):
|
|
"""Return a list of distinct action types in the audit log.
|
|
|
|
**Requires admin role.**
|
|
"""
|
|
actions = (
|
|
db.query(AuditLog.action)
|
|
.distinct()
|
|
.order_by(AuditLog.action)
|
|
.all()
|
|
)
|
|
return [a[0] for a in actions]
|
|
|
|
|
|
@router.get("/entity-types", response_model=list[str])
|
|
def list_entity_types(
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_role("admin")),
|
|
):
|
|
"""Return a list of distinct entity types in the audit log.
|
|
|
|
**Requires admin role.**
|
|
"""
|
|
types = (
|
|
db.query(AuditLog.entity_type)
|
|
.filter(AuditLog.entity_type.isnot(None))
|
|
.distinct()
|
|
.order_by(AuditLog.entity_type)
|
|
.all()
|
|
)
|
|
return [t[0] for t in types]
|