Foundational changes required before any new feature work can begin.

- 0.1 Redis infrastructure: add redis:7-alpine to docker-compose dev and prod, REDIS_URL config, singleton client in app/infrastructure/redis_client.py
- 0.2 Token blacklist on Redis (SEC-001): replace in-memory dict with Redis SETEX keyed by jti, auto-expiring TTL derived from token exp
- 0.3 Database indexes (SR-006): Alembic migration b019 with 5 composite indexes for scoring, MTTD/MTTR, remediation, and notification queries
- 0.4 Domain exceptions (TD-003): app/domain/exceptions.py with typed errors, error_handler middleware mapping them to HTTP, services decoupled from FastAPI
- 0.5 Fix silenced exceptions (TD-007): replace 4 bare except-pass blocks in test_workflow_service with logger.warning with exc_info
- 0.6 CI pipeline (TD-009): GitHub Actions workflow with Postgres and Redis service containers, ruff lint, pytest; ruff.toml for baseline config
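Item 0.2 describes a blacklist entry keyed by jti whose TTL comes from the token's exp claim. A minimal sketch of that idea follows; it is not the commit's actual code. The `blacklist:` key prefix, the function names, and the `FakeRedis` stand-in are assumptions made for illustration. The only client methods used are `setex(name, time, value)` and `exists(*names)`, which redis-py provides.

```python
import time

KEY_PREFIX = "blacklist:"  # hypothetical key namespace, not taken from the commit


def blacklist_token(client, jti, exp, now=None):
    """Store the jti until the token's own expiry.

    Returns False when the token has already expired, since an expired
    token needs no blacklist entry.
    """
    current = int(time.time()) if now is None else int(now)
    ttl = int(exp) - current  # seconds left until the token expires
    if ttl <= 0:
        return False
    # SETEX writes the value and its TTL in one call; Redis drops the key at expiry.
    client.setex(f"{KEY_PREFIX}{jti}", ttl, "1")
    return True


def is_blacklisted(client, jti):
    """True if a blacklist entry for this jti is still present."""
    return client.exists(f"{KEY_PREFIX}{jti}") > 0


class FakeRedis:
    """In-memory stand-in so the sketch runs without a Redis server.

    It records the TTL but does not enforce expiry.
    """

    def __init__(self):
        self.store = {}

    def setex(self, name, seconds, value):
        self.store[name] = (value, seconds)
        return True

    def exists(self, *names):
        return sum(1 for n in names if n in self.store)
```

Because the TTL equals the token's remaining lifetime, the blacklist never holds entries for tokens that could no longer be accepted anyway, so no separate cleanup job is needed.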
35 lines
811 B
Python
"""Redis client singleton.

Provides a lazily-initialised Redis connection that is reused across
the application. The connection URL is read from ``settings.REDIS_URL``.

Usage::

    from app.infrastructure.redis_client import get_redis

    r = get_redis()
    r.set("key", "value", ex=300)
"""

import logging

import redis

from app.config import settings

logger = logging.getLogger(__name__)

# Module-level cache; populated by the first call to get_redis().
_redis_client: redis.Redis | None = None


def get_redis() -> redis.Redis:
    """Return a shared Redis client, creating it on first call."""
    global _redis_client
    if _redis_client is None:
        # from_url() only builds the client and its connection pool;
        # no socket is opened until the first command is sent.
        _redis_client = redis.from_url(
            settings.REDIS_URL,
            decode_responses=True,  # return str instead of bytes
        )
        logger.info("Redis client initialised for %s", settings.REDIS_URL)
    return _redis_client
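The lazy initialisation in get_redis() takes no lock, so two threads making the first call at the same moment could each build a client before one overwrites the other. That is usually harmless, but it can be avoided. The sketch below shows one possible variant using double-checked locking. It is an illustration, not part of the commit: `_make_client`, `get_client`, and `reset_client` are hypothetical names, and the factory returns a plain object so the example runs without Redis installed.

```python
import threading
from typing import Callable, Optional

_client: Optional[object] = None
_lock = threading.Lock()


def _make_client() -> object:
    """Hypothetical factory; the real module would call redis.from_url(...)."""
    return object()


def get_client(factory: Callable[[], object] = _make_client) -> object:
    """Return the shared client, building it at most once across threads."""
    global _client
    if _client is None:  # fast path: skip the lock once initialised
        with _lock:
            if _client is None:  # re-check now that we hold the lock
                _client = factory()
    return _client


def reset_client() -> None:
    """Drop the cached client, e.g. between tests."""
    global _client
    with _lock:
        _client = None
```

A reset hook like `reset_client()` also makes it easier to swap in a fake client in tests without patching module globals directly.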