Compare commits

..

2 Commits

Author SHA1 Message Date
kitos 611e10620e refactor(domain): introduce domain exceptions boundary
Aegis CI / lint-and-test (push) Has been cancelled
- Create domain/errors.py as canonical error hierarchy: DomainError, InvalidStateTransition, PermissionViolation, BusinessRuleViolation, EntityNotFoundError, DuplicateEntityError

- InvalidOperationError now inherits from BusinessRuleViolation for semantic consistency

- Convert domain/exceptions.py to backward-compatible re-export shim with legacy aliases (DomainException, InvalidTransitionError, AuthorizationError)

- Update error_handler.py to import from domain/errors.py and map all new error types

- Update main.py to register DomainError (new base) as the exception handler root
2026-02-18 13:44:47 +01:00
kitos 55dba1e00a db: enforce unique constraint on test_detection_results
- Add UniqueConstraint(test_id, detection_rule_id) named uq_tdr_test_rule to TestDetectionResult model

- Alembic b025: safely deduplicate existing rows before creating constraint
2026-02-18 13:20:28 +01:00
6 changed files with 173 additions and 78 deletions
@@ -0,0 +1,41 @@
"""add_unique_test_detection_result
Enforce one evaluation per (test, detection_rule) pair. Before creating
the constraint, duplicate rows (if any) are collapsed so the migration
never fails on an existing database.
Revision ID: b025uqtdr
Revises: b024critidx
Create Date: 2026-02-18 14:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
revision: str = "b025uqtdr"
down_revision: Union[str, None] = "b024critidx"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Remove duplicates keeping the most recently evaluated row
op.execute("""
DELETE FROM test_detection_results
WHERE id NOT IN (
SELECT DISTINCT ON (test_id, detection_rule_id) id
FROM test_detection_results
ORDER BY test_id, detection_rule_id, evaluated_at DESC NULLS LAST
)
""")
op.create_unique_constraint(
"uq_tdr_test_rule",
"test_detection_results",
["test_id", "detection_rule_id"],
)
def downgrade() -> None:
op.drop_constraint("uq_tdr_test_rule", "test_detection_results", type_="unique")
+96
View File
@@ -0,0 +1,96 @@
"""Canonical domain error hierarchy for Aegis.
Every service-layer error should be a subclass of :class:`DomainError`.
The global exception handler in ``app.middleware.error_handler`` maps
each concrete subclass to an appropriate HTTP status code so that
services never depend on FastAPI.
Existing code that imports from ``app.domain.exceptions`` continues to
work — that module re-exports everything defined here.
"""
from __future__ import annotations
class DomainError(Exception):
"""Base for all domain errors."""
def __init__(self, message: str, *, code: str = "DOMAIN_ERROR") -> None:
self.message = message
self.code = code
super().__init__(message)
# ── Entity lifecycle ──────────────────────────────────────────────────
class EntityNotFoundError(DomainError):
"""A requested entity does not exist."""
def __init__(self, entity: str, identifier: str) -> None:
super().__init__(f"{entity} not found: {identifier}", code="NOT_FOUND")
self.entity = entity
self.identifier = identifier
class DuplicateEntityError(DomainError):
"""Creating an entity that already exists."""
def __init__(self, entity: str, field: str, value: str) -> None:
super().__init__(
f"{entity} with {field}='{value}' already exists",
code="DUPLICATE",
)
# ── State machine ────────────────────────────────────────────────────
class InvalidStateTransition(DomainError):
"""A state-machine transition is not allowed."""
def __init__(
self,
current_state: str,
target_state: str,
valid_transitions: list[str] | None = None,
) -> None:
msg = f"Cannot transition from '{current_state}' to '{target_state}'"
if valid_transitions:
msg += f". Valid transitions: {valid_transitions}"
super().__init__(msg, code="INVALID_TRANSITION")
self.current_state = current_state
self.target_state = target_state
self.valid_transitions = valid_transitions or []
# ── Business rules ────────────────────────────────────────────────────
class BusinessRuleViolation(DomainError):
"""An operation violates a business invariant."""
def __init__(self, message: str) -> None:
super().__init__(message, code="BUSINESS_RULE_VIOLATION")
class InvalidOperationError(BusinessRuleViolation):
"""An operation is invalid in the current context.
Kept for backward compatibility; new code should prefer
:class:`BusinessRuleViolation` directly.
"""
def __init__(self, message: str) -> None:
super().__init__(message)
self.code = "INVALID_OPERATION"
# ── Authorization ────────────────────────────────────────────────────
class PermissionViolation(DomainError):
"""The user lacks permissions for an action."""
def __init__(self, message: str = "Insufficient permissions") -> None:
super().__init__(message, code="FORBIDDEN")
+17 -62
View File
@@ -1,67 +1,22 @@
"""Domain exceptions for Aegis business logic. """Backward-compatible re-exports from :mod:`app.domain.errors`.
These exceptions are raised by service-layer code and automatically All domain errors now live in ``errors.py``. This module preserves the
mapped to HTTP responses by the error-handler middleware registered old import paths so that existing code keeps working without changes::
in ``app.main``. This keeps the service layer free from any HTTP
or framework coupling. from app.domain.exceptions import InvalidTransitionError # still works
""" """
from app.domain.errors import ( # noqa: F401
class DomainException(Exception): BusinessRuleViolation,
"""Base for all domain exceptions.""" DomainError,
DuplicateEntityError,
def __init__(self, message: str, code: str = "DOMAIN_ERROR"): EntityNotFoundError,
self.message = message InvalidOperationError,
self.code = code InvalidStateTransition,
super().__init__(message) PermissionViolation,
class EntityNotFoundError(DomainException):
"""Raised when a requested entity does not exist."""
def __init__(self, entity: str, identifier: str):
super().__init__(f"{entity} not found: {identifier}", "NOT_FOUND")
self.entity = entity
self.identifier = identifier
class DuplicateEntityError(DomainException):
"""Raised when creating an entity that already exists."""
def __init__(self, entity: str, field: str, value: str):
super().__init__(
f"{entity} with {field}='{value}' already exists",
"DUPLICATE",
) )
# Legacy aliases — old name → new name
class InvalidTransitionError(DomainException): DomainException = DomainError
"""Raised when a state-machine transition is not allowed.""" InvalidTransitionError = InvalidStateTransition
AuthorizationError = PermissionViolation
def __init__(
self,
current_state: str,
target_state: str,
valid_transitions: list[str] | None = None,
):
msg = f"Cannot transition from '{current_state}' to '{target_state}'"
if valid_transitions:
msg += f". Valid transitions: {valid_transitions}"
super().__init__(msg, "INVALID_TRANSITION")
self.current_state = current_state
self.target_state = target_state
self.valid_transitions = valid_transitions or []
class InvalidOperationError(DomainException):
"""Raised when an operation is invalid in the current context."""
def __init__(self, message: str):
super().__init__(message, "INVALID_OPERATION")
class AuthorizationError(DomainException):
"""Raised when the user lacks permissions for an action."""
def __init__(self, message: str = "Insufficient permissions"):
super().__init__(message, "FORBIDDEN")
+2 -2
View File
@@ -38,7 +38,7 @@ from app.routers import professional_reports as professional_reports_router
from app.routers import analytics as analytics_router from app.routers import analytics as analytics_router
from app.routers import advanced_metrics as advanced_metrics_router from app.routers import advanced_metrics as advanced_metrics_router
from app.routers import osint as osint_router from app.routers import osint as osint_router
from app.domain.exceptions import DomainException from app.domain.errors import DomainError
from app.middleware.error_handler import domain_exception_handler from app.middleware.error_handler import domain_exception_handler
from app.storage import ensure_bucket_exists from app.storage import ensure_bucket_exists
from app.jobs.mitre_sync_job import start_scheduler, scheduler from app.jobs.mitre_sync_job import start_scheduler, scheduler
@@ -77,7 +77,7 @@ app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# ── Domain exception → HTTP mapping ────────────────────────────────────── # ── Domain exception → HTTP mapping ──────────────────────────────────────
app.add_exception_handler(DomainException, domain_exception_handler) app.add_exception_handler(DomainError, domain_exception_handler)
# ── CORS ────────────────────────────────────────────────────────────────── # ── CORS ──────────────────────────────────────────────────────────────────
from app.config import settings as _settings from app.config import settings as _settings
+14 -12
View File
@@ -1,41 +1,43 @@
"""Domain exception → HTTP response mapping. """Domain error → HTTP response mapping.
This module provides a single exception handler that converts This module provides a single exception handler that converts
domain-layer exceptions into structured JSON responses, keeping domain-layer errors into structured JSON responses, keeping
the service layer free from FastAPI's ``HTTPException``. the service layer free from FastAPI's ``HTTPException``.
""" """
from fastapi import Request from fastapi import Request
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from app.domain.exceptions import ( from app.domain.errors import (
AuthorizationError, BusinessRuleViolation,
DomainException, DomainError,
DuplicateEntityError, DuplicateEntityError,
EntityNotFoundError, EntityNotFoundError,
InvalidOperationError, InvalidOperationError,
InvalidTransitionError, InvalidStateTransition,
PermissionViolation,
) )
EXCEPTION_STATUS_MAP: dict[type[DomainException], int] = { EXCEPTION_STATUS_MAP: dict[type[DomainError], int] = {
EntityNotFoundError: 404, EntityNotFoundError: 404,
DuplicateEntityError: 409, DuplicateEntityError: 409,
InvalidTransitionError: 400, InvalidStateTransition: 400,
InvalidOperationError: 400, InvalidOperationError: 400,
AuthorizationError: 403, BusinessRuleViolation: 400,
PermissionViolation: 403,
} }
async def domain_exception_handler( async def domain_exception_handler(
request: Request, request: Request,
exc: DomainException, exc: DomainError,
) -> JSONResponse: ) -> JSONResponse:
"""Convert a :class:`DomainException` into a JSON error response.""" """Convert a :class:`DomainError` into a JSON error response."""
status_code = EXCEPTION_STATUS_MAP.get(type(exc), 400) status_code = EXCEPTION_STATUS_MAP.get(type(exc), 400)
content: dict = {"detail": exc.message, "code": exc.code} content: dict = {"detail": exc.message, "code": exc.code}
if isinstance(exc, InvalidTransitionError): if isinstance(exc, InvalidStateTransition):
content["current_state"] = exc.current_state content["current_state"] = exc.current_state
content["target_state"] = exc.target_state content["target_state"] = exc.target_state
content["valid_transitions"] = exc.valid_transitions content["valid_transitions"] = exc.valid_transitions
+2 -1
View File
@@ -7,7 +7,7 @@ rule as triggered / not triggered / not applicable, along with notes.
import uuid import uuid
from datetime import datetime from datetime import datetime
from sqlalchemy import Column, String, Text, Boolean, DateTime, ForeignKey, Index from sqlalchemy import Column, String, Text, Boolean, DateTime, ForeignKey, Index, UniqueConstraint
from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
@@ -52,4 +52,5 @@ class TestDetectionResult(Base):
__table_args__ = ( __table_args__ = (
Index('ix_tdr_test', 'test_id'), Index('ix_tdr_test', 'test_id'),
Index('ix_tdr_rule', 'detection_rule_id'), Index('ix_tdr_rule', 'detection_rule_id'),
UniqueConstraint('test_id', 'detection_rule_id', name='uq_tdr_test_rule'),
) )