docs: update architecture analysis and tech debt docs to reflect resolved items
2026-02-18 19:27:52 +01:00
parent f41b8fd8c2
commit 0b65f51d1c
5 changed files with 3131 additions and 0 deletions

docs/TARGET_ARCHITECTURE.md Normal file

@@ -0,0 +1,953 @@
# Aegis — Target Architecture: Clean Modular Monolith
> **Author:** Architecture review
> **Date:** February 11, 2026 (updated February 18, 2026)
> **Status:** In Progress — foundational layers implemented
> **Depends on:** ARCHITECTURAL_ANALYSIS.md, DEPENDENCY_ANALYSIS.md, TECH_DEBT_AND_RISKS.md
>
> **Implementation Progress (Feb 18, 2026):**
> - ✅ Domain exceptions hierarchy (`domain/errors.py`, `domain/exceptions.py`)
> - ✅ Error handler middleware (`middleware/error_handler.py`)
> - ✅ TestEntity with full state machine (`domain/test_entity.py`)
> - ✅ TechniqueEntity with status recalculation (`domain/entities/technique.py`)
> - ✅ Value objects: MitreId, ScoringWeights (`domain/value_objects/`)
> - ✅ Repository ports/protocols (`domain/ports/repositories/`)
> - ✅ SQLAlchemy repository implementations (`infrastructure/persistence/repositories/`)
> - ✅ ORM-Entity mappers (`infrastructure/persistence/mappers/`)
> - ✅ FastAPI dependency wiring (`dependencies/repositories.py`)
> - ✅ Unit of Work (`domain/unit_of_work.py`)
> - ✅ Redis-backed token blacklist (`infrastructure/redis_client.py`)
> - ✅ CI pipeline (`.github/workflows/ci.yml`)
> - ✅ 326 tests passing (domain unit tests + integration tests + API tests)
> - ✅ Architecture rules file (`.cursor/rules/aegis-architecture.md`)
>
> **Remaining:** Application layer use cases, Campaign/Compliance domain entities, router migration to repositories, scoring config persistence, structured logging.
---
## Table of Contents
1. [Target Architecture Overview](#1-target-architecture-overview)
2. [Layer Definitions and Responsibilities](#2-layer-definitions-and-responsibilities)
3. [Module Boundaries](#3-module-boundaries)
4. [Dependency Rules](#4-dependency-rules)
5. [Top 5 Modules to Refactor First](#5-top-5-modules-to-refactor-first)
6. [Repository Pattern for Technique](#6-repository-pattern-for-technique)
---
## 1. Target Architecture Overview
### Design Philosophy
The target architecture applies Clean Architecture principles to a modular monolith. This is not a microservices migration — it is an internal reorganization of the existing codebase to enforce separation of concerns, dependency inversion, and testability while maintaining a single deployable unit.
### Target Directory Structure
```
backend/
└── app/
    ├── main.py                               # FastAPI app bootstrap (minimal)
    ├── config.py                             # Pydantic Settings (read-only)
    ├── domain/                               # ★ DOMAIN LAYER
    │   ├── __init__.py
    │   │
    │   ├── enums.py                          # TechniqueStatus, TestState, TeamSide, TestResult
    │   │                                     # (moved from models/enums.py; these are domain concepts)
    │   │
    │   ├── exceptions.py                     # Domain exception hierarchy
    │   │                                     #   EntityNotFoundError
    │   │                                     #   DuplicateEntityError
    │   │                                     #   InvalidTransitionError
    │   │                                     #   InvalidOperationError
    │   │                                     #   AuthorizationError
    │   │
    │   ├── events.py                         # Domain event definitions (data classes)
    │   │                                     #   TestStateChanged, TechniqueStatusRecalculated,
    │   │                                     #   CampaignCompleted, EvidenceUploaded
    │   │
    │   ├── entities/                         # Rich domain entities with behavior
    │   │   ├── __init__.py
    │   │   ├── technique.py                  # TechniqueEntity: recalculate_status(), mark_reviewed()
    │   │   ├── test.py                       # TestEntity: can_transition(), start_execution(),
    │   │   │                                 #   submit_red(), submit_blue(), validate(), reopen()
    │   │   ├── campaign.py                   # CampaignEntity: add_test(), remove_test(), activate(),
    │   │   │                                 #   complete(), has_circular_dependency()
    │   │   ├── user.py                       # UserEntity: has_role(), can_access()
    │   │   ├── detection_rule.py             # DetectionRuleEntity
    │   │   ├── threat_actor.py               # ThreatActorEntity
    │   │   └── evidence.py                   # EvidenceEntity: validate_upload_permission()
    │   │
    │   ├── value_objects/                    # Immutable, equality-by-value
    │   │   ├── __init__.py
    │   │   ├── mitre_id.py                   # MitreId: validated format (T1059, T1059.001)
    │   │   ├── score.py                      # TechniqueScore, TacticScore, OrgScore (with breakdown)
    │   │   └── scoring_weights.py            # ScoringWeights: validated weight set (sum == 100)
    │   │
    │   └── ports/                            # ★ INTERFACES: the contracts
    │       ├── __init__.py
    │       ├── repositories/                 # Data access contracts (one per aggregate root)
    │       │   ├── __init__.py
    │       │   ├── technique_repository.py   # TechniqueRepository protocol
    │       │   ├── test_repository.py        # TestRepository protocol
    │       │   ├── campaign_repository.py    # CampaignRepository protocol
    │       │   ├── user_repository.py        # UserRepository protocol
    │       │   ├── detection_rule_repository.py
    │       │   ├── threat_actor_repository.py
    │       │   ├── evidence_repository.py
    │       │   ├── audit_repository.py
    │       │   ├── notification_repository.py
    │       │   └── snapshot_repository.py
    │       │
    │       └── services/                     # External capability contracts
    │           ├── __init__.py
    │           ├── storage_port.py           # StoragePort: upload_file(), get_download_url()
    │           ├── event_publisher_port.py   # EventPublisherPort: publish(DomainEvent)
    │           └── token_blacklist_port.py   # TokenBlacklistPort: revoke(), is_revoked()
    ├── application/                          # ★ APPLICATION LAYER
    │   ├── __init__.py
    │   │
    │   ├── interfaces/                       # Application-level contracts
    │   │   ├── __init__.py
    │   │   └── unit_of_work.py               # UnitOfWork protocol: commit(), rollback(), __enter__/__exit__
    │   │
    │   ├── dto/                              # Input/output data structures for use cases
    │   │   ├── __init__.py                   # Pure data classes: no ORM, no Pydantic
    │   │   ├── technique_dto.py              # TechniqueListFilters, TechniqueResult, TechniqueDetail
    │   │   ├── test_dto.py                   # CreateTestInput, TestResult, TestTimeline
    │   │   ├── scoring_dto.py                # ScoreRequest, ScoreResult, ScoreHistoryResult
    │   │   ├── heatmap_dto.py                # HeatmapFilters, HeatmapLayer, NavigatorExport
    │   │   ├── report_dto.py                 # CoverageReportResult, CsvExportResult
    │   │   └── campaign_dto.py               # CreateCampaignInput, CampaignProgress
    │   │
    │   └── use_cases/                        # Orchestrators: one class per operation
    │       ├── __init__.py
    │       │
    │       ├── techniques/
    │       │   ├── list_techniques.py        # ListTechniquesUseCase
    │       │   ├── get_technique.py          # GetTechniqueUseCase
    │       │   ├── create_technique.py       # CreateTechniqueUseCase
    │       │   ├── update_technique.py       # UpdateTechniqueUseCase
    │       │   └── review_technique.py       # ReviewTechniqueUseCase
    │       │
    │       ├── tests/
    │       │   ├── create_test.py            # CreateTestUseCase
    │       │   ├── create_from_template.py   # CreateFromTemplateUseCase
    │       │   ├── start_execution.py        # StartExecutionUseCase
    │       │   ├── submit_red.py             # SubmitRedUseCase
    │       │   ├── submit_blue.py            # SubmitBlueUseCase
    │       │   ├── validate_test.py          # ValidateTestUseCase
    │       │   ├── reopen_test.py            # ReopenTestUseCase
    │       │   └── get_retest_chain.py       # GetRetestChainUseCase
    │       │
    │       ├── scoring/
    │       │   ├── calculate_technique_score.py
    │       │   ├── calculate_tactic_score.py
    │       │   ├── calculate_org_score.py
    │       │   └── update_scoring_weights.py
    │       │
    │       ├── heatmap/
    │       │   ├── generate_coverage_layer.py
    │       │   ├── generate_actor_layer.py
    │       │   ├── generate_detection_layer.py
    │       │   └── export_navigator.py
    │       │
    │       ├── reports/
    │       │   ├── generate_coverage_report.py
    │       │   ├── generate_test_results_report.py
    │       │   ├── generate_remediation_report.py
    │       │   └── export_coverage_csv.py
    │       │
    │       └── campaigns/
    │           ├── create_campaign.py
    │           ├── manage_campaign_tests.py
    │           ├── activate_campaign.py
    │           ├── generate_from_threat_actor.py
    │           └── schedule_recurring.py
    ├── infrastructure/                       # ★ INFRASTRUCTURE LAYER
    │   ├── __init__.py
    │   │
    │   ├── persistence/
    │   │   ├── __init__.py
    │   │   ├── database.py                   # Engine, SessionLocal, get_db() (unchanged)
    │   │   │
    │   │   ├── orm/                          # SQLAlchemy models (table mapping ONLY)
    │   │   │   ├── __init__.py               # Re-export all models for Alembic
    │   │   │   ├── base.py                   # declarative_base()
    │   │   │   ├── technique_model.py        # Current models/technique.py (unchanged)
    │   │   │   ├── test_model.py             # Current models/test.py (unchanged)
    │   │   │   ├── campaign_model.py
    │   │   │   ├── user_model.py
    │   │   │   └── ...                       # All 18 current models, untouched
    │   │   │
    │   │   ├── repositories/                 # Concrete repository implementations
    │   │   │   ├── __init__.py
    │   │   │   ├── sa_technique_repository.py
    │   │   │   ├── sa_test_repository.py
    │   │   │   ├── sa_campaign_repository.py
    │   │   │   └── ...                       # One per domain port
    │   │   │
    │   │   ├── unit_of_work.py               # SQLAlchemy UoW (wraps Session commit/rollback)
    │   │   │
    │   │   └── mappers/                      # ORM Model ↔ Domain Entity converters
    │   │       ├── __init__.py
    │   │       ├── technique_mapper.py       # to_entity(model) → TechniqueEntity
    │   │       │                             # to_model(entity) → TechniqueORM
    │   │       ├── test_mapper.py
    │   │       └── ...
    │   │
    │   ├── storage/
    │   │   └── minio_storage.py              # Implements StoragePort (current storage.py logic)
    │   │
    │   ├── auth/
    │   │   ├── jwt_service.py                # Token creation and verification
    │   │   └── redis_token_blacklist.py      # Implements TokenBlacklistPort
    │   │
    │   ├── external/                         # External data source adapters
    │   │   ├── mitre_taxii_adapter.py        # Current mitre_sync_service.py
    │   │   ├── atomic_red_team_adapter.py    # Current atomic_import_service.py
    │   │   ├── sigma_adapter.py
    │   │   ├── elastic_adapter.py
    │   │   ├── caldera_adapter.py
    │   │   ├── d3fend_adapter.py
    │   │   ├── lolbas_adapter.py
    │   │   └── threat_actor_adapter.py
    │   │
    │   ├── events/
    │   │   └── sync_event_publisher.py       # Implements EventPublisherPort (in-process dispatch)
    │   │
    │   ├── cache/
    │   │   └── redis_score_cache.py          # Replaces current in-memory score_cache.py
    │   │
    │   └── jobs/
    │       └── scheduler.py                  # APScheduler setup (current mitre_sync_job.py)
    └── presentation/                         # ★ PRESENTATION LAYER
        ├── __init__.py
        ├── api/
        │   └── v1/                           # Thin routers: HTTP mapping only
        │       ├── __init__.py
        │       ├── techniques.py             # Injects use case via Depends(), maps exceptions
        │       ├── tests.py
        │       ├── campaigns.py
        │       ├── heatmap.py
        │       ├── reports.py
        │       ├── scores.py
        │       ├── metrics.py
        │       └── ...                       # All 21 current routers, thinned
        ├── schemas/                          # Pydantic models (request/response shapes)
        │   ├── __init__.py                   # Current schemas/ (unchanged)
        │   ├── technique_schema.py
        │   ├── test_schema.py
        │   └── ...
        ├── dependencies/                     # FastAPI Depends() wiring
        │   ├── __init__.py
        │   ├── auth.py                       # Current dependencies/auth.py
        │   ├── repositories.py               # get_technique_repo(), get_test_repo(), ...
        │   └── use_cases.py                  # get_create_technique_use_case(), ...
        ├── middleware/
        │   ├── error_handler.py              # Maps domain exceptions → HTTP responses
        │   └── rate_limiter.py
        └── mappers/                          # Pydantic schema ↔ application DTO converters
            ├── __init__.py
            ├── technique_mapper.py           # TechniqueCreate → CreateTechniqueInput
            │                                 # TechniqueResult → TechniqueOut
            └── ...
```
---
## 2. Layer Definitions and Responsibilities
### Domain Layer — The Core
```
Depends on: NOTHING (zero imports from outside domain/)
```
| Component | Responsibility | What It Must NOT Do |
|-----------|---------------|---------------------|
| **Entities** | Encapsulate business rules, invariants, and state transitions. A `TestEntity` knows which transitions are valid. A `TechniqueEntity` can recalculate its own status from a list of test results. | Import SQLAlchemy, FastAPI, Pydantic, or any framework. Access the database. Make HTTP calls. |
| **Value Objects** | Represent domain concepts with value equality. `MitreId("T1059.001")` validates format on construction. `ScoringWeights` ensures the 5 weights sum to 100. | Be mutable. Have identity (no primary key). |
| **Enums** | Define domain vocabularies: `TechniqueStatus`, `TestState`, `TeamSide`, `TestResult`. | Change based on infrastructure (these are the same enums currently in `models/enums.py`). |
| **Exceptions** | Domain-specific error conditions. `InvalidTransitionError(current=draft, target=validated)`. | Reference HTTP status codes. Know about FastAPI. |
| **Events** | Facts about things that happened. `TestStateChanged(test_id, old_state, new_state, user_id, timestamp)`. | Carry behavior. Know how they will be handled. |
| **Ports** | Interfaces (Protocol) defining what the domain needs from the outside world. `TechniqueRepository`, `StoragePort`, `EventPublisherPort`. | Contain implementations. Reference concrete classes. |
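
To make the value-object row above concrete, here is a minimal sketch of what `MitreId` could look like. The regular expression, the `is_subtechnique` and `parent` helpers, and the use of `ValueError` are assumptions made for this illustration; the actual `domain/value_objects/mitre_id.py` may differ.

```python
import re
from dataclasses import dataclass

# Assumed format: "T" + 4 digits, optionally "." + 3 digits for sub-techniques.
_MITRE_ID_PATTERN = re.compile(r"T\d{4}(\.\d{3})?")


@dataclass(frozen=True)
class MitreId:
    """Immutable value object: compared by value, validated on construction."""

    value: str

    def __post_init__(self) -> None:
        if not _MITRE_ID_PATTERN.fullmatch(self.value):
            raise ValueError(f"Invalid MITRE ATT&CK ID: {self.value!r}")

    @property
    def is_subtechnique(self) -> bool:
        return "." in self.value

    @property
    def parent(self) -> "MitreId":
        """Parent technique ID (the ID itself when it is not a sub-technique)."""
        return MitreId(self.value.split(".")[0])
```

Because the class is a frozen dataclass with no framework imports, two instances built from the same string compare equal and the object cannot be mutated after validation.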
### Application Layer — The Orchestrators
```
Depends on: domain/ only
```
| Component | Responsibility | What It Must NOT Do |
|-----------|---------------|---------------------|
| **Use Cases** | Orchestrate a single business operation by calling domain entities and ports. `CreateTechniqueUseCase` validates uniqueness via `TechniqueRepository`, constructs a `TechniqueEntity`, saves it, and publishes an event. | Know about HTTP, Pydantic, SQLAlchemy, or FastAPI. Contain business rules (those belong in entities). Contain queries (those belong in repositories). |
| **DTOs** | Plain data containers for use case input/output. No validation logic, no ORM awareness. | Inherit from Pydantic `BaseModel`. Reference ORM models. |
| **Unit of Work** | Interface for transaction boundaries. Use cases call `uow.commit()` or `uow.rollback()`. | Know about SQLAlchemy sessions. |
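
The use case described in the table can be sketched as follows. This is an illustrative outline rather than the project's code: the `TechniqueCreated` event, the `execute()` method name, the simplified entity, and the in-memory fakes are all assumptions introduced for the example.

```python
from dataclasses import dataclass
from typing import Protocol


class DuplicateEntityError(Exception):
    """Domain exception: the natural key is already taken."""


@dataclass
class TechniqueEntity:  # simplified stand-in for the real entity
    mitre_id: str
    name: str


@dataclass(frozen=True)
class CreateTechniqueInput:
    """Application DTO: plain data, no Pydantic, no ORM."""
    mitre_id: str
    name: str


@dataclass(frozen=True)
class TechniqueCreated:
    """Hypothetical domain event, named for this example only."""
    mitre_id: str


class TechniqueRepository(Protocol):
    def exists_by_mitre_id(self, mitre_id: str) -> bool: ...
    def save(self, technique: TechniqueEntity) -> TechniqueEntity: ...


class UnitOfWork(Protocol):
    def commit(self) -> None: ...
    def rollback(self) -> None: ...


class EventPublisherPort(Protocol):
    def publish(self, event: object) -> None: ...


class CreateTechniqueUseCase:
    """Orchestrates one operation; depends only on ports."""

    def __init__(self, repo: TechniqueRepository, uow: UnitOfWork,
                 publisher: EventPublisherPort) -> None:
        self._repo, self._uow, self._publisher = repo, uow, publisher

    def execute(self, data: CreateTechniqueInput) -> TechniqueEntity:
        if self._repo.exists_by_mitre_id(data.mitre_id):
            raise DuplicateEntityError(data.mitre_id)
        try:
            saved = self._repo.save(TechniqueEntity(data.mitre_id, data.name))
            self._uow.commit()
        except Exception:
            self._uow.rollback()
            raise
        # Publish only after the transaction has committed.
        self._publisher.publish(TechniqueCreated(saved.mitre_id))
        return saved


# In-memory fakes: enough to unit test the use case without a database.
class InMemoryTechniqueRepository:
    def __init__(self) -> None:
        self.items = {}

    def exists_by_mitre_id(self, mitre_id: str) -> bool:
        return mitre_id in self.items

    def save(self, technique: TechniqueEntity) -> TechniqueEntity:
        self.items[technique.mitre_id] = technique
        return technique


class FakeUnitOfWork:
    def __init__(self) -> None:
        self.commits = 0

    def commit(self) -> None:
        self.commits += 1

    def rollback(self) -> None:
        pass


class RecordingPublisher:
    def __init__(self) -> None:
        self.events = []

    def publish(self, event: object) -> None:
        self.events.append(event)
```

Constructing `CreateTechniqueUseCase(InMemoryTechniqueRepository(), FakeUnitOfWork(), RecordingPublisher())` is all a unit test needs; no session, engine, or HTTP client is involved.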
### Infrastructure Layer — The Implementations
```
Depends on: domain/ (implements ports), application/ (implements UoW)
```
| Component | Responsibility | What It Must NOT Do |
|-----------|---------------|---------------------|
| **ORM Models** | Map Python classes to database tables. Unchanged from current `models/`. | Contain business logic. Be passed outside the infrastructure layer (use mappers to convert to domain entities). |
| **Repositories** | Implement port interfaces using SQLAlchemy. `SATechniqueRepository.find_by_mitre_id()` translates to `db.query(Technique).filter(...)`. | Be called by anything outside the application layer. Contain business decisions. |
| **Mappers** | Convert between ORM models and domain entities. `TechniqueMapper.to_entity(orm_model) → TechniqueEntity`. | Contain business logic. Be a 1:1 field copy (they handle relationship loading and value object construction). |
| **External Adapters** | Implement data source integrations. Download ZIPs, parse YAML/TOML/STIX, return domain-compatible data. | Be called from routers directly. Know about HTTP responses. |
| **Storage, Cache, Auth** | Implement service ports. `MinioStorage` implements `StoragePort`. `RedisTokenBlacklist` implements `TokenBlacklistPort`. | Leak implementation details (Redis keys, S3 bucket names) outside the infrastructure layer. |
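
The mapper row is easier to picture with a small example. The sketch below uses a plain dataclass (`TechniqueRow`) as a stand-in for the SQLAlchemy model so that it runs without a database; the enum members, the integer `id`, and the field names other than `mitre_id` and `status_global` are assumptions for illustration.

```python
from dataclasses import dataclass
from enum import Enum


class TechniqueStatus(Enum):
    # Hypothetical members; the real enum lives in domain/enums.py.
    NOT_TESTED = "not_tested"
    VALIDATED = "validated"


@dataclass(frozen=True)
class MitreId:
    value: str


@dataclass
class TechniqueRow:
    """Stand-in for the ORM model: flat, string-typed columns."""
    id: int
    mitre_id: str
    name: str
    status_global: str


@dataclass
class TechniqueEntity:
    id: int
    mitre_id: MitreId
    name: str
    status: TechniqueStatus


class TechniqueMapper:
    @staticmethod
    def to_entity(row: TechniqueRow) -> TechniqueEntity:
        # Build value objects and enums here so the domain never sees raw strings.
        return TechniqueEntity(
            id=row.id,
            mitre_id=MitreId(row.mitre_id),
            name=row.name,
            status=TechniqueStatus(row.status_global),
        )

    @staticmethod
    def to_model(entity: TechniqueEntity) -> TechniqueRow:
        return TechniqueRow(
            id=entity.id,
            mitre_id=entity.mitre_id.value,
            name=entity.name,
            status_global=entity.status.value,
        )
```

The conversion is where raw column values become `MitreId` and `TechniqueStatus`, which is why the table says a mapper is more than a field-by-field copy.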
### Presentation Layer — The HTTP Boundary
```
Depends on: application/ (calls use cases), domain/ (reads exceptions)
```
| Component | Responsibility | What It Must NOT Do |
|-----------|---------------|---------------------|
| **Routers** | Map HTTP requests to use case calls. Parse path/query/body parameters, call the use case, return the response. 10-20 lines per endpoint maximum. | Contain business logic. Execute database queries. Build complex data structures. |
| **Schemas** | Pydantic models for HTTP request/response validation. Unchanged from current `schemas/`. | Be used inside use cases or domain entities. |
| **Dependencies** | Wire use cases via FastAPI `Depends()`. Construct repositories, inject into use cases, return. | Contain logic beyond wiring. |
| **Error Handler** | Map domain exceptions to HTTP responses. `EntityNotFoundError → 404`, `InvalidTransitionError → 400`, `AuthorizationError → 403`. | Know about business rules. |
| **Mappers** | Convert between Pydantic schemas and application DTOs. | Contain business logic. |
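
The exception-to-status mapping from the error handler row can be sketched without any web framework. The `DomainError` base class, the `to_http_response` name, and the response body shape are assumptions for this example; only the three status codes come from the table above. In the real middleware the returned pair would be turned into a FastAPI response.

```python
from __future__ import annotations


class DomainError(Exception):
    """Hypothetical common base for domain exceptions."""


class EntityNotFoundError(DomainError):
    pass


class InvalidTransitionError(DomainError):
    pass


class AuthorizationError(DomainError):
    pass


# Status codes taken from the error handler row above.
_STATUS_BY_EXCEPTION = {
    EntityNotFoundError: 404,
    InvalidTransitionError: 400,
    AuthorizationError: 403,
}


def to_http_response(exc: Exception) -> tuple[int, dict[str, str]]:
    """Translate an exception into (status_code, body) without leaking internals."""
    # Walk the MRO so subclasses of a mapped exception inherit its status code.
    for cls in type(exc).__mro__:
        if cls in _STATUS_BY_EXCEPTION:
            return _STATUS_BY_EXCEPTION[cls], {"error": cls.__name__, "detail": str(exc)}
    return 500, {"error": "InternalServerError", "detail": "Unexpected error"}
```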
---
## 3. Module Boundaries
The monolith is organized into domain modules. Each module owns its entities, repositories, and use cases. Cross-module communication goes through application-layer use cases or domain events — never through direct repository access.
```
┌─────────────────────────────────────────────────────────────────┐
│                         Domain Modules                          │
│                                                                 │
│  ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌─────────────┐   │
│  │ Technique │  │   Test    │  │ Campaign  │  │   Scoring   │   │
│  │           │  │           │  │           │  │             │   │
│  │ entity    │  │ entity    │  │ entity    │  │ value objs  │   │
│  │ repo port │  │ repo port │  │ repo port │  │ use cases   │   │
│  │ use cases │  │ use cases │  │ use cases │  │ (reads from │   │
│  │           │  │           │  │           │  │ other repos)│   │
│  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘  └──────┬──────┘   │
│        │              │              │               │          │
│  ┌─────┴──────────────┴──────────────┴───────────────┴──────┐   │
│  │         Shared Domain: enums, exceptions, events         │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                 │
│  ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌─────────────┐   │
│  │  Heatmap  │  │  Reports  │  │Compliance │  │ Threat Intel│   │
│  │           │  │           │  │           │  │             │   │
│  │ use cases │  │ use cases │  │ use cases │  │ adapters    │   │
│  │ (reads    │  │ (reads    │  │ (reads    │  │ use cases   │   │
│  │  repos)   │  │  repos)   │  │  repos)   │  │             │   │
│  └───────────┘  └───────────┘  └───────────┘  └─────────────┘   │
└─────────────────────────────────────────────────────────────────┘
```
**Cross-module rule:** A use case in the Scoring module may read from `TechniqueRepository` and `TestRepository` (both defined as ports in the domain layer). It must NOT import the SQLAlchemy model directly.
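
As a rough illustration of cross-module communication through domain events, the sketch below shows an in-process publisher of the kind `infrastructure/events/sync_event_publisher.py` is meant to hold. The `subscribe()` method, the `failures` list, and the trimmed event fields are assumptions for the example; only `TestStateChanged` and `publish()` are named elsewhere in this document.

```python
from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class TestStateChanged:
    """Domain event (fields trimmed for the example)."""
    test_id: str
    old_state: str
    new_state: str


class SyncEventPublisher:
    """In-process dispatch: handlers run synchronously inside publish()."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Callable[[object], None]]] = defaultdict(list)
        self.failures: list[tuple[object, Exception]] = []

    def subscribe(self, event_type: type, handler: Callable[[object], None]) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event: object) -> None:
        for handler in list(self._handlers[type(event)]):
            try:
                handler(event)
            except Exception as exc:
                # One failing subscriber must not block the others, but the
                # failure is recorded instead of being silently swallowed.
                self.failures.append((event, exc))
```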
---
## 4. Dependency Rules
```
┌──────────────────┐
│   Presentation   │     Knows: FastAPI, Pydantic, HTTP
│   (routers,      │     Depends on: Application, Domain
│    schemas)      │
└────────┬─────────┘
         │ calls use cases
┌────────▼─────────┐
│   Application    │     Knows: Domain entities, ports, DTOs
│   (use cases)    │     Depends on: Domain ONLY
└────────┬─────────┘
         │ uses entities + ports
┌────────▼─────────┐
│     Domain       │     Knows: NOTHING external
│   (entities,     │     Depends on: NOTHING
│    ports, enums) │     (this is the core)
└────────▲─────────┘
         │ implements ports
┌────────┴─────────┐
│  Infrastructure  │     Knows: SQLAlchemy, boto3, Redis, requests
│  (repositories,  │     Depends on: Domain (ports), Application (UoW)
│   adapters)      │
└──────────────────┘
```
### Import Rules (Enforceable by Linting)
| From \ To | domain/ | application/ | infrastructure/ | presentation/ |
|-----------|---------|-------------|----------------|--------------|
| **domain/** | Self only | FORBIDDEN | FORBIDDEN | FORBIDDEN |
| **application/** | ALLOWED | Self only | FORBIDDEN | FORBIDDEN |
| **infrastructure/** | ALLOWED (ports) | ALLOWED (UoW) | Self only | FORBIDDEN |
| **presentation/** | ALLOWED (exceptions) | ALLOWED (use cases, DTOs) | ALLOWED (wiring only, in dependencies/) | Self only |
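
One way the table could be enforced is a small check built on the standard-library `ast` module, sketched below under some simplifying assumptions: the package root is `app`, only absolute imports are inspected, and the finer rule that presentation may touch infrastructure only inside `dependencies/` is not modeled. `ALLOWED_TARGETS` and `find_forbidden_imports` are names invented for this example. Third-party framework imports in `domain/` would need a separate check.

```python
import ast

# Layers each layer may import from, besides itself (mirrors the table above).
ALLOWED_TARGETS = {
    "domain": set(),
    "application": {"domain"},
    "infrastructure": {"domain", "application"},
    "presentation": {"domain", "application", "infrastructure"},
}


def find_forbidden_imports(layer, source, package="app"):
    """Return imported module paths in `source` that `layer` may not depend on."""
    allowed = ALLOWED_TARGETS[layer] | {layer}
    violations = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            modules = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.level == 0 and node.module:
            modules = [node.module]
        else:
            continue
        for module in modules:
            parts = module.split(".")
            # Only judge imports of the form "<package>.<layer>...".
            if len(parts) >= 2 and parts[0] == package and parts[1] in ALLOWED_TARGETS:
                if parts[1] not in allowed:
                    violations.append(module)
    return violations
```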
---
## 5. Top 5 Modules to Refactor First
### Selection Criteria
Each module is scored on three axes from the DEPENDENCY_ANALYSIS.md findings:
| Axis | Weight | Measurement |
|------|--------|-------------|
| **Complexity** | 35% | Lines of code, number of DB operations, number of models imported, number of concerns mixed |
| **Technical Risk** | 35% | N+1 queries, security issues, silent exception swallowing, framework coupling, scalability bottleneck |
| **Business Impact** | 30% | Centrality to the domain (how many other modules depend on it), user-facing frequency, correctness criticality |
---
### #1: Test Workflow Module
**Refactor scope:** `routers/tests.py` (664 lines, 30 db ops) + `services/test_workflow_service.py` (456 lines, 13 db ops) + `services/status_service.py` (47 lines)
| Axis | Score | Evidence |
|------|-------|----------|
| Complexity | **10/10** | 664-line router with 15+ endpoints. Mixes CRUD, template instantiation, timeline queries, and workflow delegation. The workflow service itself is 456 lines with a state machine, notifications, and audit logging. |
| Technical Risk | **10/10** | `test_workflow_service` imports `FastAPI.HTTPException` — the most severe framework coupling in the codebase. 4 `except Exception: pass` blocks silently swallow notification failures. No way to unit test the state machine without a database session. |
| Business Impact | **10/10** | The Red/Blue validation workflow IS the core product. Every user role interacts with tests daily. A state transition bug could invalidate an entire assessment. 5 other modules depend on test data (scoring, heatmap, reports, metrics, campaigns). |
**Why first:** This module contains the most important business logic in Aegis (the test state machine), yet it has the most severe coupling problems (`HTTPException` raised from service code, swallowed exceptions). Extracting a `TestEntity` that owns the state machine makes it possible to unit test the most critical business rules without a database session or web framework.
**What to extract:**
- `TestEntity` with `can_transition()`, `start_execution()`, `submit_red()`, `submit_blue()`, `validate()`, `reopen()` → `domain/entities/test.py`
- `InvalidTransitionError`, `EntityNotFoundError` → `domain/exceptions.py`
- `TestRepository` protocol → `domain/ports/repositories/test_repository.py`
- One use case per state transition → `application/use_cases/tests/`
- Remove all `HTTPException` from services
- Replace `except Exception: pass` with event-based notification dispatch
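
The extraction listed above might take a shape like the following. This is a sketch, not the implemented `TestEntity`: the intermediate state names and the transition table are assumptions (only `draft` and `validated` appear in this document), while the method names follow the list above.

```python
from dataclasses import dataclass
from enum import Enum


class TestState(Enum):
    # DRAFT and VALIDATED are named in this document; the rest are assumed.
    DRAFT = "draft"
    IN_PROGRESS = "in_progress"
    RED_SUBMITTED = "red_submitted"
    BLUE_SUBMITTED = "blue_submitted"
    VALIDATED = "validated"


class InvalidTransitionError(Exception):
    def __init__(self, current: TestState, target: TestState) -> None:
        super().__init__(f"Cannot move test from {current.value} to {target.value}")
        self.current = current
        self.target = target


# Assumed transition table: each state maps to the states it may move to.
_ALLOWED = {
    TestState.DRAFT: {TestState.IN_PROGRESS},
    TestState.IN_PROGRESS: {TestState.RED_SUBMITTED},
    TestState.RED_SUBMITTED: {TestState.BLUE_SUBMITTED},
    TestState.BLUE_SUBMITTED: {TestState.VALIDATED},
    TestState.VALIDATED: {TestState.IN_PROGRESS},  # reopen
}


@dataclass
class TestEntity:
    state: TestState = TestState.DRAFT

    def can_transition(self, target: TestState) -> bool:
        return target in _ALLOWED[self.state]

    def _transition(self, target: TestState) -> None:
        if not self.can_transition(target):
            raise InvalidTransitionError(self.state, target)
        self.state = target

    def start_execution(self) -> None:
        self._transition(TestState.IN_PROGRESS)

    def submit_red(self) -> None:
        self._transition(TestState.RED_SUBMITTED)

    def submit_blue(self) -> None:
        self._transition(TestState.BLUE_SUBMITTED)

    def validate(self) -> None:
        self._transition(TestState.VALIDATED)

    def reopen(self) -> None:
        self._transition(TestState.IN_PROGRESS)
```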
---
### #2: Scoring Module
**Refactor scope:** `services/scoring_service.py` (468 lines, 17 db ops) + `services/score_cache.py` + `routers/scores.py` (2 db ops) + `services/operational_metrics_service.py` (21 db ops)
| Axis | Score | Evidence |
|------|-------|----------|
| Complexity | **9/10** | Multi-dimensional scoring algorithm reading from 7 different models. 5 configurable weights. Tactic, actor, and org scores compound technique scores. Operational metrics add MTTD/MTTR calculations with audit log queries. |
| Technical Risk | **9/10** | **SR-001 from risk registry:** Org score generates ~3,500 DB queries (N+1 pattern). Settings mutated at runtime (thread-unsafe). In-memory cache does not scale across workers. Operational metrics N+1 on audit logs adds ~1,000 more queries. |
| Business Impact | **9/10** | Scores drive executive dashboards, compliance reports, and snapshot history. Incorrect scores misrepresent organizational security posture. Because scoring weights are mutable at runtime but never persisted, configuration changes are lost on restart. |
**Why second:** Scoring is the second most critical domain concept and the most severe scalability bottleneck. Refactoring it introduces the repository pattern for batch queries and moves scoring weights to a persistent, immutable configuration.
**What to extract:**
- `TechniqueScore`, `TacticScore`, `OrgScore` value objects → `domain/value_objects/score.py`
- `ScoringWeights` value object with validation → `domain/value_objects/scoring_weights.py`
- Scoring algorithm as pure functions operating on domain objects → `application/use_cases/scoring/`
- Batch query methods in repositories → `TechniqueRepository.find_all_with_test_counts()`
- Redis-backed cache → `infrastructure/cache/`
- Persist weights in DB → `ScoringConfigRepository`
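
A validated, immutable `ScoringWeights` could look roughly like this. The five field names and the `combine()` helper are hypothetical (this document only states that there are five weights that must sum to 100), so treat the sketch as a shape rather than the real value object.

```python
from __future__ import annotations

from dataclasses import dataclass, fields


@dataclass(frozen=True)
class ScoringWeights:
    # Hypothetical dimension names; the real five weights may be named differently.
    test_coverage: int
    detection_coverage: int
    validation_recency: int
    threat_relevance: int
    remediation: int

    def __post_init__(self) -> None:
        values = [getattr(self, f.name) for f in fields(self)]
        if any(v < 0 for v in values):
            raise ValueError("weights must be non-negative")
        if sum(values) != 100:
            raise ValueError(f"weights must sum to 100, got {sum(values)}")

    def combine(self, components: dict[str, float]) -> float:
        """Weighted average of per-dimension scores, each expected in 0..100."""
        total = sum(getattr(self, f.name) * components[f.name] for f in fields(self))
        return total / 100
```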
---
### #3: Heatmap Module
**Refactor scope:** `routers/heatmap.py` (528 lines, 13 db ops, 0 service delegation)
| Axis | Score | Evidence |
|------|-------|----------|
| Complexity | **9/10** | 528 lines in a single router file. Imports 10 models from 6 different domains. Mixes HTTP handling, complex multi-table queries, color mapping algorithms, ATT&CK Navigator JSON serialization, and streaming export — all in one file with zero delegation. |
| Technical Risk | **8/10** | **SR-003 from risk registry:** 1,400+ queries per request (2 per technique × 700). No caching. Full table scan. Every heatmap page load hammers the database. Most-visited view in the platform. |
| Business Impact | **8/10** | The ATT&CK heatmap is the primary visualization — it is the first thing executives see. Navigator export is used for external reporting and audit evidence. Incorrect heatmap data directly impacts security decision-making. |
**Why third:** This is the purest "fat controller" in the codebase — 528 lines of business logic, queries, and serialization with zero abstraction. It is also the most-visited page and the second-worst scalability bottleneck. Extracting it demonstrates the pattern for all other fat routers.
**What to extract:**
- Layer generation logic → `application/use_cases/heatmap/generate_coverage_layer.py` etc.
- Navigator export format → `application/use_cases/heatmap/export_navigator.py`
- Color mapping → `domain/value_objects/` or utility in application layer
- Batch metadata queries → `TechniqueRepository.find_all_with_coverage_metadata()`
- Router reduced from 528 lines to ~80 (5 endpoints × ~15 lines each)
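
The difference between the per-technique pattern and a batch query can be demonstrated with the standard-library `sqlite3` module. The two-table schema and sample rows below are invented for the example and are far simpler than the real models; the point is only that one `LEFT JOIN ... GROUP BY` returns the same counts as two queries per technique.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE techniques (id INTEGER PRIMARY KEY, mitre_id TEXT NOT NULL);
    CREATE TABLE tests (
        id INTEGER PRIMARY KEY,
        technique_id INTEGER NOT NULL REFERENCES techniques(id),
        state TEXT NOT NULL
    );
    INSERT INTO techniques VALUES (1, 'T1059'), (2, 'T1059.001'), (3, 'T1566');
    INSERT INTO tests VALUES (1, 1, 'validated'), (2, 1, 'draft'), (3, 2, 'validated');
""")


def counts_n_plus_one(conn):
    """One query for the list, then two more per technique."""
    result = {}
    techniques = conn.execute("SELECT id, mitre_id FROM techniques").fetchall()
    for tech_id, mitre_id in techniques:
        total = conn.execute(
            "SELECT COUNT(*) FROM tests WHERE technique_id = ?", (tech_id,)
        ).fetchone()[0]
        validated = conn.execute(
            "SELECT COUNT(*) FROM tests WHERE technique_id = ? AND state = 'validated'",
            (tech_id,),
        ).fetchone()[0]
        result[mitre_id] = (total, validated)
    return result


def counts_batched(conn):
    """A single aggregate query, regardless of how many techniques exist."""
    rows = conn.execute("""
        SELECT t.mitre_id,
               COUNT(x.id),
               SUM(CASE WHEN x.state = 'validated' THEN 1 ELSE 0 END)
        FROM techniques AS t
        LEFT JOIN tests AS x ON x.technique_id = t.id
        GROUP BY t.id, t.mitre_id
    """).fetchall()
    return {mitre_id: (total, validated) for mitre_id, total, validated in rows}
```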
---
### #4: Campaign Module
**Refactor scope:** `routers/campaigns.py` (36 db ops) + `services/campaign_service.py` (10 db ops, imports HTTPException) + `services/campaign_scheduler_service.py` (8 db ops)
| Axis | Score | Evidence |
|------|-------|----------|
| Complexity | **8/10** | Router has 36 db operations — the highest count of any router. Campaign lifecycle spans creation, test management, activation, completion, scheduling, and threat actor generation. Three files with partially overlapping responsibilities. |
| Technical Risk | **7/10** | `campaign_service.py` imports `HTTPException` (framework coupling). Scheduler creates campaigns in background jobs with its own session. Circular dependency detection logic is complex and untested (no campaign router tests exist). |
| Business Impact | **8/10** | Campaigns organize test execution for entire threat actor profiles. A bug in campaign scheduling or circular dependency detection could spawn infinite campaigns or skip critical test coverage. Campaigns drive the operational workflow for Red/Blue leads. |
**Why fourth:** The campaign module has the most scattered responsibilities (36 db ops in the router, plus a service and a scheduler) and the second instance of `HTTPException` in a service. It is a natural candidate after tests, scoring, and heatmap because it depends on both test and technique entities, so refactoring it exercises the cross-module communication pattern.
**What to extract:**
- `CampaignEntity` with `add_test()`, `activate()`, `complete()`, `has_circular_dependency()` → `domain/entities/campaign.py`
- `CampaignRepository` protocol → `domain/ports/repositories/`
- Use cases for lifecycle operations → `application/use_cases/campaigns/`
- Remove `HTTPException` from `campaign_service.py`
- Campaign scheduling as infrastructure concern → `infrastructure/jobs/`
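
Once circular dependency detection lives in the domain, it can be tested as a pure function. The sketch below assumes the dependencies form a simple mapping from an item ID to the IDs it depends on; this document does not specify the real data model, so the input shape is hypothetical. It uses a depth-first search with three-color marking.

```python
from __future__ import annotations


def has_circular_dependency(depends_on: dict[str, list[str]]) -> bool:
    """Return True if the dependency graph contains a cycle.

    `depends_on` maps an item ID to the IDs it depends on (assumed shape).
    """
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current path, fully explored
    color: dict[str, int] = {}

    def visit(node: str) -> bool:
        color[node] = GREY
        for dep in depends_on.get(node, []):
            state = color.get(dep, WHITE)
            if state == GREY:
                return True  # back edge: dep is already on the current path
            if state == WHITE and visit(dep):
                return True
        color[node] = BLACK
        return False

    return any(color.get(n, WHITE) == WHITE and visit(n) for n in depends_on)
```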
---
### #5: Reports & Metrics Module
**Refactor scope:** `routers/reports.py` (273 lines, 6 db ops) + `routers/metrics.py` (316 lines, 12 db ops) + `routers/compliance.py` (~350 lines, 13 db ops)
| Axis | Score | Evidence |
|------|-------|----------|
| Complexity | **8/10** | Three routers totaling ~940 lines with zero service delegation. Complex aggregation queries, CSV generation, in-memory data transformation, and compliance gap analysis — all inline in route handlers. |
| Technical Risk | **7/10** | **SR-004 from risk registry:** Reports load unbounded result sets (all techniques, all tests). N+1 per-technique test counts in reports. In-memory aggregation instead of SQL GROUP BY. No streaming for CSV export. Compliance calls `calculate_technique_score()` per technique per control — multiplicative N+1. |
| Business Impact | **7/10** | Reports and metrics are consumed by leads and executives for decision-making. Compliance reports map to regulatory requirements (NIST 800-53, CIS Controls). Incorrect metrics erode trust in the platform. |
**Why fifth:** These three routers share the same anti-pattern (fat controller with inline queries and aggregations) and the same fix (extract to application-layer use cases with repository-backed batch queries). Refactoring them as a group establishes the pattern for the remaining 8 routers that still have direct DB access.
**What to extract:**
- Report generation → `application/use_cases/reports/`
- Metrics calculation → `application/use_cases/metrics/` (or merge with scoring)
- Compliance gap analysis → `application/use_cases/compliance/`
- SQL-level aggregation in repositories → `TechniqueRepository.get_coverage_summary()`
- CSV streaming as infrastructure concern → `infrastructure/export/csv_writer.py`
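
CSV streaming, mentioned in the last item above, can be sketched with a generator that yields one encoded row at a time instead of building the whole file in memory. `stream_csv` is a name invented for this example, and the planned `infrastructure/export/csv_writer.py` may be organized differently. A generator like this can be passed to a streaming HTTP response.

```python
from __future__ import annotations

import csv
import io
from typing import Iterable, Iterator


def stream_csv(header: list[str], rows: Iterable[Iterable[object]]) -> Iterator[str]:
    """Yield CSV text chunk by chunk: the header first, then one chunk per row."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)

    def flush() -> str:
        chunk = buffer.getvalue()
        buffer.seek(0)
        buffer.truncate(0)
        return chunk

    writer.writerow(header)
    yield flush()
    for row in rows:
        # csv.writer handles quoting, so commas inside values stay safe.
        writer.writerow(row)
        yield flush()
```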
---
### Refactor Priority Summary
```
Module              Complexity   Risk   Impact   Weighted   Order
──────────────────────────────────────────────────────────────────
Test Workflow           10        10      10       10.0      #1
Scoring                  9         9       9        9.0      #2
Heatmap                  9         8       8        8.4      #3
Campaigns                8         7       8        7.7      #4
Reports & Metrics        8         7       7        7.4      #5
```
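
The Weighted column follows from the axis weights in the selection criteria (35% complexity, 35% risk, 30% impact). The sketch below reproduces it; the rounding mode is an assumption, chosen because half-up rounding to one decimal matches every row in the table. `Decimal` is used so that values such as 8.35 round predictably.

```python
from decimal import Decimal, ROUND_HALF_UP

# Axis weights from the selection criteria table.
WEIGHTS = {
    "complexity": Decimal("0.35"),
    "risk": Decimal("0.35"),
    "impact": Decimal("0.30"),
}


def weighted_score(complexity: int, risk: int, impact: int) -> Decimal:
    """Weighted priority score, rounded half-up to one decimal place."""
    raw = (
        complexity * WEIGHTS["complexity"]
        + risk * WEIGHTS["risk"]
        + impact * WEIGHTS["impact"]
    )
    return raw.quantize(Decimal("0.1"), rounding=ROUND_HALF_UP)


for name, scores in {
    "Test Workflow": (10, 10, 10),
    "Scoring": (9, 9, 9),
    "Heatmap": (9, 8, 8),
    "Campaigns": (8, 7, 8),
    "Reports & Metrics": (8, 7, 7),
}.items():
    print(f"{name:<18} {weighted_score(*scores)}")
```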
---
## 6. Repository Pattern for Technique
This section designs a concrete repository pattern for `Technique` that can be introduced **without breaking existing code**. The strategy is additive: new code uses the repository, old code continues working until incrementally migrated.
### 6.1. Domain Port — The Interface
```python
# domain/ports/repositories/technique_repository.py
from __future__ import annotations

import uuid
from typing import Protocol, runtime_checkable

from app.domain.entities.technique import TechniqueEntity
from app.domain.enums import TechniqueStatus

# NOTE: TechniqueWithCounts (see find_all_with_test_counts) must also be
# importable from the domain layer so this port stays free of infrastructure
# imports; section 6.2 currently declares it beside the SQLAlchemy repository.


@runtime_checkable
class TechniqueRepository(Protocol):
    """Port defining how the application accesses technique data.

    This is a domain contract; implementations live in infrastructure/.
    The domain layer NEVER imports the implementation.
    """

    # ── Single-entity access ─────────────────────────────────────

    def find_by_id(self, technique_id: uuid.UUID) -> TechniqueEntity | None:
        """Return a technique by primary key, or None."""
        ...

    def find_by_mitre_id(self, mitre_id: str) -> TechniqueEntity | None:
        """Return a technique by its MITRE ATT&CK identifier (e.g. 'T1059.001')."""
        ...

    def find_by_mitre_id_with_tests(self, mitre_id: str) -> TechniqueEntity | None:
        """Return a technique with its tests eagerly loaded."""
        ...

    # ── List access ──────────────────────────────────────────────

    def list_all(
        self,
        *,
        tactic: str | None = None,
        status: TechniqueStatus | None = None,
        review_required: bool | None = None,
    ) -> list[TechniqueEntity]:
        """Return techniques matching the given filters, ordered by mitre_id."""
        ...

    def list_by_tactic(self, tactic: str) -> list[TechniqueEntity]:
        """Return all techniques for a given tactic."""
        ...

    def list_by_ids(self, ids: list[uuid.UUID]) -> list[TechniqueEntity]:
        """Return techniques matching a list of primary keys."""
        ...

    # ── Batch queries (for scoring/heatmap performance) ──────────

    def count_by_status(self) -> dict[TechniqueStatus, int]:
        """Return technique counts grouped by status_global.

        Single SQL query; replaces the per-technique counting pattern.
        """
        ...

    def find_all_with_test_counts(self) -> list[TechniqueWithCounts]:
        """Return all techniques with pre-aggregated test counts and
        detection rule counts.

        Single query with subqueries, which eliminates the N+1 pattern
        in heatmap and scoring.
        """
        ...

    # ── Mutations ────────────────────────────────────────────────

    def save(self, technique: TechniqueEntity) -> TechniqueEntity:
        """Persist a new or updated technique. Returns the saved entity."""
        ...

    def exists_by_mitre_id(self, mitre_id: str) -> bool:
        """Check existence without loading the full entity."""
        ...
```
**Key design decisions:**
- Uses `typing.Protocol` (structural subtyping) rather than `ABC`, so implementations do not need to inherit from the port explicitly. This is idiomatic Python, and `@runtime_checkable` enables `isinstance()` checks, although those only verify that the methods exist, not that their signatures match.
- Methods return domain entities (`TechniqueEntity`), never ORM models.
- Batch methods (`count_by_status`, `find_all_with_test_counts`) are designed to eliminate the N+1 patterns identified in SR-001 and SR-003.
- No `Session` parameter — the session is an implementation detail of the SQLAlchemy repository.
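
The first design decision above is worth a short demonstration, because the `isinstance()` check is weaker than it may appear. The sketch uses a one-method version of the port; `InMemoryTechniqueRepository`, `WrongSignature`, and `Unrelated` are illustrative classes, not part of the codebase.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class TechniqueRepository(Protocol):
    def exists_by_mitre_id(self, mitre_id: str) -> bool: ...


class InMemoryTechniqueRepository:
    """Does not inherit from the protocol: matching method names is enough."""

    def __init__(self, known):
        self._known = set(known)

    def exists_by_mitre_id(self, mitre_id: str) -> bool:
        return mitre_id in self._known


class WrongSignature:
    def exists_by_mitre_id(self):  # wrong arity, yet isinstance() still passes
        return True


class Unrelated:
    pass


print(isinstance(InMemoryTechniqueRepository(["T1059"]), TechniqueRepository))
print(isinstance(WrongSignature(), TechniqueRepository))
print(isinstance(Unrelated(), TechniqueRepository))
```

Signature mismatches such as `WrongSignature` are caught by a static type checker rather than at runtime, so the `isinstance()` check is best treated as a sanity check in dependency wiring.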
### 6.2. Infrastructure Implementation — SQLAlchemy
```python
# infrastructure/persistence/repositories/sa_technique_repository.py
import uuid
from typing import NamedTuple

from sqlalchemy import func
from sqlalchemy.orm import Session, joinedload

from app.domain.enums import TechniqueStatus
from app.domain.entities.technique import TechniqueEntity
from app.domain.ports.repositories.technique_repository import TechniqueRepository
from app.infrastructure.persistence.orm.technique_model import Technique
from app.infrastructure.persistence.orm.test_model import Test
from app.infrastructure.persistence.orm.detection_rule_model import DetectionRule
from app.infrastructure.persistence.mappers.technique_mapper import TechniqueMapper


class TechniqueWithCounts(NamedTuple):
    """Pre-aggregated technique data for heatmap/scoring."""

    entity: TechniqueEntity
    test_count: int
    validated_test_count: int
    detection_rule_count: int


class SATechniqueRepository:
    """SQLAlchemy implementation of TechniqueRepository.

    Receives a Session from the Unit of Work; does NOT create its own.
    Does NOT call commit(); that is the Unit of Work's responsibility.
    """

    def __init__(self, session: Session) -> None:
        self._session = session

    # ── Single-entity access ─────────────────────────────────────

    def find_by_id(self, technique_id: uuid.UUID) -> TechniqueEntity | None:
        model = self._session.query(Technique).filter(
            Technique.id == technique_id
        ).first()
        return TechniqueMapper.to_entity(model) if model else None

    def find_by_mitre_id(self, mitre_id: str) -> TechniqueEntity | None:
        model = self._session.query(Technique).filter(
            Technique.mitre_id == mitre_id
        ).first()
        return TechniqueMapper.to_entity(model) if model else None

    def find_by_mitre_id_with_tests(self, mitre_id: str) -> TechniqueEntity | None:
        model = (
            self._session.query(Technique)
            .options(joinedload(Technique.tests))
            .filter(Technique.mitre_id == mitre_id)
            .first()
        )
        return TechniqueMapper.to_entity_with_tests(model) if model else None

    # ── List access ──────────────────────────────────────────────

    def list_all(
        self,
        *,
        tactic: str | None = None,
        status: TechniqueStatus | None = None,
        review_required: bool | None = None,
    ) -> list[TechniqueEntity]:
        query = self._session.query(Technique)
        if tactic is not None:
            query = query.filter(Technique.tactic == tactic)
        if status is not None:
            query = query.filter(Technique.status_global == status)
        if review_required is not None:
            query = query.filter(Technique.review_required == review_required)
        models = query.order_by(Technique.mitre_id).all()
        return [TechniqueMapper.to_entity(m) for m in models]

    def list_by_tactic(self, tactic: str) -> list[TechniqueEntity]:
        models = (
            self._session.query(Technique)
            .filter(Technique.tactic == tactic)
            .order_by(Technique.mitre_id)
            .all()
        )
        return [TechniqueMapper.to_entity(m) for m in models]

    def list_by_ids(self, ids: list[uuid.UUID]) -> list[TechniqueEntity]:
        models = (
            self._session.query(Technique)
            .filter(Technique.id.in_(ids))
            .all()
        )
        return [TechniqueMapper.to_entity(m) for m in models]
# ── Batch queries ────────────────────────────────────────────
def count_by_status(self) -> dict[TechniqueStatus, int]:
rows = (
self._session.query(
Technique.status_global,
func.count(Technique.id),
)
.group_by(Technique.status_global)
.all()
)
result = {s: 0 for s in TechniqueStatus}
for status_val, count in rows:
result[status_val] = count
return result
def find_all_with_test_counts(self) -> list[TechniqueWithCounts]:
"""Single query that replaces the N+1 pattern.
Instead of: for each technique → query tests → query rules
This does: one query with subqueries for counts.
"""
test_count_sq = (
self._session.query(
Test.technique_id,
func.count(Test.id).label("test_count"),
func.count(Test.id).filter(Test.state == "validated").label("validated_count"),
)
.group_by(Test.technique_id)
.subquery()
)
rule_count_sq = (
self._session.query(
DetectionRule.mitre_technique_id,
func.count(DetectionRule.id).label("rule_count"),
)
.group_by(DetectionRule.mitre_technique_id)
.subquery()
)
rows = (
self._session.query(
Technique,
func.coalesce(test_count_sq.c.test_count, 0),
func.coalesce(test_count_sq.c.validated_count, 0),
func.coalesce(rule_count_sq.c.rule_count, 0),
)
.outerjoin(test_count_sq, Technique.id == test_count_sq.c.technique_id)
.outerjoin(rule_count_sq, Technique.mitre_id == rule_count_sq.c.mitre_technique_id)
.order_by(Technique.mitre_id)
.all()
)
return [
TechniqueWithCounts(
entity=TechniqueMapper.to_entity(tech),
test_count=tc,
validated_test_count=vtc,
detection_rule_count=rc,
)
for tech, tc, vtc, rc in rows
]
# ── Mutations ────────────────────────────────────────────────
def save(self, technique: TechniqueEntity) -> TechniqueEntity:
model = TechniqueMapper.to_model(technique)
merged = self._session.merge(model)
self._session.flush() # flush to get generated values, but do NOT commit
return TechniqueMapper.to_entity(merged)
def exists_by_mitre_id(self, mitre_id: str) -> bool:
return (
self._session.query(Technique.id)
.filter(Technique.mitre_id == mitre_id)
.first()
) is not None
```
**Key design decisions:**
- **No `commit()`**: The repository flushes but never commits. Transaction control belongs to the Unit of Work, which the use case manages.
- **Returns domain entities**: The mapper converts ORM models to domain entities at the repository boundary. No ORM model ever crosses into the application or domain layers.
- **Batch method**: `find_all_with_test_counts()` replaces the N+1 pattern with subqueries — reducing 1,400+ queries to 1 for the heatmap.
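To make the batch-method point concrete, the following sketch compares the N+1 access pattern with a single aggregated query. It uses the standard-library `sqlite3` module rather than SQLAlchemy, and the table layout and sample rows are simplified assumptions for illustration, not the real Aegis schema. The conditional count is written as `SUM(CASE ...)` so it runs on any SQLite version.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE techniques (id INTEGER PRIMARY KEY, mitre_id TEXT UNIQUE);
    CREATE TABLE tests (id INTEGER PRIMARY KEY, technique_id INTEGER, state TEXT);
    CREATE TABLE detection_rules (id INTEGER PRIMARY KEY, mitre_technique_id TEXT);

    INSERT INTO techniques (id, mitre_id) VALUES (1, 'T1003'), (2, 'T1059'), (3, 'T1566');
    INSERT INTO tests (technique_id, state) VALUES
        (1, 'validated'), (1, 'draft'), (2, 'validated'), (2, 'validated');
    INSERT INTO detection_rules (mitre_technique_id) VALUES ('T1003'), ('T1003'), ('T1059');
    """
)


def counts_n_plus_one(conn):
    """One query for the technique list, then three more per technique."""
    techniques = conn.execute(
        "SELECT id, mitre_id FROM techniques ORDER BY mitre_id"
    ).fetchall()
    queries = 1
    rows = []
    for tech_id, mitre_id in techniques:
        tests = conn.execute(
            "SELECT COUNT(*) FROM tests WHERE technique_id = ?", (tech_id,)
        ).fetchone()[0]
        validated = conn.execute(
            "SELECT COUNT(*) FROM tests WHERE technique_id = ? AND state = 'validated'",
            (tech_id,),
        ).fetchone()[0]
        rules = conn.execute(
            "SELECT COUNT(*) FROM detection_rules WHERE mitre_technique_id = ?",
            (mitre_id,),
        ).fetchone()[0]
        queries += 3
        rows.append((mitre_id, tests, validated, rules))
    return rows, queries


def counts_single_query(conn):
    """Same result from one statement: two grouped subqueries, outer-joined."""
    sql = """
        SELECT t.mitre_id,
               COALESCE(tc.test_count, 0),
               COALESCE(tc.validated_count, 0),
               COALESCE(rc.rule_count, 0)
        FROM techniques AS t
        LEFT JOIN (
            SELECT technique_id,
                   COUNT(id) AS test_count,
                   SUM(CASE WHEN state = 'validated' THEN 1 ELSE 0 END) AS validated_count
            FROM tests
            GROUP BY technique_id
        ) AS tc ON tc.technique_id = t.id
        LEFT JOIN (
            SELECT mitre_technique_id, COUNT(id) AS rule_count
            FROM detection_rules
            GROUP BY mitre_technique_id
        ) AS rc ON rc.mitre_technique_id = t.mitre_id
        ORDER BY t.mitre_id
    """
    return conn.execute(sql).fetchall(), 1


slow_rows, slow_queries = counts_n_plus_one(conn)
fast_rows, fast_queries = counts_single_query(conn)
print(slow_rows == fast_rows, slow_queries, fast_queries)
```

The outer joins together with `COALESCE` keep techniques that have no tests or rules in the result with zero counts, which is what the `func.coalesce(...)` calls in the repository do.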
### 6.3. Injection into a Use Case
```python
# presentation/dependencies/repositories.py
from fastapi import Depends
from sqlalchemy.orm import Session

from app.domain.ports.repositories.technique_repository import TechniqueRepository
from app.infrastructure.persistence.database import get_db
from app.infrastructure.persistence.repositories.sa_technique_repository import (
    SATechniqueRepository,
)


def get_technique_repository(
    db: Session = Depends(get_db),
) -> TechniqueRepository:
    """FastAPI dependency that provides a TechniqueRepository.

    Wiring lives ONLY in the presentation layer; the use case
    never knows it's getting a SQLAlchemy implementation.
    """
    return SATechniqueRepository(db)
```
```python
# presentation/dependencies/use_cases.py
from fastapi import Depends

from app.application.use_cases.techniques.create_technique import CreateTechniqueUseCase
from app.domain.ports.repositories.technique_repository import TechniqueRepository
from app.presentation.dependencies.repositories import get_technique_repository


def get_create_technique_use_case(
    technique_repo: TechniqueRepository = Depends(get_technique_repository),
) -> CreateTechniqueUseCase:
    return CreateTechniqueUseCase(technique_repo=technique_repo)
```
```python
# application/use_cases/techniques/create_technique.py
import uuid

from app.domain.entities.technique import TechniqueEntity
from app.domain.exceptions import DuplicateEntityError
from app.domain.ports.repositories.technique_repository import TechniqueRepository
from app.application.dto.technique_dto import CreateTechniqueInput, TechniqueResult


class CreateTechniqueUseCase:
    """Application use case: create a new MITRE ATT&CK technique.

    This class knows NOTHING about:
    - FastAPI, HTTP, Pydantic
    - SQLAlchemy, PostgreSQL
    - How the repository is implemented
    """

    def __init__(self, technique_repo: TechniqueRepository) -> None:
        self._repo = technique_repo

    def execute(self, input: CreateTechniqueInput, user_id: uuid.UUID) -> TechniqueResult:
        # Business rule: mitre_id must be unique
        if self._repo.exists_by_mitre_id(input.mitre_id):
            raise DuplicateEntityError("Technique", "mitre_id", input.mitre_id)

        # Create domain entity
        technique = TechniqueEntity.create(
            mitre_id=input.mitre_id,
            name=input.name,
            description=input.description,
            tactic=input.tactic,
            platforms=input.platforms,
        )

        # Persist through repository
        saved = self._repo.save(technique)

        # Return application DTO
        return TechniqueResult.from_entity(saved)
```
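The use case above calls `save()`, and the repository deliberately never commits, so something else has to own the transaction. The following is a minimal sketch of how a Unit of Work could fill that role. Every name here (`FakeSession`, `TechniqueRepo`, `UnitOfWork`, the function-style `create_technique`) is a simplified stand-in invented for illustration. It is not the contents of `domain/unit_of_work.py`, and the fake session only records calls instead of talking to a database.

```python
from dataclasses import dataclass, field


@dataclass
class FakeSession:
    """Hypothetical stand-in for a database session; it only records calls."""

    pending: list = field(default_factory=list)
    committed: list = field(default_factory=list)

    def add(self, obj) -> None:
        self.pending.append(obj)

    def commit(self) -> None:
        self.committed.extend(self.pending)
        self.pending.clear()

    def rollback(self) -> None:
        self.pending.clear()


class DuplicateEntityError(Exception):
    """Simplified stand-in for the domain exception."""


class TechniqueRepo:
    """Stages changes on the session and never commits."""

    def __init__(self, session: FakeSession) -> None:
        self._session = session

    def exists_by_mitre_id(self, mitre_id: str) -> bool:
        return mitre_id in self._session.pending + self._session.committed

    def save(self, mitre_id: str) -> str:
        self._session.add(mitre_id)
        return mitre_id


class UnitOfWork:
    """Owns the transaction: commit on success, roll back on any exception."""

    def __init__(self, session: FakeSession) -> None:
        self._session = session
        self.techniques = TechniqueRepo(session)

    def __enter__(self) -> "UnitOfWork":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        if exc_type is None:
            self._session.commit()
        else:
            self._session.rollback()
        return False  # never swallow the exception


def create_technique(uow: UnitOfWork, mitre_id: str) -> str:
    # The use case decides where the transaction boundary is.
    with uow:
        if uow.techniques.exists_by_mitre_id(mitre_id):
            raise DuplicateEntityError(mitre_id)
        return uow.techniques.save(mitre_id)


session = FakeSession()
create_technique(UnitOfWork(session), "T1059")
print(session.committed)
```

With this shape the repository stays free of `commit()` calls, and a failure anywhere inside the `with` block discards everything staged so far.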
```python
# presentation/api/v1/techniques.py (refactored: thin router)
from fastapi import APIRouter, Depends, status

from app.application.dto.technique_dto import CreateTechniqueInput
from app.application.use_cases.techniques.create_technique import CreateTechniqueUseCase
from app.presentation.dependencies.auth import require_role
from app.presentation.dependencies.use_cases import get_create_technique_use_case
from app.presentation.schemas.technique_schema import TechniqueCreate, TechniqueOut

router = APIRouter(prefix="/techniques", tags=["techniques"])


@router.post("", response_model=TechniqueOut, status_code=status.HTTP_201_CREATED)
def create_technique(
    payload: TechniqueCreate,
    use_case: CreateTechniqueUseCase = Depends(get_create_technique_use_case),
    current_user=Depends(require_role("admin")),
):
    """Create a new technique.

    This router:
    - Receives the HTTP request (Pydantic validates it)
    - Calls the use case
    - The error handler middleware maps domain exceptions to HTTP responses
    - Returns the result

    Total: 5 lines of actual logic.
    """
    # Translate the HTTP schema into the application-layer input DTO.
    result = use_case.execute(
        input=CreateTechniqueInput(
            mitre_id=payload.mitre_id,
            name=payload.name,
            description=payload.description,
            tactic=payload.tactic,
            platforms=payload.platforms,
        ),
        user_id=current_user.id,
    )
    return result
```
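The router stays thin because it never catches domain exceptions itself. As a rough sketch of the translation step the error handler middleware performs, the mapping below is written as plain Python with no framework. The exception classes are simplified stand-ins, the `EntityNotFoundError` constructor arguments are assumed, and the status codes (404, 409, 500) are assumptions for illustration rather than the actual contents of `middleware/error_handler.py`.

```python
class DomainError(Exception):
    """Hypothetical base class for domain exceptions."""


class EntityNotFoundError(DomainError):
    def __init__(self, entity: str, identifier: str) -> None:
        super().__init__(f"{entity} {identifier!r} not found")


class DuplicateEntityError(DomainError):
    def __init__(self, entity: str, field: str, value: str) -> None:
        super().__init__(f"{entity} with {field}={value!r} already exists")


# Assumed mapping from domain exception type to HTTP status code.
STATUS_BY_ERROR: dict[type[DomainError], int] = {
    EntityNotFoundError: 404,
    DuplicateEntityError: 409,
}


def to_http_response(exc: Exception) -> tuple[int, dict[str, str]]:
    """Translate an exception into a (status code, JSON body) pair."""
    for error_type, status_code in STATUS_BY_ERROR.items():
        if isinstance(exc, error_type):
            return status_code, {"detail": str(exc)}
    # Anything unexpected becomes a generic 500 without leaking internals.
    return 500, {"detail": "Internal server error"}


print(to_http_response(DuplicateEntityError("Technique", "mitre_id", "T1059")))
```

Keeping this table in one place means a new domain exception needs one new entry rather than a `try`/`except` in every router.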
### 6.4. Coexistence Strategy — No Big Bang
The repository can be introduced **alongside existing code** without breaking anything:
```
Phase 1: Create the repository interface and SQLAlchemy implementation.
         Both old (direct db.query) and new (repository) code coexist.
         New endpoints use the repository. Old endpoints are unchanged.

Phase 2: Migrate routers one endpoint at a time.
         Replace db.query(Technique).filter(...) with repo.find_by_mitre_id().
         Each migration is a small, reviewable PR.

Phase 3: When all consumers are migrated, the ORM model is no longer
         imported outside infrastructure/. Enforce via linting rule.
At no point does existing functionality break. Both patterns access the same database, the same tables, the same session. The repository is an additive abstraction — it wraps what already exists.
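The Phase 3 linting rule could take many forms. One possible sketch, using only the standard-library `ast` module, flags ORM imports in files that live outside `infrastructure/`. The function name `forbidden_imports`, the module prefix, and the path prefix are assumptions based on the import paths used in this document, not an existing project tool.

```python
import ast

# Assumed locations, derived from the import paths shown in this document.
FORBIDDEN_PREFIX = "app.infrastructure.persistence.orm"
ALLOWED_PACKAGE = "app/infrastructure/"


def forbidden_imports(path: str, source: str) -> list[str]:
    """Return ORM modules imported by a file that lives outside infrastructure/."""
    if path.startswith(ALLOWED_PACKAGE):
        return []
    found: list[str] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom) and node.module:
            modules = [node.module]
        elif isinstance(node, ast.Import):
            modules = [alias.name for alias in node.names]
        else:
            continue
        found.extend(m for m in modules if m.startswith(FORBIDDEN_PREFIX))
    return found


router_source = (
    "from app.infrastructure.persistence.orm.technique_model import Technique\n"
    "import uuid\n"
)
print(forbidden_imports("app/presentation/api/v1/techniques.py", router_source))
```

Run over every `.py` file in CI, a check like this would fail the build as soon as a router or use case reaches for an ORM model directly, so the boundary stays in place once the migration is complete.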