"""Pydantic schemas for User management endpoints."""
|
|
|
|
# Import re
|
|
import re
|
|
|
|
# Import uuid
|
|
import uuid
|
|
|
|
# Import datetime from datetime
|
|
from datetime import datetime
|
|
|
|
from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator, model_validator
|
|
|
|
|
|
# ── Username policy ─────────────────────────────────────────────────
|
|
|
|
_USERNAME_RE = re.compile(r"^[a-zA-Z0-9_-]{3,50}$")
|
|
# Assign _RESERVED_USERNAMES = frozenset({
|
|
_RESERVED_USERNAMES = frozenset({
|
|
# Literal argument value
|
|
"admin", "root", "system", "api", "null", "undefined",
|
|
# Literal argument value
|
|
"administrator", "superuser", "aegis",
|
|
})
|
|
|
|
|
|
# Define function _validate_username
|
|
def _validate_username(username: str) -> str:
|
|
"""Validate username format and reject reserved names.
|
|
|
|
Args:
|
|
username (str): The username string to validate.
|
|
|
|
Returns:
|
|
str: The validated username, unchanged.
|
|
"""
|
|
# Check: not _USERNAME_RE.match(username)
|
|
if not _USERNAME_RE.match(username):
|
|
# Raise ValueError
|
|
raise ValueError(
|
|
# Literal argument value
|
|
"Username must be 3-50 characters, containing only "
|
|
# Literal argument value
|
|
"letters, digits, underscores, and hyphens"
|
|
)
|
|
# Check: username.lower() in _RESERVED_USERNAMES
|
|
if username.lower() in _RESERVED_USERNAMES:
|
|
# Raise ValueError
|
|
raise ValueError(f"Username '{username}' is reserved")
|
|
# Return username
|
|
return username
|
|
|
|
|
|
# ── Password policy ─────────────────────────────────────────────────
|
|
|
|
_MIN_PASSWORD_LENGTH = 12
|
|
|
|
# Assign _PASSWORD_RULES = [
|
|
_PASSWORD_RULES: list[tuple[str, str]] = [
|
|
(r"[A-Z]", "at least one uppercase letter"),
|
|
(r"[a-z]", "at least one lowercase letter"),
|
|
(r"[0-9]", "at least one digit"),
|
|
(r"[!@#$%^&*()_+\-=\[\]{};':\"\\|,.<>/?`~]", "at least one special character"),
|
|
]
|
|
|
|
|
|
# Define function _validate_password_strength
|
|
def _validate_password_strength(password: str) -> str:
|
|
"""Check that *password* satisfies the complexity policy.
|
|
|
|
Rules:
|
|
|
|
- Minimum 12 characters
|
|
- At least one uppercase letter
|
|
- At least one lowercase letter
|
|
- At least one digit
|
|
- At least one special character
|
|
|
|
Args:
|
|
password (str): The plaintext password to validate.
|
|
|
|
Returns:
|
|
str: The validated password, unchanged.
|
|
"""
|
|
# Assign errors = []
|
|
errors: list[str] = []
|
|
|
|
# Check: len(password) < _MIN_PASSWORD_LENGTH
|
|
if len(password) < _MIN_PASSWORD_LENGTH:
|
|
# Call errors.append()
|
|
errors.append(f"must be at least {_MIN_PASSWORD_LENGTH} characters long")
|
|
|
|
# Iterate over _PASSWORD_RULES
|
|
for pattern, description in _PASSWORD_RULES:
|
|
# Check: not re.search(pattern, password)
|
|
if not re.search(pattern, password):
|
|
# Call errors.append()
|
|
errors.append(description)
|
|
|
|
# Check: errors
|
|
if errors:
|
|
# Raise ValueError
|
|
raise ValueError(
|
|
# Literal argument value
|
|
"Password does not meet complexity requirements: " + "; ".join(errors)
|
|
)
|
|
|
|
# Return password
|
|
return password
|
|
|
|
|
|
# ── Create ──────────────────────────────────────────────────────────
|
|
|
|
class UserCreate(BaseModel):
    """Payload for creating a new user."""

    # Login name; checked by ``username_format`` below.
    username: str
    # Optional address; accepted as a plain string (no format validation here).
    email: str | None = None
    # Plaintext password; checked by ``password_strength`` below.
    # repr=False keeps the value out of repr()/str() of the model so it does
    # not end up in logs or tracebacks. The field stays required.
    password: str = Field(repr=False)
    # Role name for the new account; "viewer" when the caller omits it.
    role: str = "viewer"

    @field_validator("username")
    @classmethod
    def username_format(cls, v: str) -> str:
        """Validate the username field against the platform policy.

        Args:
            v (str): Raw username value from the request body.

        Returns:
            str: The validated username.

        Raises:
            ValueError: Propagated from ``_validate_username`` when the value
                is malformed or reserved.
        """
        return _validate_username(v)

    @field_validator("password")
    @classmethod
    def password_strength(cls, v: str) -> str:
        """Validate the password field against the complexity policy.

        Args:
            v (str): Raw password value from the request body.

        Returns:
            str: The validated password.

        Raises:
            ValueError: Propagated from ``_validate_password_strength`` when
                the value does not meet the policy.
        """
        return _validate_password_strength(v)
|
|
|
|
|
|
# ── Update ──────────────────────────────────────────────────────────
|
|
|
|
class UserUpdate(BaseModel):
    """Payload for partially updating an existing user.

    Every field is optional so callers send only what changed.
    """

    # Each field defaults to None when the caller does not send it.
    email: str | None = None
    role: str | None = None
    is_active: bool | None = None
    # Plaintext replacement password; repr=False keeps it out of
    # repr()/str() of the model so it does not end up in logs or tracebacks.
    password: str | None = Field(default=None, repr=False)

    @field_validator("password")
    @classmethod
    def password_strength(cls, v: str | None) -> str | None:
        """Validate the password field when provided.

        Args:
            v (str | None): Raw password value, or ``None`` when unchanged.

        Returns:
            str | None: The validated password, or ``None``.

        Raises:
            ValueError: Propagated from ``_validate_password_strength`` when
                a password is supplied and does not meet the policy.
        """
        # An explicit null is passed through untouched; only real values are
        # held to the complexity policy.
        if v is None:
            return None
        return _validate_password_strength(v)
|
|
|
|
|
|
# ── Self-service (password change, preferences) & read model ────────
|
|
|
|
class PasswordChange(BaseModel):
    """Payload for changing the current user's password."""

    # Both values are plaintext secrets; repr=False keeps them out of
    # repr()/str() of the model so they do not end up in logs or tracebacks.
    # No complexity validator is attached to current_password.
    current_password: str = Field(repr=False)
    # Checked by ``new_password_strength`` below.
    new_password: str = Field(repr=False)

    @field_validator("new_password")
    @classmethod
    def new_password_strength(cls, v: str) -> str:
        """Validate the new password against the complexity policy.

        Args:
            v (str): Raw new-password value from the request body.

        Returns:
            str: The validated new password.

        Raises:
            ValueError: Propagated from ``_validate_password_strength`` when
                the value does not meet the policy.
        """
        return _validate_password_strength(v)
|
|
|
|
|
|
class UserPreferencesUpdate(BaseModel):
    """Payload for updating current user's notification preferences and Jira/Tempo settings."""

    notification_preferences: dict | None = None
    jira_account_id: str | None = None
    # Personal Jira API token (Atlassian token) — write-only.
    # Set to empty string "" to clear the token.
    # repr=False keeps the secret out of repr()/str() of the model.
    jira_api_token: str | None = Field(default=None, repr=False)
    # Atlassian email for Jira auth — overrides account email.
    # Set to empty string "" to clear (falls back to account email).
    jira_email: str | None = None
    # Personal Tempo API token — write-only.
    # Set to empty string "" to clear the token.
    # repr=False keeps the secret out of repr()/str() of the model.
    tempo_api_token: str | None = Field(default=None, repr=False)
|
|
|
|
|
|
class UserOut(BaseModel):
    """Complete representation returned by the API."""

    id: uuid.UUID
    username: str
    email: str | None = None
    role: str
    is_active: bool
    # Defaults to True when the source object does not supply a value.
    must_change_password: bool = True
    created_at: datetime | None = None
    last_login: datetime | None = None
    notification_preferences: dict | None = None
    jira_account_id: str | None = None
    jira_email: str | None = None
    # Read from ORM but NEVER exposed in responses — used only to derive *_token_set flags.
    # exclude=True drops them from serialised output; repr=False additionally
    # keeps the raw secrets out of repr()/str() of the model (logs, tracebacks).
    jira_api_token: str | None = Field(default=None, exclude=True, repr=False)
    tempo_api_token: str | None = Field(default=None, exclude=True, repr=False)
    # True when the user has the respective token stored. Any incoming value
    # is overwritten by ``_derive_token_set_flags``.
    jira_token_set: bool = False
    tempo_token_set: bool = False

    # from_attributes=True lets the model be populated from attribute-bearing
    # objects (e.g. ORM rows) rather than only from dicts.
    model_config = ConfigDict(from_attributes=True)

    @model_validator(mode="after")
    def _derive_token_set_flags(self) -> "UserOut":
        """Derive *_token_set booleans from the (excluded) raw token fields.

        Uses @model_validator(mode='after') so Pydantic's Rust core calls it
        during FastAPI response serialisation — model_validate() overrides are
        bypassed by FastAPI's __pydantic_validator__.validate_python() path.

        Returns:
            UserOut: This same instance, with both flags recomputed.
        """
        # bool() treats both None and the empty string as "no token stored".
        self.jira_token_set = bool(self.jira_api_token)
        self.tempo_token_set = bool(self.tempo_api_token)
        return self
|