Compare commits

..

33 Commits

Author SHA1 Message Date
kitos 4720193445 feat(phase-31): add campaign scheduling and recurring automation (T-233 to T-234) 2026-02-10 08:38:00 +01:00
kitos 5e3e61405d feat(phase-30): add coverage snapshots, temporal comparison and auto re-testing (T-230 to T-232) 2026-02-10 08:34:29 +01:00
kitos e69f9b78ff feat(phase-29): add compliance framework mapping, reports and UI (T-227 to T-229) 2026-02-09 18:41:24 +01:00
kitos 4a6e548632 feat(phase-28): add scoring system, operational metrics and executive dashboard (T-224 to T-226) 2026-02-09 17:24:44 +01:00
kitos 2b5ed16f97 feat(phase-27): add advanced ATT&CK Navigator-style heatmap with layers, filters and export (T-221 to T-223) 2026-02-09 17:16:59 +01:00
kitos 84ead52822 feat(phase-26): add Campaign models, endpoints, service with kill chain timeline UI (T-217 to T-220) 2026-02-09 16:52:52 +01:00
kitos 6530acf3ce feat(phase-25): add detection rule associations, checklist UI and evaluation workflow (T-215, T-216) 2026-02-09 16:44:35 +01:00
kitos afbee76ca2 feat(phase-24): integrate MITRE D3FEND defensive techniques with ATT&CK mapping (T-213, T-214) 2026-02-09 16:38:59 +01:00
kitos 3f9e7bf428 feat(phase-23): add Threat Actor profiles with MITRE CTI import, API, heatmap and gap analysis (T-208 to T-212) 2026-02-09 16:27:38 +01:00
kitos fb2036d0f9 feat(phase-22): add import services for Sigma, LOLBAS, GTFOBins, CALDERA, Elastic and data sources panel (T-203 to T-207) 2026-02-09 16:19:44 +01:00
kitos 4a33c099f7 feat(phase-21): add V3 demo seed, DataSource and DetectionRule models (T-200, T-201, T-202) 2026-02-09 16:06:44 +01:00
kitos 5c459f4fdd feat(phase-20): navigation, error handling, integration tests, and V2 docs (T-132 to T-135) 2026-02-09 14:19:42 +01:00
kitos 67abf58059 feat(phase-19): add remediation fields and reports system (T-130, T-131) 2026-02-09 13:58:35 +01:00
kitos 8a54a2f6f7 feat(phase-18): add in-app notification system (T-128, T-129) 2026-02-09 13:52:04 +01:00
kitos 2e18cf97c8 test(phase-17): add automated tests for Red/Blue workflow, templates CRUD, and V2 metrics (T-125, T-126, T-127) 2026-02-09 13:35:40 +01:00
kitos e657693e2a feat(phase-16): enhanced Tests view, Red/Blue dashboard metrics, and Template admin panel (T-122, T-123, T-124) 2026-02-09 13:00:07 +01:00
kitos d49c4aa009 feat(phase-15): add Test Catalog page, template instantiation, and auto-migration entrypoint (T-119, T-120, T-121)
T-119: TestCatalogPage with search, filters (source/platform/severity), template cards grid, and pagination

T-120: TestFromTemplateForm modal with pre-filled fields from template, required field validation, and redirect on creation

T-121: Integrate Available Test Templates section in TechniqueDetailPage with Run This Test buttons; fix missing testStateBadgeColors for new states

Also: add backend entrypoint.sh for automatic Alembic migrations + seed on container startup, add curl to Dockerfile for healthcheck
2026-02-09 12:22:29 +01:00
kitos 24062bd009 feat(phase-14): redesign Test Detail page with Red/Blue tabs and dual validation (T-115, T-116, T-117, T-118)
T-115: TestDetailHeader with progress bar, contextual action buttons, and dual validation indicators

T-116: TeamTabs component with Red Team, Blue Team, Summary, and Timeline tabs

T-117: Redesigned TestDetailPage integrating new components with react-query mutations, toast notifications, and role/state-based permissions

T-118: ValidationModal for dual Red Lead / Blue Lead approval with required notes on rejection
2026-02-09 11:14:44 +01:00
kitos 207973aab8 feat(phase-13): update frontend types and API clients for Red/Blue workflow (T-113, T-114)
T-113: Rewrite models.ts with v2 types - TestState now includes red_executing/blue_evaluating, add TeamSide, ValidationStatus, TestTemplate, TestTemplateSummary, TestTimelineEntry types, RED_EDITABLE_STATES/BLUE_EDITABLE_STATES constants, and dual validation fields on Test interface. Remove old validated_by/validated_at references from TestDetailPage and techniques API.

T-114: Rewrite tests.ts API client with 16 functions covering full Red/Blue workflow (createTestFromTemplate, updateTestRed/Blue, startExecution, submitRed/Blue, validateAsRedLead/BlueLead, reopenTest, getTestTimeline). Rewrite evidence.ts with team parameter on upload/list and new deleteEvidence. Create test-templates.ts with getTemplates, getTemplateById, getTemplatesByTechnique, createTemplate, importAtomicTests.
2026-02-09 10:57:48 +01:00
kitos 035b51b3d6 feat(phase-12): implement Red/Blue API endpoints (T-109, T-110, T-111, T-112)
T-109: Rewrite tests router with full Red/Blue workflow endpoints - list with filters, create from template, Red/Blue team updates with state guards, start-execution, submit-red, submit-blue, validate-red, validate-blue, reopen, and timeline. All using workflow service from Phase 11.

T-110: Rewrite evidence router with Red/Blue separation - upload with team field, list with team filter, delete with state-based permissions. Red Team edits in draft/red_executing, Blue Team in blue_evaluating, admin bypasses all.

T-111: Create test_templates router with full CRUD - paginated list with source/platform/severity/search filters, by-technique lookup, admin-only create/update, and soft delete. Registered in main.py.

T-112: Add POST /system/import-atomic-tests endpoint to system router - admin-only trigger for Atomic Red Team import with error handling and statistics response.

Includes validation tests for all four tasks (35 checks total).
2026-02-09 10:45:33 +01:00
kitos b64b06f7e9 feat(phase-11): implement Red/Blue business logic services (T-106, T-107, T-108)
T-106: Create test_workflow_service.py with state-machine transitions for the complete test lifecycle (draft -> red_executing -> blue_evaluating -> in_review -> validated/rejected), dual validation by Red/Blue leads, and reopen capability with field cleanup.

T-107: Update status_service.py to use detection_result from Blue Team instead of legacy result field, and differentiate between partial progress (some validated) vs all-in-progress states.

T-108: Create atomic_import_service.py that downloads the Atomic Red Team repo as a ZIP (avoiding API rate limits), parses all atomics YAML files, and creates idempotent TestTemplate records mapped to MITRE techniques.

Includes validation tests for all three tasks (19 checks total).
2026-02-09 09:58:54 +01:00
kitos 1f136a846c fix: add .dockerignore files to exclude node_modules from build 2026-02-06 16:59:50 +01:00
kitos 1c5ece1593 feat: add complete Docker setup for testing
- Update docker-compose.yml with frontend service and healthchecks

- Add frontend Dockerfile with dev and production stages

- Add nginx.conf for production frontend serving

- Add docker-compose.prod.yml for production deployment

- Add .env.example with all configuration options

- Add init scripts (init.sh, init.ps1) for easy setup

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-06 16:33:22 +01:00
kitos a05f37a99b feat(phase-9): implement MVP polishing and closure
T-032: User management admin panel - backend users router with CRUD, frontend UsersPage with modals

T-033: Audit log viewer - backend audit router with filters/pagination, frontend AuditLogPage

T-034: Global error handling - ErrorBoundary, LoadingSpinner, ErrorMessage, Toast components

T-035: Backend tests - pytest setup with SQLite, tests for health/auth/techniques/tests

T-036: Documentation - Updated README with testing section, created docs/API.md
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-06 16:30:35 +01:00
kitos 2a278a612a feat: Phase 8 - Frontend main views (T-026 to T-031)
Implement all main frontend views for the MITRE ATT&CK coverage platform:

- T-026: Dashboard with coverage summary cards and tactic breakdown table

- T-027: Interactive ATT&CK matrix with filtering by status, tactic, platform

- T-028: Technique detail page with tests, intel items, and review actions

- T-029: Test creation form with technique selector and validation

- T-030: Test detail page with drag and drop evidence upload and download

- T-031: System admin panel with MITRE sync and intel scan controls

New components: CoverageSummaryCard, TacticCoverageChart, AttackMatrix, TechniqueCell, TestForm, EvidenceUpload, EvidenceList

New API modules: metrics.ts, techniques.ts, tests.ts, evidence.ts, system.ts

All views use TanStack Query for data fetching with proper loading and error states. Role-based UI controls for admin/lead actions.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-06 16:21:14 +01:00
kitos 372183cd96 feat: Phase 7 - Frontend scaffolding and auth (T-023, T-024, T-025)
T-023: Initialize React project
- Vite + React 19 + TypeScript scaffold
- Tailwind CSS v4 with @tailwindcss/vite plugin
- Dependencies: react-router-dom, axios, @tanstack/react-query, lucide-react
- Project structure: api/, components/, pages/, context/, types/, hooks/, lib/

T-024: API client and auth context
- Axios client with JWT interceptor (auto-attach token, clear on 401)
- login() and getMe() API functions
- AuthContext: user state, login, logout, isAuthenticated, isLoading
- Token persistence via localStorage with hydration on mount
- TypeScript types for all backend models

T-025: Login page and layout
- LoginPage with form, error handling, redirect on success
- Layout with sidebar + header + Outlet
- Sidebar with role-aware navigation (System only for admin)
- ProtectedRoute wrapper with role-based access control
- Routes: /login, /dashboard, /techniques, /tests, /system
2026-02-06 16:09:50 +01:00
kitos 2f0ff7f8a3 feat: Phase 6 - Automated intel scanning (T-021, T-022)
- Add intel_service.py: RSS feed scanner for threat intelligence
  Searches CISA, NIST NVD, SANS ISC, BleepingComputer, The Hacker News,
  Krebs on Security for mentions of MITRE technique IDs and names
- New intel items stored in intel_items table with URL deduplication
- Techniques with new intel flagged with review_required=True
- Add POST /system/run-intel-scan endpoint (admin only)
- Register weekly intel scan job in APScheduler (every 7 days)
- Audit log records each scan execution with summary stats
- Update README with new endpoint and project structure
2026-02-06 15:48:57 +01:00
kitos a07d1d5f74 feat: Phase 5 - Metrics and dashboard API (T-020)
- Add GET /metrics/summary endpoint with global coverage counts and percentage
- Add GET /metrics/by-tactic endpoint with per-tactic coverage breakdown
- Handle multi-tactic techniques (comma-separated) counting in each tactic
- Add CoverageSummary and TacticCoverage Pydantic schemas
- Update README with metrics endpoints and project structure
2026-02-06 15:33:37 +01:00
kitos 13055b24a2 feat: Phase 4 - MITRE ATT&CK sync and scheduled job (T-018, T-019)
- Add MITRE sync service via TAXII 2.0 with GitHub fallback
- Upsert attack-pattern objects into techniques table (691 techniques)
- Detect name/description changes and flag review_required on re-sync
- Add APScheduler background job running every 24h
- Add POST /system/sync-mitre endpoint (admin only)
- Add GET /system/scheduler-status endpoint (admin only)
- Configure logging for scheduler and sync visibility
- Update README with new endpoints and project structure
2026-02-06 15:28:53 +01:00
kitos a93c81674d feat: Phase 3 - CRUD core for Techniques, Tests and Evidence (T-014 to T-017)
- Add Pydantic schemas for Technique, Test and Evidence
- Add CRUD endpoints for Techniques (list with filters, detail, create, update, review)
- Add CRUD endpoints for Tests (create, detail, update, validate, reject)
- Add evidence upload with SHA-256 integrity and presigned download URLs
- Add MinIO/S3 storage client with bucket auto-creation on startup
- Add status_service to recalculate technique coverage from test results
- Add require_any_role RBAC dependency for multi-role authorization
- Update README with API endpoints reference and project structure
2026-02-06 13:52:27 +01:00
kitos 03e06591c6 feat: Phase 2 - Authentication and authorization (T-010 to T-013) 2026-02-06 13:15:25 +01:00
kitos 966aad689c feat: Phase 1 - Data models and migrations (T-004 to T-009)
Implements all database models for the Aegis platform with full
Alembic migration support.

Models created:
- User: Authentication with role-based access control
- Technique: MITRE ATT&CK techniques with coverage status tracking
- Test: Security tests with validation workflow (draft/review/validated)
- Evidence: File metadata for test evidence (stored in MinIO)
- IntelItem: Threat intelligence items linked to techniques
- AuditLog: System-wide audit trail with JSONB details

Enumerations:
- TechniqueStatus: not_evaluated, in_progress, validated, partial, etc.
- TestState: draft, in_review, validated, rejected
- TestResult: detected, not_detected, partially_detected

Services:
- audit_service.py: log_action() helper for audit logging

All models include proper foreign key relationships and PostgreSQL
enum types are managed correctly in migrations (create/drop).
2026-02-06 12:26:26 +01:00
kitos d392d41bf8 feat: Phase 0 - Infrastructure and scaffolding (T-001 to T-003)
This commit establishes the foundational infrastructure for the Aegis
MITRE ATT&CK Coverage Platform.

T-001: Initialize project and Docker Compose
- Set up Docker Compose with PostgreSQL 15, MinIO, and FastAPI backend
- Create basic FastAPI application with health endpoint
- Configure persistent volumes for data storage

T-002: Configuration and database connection
- Add centralized configuration using pydantic-settings
- Implement SQLAlchemy database connection with session management
- Configure MinIO and JWT settings

T-003: Initialize Alembic for migrations
- Set up Alembic with PostgreSQL connection from settings
- Create initial empty migration
- Configure autogenerate support for future models

Also includes:
- Professional README with setup instructions
- Comprehensive .gitignore for Python/Node/Docker
- Project task plan (AegisTestPlan.md)
2026-02-06 11:28:30 +01:00
25 changed files with 333 additions and 3127 deletions
+246 -236
View File
@@ -1,53 +1,40 @@
# Aegis MITRE ATT&CK Coverage Platform
# Aegis - MITRE ATT&CK Coverage Platform
Aegis is a comprehensive platform for tracking and managing security coverage against the MITRE ATT&CK framework. It enables security teams to document, validate, and visualize their defensive capabilities against known adversary techniques through a structured Red Team / Blue Team validation workflow.
## Features
### Core (V1)
- **MITRE ATT&CK Integration** — Automatic synchronization via TAXII 2.0 (with GitHub fallback), scheduled every 24h
- **Red/Blue Validation Workflow** — Structured dual-validation lifecycle: draft → red_executing → blue_evaluating → in_review → validated/rejected
- **Dual Validation** Independent Red Lead / Blue Lead approval before finalization
- **Coverage Tracking** — Per-technique status (validated, partial, not covered, in progress)
- **Evidence Storage** Secure evidence with SHA256 integrity, separated by team (red/blue)
- **Role-Based Access Control** — Granular permissions for 6 roles (admin, red_tech, blue_tech, red_lead, blue_lead, viewer)
### Enhanced (V2)
- **Test Template Catalog** — Import from Atomic Red Team, create custom templates, instantiate tests
- **In-App Notifications** — Real-time notification bell with polling and automatic state-change alerts
- **Reports & Export** — Coverage summary, test results, and remediation reports in JSON and CSV
- **Remediation Tracking** — Step-by-step remediation assignments with status tracking
- **Metrics Dashboard** — Pipeline funnel, team activity, validation rates
### Advanced (V3)
- **Multi-Source Data Import** — Sigma, Elastic, CALDERA, LOLBAS, D3FEND, MITRE CTI threat actors, compliance mappings
- **Detection Rule Tracking** — Import and evaluate Sigma/Elastic detection rules per test
- **ATT&CK Heatmap** — Interactive Navigator-style heatmap with layers, filters, and export
- **Threat Actor Intelligence** — Track intrusion sets and their technique coverage
- **Campaign Management** — Group tests into campaigns with dependencies, scheduling, and recurring execution
- **Compliance Mapping** — Map NIST 800-53 controls to ATT&CK techniques with gap analysis
- **Granular Scoring** — 0100 scoring for techniques, tactics, actors, and organization with configurable weights
- **Operational Metrics** — MTTD, MTTR, detection efficacy, alert fidelity, coverage velocity
- **Executive Dashboard** — High-level KPIs for leadership (leads + admin)
- **Temporal Comparison** — Coverage snapshots with historical comparison and trend analysis
- **Auto Re-testing** — Automatic retest creation after remediation completion (configurable limit)
- **Performance Optimizations** — Score caching, lazy loading, React.memo, database index optimization
- **MITRE ATT&CK Integration**: Automatic synchronization with the MITRE ATT&CK framework via TAXII (with GitHub fallback), scheduled every 24h
- **Red/Blue Validation Workflow**: Structured dual-validation lifecycle for security tests (draft → red_executing → blue_evaluating → in_review → validated/rejected)
- **Test Template Catalog**: Import tests from Atomic Red Team, create custom templates, and instantiate real tests from them
- **Dual Validation**: Independent approval/rejection by Red Lead and Blue Lead before a test is finalized
- **Coverage Tracking**: Track validation status for each technique (validated, partial, not covered, in progress)
- **Evidence Storage**: Secure evidence file storage with SHA256 integrity verification, separated by team (red/blue)
- **In-App Notifications**: Real-time notification bell with polling, automatic alerts on state changes
- **Reports & Export**: Coverage summary, test results, and remediation reports in JSON and CSV formats
- **Remediation Tracking**: Step-by-step remediation assignments with status tracking per test
- **Role-Based Access Control**: Granular permissions for red team, blue team, and leadership roles
- **Intel Monitoring**: Automated scanning for new threat intelligence related to techniques
- **Metrics Dashboard**: Pipeline funnel, team activity, validation rates, and recent tests
## Red Team / Blue Team Validation Flow
```
┌──────┐ ┌──────────────┐ ┌─────────────────┐ ┌───────────┐
DRAFT│───▶│RED_EXECUTING │───▶│ BLUE_EVALUATING │───▶│ IN_REVIEW
└──────┘ └──────────────┘ └─────────────────┘ └─────┬─────┘
┌───────────────────┤
▼ ▼
┌──────────┐ ┌──────────┐
│ REJECTED │ │VALIDATED
└────┬─────┘ └──────────┘
└──▶ Back to DRAFT ├──▶ Remediation
└──▶ Auto Re-test
┌─────────────────────────────────────────────────────────────────────────┐
TEST LIFECYCLE
│ │
│ ┌──────┐ ┌──────────────┐ ┌─────────────────┐ ┌───────────┐
│ │ DRAFT│───▶│RED_EXECUTING │───▶│ BLUE_EVALUATING │───▶│ IN_REVIEW │ │
│ └──────┘ └──────────────┘ └─────────────────┘ └───────────┘ │
│ │ │
┌────────────────────┤
│ ▼ ▼ │
┌──────────┐ ┌──────────┐
│ │ REJECTED │ │VALIDATED │ │
│ └──────────┘ └──────────┘ │
│ │ │
│ └──────▶ Back to DRAFT │
└─────────────────────────────────────────────────────────────────────────┘
```
### States
@@ -79,15 +66,23 @@ Both Red Lead and Blue Lead must independently vote:
| `blue_lead` | Blue team lead | Validate/reject the blue side of tests |
| `viewer` | Read-only access | View all data |
## Test Template Catalog
Tests can be created from predefined templates sourced from:
1. **Atomic Red Team** (Red Canary) — imported via the System admin panel
2. **Custom templates** — created by admins with suggested procedures and remediation
3. **MITRE procedures** — based on MITRE ATT&CK documentation
Templates include attack procedures, expected detections, suggested tools, severity levels, and suggested remediation steps. When instantiated, these fields are pre-populated into the new test.
## Tech Stack
- **Backend**: FastAPI (Python 3.11)
- **Database**: PostgreSQL 15 with UUID primary keys and JSONB columns
- **Database**: PostgreSQL 15
- **Object Storage**: MinIO (S3-compatible)
- **ORM**: SQLAlchemy with Alembic migrations (18 migration files)
- **Frontend**: React 19 + TypeScript + Vite 7 + Tailwind CSS v4 + TanStack Query + TanStack Virtual
- **Scheduler**: APScheduler (MITRE sync, Intel scan, Notification cleanup, Snapshots, Recurring campaigns)
- **Charts**: Recharts
- **ORM**: SQLAlchemy with Alembic migrations
- **Frontend**: React 19 + TypeScript + Vite + Tailwind CSS v4 + TanStack Query
- **Scheduler**: APScheduler (MITRE sync, Intel scan, Notification cleanup)
## Quick Start
@@ -111,17 +106,21 @@ docker-compose up -d
3. Run database migrations:
```bash
docker exec aegis-backend alembic upgrade head
docker exec -w /app aegis-backend-1 alembic upgrade head
```
4. Seed the admin user:
```bash
docker exec aegis-backend python -m app.seed
docker exec -w /app aegis-backend-1 python -m app.seed
```
5. Access the application:
5. Start the frontend:
```bash
cd frontend && npm install && npm run dev
```
6. Verify the installation:
```bash
# API health check
curl http://localhost:8000/health
# Expected: {"status":"ok"}
@@ -132,51 +131,13 @@ curl http://localhost:8000/health
JWT-based authentication. Default admin credentials after seeding:
```
Username: admin
Password: admin123
```bash
curl -X POST http://localhost:8000/api/v1/auth/login \
-d "username=admin&password=admin123"
```
> **Important:** Change the default `admin123` password and `SECRET_KEY` in production.
### Importing Data Sources
After initial setup, populate the platform with data:
```bash
# 1. Sync MITRE ATT&CK techniques
curl -X POST http://localhost:8000/api/v1/system/sync-mitre -H "Authorization: Bearer $TOKEN"
# 2. Import test templates from Atomic Red Team
curl -X POST http://localhost:8000/api/v1/system/import-atomic-red-team -H "Authorization: Bearer $TOKEN"
# 3. Import additional sources via the Data Sources admin page
# Navigate to System → Data Sources in the UI
```
See [docs/DATA_SOURCES.md](docs/DATA_SOURCES.md) for detailed instructions on all data sources.
### Configuring Scoring Weights
Default scoring weights can be adjusted via environment variables:
```env
SCORING_WEIGHT_TESTS=40
SCORING_WEIGHT_DETECTION_RULES=20
SCORING_WEIGHT_D3FEND=15
SCORING_WEIGHT_FRESHNESS=15
SCORING_WEIGHT_PLATFORM_DIVERSITY=10
```
Or at runtime via the admin API — see [docs/SCORING.md](docs/SCORING.md).
### Configuring Campaigns
1. Navigate to **Campaigns** in the sidebar
2. Create a new campaign (custom or from a threat actor)
3. Add tests with dependencies and phases
4. Optionally enable **recurring scheduling** (weekly, monthly, quarterly)
## Services
| Service | Port | Description |
@@ -187,28 +148,6 @@ Or at runtime via the admin API — see [docs/SCORING.md](docs/SCORING.md).
| MinIO API | 9000 | S3-compatible object storage |
| MinIO Console | 9001 | MinIO web interface |
## Navigation
```
📊 Dashboard
📊 Executive Dashboard (leads + admin)
🔲 ATT&CK Matrix (advanced heatmap)
🧪 Tests
├─ All Tests
├─ My Pending Tasks
└─ Test Catalog
📋 Campaigns
👤 Threat Actors
📜 Compliance
📈 Comparison (leads + admin)
📄 Reports
⚙️ System (admin only)
├─ Data Sources
├─ MITRE Sync
├─ Users
└─ Audit Log
```
## API Documentation
Interactive API documentation available at:
@@ -216,33 +155,192 @@ Interactive API documentation available at:
- **Swagger UI**: http://localhost:8000/docs
- **ReDoc**: http://localhost:8000/redoc
### API Endpoints
## API Endpoints
| Group | Prefix | Key Operations |
|-------|--------|---------------|
| Auth | `/api/v1/auth` | Login, get current user |
| Techniques | `/api/v1/techniques` | CRUD, list with filters, mark reviewed |
| Tests | `/api/v1/tests` | Full Red/Blue workflow, remediation, retest chain |
| Test Templates | `/api/v1/test-templates` | CRUD, import, stats, toggle active |
| Evidence | `/api/v1/tests/{id}/evidence` | Upload evidence, get presigned URLs |
| Campaigns | `/api/v1/campaigns` | CRUD, scheduling, history |
| Threat Actors | `/api/v1/threat-actors` | CRUD, technique mappings |
| Detection Rules | `/api/v1/detection-rules` | List, filter by source/technique |
| D3FEND | `/api/v1/d3fend` | Defensive techniques and mappings |
| Compliance | `/api/v1/compliance` | Frameworks, controls, gaps |
| Scores | `/api/v1/scores` | Technique/tactic/actor/org scores, config |
| Operational Metrics | `/api/v1/metrics/operational` | MTTD, MTTR, trends, team breakdown |
| Heatmap | `/api/v1/heatmap` | ATT&CK Navigator-style data |
| Snapshots | `/api/v1/snapshots` | Create, compare, list snapshots |
| Reports | `/api/v1/reports` | Coverage, results, remediation exports |
| Notifications | `/api/v1/notifications` | List, read, mark all read |
| Metrics | `/api/v1/metrics` | Summary, by-tactic, pipeline, team activity |
| System | `/api/v1/system` | MITRE sync, import, scheduler status |
| Users | `/api/v1/users` | User CRUD (admin) |
| Audit Logs | `/api/v1/audit-logs` | Audit trail (admin) |
| Data Sources | `/api/v1/data-sources` | Data source management (admin) |
### Auth
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| POST | `/api/v1/auth/login` | Public | Obtain JWT token |
| GET | `/api/v1/auth/me` | Authenticated | Current user profile |
See [docs/API.md](docs/API.md) for the full endpoint reference.
### Techniques
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/techniques` | Authenticated | List all (filters: tactic, status, review_required) |
| GET | `/api/v1/techniques/{mitre_id}` | Authenticated | Detail with associated tests |
| POST | `/api/v1/techniques` | Admin | Create technique |
| PATCH | `/api/v1/techniques/{mitre_id}` | Admin | Update technique fields |
| PATCH | `/api/v1/techniques/{mitre_id}/review` | Lead, Admin | Mark as reviewed |
### Tests — Red/Blue Workflow
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/tests` | Authenticated | List with filters (state, technique, platform, creator, pending_validation_side) |
| POST | `/api/v1/tests` | Red Tech, Admin | Create test (state=draft) |
| POST | `/api/v1/tests/from-template` | Red Tech, Admin | Create from template (pre-populates fields) |
| GET | `/api/v1/tests/{id}` | Authenticated | Detail with split red/blue evidences |
| PATCH | `/api/v1/tests/{id}` | Creator, Admin | General update (draft/rejected only) |
| PATCH | `/api/v1/tests/{id}/red` | Red Tech, Admin | Red Team fields (draft, red_executing) |
| PATCH | `/api/v1/tests/{id}/blue` | Blue Tech, Admin | Blue Team fields (blue_evaluating) |
| PATCH | `/api/v1/tests/{id}/remediation` | Authenticated | Update remediation fields |
| POST | `/api/v1/tests/{id}/start-execution` | Red Tech, Admin | draft → red_executing |
| POST | `/api/v1/tests/{id}/submit-red` | Red Tech, Admin | red_executing → blue_evaluating |
| POST | `/api/v1/tests/{id}/submit-blue` | Blue Tech, Admin | blue_evaluating → in_review |
| POST | `/api/v1/tests/{id}/validate-red` | Red Lead, Admin | Red Lead approves/rejects |
| POST | `/api/v1/tests/{id}/validate-blue` | Blue Lead, Admin | Blue Lead approves/rejects |
| POST | `/api/v1/tests/{id}/reopen` | Lead, Admin | rejected → draft (clears validation) |
| GET | `/api/v1/tests/{id}/timeline` | Authenticated | Audit-log history for this test |
### Test Templates
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/test-templates` | Authenticated | List templates (filters: source, platform, severity, search, mitre_technique_id) |
| POST | `/api/v1/test-templates` | Admin | Create custom template |
| GET | `/api/v1/test-templates/stats` | Admin | Catalog statistics |
| GET | `/api/v1/test-templates/{id}` | Authenticated | Template detail |
| PATCH | `/api/v1/test-templates/{id}` | Admin | Update template |
| DELETE | `/api/v1/test-templates/{id}` | Admin | Soft-delete (deactivate) |
| POST | `/api/v1/test-templates/{id}/toggle-active` | Admin | Toggle active/inactive |
### Evidence
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| POST | `/api/v1/tests/{test_id}/evidence` | Authenticated | Upload evidence (team=red/blue) |
| GET | `/api/v1/evidence/{id}` | Authenticated | Metadata + presigned download URL |
### Notifications
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/notifications` | Authenticated | List notifications (paginated, limit=20) |
| GET | `/api/v1/notifications/unread-count` | Authenticated | Unread notification count |
| PATCH | `/api/v1/notifications/{id}/read` | Authenticated | Mark one as read |
| POST | `/api/v1/notifications/read-all` | Authenticated | Mark all as read |
### Reports
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/reports/coverage-summary` | Authenticated | Full coverage JSON report (filters: tactic, platform) |
| GET | `/api/v1/reports/coverage-csv` | Authenticated | CSV export of coverage |
| GET | `/api/v1/reports/test-results` | Authenticated | Test results report (filters: state, date_from, date_to) |
| GET | `/api/v1/reports/remediation-status` | Authenticated | Remediation status report (filter: status) |
### Metrics
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/metrics/summary` | Authenticated | Global coverage summary |
| GET | `/api/v1/metrics/by-tactic` | Authenticated | Coverage by MITRE tactic |
| GET | `/api/v1/metrics/test-pipeline` | Authenticated | Test counts by pipeline state |
| GET | `/api/v1/metrics/team-activity` | Authenticated | Red/Blue team activity |
| GET | `/api/v1/metrics/validation-rate` | Authenticated | Approval/rejection rates by lead |
| GET | `/api/v1/metrics/recent-tests` | Authenticated | Last 10 updated tests |
### System (Admin)
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| POST | `/api/v1/system/sync-mitre` | Admin | Trigger MITRE ATT&CK sync |
| POST | `/api/v1/system/run-intel-scan` | Admin | Trigger threat-intel RSS scan |
| POST | `/api/v1/system/import-atomic-red-team` | Admin | Import Atomic Red Team templates |
| GET | `/api/v1/system/scheduler-status` | Admin | Background scheduler health |
### Users (Admin)
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/users` | Admin | List all users |
| POST | `/api/v1/users` | Admin | Create new user |
| GET | `/api/v1/users/{id}` | Admin | Get user by ID |
| PATCH | `/api/v1/users/{id}` | Admin | Update user |
### Audit Logs (Admin)
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/audit-logs` | Admin | List audit logs (filters: action, entity_type, dates) |
| GET | `/api/v1/audit-logs/actions` | Admin | List distinct action types |
| GET | `/api/v1/audit-logs/entity-types` | Admin | List distinct entity types |
## Project Structure
```
Aegis/
├── docker-compose.yml
├── backend/
│ ├── Dockerfile
│ ├── requirements.txt
│ ├── alembic.ini
│ ├── alembic/versions/ # b001b007 migration files
│ └── app/
│ ├── main.py # FastAPI app with all routers
│ ├── config.py # Settings from environment
│ ├── database.py # SQLAlchemy engine + session
│ ├── storage.py # MinIO/S3 helpers
│ ├── models/
│ │ ├── user.py # User with roles
│ │ ├── technique.py # MITRE ATT&CK techniques
│ │ ├── test.py # Tests with Red/Blue + remediation fields
│ │ ├── test_template.py # Template catalog
│ │ ├── evidence.py # Evidence files (team-separated)
│ │ ├── notification.py # In-app notifications
│ │ ├── intel.py # Threat intelligence
│ │ ├── audit.py # Audit logging
│ │ └── enums.py # Shared enumerations
│ ├── schemas/ # Pydantic schemas
│ │ ├── test.py # TestCreate/Red/Blue/Validate/Remediation
│ │ ├── test_template.py # Template CRUD schemas
│ │ ├── notification.py # NotificationOut, UnreadCountOut
│ │ └── metrics.py # Pipeline, TeamActivity, ValidationRate
│ ├── routers/ # API endpoints
│ │ ├── tests.py # Full Red/Blue workflow endpoints
│ │ ├── test_templates.py # Template CRUD + import + stats
│ │ ├── notifications.py # Notification list/read/mark
│ │ ├── reports.py # Coverage/results/remediation reports
│ │ ├── metrics.py # V1 + V2 metrics endpoints
│ │ └── ... # auth, techniques, evidence, system, users, audit
│ ├── services/
│ │ ├── test_workflow_service.py # State machine + dual validation
│ │ ├── notification_service.py # Create/read/cleanup notifications
│ │ ├── status_service.py # Technique status recalculation
│ │ └── ... # audit, mitre_sync, intel
│ └── jobs/
│ └── mitre_sync_job.py # Scheduler: MITRE sync, Intel scan, Notification cleanup
├── frontend/src/
│ ├── App.tsx # Routes including /reports
│ ├── api/ # API clients
│ │ ├── notifications.ts # Notification API
│ │ ├── reports.ts # Report API
│ │ └── ...
│ ├── components/
│ │ ├── Layout.tsx # Sidebar + header + NotificationBell
│ │ ├── Sidebar.tsx # Collapsible nav with admin section
│ │ ├── NotificationBell.tsx # Bell icon with badge (polls every 30s)
│ │ ├── NotificationDropdown.tsx # Notification list dropdown
│ │ ├── ConfirmDialog.tsx # Reusable confirmation modal
│ │ ├── Toast.tsx # Toast notification system
│ │ └── test-detail/ # Test detail sub-components
│ └── pages/
│ ├── DashboardPage.tsx # Pipeline funnel, team activity, validation rates
│ ├── TestsPage.tsx # Filters, state counters, pending tasks
│ ├── TestDetailPage.tsx # Red/Blue tabs, validation, evidence
│ ├── TestCatalogPage.tsx # Browse & use templates
│ ├── ReportsPage.tsx # Coverage, results, remediation reports
│ └── SystemPage.tsx # Template admin, import Atomic Red Team
└── backend/tests/ # Test suite
├── test_workflow.py # Red/Blue workflow tests
├── test_templates_crud.py # Template CRUD tests
├── test_metrics_v2.py # V2 metrics tests
└── test_integration_v2.py # Full integration E2E tests
```
## Database Schema
| Table | Description |
|-------|-------------|
| `users` | User accounts with role-based access |
| `techniques` | MITRE ATT&CK techniques with coverage status |
| `tests` | Security tests with Red/Blue fields, dual validation, and remediation |
| `test_templates` | Predefined test catalog (Atomic Red Team, custom) |
| `evidences` | File evidence separated by team (red/blue) |
| `notifications` | In-app notifications with read status |
| `intel_items` | Threat intelligence items linked to techniques |
| `audit_logs` | System-wide audit trail |
## Configuration
@@ -256,118 +354,30 @@ See [docs/API.md](docs/API.md) for the full endpoint reference.
| `MINIO_ACCESS_KEY` | `minioadmin` | MinIO access key |
| `MINIO_SECRET_KEY` | `minioadmin` | MinIO secret key |
| `MINIO_BUCKET` | `evidence` | Evidence bucket |
| `MAX_RETEST_COUNT` | `3` | Max automatic retests per original test |
| `SCORING_WEIGHT_TESTS` | `40` | Weight for test validation component |
| `SCORING_WEIGHT_DETECTION_RULES` | `20` | Weight for detection rules component |
| `SCORING_WEIGHT_D3FEND` | `15` | Weight for D3FEND coverage component |
| `SCORING_WEIGHT_FRESHNESS` | `15` | Weight for freshness component |
| `SCORING_WEIGHT_PLATFORM_DIVERSITY` | `10` | Weight for platform diversity component |
## Project Structure
```
Aegis/
├── docker-compose.yml
├── docker-compose.prod.yml
├── docs/
│ ├── API.md # Full API endpoint reference
│ ├── ARCHITECTURE.md # System architecture and DB schema
│ ├── DATA_SOURCES.md # External data source documentation
│ └── SCORING.md # Scoring system and metrics
├── backend/
│ ├── Dockerfile
│ ├── requirements.txt
│ ├── alembic.ini
│ ├── alembic/versions/ # b001b018 migration files
│ ├── pytest.ini
│ └── app/
│ ├── main.py # FastAPI app with all routers + lifespan
│ ├── config.py # Settings from environment
│ ├── database.py # SQLAlchemy engine + session (lazy init)
│ ├── storage.py # MinIO/S3 helpers
│ ├── auth.py # Password hashing + JWT tokens
│ ├── models/ # 18 model files (SQLAlchemy ORM)
│ ├── schemas/ # Pydantic request/response schemas
│ ├── routers/ # 21 API routers
│ ├── services/ # 20 business logic services
│ ├── dependencies/ # Auth dependencies (get_current_user, require_role)
│ └── jobs/
│ └── mitre_sync_job.py # APScheduler: 5 background jobs
├── frontend/src/
│ ├── App.tsx # Routes with lazy loading + role protection
│ ├── api/ # 22 API client modules (Axios + TanStack Query)
│ ├── components/
│ │ ├── Layout.tsx # Sidebar + header + NotificationBell
│ │ ├── Sidebar.tsx # Role-aware collapsible navigation
│ │ ├── heatmap/ # ATT&CK heatmap (6 components)
│ │ ├── compliance/ # Compliance UI (gauge, controls table)
│ │ └── test-detail/ # Test detail sub-components
│ ├── hooks/
│ │ └── useDebounce.ts # Debounce hook for search inputs
│ ├── context/
│ │ └── AuthContext.tsx # Auth state management
│ └── pages/ # 21 page components
└── backend/tests/
├── conftest.py # SQLite test DB with JSONB/UUID compatibility
├── fixtures/ # YAML/TOML/JSON test fixtures
├── test_data_sources.py # Data source parsing tests
├── test_scoring_and_compliance.py # Scoring + metrics + compliance tests
├── test_campaigns_and_snapshots.py # Campaign, snapshot, and retest tests
├── test_workflow.py # Red/Blue workflow tests
├── test_templates_crud.py # Template CRUD tests
├── test_metrics_v2.py # V2 metrics tests
└── test_integration_v2.py # Full integration E2E tests
```
## Development
### Running Migrations
```bash
docker exec aegis-backend alembic upgrade head
docker exec aegis-backend alembic revision --autogenerate -m "description"
docker exec aegis-backend alembic downgrade -1
docker exec -w /app aegis-backend-1 alembic upgrade head
docker exec -w /app aegis-backend-1 alembic revision --autogenerate -m "description"
docker exec -w /app aegis-backend-1 alembic downgrade -1
```
### Running Tests
```bash
# Run all V3 tests inside the container (recommended)
docker exec aegis-backend python -m pytest tests/ -v --tb=short
# Run standalone tests (no database required)
cd backend && python tests/test_workflow.py
cd backend && python tests/test_templates_crud.py
cd backend && python tests/test_metrics_v2.py
cd backend && python tests/test_integration_v2.py
# Run specific test suites
docker exec aegis-backend python -m pytest tests/test_data_sources.py -v
docker exec aegis-backend python -m pytest tests/test_scoring_and_compliance.py -v
docker exec aegis-backend python -m pytest tests/test_campaigns_and_snapshots.py -v
# Skip integration tests (require network)
docker exec aegis-backend python -m pytest tests/ -v -m "not integration"
# Run with pytest (requires PostgreSQL)
docker exec -w /app aegis-backend-1 pytest -v
```
### Generating Reports
```bash
# Coverage summary (JSON)
GET /api/v1/reports/coverage-summary
# Coverage CSV export
GET /api/v1/reports/coverage-csv
# Compliance gap analysis
GET /api/v1/compliance/{framework_id}/gaps
```
## Further Documentation
- **[Architecture](docs/ARCHITECTURE.md)** — Database schema, service layer, state machine diagrams
- **[Data Sources](docs/DATA_SOURCES.md)** — All external data sources with import instructions
- **[Scoring](docs/SCORING.md)** — Scoring system explained with examples and configuration
- **[API Reference](docs/API.md)** — Full endpoint documentation
## License
This project is proprietary software. All rights reserved.
## Disclaimer
This project has been developed with the assistance of [Cursor](https://cursor.com) and Claude Opus 4.6 (Anthropic).
@@ -1,78 +0,0 @@
"""add_performance_indexes
Revision ID: b018perfidx
Revises: b017scheduling
Create Date: 2026-02-10 06:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "b018perfidx"
down_revision: Union[str, None] = "b017scheduling"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Composite index for detection rules filtered by technique + source
op.create_index(
"ix_detection_rules_technique_source",
"detection_rules",
["mitre_technique_id", "source"],
)
# Composite index for snapshot technique states
op.create_index(
"ix_snapshot_technique_states_snap_tech",
"snapshot_technique_states",
["snapshot_id", "technique_id"],
unique=True,
)
# Covering index for tests frequently filtered by technique + state
op.create_index(
"ix_tests_technique_state",
"tests",
["technique_id", "state"],
)
# Audit logs — timestamp-based lookups
op.create_index(
"ix_audit_logs_timestamp",
"audit_logs",
["timestamp"],
)
# Audit logs — entity lookups
op.create_index(
"ix_audit_logs_entity",
"audit_logs",
["entity_type", "entity_id"],
)
# Test detection results — triggered flag for maturity queries
op.create_index(
"ix_test_detection_results_triggered",
"test_detection_results",
["triggered"],
)
# Compliance control mappings — composite for joins
op.create_index(
"ix_compliance_mappings_control_technique",
"compliance_control_mappings",
["compliance_control_id", "technique_id"],
)
def downgrade() -> None:
op.drop_index("ix_compliance_mappings_control_technique", table_name="compliance_control_mappings")
op.drop_index("ix_test_detection_results_triggered", table_name="test_detection_results")
op.drop_index("ix_audit_logs_entity", table_name="audit_logs")
op.drop_index("ix_audit_logs_timestamp", table_name="audit_logs")
op.drop_index("ix_tests_technique_state", table_name="tests")
op.drop_index("ix_snapshot_technique_states_snap_tech", table_name="snapshot_technique_states")
op.drop_index("ix_detection_rules_technique_source", table_name="detection_rules")
+4 -46
View File
@@ -1,54 +1,12 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from app.config import settings
engine = create_engine(settings.DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Engine and session factory are created lazily so that tests can
# override DATABASE_URL via environment *before* any import triggers
# the real PostgreSQL engine creation (which requires psycopg2).
_engine = None
_SessionLocal = None
def _get_engine():
global _engine
if _engine is None:
from app.config import settings
_engine = create_engine(settings.DATABASE_URL)
return _engine
def _get_session_factory():
global _SessionLocal
if _SessionLocal is None:
_SessionLocal = sessionmaker(
autocommit=False, autoflush=False, bind=_get_engine()
)
return _SessionLocal
class _LazySessionLocal:
"""Proxy so ``SessionLocal()`` keeps working as before but the real
sessionmaker is only created on first call."""
def __call__(self, *args, **kwargs):
return _get_session_factory()(*args, **kwargs)
def __getattr__(self, name):
return getattr(_get_session_factory(), name)
SessionLocal = _LazySessionLocal()
class _EngineProxy:
"""Thin proxy so ``from app.database import engine`` still works."""
def __getattr__(self, name):
return getattr(_get_engine(), name)
engine = _EngineProxy() # type: ignore[assignment]
def get_db():
db = SessionLocal()
+2 -4
View File
@@ -27,10 +27,8 @@ def operational_metrics(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Get all operational metrics (MTTD, MTTR, etc.) — cached for 5 min."""
from app.services.score_cache import get_operational_metrics_cached
return get_operational_metrics_cached(db)
"""Get all operational metrics (MTTD, MTTR, Detection Efficacy, etc.)."""
return get_all_operational_metrics(db)
# ── GET /metrics/operational/trend ────────────────────────────────────
+2 -8
View File
@@ -93,10 +93,8 @@ def score_organization(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Get the overall organization security score (cached for 5 min)."""
from app.services.score_cache import get_organization_score_cached
return get_organization_score_cached(db)
"""Get the overall organization security score."""
return calculate_organization_score(db)
# ── GET /scores/history ──────────────────────────────────────────────
@@ -172,10 +170,6 @@ def update_scoring_config(
if payload.platform_diversity is not None:
settings.SCORING_WEIGHT_PLATFORM_DIVERSITY = payload.platform_diversity
# Weights changed — bust the score cache
from app.services.score_cache import invalidate
invalidate()
return {
"message": "Scoring config updated",
"weights": {
-84
View File
@@ -1,84 +0,0 @@
"""In-memory TTL cache for expensive scoring and metrics calculations.
The cache is a simple dict with timestamps. It is invalidated when tests
are validated, scores change, or an explicit ``invalidate`` call is made.
Thread-safe: each worker process has its own dict, and the TTL ensures
stale data does not persist longer than ``CACHE_TTL`` seconds.
"""
import time
from typing import Any, Optional
CACHE_TTL = 300 # 5 minutes
_cache: dict[str, dict[str, Any]] = {}
def get(key: str) -> Optional[Any]:
"""Return cached value if present and not expired, else None."""
entry = _cache.get(key)
if entry is None:
return None
if time.time() - entry["ts"] > CACHE_TTL:
_cache.pop(key, None)
return None
return entry["data"]
def put(key: str, data: Any) -> None:
"""Store *data* under *key* with the current timestamp."""
_cache[key] = {"data": data, "ts": time.time()}
def invalidate(key: Optional[str] = None) -> None:
"""Remove one key or clear the whole cache."""
if key is None:
_cache.clear()
else:
_cache.pop(key, None)
# ── High-level helpers ────────────────────────────────────────────────
def get_organization_score_cached(db):
"""Cached wrapper around ``calculate_organization_score``."""
from app.services.scoring_service import calculate_organization_score
cached = get("org_score")
if cached is not None:
return cached
result = calculate_organization_score(db)
put("org_score", result)
return result
def get_operational_metrics_cached(db):
"""Cached wrapper around operational metrics (MTTD, MTTR, efficacy)."""
from app.services.operational_metrics_service import (
calculate_mttd,
calculate_mttr,
calculate_detection_efficacy,
calculate_alert_fidelity,
calculate_coverage_velocity,
calculate_validation_throughput,
calculate_rejection_rate,
)
cached = get("op_metrics")
if cached is not None:
return cached
result = {
"mttd": calculate_mttd(db),
"mttr": calculate_mttr(db),
"detection_efficacy": calculate_detection_efficacy(db),
"alert_fidelity": calculate_alert_fidelity(db),
"coverage_velocity": calculate_coverage_velocity(db),
"validation_throughput": calculate_validation_throughput(db),
"rejection_rate": calculate_rejection_rate(db),
}
put("op_metrics", result)
return result
@@ -288,12 +288,6 @@ def check_dual_validation(db: Session, test: Test) -> Test:
elif red_status == "approved" and blue_status == "approved":
test.state = TestState.validated
db.commit()
# Invalidate cached scores — a validation changes org-level numbers
try:
from app.services.score_cache import invalidate
invalidate()
except Exception:
pass
try:
notify_test_state_change(db, test, "validated")
except Exception:
-2
View File
@@ -3,5 +3,3 @@ testpaths = tests
python_files = test_*.py
python_functions = test_*
addopts = -v --tb=short
markers =
integration: marks tests as integration tests (deselect with '-m "not integration"')
+8 -53
View File
@@ -1,45 +1,16 @@
"""Pytest fixtures and configuration for backend tests.
The conftest intentionally avoids importing ``app.main`` at module level
because that triggers heavy side-effect imports (boto3, APScheduler, etc.)
which are NOT needed for unit tests. The ``client`` fixture lazily imports
the FastAPI app only when actually requested.
"""
import os
# Set DATABASE_URL to SQLite *before* any app module is imported so that
# the lazy engine in app.database never tries to connect to PostgreSQL.
os.environ.setdefault("DATABASE_URL", "sqlite:///:memory:")
"""Pytest fixtures and configuration for backend tests."""
import pytest
from sqlalchemy import JSON, String, Text, create_engine, event
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from app.database import Base
# ── Patch PostgreSQL-specific column types so SQLite can handle them ─────
# Must run BEFORE importing models, because column type objects are
# instantiated at class-definition time.
from sqlalchemy.dialects.postgresql import UUID as PG_UUID, JSONB as PG_JSONB
# Tell SQLAlchemy: when compiling for SQLite, render JSONB as plain JSON
# and PostgreSQL UUID as CHAR(32).
from sqlalchemy.dialects.sqlite.base import SQLiteTypeCompiler
if not hasattr(SQLiteTypeCompiler, "visit_JSONB"):
SQLiteTypeCompiler.visit_JSONB = lambda self, type_, **kw: "JSON"
if not hasattr(SQLiteTypeCompiler, "visit_UUID"):
SQLiteTypeCompiler.visit_UUID = lambda self, type_, **kw: "CHAR(32)"
from app.main import app
from app.database import Base, get_db
from app.auth import hash_password
from app.models.user import User
# ── Import all models so Base.metadata knows about every table ──────────
import app.models # noqa: F401 — triggers model registration via __init__
# Use in-memory SQLite for tests
SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
@@ -48,14 +19,6 @@ engine = create_engine(
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
# SQLite needs PRAGMA foreign_keys to enforce FK constraints
@event.listens_for(engine, "connect")
def _set_sqlite_pragma(dbapi_conn, connection_record):
cursor = dbapi_conn.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
@@ -80,21 +43,13 @@ def db():
@pytest.fixture(scope="function")
def client(db):
"""Create a test client with database override.
Imports ``app.main`` lazily to avoid pulling in boto3 / APScheduler
when only the ``db`` fixture is needed.
"""
from app.main import app
from app.database import get_db
"""Create a test client with database override."""
app.dependency_overrides[get_db] = override_get_db
Base.metadata.create_all(bind=engine)
from fastapi.testclient import TestClient
with TestClient(app) as test_client:
yield test_client
Base.metadata.drop_all(bind=engine)
app.dependency_overrides.clear()
-44
View File
@@ -1,44 +0,0 @@
---
id: caldera-test-001
name: Get System Info
description: Collect basic system information using whoami and systeminfo commands
tactic: discovery
technique:
attack_id: T1082
name: System Information Discovery
platforms:
windows:
psh:
command: |
whoami /all
systeminfo
cleanup: ""
cmd:
command: |
whoami
systeminfo
linux:
sh:
command: |
uname -a
cat /etc/os-release
cleanup: ""
---
id: caldera-test-002
name: List Network Connections
description: Enumerate active network connections and listening ports
tactic: discovery
technique:
attack_id: T1049
name: System Network Connections Discovery
platforms:
windows:
psh:
command: |
Get-NetTCPConnection | Select-Object LocalAddress, LocalPort, RemoteAddress, RemotePort, State
cleanup: ""
linux:
sh:
command: |
netstat -tulnp 2>/dev/null || ss -tulnp
cleanup: ""
-36
View File
@@ -1,36 +0,0 @@
[metadata]
creation_date = "2025/01/15"
updated_date = "2025/06/01"
maturity = "production"
[rule]
author = ["Test Author"]
description = "Detects the creation of a scheduled task via schtasks.exe, which is commonly used by adversaries for persistence."
name = "Scheduled Task Created via Schtasks"
severity = "medium"
type = "eql"
language = "eql"
query = '''
process where process.name : "schtasks.exe" and
process.args : ("/create", "-create") and
process.args : ("/sc", "-sc") and
not process.parent.executable : ("C:\\Program Files\\*", "C:\\Program Files (x86)\\*")
'''
risk_score = 47
rule_id = "test-elastic-001"
tags = ["Persistence", "Windows"]
[[rule.threat]]
framework = "MITRE ATT&CK"
[[rule.threat.technique]]
id = "T1053"
name = "Scheduled Task/Job"
reference = "https://attack.mitre.org/techniques/T1053/"
[[rule.threat.technique.subtechnique]]
id = "T1053.005"
name = "Scheduled Task"
reference = "https://attack.mitre.org/techniques/T1053/005/"
[rule.threat.tactic]
id = "TA0003"
name = "Persistence"
reference = "https://attack.mitre.org/tactics/TA0003/"
-26
View File
@@ -1,26 +0,0 @@
Name: Mshta.exe
Description: Used to execute .HTA files
Author: Test Author
Created: 2025-01-15
Commands:
- Command: mshta.exe evilfile.hta
Description: Open an HTA file from disk
Usecase: Execute arbitrary HTA scripts
Category: Execute
Privileges: User
MitreID: T1218.005
OperatingSystem: Windows 10, Windows 11
- Command: mshta.exe vbscript:Execute("CreateObject(""Wscript.Shell"").Run(""calc.exe"")")
Description: Execute VBScript via mshta
Usecase: Execute inline VBScript
Category: Execute
Privileges: User
MitreID: T1059.005
OperatingSystem: Windows 10, Windows 11
Full_Path:
- Path: C:\Windows\System32\mshta.exe
- Path: C:\Windows\SysWOW64\mshta.exe
Detection:
- Sigma: https://github.com/SigmaHQ/sigma/blob/master/rules/windows/process_creation/proc_creation_win_mshta.yml
Resources:
- Link: https://lolbas-project.github.io/#/mshta
-27
View File
@@ -1,27 +0,0 @@
title: Windows PowerShell Execution Policy Bypass
id: 1f21ec3f-810d-4b0e-8045-322202e22b4b
status: stable
description: Detects attempts to bypass PowerShell execution policy
author: Test Author
date: 2025/01/15
references:
- https://example.com/sigma-test
logsource:
category: process_creation
product: windows
detection:
selection:
CommandLine|contains:
- '-ExecutionPolicy Bypass'
- '-ep bypass'
- 'Set-ExecutionPolicy Bypass'
condition: selection
falsepositives:
- Legitimate admin scripts
- CI/CD pipelines
level: high
tags:
- attack.execution
- attack.t1059.001
- attack.defense_evasion
- attack.t1562.001
-112
View File
@@ -1,112 +0,0 @@
{
"type": "bundle",
"id": "bundle--test-001",
"spec_version": "2.0",
"objects": [
{
"type": "intrusion-set",
"id": "intrusion-set--test-apt1",
"name": "APT1",
"aliases": ["Comment Crew", "Comment Panda"],
"description": "APT1 is a Chinese cyber espionage group attributed to PLA Unit 61398.",
"first_seen": "2006-06-01T00:00:00Z",
"last_seen": "2023-12-31T00:00:00Z",
"external_references": [
{
"source_name": "mitre-attack",
"url": "https://attack.mitre.org/groups/G0006/",
"external_id": "G0006"
},
{
"source_name": "Mandiant Report",
"url": "https://www.mandiant.com/resources/apt1-exposing-one-of-chinas-cyber-espionage-units",
"description": "Mandiant APT1 Report"
}
],
"created": "2017-05-31T21:31:48.664Z",
"modified": "2023-03-22T03:52:18.000Z"
},
{
"type": "intrusion-set",
"id": "intrusion-set--test-apt28",
"name": "APT28",
"aliases": ["Fancy Bear", "Sofacy", "Pawn Storm"],
"description": "APT28 is a threat group attributed to Russia's GRU military intelligence.",
"first_seen": "2004-01-01T00:00:00Z",
"last_seen": "2024-06-30T00:00:00Z",
"external_references": [
{
"source_name": "mitre-attack",
"url": "https://attack.mitre.org/groups/G0007/",
"external_id": "G0007"
}
],
"created": "2017-05-31T21:31:48.664Z",
"modified": "2024-01-15T00:00:00.000Z"
},
{
"type": "attack-pattern",
"id": "attack-pattern--test-t1566",
"name": "Phishing",
"external_references": [
{
"source_name": "mitre-attack",
"url": "https://attack.mitre.org/techniques/T1566/",
"external_id": "T1566"
}
]
},
{
"type": "attack-pattern",
"id": "attack-pattern--test-t1059",
"name": "Command and Scripting Interpreter",
"external_references": [
{
"source_name": "mitre-attack",
"url": "https://attack.mitre.org/techniques/T1059/",
"external_id": "T1059"
}
]
},
{
"type": "attack-pattern",
"id": "attack-pattern--test-t1078",
"name": "Valid Accounts",
"external_references": [
{
"source_name": "mitre-attack",
"url": "https://attack.mitre.org/techniques/T1078/",
"external_id": "T1078"
}
]
},
{
"type": "relationship",
"id": "relationship--test-r1",
"relationship_type": "uses",
"source_ref": "intrusion-set--test-apt1",
"target_ref": "attack-pattern--test-t1566"
},
{
"type": "relationship",
"id": "relationship--test-r2",
"relationship_type": "uses",
"source_ref": "intrusion-set--test-apt1",
"target_ref": "attack-pattern--test-t1059"
},
{
"type": "relationship",
"id": "relationship--test-r3",
"relationship_type": "uses",
"source_ref": "intrusion-set--test-apt28",
"target_ref": "attack-pattern--test-t1566"
},
{
"type": "relationship",
"id": "relationship--test-r4",
"relationship_type": "uses",
"source_ref": "intrusion-set--test-apt28",
"target_ref": "attack-pattern--test-t1078"
}
]
}
@@ -1,464 +0,0 @@
"""Tests for campaigns, snapshots, and re-testing — T-237.
Uses the in-memory SQLite test database from conftest.py.
"""
import uuid
from datetime import datetime, timedelta
import pytest
from app.models.technique import Technique
from app.models.test import Test
from app.models.test_template import TestTemplate
from app.models.campaign import Campaign, CampaignTest
from app.models.coverage_snapshot import CoverageSnapshot, SnapshotTechniqueState
from app.models.enums import TestState, TestResult, TechniqueStatus
from app.services.campaign_service import (
validate_no_circular_dependency,
get_campaign_progress,
)
from app.services.campaign_scheduler_service import (
calculate_next_run,
check_and_run_recurring_campaigns,
)
from app.services.snapshot_service import (
create_snapshot,
compare_snapshots,
cleanup_old_snapshots,
)
from app.services.test_workflow_service import (
handle_remediation_completed,
get_retest_chain,
)
# ---------------------------------------------------------------------------
# Shared fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def techniques(db):
"""Create a set of techniques for testing."""
techs = []
for mid, name, status in [
("T1059", "Command Line", TechniqueStatus.validated),
("T1078", "Valid Accounts", TechniqueStatus.partial),
("T1053", "Scheduled Tasks", TechniqueStatus.not_covered),
]:
tech = Technique(
mitre_id=mid,
name=name,
tactic="execution",
platforms=["windows"],
status_global=status,
)
db.add(tech)
techs.append(tech)
db.commit()
for t in techs:
db.refresh(t)
return techs
@pytest.fixture
def campaign_with_tests(db, techniques, admin_user):
"""Create a campaign with ordered tests."""
campaign = Campaign(
name="Test Campaign",
type="custom",
status="draft",
created_by=admin_user.id,
)
db.add(campaign)
db.flush()
tests = []
for i, tech in enumerate(techniques):
test = Test(
technique_id=tech.id,
name=f"Test for {tech.mitre_id}",
state=TestState.draft,
created_by=admin_user.id,
)
db.add(test)
db.flush()
tests.append(test)
ct = CampaignTest(
campaign_id=campaign.id,
test_id=test.id,
order_index=i,
phase="execution",
)
db.add(ct)
db.commit()
db.refresh(campaign)
return {"campaign": campaign, "tests": tests}
# ═══════════════════════════════════════════════════════════════════════
# Campaign Tests
# ═══════════════════════════════════════════════════════════════════════
class TestCampaigns:
def test_create_campaign_with_tests(self, db, campaign_with_tests):
"""CRUD básico de campaña con tests ordenados."""
campaign = campaign_with_tests["campaign"]
assert campaign.name == "Test Campaign"
assert campaign.status == "draft"
cts = (
db.query(CampaignTest)
.filter(CampaignTest.campaign_id == campaign.id)
.order_by(CampaignTest.order_index)
.all()
)
assert len(cts) == 3
assert cts[0].order_index == 0
assert cts[1].order_index == 1
assert cts[2].order_index == 2
def test_campaign_progress_calculation(self, db, campaign_with_tests):
"""Progreso se calcula según estado de tests."""
campaign = campaign_with_tests["campaign"]
tests = campaign_with_tests["tests"]
# Initially all draft → 0% complete
progress = get_campaign_progress(db, campaign.id)
assert progress["total"] == 3
assert progress["completion_pct"] == 0.0
# Validate one test
tests[0].state = TestState.validated
db.commit()
progress = get_campaign_progress(db, campaign.id)
assert progress["completion_pct"] == pytest.approx(33.3, abs=0.1)
def test_circular_dependency_prevention(self, db, campaign_with_tests):
"""Intentar crear dependencia circular en campaign_tests falla."""
from fastapi import HTTPException
campaign = campaign_with_tests["campaign"]
cts = (
db.query(CampaignTest)
.filter(CampaignTest.campaign_id == campaign.id)
.order_by(CampaignTest.order_index)
.all()
)
# Create A -> B dependency
cts[1].depends_on = cts[0].id
db.commit()
# Try to create B -> A (circular)
with pytest.raises(HTTPException) as exc_info:
validate_no_circular_dependency(
db, campaign.id, cts[0].id, cts[1].id
)
assert exc_info.value.status_code == 400
def test_campaign_scheduling_next_run(self):
"""next_run_at se calcula correctamente para weekly/monthly/quarterly."""
base = datetime(2026, 1, 1)
weekly = calculate_next_run(base, "weekly")
assert weekly == datetime(2026, 1, 8)
monthly = calculate_next_run(base, "monthly")
assert monthly == datetime(2026, 1, 31)
quarterly = calculate_next_run(base, "quarterly")
assert quarterly == datetime(2026, 4, 1)
def test_campaign_cloning(self, db, campaign_with_tests, admin_user):
"""Clonación de campaña recurrente crea tests nuevos con datos correctos."""
campaign = campaign_with_tests["campaign"]
original_tests = campaign_with_tests["tests"]
# Set up as recurring
campaign.is_recurring = True
campaign.recurrence_pattern = "monthly"
campaign.next_run_at = datetime.utcnow() - timedelta(hours=1) # Due now
db.commit()
# Run the scheduler
spawned = check_and_run_recurring_campaigns(db)
assert spawned == 1
# Find child campaign
child = (
db.query(Campaign)
.filter(Campaign.parent_campaign_id == campaign.id)
.first()
)
assert child is not None
assert "Run" in child.name
assert child.status == "active"
# Check child tests are fresh copies (new IDs, draft state)
child_cts = (
db.query(CampaignTest)
.filter(CampaignTest.campaign_id == child.id)
.all()
)
assert len(child_cts) == len(original_tests)
child_test_ids = {ct.test_id for ct in child_cts}
original_test_ids = {t.id for t in original_tests}
assert child_test_ids.isdisjoint(original_test_ids) # All new IDs
for ct in child_cts:
test = db.query(Test).filter(Test.id == ct.test_id).first()
assert test.state == TestState.draft
# Check parent was updated
db.refresh(campaign)
assert campaign.last_run_at is not None
assert campaign.next_run_at > datetime.utcnow()
# ═══════════════════════════════════════════════════════════════════════
# Snapshot Tests
# ═══════════════════════════════════════════════════════════════════════
class TestSnapshots:
def test_create_snapshot(self, db, techniques, admin_user):
"""Snapshot captura estado actual correctamente."""
snapshot = create_snapshot(db, name="Test Snapshot", user_id=admin_user.id)
assert snapshot is not None
assert snapshot.name == "Test Snapshot"
assert snapshot.total_techniques == len(techniques)
assert snapshot.created_by == admin_user.id
assert snapshot.organization_score >= 0
# Verify per-technique states
states = (
db.query(SnapshotTechniqueState)
.filter(SnapshotTechniqueState.snapshot_id == snapshot.id)
.all()
)
assert len(states) == len(techniques)
mitre_ids = {s.mitre_id for s in states}
assert "T1059" in mitre_ids
assert "T1078" in mitre_ids
assert "T1053" in mitre_ids
def test_compare_snapshots_improvements(self, db, techniques, admin_user):
"""Comparación detecta técnicas que mejoraron."""
# Create snapshot A
snap_a = create_snapshot(db, name="Before")
# Improve a technique
tech = db.query(Technique).filter(Technique.mitre_id == "T1053").first()
tech.status_global = TechniqueStatus.validated
db.commit()
# Create snapshot B
snap_b = create_snapshot(db, name="After")
result = compare_snapshots(db, snap_a.id, snap_b.id)
assert result["score_delta"] is not None
assert result["summary"]["improved_count"] >= 0
assert isinstance(result["improved"], list)
assert isinstance(result["worsened"], list)
assert result["unchanged_count"] >= 0
def test_compare_snapshots_regressions(self, db, techniques, admin_user):
"""Comparación detecta técnicas que empeoraron."""
# Create snapshot A
snap_a = create_snapshot(db, name="Before Regression")
# Worsen a technique
tech = db.query(Technique).filter(Technique.mitre_id == "T1059").first()
tech.status_global = TechniqueStatus.not_covered
db.commit()
snap_b = create_snapshot(db, name="After Regression")
result = compare_snapshots(db, snap_a.id, snap_b.id)
assert result["summary"]["worsened_count"] >= 0
def test_snapshot_cleanup(self, db, techniques, admin_user):
"""Cleanup mantiene solo los últimos N snapshots."""
# Create 5 snapshots
for i in range(5):
create_snapshot(db, name=f"Snapshot {i}")
total_before = db.query(CoverageSnapshot).count()
assert total_before == 5
# Cleanup keeping only 3
deleted = cleanup_old_snapshots(db, keep_last=3)
assert deleted == 2
total_after = db.query(CoverageSnapshot).count()
assert total_after == 3
def test_snapshot_normalized_storage(self, db, techniques, admin_user):
"""Verificar que el almacenamiento normalizado funciona correctamente."""
snapshot = create_snapshot(db, name="Normalized Check")
# Each technique should have exactly one SnapshotTechniqueState row
for tech in techniques:
states = (
db.query(SnapshotTechniqueState)
.filter(
SnapshotTechniqueState.snapshot_id == snapshot.id,
SnapshotTechniqueState.technique_id == tech.id,
)
.all()
)
assert len(states) == 1
state = states[0]
assert state.mitre_id == tech.mitre_id
assert state.status is not None
# ═══════════════════════════════════════════════════════════════════════
# Re-testing Tests
# ═══════════════════════════════════════════════════════════════════════
class TestRetesting:
def test_retest_created_on_remediation(self, db, techniques, admin_user):
"""Completar remediación crea retest automáticamente."""
test = Test(
technique_id=techniques[0].id,
name="Original Test",
state=TestState.validated,
remediation_status="completed",
created_by=admin_user.id,
)
db.add(test)
db.commit()
db.refresh(test)
retest = handle_remediation_completed(db, test, admin_user)
assert retest is not None
assert retest.retest_of == test.id
assert retest.retest_count == 1
assert retest.state == TestState.draft
assert retest.technique_id == test.technique_id
def test_retest_points_to_original(self, db, techniques, admin_user):
"""Retest de un retest apunta al test original, no al intermedio."""
original = Test(
technique_id=techniques[0].id,
name="Original",
state=TestState.validated,
remediation_status="completed",
created_by=admin_user.id,
retest_count=0,
)
db.add(original)
db.commit()
db.refresh(original)
# First retest
retest1 = handle_remediation_completed(db, original, admin_user)
assert retest1 is not None
assert retest1.retest_of == original.id
# Simulate completing remediation on retest1
retest1.state = TestState.validated
retest1.remediation_status = "completed"
db.commit()
db.refresh(retest1)
# Second retest — should point to ORIGINAL, not retest1
retest2 = handle_remediation_completed(db, retest1, admin_user)
assert retest2 is not None
assert retest2.retest_of == original.id # Points to original!
assert retest2.retest_count == 2
def test_retest_max_limit(self, db, techniques, admin_user):
"""Al alcanzar MAX_RETEST_COUNT no se crea retest."""
from app.config import settings
test = Test(
technique_id=techniques[0].id,
name="Max Retests Test",
state=TestState.validated,
remediation_status="completed",
created_by=admin_user.id,
retest_count=settings.MAX_RETEST_COUNT, # Already at max
)
db.add(test)
db.commit()
db.refresh(test)
result = handle_remediation_completed(db, test, admin_user)
assert result is None # No retest created
def test_retest_chain_query(self, db, techniques, admin_user):
"""Endpoint /tests/{id}/retest-chain retorna cadena completa."""
original = Test(
technique_id=techniques[0].id,
name="Chain Original",
state=TestState.validated,
remediation_status="completed",
created_by=admin_user.id,
)
db.add(original)
db.commit()
db.refresh(original)
retest1 = handle_remediation_completed(db, original, admin_user)
assert retest1 is not None
# Complete retest1 and trigger another
retest1.state = TestState.validated
retest1.remediation_status = "completed"
db.commit()
db.refresh(retest1)
retest2 = handle_remediation_completed(db, retest1, admin_user)
assert retest2 is not None
# Get chain
chain = get_retest_chain(db, original.id)
assert len(chain) == 3 # original + retest1 + retest2
assert chain[0].id == original.id
assert chain[1].retest_count == 1
assert chain[2].retest_count == 2
def test_retest_has_correct_data(self, db, techniques, admin_user):
"""Retest tiene mismos datos base que el original."""
original = Test(
technique_id=techniques[0].id,
name="Data Check Original",
description="Test description",
platform="windows",
procedure_text="Run cmd /c whoami",
tool_used="cmd.exe",
state=TestState.validated,
remediation_status="completed",
created_by=admin_user.id,
)
db.add(original)
db.commit()
db.refresh(original)
retest = handle_remediation_completed(db, original, admin_user)
assert retest is not None
# Verify base data is copied
assert retest.technique_id == original.technique_id
assert retest.description == original.description
assert retest.platform == original.platform
assert retest.procedure_text == original.procedure_text
assert retest.tool_used == original.tool_used
assert retest.created_by == original.created_by
assert retest.state == TestState.draft
-427
View File
@@ -1,427 +0,0 @@
"""Tests for data source import parsing — T-235.
Two levels:
- TestDataSourcesParsing: Unit tests using local fixtures (fast, no network)
- TestDataSourcesIntegration: Integration tests requiring network (pytest -m integration)
"""
import json
import os
import re
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
import yaml
FIXTURES = Path(__file__).parent / "fixtures"
# ---------------------------------------------------------------------------
# Helpers — lightweight parsing functions extracted from import services
# for testable, isolated verification
# ---------------------------------------------------------------------------
def _parse_sigma_yaml(content: str) -> dict | None:
"""Parse a Sigma YAML rule and extract relevant fields."""
data = yaml.safe_load(content)
if not data or not isinstance(data, dict):
return None
title = data.get("title")
tags = data.get("tags", [])
# Extract MITRE technique IDs from tags
mitre_ids = []
for tag in tags:
match = re.match(r"attack\.(t\d{4}(?:\.\d{3})?)", tag, re.IGNORECASE)
if match:
mitre_ids.append(match.group(1).upper())
if not title or not mitre_ids:
return None
level = data.get("level", "medium")
logsource = data.get("logsource", {})
platforms = []
product = logsource.get("product", "")
if product:
platforms.append(product)
return {
"title": title,
"description": data.get("description"),
"mitre_ids": mitre_ids,
"severity": level,
"platforms": platforms,
"false_positives": data.get("falsepositives", []),
}
def _parse_lolbas_yaml(content: str) -> list[dict]:
"""Parse a LOLBAS YAML entry and extract templates."""
data = yaml.safe_load(content)
if not data or not isinstance(data, dict):
return []
name = data.get("Name", "")
commands = data.get("Commands", [])
results = []
for cmd in commands:
mitre_id = cmd.get("MitreID")
if not mitre_id:
continue
results.append({
"name": name,
"mitre_id": mitre_id,
"command": cmd.get("Command", ""),
"description": cmd.get("Description", ""),
"usecase": cmd.get("Usecase", ""),
})
return results
def _parse_caldera_yaml(content: str) -> list[dict]:
"""Parse a CALDERA multi-doc YAML and extract abilities."""
docs = list(yaml.safe_load_all(content))
results = []
for data in docs:
if not data or not isinstance(data, dict):
continue
technique = data.get("technique", {})
attack_id = technique.get("attack_id")
if not attack_id:
continue
platforms_dict = data.get("platforms", {})
platform_names = list(platforms_dict.keys())
# Extract commands
commands = []
for plat, executors in platforms_dict.items():
if isinstance(executors, dict):
for exec_name, exec_data in executors.items():
if isinstance(exec_data, dict) and exec_data.get("command"):
commands.append(exec_data["command"].strip())
results.append({
"id": data.get("id"),
"name": data.get("name"),
"description": data.get("description"),
"attack_id": attack_id,
"tactic": data.get("tactic"),
"platforms": platform_names,
"commands": commands,
})
return results
def _parse_elastic_toml(content: str) -> dict | None:
"""Parse an Elastic detection rule TOML and extract fields."""
try:
import toml
except ImportError:
toml = None
if toml is None:
# Fallback: parse manually enough for testing
return None
data = toml.loads(content)
rule = data.get("rule", {})
if not rule:
return None
name = rule.get("name")
threat_list = rule.get("threat", [])
mitre_ids = []
for threat_entry in threat_list:
framework = threat_entry.get("framework", "")
if "MITRE" not in framework:
continue
for tech in threat_entry.get("technique", []):
tech_id = tech.get("id")
if tech_id:
mitre_ids.append(tech_id)
for sub in tech.get("subtechnique", []):
sub_id = sub.get("id")
if sub_id:
mitre_ids.append(sub_id)
return {
"name": name,
"description": rule.get("description"),
"query": rule.get("query"),
"severity": rule.get("severity"),
"rule_type": rule.get("type"),
"mitre_ids": mitre_ids,
}
def _parse_stix_bundle(content: str) -> dict:
"""Parse a STIX 2.0 bundle and extract intrusion-sets and relationships."""
data = json.loads(content)
objects = data.get("objects", [])
intrusion_sets = []
relationships = []
attack_patterns = {}
for obj in objects:
obj_type = obj.get("type")
if obj_type == "intrusion-set":
refs = obj.get("external_references", [])
mitre_id = None
for ref in refs:
if ref.get("source_name") == "mitre-attack":
mitre_id = ref.get("external_id")
break
intrusion_sets.append({
"id": obj["id"],
"name": obj.get("name"),
"aliases": obj.get("aliases", []),
"description": obj.get("description"),
"mitre_id": mitre_id,
})
elif obj_type == "attack-pattern":
refs = obj.get("external_references", [])
for ref in refs:
if ref.get("source_name") == "mitre-attack":
attack_patterns[obj["id"]] = ref.get("external_id")
elif obj_type == "relationship":
if obj.get("relationship_type") == "uses":
relationships.append({
"source_ref": obj["source_ref"],
"target_ref": obj["target_ref"],
})
return {
"intrusion_sets": intrusion_sets,
"attack_patterns": attack_patterns,
"relationships": relationships,
}
def _parse_d3fend_api_response(data: dict) -> list[dict]:
"""Parse a mock D3FEND API response."""
results = []
def _walk(node: dict | list, depth: int = 0):
if isinstance(node, list):
for item in node:
_walk(item, depth)
elif isinstance(node, dict):
d3fend_id = node.get("@id", "")
label = node.get("rdfs:label", "")
if d3fend_id.startswith("d3f:") and label:
clean_id = d3fend_id.replace("d3f:", "")
if clean_id.startswith("D3-"):
definition = node.get("d3f:definition") or node.get("rdfs:comment", "")
results.append({
"d3fend_id": clean_id,
"name": label,
"description": definition,
})
# Recurse
for key, val in node.items():
if isinstance(val, (dict, list)):
_walk(val, depth + 1)
graph = data.get("@graph", data)
_walk(graph)
return results
# ═══════════════════════════════════════════════════════════════════════
# Unit tests — fast, no network
# ═══════════════════════════════════════════════════════════════════════
class TestDataSourcesParsing:
"""Tests unitarios — sin acceso a red, usando fixtures de YAML/TOML de ejemplo."""
def test_sigma_yaml_parsing(self):
"""Parsear un YAML de Sigma de ejemplo y verificar extracción de campos."""
content = (FIXTURES / "sample_sigma_rule.yml").read_text()
result = _parse_sigma_yaml(content)
assert result is not None
assert result["title"] == "Windows PowerShell Execution Policy Bypass"
assert "T1059.001" in result["mitre_ids"]
assert "T1562.001" in result["mitre_ids"]
assert result["severity"] == "high"
assert "windows" in result["platforms"]
assert len(result["false_positives"]) == 2
def test_lolbas_yaml_parsing(self):
"""Parsear un YAML de LOLBAS y verificar extracción de MitreID y commands."""
content = (FIXTURES / "sample_lolbas_entry.yml").read_text()
results = _parse_lolbas_yaml(content)
assert len(results) == 2
assert results[0]["name"] == "Mshta.exe"
assert results[0]["mitre_id"] == "T1218.005"
assert "mshta.exe" in results[0]["command"]
assert results[1]["mitre_id"] == "T1059.005"
def test_caldera_yaml_parsing(self):
"""Parsear un YAML de CALDERA ability y verificar campos."""
content = (FIXTURES / "sample_caldera_ability.yml").read_text()
results = _parse_caldera_yaml(content)
assert len(results) == 2
sys_info = results[0]
assert sys_info["name"] == "Get System Info"
assert sys_info["attack_id"] == "T1082"
assert sys_info["tactic"] == "discovery"
assert "windows" in sys_info["platforms"]
assert "linux" in sys_info["platforms"]
assert len(sys_info["commands"]) > 0
net_conn = results[1]
assert net_conn["attack_id"] == "T1049"
assert net_conn["name"] == "List Network Connections"
def test_elastic_toml_parsing(self):
"""Parsear un TOML de Elastic y verificar extracción de KQL y threat mappings."""
content = (FIXTURES / "sample_elastic_rule.toml").read_text()
try:
import toml # noqa: F401
except ImportError:
pytest.skip("toml package not installed")
result = _parse_elastic_toml(content)
assert result is not None
assert result["name"] == "Scheduled Task Created via Schtasks"
assert result["severity"] == "medium"
assert result["rule_type"] == "eql"
assert "T1053" in result["mitre_ids"]
assert "T1053.005" in result["mitre_ids"]
assert "schtasks.exe" in result["query"]
def test_stix_threat_actor_parsing(self):
"""Parsear un bundle STIX de ejemplo y verificar extracción de intrusion-sets y relationships."""
content = (FIXTURES / "sample_stix_bundle.json").read_text()
result = _parse_stix_bundle(content)
# Intrusion sets
assert len(result["intrusion_sets"]) == 2
apt1 = next(is_ for is_ in result["intrusion_sets"] if is_["name"] == "APT1")
assert apt1["mitre_id"] == "G0006"
assert "Comment Crew" in apt1["aliases"]
apt28 = next(is_ for is_ in result["intrusion_sets"] if is_["name"] == "APT28")
assert apt28["mitre_id"] == "G0007"
assert "Fancy Bear" in apt28["aliases"]
# Attack patterns
assert len(result["attack_patterns"]) == 3
assert "T1566" in result["attack_patterns"].values()
assert "T1059" in result["attack_patterns"].values()
# Relationships
assert len(result["relationships"]) == 4
apt1_rels = [r for r in result["relationships"] if "apt1" in r["source_ref"]]
assert len(apt1_rels) == 2
def test_d3fend_api_response_parsing(self):
"""Parsear una respuesta mock de la API D3FEND."""
mock_response = {
"@graph": [
{
"@id": "d3f:D3-AL",
"rdfs:label": "Application Layer",
"d3f:definition": "Monitoring at the application layer.",
},
{
"@id": "d3f:D3-NI",
"rdfs:label": "Network Isolation",
"rdfs:comment": "Isolating networks to prevent lateral movement.",
},
{
"@id": "d3f:NotATechnique",
"rdfs:label": "Something else",
"d3f:definition": "Not a D3FEND technique.",
},
{
"@id": "d3f:D3-DE",
"rdfs:label": "Decoy Environment",
"d3f:definition": "Using decoys to detect attackers.",
},
]
}
results = _parse_d3fend_api_response(mock_response)
assert len(results) == 3 # Only D3- prefixed IDs
ids = [r["d3fend_id"] for r in results]
assert "D3-AL" in ids
assert "D3-NI" in ids
assert "D3-DE" in ids
ni = next(r for r in results if r["d3fend_id"] == "D3-NI")
assert ni["name"] == "Network Isolation"
assert "lateral movement" in ni["description"].lower()
def test_no_duplicates_on_reimport(self):
"""Verificar que la lógica de deduplicación funciona con datos mock."""
content = (FIXTURES / "sample_sigma_rule.yml").read_text()
# Parse twice
result1 = _parse_sigma_yaml(content)
result2 = _parse_sigma_yaml(content)
# Same data should produce identical output
assert result1 == result2
assert result1["title"] == result2["title"]
assert result1["mitre_ids"] == result2["mitre_ids"]
# Simulate deduplication by title+mitre_id
seen = set()
unique_count = 0
for r in [result1, result2]:
key = (r["title"], tuple(r["mitre_ids"]))
if key not in seen:
seen.add(key)
unique_count += 1
assert unique_count == 1 # Only one unique entry
# ═══════════════════════════════════════════════════════════════════════
# Integration tests — require network. Run with: pytest -m integration
# ═══════════════════════════════════════════════════════════════════════
@pytest.mark.integration
class TestDataSourcesIntegration:
"""Tests de integración — requieren acceso a red. Ejecutar con: pytest -m integration"""
def test_sigma_full_import(self):
"""Importar desde GitHub real y verificar volumen."""
# This test would clone SigmaHQ and parse all rules
# Skipped in regular runs — requires network and significant time
pytest.skip("Full Sigma import requires network access — run with pytest -m integration")
def test_lolbas_full_import(self):
"""Importar LOLBAS completo."""
pytest.skip("Full LOLBAS import requires network access — run with pytest -m integration")
def test_caldera_full_import(self):
"""Importar CALDERA completo."""
pytest.skip("Full CALDERA import requires network access — run with pytest -m integration")
def test_elastic_full_import(self):
"""Importar Elastic rules completo."""
pytest.skip("Full Elastic import requires network access — run with pytest -m integration")
@@ -1,439 +0,0 @@
"""Tests for scoring, operational metrics, and compliance — T-236.
Uses the in-memory SQLite test database from conftest.py to verify
calculations with known data.
"""
import uuid
from datetime import datetime, timedelta
import pytest
from app.models.technique import Technique
from app.models.test import Test
from app.models.test_template import TestTemplate
from app.models.detection_rule import DetectionRule
from app.models.test_detection_result import TestDetectionResult
from app.models.defensive_technique import DefensiveTechnique, DefensiveTechniqueMapping
from app.models.compliance import ComplianceFramework, ComplianceControl, ComplianceControlMapping
from app.models.audit import AuditLog
from app.models.enums import TestState, TestResult, TechniqueStatus
from app.services.scoring_service import (
calculate_technique_score,
calculate_tactic_score,
calculate_organization_score,
)
from app.services.operational_metrics_service import (
calculate_mttd,
calculate_mttr,
calculate_detection_efficacy,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def sample_technique(db):
"""Create a technique with known data."""
tech = Technique(
mitre_id="T1059",
name="Command and Scripting Interpreter",
tactic="execution",
platforms=["windows", "linux", "macos"],
status_global=TechniqueStatus.validated,
)
db.add(tech)
db.commit()
db.refresh(tech)
return tech
@pytest.fixture
def sample_technique_no_tests(db):
"""Create a technique with no tests."""
tech = Technique(
mitre_id="T9999",
name="No Tests Technique",
tactic="discovery",
platforms=["windows"],
status_global=TechniqueStatus.not_evaluated,
)
db.add(tech)
db.commit()
db.refresh(tech)
return tech
@pytest.fixture
def validated_tests(db, sample_technique, admin_user):
"""Create multiple validated tests with detection results."""
now = datetime.utcnow()
tests = []
for i, result in enumerate([TestResult.detected, TestResult.detected, TestResult.not_detected]):
test = Test(
technique_id=sample_technique.id,
name=f"Test {i+1} for T1059",
state=TestState.validated,
detection_result=result,
created_by=admin_user.id,
platform=["windows", "linux", "macos"][i % 3],
red_validated_at=now - timedelta(days=i * 30),
blue_validated_at=now - timedelta(days=i * 30),
created_at=now - timedelta(days=i * 30 + 5),
)
db.add(test)
tests.append(test)
db.commit()
for t in tests:
db.refresh(t)
return tests
@pytest.fixture
def compliance_setup(db, sample_technique, sample_technique_no_tests):
"""Create a compliance framework with controls mapped to techniques."""
framework = ComplianceFramework(
name="NIST 800-53",
version="5.0",
description="NIST Special Publication 800-53",
)
db.add(framework)
db.flush()
# Control 1: mapped to validated technique
control1 = ComplianceControl(
framework_id=framework.id,
control_id="AC-2",
title="Account Management",
category="Access Control",
)
db.add(control1)
db.flush()
mapping1 = ComplianceControlMapping(
compliance_control_id=control1.id,
technique_id=sample_technique.id,
)
db.add(mapping1)
# Control 2: mapped to technique with no tests
control2 = ComplianceControl(
framework_id=framework.id,
control_id="SI-4",
title="Information System Monitoring",
category="System and Information Integrity",
)
db.add(control2)
db.flush()
mapping2 = ComplianceControlMapping(
compliance_control_id=control2.id,
technique_id=sample_technique_no_tests.id,
)
db.add(mapping2)
db.commit()
return {
"framework": framework,
"control_covered": control1,
"control_not_covered": control2,
}
# ═══════════════════════════════════════════════════════════════════════
# Scoring Tests
# ═══════════════════════════════════════════════════════════════════════
class TestScoring:
def test_technique_score_all_detected(self, db, sample_technique, admin_user):
"""Técnica con todos los tests detected → score alto."""
now = datetime.utcnow()
for i in range(3):
test = Test(
technique_id=sample_technique.id,
name=f"All Detected {i}",
state=TestState.validated,
detection_result=TestResult.detected,
created_by=admin_user.id,
platform=["windows", "linux", "macos"][i],
red_validated_at=now - timedelta(days=10),
)
db.add(test)
db.commit()
result = calculate_technique_score(sample_technique, db)
assert result["total_score"] > 0
# Test component should be maxed out (all detected)
assert result["breakdown"]["tests_validated"]["score"] > 0
def test_technique_score_no_tests(self, db, sample_technique_no_tests):
"""Técnica sin tests → score 0."""
result = calculate_technique_score(sample_technique_no_tests, db)
assert result["total_score"] == 0
def test_technique_score_partial_detection(self, db, sample_technique, validated_tests):
"""Técnica con detección parcial → score intermedio."""
result = calculate_technique_score(sample_technique, db)
# 2 detected out of 3 validated → partial score
assert 0 < result["total_score"] < 100
breakdown = result["breakdown"]
assert "2/3" in breakdown["tests_validated"]["detail"]
def test_technique_score_freshness_penalty(self, db, sample_technique, admin_user):
"""Tests > 180 días → penalización en freshness."""
old_date = datetime.utcnow() - timedelta(days=200)
test = Test(
technique_id=sample_technique.id,
name="Old Test",
state=TestState.validated,
detection_result=TestResult.detected,
created_by=admin_user.id,
platform="windows",
red_validated_at=old_date,
)
db.add(test)
db.commit()
result = calculate_technique_score(sample_technique, db)
# Freshness should be 0 for tests > 180 days old
assert result["breakdown"]["freshness"]["score"] == 0
assert "200" in result["breakdown"]["freshness"]["detail"]
def test_scoring_weights_configurable(self, db, sample_technique, validated_tests):
"""Cambiar pesos cambia el score resultante."""
from app.config import settings
original_weight = settings.SCORING_WEIGHT_TESTS
score1 = calculate_technique_score(sample_technique, db)
# Change weight
settings.SCORING_WEIGHT_TESTS = 80
score2 = calculate_technique_score(sample_technique, db)
# Restore
settings.SCORING_WEIGHT_TESTS = original_weight
# Different weights should produce different scores
assert score1["total_score"] != score2["total_score"]
def test_organization_score_aggregation(self, db, sample_technique, validated_tests):
"""Score global agrega correctamente los scores de técnicas."""
result = calculate_organization_score(db)
assert result["techniques_total"] >= 1
assert result["overall_score"] >= 0
assert result["techniques_evaluated"] >= 0
# ═══════════════════════════════════════════════════════════════════════
# Operational Metrics Tests
# ═══════════════════════════════════════════════════════════════════════
class TestOperationalMetrics:
def test_mttd_calculation(self, db, sample_technique, admin_user):
"""MTTD se calcula desde timestamps del audit_log."""
now = datetime.utcnow()
test = Test(
technique_id=sample_technique.id,
name="MTTD Test",
state=TestState.validated,
created_by=admin_user.id,
)
db.add(test)
db.flush()
# Create audit log entries for state transitions
start_log = AuditLog(
user_id=admin_user.id,
action="start_execution",
entity_type="test",
entity_id=str(test.id),
timestamp=now - timedelta(hours=5),
)
submit_log = AuditLog(
user_id=admin_user.id,
action="submit_red",
entity_type="test",
entity_id=str(test.id),
timestamp=now - timedelta(hours=2),
)
db.add(start_log)
db.add(submit_log)
db.commit()
result = calculate_mttd(db)
# Should have data (3 hours between start and submit)
if result is not None:
assert result["sample_size"] >= 1
assert result["mean_hours"] >= 0
def test_mttr_calculation(self, db, sample_technique, admin_user):
"""MTTR incluye tiempo de remediación."""
now = datetime.utcnow()
test = Test(
technique_id=sample_technique.id,
name="MTTR Test",
state=TestState.validated,
remediation_status="completed",
blue_validated_at=now - timedelta(hours=48),
created_by=admin_user.id,
)
db.add(test)
db.flush()
# Audit log for remediation completion
log = AuditLog(
user_id=admin_user.id,
action="update_remediation",
entity_type="test",
entity_id=str(test.id),
timestamp=now - timedelta(hours=24),
)
db.add(log)
db.commit()
result = calculate_mttr(db)
if result is not None:
assert result["sample_size"] >= 1
assert result["mean_hours"] > 0
def test_detection_efficacy(self, db, sample_technique, validated_tests):
"""Detection efficacy con datos de prueba conocidos."""
result = calculate_detection_efficacy(db)
assert result["total"] == 3
assert result["detected"] == 2
assert result["not_detected"] == 1
expected_pct = round((2 / 3) * 100, 1)
assert result["percentage"] == expected_pct
def test_metrics_with_no_data(self, db):
"""Métricas retornan null/cero cuando no hay datos suficientes."""
mttd = calculate_mttd(db)
mttr = calculate_mttr(db)
efficacy = calculate_detection_efficacy(db)
assert mttd is None
assert mttr is None
assert efficacy["total"] == 0
assert efficacy["percentage"] == 0
# ═══════════════════════════════════════════════════════════════════════
# Compliance Tests
# ═══════════════════════════════════════════════════════════════════════
class TestCompliance:
def test_control_fully_covered(self, db, sample_technique, validated_tests, compliance_setup):
"""Control con todas las técnicas validated → covered."""
control = compliance_setup["control_covered"]
mappings = (
db.query(ComplianceControlMapping)
.filter(ComplianceControlMapping.compliance_control_id == control.id)
.all()
)
assert len(mappings) == 1
# The mapped technique has validated tests
technique = mappings[0].technique
assert technique.status_global == TechniqueStatus.validated
def test_control_not_covered(self, db, compliance_setup):
"""Control con todas las técnicas sin tests → not_covered."""
control = compliance_setup["control_not_covered"]
mappings = (
db.query(ComplianceControlMapping)
.filter(ComplianceControlMapping.compliance_control_id == control.id)
.all()
)
assert len(mappings) == 1
technique = mappings[0].technique
assert technique.status_global == TechniqueStatus.not_evaluated
def test_control_partially_covered(self, db, sample_technique, sample_technique_no_tests, admin_user, compliance_setup):
"""Control con técnicas mixtas → partially_covered."""
control = compliance_setup["control_covered"]
# Add second mapping to the not-evaluated technique
mapping = ComplianceControlMapping(
compliance_control_id=control.id,
technique_id=sample_technique_no_tests.id,
)
db.add(mapping)
db.commit()
# Now this control has two techniques: one validated, one not_evaluated
mappings = (
db.query(ComplianceControlMapping)
.filter(ComplianceControlMapping.compliance_control_id == control.id)
.all()
)
assert len(mappings) == 2
statuses = [m.technique.status_global for m in mappings]
assert TechniqueStatus.validated in statuses
assert TechniqueStatus.not_evaluated in statuses
def test_compliance_percentage(self, db, sample_technique, validated_tests, compliance_setup):
"""Porcentaje global de compliance calculado correctamente."""
framework = compliance_setup["framework"]
controls = (
db.query(ComplianceControl)
.filter(ComplianceControl.framework_id == framework.id)
.all()
)
assert len(controls) == 2
covered = 0
total = len(controls)
for control in controls:
mappings = control.technique_mappings
if all(
m.technique.status_global in (TechniqueStatus.validated, TechniqueStatus.partial)
for m in mappings
):
covered += 1
pct = round((covered / total) * 100, 1)
assert pct == 50.0 # 1 out of 2 controls covered
def test_compliance_gaps(self, db, compliance_setup):
"""Gaps retorna solo controles no cubiertos con sus técnicas."""
framework = compliance_setup["framework"]
controls = (
db.query(ComplianceControl)
.filter(ComplianceControl.framework_id == framework.id)
.all()
)
gaps = []
for control in controls:
mappings = control.technique_mappings
uncovered_techniques = [
m.technique
for m in mappings
if m.technique.status_global in (TechniqueStatus.not_evaluated, TechniqueStatus.not_covered)
]
if uncovered_techniques:
gaps.append({
"control_id": control.control_id,
"title": control.title,
"uncovered_techniques": [t.mitre_id for t in uncovered_techniques],
})
assert len(gaps) >= 1
si4_gap = next((g for g in gaps if g["control_id"] == "SI-4"), None)
assert si4_gap is not None
assert "T9999" in si4_gap["uncovered_techniques"]
-98
View File
@@ -604,101 +604,3 @@ Common HTTP status codes:
- `404` - Not Found (resource doesn't exist)
- `409` - Conflict (duplicate resource)
- `500` - Internal Server Error
---
## V3 Endpoints
### Campaigns
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/campaigns` | Authenticated | List campaigns (filters: status, type, search; pagination: offset, limit) |
| POST | `/api/v1/campaigns` | Authenticated | Create campaign |
| GET | `/api/v1/campaigns/{id}` | Authenticated | Campaign detail with tests |
| PATCH | `/api/v1/campaigns/{id}` | Creator, Admin | Update campaign |
| DELETE | `/api/v1/campaigns/{id}` | Creator, Admin | Delete campaign |
| POST | `/api/v1/campaigns/{id}/tests` | Authenticated | Add test to campaign |
| DELETE | `/api/v1/campaigns/{id}/tests/{test_id}` | Authenticated | Remove test from campaign |
| PATCH | `/api/v1/campaigns/{id}/schedule` | Authenticated | Set recurring schedule |
| GET | `/api/v1/campaigns/{id}/history` | Authenticated | Execution history for recurring campaigns |
### Threat Actors
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/threat-actors` | Authenticated | List threat actors (filters: country, motivation, search) |
| GET | `/api/v1/threat-actors/{id}` | Authenticated | Detail with technique mappings |
### Detection Rules
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/detection-rules` | Authenticated | List rules (filters: source, mitre_technique_id, severity, search) |
| GET | `/api/v1/detection-rules/{id}` | Authenticated | Rule detail |
### D3FEND (Defensive Techniques)
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/d3fend` | Authenticated | List defensive techniques |
| GET | `/api/v1/d3fend/{id}` | Authenticated | Detail with ATT&CK mappings |
### Compliance
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/compliance/frameworks` | Authenticated | List compliance frameworks |
| GET | `/api/v1/compliance/{framework_id}/controls` | Authenticated | List controls with coverage status |
| GET | `/api/v1/compliance/{framework_id}/summary` | Authenticated | Coverage summary (total, covered, gaps) |
| GET | `/api/v1/compliance/{framework_id}/gaps` | Authenticated | Gap analysis — uncovered controls |
### Scores
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/scores/technique/{mitre_id}` | Authenticated | Technique score with detailed breakdown |
| GET | `/api/v1/scores/tactic/{tactic}` | Authenticated | Average tactic score |
| GET | `/api/v1/scores/threat-actor/{id}` | Authenticated | Coverage score against threat actor |
| GET | `/api/v1/scores/organization` | Authenticated | Overall organization score (cached 5 min) |
| GET | `/api/v1/scores/history` | Authenticated | Weekly score history (period: 30d, 90d, 1y) |
| GET | `/api/v1/scores/config` | Admin | Current scoring weights |
| PATCH | `/api/v1/scores/config` | Admin | Update scoring weights |
### Operational Metrics
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/metrics/operational` | Authenticated | All KPIs (MTTD, MTTR, efficacy, etc.) — cached 5 min |
| GET | `/api/v1/metrics/operational/trend` | Authenticated | Weekly trend (period: 30d, 90d, 1y) |
| GET | `/api/v1/metrics/operational/by-team` | Authenticated | Red vs Blue team breakdown |
### Heatmap
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/heatmap` | Authenticated | Full ATT&CK Navigator-style heatmap data |
### Coverage Snapshots
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/snapshots` | Authenticated | List snapshots (pagination: offset, limit) |
| POST | `/api/v1/snapshots` | Authenticated | Create new snapshot |
| GET | `/api/v1/snapshots/{id}` | Authenticated | Snapshot detail with technique states |
| DELETE | `/api/v1/snapshots/{id}` | Admin | Delete snapshot |
| GET | `/api/v1/snapshots/compare` | Authenticated | Compare two snapshots (query: a, b) |
### Re-testing
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/tests/{id}/retest-chain` | Authenticated | Full retest chain for a test |
### Data Sources
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/data-sources` | Admin | List all data sources |
| PATCH | `/api/v1/data-sources/{id}` | Admin | Update source config |
| POST | `/api/v1/data-sources/{id}/sync` | Admin | Trigger manual sync |
-220
View File
@@ -1,220 +0,0 @@
# Aegis — Architecture
## High-Level Overview
```
┌────────────────────┐ ┌─────────────────────┐
│ React Frontend │──────▶│ FastAPI Backend │
│ (Vite / TS / TW) │ REST │ (Python 3.11) │
└────────────────────┘ └──────┬──────┬────────┘
│ │
┌─────────┘ └─────────┐
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ PostgreSQL │ │ MinIO │
│ (Data Store) │ │ (Object Storage) │
└─────────────────┘ └─────────────────┘
```
- **Frontend** — React 19 + TypeScript + Tailwind CSS v4 + TanStack Query
- **Backend** — FastAPI with SQLAlchemy ORM + Alembic migrations
- **Database** — PostgreSQL 15 with UUID primary keys and JSONB columns
- **Object Storage** — MinIO (S3-compatible) for evidence files
- **Scheduler** — APScheduler (in-process) for background jobs
---
## Database Schema
### Core Tables
| Table | Description |
|-------|-------------|
| `users` | User accounts with role-based access (admin, red_tech, blue_tech, red_lead, blue_lead, viewer) |
| `techniques` | MITRE ATT&CK techniques with coverage status, tactic, platforms (JSONB) |
| `tests` | Security tests with full Red/Blue workflow fields, dual validation, remediation, and retest chain |
| `test_templates` | Predefined test catalog from Atomic Red Team, Sigma, CALDERA, LOLBAS, custom |
| `evidences` | Evidence files separated by team (red/blue) with SHA256 integrity verification |
### Detection & Defense
| Table | Description |
|-------|-------------|
| `detection_rules` | Imported detection rules (Sigma, Elastic, custom) linked to ATT&CK techniques |
| `test_detection_results` | Per-test detection rule evaluation results (triggered / not triggered) |
| `test_template_detection_rules` | Template ↔ detection rule associations |
| `defensive_techniques` | MITRE D3FEND defensive techniques |
| `defensive_technique_mappings` | ATT&CK technique ↔ D3FEND defensive technique mappings |
### Campaigns & Scheduling
| Table | Description |
|-------|-------------|
| `campaigns` | Test campaign groupings with scheduling (recurring, weekly/monthly/quarterly) |
| `campaign_tests` | Ordered test assignments within campaigns with dependency support |
### Intelligence & Actors
| Table | Description |
|-------|-------------|
| `threat_actors` | MITRE CTI intrusion sets with aliases, country, motivation, JSONB targets |
| `threat_actor_techniques` | Threat actor ↔ ATT&CK technique mappings |
| `intel_items` | Threat intelligence items from RSS feeds |
### Compliance
| Table | Description |
|-------|-------------|
| `compliance_frameworks` | Compliance frameworks (e.g., NIST 800-53) |
| `compliance_controls` | Individual controls within a framework |
| `compliance_control_mappings` | Control ↔ ATT&CK technique mappings |
### Operational
| Table | Description |
|-------|-------------|
| `coverage_snapshots` | Point-in-time coverage status captures with aggregate metrics |
| `snapshot_technique_states` | Normalized per-technique state within a snapshot |
| `audit_logs` | System-wide audit trail with JSONB details |
| `notifications` | In-app notifications with read status |
| `data_sources` | External data source configuration and sync status |
### Key Relationships
```
Technique ──1:N── Test ──1:N── Evidence
│ │
│ ├── TestDetectionResult ──N:1── DetectionRule
│ └── CampaignTest ──N:1── Campaign
├── ThreatActorTechnique ──N:1── ThreatActor
├── DefensiveTechniqueMapping ──N:1── DefensiveTechnique
├── ComplianceControlMapping ──N:1── ComplianceControl ──N:1── ComplianceFramework
└── SnapshotTechniqueState ──N:1── CoverageSnapshot
Test ──retest_of──▶ Test (self-referential retest chain)
Campaign ──parent_campaign_id──▶ Campaign (recurring execution history)
```
---
## Backend Architecture
### Layered Structure
```
routers/ ← HTTP endpoints (input validation, auth, response shaping)
services/ ← Business logic (state machines, calculations, imports)
models/ ← SQLAlchemy ORM models
database.py ← Engine + session management (lazy initialization)
```
### Services
| Service | Responsibility |
|---------|---------------|
| `test_workflow_service` | Test state machine (draft → validated/rejected) with dual validation |
| `scoring_service` | 0100 scoring for techniques, tactics, actors, organization |
| `score_cache` | In-memory TTL cache (5 min) for expensive score/metric calculations |
| `operational_metrics_service` | MTTD, MTTR, detection efficacy, alert fidelity, coverage velocity |
| `snapshot_service` | Coverage snapshot creation, temporal comparison, cleanup |
| `campaign_service` | Campaign CRUD, progress tracking, circular dependency prevention |
| `campaign_scheduler_service` | Recurring campaign execution (clone + schedule next run) |
| `status_service` | Technique status recalculation from test results |
| `notification_service` | In-app notification CRUD and state-change alerts |
| `audit_service` | Immutable audit trail logging |
| `mitre_sync_service` | MITRE ATT&CK sync via TAXII 2.0 / GitHub fallback |
| `atomic_import_service` | Atomic Red Team template import from GitHub |
| `sigma_import_service` | SigmaHQ detection rule import |
| `elastic_import_service` | Elastic detection rule import (TOML) |
| `caldera_import_service` | CALDERA ability import |
| `lolbas_import_service` | LOLBAS/GTFOBins template import |
| `d3fend_import_service` | MITRE D3FEND defensive technique import |
| `threat_actor_import_service` | MITRE CTI threat actor import (STIX) |
| `compliance_import_service` | NIST 800-53 ↔ ATT&CK mapping import |
| `intel_service` | RSS-based threat intelligence scanning |
### Scheduled Jobs (APScheduler)
| Job | Schedule | Description |
|-----|----------|-------------|
| MITRE Sync | Every 24h | Sync ATT&CK techniques from TAXII/GitHub |
| Intel Scan | Every 7 days | Scan RSS feeds for threat intelligence |
| Notification Cleanup | Every 24h | Remove old read notifications |
| Weekly Snapshot | Sundays 00:00 | Create coverage snapshot + cleanup old ones |
| Recurring Campaigns | Every 24h | Check and execute due recurring campaigns |
---
## Test Lifecycle (State Machine)
```
┌──────┐ ┌──────────────┐ ┌─────────────────┐ ┌───────────┐
│ DRAFT│───▶│RED_EXECUTING │───▶│ BLUE_EVALUATING │───▶│ IN_REVIEW │
└──────┘ └──────────────┘ └─────────────────┘ └─────┬─────┘
┌───────────────────┤
▼ ▼
┌──────────┐ ┌──────────┐
│ REJECTED │ │VALIDATED │
└────┬─────┘ └──────────┘
│ │
└──▶ Back to DRAFT ├──▶ Remediation
└──▶ Auto Re-test
```
**Dual Validation in IN_REVIEW:**
- Red Lead votes approve/reject
- Blue Lead votes approve/reject
- Both approve → VALIDATED
- Either rejects → REJECTED
- One votes, other pending → stays IN_REVIEW
**Auto Re-testing:** When remediation is completed on a validated test, the system automatically creates a follow-up retest (up to `MAX_RETEST_COUNT` = 3).
---
## Frontend Architecture
### Key Technologies
- **React 19** with TypeScript
- **Vite 7** for bundling
- **Tailwind CSS v4** for styling
- **TanStack Query** for server state management
- **TanStack Virtual** for table virtualization
- **React Router v7** for routing
- **Recharts** for charts and visualizations
- **Lucide React** for icons
### Page Lazy Loading
All pages except `LoginPage` and `DashboardPage` are lazy-loaded via `React.lazy()` with `<Suspense>` fallbacks for optimal initial bundle size.
### Role-Based Navigation
The sidebar dynamically filters navigation items based on the current user's role:
| Section | Visible to |
|---------|-----------|
| Dashboard | All roles |
| Executive Dashboard | admin, red_lead, blue_lead |
| ATT&CK Matrix | All roles |
| Tests (sub-menu) | All roles |
| Campaigns | All roles |
| Threat Actors | All roles |
| Compliance | All roles |
| Comparison | admin, red_lead, blue_lead |
| Reports | All roles |
| System (admin section) | admin only |
### Performance Optimizations
- **React.memo** on `HeatmapCell` (renders 3000+ times in full matrix)
- **useMemo** / **useCallback** for expensive calculations in memoized components
- **useDebounce** hook for search inputs (300ms delay)
- **TanStack Virtual** for large table virtualization (test templates, detection rules, audit logs)
- **Lazy loading** for all non-critical page bundles
-281
View File
@@ -1,281 +0,0 @@
# Aegis — Data Sources
Aegis imports security data from multiple external sources to populate its test catalog, detection rules, defensive techniques, threat actors, and compliance mappings. This document describes each source, its format, and how to manage imports.
---
## Overview
| Source | Type | Format | Destination |
|--------|------|--------|-------------|
| MITRE ATT&CK | Techniques | STIX 2.0 (TAXII / GitHub) | `techniques` |
| Atomic Red Team | Test Templates | YAML | `test_templates` |
| SigmaHQ | Detection Rules | YAML | `detection_rules` |
| Elastic Detection Rules | Detection Rules | TOML | `detection_rules` |
| CALDERA | Test Templates | YAML (multi-doc) | `test_templates` |
| LOLBAS | Test Templates | YAML | `test_templates` |
| MITRE D3FEND | Defensive Techniques | JSON-LD | `defensive_techniques` + mappings |
| MITRE CTI | Threat Actors | STIX 2.0 (JSON) | `threat_actors` + technique mappings |
| NIST 800-53 → ATT&CK | Compliance Mappings | STIX 2.0 (JSON) | `compliance_*` tables |
---
## MITRE ATT&CK (Techniques)
**Repository:** https://github.com/mitre/cti (Enterprise ATT&CK)
**Protocol:** TAXII 2.0 with GitHub JSON fallback
**Format:** STIX 2.0 bundles
**Service:** `mitre_sync_service.py`
**Schedule:** Automatic every 24 hours via APScheduler
**Extracted fields:**
- `mitre_id` — External ID (e.g., T1059, T1059.001)
- `name` — Technique name
- `description` — Full description
- `tactic` — ATT&CK tactic (execution, persistence, etc.)
- `platforms` — Target platforms (windows, linux, macos, etc.)
- `is_subtechnique` — Whether it's a sub-technique
- `url` — Link to MITRE page
**Manual trigger:**
```bash
# Via API
curl -X POST http://localhost:8000/api/v1/system/sync-mitre \
-H "Authorization: Bearer $TOKEN"
# Via container
docker exec aegis-backend python -c "from app.services.mitre_sync_service import sync_mitre_attack; sync_mitre_attack()"
```
**Volume:** ~700 techniques (Enterprise ATT&CK v16)
**Troubleshooting:**
- If TAXII fails (timeout/rate limit), the service automatically falls back to the GitHub JSON bundle
- Check `data_sources` table for `last_sync_at` and `last_sync_stats`
---
## Atomic Red Team (Test Templates)
**Repository:** https://github.com/redcanaryco/atomic-red-team
**Format:** YAML (one file per technique under `atomics/T*/T*.yaml`)
**Service:** `atomic_import_service.py`
**Extracted fields:**
- `name` — Test name
- `description` — What the test does
- `mitre_technique_id` — ATT&CK technique ID
- `attack_commands` — Commands to execute
- `expected_detection` — What should be detected
- `platform` — Target OS
- `severity` — Derived from technique context
- `cleanup_commands` — Cleanup procedures
**Import:**
```bash
curl -X POST http://localhost:8000/api/v1/system/import-atomic-red-team \
-H "Authorization: Bearer $TOKEN"
```
**Volume:** ~3,500 test templates
**Frequency:** Monthly or after ATT&CK version updates
---
## SigmaHQ (Detection Rules)
**Repository:** https://github.com/SigmaHQ/sigma
**Format:** YAML (Sigma rule format)
**Service:** `sigma_import_service.py`
**Extracted fields:**
- `name` — Rule title
- `description` — Rule description
- `query` — Sigma detection logic
- `mitre_technique_id` — Extracted from `tags: attack.tXXXX`
- `severity` — From `level` field (low, medium, high, critical)
- `platforms` — From `logsource.product`
- `source` = `"sigma"`
**Example Sigma rule tags:**
```yaml
tags:
- attack.execution
- attack.t1059.001
- attack.defense_evasion
- attack.t1562.001
```
**Volume:** ~3,000 detection rules
**Frequency:** Monthly recommended
**Troubleshooting:**
- Rules without MITRE technique tags in `attack.tXXXX` format are skipped
- Duplicate detection is by `name` + `source` + `mitre_technique_id`
---
## Elastic Detection Rules
**Repository:** https://github.com/elastic/detection-rules
**Format:** TOML (one file per rule under `rules/`)
**Service:** `elastic_import_service.py`
**Extracted fields:**
- `name` — Rule name from `[rule]`
- `description` — Rule description
- `query` — KQL/EQL query
- `mitre_technique_id` — From `[[rule.threat]]` entries
- `severity` — From `rule.severity`
- `rule_type` — eql, query, threshold, etc.
- `source` = `"elastic"`
**TOML structure:**
```toml
[rule]
name = "Scheduled Task Created via Schtasks"
severity = "medium"
type = "eql"
[[rule.threat]]
framework = "MITRE ATT&CK"
[[rule.threat.technique]]
id = "T1053"
name = "Scheduled Task/Job"
[[rule.threat.technique.subtechnique]]
id = "T1053.005"
```
**Volume:** ~1,200 detection rules
**Frequency:** Quarterly recommended
---
## CALDERA (Test Templates)
**Repository:** https://github.com/mitre/caldera
**Format:** YAML (multi-document, abilities under `data/abilities/`)
**Service:** `caldera_import_service.py`
**Extracted fields:**
- `name` — Ability name
- `description` — What the ability does
- `mitre_technique_id` — From `technique.attack_id`
- `tactic` — ATT&CK tactic
- `platforms` — Target platforms
- `attack_commands` — Commands per platform/executor
**Volume:** ~500 abilities
**Frequency:** Quarterly recommended
---
## LOLBAS (Test Templates)
**Repository:** https://github.com/LOLBAS-Project/LOLBAS
**Format:** YAML (one file per binary under `yml/OSBinaries/`, `yml/OtherMSBinaries/`, etc.)
**Service:** `lolbas_import_service.py`
**Extracted fields:**
- `name` — Binary name (e.g., Mshta.exe)
- `mitre_technique_id` — From `Commands[].MitreID`
- `attack_commands` — From `Commands[].Command`
- `description` — From `Commands[].Description`
- `usecase` — From `Commands[].Usecase`
**Volume:** ~200 living-off-the-land binaries with ~500 commands
**Frequency:** Quarterly recommended
---
## MITRE D3FEND (Defensive Techniques)
**Repository:** https://d3fend.mitre.org/
**Format:** JSON-LD (REST API at `https://d3fend.mitre.org/api/`)
**Service:** `d3fend_import_service.py`
**Extracted fields:**
- `d3fend_id` — D3FEND identifier (e.g., D3-AL, D3-NI)
- `name` — Defensive technique name
- `description` — Definition or comment
- `tactic` — Defensive tactic (Detect, Isolate, Deceive, Evict, Harden)
- ATT&CK ↔ D3FEND mappings stored in `defensive_technique_mappings`
**Volume:** ~200 defensive techniques
**Frequency:** Annually (D3FEND updates are infrequent)
---
## MITRE CTI — Threat Actors
**Repository:** https://github.com/mitre/cti (enterprise-attack)
**Format:** STIX 2.0 JSON bundles (`intrusion-set`, `relationship`, `attack-pattern`)
**Service:** `threat_actor_import_service.py`
**Extracted fields:**
- `name` — Actor name (e.g., APT28)
- `mitre_id` — MITRE group ID (e.g., G0007)
- `aliases` — Known aliases (JSONB array)
- `description` — Full description
- `country` — Attribution (when available)
- `motivation` — Espionage, financial, etc.
- `target_sectors` / `target_regions` — JSONB arrays
- Technique mappings via `relationship` objects
**Volume:** ~140 threat actors with ~2,000 technique mappings
**Frequency:** Quarterly recommended
---
## NIST 800-53 → ATT&CK (Compliance)
**Repository:** https://github.com/center-for-threat-informed-defense/attack-control-framework-mappings
**Format:** STIX 2.0 JSON bundles
**Service:** `compliance_import_service.py`
**Extracted fields:**
- `ComplianceFramework` — Framework name and version
- `ComplianceControl` — Control ID (e.g., AC-2), title, category
- `ComplianceControlMapping` — Control ↔ ATT&CK technique associations
**Volume:** ~1,000 controls with ~5,000 mappings
**Frequency:** Annually (mappings are versioned with the framework)
---
## Managing Data Sources
### Admin UI
Navigate to **System → Data Sources** in the Aegis frontend to:
- View all configured data sources and their sync status
- Trigger manual imports
- Enable/disable individual sources
- View import statistics (imported, updated, errors)
### API Endpoints
```bash
# List data sources
GET /api/v1/data-sources
# Trigger import for a specific source
POST /api/v1/data-sources/{id}/sync
# Enable/disable a source
PATCH /api/v1/data-sources/{id}
```
### Recommended Update Schedule
| Source | Frequency | Reason |
|--------|-----------|--------|
| MITRE ATT&CK | Automatic (24h) | Core framework, frequent updates |
| Atomic Red Team | Monthly | Active community contributions |
| SigmaHQ | Monthly | Active community contributions |
| Elastic Rules | Quarterly | Major version-aligned releases |
| CALDERA | Quarterly | Less frequent updates |
| LOLBAS | Quarterly | Less frequent updates |
| D3FEND | Annually | Infrequent updates |
| CTI Actors | Quarterly | New groups and campaigns |
| NIST 800-53 | Annually | Framework revision cycles |
-285
View File
@@ -1,285 +0,0 @@
# Aegis — Scoring System
Aegis uses a granular 0100 scoring system to measure security coverage at multiple levels: individual techniques, tactics, threat actors, and the overall organization.
---
## Technique Score (0100)
Each ATT&CK technique receives a composite score based on five weighted components:
| Component | Default Weight | Description |
|-----------|---------------|-------------|
| Tests Validated | 40% | Ratio of detected tests to total validated tests |
| Detection Rules | 20% | Number of active detection rules linked to the technique |
| D3FEND Coverage | 15% | Number of D3FEND defensive techniques mapped |
| Freshness | 15% | How recent the latest validated test is |
| Platform Diversity | 10% | Coverage across different platforms (Windows, Linux, macOS) |
### Tests Validated Component
```
score = (detected_tests / total_validated_tests) × weight
```
- Only tests in `validated` state are counted
- `detected` means `detection_result = "detected"`
- Example: 2 detected out of 3 validated → `2/3 × 40 = 26.7`
### Detection Rules Component
```
score = min(active_rules / 3, 1.0) × weight
```
- Counts active detection rules linked to the technique's `mitre_id`
- 3+ rules gives full marks (capped at 1.0)
- Example: 2 active rules → `2/3 × 20 = 13.3`
### D3FEND Coverage Component
```
score = min(d3fend_mappings / 2, 1.0) × weight
```
- Counts D3FEND defensive technique mappings
- 2+ mappings gives full marks
- Example: 1 mapping → `1/2 × 15 = 7.5`
### Freshness Component
```
days = (now - newest_validated_test.red_validated_at).days
score = max(0, 1.0 - days / 180) × weight
```
- 0 days old = full freshness score
- 180+ days old = 0 (completely stale)
- Linear decay between 0 and 180 days
- Example: test is 60 days old → `(1 - 60/180) × 15 = 10.0`
### Platform Diversity Component
```
platforms_covered = unique platforms across validated tests
score = min(platforms_covered / 3, 1.0) × weight
```
- Counts unique platforms (windows, linux, macos) from validated tests
- 3+ platforms gives full marks
- Example: windows + linux → `2/3 × 10 = 6.7`
### Example Calculation
A technique with:
- 2/3 tests detected, 2 detection rules, 1 D3FEND mapping, 60 days old, 2 platforms
```
Tests: (2/3) × 40 = 26.7
Detection: (2/3) × 20 = 13.3
D3FEND: (1/2) × 15 = 7.5
Freshness: (1 - 60/180) × 15 = 10.0
Platform: (2/3) × 10 = 6.7
─────
Total: 64.2
```
---
## Configuring Weights
Weights are configurable via environment variables or the admin API. They must sum to 100.
### Environment Variables
```env
SCORING_WEIGHT_TESTS=40
SCORING_WEIGHT_DETECTION_RULES=20
SCORING_WEIGHT_D3FEND=15
SCORING_WEIGHT_FRESHNESS=15
SCORING_WEIGHT_PLATFORM_DIVERSITY=10
```
### API Configuration
```bash
# Get current weights
GET /api/v1/scores/config
# Update weights (admin only)
PATCH /api/v1/scores/config
{
"tests": 50,
"detection_rules": 20,
"d3fend": 10,
"freshness": 10,
"platform_diversity": 10
}
```
Note: Runtime changes do not persist across restarts. Update the `.env` file or environment variables for permanent changes.
---
## Tactic Score
The tactic score is the **average** of all technique scores within that tactic:
```
tactic_score = mean(technique_scores for techniques in tactic)
```
Also provides:
- `techniques_total` — number of techniques in the tactic
- `techniques_evaluated` — techniques with score > 0
- `techniques_by_status` — count by status (validated, partial, not_covered, not_evaluated)
### API
```bash
GET /api/v1/scores/tactic/execution
GET /api/v1/scores/tactic/persistence
```
---
## Threat Actor Coverage Score
Measures how well the organization is covered against a specific threat actor:
```
actor_score = mean(technique_scores for techniques used by actor)
```
Also provides:
- `techniques_total` — techniques attributed to the actor
- `techniques_covered` — techniques with score > 0
- `coverage_percentage` — percentage of techniques covered
- `uncovered_techniques` — list of technique IDs with score = 0
### API
```bash
GET /api/v1/scores/threat-actor/{actor_id}
```
---
## Organization Score
The top-level organizational security score is a weighted average of four sub-scores:
| Sub-score | Weight | Description |
|-----------|--------|-------------|
| Total Coverage | 40% | Average technique score across all evaluated techniques |
| Critical Coverage | 25% | Average score for techniques with high/critical severity templates |
| Detection Maturity | 20% | `(triggered_rules / total_active_rules) × 100` |
| Response Readiness | 15% | `(remediation_completed / remediation_total) × 100` |
```
org_score = total_coverage × 0.4
+ critical_coverage × 0.25
+ detection_maturity × 0.2
+ response_readiness × 0.15
```
### Caching
The organization score is cached in-memory for 5 minutes. The cache is automatically invalidated when:
- A test is validated (state → `validated`)
- Scoring weights are updated via the API
### API
```bash
GET /api/v1/scores/organization
```
---
## Operational Metrics
In addition to coverage scores, Aegis tracks operational KPIs:
### Mean Time to Detect (MTTD)
Time from test execution start (`start_execution` audit entry) to red team submission (`submit_red`).
```
MTTD = mean(submit_red.timestamp - start_execution.timestamp) for all tests
```
### Mean Time to Respond (MTTR)
Time from blue team evaluation (`blue_validated_at`) to remediation completion (`update_remediation` audit entry).
```
MTTR = mean(update_remediation.timestamp - blue_validated_at) for remediated tests
```
### Detection Efficacy
```
efficacy = (detected_tests / total_validated_tests) × 100
```
### Alert Fidelity
Ratio of true positive detections to total detection rule evaluations.
### Coverage Velocity
Rate at which new techniques are being covered over time (techniques covered per week).
### Validation Throughput
Number of tests moving through the pipeline per time period.
### Rejection Rate
Percentage of tests rejected during dual validation.
### API
```bash
# All operational metrics
GET /api/v1/metrics/operational
# Weekly trend data
GET /api/v1/metrics/operational/trend?period=90d
# Breakdown by team
GET /api/v1/metrics/operational/by-team
```
---
## Score History
Weekly score snapshots for trend analysis:
```bash
GET /api/v1/scores/history?period=90d
# Returns weekly data points with: date, overall_score, total_coverage,
# critical_coverage, detection_maturity, response_readiness
```
Periods: `30d`, `90d`, `1y`
---
## Coverage Snapshots
Point-in-time captures of the complete coverage state for historical comparison:
```bash
# Create a snapshot
POST /api/v1/snapshots
{ "name": "Q1 2026 Baseline" }
# Compare two snapshots
GET /api/v1/snapshots/compare?a={snapshot_id_a}&b={snapshot_id_b}
# Returns: score_delta, improved techniques, worsened techniques, unchanged count
```
Automatic weekly snapshots are created every Sunday at 00:00 by the scheduler, with old snapshots cleaned up to keep the last 52 (one year).
+41 -73
View File
@@ -1,33 +1,27 @@
import React, { Suspense } from "react";
import { Routes, Route, Navigate } from "react-router-dom";
import LoadingSpinner from "./components/LoadingSpinner";
import Layout from "./components/Layout";
import ProtectedRoute from "./components/ProtectedRoute";
/* ── Eagerly loaded (core pages) ──────────────────────────────────── */
import LoginPage from "./pages/LoginPage";
import DashboardPage from "./pages/DashboardPage";
/* ── Lazy loaded (V1-V3 pages) ────────────────────────────────────── */
const TechniquesPage = React.lazy(() => import("./pages/TechniquesPage"));
const MatrixPage = React.lazy(() => import("./pages/MatrixPage"));
const ExecutiveDashboardPage = React.lazy(() => import("./pages/ExecutiveDashboardPage"));
const CompliancePage = React.lazy(() => import("./pages/CompliancePage"));
const TechniqueDetailPage = React.lazy(() => import("./pages/TechniqueDetailPage"));
const TestsPage = React.lazy(() => import("./pages/TestsPage"));
const TestCreatePage = React.lazy(() => import("./pages/TestCreatePage"));
const TestDetailPage = React.lazy(() => import("./pages/TestDetailPage"));
const TestCatalogPage = React.lazy(() => import("./pages/TestCatalogPage"));
const ReportsPage = React.lazy(() => import("./pages/ReportsPage"));
const SystemPage = React.lazy(() => import("./pages/SystemPage"));
const UsersPage = React.lazy(() => import("./pages/UsersPage"));
const AuditLogPage = React.lazy(() => import("./pages/AuditLogPage"));
const DataSourcesPage = React.lazy(() => import("./pages/DataSourcesPage"));
const ThreatActorsPage = React.lazy(() => import("./pages/ThreatActorsPage"));
const ThreatActorDetailPage = React.lazy(() => import("./pages/ThreatActorDetailPage"));
const CampaignsPage = React.lazy(() => import("./pages/CampaignsPage"));
const CampaignDetailPage = React.lazy(() => import("./pages/CampaignDetailPage"));
const ComparisonPage = React.lazy(() => import("./pages/ComparisonPage"));
import TechniquesPage from "./pages/TechniquesPage";
import MatrixPage from "./pages/MatrixPage";
import ExecutiveDashboardPage from "./pages/ExecutiveDashboardPage";
import CompliancePage from "./pages/CompliancePage";
import TechniqueDetailPage from "./pages/TechniqueDetailPage";
import TestsPage from "./pages/TestsPage";
import TestCreatePage from "./pages/TestCreatePage";
import TestDetailPage from "./pages/TestDetailPage";
import TestCatalogPage from "./pages/TestCatalogPage";
import ReportsPage from "./pages/ReportsPage";
import SystemPage from "./pages/SystemPage";
import UsersPage from "./pages/UsersPage";
import AuditLogPage from "./pages/AuditLogPage";
import DataSourcesPage from "./pages/DataSourcesPage";
import ThreatActorsPage from "./pages/ThreatActorsPage";
import ThreatActorDetailPage from "./pages/ThreatActorDetailPage";
import CampaignsPage from "./pages/CampaignsPage";
import CampaignDetailPage from "./pages/CampaignDetailPage";
import ComparisonPage from "./pages/ComparisonPage";
import Layout from "./components/Layout";
import ProtectedRoute from "./components/ProtectedRoute";
export default function App() {
return (
@@ -43,61 +37,35 @@ export default function App() {
</ProtectedRoute>
}
>
{/* ── Core ─────────────────────────────────────────────── */}
<Route path="/dashboard" element={<DashboardPage />} />
<Route path="/techniques" element={<Suspense fallback={<LoadingSpinner text="Loading…" />}><TechniquesPage /></Suspense>} />
<Route path="/techniques/:mitreId" element={<Suspense fallback={<LoadingSpinner text="Loading…" />}><TechniqueDetailPage /></Suspense>} />
<Route path="/matrix" element={<Suspense fallback={<LoadingSpinner text="Loading…" />}><MatrixPage /></Suspense>} />
{/* ── Executive Dashboard (leads + admin) ──────────────── */}
<Route path="/techniques" element={<TechniquesPage />} />
<Route path="/matrix" element={<MatrixPage />} />
<Route
path="/executive-dashboard"
element={
<ProtectedRoute roles={["admin", "red_lead", "blue_lead"]}>
<Suspense fallback={<LoadingSpinner text="Loading…" />}><ExecutiveDashboardPage /></Suspense>
<ExecutiveDashboardPage />
</ProtectedRoute>
}
/>
{/* ── Tests ────────────────────────────────────────────── */}
<Route path="/tests" element={<Suspense fallback={<LoadingSpinner text="Loading…" />}><TestsPage /></Suspense>} />
<Route path="/tests/new" element={<Suspense fallback={<LoadingSpinner text="Loading…" />}><TestCreatePage /></Suspense>} />
<Route path="/tests/:testId" element={<Suspense fallback={<LoadingSpinner text="Loading…" />}><TestDetailPage /></Suspense>} />
<Route path="/test-catalog" element={<Suspense fallback={<LoadingSpinner text="Loading…" />}><TestCatalogPage /></Suspense>} />
<Route path="/test-catalog/:templateId/use" element={<Suspense fallback={<LoadingSpinner text="Loading…" />}><TestCatalogPage /></Suspense>} />
{/* ── Campaigns ────────────────────────────────────────── */}
<Route path="/campaigns" element={<Suspense fallback={<LoadingSpinner text="Loading…" />}><CampaignsPage /></Suspense>} />
<Route path="/campaigns/:campaignId" element={<Suspense fallback={<LoadingSpinner text="Loading…" />}><CampaignDetailPage /></Suspense>} />
{/* ── Threat Actors ────────────────────────────────────── */}
<Route path="/threat-actors" element={<Suspense fallback={<LoadingSpinner text="Loading…" />}><ThreatActorsPage /></Suspense>} />
<Route path="/threat-actors/:actorId" element={<Suspense fallback={<LoadingSpinner text="Loading…" />}><ThreatActorDetailPage /></Suspense>} />
{/* ── Compliance ───────────────────────────────────────── */}
<Route path="/compliance" element={<Suspense fallback={<LoadingSpinner text="Loading…" />}><CompliancePage /></Suspense>} />
{/* ── Comparison (leads + admin) ───────────────────────── */}
<Route
path="/comparison"
element={
<ProtectedRoute roles={["admin", "red_lead", "blue_lead"]}>
<Suspense fallback={<LoadingSpinner text="Loading…" />}><ComparisonPage /></Suspense>
</ProtectedRoute>
}
/>
{/* ── Reports ──────────────────────────────────────────── */}
<Route path="/reports" element={<Suspense fallback={<LoadingSpinner text="Loading…" />}><ReportsPage /></Suspense>} />
{/* ── System (admin only) ──────────────────────────────── */}
<Route path="/techniques/:mitreId" element={<TechniqueDetailPage />} />
<Route path="/tests" element={<TestsPage />} />
<Route path="/tests/new" element={<TestCreatePage />} />
<Route path="/tests/:testId" element={<TestDetailPage />} />
<Route path="/test-catalog" element={<TestCatalogPage />} />
<Route path="/test-catalog/:templateId/use" element={<TestCatalogPage />} />
<Route path="/reports" element={<ReportsPage />} />
<Route path="/threat-actors" element={<ThreatActorsPage />} />
<Route path="/threat-actors/:actorId" element={<ThreatActorDetailPage />} />
<Route path="/campaigns" element={<CampaignsPage />} />
<Route path="/campaigns/:campaignId" element={<CampaignDetailPage />} />
<Route path="/comparison" element={<ComparisonPage />} />
<Route path="/compliance" element={<CompliancePage />} />
<Route
path="/system"
element={
<ProtectedRoute roles={["admin"]}>
<Suspense fallback={<LoadingSpinner text="Loading…" />}><SystemPage /></Suspense>
<SystemPage />
</ProtectedRoute>
}
/>
@@ -105,7 +73,7 @@ export default function App() {
path="/users"
element={
<ProtectedRoute roles={["admin"]}>
<Suspense fallback={<LoadingSpinner text="Loading…" />}><UsersPage /></Suspense>
<UsersPage />
</ProtectedRoute>
}
/>
@@ -113,7 +81,7 @@ export default function App() {
path="/audit"
element={
<ProtectedRoute roles={["admin"]}>
<Suspense fallback={<LoadingSpinner text="Loading…" />}><AuditLogPage /></Suspense>
<AuditLogPage />
</ProtectedRoute>
}
/>
@@ -121,7 +89,7 @@ export default function App() {
path="/data-sources"
element={
<ProtectedRoute roles={["admin"]}>
<Suspense fallback={<LoadingSpinner text="Loading…" />}><DataSourcesPage /></Suspense>
<DataSourcesPage />
</ProtectedRoute>
}
/>
+17 -27
View File
@@ -19,7 +19,6 @@ import {
Gauge,
ShieldCheck,
GitCompareArrows,
ScrollText,
} from "lucide-react";
import { useAuth } from "../context/AuthContext";
@@ -27,15 +26,13 @@ interface NavItem {
to: string;
label: string;
icon: React.FC<{ className?: string }>;
/** Roles allowed to see this item. undefined = everyone. */
roles?: string[];
children?: NavItem[];
}
const mainLinks: NavItem[] = [
{ to: "/dashboard", label: "Dashboard", icon: LayoutDashboard },
{ to: "/executive-dashboard", label: "Executive Dashboard", icon: Gauge, roles: ["admin", "red_lead", "blue_lead"] },
{ to: "/matrix", label: "ATT&CK Matrix", icon: Grid3X3 },
{ to: "/techniques", label: "ATT&CK Matrix", icon: Shield },
{ to: "/matrix", label: "Advanced Heatmap", icon: Grid3X3 },
{
to: "/tests",
label: "Tests",
@@ -46,18 +43,19 @@ const mainLinks: NavItem[] = [
{ to: "/test-catalog", label: "Test Catalog", icon: BookOpen },
],
},
{ to: "/campaigns", label: "Campaigns", icon: Zap },
{ to: "/threat-actors", label: "Threat Actors", icon: Crosshair },
{ to: "/compliance", label: "Compliance", icon: ShieldCheck },
{ to: "/comparison", label: "Comparison", icon: GitCompareArrows, roles: ["admin", "red_lead", "blue_lead"] },
{ to: "/executive-dashboard", label: "Executive Dashboard", icon: Gauge },
{ to: "/reports", label: "Reports", icon: BarChart3 },
{ to: "/threat-actors", label: "Threat Actors", icon: Crosshair },
{ to: "/campaigns", label: "Campaigns", icon: Zap },
{ to: "/comparison", label: "Comparison", icon: GitCompareArrows },
{ to: "/compliance", label: "Compliance", icon: ShieldCheck },
];
const systemLinks: NavItem[] = [
{ to: "/data-sources", label: "Data Sources", icon: Database },
{ to: "/system", label: "MITRE Sync", icon: ScrollText },
const adminLinks: NavItem[] = [
{ to: "/users", label: "Users", icon: Users },
{ to: "/audit", label: "Audit Log", icon: FileText },
{ to: "/data-sources", label: "Data Sources", icon: Database },
{ to: "/system", label: "System", icon: Settings },
];
function SidebarLink({ item }: { item: NavItem }) {
@@ -119,15 +117,7 @@ function SidebarLink({ item }: { item: NavItem }) {
export default function Sidebar() {
const { user } = useAuth();
const role = user?.role ?? "";
const isAdmin = role === "admin";
/** Returns true when the current user is allowed to see `item`. */
const canSee = (item: NavItem) => {
if (!item.roles) return true; // no restriction
if (isAdmin) return true; // admin sees everything
return item.roles.includes(role);
};
const isAdmin = user?.role === "admin";
return (
<aside className="flex h-screen w-60 flex-col border-r border-gray-800 bg-gray-900">
@@ -140,19 +130,19 @@ export default function Sidebar() {
</div>
{/* Main nav */}
<nav className="flex-1 space-y-1 overflow-y-auto px-3 py-4">
{mainLinks.filter(canSee).map((item) => (
<nav className="flex-1 space-y-1 px-3 py-4">
{mainLinks.map((item) => (
<SidebarLink key={item.to + item.label} item={item} />
))}
{/* System / Administration section — admin only */}
{/* Admin section */}
{isAdmin && (
<>
<div className="my-3 border-t border-gray-800" />
<p className="mb-2 px-3 text-[10px] font-semibold uppercase tracking-widest text-gray-600">
System
Administration
</p>
{systemLinks.map((item) => (
{adminLinks.map((item) => (
<SidebarLink key={item.to} item={item} />
))}
</>
@@ -162,7 +152,7 @@ export default function Sidebar() {
{/* Footer */}
<div className="border-t border-gray-800 px-5 py-4">
<p className="truncate text-xs text-gray-500">
{user?.username ?? "—"} · {role || "—"}
{user?.role ?? "—"}
</p>
</div>
</aside>
+13 -27
View File
@@ -1,4 +1,4 @@
import React, { useState, useMemo, useCallback } from "react";
import { useState } from "react";
import type { HeatmapTechnique } from "../../api/heatmap";
import HeatmapTooltip from "./HeatmapTooltip";
@@ -8,12 +8,7 @@ interface HeatmapCellProps {
onClick: (techniqueId: string) => void;
}
/**
* Memoized heatmap cell this component renders 3000+ times in the
* full ATT&CK matrix, so React.memo prevents unnecessary re-renders
* when only sibling cells change.
*/
const HeatmapCell = React.memo(function HeatmapCell({ technique, size, onClick }: HeatmapCellProps) {
export default function HeatmapCell({ technique, size, onClick }: HeatmapCellProps) {
const [showTooltip, setShowTooltip] = useState(false);
const sizeClasses = {
@@ -25,28 +20,21 @@ const HeatmapCell = React.memo(function HeatmapCell({ technique, size, onClick }
const bgColor = technique.enabled ? technique.color : "transparent";
const isDisabled = !technique.enabled;
// Memoize text color (derived from background hex)
const textColor = useMemo(() => {
const hex = bgColor;
// Determine text color based on background brightness
const getTextColor = (hex: string): string => {
if (!hex || hex === "transparent" || hex === "") return "text-gray-600";
const r = parseInt(hex.slice(1, 3), 16);
const g = parseInt(hex.slice(3, 5), 16);
const b = parseInt(hex.slice(5, 7), 16);
const brightness = (r * 299 + g * 587 + b * 114) / 1000;
return brightness > 128 ? "text-gray-900" : "text-white";
}, [bgColor]);
};
// Status indicators — memoized
const { testsCount, reviewRequired, isValidated } = useMemo(() => {
const hasTests = technique.metadata.find((m) => m.name === "tests_count");
return {
testsCount: hasTests ? parseInt(hasTests.value, 10) : 0,
reviewRequired: technique.comment?.toLowerCase().includes("review") ?? false,
isValidated: technique.score >= 100,
};
}, [technique.metadata, technique.comment, technique.score]);
const handleClick = useCallback(() => onClick(technique.techniqueID), [onClick, technique.techniqueID]);
// Status indicators
const hasTests = technique.metadata.find((m) => m.name === "tests_count");
const testsCount = hasTests ? parseInt(hasTests.value, 10) : 0;
const reviewRequired = technique.comment?.toLowerCase().includes("review");
const isValidated = technique.score >= 100;
return (
<div
@@ -55,7 +43,7 @@ const HeatmapCell = React.memo(function HeatmapCell({ technique, size, onClick }
onMouseLeave={() => setShowTooltip(false)}
>
<button
onClick={handleClick}
onClick={() => onClick(technique.techniqueID)}
disabled={isDisabled}
className={`
w-full rounded border transition-all duration-150
@@ -71,7 +59,7 @@ const HeatmapCell = React.memo(function HeatmapCell({ technique, size, onClick }
backgroundColor: isDisabled ? undefined : bgColor,
}}
>
<span className={`truncate font-mono font-medium leading-tight ${textColor}`}>
<span className={`truncate font-mono font-medium leading-tight ${getTextColor(bgColor)}`}>
{technique.techniqueID}
</span>
{size !== "compact" && !isDisabled && (
@@ -90,6 +78,4 @@ const HeatmapCell = React.memo(function HeatmapCell({ technique, size, onClick }
)}
</div>
);
});
export default HeatmapCell;
}
-24
View File
@@ -1,24 +0,0 @@
import { useState, useEffect } from "react";
/**
* Debounce a value useful for search inputs that trigger API calls.
*
* @param value The raw value to debounce.
* @param delay Delay in milliseconds (default 300ms).
* @returns The debounced value that updates only after `delay` ms of inactivity.
*
* @example
* const [search, setSearch] = useState("");
* const debouncedSearch = useDebounce(search, 300);
* // use `debouncedSearch` in a TanStack Query key
*/
export function useDebounce<T>(value: T, delay = 300): T {
const [debounced, setDebounced] = useState(value);
useEffect(() => {
const timer = setTimeout(() => setDebounced(value), delay);
return () => clearTimeout(timer);
}, [value, delay]);
return debounced;
}