feat(auth): move JWT blacklist to Redis with TTL [FASE-0.2]

Revoke tokens by jti in a dedicated Redis DB, honor TTL from JWT exp on logout, reject revoked tokens in get_current_user, and add FakeRedis-backed API tests.
This commit is contained in:
2026-05-18 13:19:15 +02:00
parent 9b70655b7e
commit c5eb6f6dc1
5 changed files with 104 additions and 13 deletions

View File

@@ -80,11 +80,11 @@ def blacklist_token(jti: str, exp: float) -> None:
to ``exp - now`` so the key vanishes when the token would have expired to ``exp - now`` so the key vanishes when the token would have expired
naturally. naturally.
""" """
from app.infrastructure.redis_client import get_redis from app.infrastructure.redis_client import get_redis_blacklist
ttl = max(int(exp - datetime.now(timezone.utc).timestamp()), 1) ttl = max(int(exp - datetime.now(timezone.utc).timestamp()), 1)
try: try:
r = get_redis() r = get_redis_blacklist()
r.setex(f"{_BLACKLIST_PREFIX}{jti}", ttl, "1") r.setex(f"{_BLACKLIST_PREFIX}{jti}", ttl, "1")
except Exception: except Exception:
logger.warning("Failed to blacklist token %s in Redis", jti, exc_info=True) logger.warning("Failed to blacklist token %s in Redis", jti, exc_info=True)
@@ -92,10 +92,10 @@ def blacklist_token(jti: str, exp: float) -> None:
def is_token_blacklisted(jti: str) -> bool: def is_token_blacklisted(jti: str) -> bool:
"""Return ``True`` if *jti* has been revoked (exists in Redis).""" """Return ``True`` if *jti* has been revoked (exists in Redis)."""
from app.infrastructure.redis_client import get_redis from app.infrastructure.redis_client import get_redis_blacklist
try: try:
r = get_redis() r = get_redis_blacklist()
return r.exists(f"{_BLACKLIST_PREFIX}{jti}") > 0 return r.exists(f"{_BLACKLIST_PREFIX}{jti}") > 0
except Exception: except Exception:
logger.warning("Failed to check blacklist for %s in Redis", jti, exc_info=True) logger.warning("Failed to check blacklist for %s in Redis", jti, exc_info=True)

View File

@@ -15,7 +15,7 @@ from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt from jose import JWTError, jwt
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.auth import is_token_blacklisted from app import auth as auth_lib
from app.config import settings from app.config import settings
from app.database import get_db from app.database import get_db
from app.models.user import User from app.models.user import User
@@ -57,6 +57,11 @@ async def get_current_user(
detail="Could not validate credentials", detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"}, headers={"WWW-Authenticate": "Bearer"},
) )
revoked_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has been revoked",
headers={"WWW-Authenticate": "Bearer"},
)
# Prefer cookie, fall back to header # Prefer cookie, fall back to header
token = aegis_token or bearer_token token = aegis_token or bearer_token
@@ -74,8 +79,8 @@ async def get_current_user(
raise credentials_exception raise credentials_exception
# Check token blacklist (revoked tokens) # Check token blacklist (revoked tokens)
jti: str | None = payload.get("jti") jti: str | None = payload.get("jti")
if jti and is_token_blacklisted(jti): if jti and auth_lib.is_token_blacklisted(jti):
raise credentials_exception raise revoked_exception
except JWTError: except JWTError:
raise credentials_exception raise credentials_exception

View File

@@ -96,13 +96,26 @@ def logout(
The token's ``jti`` is added to the Redis blacklist so it cannot The token's ``jti`` is added to the Redis blacklist so it cannot
be reused even if the cookie has already been copied elsewhere. be reused even if the cookie has already been copied elsewhere.
The blacklist entry auto-expires when the token's ``exp`` is reached. The blacklist entry auto-expires when the token's ``exp`` is reached.
When both HttpOnly cookie and ``Authorization: Bearer`` are present
(typical for API clients), **both** are revoked so the session cannot
survive on whichever credential the next request prefers.
""" """
# Attempt to blacklist the token's jti bearer = (
token = aegis_token or request.headers.get("Authorization", "").removeprefix("Bearer ").strip() request.headers.get("Authorization")
if token: or request.headers.get("authorization")
or ""
)
bearer = bearer.removeprefix("Bearer ").removeprefix("bearer ").strip()
seen: set[str] = set()
for raw in (aegis_token, bearer):
if not raw or raw in seen:
continue
seen.add(raw)
try: try:
payload = jwt.decode( payload = jwt.decode(
token, raw,
settings.SECRET_KEY, settings.SECRET_KEY,
algorithms=[settings.ALGORITHM], algorithms=[settings.ALGORITHM],
) )
@@ -111,7 +124,7 @@ def logout(
if jti: if jti:
blacklist_token(jti, float(exp)) blacklist_token(jti, float(exp))
except JWTError: except JWTError:
pass # token already invalid — nothing to revoke pass # token already invalid — nothing to revoke for this raw value
response.delete_cookie( response.delete_cookie(
key=_COOKIE_NAME, key=_COOKIE_NAME,

View File

@@ -80,12 +80,37 @@ def db():
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def client(db): def client(db, monkeypatch):
"""Create a test client with database override. """Create a test client with database override.
Imports ``app.main`` lazily to avoid pulling in boto3 / APScheduler Imports ``app.main`` lazily to avoid pulling in boto3 / APScheduler
when only the ``db`` fixture is needed. when only the ``db`` fixture is needed.
MinIO and the background scheduler are no-ops here so tests do not
require Docker services for application startup.
JWT blacklist uses an in-memory FakeRedis so tests do not require a
real Redis instance.
""" """
import fakeredis
_fake_redis = fakeredis.FakeRedis(decode_responses=True)
def _blacklist_conn():
return _fake_redis
monkeypatch.setattr(
"app.infrastructure.redis_client.get_redis_blacklist",
_blacklist_conn,
)
monkeypatch.setattr("app.main.ensure_bucket_exists", lambda: None)
monkeypatch.setattr("app.main.start_scheduler", lambda: None)
monkeypatch.setattr(
"app.main.scheduler.shutdown",
lambda wait=False: None,
)
from app.main import app from app.main import app
from app.database import get_db from app.database import get_db
import app.database as _db_mod import app.database as _db_mod
@@ -118,6 +143,7 @@ def admin_user(db):
hashed_password=hash_password("admin123"), hashed_password=hash_password("admin123"),
role="admin", role="admin",
is_active=True, is_active=True,
must_change_password=False,
) )
db.add(user) db.add(user)
db.commit() db.commit()
@@ -134,6 +160,7 @@ def red_tech_user(db):
hashed_password=hash_password("redtech123"), hashed_password=hash_password("redtech123"),
role="red_tech", role="red_tech",
is_active=True, is_active=True,
must_change_password=False,
) )
db.add(user) db.add(user)
db.commit() db.commit()
@@ -150,6 +177,7 @@ def blue_tech_user(db):
hashed_password=hash_password("bluetech123"), hashed_password=hash_password("bluetech123"),
role="blue_tech", role="blue_tech",
is_active=True, is_active=True,
must_change_password=False,
) )
db.add(user) db.add(user)
db.commit() db.commit()
@@ -166,6 +194,7 @@ def red_lead_user(db):
hashed_password=hash_password("redlead123"), hashed_password=hash_password("redlead123"),
role="red_lead", role="red_lead",
is_active=True, is_active=True,
must_change_password=False,
) )
db.add(user) db.add(user)
db.commit() db.commit()
@@ -182,6 +211,7 @@ def blue_lead_user(db):
hashed_password=hash_password("bluelead123"), hashed_password=hash_password("bluelead123"),
role="blue_lead", role="blue_lead",
is_active=True, is_active=True,
must_change_password=False,
) )
db.add(user) db.add(user)
db.commit() db.commit()

View File

@@ -79,3 +79,46 @@ def test_get_me_invalid_token(client):
headers={"Authorization": "Bearer invalidtoken"}, headers={"Authorization": "Bearer invalidtoken"},
) )
assert response.status_code == 401 assert response.status_code == 401
def test_logout_revokes_token(client, admin_user):
"""After logout, the same JWT must be rejected (Redis blacklist).
Prefer Authorization over the HttpOnly cookie so logout blacklists the
same token the client sends on the next request; clear cookies to avoid
stale jar state across calls.
"""
login = client.post(
"/api/v1/auth/login",
data={"username": "admin", "password": "admin123"},
)
assert login.status_code == 200
token = login.json()["access_token"]
client.cookies.clear()
out = client.post(
"/api/v1/auth/logout",
headers={"Authorization": f"Bearer {token}"},
)
assert out.status_code == 200
from jose import jwt
from app.config import settings
from app.infrastructure.redis_client import get_redis_blacklist
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM],
)
jti = payload.get("jti")
assert jti
assert get_redis_blacklist().exists(f"blacklist:{jti}")
client.cookies.clear()
assert not len(client.cookies), "cookie jar must be empty to force Authorization Bearer"
me = client.get(
"/api/v1/auth/me",
headers={"Authorization": f"Bearer {token}"},
)
assert me.status_code == 401
assert me.json()["detail"] == "Token has been revoked"