"""Phase 14: API Key service — create, list, revoke, authenticate.""" from __future__ import annotations from datetime import datetime from typing import List, Optional from uuid import UUID from sqlalchemy.orm import Session from app.domain.errors import EntityNotFoundError, DuplicateEntityError from app.models.api_key import ApiKey, generate_raw_key, hash_key, key_prefix_display from app.models.user import User # ── Create ──────────────────────────────────────────────────────────────────── def create_api_key( db: Session, user_id: UUID, name: str, scopes: List[str], description: Optional[str] = None, expires_at: Optional[datetime] = None, ) -> tuple[ApiKey, str]: """ Create a new API key. Returns ``(ApiKey, raw_key)`` — the raw_key must be shown to the user immediately and is never retrievable again. """ raw_key = generate_raw_key() key_hash = hash_key(raw_key) prefix = key_prefix_display(raw_key) # Detect accidental collision (astronomically unlikely) if db.query(ApiKey).filter(ApiKey.key_hash == key_hash).first(): raise DuplicateEntityError("ApiKey", "key_hash", "") key = ApiKey( name = name, description = description, key_prefix = prefix, key_hash = key_hash, user_id = user_id, scopes = scopes, expires_at = expires_at, ) db.add(key) db.commit() db.refresh(key) return key, raw_key # ── Read ────────────────────────────────────────────────────────────────────── def list_api_keys( db: Session, user_id: Optional[UUID] = None, include_inactive: bool = False, ) -> List[ApiKey]: q = db.query(ApiKey) if user_id is not None: q = q.filter(ApiKey.user_id == user_id) if not include_inactive: q = q.filter(ApiKey.is_active == True) return q.order_by(ApiKey.created_at.desc()).all() def get_api_key(db: Session, key_id: UUID, user_id: Optional[UUID] = None) -> ApiKey: q = db.query(ApiKey).filter(ApiKey.id == key_id) if user_id is not None: q = q.filter(ApiKey.user_id == user_id) key = q.first() if not key: raise EntityNotFoundError("ApiKey", str(key_id)) return key # ── Update / Revoke ─────────────────────────────────────────────────────────── def update_api_key( db: Session, key_id: UUID, user_id: Optional[UUID] = None, *, name: Optional[str] = None, description: Optional[str] = None, scopes: Optional[List[str]] = None, expires_at: Optional[datetime] = None, is_active: Optional[bool] = None, ) -> ApiKey: key = get_api_key(db, key_id, user_id) if name is not None: key.name = name if description is not None: key.description = description if scopes is not None: key.scopes = scopes if expires_at is not None: key.expires_at = expires_at if is_active is not None: key.is_active = is_active db.commit() db.refresh(key) return key def revoke_api_key( db: Session, key_id: UUID, user_id: Optional[UUID] = None, ) -> ApiKey: """Soft-revoke: set is_active = False.""" return update_api_key(db, key_id, user_id, is_active=False) def delete_api_key(db: Session, key_id: UUID, user_id: Optional[UUID] = None) -> None: """Hard delete — use revoke instead for audit trail.""" key = get_api_key(db, key_id, user_id) db.delete(key) db.commit() # ── Authentication ──────────────────────────────────────────────────────────── def authenticate_raw_key(db: Session, raw_key: str) -> Optional[User]: """ Verify a raw API key. Returns the owning User if the key is valid, active, and not expired. Updates ``last_used_at`` (throttled to once per request — always updates). Returns None on any failure. """ h = hash_key(raw_key) key: Optional[ApiKey] = db.query(ApiKey).filter(ApiKey.key_hash == h).first() if key is None or not key.is_active: return None if key.expires_at and key.expires_at < datetime.utcnow(): return None # Update last_used_at key.last_used_at = datetime.utcnow() db.commit() user: Optional[User] = db.query(User).filter(User.id == key.user_id).first() if user is None or not user.is_active: return None # Attach the key's scopes to the user instance so scope-enforcement # dependencies can verify them without an additional DB query. # _api_key_scopes=None means "full user access" (JWT path). user._api_key_scopes = key.scopes or [] return user