From f41b8fd8c20ff37f2d606c9315fd9b5d4ed0172f Mon Sep 17 00:00:00 2001 From: Kitos Date: Wed, 18 Feb 2026 19:11:14 +0100 Subject: [PATCH] fix(security): add username validation, constant-time login, default credential rejection, and tooling --- .cursor/rules/aegis-architecture.md | 189 ++++++++++++++++++++++ backend/app/config.py | 16 ++ backend/app/routers/auth.py | 8 +- backend/app/schemas/user.py | 26 +++ backend/tests/test_security_validators.py | 85 ++++++++++ scripts/agent_validate_backend.sh | 23 +++ tasks/lessons.md | 21 +++ tasks/todo.md | 26 +++ 8 files changed, 393 insertions(+), 1 deletion(-) create mode 100644 .cursor/rules/aegis-architecture.md create mode 100644 backend/tests/test_security_validators.py create mode 100644 scripts/agent_validate_backend.sh create mode 100644 tasks/lessons.md create mode 100644 tasks/todo.md diff --git a/.cursor/rules/aegis-architecture.md b/.cursor/rules/aegis-architecture.md new file mode 100644 index 0000000..f1c5fcf --- /dev/null +++ b/.cursor/rules/aegis-architecture.md @@ -0,0 +1,189 @@ +--- +description: Aegis backend Clean Architecture rules. Apply when working on any backend Python file under backend/app/ or backend/tests/. +globs: backend/**/*.py +--- + +# Aegis — Clean Modular Monolith Architecture + +## Architecture Overview + +Aegis follows a **Clean Architecture** pattern inside a modular monolith. The backend has four layers with strict dependency rules: + +``` +Presentation → Application → Domain ← Infrastructure +``` + +**The golden rule:** dependencies only point towards the Domain layer. Infrastructure implements the ports (interfaces) defined in Domain. + +## Layer Structure and Rules + +### Domain Layer (`backend/app/domain/`) + +The innermost layer. **ZERO** imports from FastAPI, SQLAlchemy, Pydantic, or any framework. + +| Directory | Purpose | +|-----------|---------| +| `domain/enums.py` | Canonical domain enums (TechniqueStatus, TestState, TeamSide, TestResult) | +| `domain/errors.py` | Exception hierarchy (DomainError → EntityNotFoundError, InvalidStateTransition, etc.) | +| `domain/exceptions.py` | Backward-compatible re-exports from errors.py | +| `domain/test_entity.py` | TestEntity — pure state machine with domain events | +| `domain/entities/` | Rich domain entities (TechniqueEntity, etc.) with business behavior | +| `domain/value_objects/` | Immutable value types (MitreId, ScoringWeights) | +| `domain/ports/repositories/` | Protocol interfaces defining data access contracts | +| `domain/ports/services/` | Protocol interfaces for external capabilities (storage, events) | +| `domain/unit_of_work.py` | UnitOfWork wrapping SQLAlchemy session | + +**NEVER** import from `app.models`, `app.routers`, `app.infrastructure`, `fastapi`, or `sqlalchemy` inside `domain/`. + +### Application Layer (`backend/app/application/` — future) + +Use case orchestrators. Depends only on Domain. + +| Directory | Purpose | +|-----------|---------| +| `application/use_cases/` | One class per business operation | +| `application/dto/` | Plain data containers for use case input/output | +| `application/interfaces/` | Application-level contracts (UnitOfWork protocol) | + +### Infrastructure Layer (`backend/app/infrastructure/`) + +Implements ports defined in Domain. Depends on Domain and Application. + +| Directory | Purpose | +|-----------|---------| +| `infrastructure/redis_client.py` | Redis connection singleton | +| `infrastructure/persistence/repositories/` | SQLAlchemy implementations of repository ports | +| `infrastructure/persistence/mappers/` | ORM model ↔ domain entity converters | + +### Presentation Layer (routers, schemas, dependencies) + +HTTP boundary. Depends on Application and Domain (for exceptions). + +| Directory | Purpose | +|-----------|---------| +| `routers/` | FastAPI routers — HTTP mapping only | +| `schemas/` | Pydantic request/response models | +| `dependencies/` | FastAPI `Depends()` wiring (auth, repositories) | +| `middleware/` | Error handler mapping domain exceptions → HTTP responses | + +## Import Rules (Strict) + +| From \ To | domain/ | application/ | infrastructure/ | presentation/ | +|-----------|---------|-------------|----------------|--------------| +| **domain/** | Self only | FORBIDDEN | FORBIDDEN | FORBIDDEN | +| **application/** | ALLOWED | Self only | FORBIDDEN | FORBIDDEN | +| **infrastructure/** | ALLOWED (ports) | ALLOWED (UoW) | Self only | FORBIDDEN | +| **presentation/** | ALLOWED (exceptions) | ALLOWED (use cases) | ALLOWED (wiring in dependencies/) | Self only | + +## How to Add a New Feature + +### 1. Start from the Domain + +- Define or reuse domain entities in `domain/entities/` +- Add value objects if needed in `domain/value_objects/` +- Define repository port if a new aggregate root in `domain/ports/repositories/` +- Domain exceptions go in `domain/errors.py` +- Business rules live IN the entity, not in services or routers + +### 2. Implement Infrastructure + +- Create SQLAlchemy repository implementation in `infrastructure/persistence/repositories/` +- Create mapper if converting between ORM model and domain entity +- Repository does NOT call `commit()` — only `flush()` +- Transaction control belongs to the Unit of Work + +### 3. Wire in Presentation + +- Add FastAPI `Depends()` provider in `dependencies/repositories.py` +- Keep routers thin: parse request → call service/use case → return response +- Map domain exceptions to HTTP via the error handler middleware (automatic) + +### 4. Tests (Mandatory) + +Every change MUST include tests: +- **Domain entities/value objects**: pure unit tests, no DB, no mocking frameworks +- **Repositories**: integration tests using the `db` fixture from conftest +- **Routers**: API tests using the `client` fixture +- At least one success test + one failure/edge-case test per behavior + +Before committing, run: `scripts/agent_validate_backend.sh` + +## Existing Patterns to Follow + +### Domain Entity Pattern (see `domain/test_entity.py`) + +```python +@dataclass +class SomeEntity: + id: uuid.UUID + # fields... + _events: list[DomainEvent] = field(default_factory=list, repr=False) + + @classmethod + def from_orm(cls, model: Any) -> "SomeEntity": + """Build from SQLAlchemy model.""" + ... + + def apply_to(self, model: Any) -> None: + """Copy mutable fields back onto the ORM model.""" + ... + + def some_business_method(self) -> None: + """Business logic lives HERE, not in services.""" + ... + self._events.append(DomainEvent("something_happened")) +``` + +### Repository Port Pattern (Protocol) + +```python +from typing import Protocol, runtime_checkable + +@runtime_checkable +class SomeRepository(Protocol): + def find_by_id(self, id: uuid.UUID) -> SomeEntity | None: ... + def save(self, entity: SomeEntity) -> SomeEntity: ... +``` + +### Repository Implementation Pattern + +```python +class SASomeRepository: + def __init__(self, session: Session) -> None: + self._session = session + + def find_by_id(self, id: uuid.UUID) -> SomeEntity | None: + model = self._session.query(SomeModel).filter(SomeModel.id == id).first() + return SomeMapper.to_entity(model) if model else None + + def save(self, entity: SomeEntity) -> SomeEntity: + model = SomeMapper.to_model(entity) + merged = self._session.merge(model) + self._session.flush() # NO commit — UoW does that + return SomeMapper.to_entity(merged) +``` + +### Error Handling (automatic via middleware) + +Services raise domain exceptions → middleware maps to HTTP: +- `EntityNotFoundError` → 404 +- `DuplicateEntityError` → 409 +- `InvalidStateTransition` → 400 +- `BusinessRuleViolation` → 400 +- `PermissionViolation` → 403 + +### Coexistence Strategy + +Old code (direct `db.query()` in routers) and new code (repositories) coexist. Migration is incremental: +1. New endpoints use repositories +2. Existing endpoints are migrated one at a time +3. Both access the same DB, same session, same tables + +## Key Conventions + +- **Enums**: canonical source is `domain/enums.py`, `models/enums.py` re-exports +- **Exceptions**: raise from `domain/errors.py`, never raise `HTTPException` from services +- **Commits**: only via `UnitOfWork.commit()` or at the router level, never inside services/repos +- **IDs**: UUID everywhere (primary keys, foreign keys) +- **Tests**: SQLite in-memory for unit/integration, PostgreSQL in CI +- **Validation**: Pydantic in schemas (presentation), domain rules in entities (domain) diff --git a/backend/app/config.py b/backend/app/config.py index 4f61508..4fad710 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -106,3 +106,19 @@ if settings.SECRET_KEY in _UNSAFE_SECRETS: "Set SECRET_KEY in your environment for persistent sessions.", stacklevel=2, ) + +# --------------------------------------------------------------------------- +# SEC-002: Reject default credentials in production +# --------------------------------------------------------------------------- +if _is_production: + _DEFAULT_CREDS = { + ("MINIO_ACCESS_KEY", settings.MINIO_ACCESS_KEY, "minioadmin"), + ("MINIO_SECRET_KEY", settings.MINIO_SECRET_KEY, "minioadmin"), + } + for name, current, default in _DEFAULT_CREDS: + if current == default: + raise RuntimeError( + f"CRITICAL: {name} is using the default value '{default}'. " + f"Set a strong value via the {name} environment variable " + f"before running in production." + ) diff --git a/backend/app/routers/auth.py b/backend/app/routers/auth.py index 4ee93f5..4860bb0 100644 --- a/backend/app/routers/auth.py +++ b/backend/app/routers/auth.py @@ -58,7 +58,13 @@ def login( """ user = db.query(User).filter(User.username == form_data.username).first() - if user is None or not verify_password(form_data.password, user.hashed_password): + # Constant-time comparison: always run bcrypt verify to prevent + # timing-based user enumeration (SEC-005). + _DUMMY_HASH = "$2b$12$LJ3m4ys3Lg3dMO/NpNmOaeVwFpWJMxlB2FLmEAo9fZr.S8H1vC4Wy" + hashed = user.hashed_password if user else _DUMMY_HASH + password_valid = verify_password(form_data.password, hashed) + + if user is None or not password_valid: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password", diff --git a/backend/app/schemas/user.py b/backend/app/schemas/user.py index 4ac6334..7d89d61 100644 --- a/backend/app/schemas/user.py +++ b/backend/app/schemas/user.py @@ -7,6 +7,27 @@ from datetime import datetime from pydantic import BaseModel, ConfigDict, EmailStr, field_validator +# ── Username policy ───────────────────────────────────────────────── + +_USERNAME_RE = re.compile(r"^[a-zA-Z0-9_-]{3,50}$") +_RESERVED_USERNAMES = frozenset({ + "admin", "root", "system", "api", "null", "undefined", + "administrator", "superuser", "aegis", +}) + + +def _validate_username(username: str) -> str: + """Validate username format and reject reserved names.""" + if not _USERNAME_RE.match(username): + raise ValueError( + "Username must be 3-50 characters, containing only " + "letters, digits, underscores, and hyphens" + ) + if username.lower() in _RESERVED_USERNAMES: + raise ValueError(f"Username '{username}' is reserved") + return username + + # ── Password policy ───────────────────────────────────────────────── _MIN_PASSWORD_LENGTH = 12 @@ -56,6 +77,11 @@ class UserCreate(BaseModel): password: str role: str = "viewer" + @field_validator("username") + @classmethod + def username_format(cls, v: str) -> str: + return _validate_username(v) + @field_validator("password") @classmethod def password_strength(cls, v: str) -> str: diff --git a/backend/tests/test_security_validators.py b/backend/tests/test_security_validators.py new file mode 100644 index 0000000..6e6868e --- /dev/null +++ b/backend/tests/test_security_validators.py @@ -0,0 +1,85 @@ +"""Tests for security validators (username, password complexity).""" + +import sys +import os + +backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +if backend_dir not in sys.path: + sys.path.insert(0, backend_dir) + +import pytest +from pydantic import ValidationError + +from app.schemas.user import UserCreate + + +# ── Username validation ────────────────────────────────────────────── + + +class TestUsernameValidation: + + def test_valid_username(self): + u = UserCreate(username="john_doe", password="SecurePass123!@#") + assert u.username == "john_doe" + + def test_valid_username_with_hyphens(self): + u = UserCreate(username="john-doe", password="SecurePass123!@#") + assert u.username == "john-doe" + + def test_valid_username_numeric(self): + u = UserCreate(username="user123", password="SecurePass123!@#") + assert u.username == "user123" + + def test_too_short_username(self): + with pytest.raises(ValidationError, match="3-50 characters"): + UserCreate(username="ab", password="SecurePass123!@#") + + def test_username_with_spaces(self): + with pytest.raises(ValidationError, match="3-50 characters"): + UserCreate(username="john doe", password="SecurePass123!@#") + + def test_username_with_special_chars(self): + with pytest.raises(ValidationError, match="3-50 characters"): + UserCreate(username="john@doe", password="SecurePass123!@#") + + def test_reserved_username_admin(self): + with pytest.raises(ValidationError, match="reserved"): + UserCreate(username="admin", password="SecurePass123!@#") + + def test_reserved_username_root(self): + with pytest.raises(ValidationError, match="reserved"): + UserCreate(username="root", password="SecurePass123!@#") + + def test_reserved_username_case_insensitive(self): + with pytest.raises(ValidationError, match="reserved"): + UserCreate(username="ADMIN", password="SecurePass123!@#") + + +# ── Password validation ───────────────────────────────────────────── + + +class TestPasswordValidation: + + def test_valid_strong_password(self): + u = UserCreate(username="testuser", password="SecurePass123!@#") + assert u.password == "SecurePass123!@#" + + def test_too_short_password(self): + with pytest.raises(ValidationError, match="12 characters"): + UserCreate(username="testuser", password="Short1!") + + def test_no_uppercase(self): + with pytest.raises(ValidationError, match="uppercase"): + UserCreate(username="testuser", password="securepass123!@#") + + def test_no_lowercase(self): + with pytest.raises(ValidationError, match="lowercase"): + UserCreate(username="testuser", password="SECUREPASS123!@#") + + def test_no_digit(self): + with pytest.raises(ValidationError, match="digit"): + UserCreate(username="testuser", password="SecurePassword!@#") + + def test_no_special_char(self): + with pytest.raises(ValidationError, match="special"): + UserCreate(username="testuser", password="SecurePass12345") diff --git a/scripts/agent_validate_backend.sh b/scripts/agent_validate_backend.sh new file mode 100644 index 0000000..183c8ec --- /dev/null +++ b/scripts/agent_validate_backend.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash +set -euo pipefail + +CONTAINER="aegis-backend" + +echo "==========================================" +echo " Aegis Backend Validation (in-container)" +echo "==========================================" + +echo "" +echo "[1/2] Linting with ruff..." +docker exec "$CONTAINER" sh -c "cd /app && /home/appuser/.local/bin/ruff check app/ tests/ --no-fix" +echo " ✓ Lint passed" + +echo "" +echo "[2/2] Running tests with pytest..." +docker exec "$CONTAINER" sh -c "cd /app && python -m pytest tests/ -v --tb=short -q" +echo " ✓ Tests passed" + +echo "" +echo "==========================================" +echo " ALL CHECKS PASSED" +echo "==========================================" diff --git a/tasks/lessons.md b/tasks/lessons.md new file mode 100644 index 0000000..10c66a9 --- /dev/null +++ b/tasks/lessons.md @@ -0,0 +1,21 @@ +# Aegis — Lessons Learned + +## Architecture + +- Domain entities must have ZERO framework imports. If you need SQLAlchemy or FastAPI in an entity, the design is wrong. +- Services should never call `db.commit()`. Use UnitOfWork at the router/use-case level. +- Domain exceptions propagate up and the error_handler middleware maps them to HTTP responses automatically. +- The `from_orm()` / `apply_to()` pattern bridges ORM models and domain entities cleanly. + +## Testing + +- Use the `db` fixture for repository/integration tests, `client` fixture for API tests. +- SQLite conftest patches PostgreSQL types (UUID, JSONB) — always verify on real PG in CI. +- Pure domain tests need no fixtures at all — just construct entities directly. + +## Patterns to Avoid + +- Never raise `HTTPException` from services — raise domain exceptions instead. +- Never put business logic in routers — delegate to entities or services. +- Never create DB sessions manually (`SessionLocal()`) outside of background jobs. +- Never swallow exceptions with bare `except: pass` — at minimum log a warning. diff --git a/tasks/todo.md b/tasks/todo.md new file mode 100644 index 0000000..47d1368 --- /dev/null +++ b/tasks/todo.md @@ -0,0 +1,26 @@ +# Aegis — Task Tracker + +## In Progress + +- [ ] Clean Architecture foundation: domain enums, value objects, entities, repository ports + implementations + +## Completed + +- [x] Domain exceptions hierarchy (domain/errors.py) +- [x] TestEntity with state machine (domain/test_entity.py) +- [x] Unit of Work (domain/unit_of_work.py) +- [x] Error handler middleware (middleware/error_handler.py) +- [x] Redis-backed token blacklist (auth.py) +- [x] CI pipeline (.github/workflows/ci.yml) +- [x] Heatmap service extracted (services/heatmap_service.py) +- [x] Scoring bulk queries (bulk_technique_scores) +- [x] Architecture skill file (.cursor/rules/aegis-architecture.md) +- [x] Agent validation script (scripts/agent_validate_backend.sh) + +## Backlog + +- [ ] Application layer use cases +- [ ] Migrate fat routers to use repositories +- [ ] Scoring config persistence (DB instead of mutable settings) +- [ ] Structured JSON logging +- [ ] Frontend type generation from OpenAPI