feat: Phase 1 - Data models and migrations (T-004 to T-009)

Implements all database models for the Aegis platform with full
Alembic migration support.

Models created:
- User: Authentication with role-based access control
- Technique: MITRE ATT&CK techniques with coverage status tracking
- Test: Security tests with validation workflow (draft/review/validated)
- Evidence: File metadata for test evidence (stored in MinIO)
- IntelItem: Threat intelligence items linked to techniques
- AuditLog: System-wide audit trail with JSONB details

Enumerations:
- TechniqueStatus: not_evaluated, in_progress, validated, partial, etc.
- TestState: draft, in_review, validated, rejected
- TestResult: detected, not_detected, partially_detected

Services:
- audit_service.py: log_action() helper for audit logging

All models include proper foreign key relationships and PostgreSQL
enum types are managed correctly in migrations (create/drop).
This commit is contained in:
2026-02-06 12:26:26 +01:00
parent b479acdea0
commit ec65991ac1
18 changed files with 561 additions and 1 deletions

View File

@@ -85,10 +85,33 @@ Aegis/
│ ├── __init__.py │ ├── __init__.py
│ ├── main.py # FastAPI application entry point │ ├── main.py # FastAPI application entry point
│ ├── config.py # Application settings │ ├── config.py # Application settings
── database.py # SQLAlchemy configuration ── database.py # SQLAlchemy configuration
│ ├── models/ # SQLAlchemy models
│ │ ├── user.py # User authentication model
│ │ ├── technique.py # MITRE ATT&CK techniques
│ │ ├── test.py # Security tests
│ │ ├── evidence.py # Test evidence files
│ │ ├── intel.py # Threat intelligence items
│ │ ├── audit.py # Audit logging
│ │ └── enums.py # Shared enumerations
│ └── services/ # Business logic services
│ └── audit_service.py
└── frontend/ # React frontend (coming soon) └── frontend/ # React frontend (coming soon)
``` ```
## Database Schema
The platform uses the following data models:
| Table | Description |
|-------|-------------|
| `users` | User accounts with role-based access |
| `techniques` | MITRE ATT&CK techniques with coverage status |
| `tests` | Security tests validating technique coverage |
| `evidences` | File evidence attached to tests (stored in MinIO) |
| `intel_items` | Threat intelligence items linked to techniques |
| `audit_logs` | System-wide audit trail for all actions |
## Configuration ## Configuration
The application can be configured via environment variables: The application can be configured via environment variables:

View File

@@ -7,6 +7,7 @@ from alembic import context
from app.database import Base from app.database import Base
from app.config import settings from app.config import settings
from app.models import * # noqa: F401, F403 - Import all models for Alembic detection
# this is the Alembic Config object, which provides # this is the Alembic Config object, which provides
# access to the values within the .ini file in use. # access to the values within the .ini file in use.

View File

@@ -0,0 +1,57 @@
"""add_tests_table
Revision ID: 14c6f07b47cb
Revises: 476930a8d86e
Create Date: 2026-02-06 11:01:01.822240
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = '14c6f07b47cb'
down_revision: Union[str, Sequence[str], None] = '476930a8d86e'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# Create enum types
testresult_enum = postgresql.ENUM('detected', 'not_detected', 'partially_detected', name='testresult')
teststate_enum = postgresql.ENUM('draft', 'in_review', 'validated', 'rejected', name='teststate')
testresult_enum.create(op.get_bind(), checkfirst=True)
teststate_enum.create(op.get_bind(), checkfirst=True)
# Create table
op.create_table('tests',
sa.Column('id', sa.UUID(), nullable=False),
sa.Column('technique_id', sa.UUID(), nullable=False),
sa.Column('name', sa.String(), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('platform', sa.String(), nullable=True),
sa.Column('procedure_text', sa.Text(), nullable=True),
sa.Column('tool_used', sa.String(), nullable=True),
sa.Column('execution_date', sa.DateTime(), nullable=True),
sa.Column('created_by', sa.UUID(), nullable=True),
sa.Column('result', postgresql.ENUM('detected', 'not_detected', 'partially_detected', name='testresult', create_type=False), nullable=True),
sa.Column('state', postgresql.ENUM('draft', 'in_review', 'validated', 'rejected', name='teststate', create_type=False), nullable=True),
sa.Column('validated_by', sa.UUID(), nullable=True),
sa.Column('validated_at', sa.DateTime(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['created_by'], ['users.id'], ),
sa.ForeignKeyConstraint(['technique_id'], ['techniques.id'], ),
sa.ForeignKeyConstraint(['validated_by'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
def downgrade() -> None:
"""Downgrade schema."""
op.drop_table('tests')
# Drop enum types
postgresql.ENUM(name='testresult').drop(op.get_bind(), checkfirst=True)
postgresql.ENUM(name='teststate').drop(op.get_bind(), checkfirst=True)

View File

@@ -0,0 +1,43 @@
"""add_users_table
Revision ID: 4671fccfa705
Revises: f03e5712c21c
Create Date: 2026-02-06 10:30:07.210676
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '4671fccfa705'
down_revision: Union[str, Sequence[str], None] = 'f03e5712c21c'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('users',
sa.Column('id', sa.UUID(), nullable=False),
sa.Column('username', sa.String(), nullable=False),
sa.Column('email', sa.String(), nullable=True),
sa.Column('hashed_password', sa.String(), nullable=False),
sa.Column('role', sa.String(), nullable=False),
sa.Column('is_active', sa.Boolean(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('last_login', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('username')
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('users')
# ### end Alembic commands ###

View File

@@ -0,0 +1,54 @@
"""add_techniques_table
Revision ID: 476930a8d86e
Revises: 4671fccfa705
Create Date: 2026-02-06 10:37:17.777106
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = '476930a8d86e'
down_revision: Union[str, Sequence[str], None] = '4671fccfa705'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# Create enum type explicitly
techniquestatus_enum = postgresql.ENUM(
'not_evaluated', 'in_progress', 'validated', 'partial', 'not_covered', 'review_required',
name='techniquestatus'
)
techniquestatus_enum.create(op.get_bind(), checkfirst=True)
# Create table with create_type=False since we already created the enum
op.create_table('techniques',
sa.Column('id', sa.UUID(), nullable=False),
sa.Column('mitre_id', sa.String(), nullable=False),
sa.Column('name', sa.String(), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('tactic', sa.String(), nullable=True),
sa.Column('platforms', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column('mitre_version', sa.String(), nullable=True),
sa.Column('mitre_last_modified', sa.DateTime(), nullable=True),
sa.Column('is_subtechnique', sa.Boolean(), nullable=True),
sa.Column('parent_mitre_id', sa.String(), nullable=True),
sa.Column('status_global', postgresql.ENUM('not_evaluated', 'in_progress', 'validated', 'partial', 'not_covered', 'review_required', name='techniquestatus', create_type=False), nullable=True),
sa.Column('review_required', sa.Boolean(), nullable=True),
sa.Column('last_review_date', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('mitre_id')
)
def downgrade() -> None:
"""Downgrade schema."""
op.drop_table('techniques')
# Drop enum type after dropping table
postgresql.ENUM(name='techniquestatus').drop(op.get_bind(), checkfirst=True)

View File

@@ -0,0 +1,39 @@
"""add_evidences_table
Revision ID: 7cec59461d53
Revises: 14c6f07b47cb
Create Date: 2026-02-06 11:06:14.535381
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '7cec59461d53'
down_revision: Union[str, Sequence[str], None] = '14c6f07b47cb'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
op.create_table('evidences',
sa.Column('id', sa.UUID(), nullable=False),
sa.Column('test_id', sa.UUID(), nullable=False),
sa.Column('file_name', sa.String(), nullable=False),
sa.Column('file_path', sa.String(), nullable=False),
sa.Column('sha256_hash', sa.String(), nullable=False),
sa.Column('uploaded_by', sa.UUID(), nullable=True),
sa.Column('uploaded_at', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['test_id'], ['tests.id'], ),
sa.ForeignKeyConstraint(['uploaded_by'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
def downgrade() -> None:
"""Downgrade schema."""
op.drop_table('evidences')

View File

@@ -0,0 +1,38 @@
"""add_audit_logs_table
Revision ID: a1412d1ef337
Revises: ef21f4a670ed
Create Date: 2026-02-06 11:19:09.293785
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = 'a1412d1ef337'
down_revision: Union[str, Sequence[str], None] = 'ef21f4a670ed'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
op.create_table('audit_logs',
sa.Column('id', sa.UUID(), nullable=False),
sa.Column('user_id', sa.UUID(), nullable=True),
sa.Column('action', sa.String(), nullable=False),
sa.Column('entity_type', sa.String(), nullable=True),
sa.Column('entity_id', sa.String(), nullable=True),
sa.Column('timestamp', sa.DateTime(), nullable=True),
sa.Column('details', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
def downgrade() -> None:
"""Downgrade schema."""
op.drop_table('audit_logs')

View File

@@ -0,0 +1,38 @@
"""add_intel_items_table
Revision ID: ef21f4a670ed
Revises: 7cec59461d53
Create Date: 2026-02-06 11:10:30.452222
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'ef21f4a670ed'
down_revision: Union[str, Sequence[str], None] = '7cec59461d53'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
op.create_table('intel_items',
sa.Column('id', sa.UUID(), nullable=False),
sa.Column('technique_id', sa.UUID(), nullable=True),
sa.Column('url', sa.String(), nullable=False),
sa.Column('title', sa.String(), nullable=True),
sa.Column('source', sa.String(), nullable=True),
sa.Column('detected_at', sa.DateTime(), nullable=True),
sa.Column('reviewed', sa.Boolean(), nullable=True),
sa.ForeignKeyConstraint(['technique_id'], ['techniques.id'], ),
sa.PrimaryKeyConstraint('id')
)
def downgrade() -> None:
"""Downgrade schema."""
op.drop_table('intel_items')

View File

@@ -0,0 +1,13 @@
# Import all models here so Alembic can detect them
from app.models.user import User
from app.models.technique import Technique
from app.models.test import Test
from app.models.evidence import Evidence
from app.models.intel import IntelItem
from app.models.audit import AuditLog
from app.models.enums import TechniqueStatus, TestState, TestResult
__all__ = [
"User", "Technique", "Test", "Evidence", "IntelItem", "AuditLog",
"TechniqueStatus", "TestState", "TestResult"
]

View File

@@ -0,0 +1,29 @@
import uuid
from datetime import datetime
from sqlalchemy import Column, String, DateTime, ForeignKey
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import relationship
from app.database import Base
class AuditLog(Base):
"""
Audit log model for tracking all system actions.
Records user actions, entity changes, and system events
for security auditing and compliance purposes.
"""
__tablename__ = "audit_logs"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=True)
action = Column(String, nullable=False)
entity_type = Column(String, nullable=True)
entity_id = Column(String, nullable=True)
timestamp = Column(DateTime, default=datetime.utcnow)
details = Column(JSONB, nullable=True)
# Relationships
user = relationship("User")

View File

@@ -0,0 +1,23 @@
import enum
class TechniqueStatus(str, enum.Enum):
not_evaluated = "not_evaluated"
in_progress = "in_progress"
validated = "validated"
partial = "partial"
not_covered = "not_covered"
review_required = "review_required"
class TestState(str, enum.Enum):
draft = "draft"
in_review = "in_review"
validated = "validated"
rejected = "rejected"
class TestResult(str, enum.Enum):
detected = "detected"
not_detected = "not_detected"
partially_detected = "partially_detected"

View File

@@ -0,0 +1,30 @@
import uuid
from datetime import datetime
from sqlalchemy import Column, String, DateTime, ForeignKey
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from app.database import Base
class Evidence(Base):
"""
Evidence model for storing file metadata associated with tests.
Files are stored in MinIO, and this model tracks the file location,
integrity hash, and upload metadata.
"""
__tablename__ = "evidences"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
test_id = Column(UUID(as_uuid=True), ForeignKey("tests.id"), nullable=False)
file_name = Column(String, nullable=False)
file_path = Column(String, nullable=False) # Path in MinIO
sha256_hash = Column(String, nullable=False)
uploaded_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=True)
uploaded_at = Column(DateTime, default=datetime.utcnow)
# Relationships
test = relationship("Test", back_populates="evidences")
uploader = relationship("User", foreign_keys=[uploaded_by])

View File

@@ -0,0 +1,29 @@
import uuid
from datetime import datetime
from sqlalchemy import Column, String, Boolean, DateTime, ForeignKey
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from app.database import Base
class IntelItem(Base):
"""
Intelligence item model for tracking threat intelligence related to techniques.
Stores URLs and metadata from automated intel scans that may indicate
new attack variations or detection bypasses for specific techniques.
"""
__tablename__ = "intel_items"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
technique_id = Column(UUID(as_uuid=True), ForeignKey("techniques.id"), nullable=True)
url = Column(String, nullable=False)
title = Column(String, nullable=True)
source = Column(String, nullable=True)
detected_at = Column(DateTime, default=datetime.utcnow)
reviewed = Column(Boolean, default=False)
# Relationships
technique = relationship("Technique")

View File

@@ -0,0 +1,39 @@
import uuid
from datetime import datetime
from sqlalchemy import Column, String, Text, Boolean, DateTime, Enum
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import relationship
from app.database import Base
from app.models.enums import TechniqueStatus
class Technique(Base):
"""
MITRE ATT&CK Technique model.
Represents an attack technique from the MITRE ATT&CK framework,
including its coverage status and associated tests.
"""
__tablename__ = "techniques"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
mitre_id = Column(String, unique=True, nullable=False) # e.g., "T1059.001"
name = Column(String, nullable=False)
description = Column(Text, nullable=True)
tactic = Column(String, nullable=True)
platforms = Column(JSONB, nullable=True, default=[])
mitre_version = Column(String, nullable=True)
mitre_last_modified = Column(DateTime, nullable=True)
is_subtechnique = Column(Boolean, default=False)
parent_mitre_id = Column(String, nullable=True)
status_global = Column(
Enum(TechniqueStatus, name="techniquestatus"),
default=TechniqueStatus.not_evaluated
)
review_required = Column(Boolean, default=False)
last_review_date = Column(DateTime, nullable=True)
# Relationships
tests = relationship("Test", back_populates="technique")

View File

@@ -0,0 +1,40 @@
import uuid
from datetime import datetime
from sqlalchemy import Column, String, Text, DateTime, ForeignKey, Enum
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from app.database import Base
from app.models.enums import TestState, TestResult
class Test(Base):
"""
Test model representing a security test for a MITRE ATT&CK technique.
Each test documents an attempt to validate coverage of a specific technique,
including the procedure, tools used, and outcome.
"""
__tablename__ = "tests"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
technique_id = Column(UUID(as_uuid=True), ForeignKey("techniques.id"), nullable=False)
name = Column(String, nullable=False)
description = Column(Text, nullable=True)
platform = Column(String, nullable=True)
procedure_text = Column(Text, nullable=True)
tool_used = Column(String, nullable=True)
execution_date = Column(DateTime, nullable=True)
created_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=True)
result = Column(Enum(TestResult, name="testresult"), nullable=True)
state = Column(Enum(TestState, name="teststate"), default=TestState.draft)
validated_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=True)
validated_at = Column(DateTime, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
# Relationships
technique = relationship("Technique", back_populates="tests")
evidences = relationship("Evidence", back_populates="test")
creator = relationship("User", foreign_keys=[created_by])
validator = relationship("User", foreign_keys=[validated_by])

View File

@@ -0,0 +1,31 @@
import uuid
from datetime import datetime
from sqlalchemy import Column, String, Boolean, DateTime
from sqlalchemy.dialects.postgresql import UUID
from app.database import Base
class User(Base):
"""
User model for authentication and authorization.
Possible roles:
- admin: Full system access
- red_tech: Red team technician - can create and edit tests
- blue_tech: Blue team technician - can create and edit tests
- red_lead: Red team lead - can validate tests
- blue_lead: Blue team lead - can validate tests
- viewer: Read-only access (default)
"""
__tablename__ = "users"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
username = Column(String, unique=True, nullable=False)
email = Column(String, nullable=True)
hashed_password = Column(String, nullable=False)
role = Column(String, nullable=False, default="viewer")
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
last_login = Column(DateTime, nullable=True)

View File

View File

@@ -0,0 +1,33 @@
from sqlalchemy.orm import Session
from app.models.audit import AuditLog
def log_action(
db: Session,
user_id,
action: str,
entity_type: str = None,
entity_id: str = None,
details: dict = None
):
"""
Log an action to the audit log.
Args:
db: Database session
user_id: UUID of the user performing the action (can be None for system actions)
action: Description of the action (e.g., "create_test", "validate_technique")
entity_type: Type of entity affected (e.g., "technique", "test", "user")
entity_id: ID of the entity affected
details: Additional details as a dictionary (stored as JSONB)
"""
log = AuditLog(
user_id=user_id,
action=action,
entity_type=entity_type,
entity_id=str(entity_id) if entity_id else None,
details=details,
)
db.add(log)
db.commit()