Compare commits

...

3 Commits

25 changed files with 4631 additions and 1 deletions
@@ -0,0 +1,48 @@
"""add_data_sources_table
Revision ID: b008datasources
Revises: b007remediation
Create Date: 2026-02-09 14:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB
# revision identifiers, used by Alembic.
revision: str = 'b008datasources'
down_revision: Union[str, Sequence[str], None] = 'b007remediation'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Create data_sources table."""
op.create_table(
'data_sources',
sa.Column('id', UUID(as_uuid=True), primary_key=True),
sa.Column('name', sa.String(), unique=True, nullable=False),
sa.Column('display_name', sa.String(), nullable=False),
sa.Column('type', sa.String(), nullable=False),
sa.Column('url', sa.String(), nullable=True),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('is_enabled', sa.Boolean(), server_default='true'),
sa.Column('last_sync_at', sa.DateTime(), nullable=True),
sa.Column('last_sync_status', sa.String(), nullable=True),
sa.Column('last_sync_stats', JSONB(), nullable=True),
sa.Column('sync_frequency', sa.String(), nullable=True),
sa.Column('config', JSONB(), nullable=True),
sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()),
)
op.create_index('ix_data_sources_type', 'data_sources', ['type'])
op.create_index('ix_data_sources_is_enabled', 'data_sources', ['is_enabled'])
def downgrade() -> None:
"""Drop data_sources table."""
op.drop_index('ix_data_sources_is_enabled', table_name='data_sources')
op.drop_index('ix_data_sources_type', table_name='data_sources')
op.drop_table('data_sources')
@@ -0,0 +1,52 @@
"""add_detection_rules_table
Revision ID: b009detectionrules
Revises: b008datasources
Create Date: 2026-02-09 14:10:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB
# revision identifiers, used by Alembic.
revision: str = 'b009detectionrules'
down_revision: Union[str, Sequence[str], None] = 'b008datasources'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Create detection_rules table."""
op.create_table(
'detection_rules',
sa.Column('id', UUID(as_uuid=True), primary_key=True),
sa.Column('mitre_technique_id', sa.String(), nullable=False),
sa.Column('title', sa.String(), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('source', sa.String(), nullable=False),
sa.Column('source_id', sa.String(), nullable=True),
sa.Column('source_url', sa.String(), nullable=True),
sa.Column('rule_content', sa.Text(), nullable=False),
sa.Column('rule_format', sa.String(), nullable=False),
sa.Column('severity', sa.String(), nullable=True),
sa.Column('platforms', JSONB(), nullable=True),
sa.Column('log_sources', JSONB(), nullable=True),
sa.Column('false_positive_rate', sa.String(), nullable=True),
sa.Column('is_active', sa.Boolean(), server_default='true'),
sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()),
)
op.create_index('ix_detection_rules_mitre_technique_id', 'detection_rules', ['mitre_technique_id'])
op.create_index('ix_detection_rules_source', 'detection_rules', ['source'])
op.create_index('ix_detection_rules_severity', 'detection_rules', ['severity'])
def downgrade() -> None:
"""Drop detection_rules table."""
op.drop_index('ix_detection_rules_severity', table_name='detection_rules')
op.drop_index('ix_detection_rules_source', table_name='detection_rules')
op.drop_index('ix_detection_rules_mitre_technique_id', table_name='detection_rules')
op.drop_table('detection_rules')
@@ -0,0 +1,72 @@
"""add_threat_actors_tables
Revision ID: b010threatactors
Revises: b009detectionrules
Create Date: 2026-02-09 15:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB
# revision identifiers, used by Alembic.
revision: str = 'b010threatactors'
down_revision: Union[str, Sequence[str], None] = 'b009detectionrules'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Create threat_actors and threat_actor_techniques tables."""
# threat_actors
op.create_table(
'threat_actors',
sa.Column('id', UUID(as_uuid=True), primary_key=True),
sa.Column('mitre_id', sa.String(), unique=True, nullable=True),
sa.Column('name', sa.String(), nullable=False),
sa.Column('aliases', JSONB(), nullable=True),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('country', sa.String(), nullable=True),
sa.Column('target_sectors', JSONB(), nullable=True),
sa.Column('target_regions', JSONB(), nullable=True),
sa.Column('motivation', sa.String(), nullable=True),
sa.Column('sophistication', sa.String(), nullable=True),
sa.Column('first_seen', sa.String(), nullable=True),
sa.Column('last_seen', sa.String(), nullable=True),
sa.Column('references', JSONB(), nullable=True),
sa.Column('mitre_url', sa.String(), nullable=True),
sa.Column('is_active', sa.Boolean(), server_default='true'),
sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()),
)
op.create_index('ix_threat_actors_country', 'threat_actors', ['country'])
op.create_index('ix_threat_actors_motivation', 'threat_actors', ['motivation'])
# threat_actor_techniques (junction table)
op.create_table(
'threat_actor_techniques',
sa.Column('id', UUID(as_uuid=True), primary_key=True),
sa.Column('threat_actor_id', UUID(as_uuid=True),
sa.ForeignKey('threat_actors.id', ondelete='CASCADE'), nullable=False),
sa.Column('technique_id', UUID(as_uuid=True),
sa.ForeignKey('techniques.id', ondelete='CASCADE'), nullable=False),
sa.Column('usage_description', sa.Text(), nullable=True),
sa.Column('first_seen_using', sa.String(), nullable=True),
)
op.create_index('ix_threat_actor_techniques_actor', 'threat_actor_techniques', ['threat_actor_id'])
op.create_index('ix_threat_actor_techniques_technique', 'threat_actor_techniques', ['technique_id'])
op.create_unique_constraint('uq_actor_technique', 'threat_actor_techniques',
['threat_actor_id', 'technique_id'])
def downgrade() -> None:
"""Drop threat_actor_techniques and threat_actors tables."""
op.drop_constraint('uq_actor_technique', 'threat_actor_techniques', type_='unique')
op.drop_index('ix_threat_actor_techniques_technique', table_name='threat_actor_techniques')
op.drop_index('ix_threat_actor_techniques_actor', table_name='threat_actor_techniques')
op.drop_table('threat_actor_techniques')
op.drop_index('ix_threat_actors_motivation', table_name='threat_actors')
op.drop_index('ix_threat_actors_country', table_name='threat_actors')
op.drop_table('threat_actors')
+4
View File
@@ -18,6 +18,8 @@ from app.routers import users as users_router
from app.routers import audit as audit_router
from app.routers import notifications as notifications_router
from app.routers import reports as reports_router
from app.routers import data_sources as data_sources_router
from app.routers import threat_actors as threat_actors_router
from app.storage import ensure_bucket_exists
from app.jobs.mitre_sync_job import start_scheduler, scheduler
@@ -60,6 +62,8 @@ app.include_router(users_router.router, prefix="/api/v1")
app.include_router(audit_router.router, prefix="/api/v1")
app.include_router(notifications_router.router, prefix="/api/v1")
app.include_router(reports_router.router, prefix="/api/v1")
app.include_router(data_sources_router.router, prefix="/api/v1")
app.include_router(threat_actors_router.router, prefix="/api/v1")
@app.get("/health")
+5 -1
View File
@@ -7,10 +7,14 @@ from app.models.evidence import Evidence
from app.models.intel import IntelItem
from app.models.audit import AuditLog
from app.models.notification import Notification
from app.models.data_source import DataSource
from app.models.detection_rule import DetectionRule
from app.models.threat_actor import ThreatActor, ThreatActorTechnique
from app.models.enums import TechniqueStatus, TestState, TestResult, TeamSide
__all__ = [
"User", "Technique", "Test", "TestTemplate", "Evidence",
"IntelItem", "AuditLog", "Notification",
"IntelItem", "AuditLog", "Notification", "DataSource",
"DetectionRule", "ThreatActor", "ThreatActorTechnique",
"TechniqueStatus", "TestState", "TestResult", "TeamSide",
]
+39
View File
@@ -0,0 +1,39 @@
"""DataSource model — registry of external data sources for import."""
import uuid
from datetime import datetime
from sqlalchemy import Column, String, Text, Boolean, DateTime, Index
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.database import Base
class DataSource(Base):
"""
Unified registry of all external data sources (attack procedures,
detection rules, threat intel, defensive techniques).
Each source can be independently enabled/disabled and tracks its own
synchronisation state.
"""
__tablename__ = "data_sources"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name = Column(String, unique=True, nullable=False) # e.g. "atomic_red_team"
display_name = Column(String, nullable=False) # e.g. "Atomic Red Team"
type = Column(String, nullable=False) # attack_procedure / detection_rule / threat_intel / defensive_technique
url = Column(String, nullable=True) # URL base of repo/API
description = Column(Text, nullable=True)
is_enabled = Column(Boolean, default=True)
last_sync_at = Column(DateTime, nullable=True)
last_sync_status = Column(String, nullable=True) # success / error / in_progress
last_sync_stats = Column(JSONB, nullable=True) # {"imported": X, "updated": Y, ...}
sync_frequency = Column(String, nullable=True) # daily / weekly / monthly / manual
config = Column(JSONB, nullable=True) # source-specific configuration
created_at = Column(DateTime, default=datetime.utcnow)
__table_args__ = (
Index('ix_data_sources_type', 'type'),
Index('ix_data_sources_is_enabled', 'is_enabled'),
)
+42
View File
@@ -0,0 +1,42 @@
"""DetectionRule model — detection rules from multiple sources."""
import uuid
from datetime import datetime
from sqlalchemy import Column, String, Text, Boolean, DateTime, Index
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.database import Base
class DetectionRule(Base):
"""
Detection rule from an external source (Sigma, Elastic, Splunk, custom).
Each rule is mapped to one MITRE ATT&CK technique via
``mitre_technique_id`` and stores the complete rule content in
``rule_content``.
"""
__tablename__ = "detection_rules"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
mitre_technique_id = Column(String, nullable=False) # e.g. "T1059.001"
title = Column(String, nullable=False)
description = Column(Text, nullable=True)
source = Column(String, nullable=False) # sigma / elastic / splunk / custom
source_id = Column(String, nullable=True) # ID in the source repo (for dedup)
source_url = Column(String, nullable=True)
rule_content = Column(Text, nullable=False) # YAML / KQL / SPL content
rule_format = Column(String, nullable=False) # sigma_yaml / kql / spl / custom
severity = Column(String, nullable=True) # informational / low / medium / high / critical
platforms = Column(JSONB, nullable=True, default=[])
log_sources = Column(JSONB, nullable=True) # e.g. {"product": "windows", "service": "sysmon"}
false_positive_rate = Column(String, nullable=True) # low / medium / high
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
__table_args__ = (
Index('ix_detection_rules_mitre_technique_id', 'mitre_technique_id'),
Index('ix_detection_rules_source', 'source'),
Index('ix_detection_rules_severity', 'severity'),
)
+92
View File
@@ -0,0 +1,92 @@
"""ThreatActor and ThreatActorTechnique models.
Stores profiles of APT groups and their associated MITRE ATT&CK
techniques, imported from MITRE CTI (STIX 2.0).
"""
import uuid
from datetime import datetime
from sqlalchemy import (
Column, String, Text, Boolean, DateTime,
ForeignKey, Index, UniqueConstraint,
)
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import relationship
from app.database import Base
class ThreatActor(Base):
"""
Threat actor / APT group profile.
Imported from MITRE CTI ``intrusion-set`` STIX objects.
"""
__tablename__ = "threat_actors"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
mitre_id = Column(String, unique=True, nullable=True) # e.g. "G0016" (APT29)
name = Column(String, nullable=False)
aliases = Column(JSONB, nullable=True, default=[]) # ["Cozy Bear", "The Dukes", ...]
description = Column(Text, nullable=True)
country = Column(String, nullable=True)
target_sectors = Column(JSONB, nullable=True, default=[]) # ["government", "defense", ...]
target_regions = Column(JSONB, nullable=True, default=[]) # ["north-america", "europe", ...]
motivation = Column(String, nullable=True) # espionage / financial / destruction / ...
sophistication = Column(String, nullable=True) # low / medium / high / advanced
first_seen = Column(String, nullable=True)
last_seen = Column(String, nullable=True)
references = Column(JSONB, nullable=True, default=[]) # [{"url": "...", "description": "..."}]
mitre_url = Column(String, nullable=True)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
# Relationships
techniques = relationship(
"ThreatActorTechnique",
back_populates="threat_actor",
cascade="all, delete-orphan",
)
__table_args__ = (
Index('ix_threat_actors_country', 'country'),
Index('ix_threat_actors_motivation', 'motivation'),
)
class ThreatActorTechnique(Base):
"""
Association between a threat actor and a MITRE ATT&CK technique.
Stores additional context about how the actor uses the technique
(from the STIX ``relationship`` ``uses`` objects).
"""
__tablename__ = "threat_actor_techniques"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
threat_actor_id = Column(
UUID(as_uuid=True),
ForeignKey("threat_actors.id", ondelete="CASCADE"),
nullable=False,
)
technique_id = Column(
UUID(as_uuid=True),
ForeignKey("techniques.id", ondelete="CASCADE"),
nullable=False,
)
usage_description = Column(Text, nullable=True)
first_seen_using = Column(String, nullable=True)
# Relationships
threat_actor = relationship("ThreatActor", back_populates="techniques")
technique = relationship("Technique")
__table_args__ = (
Index('ix_threat_actor_techniques_actor', 'threat_actor_id'),
Index('ix_threat_actor_techniques_technique', 'technique_id'),
UniqueConstraint(
'threat_actor_id', 'technique_id',
name='uq_actor_technique',
),
)
+293
View File
@@ -0,0 +1,293 @@
"""Data sources management endpoints (admin only).
Provides a centralized panel for managing all external data sources
(Atomic Red Team, Sigma, LOLBAS, GTFOBins, CALDERA, Elastic, etc.)
including sync triggers, enable/disable toggles, and statistics.
"""
import logging
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import require_role
from app.models.user import User
from app.models.data_source import DataSource
from app.services.audit_service import log_action
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/data-sources", tags=["data-sources"])
# ---------------------------------------------------------------------------
# Sync dispatcher — maps source name → import function
# ---------------------------------------------------------------------------
def _get_sync_handler(source_name: str):
"""Lazily import and return the sync function for *source_name*.
We import lazily to avoid circular imports and to only load the
modules that are actually needed.
"""
handlers = {
"atomic_red_team": ("app.services.atomic_import_service", "import_atomic_red_team"),
"sigma": ("app.services.sigma_import_service", "sync"),
"lolbas": ("app.services.lolbas_import_service", "sync"),
"gtfobins": ("app.services.lolbas_import_service", "sync_gtfobins"),
"caldera": ("app.services.caldera_import_service", "sync"),
"elastic_rules": ("app.services.elastic_import_service", "sync"),
"mitre_cti": ("app.services.threat_actor_import_service", "sync"),
# d3fend added in later phases
}
if source_name not in handlers:
return None
module_path, func_name = handlers[source_name]
import importlib
mod = importlib.import_module(module_path)
return getattr(mod, func_name)
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@router.get("")
def list_data_sources(
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""List all registered data sources.
**Requires** the ``admin`` role.
"""
sources = db.query(DataSource).order_by(DataSource.name).all()
return [
{
"id": str(s.id),
"name": s.name,
"display_name": s.display_name,
"type": s.type,
"url": s.url,
"description": s.description,
"is_enabled": s.is_enabled,
"last_sync_at": s.last_sync_at.isoformat() if s.last_sync_at else None,
"last_sync_status": s.last_sync_status,
"last_sync_stats": s.last_sync_stats,
"sync_frequency": s.sync_frequency,
"config": s.config,
"created_at": s.created_at.isoformat() if s.created_at else None,
}
for s in sources
]
@router.patch("/{source_id}")
def update_data_source(
source_id: str,
body: dict,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Update a data source (enable/disable, change config).
**Requires** the ``admin`` role.
Body fields (all optional):
- ``is_enabled`` (bool)
- ``sync_frequency`` (str)
- ``config`` (dict)
"""
ds = db.query(DataSource).filter(DataSource.id == source_id).first()
if not ds:
raise HTTPException(status_code=404, detail="Data source not found")
if "is_enabled" in body:
ds.is_enabled = bool(body["is_enabled"])
if "sync_frequency" in body:
ds.sync_frequency = body["sync_frequency"]
if "config" in body:
ds.config = body["config"]
db.commit()
log_action(
db,
user_id=current_user.id,
action="update_data_source",
entity_type="data_source",
entity_id=str(ds.id),
details={"updates": body},
)
return {"message": "Data source updated", "id": str(ds.id)}
@router.post("/{source_id}/sync")
def sync_data_source(
source_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Trigger sync/import for a specific data source.
**Requires** the ``admin`` role.
"""
ds = db.query(DataSource).filter(DataSource.id == source_id).first()
if not ds:
raise HTTPException(status_code=404, detail="Data source not found")
handler = _get_sync_handler(ds.name)
if handler is None:
raise HTTPException(
status_code=400,
detail=f"No sync handler available for '{ds.name}'",
)
# Mark as in_progress
ds.last_sync_status = "in_progress"
db.commit()
try:
summary = handler(db)
except Exception as exc:
logger.error("Sync failed for %s: %s", ds.name, exc)
ds.last_sync_status = "error"
ds.last_sync_at = datetime.utcnow()
ds.last_sync_stats = {"error": str(exc)}
db.commit()
raise HTTPException(
status_code=500,
detail=f"Sync failed: {str(exc)}",
)
# Update DS record (the handler may already have done this,
# but we ensure it here as well)
ds.last_sync_at = datetime.utcnow()
ds.last_sync_status = "success"
ds.last_sync_stats = summary
db.commit()
return {
"message": f"Sync complete for {ds.display_name}",
"source": ds.name,
"stats": summary,
}
@router.post("/sync-all")
def sync_all_data_sources(
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Trigger sync for all enabled data sources (sequentially).
**Requires** the ``admin`` role.
"""
enabled_sources = (
db.query(DataSource)
.filter(DataSource.is_enabled == True)
.order_by(DataSource.name)
.all()
)
results = []
for ds in enabled_sources:
handler = _get_sync_handler(ds.name)
if handler is None:
results.append({
"source": ds.name,
"status": "skipped",
"detail": "No sync handler available",
})
continue
ds.last_sync_status = "in_progress"
db.commit()
try:
summary = handler(db)
ds.last_sync_at = datetime.utcnow()
ds.last_sync_status = "success"
ds.last_sync_stats = summary
db.commit()
results.append({
"source": ds.name,
"status": "success",
"stats": summary,
})
except Exception as exc:
logger.error("Sync failed for %s: %s", ds.name, exc)
ds.last_sync_status = "error"
ds.last_sync_at = datetime.utcnow()
ds.last_sync_stats = {"error": str(exc)}
db.commit()
results.append({
"source": ds.name,
"status": "error",
"detail": str(exc),
})
log_action(
db,
user_id=current_user.id,
action="sync_all_data_sources",
entity_type="data_source",
entity_id=None,
details={"results": results},
)
return {"message": "Sync all complete", "results": results}
@router.get("/{source_id}/stats")
def get_data_source_stats(
source_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Get detailed statistics for a specific data source.
**Requires** the ``admin`` role.
"""
ds = db.query(DataSource).filter(DataSource.id == source_id).first()
if not ds:
raise HTTPException(status_code=404, detail="Data source not found")
# Count items from this source
from app.models.test_template import TestTemplate
from app.models.detection_rule import DetectionRule
template_count = 0
rule_count = 0
if ds.type == "attack_procedure":
template_count = (
db.query(TestTemplate)
.filter(TestTemplate.source == ds.name)
.count()
)
elif ds.type == "detection_rule":
rule_count = (
db.query(DetectionRule)
.filter(DetectionRule.source == ds.name)
.count()
)
return {
"id": str(ds.id),
"name": ds.name,
"display_name": ds.display_name,
"type": ds.type,
"is_enabled": ds.is_enabled,
"last_sync_at": ds.last_sync_at.isoformat() if ds.last_sync_at else None,
"last_sync_status": ds.last_sync_status,
"last_sync_stats": ds.last_sync_stats,
"total_templates": template_count,
"total_rules": rule_count,
}
+309
View File
@@ -0,0 +1,309 @@
"""Threat actor endpoints.
Provides listing, detail, coverage analysis, and gap analysis for
threat actor profiles imported from MITRE CTI.
"""
import logging
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, or_
from sqlalchemy.orm import Session, joinedload
from app.database import get_db
from app.dependencies.auth import get_current_user
from app.models.user import User
from app.models.threat_actor import ThreatActor, ThreatActorTechnique
from app.models.technique import Technique
from app.models.test import Test
from app.models.test_template import TestTemplate
from app.models.enums import TechniqueStatus
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/threat-actors", tags=["threat-actors"])
# ---------------------------------------------------------------------------
# GET /threat-actors — Listado con filtros
# ---------------------------------------------------------------------------
@router.get("")
def list_threat_actors(
search: Optional[str] = Query(None),
country: Optional[str] = Query(None),
motivation: Optional[str] = Query(None),
sophistication: Optional[str] = Query(None),
target_sectors: Optional[str] = Query(None),
offset: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""List threat actors with optional filters and pagination.
**Requires** authentication (any role).
"""
query = db.query(ThreatActor)
# Filters
if search:
pattern = f"%{search}%"
query = query.filter(
or_(
ThreatActor.name.ilike(pattern),
ThreatActor.description.ilike(pattern),
func.cast(ThreatActor.aliases, func.text()).ilike(pattern),
)
)
if country:
query = query.filter(ThreatActor.country == country)
if motivation:
query = query.filter(ThreatActor.motivation == motivation)
if sophistication:
query = query.filter(ThreatActor.sophistication == sophistication)
if target_sectors:
# JSONB contains check
query = query.filter(
func.cast(ThreatActor.target_sectors, func.text()).ilike(f"%{target_sectors}%")
)
# Total count
total = query.count()
# Paginate
actors = query.order_by(ThreatActor.name).offset(offset).limit(limit).all()
# For each actor, count techniques and calculate basic coverage
results = []
for actor in actors:
tech_count = (
db.query(ThreatActorTechnique)
.filter(ThreatActorTechnique.threat_actor_id == actor.id)
.count()
)
# Quick coverage calculation
covered = (
db.query(ThreatActorTechnique)
.join(Technique, ThreatActorTechnique.technique_id == Technique.id)
.filter(ThreatActorTechnique.threat_actor_id == actor.id)
.filter(Technique.status_global.in_([
TechniqueStatus.validated,
TechniqueStatus.partial,
]))
.count()
)
coverage_pct = round((covered / tech_count * 100), 1) if tech_count > 0 else 0.0
results.append({
"id": str(actor.id),
"mitre_id": actor.mitre_id,
"name": actor.name,
"aliases": actor.aliases or [],
"country": actor.country,
"target_sectors": actor.target_sectors or [],
"target_regions": actor.target_regions or [],
"motivation": actor.motivation,
"sophistication": actor.sophistication,
"mitre_url": actor.mitre_url,
"technique_count": tech_count,
"coverage_pct": coverage_pct,
"is_active": actor.is_active,
})
return {
"total": total,
"offset": offset,
"limit": limit,
"items": results,
}
# ---------------------------------------------------------------------------
# GET /threat-actors/{id} — Detalle
# ---------------------------------------------------------------------------
@router.get("/{actor_id}")
def get_threat_actor(
actor_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Get detailed info about a threat actor including techniques.
**Requires** authentication (any role).
"""
actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first()
if not actor:
raise HTTPException(status_code=404, detail="Threat actor not found")
# Get associated techniques with their coverage status
actor_techniques = (
db.query(ThreatActorTechnique, Technique)
.join(Technique, ThreatActorTechnique.technique_id == Technique.id)
.filter(ThreatActorTechnique.threat_actor_id == actor.id)
.order_by(Technique.mitre_id)
.all()
)
techniques_list = []
for at, tech in actor_techniques:
techniques_list.append({
"technique_id": str(tech.id),
"mitre_id": tech.mitre_id,
"name": tech.name,
"tactic": tech.tactic,
"status_global": tech.status_global.value if tech.status_global else None,
"usage_description": at.usage_description,
"first_seen_using": at.first_seen_using,
})
return {
"id": str(actor.id),
"mitre_id": actor.mitre_id,
"name": actor.name,
"aliases": actor.aliases or [],
"description": actor.description,
"country": actor.country,
"target_sectors": actor.target_sectors or [],
"target_regions": actor.target_regions or [],
"motivation": actor.motivation,
"sophistication": actor.sophistication,
"first_seen": actor.first_seen,
"last_seen": actor.last_seen,
"references": actor.references or [],
"mitre_url": actor.mitre_url,
"is_active": actor.is_active,
"techniques": techniques_list,
}
# ---------------------------------------------------------------------------
# GET /threat-actors/{id}/coverage — Cobertura
# ---------------------------------------------------------------------------
@router.get("/{actor_id}/coverage")
def get_threat_actor_coverage(
actor_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Calculate coverage percentage against a specific threat actor.
**Requires** authentication (any role).
Returns the percentage of the actor's techniques that have been
validated or partially validated, along with a breakdown.
"""
actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first()
if not actor:
raise HTTPException(status_code=404, detail="Threat actor not found")
# Get all techniques for this actor
actor_techniques = (
db.query(Technique)
.join(ThreatActorTechnique, ThreatActorTechnique.technique_id == Technique.id)
.filter(ThreatActorTechnique.threat_actor_id == actor.id)
.all()
)
total = len(actor_techniques)
if total == 0:
return {
"actor_id": str(actor.id),
"actor_name": actor.name,
"total_techniques": 0,
"coverage_pct": 0.0,
"breakdown": {},
}
breakdown = {}
for tech in actor_techniques:
status = tech.status_global.value if tech.status_global else "not_evaluated"
breakdown[status] = breakdown.get(status, 0) + 1
covered = breakdown.get("validated", 0) + breakdown.get("partial", 0)
coverage_pct = round((covered / total * 100), 1)
return {
"actor_id": str(actor.id),
"actor_name": actor.name,
"total_techniques": total,
"covered": covered,
"coverage_pct": coverage_pct,
"breakdown": breakdown,
}
# ---------------------------------------------------------------------------
# GET /threat-actors/{id}/gaps — Gap analysis
# ---------------------------------------------------------------------------
@router.get("/{actor_id}/gaps")
def get_threat_actor_gaps(
actor_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Identify techniques of this actor that are NOT fully validated.
**Requires** authentication (any role).
Returns list of gap techniques with available templates.
"""
actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first()
if not actor:
raise HTTPException(status_code=404, detail="Threat actor not found")
# Get techniques NOT validated
gap_techniques = (
db.query(Technique, ThreatActorTechnique)
.join(ThreatActorTechnique, ThreatActorTechnique.technique_id == Technique.id)
.filter(ThreatActorTechnique.threat_actor_id == actor.id)
.filter(Technique.status_global != TechniqueStatus.validated)
.order_by(Technique.mitre_id)
.all()
)
gaps = []
for tech, at in gap_techniques:
# Count available templates for this technique
template_count = (
db.query(TestTemplate)
.filter(TestTemplate.mitre_technique_id == tech.mitre_id)
.filter(TestTemplate.is_active == True)
.count()
)
# Count existing tests
test_count = (
db.query(Test)
.filter(Test.technique_id == tech.id)
.count()
)
gaps.append({
"technique_id": str(tech.id),
"mitre_id": tech.mitre_id,
"name": tech.name,
"tactic": tech.tactic,
"status_global": tech.status_global.value if tech.status_global else None,
"usage_description": at.usage_description,
"available_templates": template_count,
"existing_tests": test_count,
"has_templates": template_count > 0,
})
return {
"actor_id": str(actor.id),
"actor_name": actor.name,
"total_gaps": len(gaps),
"gaps": gaps,
}
+180
View File
@@ -0,0 +1,180 @@
"""
Seed script — registers all known data sources in the data_sources table.
Usage:
python -m app.seed_data_sources
"""
import logging
from app.database import SessionLocal
from app.models.data_source import DataSource
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Data source definitions
# ---------------------------------------------------------------------------
INITIAL_SOURCES = [
{
"name": "atomic_red_team",
"display_name": "Atomic Red Team",
"type": "attack_procedure",
"url": "https://github.com/redcanaryco/atomic-red-team",
"description": "Open-source library of atomic tests mapped to MITRE ATT&CK. "
"Each test is a small, self-contained procedure for validating "
"detection of a specific technique.",
"sync_frequency": "weekly",
"config": {
"zip_url": "https://github.com/redcanaryco/atomic-red-team/archive/refs/heads/master.zip",
"root_prefix": "atomic-red-team-master",
"atomics_dir": "atomics",
},
},
{
"name": "sigma",
"display_name": "SigmaHQ Rules",
"type": "detection_rule",
"url": "https://github.com/SigmaHQ/sigma",
"description": "Generic SIEM detection rules in YAML format. "
"3 000+ rules with MITRE ATT&CK mappings.",
"sync_frequency": "weekly",
"config": {
"zip_url": "https://github.com/SigmaHQ/sigma/archive/refs/heads/main.zip",
"root_prefix": "sigma-main",
"rules_dir": "rules",
},
},
{
"name": "lolbas",
"display_name": "LOLBAS (Windows)",
"type": "attack_procedure",
"url": "https://github.com/LOLBAS-Project/LOLBAS",
"description": "Living Off The Land Binaries, Scripts, and Libraries — "
"legitimate Windows binaries that can be abused for attacks.",
"sync_frequency": "monthly",
"config": {
"zip_url": "https://github.com/LOLBAS-Project/LOLBAS/archive/refs/heads/master.zip",
"root_prefix": "LOLBAS-master",
"yaml_dirs": ["yml/OSBinaries", "yml/OSLibraries", "yml/OSScripts"],
},
},
{
"name": "gtfobins",
"display_name": "GTFOBins (Linux)",
"type": "attack_procedure",
"url": "https://gtfobins.github.io/",
"description": "Unix/Linux binaries that can be exploited for file transfer, "
"shell escape, privilege escalation, and more.",
"sync_frequency": "monthly",
"config": {
"zip_url": "https://github.com/GTFOBins/GTFOBins.github.io/archive/refs/heads/master.zip",
"root_prefix": "GTFOBins.github.io-master",
"gtfobins_dir": "_gtfobins",
},
},
{
"name": "caldera",
"display_name": "MITRE CALDERA",
"type": "attack_procedure",
"url": "https://github.com/mitre/caldera",
"description": "Automated adversary emulation platform by MITRE. "
"400+ abilities (executable actions) mapped to ATT&CK.",
"sync_frequency": "monthly",
"config": {
"zip_url": "https://github.com/mitre/caldera/archive/refs/heads/master.zip",
"root_prefix": "caldera-master",
"abilities_dir": "data/abilities",
},
},
{
"name": "elastic_rules",
"display_name": "Elastic Detection Rules",
"type": "detection_rule",
"url": "https://github.com/elastic/detection-rules",
"description": "Open-source detection rules for Elastic SIEM. "
"1 000+ rules in KQL with MITRE ATT&CK mappings.",
"sync_frequency": "weekly",
"config": {
"zip_url": "https://github.com/elastic/detection-rules/archive/refs/heads/main.zip",
"root_prefix": "detection-rules-main",
"rules_dir": "rules",
},
},
{
"name": "d3fend",
"display_name": "MITRE D3FEND",
"type": "defensive_technique",
"url": "https://d3fend.mitre.org/",
"description": "MITRE framework of defensive countermeasures. "
"200+ defensive techniques mapped to ATT&CK.",
"sync_frequency": "monthly",
"config": {},
},
{
"name": "mitre_cti",
"display_name": "MITRE CTI (Groups & Software)",
"type": "threat_intel",
"url": "https://github.com/mitre/cti",
"description": "MITRE ATT&CK STIX 2.0 data — threat actor groups, "
"software, and campaigns with TTP mappings.",
"sync_frequency": "monthly",
"config": {
"zip_url": "https://github.com/mitre/cti/archive/refs/heads/master.zip",
"root_prefix": "cti-master",
"enterprise_dir": "enterprise-attack",
},
},
]
def seed_data_sources() -> dict:
"""Register all known data sources. Existing entries are skipped."""
db = SessionLocal()
try:
created = 0
skipped = 0
existing_names = {
row[0] for row in db.query(DataSource.name).all()
}
for src in INITIAL_SOURCES:
if src["name"] in existing_names:
skipped += 1
continue
ds = DataSource(
name=src["name"],
display_name=src["display_name"],
type=src["type"],
url=src.get("url"),
description=src.get("description"),
sync_frequency=src.get("sync_frequency", "manual"),
config=src.get("config"),
is_enabled=True,
)
db.add(ds)
created += 1
db.commit()
summary = {"created": created, "skipped": skipped}
logger.info("Data sources seed: %s", summary)
return summary
except Exception:
db.rollback()
raise
finally:
db.close()
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-8s %(name)s%(message)s",
)
result = seed_data_sources()
print(f"\nData sources seed complete: {result}")
+431
View File
@@ -0,0 +1,431 @@
"""
Seed script — generates a realistic volume of demo data for V3 validation.
Usage:
python -m app.seed_demo
**Prerequisite**: The MITRE sync must have been completed first so that
real techniques exist in the database.
Running twice is safe — the script detects existing demo data (by username
prefix ``demo_``) and deletes it before re-creating, ensuring idempotency.
"""
import logging
import random
import uuid
from datetime import datetime, timedelta
from app.auth import hash_password
from app.database import SessionLocal
from app.models.user import User
from app.models.technique import Technique
from app.models.test import Test
from app.models.test_template import TestTemplate
from app.models.evidence import Evidence
from app.models.audit import AuditLog
from app.models.notification import Notification
from app.models.enums import TechniqueStatus, TestState, TestResult, TeamSide
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
DEMO_PREFIX = "demo_"
ROLES = ["red_tech", "blue_tech", "red_lead", "blue_lead", "admin"]
TECHNIQUE_STATUSES = [
TechniqueStatus.validated,
TechniqueStatus.partial,
TechniqueStatus.not_covered,
TechniqueStatus.in_progress,
TechniqueStatus.not_evaluated,
]
TEST_STATES = [
TestState.draft,
TestState.red_executing,
TestState.blue_evaluating,
TestState.in_review,
TestState.validated,
TestState.rejected,
]
TEST_RESULTS = [
TestResult.detected,
TestResult.not_detected,
TestResult.partially_detected,
]
NOTIFICATION_TYPES = [
"test_assigned",
"validation_needed",
"test_rejected",
"test_validated",
"test_state_changed",
]
AUDIT_ACTIONS = [
"create_test",
"update_test",
"validate_technique",
"upload_evidence",
"create_user",
"import_atomic_red_team",
"sync_mitre",
"login",
"reject_test",
"approve_test",
]
PLATFORMS = ["windows", "linux", "macos"]
TEMPLATE_NAMES = [
"Manual Credential Dumping Test",
"Custom Phishing Payload Delivery",
"Lateral Movement via RDP",
"Persistence via Registry Run Keys",
"Data Exfiltration over DNS",
"Process Injection via DLL",
"Privilege Escalation with Token Impersonation",
"Custom C2 Beacon Communication Test",
"Kerberoasting Attack Procedure",
"Living Off The Land Binaries Test",
]
# ---------------------------------------------------------------------------
# Cleanup
# ---------------------------------------------------------------------------
def _cleanup_demo_data(db) -> None:
"""Remove all previously seeded demo data."""
# Delete in order to respect FK constraints
demo_users = db.query(User).filter(User.username.like(f"{DEMO_PREFIX}%")).all()
demo_user_ids = [u.id for u in demo_users]
if demo_user_ids:
# Notifications for demo users
db.query(Notification).filter(
Notification.user_id.in_(demo_user_ids)
).delete(synchronize_session=False)
# Audit logs for demo users
db.query(AuditLog).filter(
AuditLog.user_id.in_(demo_user_ids)
).delete(synchronize_session=False)
# Evidences for tests created by demo users
demo_tests = db.query(Test).filter(
Test.created_by.in_(demo_user_ids)
).all()
demo_test_ids = [t.id for t in demo_tests]
if demo_test_ids:
db.query(Evidence).filter(
Evidence.test_id.in_(demo_test_ids)
).delete(synchronize_session=False)
db.query(Test).filter(
Test.id.in_(demo_test_ids)
).delete(synchronize_session=False)
# Delete demo templates (by source = "demo")
db.query(TestTemplate).filter(
TestTemplate.source == "demo"
).delete(synchronize_session=False)
# Delete demo users
if demo_user_ids:
db.query(User).filter(
User.id.in_(demo_user_ids)
).delete(synchronize_session=False)
db.commit()
logger.info("Cleaned up existing demo data.")
# ---------------------------------------------------------------------------
# Seeders
# ---------------------------------------------------------------------------
def _seed_users(db) -> list[User]:
"""Create 5 users per role (25 total)."""
users = []
for role in ROLES:
for i in range(1, 6):
user = User(
username=f"{DEMO_PREFIX}{role}_{i}",
email=f"{DEMO_PREFIX}{role}_{i}@aegis-demo.local",
hashed_password=hash_password("demo123"),
role=role,
is_active=True,
)
db.add(user)
users.append(user)
db.flush()
logger.info("Created %d demo users.", len(users))
return users
def _seed_technique_statuses(db, count: int = 50) -> list[Technique]:
"""Set varied statuses on up to *count* techniques."""
techniques = db.query(Technique).limit(count).all()
if not techniques:
logger.warning("No techniques found — run MITRE sync first!")
return []
for tech in techniques:
tech.status_global = random.choice(TECHNIQUE_STATUSES)
if tech.status_global == TechniqueStatus.validated:
tech.last_review_date = datetime.utcnow() - timedelta(
days=random.randint(1, 30)
)
db.flush()
logger.info("Updated status on %d techniques.", len(techniques))
return techniques
def _seed_tests(db, users: list[User], techniques: list[Technique], count: int = 100) -> list[Test]:
"""Create *count* tests in various pipeline states."""
if not techniques:
logger.warning("No techniques available — skipping test seeding.")
return []
red_techs = [u for u in users if u.role == "red_tech"]
blue_techs = [u for u in users if u.role == "blue_tech"]
red_leads = [u for u in users if u.role == "red_lead"]
blue_leads = [u for u in users if u.role == "blue_lead"]
tests = []
for i in range(count):
technique = random.choice(techniques)
state = random.choice(TEST_STATES)
creator = random.choice(red_techs + blue_techs)
test = Test(
technique_id=technique.id,
name=f"Demo Test {i + 1}{technique.name[:40]}",
description=f"Automated demo test #{i + 1} for {technique.mitre_id}.",
platform=random.choice(PLATFORMS),
procedure_text=f"Step 1: Prepare environment.\nStep 2: Execute {technique.mitre_id} procedure.\nStep 3: Observe results.",
tool_used=random.choice(["powershell", "bash", "cmd", "python", "caldera", "metasploit"]),
execution_date=datetime.utcnow() - timedelta(days=random.randint(0, 60)),
created_by=creator.id,
result=random.choice(TEST_RESULTS) if state not in (TestState.draft, TestState.red_executing) else None,
state=state,
created_at=datetime.utcnow() - timedelta(days=random.randint(0, 90)),
)
# Populate team fields based on state
if state in (TestState.blue_evaluating, TestState.in_review, TestState.validated, TestState.rejected):
test.red_summary = f"Attack executed successfully using {test.tool_used}."
test.attack_success = random.choice([True, True, True, False])
if state in (TestState.in_review, TestState.validated, TestState.rejected):
test.blue_summary = "Detection observed in SIEM. Alert fired."
test.detection_result = random.choice(TEST_RESULTS)
if state == TestState.validated:
rv = random.choice(red_leads)
bv = random.choice(blue_leads)
test.red_validated_by = rv.id
test.red_validated_at = datetime.utcnow() - timedelta(days=random.randint(0, 10))
test.red_validation_status = "approved"
test.blue_validated_by = bv.id
test.blue_validated_at = datetime.utcnow() - timedelta(days=random.randint(0, 10))
test.blue_validation_status = "approved"
if state == TestState.rejected:
rejector = random.choice(red_leads + blue_leads)
if rejector.role == "red_lead":
test.red_validated_by = rejector.id
test.red_validated_at = datetime.utcnow() - timedelta(days=random.randint(0, 5))
test.red_validation_status = "rejected"
test.red_validation_notes = "Insufficient evidence of attack success."
else:
test.blue_validated_by = rejector.id
test.blue_validated_at = datetime.utcnow() - timedelta(days=random.randint(0, 5))
test.blue_validation_status = "rejected"
test.blue_validation_notes = "Detection evidence not conclusive."
db.add(test)
tests.append(test)
db.flush()
logger.info("Created %d demo tests.", len(tests))
return tests
def _seed_evidences(db, tests: list[Test], users: list[User], count: int = 50) -> list[Evidence]:
"""Create *count* dummy evidence records."""
if not tests:
return []
# Pick tests that are past draft state
eligible = [t for t in tests if t.state != TestState.draft]
if not eligible:
eligible = tests
evidences = []
red_blue = [u for u in users if u.role in ("red_tech", "blue_tech")]
for i in range(count):
test = random.choice(eligible)
uploader = random.choice(red_blue)
team = TeamSide.red if uploader.role == "red_tech" else TeamSide.blue
ext = random.choice(["png", "log", "pcap", "csv", "txt", "json"])
fname = f"evidence_{i + 1}.{ext}"
evidence = Evidence(
test_id=test.id,
file_name=fname,
file_path=f"{test.id}/{uuid.uuid4()}_{fname}",
sha256_hash=uuid.uuid4().hex + uuid.uuid4().hex, # dummy hash
uploaded_by=uploader.id,
uploaded_at=datetime.utcnow() - timedelta(days=random.randint(0, 30)),
team=team,
notes=f"Auto-generated demo evidence #{i + 1}.",
)
db.add(evidence)
evidences.append(evidence)
db.flush()
logger.info("Created %d demo evidences.", len(evidences))
return evidences
def _seed_audit_logs(db, users: list[User], count: int = 20) -> None:
"""Create *count* varied audit log entries."""
for i in range(count):
user = random.choice(users)
log = AuditLog(
user_id=user.id,
action=random.choice(AUDIT_ACTIONS),
entity_type=random.choice(["test", "technique", "user", "test_template"]),
entity_id=str(uuid.uuid4()),
timestamp=datetime.utcnow() - timedelta(days=random.randint(0, 60)),
details={"demo": True, "index": i},
)
db.add(log)
db.flush()
logger.info("Created %d demo audit logs.", count)
def _seed_notifications(db, users: list[User], count: int = 30) -> None:
"""Create *count* notifications spread across demo users."""
for i in range(count):
user = random.choice(users)
ntype = random.choice(NOTIFICATION_TYPES)
notif = Notification(
user_id=user.id,
type=ntype,
title=f"Demo notification: {ntype.replace('_', ' ').title()} #{i + 1}",
message=f"This is an auto-generated demo notification ({ntype}).",
entity_type="test",
entity_id=uuid.uuid4(),
read=random.choice([True, False]),
created_at=datetime.utcnow() - timedelta(days=random.randint(0, 30)),
)
db.add(notif)
db.flush()
logger.info("Created %d demo notifications.", count)
def _seed_templates(db, techniques: list[Technique], count: int = 10) -> None:
"""Create *count* manual demo templates."""
if not techniques:
return
for i, name in enumerate(TEMPLATE_NAMES[:count]):
technique = techniques[i % len(techniques)]
template = TestTemplate(
mitre_technique_id=technique.mitre_id,
name=name,
description=f"Demo template: {name}. Targets {technique.mitre_id} ({technique.name}).",
source="demo",
source_url=None,
attack_procedure=f"1. Set up environment for {technique.mitre_id}.\n2. Execute the procedure.\n3. Record observations.",
expected_detection=f"SIEM should alert on {technique.mitre_id} indicators.",
platform=random.choice(PLATFORMS),
tool_suggested=random.choice(["powershell", "cmd", "bash", "python"]),
severity=random.choice(["low", "medium", "high", "critical"]),
is_active=True,
)
db.add(template)
db.flush()
logger.info("Created %d demo templates.", count)
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
def seed_demo() -> dict:
"""Generate all demo data. Returns a summary dict."""
db = SessionLocal()
try:
logger.info("=== Starting V3 demo seed ===")
# Step 0: cleanup previous run
_cleanup_demo_data(db)
# Step 1: users
users = _seed_users(db)
# Step 2: technique statuses
techniques = _seed_technique_statuses(db, count=50)
# Step 3: tests
tests = _seed_tests(db, users, techniques, count=100)
# Step 4: evidences
evidences = _seed_evidences(db, tests, users, count=50)
# Step 5: audit logs
_seed_audit_logs(db, users, count=20)
# Step 6: notifications
_seed_notifications(db, users, count=30)
# Step 7: templates
_seed_templates(db, techniques, count=10)
db.commit()
summary = {
"users": len(users),
"techniques_updated": len(techniques),
"tests": len(tests),
"evidences": len(evidences),
"audit_logs": 20,
"notifications": 30,
"templates": 10,
}
logger.info("=== Demo seed complete: %s ===", summary)
return summary
except Exception:
db.rollback()
raise
finally:
db.close()
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-8s %(name)s%(message)s",
)
result = seed_demo()
print(f"\nSeed complete: {result}")
@@ -0,0 +1,274 @@
"""MITRE CALDERA abilities import service.
Downloads the CALDERA repository ZIP from GitHub, parses the ability YAML
files under ``data/abilities/{tactic}/``, and creates :class:`TestTemplate`
records in the database.
Strategy
--------
1. Download the CALDERA repo as a ZIP.
2. Extract into a temporary directory.
3. Walk ``data/abilities/{tactic}/*.yml`` files.
4. For each ability: extract name, description, technique ID, platforms,
and executor commands.
5. Create TestTemplate rows keyed by the ability's ``id`` field.
6. Clean up.
Idempotency
-----------
Running the import twice does **not** create duplicates. Existing
templates are identified by ``source = "caldera"`` + ``atomic_test_id``
(the CALDERA ability ``id``).
"""
import io
import logging
import shutil
import tempfile
import zipfile
from datetime import datetime
from pathlib import Path
import requests as _requests
import yaml
from sqlalchemy.orm import Session
from app.models.test_template import TestTemplate
from app.models.data_source import DataSource
from app.services.audit_service import log_action
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
CALDERA_ZIP_URL = (
"https://github.com/mitre/caldera"
"/archive/refs/heads/master.zip"
)
_DOWNLOAD_TIMEOUT = 300
_ZIP_ROOT_PREFIX = "caldera-master"
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _download_zip(url: str = CALDERA_ZIP_URL) -> bytes:
"""Download the CALDERA ZIP and return raw bytes."""
logger.info("Downloading CALDERA ZIP from %s", url)
resp = _requests.get(url, timeout=_DOWNLOAD_TIMEOUT, stream=True)
resp.raise_for_status()
content = resp.content
logger.info("Downloaded %.1f MB", len(content) / (1024 * 1024))
return content
def _extract_zip(zip_bytes: bytes, dest: str) -> Path:
"""Extract *zip_bytes* into *dest* and return abilities dir."""
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
zf.extractall(dest)
abilities_dir = Path(dest) / _ZIP_ROOT_PREFIX / "data" / "abilities"
if not abilities_dir.is_dir():
raise FileNotFoundError(
f"Expected abilities directory not found at {abilities_dir}"
)
return abilities_dir
def _extract_commands(platforms_dict: dict) -> str:
"""Extract executor commands from CALDERA platforms dict.
The structure is typically::
platforms:
windows:
psh:
command: "whoami"
linux:
sh:
command: "id"
Returns a formatted string with all commands.
"""
lines = []
if not isinstance(platforms_dict, dict):
return ""
for os_name, executors in platforms_dict.items():
if not isinstance(executors, dict):
continue
for executor_name, executor_data in executors.items():
if isinstance(executor_data, dict):
cmd = executor_data.get("command", "")
if cmd:
lines.append(f"[{os_name}/{executor_name}]\n{cmd}")
elif isinstance(executor_data, str):
lines.append(f"[{os_name}/{executor_name}]\n{executor_data}")
return "\n\n".join(lines)
def _extract_platforms(platforms_dict: dict) -> str:
"""Extract platform names from CALDERA platforms dict."""
if not isinstance(platforms_dict, dict):
return ""
platform_names = []
for os_name in platforms_dict:
normalized = str(os_name).lower().strip()
if normalized in ("windows", "linux", "darwin", "macos"):
if normalized == "darwin":
normalized = "macos"
if normalized not in platform_names:
platform_names.append(normalized)
return ", ".join(platform_names)
def _parse_abilities(abilities_dir: Path) -> list[dict]:
"""Walk abilities directories and parse all YAML files.
Returns a flat list of dicts, each representing one ability.
"""
results: list[dict] = []
yaml_files = sorted(abilities_dir.rglob("*.yml"))
logger.info("Found %d ability YAML files", len(yaml_files))
for yaml_path in yaml_files:
try:
with open(yaml_path, "r", encoding="utf-8") as fh:
data_list = list(yaml.safe_load_all(fh))
except Exception as exc:
logger.debug("Failed to parse %s: %s", yaml_path, exc)
continue
for data in data_list:
if not isinstance(data, dict):
continue
ability_id = data.get("id", "")
if not ability_id:
continue
name = data.get("name", "").strip()
description = data.get("description", "").strip()
tactic = data.get("tactic", "").strip()
# Extract technique info
technique = data.get("technique", {})
if isinstance(technique, dict):
attack_id = technique.get("attack_id", "")
else:
attack_id = ""
if not attack_id:
continue
# Normalise technique ID
attack_id = str(attack_id).strip().upper()
if not attack_id.startswith("T"):
continue
# Extract platforms and commands
platforms_dict = data.get("platforms", {})
commands = _extract_commands(platforms_dict)
platform_str = _extract_platforms(platforms_dict)
# Determine executor type
executors = set()
if isinstance(platforms_dict, dict):
for os_executors in platforms_dict.values():
if isinstance(os_executors, dict):
executors.update(os_executors.keys())
executor_str = ", ".join(sorted(executors)) if executors else None
results.append({
"mitre_technique_id": attack_id,
"name": f"CALDERA: {name}"[:500] if name else f"CALDERA ability {ability_id}"[:500],
"description": f"{description}\n\nTactic: {tactic}".strip()[:2000] if description else None,
"source": "caldera",
"platform": platform_str,
"tool_suggested": executor_str,
"attack_procedure": commands[:4000] if commands else None,
"atomic_test_id": f"caldera:{ability_id}",
"source_url": f"https://github.com/mitre/caldera/tree/master/data/abilities/{tactic}",
})
logger.info("Parsed %d CALDERA abilities total", len(results))
return results
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def sync(db: Session) -> dict:
"""Download and import CALDERA abilities as TestTemplates.
Returns a summary dict with ``created``, ``skipped_existing``, ``total_parsed``.
"""
tmp_dir = tempfile.mkdtemp(prefix="aegis_caldera_")
try:
zip_bytes = _download_zip()
abilities_dir = _extract_zip(zip_bytes, tmp_dir)
parsed = _parse_abilities(abilities_dir)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
logger.info("Cleaned up temp directory %s", tmp_dir)
# Pre-load existing for dedup
existing_ids: set[str] = {
row[0]
for row in db.query(TestTemplate.atomic_test_id)
.filter(TestTemplate.source == "caldera")
.filter(TestTemplate.atomic_test_id.isnot(None))
.all()
}
created = 0
skipped = 0
for item in parsed:
if item["atomic_test_id"] in existing_ids:
skipped += 1
continue
template = TestTemplate(
mitre_technique_id=item["mitre_technique_id"],
name=item["name"],
description=item["description"],
source=item["source"],
source_url=item["source_url"],
attack_procedure=item["attack_procedure"],
platform=item["platform"],
tool_suggested=item["tool_suggested"],
atomic_test_id=item["atomic_test_id"],
is_active=True,
)
db.add(template)
existing_ids.add(item["atomic_test_id"])
created += 1
db.commit()
summary = {
"created": created,
"skipped_existing": skipped,
"total_parsed": len(parsed),
}
# Update DataSource record
ds = db.query(DataSource).filter(DataSource.name == "caldera").first()
if ds:
ds.last_sync_at = datetime.utcnow()
ds.last_sync_status = "success"
ds.last_sync_stats = summary
db.commit()
logger.info("CALDERA import complete — %s", summary)
log_action(db, user_id=None, action="import_caldera",
entity_type="test_template", entity_id=None, details=summary)
return summary
@@ -0,0 +1,321 @@
"""Elastic Detection Rules import service.
Downloads the Elastic detection-rules repository ZIP from GitHub, parses
every ``.toml`` rule file under ``rules/``, extracts MITRE ATT&CK
mappings, and creates :class:`DetectionRule` records in the database.
Strategy
--------
1. Download the full repo as a ZIP archive.
2. Extract into a temporary directory.
3. Walk all ``.toml`` files under ``rules/``.
4. Parse each TOML file — extract rule name, description, query (KQL),
severity, and MITRE ATT&CK threat mappings.
5. Create / skip ``DetectionRule`` rows keyed by ``(source, source_id)``.
6. Clean up.
Idempotency
-----------
Running the import twice does **not** create duplicates. Existing
rules are identified by ``source = "elastic"`` + ``source_id`` (the
TOML filename).
"""
import io
import logging
import shutil
import tempfile
import zipfile
from datetime import datetime
from pathlib import Path
import requests as _requests
from sqlalchemy.orm import Session
from app.models.detection_rule import DetectionRule
from app.models.data_source import DataSource
from app.services.audit_service import log_action
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
ELASTIC_ZIP_URL = (
"https://github.com/elastic/detection-rules"
"/archive/refs/heads/main.zip"
)
_DOWNLOAD_TIMEOUT = 300
_ZIP_ROOT_PREFIX = "detection-rules-main"
# Severity normalisation
_SEVERITY_MAP = {
"informational": "informational",
"low": "low",
"medium": "medium",
"high": "high",
"critical": "critical",
}
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _download_zip(url: str = ELASTIC_ZIP_URL) -> bytes:
"""Download the Elastic Detection Rules ZIP and return raw bytes."""
logger.info("Downloading Elastic Detection Rules ZIP from %s", url)
resp = _requests.get(url, timeout=_DOWNLOAD_TIMEOUT, stream=True)
resp.raise_for_status()
content = resp.content
logger.info("Downloaded %.1f MB", len(content) / (1024 * 1024))
return content
def _extract_zip(zip_bytes: bytes, dest: str) -> Path:
"""Extract *zip_bytes* into *dest* and return rules/ dir."""
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
zf.extractall(dest)
rules_dir = Path(dest) / _ZIP_ROOT_PREFIX / "rules"
if not rules_dir.is_dir():
raise FileNotFoundError(
f"Expected rules directory not found at {rules_dir}"
)
return rules_dir
def _parse_toml_safe(path: Path) -> dict | None:
"""Parse a TOML file. Uses the ``toml`` library."""
try:
import toml
with open(path, "r", encoding="utf-8") as fh:
return toml.load(fh)
except Exception as exc:
logger.debug("Failed to parse %s: %s", path, exc)
return None
def _extract_mitre_techniques(threat_list: list) -> list[str]:
"""Extract MITRE technique IDs from Elastic's ``rule.threat`` array.
Each entry looks like::
[[rule.threat]]
framework = "MITRE ATT&CK"
[rule.threat.tactic]
name = "Credential Access"
id = "TA0006"
[[rule.threat.technique]]
name = "OS Credential Dumping"
id = "T1003"
[[rule.threat.technique.subtechnique]]
name = "LSASS Memory"
id = "T1003.001"
"""
technique_ids = []
if not isinstance(threat_list, list):
return technique_ids
for threat_entry in threat_list:
if not isinstance(threat_entry, dict):
continue
# Skip non-MITRE frameworks
framework = threat_entry.get("framework", "")
if "MITRE" not in str(framework).upper():
continue
techniques = threat_entry.get("technique", [])
if not isinstance(techniques, list):
continue
for tech in techniques:
if not isinstance(tech, dict):
continue
tech_id = tech.get("id", "")
if tech_id and str(tech_id).upper().startswith("T"):
technique_ids.append(str(tech_id).upper())
# Check subtechniques
subtechniques = tech.get("subtechnique", [])
if isinstance(subtechniques, list):
for subtech in subtechniques:
if isinstance(subtech, dict):
sub_id = subtech.get("id", "")
if sub_id and str(sub_id).upper().startswith("T"):
technique_ids.append(str(sub_id).upper())
return list(set(technique_ids))
def _parse_elastic_rules(rules_dir: Path) -> list[dict]:
"""Walk the rules directory and parse all TOML files.
Returns a flat list of dicts, one per (rule, technique) combination.
"""
results: list[dict] = []
toml_files = sorted(rules_dir.rglob("*.toml"))
logger.info("Found %d TOML files to parse", len(toml_files))
for toml_path in toml_files:
data = _parse_toml_safe(toml_path)
if not data:
continue
rule = data.get("rule", {})
if not isinstance(rule, dict):
continue
name = rule.get("name", "").strip()
if not name:
continue
# Extract MITRE technique IDs
threat_list = rule.get("threat", [])
technique_ids = _extract_mitre_techniques(threat_list)
if not technique_ids:
continue
description = rule.get("description", "")
query = rule.get("query", "")
severity = _SEVERITY_MAP.get(str(rule.get("severity", "")).lower())
rule_type = rule.get("type", "query") # query, eql, threshold, etc.
# Determine rule format based on type
if rule_type == "eql":
rule_format = "eql"
elif rule_type == "esql":
rule_format = "esql"
else:
rule_format = "kql"
# Use filename as source_id
source_id = toml_path.name
# Read raw content
try:
with open(toml_path, "r", encoding="utf-8") as fh:
raw_content = fh.read()
except Exception:
raw_content = query or str(data)
# Build source URL
relative = str(toml_path.relative_to(rules_dir.parent)).replace("\\", "/")
source_url = (
f"https://github.com/elastic/detection-rules/blob/main/{relative}"
)
# One entry per technique
for tech_id in technique_ids:
results.append({
"mitre_technique_id": tech_id,
"title": name[:500],
"description": str(description)[:2000] if description else None,
"source_id": source_id,
"source_url": source_url,
"rule_content": query[:50000] if query else raw_content[:50000],
"rule_format": rule_format,
"severity": severity,
"platforms": _infer_platforms(rules_dir, toml_path),
})
logger.info("Parsed %d (rule, technique) pairs total", len(results))
return results
def _infer_platforms(rules_dir: Path, toml_path: Path) -> list[str] | None:
"""Infer platforms from the rule's directory structure.
Elastic organizes rules by OS: rules/windows/, rules/linux/, etc.
"""
relative = toml_path.relative_to(rules_dir)
parts = [p.lower() for p in relative.parts]
platforms = []
if "windows" in parts:
platforms.append("windows")
if "linux" in parts:
platforms.append("linux")
if "macos" in parts:
platforms.append("macos")
return platforms if platforms else None
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def sync(db: Session) -> dict:
"""Download and import Elastic detection rules.
Returns a summary dict with ``created``, ``skipped_existing``, ``total_parsed``.
"""
tmp_dir = tempfile.mkdtemp(prefix="aegis_elastic_")
try:
zip_bytes = _download_zip()
rules_dir = _extract_zip(zip_bytes, tmp_dir)
parsed_rules = _parse_elastic_rules(rules_dir)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
logger.info("Cleaned up temp directory %s", tmp_dir)
# Pre-load existing source_ids for dedup
existing_ids: set[str] = {
row[0]
for row in db.query(DetectionRule.source_id)
.filter(DetectionRule.source == "elastic")
.filter(DetectionRule.source_id.isnot(None))
.all()
}
created = 0
skipped = 0
for item in parsed_rules:
if item["source_id"] in existing_ids:
skipped += 1
continue
rule = DetectionRule(
mitre_technique_id=item["mitre_technique_id"],
title=item["title"],
description=item["description"],
source="elastic",
source_id=item["source_id"],
source_url=item["source_url"],
rule_content=item["rule_content"],
rule_format=item["rule_format"],
severity=item["severity"],
platforms=item["platforms"],
is_active=True,
)
db.add(rule)
existing_ids.add(item["source_id"])
created += 1
db.commit()
summary = {
"created": created,
"skipped_existing": skipped,
"total_parsed": len(parsed_rules),
}
# Update DataSource record
ds = db.query(DataSource).filter(DataSource.name == "elastic_rules").first()
if ds:
ds.last_sync_at = datetime.utcnow()
ds.last_sync_status = "success"
ds.last_sync_stats = summary
db.commit()
logger.info("Elastic import complete — %s", summary)
log_action(db, user_id=None, action="import_elastic_rules",
entity_type="detection_rule", entity_id=None, details=summary)
return summary
@@ -0,0 +1,375 @@
"""LOLBAS and GTFOBins import service.
Downloads the LOLBAS (Windows) and GTFOBins (Linux) repositories,
parses their YAML / Markdown files, and creates :class:`TestTemplate`
records mapped to MITRE ATT&CK techniques.
LOLBAS
------
- ZIP from ``LOLBAS-Project/LOLBAS``
- YAML files in ``yml/OSBinaries/``, ``yml/OSLibraries/``, ``yml/OSScripts/``
- Each YAML contains: Name, Description, Commands (list with MitreID)
GTFOBins
--------
- ZIP from ``GTFOBins/GTFOBins.github.io``
- Markdown files in ``_gtfobins/``
- Each Markdown has YAML front-matter with function names
- Functions mapped to MITRE via a static dictionary
Idempotency
-----------
Deduplication keys:
- LOLBAS: ``source + Name + MitreID`` → stored in ``atomic_test_id``
- GTFOBins: ``source + binary_name + function`` → stored in ``atomic_test_id``
"""
import io
import logging
import re
import shutil
import tempfile
import zipfile
from datetime import datetime
from pathlib import Path
import requests as _requests
import yaml
from sqlalchemy.orm import Session
from app.models.test_template import TestTemplate
from app.models.data_source import DataSource
from app.services.audit_service import log_action
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
LOLBAS_ZIP_URL = (
"https://github.com/LOLBAS-Project/LOLBAS"
"/archive/refs/heads/master.zip"
)
GTFOBINS_ZIP_URL = (
"https://github.com/GTFOBins/GTFOBins.github.io"
"/archive/refs/heads/master.zip"
)
_DOWNLOAD_TIMEOUT = 300
# GTFOBins function → MITRE technique mapping
_GTFOBINS_FUNCTION_MAP: dict[str, str] = {
"shell": "T1059",
"command": "T1059",
"reverse-shell": "T1059",
"non-interactive-reverse-shell": "T1059",
"bind-shell": "T1059",
"non-interactive-bind-shell": "T1059",
"file-upload": "T1105",
"file-download": "T1105",
"file-write": "T1105",
"file-read": "T1005",
"library-load": "T1129",
"sudo": "T1548.003",
"suid": "T1548.001",
"capabilities": "T1548",
"limited-suid": "T1548.001",
}
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def _download_zip(url: str) -> bytes:
"""Download a ZIP from *url* and return raw bytes."""
logger.info("Downloading ZIP from %s", url)
resp = _requests.get(url, timeout=_DOWNLOAD_TIMEOUT, stream=True)
resp.raise_for_status()
content = resp.content
logger.info("Downloaded %.1f MB", len(content) / (1024 * 1024))
return content
def _extract_zip(zip_bytes: bytes, dest: str) -> Path:
"""Extract *zip_bytes* into *dest* and return the root directory."""
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
zf.extractall(dest)
return Path(dest)
# ---------------------------------------------------------------------------
# LOLBAS import
# ---------------------------------------------------------------------------
def _parse_lolbas(root_dir: Path) -> list[dict]:
"""Parse LOLBAS YAML files and return template dicts."""
results: list[dict] = []
lolbas_root = root_dir / "LOLBAS-master"
yaml_dirs = [
lolbas_root / "yml" / "OSBinaries",
lolbas_root / "yml" / "OSLibraries",
lolbas_root / "yml" / "OSScripts",
]
yaml_files = []
for d in yaml_dirs:
if d.is_dir():
yaml_files.extend(sorted(d.rglob("*.yml")))
logger.info("LOLBAS: Found %d YAML files", len(yaml_files))
for yaml_path in yaml_files:
try:
with open(yaml_path, "r", encoding="utf-8") as fh:
data = yaml.safe_load(fh)
except Exception as exc:
logger.debug("Failed to parse %s: %s", yaml_path, exc)
continue
if not isinstance(data, dict):
continue
binary_name = data.get("Name", "").strip()
if not binary_name:
continue
description = data.get("Description", "")
commands = data.get("Commands", [])
if not isinstance(commands, list):
continue
for cmd_entry in commands:
if not isinstance(cmd_entry, dict):
continue
mitre_id = cmd_entry.get("MitreID")
if not mitre_id:
continue
# Normalise the MITRE ID
mitre_id = str(mitre_id).strip().upper()
if not mitre_id.startswith("T"):
continue
command = cmd_entry.get("Command", "")
usecase = cmd_entry.get("Usecase", "")
cmd_description = cmd_entry.get("Description", "")
# Dedup key
dedup_key = f"lolbas:{binary_name}:{mitre_id}"
procedure = []
if cmd_description:
procedure.append(f"Description: {cmd_description}")
if usecase:
procedure.append(f"Use case: {usecase}")
if command:
procedure.append(f"Command: {command}")
results.append({
"mitre_technique_id": mitre_id,
"name": f"LOLBAS: {binary_name}{usecase or cmd_description or mitre_id}"[:500],
"description": f"{description}\n\n{cmd_description}".strip()[:2000] if description else cmd_description[:2000] if cmd_description else None,
"source": "lolbas",
"platform": "windows",
"tool_suggested": binary_name,
"attack_procedure": "\n".join(procedure)[:4000] if procedure else None,
"atomic_test_id": dedup_key,
"source_url": f"https://lolbas-project.github.io/lolbas/Binaries/{binary_name}/",
})
logger.info("LOLBAS: Parsed %d templates", len(results))
return results
# ---------------------------------------------------------------------------
# GTFOBins import
# ---------------------------------------------------------------------------
def _parse_gtfobins(root_dir: Path) -> list[dict]:
"""Parse GTFOBins markdown files and return template dicts."""
results: list[dict] = []
gtfobins_root = root_dir / "GTFOBins.github.io-master" / "_gtfobins"
if not gtfobins_root.is_dir():
logger.warning("GTFOBins directory not found at %s", gtfobins_root)
return results
md_files = sorted(gtfobins_root.glob("*.md"))
logger.info("GTFOBins: Found %d markdown files", len(md_files))
for md_path in md_files:
binary_name = md_path.stem # e.g. "awk"
try:
with open(md_path, "r", encoding="utf-8") as fh:
content = fh.read()
except Exception as exc:
logger.debug("Failed to read %s: %s", md_path, exc)
continue
# Extract YAML front-matter
front_matter = _extract_front_matter(content)
if not front_matter:
continue
functions = front_matter.get("functions", {})
if not isinstance(functions, dict):
continue
for func_name, func_data in functions.items():
# Map function to MITRE technique
mitre_id = _GTFOBINS_FUNCTION_MAP.get(func_name.lower())
if not mitre_id:
continue
# Extract code examples from function data
examples = []
if isinstance(func_data, list):
for entry in func_data:
if isinstance(entry, dict):
code = entry.get("code", "")
if code:
examples.append(str(code))
elif isinstance(entry, str):
examples.append(entry)
procedure = "\n\n".join(examples) if examples else None
dedup_key = f"gtfobins:{binary_name}:{func_name}"
results.append({
"mitre_technique_id": mitre_id,
"name": f"GTFOBins: {binary_name}{func_name}"[:500],
"description": f"Abuse {binary_name} binary for {func_name} on Linux/Unix."[:2000],
"source": "gtfobins",
"platform": "linux",
"tool_suggested": binary_name,
"attack_procedure": procedure[:4000] if procedure else None,
"atomic_test_id": dedup_key,
"source_url": f"https://gtfobins.github.io/gtfobins/{binary_name}/",
})
logger.info("GTFOBins: Parsed %d templates", len(results))
return results
def _extract_front_matter(content: str) -> dict | None:
"""Extract YAML front-matter from a markdown file."""
match = re.match(r"^---\s*\n(.*?)\n---", content, re.DOTALL)
if not match:
return None
try:
return yaml.safe_load(match.group(1))
except Exception:
return None
# ---------------------------------------------------------------------------
# Upsert logic
# ---------------------------------------------------------------------------
def _upsert_templates(db: Session, items: list[dict], source_name: str) -> dict:
"""Insert templates, skipping existing ones by atomic_test_id."""
existing_ids: set[str] = {
row[0]
for row in db.query(TestTemplate.atomic_test_id)
.filter(TestTemplate.source == source_name)
.filter(TestTemplate.atomic_test_id.isnot(None))
.all()
}
created = 0
skipped = 0
for item in items:
if item["atomic_test_id"] in existing_ids:
skipped += 1
continue
template = TestTemplate(
mitre_technique_id=item["mitre_technique_id"],
name=item["name"],
description=item["description"],
source=item["source"],
source_url=item.get("source_url"),
attack_procedure=item.get("attack_procedure"),
platform=item["platform"],
tool_suggested=item.get("tool_suggested"),
atomic_test_id=item["atomic_test_id"],
is_active=True,
)
db.add(template)
existing_ids.add(item["atomic_test_id"])
created += 1
db.commit()
return {"created": created, "skipped_existing": skipped, "total_parsed": len(items)}
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def sync(db: Session) -> dict:
"""Import LOLBAS templates.
Returns a summary dict with ``created``, ``skipped_existing``, ``total_parsed``.
"""
tmp_dir = tempfile.mkdtemp(prefix="aegis_lolbas_")
try:
zip_bytes = _download_zip(LOLBAS_ZIP_URL)
root_dir = _extract_zip(zip_bytes, tmp_dir)
parsed = _parse_lolbas(root_dir)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
summary = _upsert_templates(db, parsed, "lolbas")
# Update DataSource record
ds = db.query(DataSource).filter(DataSource.name == "lolbas").first()
if ds:
ds.last_sync_at = datetime.utcnow()
ds.last_sync_status = "success"
ds.last_sync_stats = summary
db.commit()
logger.info("LOLBAS import complete — %s", summary)
log_action(db, user_id=None, action="import_lolbas",
entity_type="test_template", entity_id=None, details=summary)
return summary
def sync_gtfobins(db: Session) -> dict:
"""Import GTFOBins templates.
Returns a summary dict with ``created``, ``skipped_existing``, ``total_parsed``.
"""
tmp_dir = tempfile.mkdtemp(prefix="aegis_gtfobins_")
try:
zip_bytes = _download_zip(GTFOBINS_ZIP_URL)
root_dir = _extract_zip(zip_bytes, tmp_dir)
parsed = _parse_gtfobins(root_dir)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
summary = _upsert_templates(db, parsed, "gtfobins")
# Update DataSource record
ds = db.query(DataSource).filter(DataSource.name == "gtfobins").first()
if ds:
ds.last_sync_at = datetime.utcnow()
ds.last_sync_status = "success"
ds.last_sync_stats = summary
db.commit()
logger.info("GTFOBins import complete — %s", summary)
log_action(db, user_id=None, action="import_gtfobins",
entity_type="test_template", entity_id=None, details=summary)
return summary
@@ -0,0 +1,308 @@
"""Sigma Rules import service.
Downloads the SigmaHQ repository ZIP from GitHub, parses every YAML rule
file under ``rules/``, extracts MITRE ATT&CK tags, and creates
:class:`DetectionRule` records in the database.
Strategy
--------
1. Download the full SigmaHQ repo as a ZIP archive.
2. Extract in a temporary directory.
3. Walk all ``.yml`` files under ``rules/``.
4. Parse each YAML file — extract title, description, logsource,
detection tags, severity (``level``), and the raw YAML content.
5. Filter: only import rules that have at least one ``attack.tXXXX`` tag.
6. Create / skip ``DetectionRule`` rows keyed by ``(source, source_id)``.
7. Clean up the temporary directory.
Idempotency
-----------
Running the import twice does **not** create duplicates. Existing
rules are identified by ``source = "sigma"`` + ``source_id`` (relative
file path) and simply skipped.
"""
import io
import logging
import re
import shutil
import tempfile
import zipfile
from datetime import datetime
from pathlib import Path
import requests as _requests
import yaml
from sqlalchemy.orm import Session
from app.models.detection_rule import DetectionRule
from app.models.data_source import DataSource
from app.services.audit_service import log_action
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
SIGMA_ZIP_URL = (
"https://github.com/SigmaHQ/sigma/archive/refs/heads/main.zip"
)
_DOWNLOAD_TIMEOUT = 300
_ZIP_ROOT_PREFIX = "sigma-main"
# Regex to extract MITRE ATT&CK technique IDs from Sigma tags
# e.g. "attack.t1059.001" → "T1059.001"
_ATTACK_TAG_RE = re.compile(r"attack\.(t\d{4}(?:\.\d{3})?)", re.IGNORECASE)
# Sigma severity levels
_SEVERITY_MAP = {
"informational": "informational",
"low": "low",
"medium": "medium",
"high": "high",
"critical": "critical",
}
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _download_zip(url: str = SIGMA_ZIP_URL) -> bytes:
"""Download the SigmaHQ ZIP and return raw bytes."""
logger.info("Downloading SigmaHQ ZIP from %s", url)
resp = _requests.get(url, timeout=_DOWNLOAD_TIMEOUT, stream=True)
resp.raise_for_status()
content = resp.content
logger.info("Downloaded %.1f MB", len(content) / (1024 * 1024))
return content
def _extract_zip(zip_bytes: bytes, dest: str) -> Path:
"""Extract *zip_bytes* into *dest* and return the path to rules/ dir."""
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
zf.extractall(dest)
rules_dir = Path(dest) / _ZIP_ROOT_PREFIX / "rules"
if not rules_dir.is_dir():
raise FileNotFoundError(
f"Expected rules directory not found at {rules_dir}"
)
return rules_dir
def _extract_attack_tags(tags: list) -> list[str]:
"""Extract MITRE technique IDs from Sigma tag list.
Example input: ["attack.defense_evasion", "attack.t1059.001", "cve.2021.44228"]
Example output: ["T1059.001"]
"""
technique_ids = []
for tag in tags:
m = _ATTACK_TAG_RE.match(str(tag).strip())
if m:
technique_ids.append(m.group(1).upper())
return list(set(technique_ids))
def _parse_sigma_rules(rules_dir: Path) -> list[dict]:
"""Walk the rules directory and parse all Sigma YAML files.
Returns a flat list of dicts, one per (rule, technique) combination.
A single Sigma rule tagged with N techniques produces N entries.
"""
results: list[dict] = []
yaml_files = sorted(rules_dir.rglob("*.yml"))
logger.info("Found %d YAML files to parse", len(yaml_files))
for yaml_path in yaml_files:
relative_path = str(yaml_path.relative_to(rules_dir.parent))
try:
with open(yaml_path, "r", encoding="utf-8") as fh:
data = yaml.safe_load(fh)
except Exception as exc:
logger.debug("Failed to parse %s: %s", yaml_path, exc)
continue
if not isinstance(data, dict):
continue
title = data.get("title", "").strip()
if not title:
continue
# Extract ATT&CK technique IDs from tags
tags = data.get("tags", [])
if not isinstance(tags, list):
continue
technique_ids = _extract_attack_tags(tags)
if not technique_ids:
continue # Skip rules without ATT&CK mapping
description = data.get("description", "")
level = str(data.get("level", "")).lower()
severity = _SEVERITY_MAP.get(level)
# Extract logsource
logsource = data.get("logsource", {})
if not isinstance(logsource, dict):
logsource = {}
# Read full YAML content for storage
try:
with open(yaml_path, "r", encoding="utf-8") as fh:
raw_content = fh.read()
except Exception:
raw_content = yaml.dump(data, default_flow_style=False)
# False positive assessment
falsepositives = data.get("falsepositives", [])
if isinstance(falsepositives, list) and len(falsepositives) > 3:
fp_rate = "high"
elif isinstance(falsepositives, list) and len(falsepositives) > 1:
fp_rate = "medium"
else:
fp_rate = "low"
# Create one entry per technique
for tech_id in technique_ids:
source_url = (
f"https://github.com/SigmaHQ/sigma/blob/main/"
f"{relative_path.replace(chr(92), '/')}"
)
results.append({
"mitre_technique_id": tech_id,
"title": title[:500],
"description": str(description)[:2000] if description else None,
"source_id": relative_path,
"source_url": source_url,
"rule_content": raw_content,
"severity": severity,
"log_sources": logsource if logsource else None,
"false_positive_rate": fp_rate,
"platforms": _platforms_from_logsource(logsource),
})
logger.info("Parsed %d (rule, technique) pairs total", len(results))
return results
def _platforms_from_logsource(logsource: dict) -> list[str]:
"""Infer platform list from Sigma logsource."""
platforms = []
product = str(logsource.get("product", "")).lower()
service = str(logsource.get("service", "")).lower()
if "windows" in product or "windows" in service:
platforms.append("windows")
if "linux" in product or "linux" in service:
platforms.append("linux")
if "macos" in product or "macos" in service:
platforms.append("macos")
# Sysmon → Windows
if "sysmon" in service and "windows" not in platforms:
platforms.append("windows")
return platforms if platforms else None
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def sync(db: Session) -> dict:
"""Download and import Sigma detection rules.
Parameters
----------
db : Session
Active SQLAlchemy database session.
Returns
-------
dict
Summary with ``created``, ``skipped_existing``, ``total_parsed``.
"""
tmp_dir = tempfile.mkdtemp(prefix="aegis_sigma_")
try:
zip_bytes = _download_zip()
rules_dir = _extract_zip(zip_bytes, tmp_dir)
parsed_rules = _parse_sigma_rules(rules_dir)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
logger.info("Cleaned up temp directory %s", tmp_dir)
# Pre-load existing source_ids for dedup
existing_ids: set[str] = {
row[0]
for row in db.query(DetectionRule.source_id)
.filter(DetectionRule.source == "sigma")
.filter(DetectionRule.source_id.isnot(None))
.all()
}
created = 0
skipped = 0
for item in parsed_rules:
# Dedup key: source_id (relative path). A rule file may produce
# multiple entries (one per technique), but we deduplicate by
# source_id so re-runs are safe. For multi-technique rules we
# only skip if the exact same source_id is already present.
dedup_key = f"{item['source_id']}::{item['mitre_technique_id']}"
if item["source_id"] in existing_ids:
skipped += 1
continue
rule = DetectionRule(
mitre_technique_id=item["mitre_technique_id"],
title=item["title"],
description=item["description"],
source="sigma",
source_id=item["source_id"],
source_url=item["source_url"],
rule_content=item["rule_content"],
rule_format="sigma_yaml",
severity=item["severity"],
platforms=item["platforms"],
log_sources=item["log_sources"],
false_positive_rate=item["false_positive_rate"],
is_active=True,
)
db.add(rule)
existing_ids.add(item["source_id"])
created += 1
db.commit()
summary = {
"created": created,
"skipped_existing": skipped,
"total_parsed": len(parsed_rules),
}
# Update DataSource record
ds = db.query(DataSource).filter(DataSource.name == "sigma").first()
if ds:
ds.last_sync_at = datetime.utcnow()
ds.last_sync_status = "success"
ds.last_sync_stats = summary
db.commit()
logger.info("Sigma import complete — %s", summary)
log_action(
db,
user_id=None,
action="import_sigma_rules",
entity_type="detection_rule",
entity_id=None,
details=summary,
)
return summary
@@ -0,0 +1,373 @@
"""Threat Actor import service (MITRE CTI / STIX 2.0).
Downloads the MITRE CTI repository, parses the STIX 2.0 bundle for
``intrusion-set`` objects (APT groups) and ``relationship`` objects
linking them to ``attack-pattern`` (techniques), then creates
:class:`ThreatActor` and :class:`ThreatActorTechnique` records.
STIX 2.0 structure
------------------
The enterprise-attack bundle contains:
- ``intrusion-set`` objects → our ThreatActor rows
- ``attack-pattern`` objects → already in our Technique table
- ``relationship`` objects (type=uses) → connects intrusion-set → attack-pattern
Strategy
--------
1. Download ZIP of ``github.com/mitre/cti``.
2. Load ``enterprise-attack/enterprise-attack.json`` (single STIX bundle).
3. Build lookup maps for intrusion-sets and attack-patterns.
4. Parse relationships to connect actors → techniques.
5. Upsert into database.
Idempotency
-----------
Deduplication by ``mitre_id`` for ThreatActor and by the unique
constraint ``(threat_actor_id, technique_id)`` for ThreatActorTechnique.
"""
import io
import json
import logging
import shutil
import tempfile
import zipfile
from datetime import datetime
from pathlib import Path
import requests as _requests
from sqlalchemy.orm import Session
from app.models.threat_actor import ThreatActor, ThreatActorTechnique
from app.models.technique import Technique
from app.models.data_source import DataSource
from app.services.audit_service import log_action
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
MITRE_CTI_ZIP_URL = (
"https://github.com/mitre/cti"
"/archive/refs/heads/master.zip"
)
_DOWNLOAD_TIMEOUT = 300
_ZIP_ROOT_PREFIX = "cti-master"
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _download_zip(url: str = MITRE_CTI_ZIP_URL) -> bytes:
"""Download the MITRE CTI ZIP and return raw bytes."""
logger.info("Downloading MITRE CTI ZIP from %s", url)
resp = _requests.get(url, timeout=_DOWNLOAD_TIMEOUT, stream=True)
resp.raise_for_status()
content = resp.content
logger.info("Downloaded %.1f MB", len(content) / (1024 * 1024))
return content
def _extract_zip_and_load_bundle(zip_bytes: bytes, dest: str) -> dict:
"""Extract ZIP and load the enterprise-attack STIX bundle."""
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
zf.extractall(dest)
bundle_path = (
Path(dest) / _ZIP_ROOT_PREFIX
/ "enterprise-attack" / "enterprise-attack.json"
)
if not bundle_path.is_file():
raise FileNotFoundError(
f"STIX bundle not found at {bundle_path}"
)
logger.info("Loading STIX bundle from %s", bundle_path)
with open(bundle_path, "r", encoding="utf-8") as fh:
bundle = json.load(fh)
objects = bundle.get("objects", [])
logger.info("Loaded %d STIX objects", len(objects))
return bundle
def _extract_mitre_id(external_references: list) -> str | None:
"""Extract the MITRE ATT&CK ID from external_references."""
if not isinstance(external_references, list):
return None
for ref in external_references:
if isinstance(ref, dict) and ref.get("source_name") == "mitre-attack":
return ref.get("external_id")
return None
def _extract_mitre_url(external_references: list) -> str | None:
"""Extract the MITRE ATT&CK URL from external_references."""
if not isinstance(external_references, list):
return None
for ref in external_references:
if isinstance(ref, dict) and ref.get("source_name") == "mitre-attack":
return ref.get("url")
return None
def _parse_intrusion_sets(objects: list) -> list[dict]:
"""Parse STIX intrusion-set objects into ThreatActor dicts."""
actors = []
for obj in objects:
if obj.get("type") != "intrusion-set":
continue
if obj.get("revoked"):
continue
ext_refs = obj.get("external_references", [])
mitre_id = _extract_mitre_id(ext_refs)
mitre_url = _extract_mitre_url(ext_refs)
name = obj.get("name", "").strip()
if not name:
continue
aliases = obj.get("aliases", [])
if isinstance(aliases, list) and name in aliases:
aliases = [a for a in aliases if a != name]
description = obj.get("description", "")
# Extract references (non-MITRE)
references = []
for ref in ext_refs:
if isinstance(ref, dict) and ref.get("source_name") != "mitre-attack":
references.append({
"source": ref.get("source_name", ""),
"url": ref.get("url", ""),
"description": ref.get("description", ""),
})
actors.append({
"stix_id": obj.get("id"), # e.g. "intrusion-set--abc123"
"mitre_id": mitre_id,
"name": name,
"aliases": aliases if aliases else [],
"description": description,
"mitre_url": mitre_url,
"references": references[:20], # cap to avoid bloat
"first_seen": obj.get("first_seen"),
"last_seen": obj.get("last_seen"),
})
logger.info("Parsed %d intrusion-sets (threat actors)", len(actors))
return actors
def _parse_relationships(objects: list) -> list[dict]:
"""Parse STIX relationship objects (type=uses) linking
intrusion-sets to attack-patterns.
"""
relationships = []
for obj in objects:
if obj.get("type") != "relationship":
continue
if obj.get("relationship_type") != "uses":
continue
if obj.get("revoked"):
continue
source_ref = obj.get("source_ref", "")
target_ref = obj.get("target_ref", "")
# We want intrusion-set → attack-pattern
if not source_ref.startswith("intrusion-set--"):
continue
if not target_ref.startswith("attack-pattern--"):
continue
relationships.append({
"source_ref": source_ref,
"target_ref": target_ref,
"description": obj.get("description", ""),
})
logger.info("Parsed %d uses-relationships (actor→technique)", len(relationships))
return relationships
def _build_attack_pattern_map(objects: list) -> dict[str, str]:
"""Build a map from STIX attack-pattern ID → MITRE technique ID.
e.g. {"attack-pattern--abc123": "T1059.001"}
"""
mapping = {}
for obj in objects:
if obj.get("type") != "attack-pattern":
continue
if obj.get("revoked"):
continue
stix_id = obj.get("id", "")
mitre_id = _extract_mitre_id(obj.get("external_references", []))
if stix_id and mitre_id:
mapping[stix_id] = mitre_id
logger.info("Built attack-pattern map with %d entries", len(mapping))
return mapping
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def sync(db: Session) -> dict:
"""Download and import threat actors from MITRE CTI.
Returns a summary dict.
"""
tmp_dir = tempfile.mkdtemp(prefix="aegis_mitre_cti_")
try:
zip_bytes = _download_zip()
bundle = _extract_zip_and_load_bundle(zip_bytes, tmp_dir)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
logger.info("Cleaned up temp directory %s", tmp_dir)
objects = bundle.get("objects", [])
# Step 1: Parse data
actor_dicts = _parse_intrusion_sets(objects)
relationships = _parse_relationships(objects)
attack_pattern_map = _build_attack_pattern_map(objects)
# Step 2: Build STIX-ID → actor dict map
stix_to_actor = {a["stix_id"]: a for a in actor_dicts}
# Step 3: Load existing actors and techniques from DB
existing_actors = {
row.mitre_id: row
for row in db.query(ThreatActor).all()
if row.mitre_id
}
technique_by_mitre_id = {
row.mitre_id: row
for row in db.query(Technique).all()
}
# Step 4: Upsert threat actors
actors_created = 0
actors_skipped = 0
stix_to_db_actor: dict[str, ThreatActor] = {}
for actor_dict in actor_dicts:
mitre_id = actor_dict["mitre_id"]
stix_id = actor_dict["stix_id"]
if mitre_id and mitre_id in existing_actors:
# Update existing actor
db_actor = existing_actors[mitre_id]
db_actor.name = actor_dict["name"]
db_actor.aliases = actor_dict["aliases"]
db_actor.description = actor_dict["description"]
db_actor.mitre_url = actor_dict["mitre_url"]
db_actor.references = actor_dict["references"]
db_actor.first_seen = actor_dict.get("first_seen")
db_actor.last_seen = actor_dict.get("last_seen")
stix_to_db_actor[stix_id] = db_actor
actors_skipped += 1
else:
# Create new actor
db_actor = ThreatActor(
mitre_id=mitre_id,
name=actor_dict["name"],
aliases=actor_dict["aliases"],
description=actor_dict["description"],
mitre_url=actor_dict["mitre_url"],
references=actor_dict["references"],
first_seen=actor_dict.get("first_seen"),
last_seen=actor_dict.get("last_seen"),
is_active=True,
)
db.add(db_actor)
db.flush() # get the ID
if mitre_id:
existing_actors[mitre_id] = db_actor
stix_to_db_actor[stix_id] = db_actor
actors_created += 1
db.flush()
# Step 5: Upsert actor-technique relationships
# Load existing relationships
existing_rels: set[tuple] = set()
for row in db.query(ThreatActorTechnique).all():
existing_rels.add((str(row.threat_actor_id), str(row.technique_id)))
rels_created = 0
rels_skipped = 0
for rel in relationships:
source_ref = rel["source_ref"]
target_ref = rel["target_ref"]
# Resolve actor
db_actor = stix_to_db_actor.get(source_ref)
if not db_actor:
continue
# Resolve technique
mitre_technique_id = attack_pattern_map.get(target_ref)
if not mitre_technique_id:
continue
db_technique = technique_by_mitre_id.get(mitre_technique_id)
if not db_technique:
continue
rel_key = (str(db_actor.id), str(db_technique.id))
if rel_key in existing_rels:
rels_skipped += 1
continue
actor_technique = ThreatActorTechnique(
threat_actor_id=db_actor.id,
technique_id=db_technique.id,
usage_description=rel["description"][:5000] if rel["description"] else None,
)
db.add(actor_technique)
existing_rels.add(rel_key)
rels_created += 1
db.commit()
summary = {
"actors_created": actors_created,
"actors_updated": actors_skipped,
"relationships_created": rels_created,
"relationships_skipped": rels_skipped,
"total_actors_parsed": len(actor_dicts),
"total_relationships_parsed": len(relationships),
}
# Update DataSource record
ds = db.query(DataSource).filter(DataSource.name == "mitre_cti").first()
if ds:
ds.last_sync_at = datetime.utcnow()
ds.last_sync_status = "success"
ds.last_sync_stats = summary
db.commit()
logger.info("MITRE CTI threat actor import complete — %s", summary)
log_action(
db,
user_id=None,
action="import_threat_actors",
entity_type="threat_actor",
entity_id=None,
details=summary,
)
return summary
+2
View File
@@ -10,6 +10,8 @@ boto3
apscheduler
requests
pyyaml
pySigma
toml
taxii2-client
python-multipart
pydantic-settings
+13
View File
@@ -11,6 +11,9 @@ import ReportsPage from "./pages/ReportsPage";
import SystemPage from "./pages/SystemPage";
import UsersPage from "./pages/UsersPage";
import AuditLogPage from "./pages/AuditLogPage";
import DataSourcesPage from "./pages/DataSourcesPage";
import ThreatActorsPage from "./pages/ThreatActorsPage";
import ThreatActorDetailPage from "./pages/ThreatActorDetailPage";
import Layout from "./components/Layout";
import ProtectedRoute from "./components/ProtectedRoute";
@@ -37,6 +40,8 @@ export default function App() {
<Route path="/test-catalog" element={<TestCatalogPage />} />
<Route path="/test-catalog/:templateId/use" element={<TestCatalogPage />} />
<Route path="/reports" element={<ReportsPage />} />
<Route path="/threat-actors" element={<ThreatActorsPage />} />
<Route path="/threat-actors/:actorId" element={<ThreatActorDetailPage />} />
<Route
path="/system"
element={
@@ -61,6 +66,14 @@ export default function App() {
</ProtectedRoute>
}
/>
<Route
path="/data-sources"
element={
<ProtectedRoute roles={["admin"]}>
<DataSourcesPage />
</ProtectedRoute>
}
/>
</Route>
{/* Catch-all → dashboard */}
+79
View File
@@ -0,0 +1,79 @@
import client from "./client";
export interface DataSource {
id: string;
name: string;
display_name: string;
type: string;
url: string | null;
description: string | null;
is_enabled: boolean;
last_sync_at: string | null;
last_sync_status: string | null;
last_sync_stats: Record<string, unknown> | null;
sync_frequency: string | null;
config: Record<string, unknown> | null;
created_at: string | null;
}
export interface SyncResult {
message: string;
source: string;
stats: Record<string, unknown>;
}
export interface SyncAllResult {
message: string;
results: Array<{
source: string;
status: string;
stats?: Record<string, unknown>;
detail?: string;
}>;
}
export interface DataSourceStats {
id: string;
name: string;
display_name: string;
type: string;
is_enabled: boolean;
last_sync_at: string | null;
last_sync_status: string | null;
last_sync_stats: Record<string, unknown> | null;
total_templates: number;
total_rules: number;
}
/** List all data sources. */
export async function getDataSources(): Promise<DataSource[]> {
const { data } = await client.get<DataSource[]>("/data-sources");
return data;
}
/** Update a data source (enable/disable, config). */
export async function updateDataSource(
id: string,
body: Partial<{ is_enabled: boolean; sync_frequency: string; config: Record<string, unknown> }>
): Promise<{ message: string; id: string }> {
const { data } = await client.patch(`/data-sources/${id}`, body);
return data;
}
/** Trigger sync for a specific data source. */
export async function syncDataSource(id: string): Promise<SyncResult> {
const { data } = await client.post<SyncResult>(`/data-sources/${id}/sync`);
return data;
}
/** Trigger sync for all enabled data sources. */
export async function syncAllDataSources(): Promise<SyncAllResult> {
const { data } = await client.post<SyncAllResult>("/data-sources/sync-all");
return data;
}
/** Get stats for a specific data source. */
export async function getDataSourceStats(id: string): Promise<DataSourceStats> {
const { data } = await client.get<DataSourceStats>(`/data-sources/${id}/stats`);
return data;
}
+123
View File
@@ -0,0 +1,123 @@
import client from "./client";
// ── Types ─────────────────────────────────────────────────────────
export interface ThreatActorSummary {
id: string;
mitre_id: string | null;
name: string;
aliases: string[];
country: string | null;
target_sectors: string[];
target_regions: string[];
motivation: string | null;
sophistication: string | null;
mitre_url: string | null;
technique_count: number;
coverage_pct: number;
is_active: boolean;
}
export interface ThreatActorListResponse {
total: number;
offset: number;
limit: number;
items: ThreatActorSummary[];
}
export interface ThreatActorTechnique {
technique_id: string;
mitre_id: string;
name: string;
tactic: string | null;
status_global: string | null;
usage_description: string | null;
first_seen_using: string | null;
}
export interface ThreatActorDetail {
id: string;
mitre_id: string | null;
name: string;
aliases: string[];
description: string | null;
country: string | null;
target_sectors: string[];
target_regions: string[];
motivation: string | null;
sophistication: string | null;
first_seen: string | null;
last_seen: string | null;
references: Array<{ source: string; url: string; description: string }>;
mitre_url: string | null;
is_active: boolean;
techniques: ThreatActorTechnique[];
}
export interface CoverageResponse {
actor_id: string;
actor_name: string;
total_techniques: number;
covered: number;
coverage_pct: number;
breakdown: Record<string, number>;
}
export interface GapItem {
technique_id: string;
mitre_id: string;
name: string;
tactic: string | null;
status_global: string | null;
usage_description: string | null;
available_templates: number;
existing_tests: number;
has_templates: boolean;
}
export interface GapsResponse {
actor_id: string;
actor_name: string;
total_gaps: number;
gaps: GapItem[];
}
// ── API Functions ─────────────────────────────────────────────────
export interface ListThreatActorsParams {
search?: string;
country?: string;
motivation?: string;
sophistication?: string;
target_sectors?: string;
offset?: number;
limit?: number;
}
/** List threat actors with filters. */
export async function getThreatActors(
params?: ListThreatActorsParams
): Promise<ThreatActorListResponse> {
const { data } = await client.get<ThreatActorListResponse>("/threat-actors", {
params,
});
return data;
}
/** Get detailed info about a threat actor. */
export async function getThreatActor(id: string): Promise<ThreatActorDetail> {
const { data } = await client.get<ThreatActorDetail>(`/threat-actors/${id}`);
return data;
}
/** Get coverage analysis for a threat actor. */
export async function getThreatActorCoverage(id: string): Promise<CoverageResponse> {
const { data } = await client.get<CoverageResponse>(`/threat-actors/${id}/coverage`);
return data;
}
/** Get gap analysis for a threat actor. */
export async function getThreatActorGaps(id: string): Promise<GapsResponse> {
const { data } = await client.get<GapsResponse>(`/threat-actors/${id}/gaps`);
return data;
}
+4
View File
@@ -12,6 +12,8 @@ import {
ChevronDown,
ListChecks,
ClipboardList,
Database,
Crosshair,
} from "lucide-react";
import { useAuth } from "../context/AuthContext";
@@ -36,11 +38,13 @@ const mainLinks: NavItem[] = [
],
},
{ to: "/reports", label: "Reports", icon: BarChart3 },
{ to: "/threat-actors", label: "Threat Actors", icon: Crosshair },
];
const adminLinks: NavItem[] = [
{ to: "/users", label: "Users", icon: Users },
{ to: "/audit", label: "Audit Log", icon: FileText },
{ to: "/data-sources", label: "Data Sources", icon: Database },
{ to: "/system", label: "System", icon: Settings },
];
+375
View File
@@ -0,0 +1,375 @@
import { useState } from "react";
import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query";
import {
Loader2,
RefreshCw,
Database,
CheckCircle,
XCircle,
AlertCircle,
Clock,
ToggleLeft,
ToggleRight,
Play,
ExternalLink,
Shield,
Search,
Swords,
Bug,
} from "lucide-react";
import {
getDataSources,
updateDataSource,
syncDataSource,
syncAllDataSources,
type DataSource,
type SyncAllResult,
} from "../api/data-sources";
/** Map source type to visual props. */
function typeProps(type: string) {
switch (type) {
case "attack_procedure":
return { label: "Attack Procedure", color: "text-red-400 bg-red-900/50 border-red-500/30", icon: Swords };
case "detection_rule":
return { label: "Detection Rule", color: "text-blue-400 bg-blue-900/50 border-blue-500/30", icon: Shield };
case "threat_intel":
return { label: "Threat Intel", color: "text-purple-400 bg-purple-900/50 border-purple-500/30", icon: Search };
case "defensive_technique":
return { label: "Defensive", color: "text-green-400 bg-green-900/50 border-green-500/30", icon: Shield };
default:
return { label: type, color: "text-gray-400 bg-gray-800/50 border-gray-600/30", icon: Bug };
}
}
function statusBadge(status: string | null) {
if (!status) return null;
switch (status) {
case "success":
return (
<span className="inline-flex items-center gap-1 rounded-full border border-green-500/30 bg-green-900/50 px-2 py-0.5 text-xs font-medium text-green-400">
<CheckCircle className="h-3 w-3" /> Success
</span>
);
case "error":
return (
<span className="inline-flex items-center gap-1 rounded-full border border-red-500/30 bg-red-900/50 px-2 py-0.5 text-xs font-medium text-red-400">
<XCircle className="h-3 w-3" /> Error
</span>
);
case "in_progress":
return (
<span className="inline-flex items-center gap-1 rounded-full border border-yellow-500/30 bg-yellow-900/50 px-2 py-0.5 text-xs font-medium text-yellow-400">
<Loader2 className="h-3 w-3 animate-spin" /> In Progress
</span>
);
default:
return (
<span className="inline-flex rounded-full border border-gray-600/30 bg-gray-800/50 px-2 py-0.5 text-xs text-gray-400">
{status}
</span>
);
}
}
export default function DataSourcesPage() {
const queryClient = useQueryClient();
const [syncingId, setSyncingId] = useState<string | null>(null);
const [syncAllResult, setSyncAllResult] = useState<SyncAllResult | null>(null);
// ── Queries ─────────────────────────────────────────────────────
const {
data: sources,
isLoading,
error,
} = useQuery({
queryKey: ["data-sources"],
queryFn: getDataSources,
});
// ── Toggle enable/disable ───────────────────────────────────────
const toggleMutation = useMutation({
mutationFn: ({ id, enabled }: { id: string; enabled: boolean }) =>
updateDataSource(id, { is_enabled: enabled }),
onSuccess: () => {
queryClient.invalidateQueries({ queryKey: ["data-sources"] });
},
});
// ── Sync individual source ──────────────────────────────────────
const syncMutation = useMutation({
mutationFn: (id: string) => syncDataSource(id),
onSuccess: () => {
setSyncingId(null);
queryClient.invalidateQueries({ queryKey: ["data-sources"] });
},
onError: () => {
setSyncingId(null);
queryClient.invalidateQueries({ queryKey: ["data-sources"] });
},
});
// ── Sync all ────────────────────────────────────────────────────
const syncAllMutation = useMutation({
mutationFn: syncAllDataSources,
onSuccess: (data) => {
setSyncAllResult(data);
queryClient.invalidateQueries({ queryKey: ["data-sources"] });
},
});
const handleSync = (id: string) => {
setSyncingId(id);
syncMutation.mutate(id);
};
const formatDate = (dateStr: string | null) => {
if (!dateStr) return "Never";
const date = new Date(dateStr);
return date.toLocaleString("en-US", { dateStyle: "medium", timeStyle: "short" });
};
const formatStats = (stats: Record<string, unknown> | null) => {
if (!stats) return null;
return Object.entries(stats)
.filter(([k]) => k !== "error")
.map(([k, v]) => `${k.replace(/_/g, " ")}: ${v}`)
.join(" | ");
};
return (
<div className="space-y-6">
{/* Header */}
<div className="flex items-center justify-between">
<div>
<h1 className="text-2xl font-bold text-white">Data Sources</h1>
<p className="mt-1 text-sm text-gray-400">
Manage external data sources for test templates and detection rules
</p>
</div>
<button
onClick={() => syncAllMutation.mutate()}
disabled={syncAllMutation.isPending}
className="flex items-center gap-2 rounded-lg bg-cyan-600 px-4 py-2.5 text-sm font-medium text-white hover:bg-cyan-500 disabled:opacity-50 transition-colors"
>
{syncAllMutation.isPending ? (
<Loader2 className="h-4 w-4 animate-spin" />
) : (
<RefreshCw className="h-4 w-4" />
)}
{syncAllMutation.isPending ? "Syncing All..." : "Sync All"}
</button>
</div>
{/* Sync All Result */}
{syncAllResult && (
<div className="rounded-xl border border-cyan-500/30 bg-gray-900 p-4">
<div className="flex items-center justify-between mb-3">
<h3 className="text-sm font-semibold text-white">Sync All Results</h3>
<button
onClick={() => setSyncAllResult(null)}
className="text-gray-400 hover:text-white text-xs"
>
Dismiss
</button>
</div>
<div className="space-y-2">
{syncAllResult.results.map((r, i) => (
<div key={i} className="flex items-center gap-3 text-sm">
{r.status === "success" ? (
<CheckCircle className="h-4 w-4 text-green-400 shrink-0" />
) : r.status === "error" ? (
<XCircle className="h-4 w-4 text-red-400 shrink-0" />
) : (
<AlertCircle className="h-4 w-4 text-yellow-400 shrink-0" />
)}
<span className="text-gray-300 font-medium">{r.source}</span>
{r.stats && (
<span className="text-gray-500 text-xs">
{formatStats(r.stats as Record<string, unknown>)}
</span>
)}
{r.detail && (
<span className="text-gray-500 text-xs">{r.detail}</span>
)}
</div>
))}
</div>
</div>
)}
{/* Loading */}
{isLoading && (
<div className="flex items-center justify-center py-16">
<Loader2 className="h-8 w-8 animate-spin text-cyan-400" />
</div>
)}
{/* Error */}
{error && (
<div className="rounded-xl border border-red-500/30 bg-red-900/20 p-6 text-center">
<AlertCircle className="mx-auto h-8 w-8 text-red-400" />
<p className="mt-2 text-sm text-red-400">
Failed to load data sources: {(error as Error)?.message}
</p>
</div>
)}
{/* Data Sources Table */}
{sources && sources.length > 0 && (
<div className="rounded-xl border border-gray-800 bg-gray-900 overflow-hidden">
<div className="overflow-x-auto">
<table className="w-full text-left text-sm">
<thead>
<tr className="border-b border-gray-800 bg-gray-900/50">
<th className="px-4 py-3 font-medium text-gray-400">Source</th>
<th className="px-4 py-3 font-medium text-gray-400">Type</th>
<th className="px-4 py-3 font-medium text-gray-400">Status</th>
<th className="px-4 py-3 font-medium text-gray-400">Last Sync</th>
<th className="px-4 py-3 font-medium text-gray-400">Stats</th>
<th className="px-4 py-3 font-medium text-gray-400">Enabled</th>
<th className="px-4 py-3 font-medium text-gray-400">Actions</th>
</tr>
</thead>
<tbody>
{sources.map((src: DataSource) => {
const tp = typeProps(src.type);
const TypeIcon = tp.icon;
const isSyncing = syncingId === src.id;
return (
<tr
key={src.id}
className="border-b border-gray-800/50 hover:bg-gray-800/30 transition-colors"
>
{/* Source */}
<td className="px-4 py-3">
<div className="flex items-center gap-3">
<div className="rounded-lg bg-gray-800 p-2">
<Database className="h-4 w-4 text-cyan-400" />
</div>
<div>
<p className="font-medium text-gray-200">{src.display_name}</p>
<div className="flex items-center gap-2">
<span className="text-xs text-gray-500 font-mono">{src.name}</span>
{src.url && (
<a
href={src.url}
target="_blank"
rel="noreferrer"
className="text-gray-500 hover:text-cyan-400"
>
<ExternalLink className="h-3 w-3" />
</a>
)}
</div>
</div>
</div>
</td>
{/* Type */}
<td className="px-4 py-3">
<span
className={`inline-flex items-center gap-1 rounded-full border px-2 py-0.5 text-xs font-medium ${tp.color}`}
>
<TypeIcon className="h-3 w-3" />
{tp.label}
</span>
</td>
{/* Sync Status */}
<td className="px-4 py-3">
{isSyncing ? statusBadge("in_progress") : statusBadge(src.last_sync_status)}
</td>
{/* Last Sync */}
<td className="px-4 py-3">
<div className="flex items-center gap-1.5 text-xs text-gray-400">
<Clock className="h-3.5 w-3.5" />
{formatDate(src.last_sync_at)}
</div>
{src.sync_frequency && (
<span className="text-[10px] text-gray-600">
Frequency: {src.sync_frequency}
</span>
)}
</td>
{/* Stats */}
<td className="px-4 py-3">
{src.last_sync_stats ? (
<span className="text-xs text-gray-400">
{formatStats(src.last_sync_stats)}
</span>
) : (
<span className="text-xs text-gray-600">-</span>
)}
</td>
{/* Toggle */}
<td className="px-4 py-3">
<button
onClick={() =>
toggleMutation.mutate({
id: src.id,
enabled: !src.is_enabled,
})
}
disabled={toggleMutation.isPending}
className={`flex items-center gap-1 text-xs font-medium transition-colors ${
src.is_enabled
? "text-green-400 hover:text-green-300"
: "text-gray-500 hover:text-gray-400"
}`}
>
{src.is_enabled ? (
<>
<ToggleRight className="h-5 w-5" />
On
</>
) : (
<>
<ToggleLeft className="h-5 w-5" />
Off
</>
)}
</button>
</td>
{/* Actions */}
<td className="px-4 py-3">
<button
onClick={() => handleSync(src.id)}
disabled={isSyncing || !src.is_enabled}
className="flex items-center gap-1.5 rounded-lg bg-gray-800 border border-gray-700 px-3 py-1.5 text-xs font-medium text-gray-300 hover:bg-gray-700 hover:text-white disabled:opacity-40 transition-colors"
>
{isSyncing ? (
<Loader2 className="h-3.5 w-3.5 animate-spin" />
) : (
<Play className="h-3.5 w-3.5" />
)}
{isSyncing ? "Syncing..." : "Sync"}
</button>
</td>
</tr>
);
})}
</tbody>
</table>
</div>
</div>
)}
{/* Empty State */}
{sources && sources.length === 0 && (
<div className="rounded-xl border border-gray-800 bg-gray-900 p-12 text-center">
<Database className="mx-auto h-12 w-12 text-gray-600" />
<h3 className="mt-4 text-lg font-medium text-gray-300">No Data Sources</h3>
<p className="mt-1 text-sm text-gray-500">
Run the data sources seed script to register initial sources.
</p>
</div>
)}
</div>
);
}
@@ -0,0 +1,571 @@
import { useParams, useNavigate } from "react-router-dom";
import { useQuery } from "@tanstack/react-query";
import {
Loader2,
AlertCircle,
ArrowLeft,
Globe,
Target,
Shield,
ExternalLink,
BookOpen,
AlertTriangle,
CheckCircle,
XCircle,
Clock,
Crosshair,
FlaskConical,
} from "lucide-react";
import {
getThreatActor,
getThreatActorCoverage,
getThreatActorGaps,
type ThreatActorTechnique,
type GapItem,
} from "../api/threat-actors";
// ── MITRE ATT&CK Tactics in kill chain order ──────────────────────
const TACTICS_ORDER = [
"reconnaissance",
"resource-development",
"initial-access",
"execution",
"persistence",
"privilege-escalation",
"defense-evasion",
"credential-access",
"discovery",
"lateral-movement",
"collection",
"command-and-control",
"exfiltration",
"impact",
];
/** Status → cell colour. */
function statusCellColor(status: string | null) {
switch (status) {
case "validated":
return "bg-green-500/80 border-green-400/40";
case "partial":
return "bg-yellow-500/80 border-yellow-400/40";
case "in_progress":
return "bg-blue-500/60 border-blue-400/40";
case "not_covered":
return "bg-red-500/70 border-red-400/40";
case "not_evaluated":
default:
return "bg-gray-700/50 border-gray-600/40";
}
}
function statusLabel(status: string | null) {
switch (status) {
case "validated":
return "Validated";
case "partial":
return "Partial";
case "in_progress":
return "In Progress";
case "not_covered":
return "Not Covered";
case "review_required":
return "Review Required";
case "not_evaluated":
default:
return "Not Evaluated";
}
}
function statusIcon(status: string | null) {
switch (status) {
case "validated":
return <CheckCircle className="h-3.5 w-3.5 text-green-400" />;
case "partial":
return <AlertTriangle className="h-3.5 w-3.5 text-yellow-400" />;
case "in_progress":
return <Clock className="h-3.5 w-3.5 text-blue-400" />;
case "not_covered":
return <XCircle className="h-3.5 w-3.5 text-red-400" />;
default:
return <Shield className="h-3.5 w-3.5 text-gray-500" />;
}
}
export default function ThreatActorDetailPage() {
const { actorId } = useParams();
const navigate = useNavigate();
// ── Queries ─────────────────────────────────────────────────────
const { data: actor, isLoading, error } = useQuery({
queryKey: ["threat-actor", actorId],
queryFn: () => getThreatActor(actorId!),
enabled: !!actorId,
});
const { data: coverage } = useQuery({
queryKey: ["threat-actor-coverage", actorId],
queryFn: () => getThreatActorCoverage(actorId!),
enabled: !!actorId,
});
const { data: gaps } = useQuery({
queryKey: ["threat-actor-gaps", actorId],
queryFn: () => getThreatActorGaps(actorId!),
enabled: !!actorId,
});
// ── Loading / Error ─────────────────────────────────────────────
if (isLoading) {
return (
<div className="flex items-center justify-center py-24">
<Loader2 className="h-8 w-8 animate-spin text-cyan-400" />
</div>
);
}
if (error || !actor) {
return (
<div className="rounded-xl border border-red-500/30 bg-red-900/20 p-6 text-center">
<AlertCircle className="mx-auto h-8 w-8 text-red-400" />
<p className="mt-2 text-sm text-red-400">
{error ? (error as Error)?.message : "Threat actor not found"}
</p>
</div>
);
}
// ── Organise techniques by tactic for heatmap ───────────────────
const techniquesByTactic: Record<string, ThreatActorTechnique[]> = {};
for (const tech of actor.techniques) {
const tactic = tech.tactic || "unknown";
// A technique's tactic field may be comma-separated
const tactics = tactic.split(",").map((t) => t.trim().toLowerCase().replace(/\s+/g, "-"));
for (const t of tactics) {
if (!techniquesByTactic[t]) techniquesByTactic[t] = [];
techniquesByTactic[t].push(tech);
}
}
// Sort tactics by kill chain order
const orderedTactics = TACTICS_ORDER.filter((t) => techniquesByTactic[t]);
const unknownTactics = Object.keys(techniquesByTactic).filter(
(t) => !TACTICS_ORDER.includes(t)
);
const allTactics = [...orderedTactics, ...unknownTactics];
return (
<div className="space-y-6">
{/* Back Button */}
<button
onClick={() => navigate("/threat-actors")}
className="flex items-center gap-1.5 text-sm text-gray-400 hover:text-white transition-colors"
>
<ArrowLeft className="h-4 w-4" />
Back to Threat Actors
</button>
{/* ── SECTION 1: Header ──────────────────────────────────────── */}
<div className="rounded-xl border border-gray-800 bg-gray-900 p-6">
<div className="flex items-start justify-between">
<div>
<div className="flex items-center gap-3">
<Crosshair className="h-7 w-7 text-purple-400" />
<h1 className="text-2xl font-bold text-white">{actor.name}</h1>
{actor.mitre_id && (
<span className="rounded-full border border-purple-500/30 bg-purple-900/50 px-2.5 py-0.5 text-xs font-mono text-purple-400">
{actor.mitre_id}
</span>
)}
</div>
{/* Aliases */}
{actor.aliases && actor.aliases.length > 0 && (
<div className="mt-2 flex flex-wrap gap-1.5">
{actor.aliases.map((alias, i) => (
<span
key={i}
className="rounded-full border border-gray-700 bg-gray-800 px-2 py-0.5 text-xs text-gray-400"
>
{alias}
</span>
))}
</div>
)}
</div>
{/* MITRE link */}
{actor.mitre_url && (
<a
href={actor.mitre_url}
target="_blank"
rel="noreferrer"
className="flex items-center gap-1.5 rounded-lg border border-gray-700 bg-gray-800 px-3 py-2 text-xs text-gray-400 hover:text-white transition-colors"
>
<ExternalLink className="h-3.5 w-3.5" />
MITRE ATT&CK
</a>
)}
</div>
{/* Meta badges */}
<div className="mt-4 flex flex-wrap items-center gap-3">
{actor.country && (
<span className="inline-flex items-center gap-1.5 rounded-full border border-gray-700 bg-gray-800 px-3 py-1 text-xs text-gray-300">
<Globe className="h-3.5 w-3.5 text-gray-500" />
{actor.country}
</span>
)}
{actor.motivation && (
<span className="inline-flex items-center gap-1.5 rounded-full border border-purple-500/30 bg-purple-900/50 px-3 py-1 text-xs text-purple-400">
<Target className="h-3.5 w-3.5" />
{actor.motivation}
</span>
)}
{actor.sophistication && (
<span className="inline-flex items-center gap-1.5 rounded-full border border-cyan-500/30 bg-cyan-900/50 px-3 py-1 text-xs text-cyan-400">
{actor.sophistication}
</span>
)}
{actor.first_seen && (
<span className="text-xs text-gray-500">
First seen: {actor.first_seen}
</span>
)}
{actor.last_seen && (
<span className="text-xs text-gray-500">
Last seen: {actor.last_seen}
</span>
)}
</div>
{/* Target sectors */}
{actor.target_sectors && actor.target_sectors.length > 0 && (
<div className="mt-3 flex items-center gap-2">
<Target className="h-4 w-4 text-gray-600 shrink-0" />
<div className="flex flex-wrap gap-1.5">
{actor.target_sectors.map((s, i) => (
<span
key={i}
className="rounded-full border border-gray-700 bg-gray-800 px-2 py-0.5 text-[11px] text-gray-400"
>
{s}
</span>
))}
</div>
</div>
)}
</div>
{/* ── SECTION 2: Description ─────────────────────────────────── */}
{actor.description && (
<div className="rounded-xl border border-gray-800 bg-gray-900 p-6">
<h2 className="mb-3 text-sm font-semibold uppercase tracking-wider text-gray-500">
Description
</h2>
<p className="text-sm leading-relaxed text-gray-300 whitespace-pre-line">
{actor.description}
</p>
</div>
)}
{/* ── SECTION 3: Coverage Overview ───────────────────────────── */}
{coverage && (
<div className="rounded-xl border border-gray-800 bg-gray-900 p-6">
<h2 className="mb-4 text-sm font-semibold uppercase tracking-wider text-gray-500">
Coverage Overview
</h2>
<div className="grid gap-4 sm:grid-cols-4">
<div className="rounded-lg border border-gray-700 bg-gray-800/50 p-4 text-center">
<p className="text-3xl font-bold text-white">{coverage.total_techniques}</p>
<p className="text-xs text-gray-400">Total Techniques</p>
</div>
<div className="rounded-lg border border-gray-700 bg-gray-800/50 p-4 text-center">
<p className="text-3xl font-bold text-green-400">{coverage.covered}</p>
<p className="text-xs text-gray-400">Covered</p>
</div>
<div className="rounded-lg border border-gray-700 bg-gray-800/50 p-4 text-center">
<p className="text-3xl font-bold text-red-400">
{coverage.total_techniques - coverage.covered}
</p>
<p className="text-xs text-gray-400">Gaps</p>
</div>
<div className="rounded-lg border border-gray-700 bg-gray-800/50 p-4 text-center">
<p className={`text-3xl font-bold ${
coverage.coverage_pct >= 80 ? "text-green-400" :
coverage.coverage_pct >= 50 ? "text-yellow-400" : "text-red-400"
}`}>
{coverage.coverage_pct}%
</p>
<p className="text-xs text-gray-400">Coverage</p>
</div>
</div>
{/* Breakdown bar */}
{coverage.total_techniques > 0 && (
<div className="mt-4">
<div className="flex h-3 overflow-hidden rounded-full bg-gray-800">
{coverage.breakdown.validated && (
<div
className="bg-green-500 transition-all"
style={{
width: `${(coverage.breakdown.validated / coverage.total_techniques) * 100}%`,
}}
title={`Validated: ${coverage.breakdown.validated}`}
/>
)}
{coverage.breakdown.partial && (
<div
className="bg-yellow-500 transition-all"
style={{
width: `${(coverage.breakdown.partial / coverage.total_techniques) * 100}%`,
}}
title={`Partial: ${coverage.breakdown.partial}`}
/>
)}
{coverage.breakdown.in_progress && (
<div
className="bg-blue-500 transition-all"
style={{
width: `${(coverage.breakdown.in_progress / coverage.total_techniques) * 100}%`,
}}
title={`In Progress: ${coverage.breakdown.in_progress}`}
/>
)}
{coverage.breakdown.not_covered && (
<div
className="bg-red-500/70 transition-all"
style={{
width: `${(coverage.breakdown.not_covered / coverage.total_techniques) * 100}%`,
}}
title={`Not Covered: ${coverage.breakdown.not_covered}`}
/>
)}
</div>
<div className="mt-2 flex flex-wrap gap-4 text-xs text-gray-400">
{Object.entries(coverage.breakdown).map(([status, count]) => (
<span key={status} className="flex items-center gap-1.5">
{statusIcon(status)}
{statusLabel(status)}: {count}
</span>
))}
</div>
</div>
)}
</div>
)}
{/* ── SECTION 4: Technique Heatmap ───────────────────────────── */}
{allTactics.length > 0 && (
<div className="rounded-xl border border-gray-800 bg-gray-900 p-6">
<h2 className="mb-4 text-sm font-semibold uppercase tracking-wider text-gray-500">
Technique Heatmap
</h2>
{/* Legend */}
<div className="mb-4 flex flex-wrap gap-3 text-xs">
<span className="flex items-center gap-1.5">
<span className="h-3 w-3 rounded bg-green-500/80" /> Validated
</span>
<span className="flex items-center gap-1.5">
<span className="h-3 w-3 rounded bg-yellow-500/80" /> Partial
</span>
<span className="flex items-center gap-1.5">
<span className="h-3 w-3 rounded bg-blue-500/60" /> In Progress
</span>
<span className="flex items-center gap-1.5">
<span className="h-3 w-3 rounded bg-red-500/70" /> Not Covered
</span>
<span className="flex items-center gap-1.5">
<span className="h-3 w-3 rounded bg-gray-700/50" /> Not Evaluated
</span>
</div>
{/* Heatmap grid — one column per tactic */}
<div className="overflow-x-auto">
<div className="inline-flex gap-2 min-w-max">
{allTactics.map((tactic) => {
const techs = techniquesByTactic[tactic] || [];
return (
<div key={tactic} className="flex flex-col gap-1" style={{ minWidth: 120 }}>
{/* Tactic header */}
<div className="rounded bg-gray-800 px-2 py-1.5 text-center">
<span className="text-[10px] font-semibold uppercase tracking-wide text-gray-400">
{tactic.replace(/-/g, " ")}
</span>
<span className="ml-1 text-[10px] text-gray-600">({techs.length})</span>
</div>
{/* Technique cells */}
{techs.map((tech) => (
<button
key={tech.technique_id}
onClick={() => navigate(`/techniques/${tech.mitre_id}`)}
className={`rounded border p-1.5 text-left transition-all hover:opacity-80 ${statusCellColor(tech.status_global)}`}
title={`${tech.mitre_id}: ${tech.name} (${statusLabel(tech.status_global)})`}
>
<span className="block truncate text-[10px] font-mono text-white/90">
{tech.mitre_id}
</span>
<span className="block truncate text-[9px] text-white/60">
{tech.name}
</span>
</button>
))}
</div>
);
})}
</div>
</div>
</div>
)}
{/* ── SECTION 5: Gap Analysis ────────────────────────────────── */}
{gaps && gaps.gaps.length > 0 && (
<div className="rounded-xl border border-gray-800 bg-gray-900 p-6">
<h2 className="mb-4 flex items-center gap-2 text-sm font-semibold uppercase tracking-wider text-gray-500">
<AlertTriangle className="h-4 w-4 text-orange-400" />
Coverage Gap Analysis ({gaps.total_gaps} gaps)
</h2>
<div className="overflow-x-auto">
<table className="w-full text-left text-sm">
<thead>
<tr className="border-b border-gray-800">
<th className="pb-3 pr-4 font-medium text-gray-400">Technique</th>
<th className="pb-3 px-4 font-medium text-gray-400">Tactic</th>
<th className="pb-3 px-4 font-medium text-gray-400">Status</th>
<th className="pb-3 px-4 font-medium text-gray-400">Templates</th>
<th className="pb-3 px-4 font-medium text-gray-400">Tests</th>
</tr>
</thead>
<tbody>
{gaps.gaps.map((gap: GapItem) => (
<tr
key={gap.technique_id}
className="border-b border-gray-800/50 hover:bg-gray-800/30 transition-colors cursor-pointer"
onClick={() => navigate(`/techniques/${gap.mitre_id}`)}
>
<td className="py-2.5 pr-4">
<div className="flex items-center gap-2">
<span className="font-mono text-xs text-cyan-400">{gap.mitre_id}</span>
<span className="truncate text-gray-300 text-xs max-w-[200px]">
{gap.name}
</span>
</div>
</td>
<td className="py-2.5 px-4 text-xs text-gray-400">
{gap.tactic || "-"}
</td>
<td className="py-2.5 px-4">
<span className="flex items-center gap-1.5 text-xs">
{statusIcon(gap.status_global)}
{statusLabel(gap.status_global)}
</span>
</td>
<td className="py-2.5 px-4">
{gap.has_templates ? (
<span className="inline-flex items-center gap-1 text-xs text-green-400">
<BookOpen className="h-3.5 w-3.5" />
{gap.available_templates}
</span>
) : (
<span className="text-xs text-gray-600">0</span>
)}
</td>
<td className="py-2.5 px-4">
{gap.existing_tests > 0 ? (
<span className="inline-flex items-center gap-1 text-xs text-blue-400">
<FlaskConical className="h-3.5 w-3.5" />
{gap.existing_tests}
</span>
) : (
<span className="text-xs text-gray-600">0</span>
)}
</td>
</tr>
))}
</tbody>
</table>
</div>
</div>
)}
{/* ── SECTION 6: All Techniques List ─────────────────────────── */}
{actor.techniques.length > 0 && (
<div className="rounded-xl border border-gray-800 bg-gray-900 p-6">
<h2 className="mb-4 text-sm font-semibold uppercase tracking-wider text-gray-500">
All Techniques ({actor.techniques.length})
</h2>
<div className="overflow-x-auto">
<table className="w-full text-left text-sm">
<thead>
<tr className="border-b border-gray-800">
<th className="pb-3 pr-4 font-medium text-gray-400">ID</th>
<th className="pb-3 px-4 font-medium text-gray-400">Name</th>
<th className="pb-3 px-4 font-medium text-gray-400">Tactic</th>
<th className="pb-3 px-4 font-medium text-gray-400">Status</th>
</tr>
</thead>
<tbody>
{actor.techniques.map((tech: ThreatActorTechnique) => (
<tr
key={tech.technique_id}
className="border-b border-gray-800/50 hover:bg-gray-800/30 transition-colors cursor-pointer"
onClick={() => navigate(`/techniques/${tech.mitre_id}`)}
>
<td className="py-2.5 pr-4 font-mono text-xs text-cyan-400">
{tech.mitre_id}
</td>
<td className="py-2.5 px-4 text-xs text-gray-300 truncate max-w-[250px]">
{tech.name}
</td>
<td className="py-2.5 px-4 text-xs text-gray-400">
{tech.tactic || "-"}
</td>
<td className="py-2.5 px-4">
<span className="flex items-center gap-1.5 text-xs">
{statusIcon(tech.status_global)}
{statusLabel(tech.status_global)}
</span>
</td>
</tr>
))}
</tbody>
</table>
</div>
</div>
)}
{/* ── References ─────────────────────────────────────────────── */}
{actor.references && actor.references.length > 0 && (
<div className="rounded-xl border border-gray-800 bg-gray-900 p-6">
<h2 className="mb-3 text-sm font-semibold uppercase tracking-wider text-gray-500">
References
</h2>
<ul className="space-y-1.5">
{actor.references.map((ref, i) => (
<li key={i} className="text-xs">
{ref.url ? (
<a
href={ref.url}
target="_blank"
rel="noreferrer"
className="text-cyan-400 hover:text-cyan-300 hover:underline"
>
{ref.source || ref.url}
</a>
) : (
<span className="text-gray-400">{ref.source}</span>
)}
{ref.description && (
<span className="ml-2 text-gray-500">{ref.description}</span>
)}
</li>
))}
</ul>
</div>
)}
</div>
);
}
+246
View File
@@ -0,0 +1,246 @@
import { useState } from "react";
import { useQuery } from "@tanstack/react-query";
import { useNavigate } from "react-router-dom";
import {
Loader2,
AlertCircle,
Search,
Users,
Shield,
ChevronLeft,
ChevronRight,
Globe,
Target,
Crosshair,
} from "lucide-react";
import {
getThreatActors,
type ThreatActorSummary,
type ListThreatActorsParams,
} from "../api/threat-actors";
/** Coverage colour based on percentage. */
function coverageColor(pct: number) {
if (pct >= 80) return "text-green-400";
if (pct >= 50) return "text-yellow-400";
if (pct >= 20) return "text-orange-400";
return "text-red-400";
}
function coverageBg(pct: number) {
if (pct >= 80) return "bg-green-500";
if (pct >= 50) return "bg-yellow-500";
if (pct >= 20) return "bg-orange-500";
return "bg-red-500";
}
/** Motivation badge colour. */
function motivationColor(m: string | null) {
switch (m?.toLowerCase()) {
case "espionage":
return "border-purple-500/30 bg-purple-900/50 text-purple-400";
case "financial":
return "border-yellow-500/30 bg-yellow-900/50 text-yellow-400";
case "destruction":
return "border-red-500/30 bg-red-900/50 text-red-400";
case "hacktivism":
return "border-cyan-500/30 bg-cyan-900/50 text-cyan-400";
default:
return "border-gray-600/30 bg-gray-800/50 text-gray-400";
}
}
export default function ThreatActorsPage() {
const navigate = useNavigate();
const [search, setSearch] = useState("");
const [motivation, setMotivation] = useState("");
const [page, setPage] = useState(0);
const limit = 24;
const params: ListThreatActorsParams = {
offset: page * limit,
limit,
...(search ? { search } : {}),
...(motivation ? { motivation } : {}),
};
const { data, isLoading, error } = useQuery({
queryKey: ["threat-actors", params],
queryFn: () => getThreatActors(params),
});
const totalPages = data ? Math.ceil(data.total / limit) : 0;
return (
<div className="space-y-6">
{/* Header */}
<div>
<h1 className="text-2xl font-bold text-white flex items-center gap-2">
<Users className="h-7 w-7 text-purple-400" />
Threat Actors
</h1>
<p className="mt-1 text-sm text-gray-400">
APT groups and threat actor profiles from MITRE ATT&CK with coverage analysis
</p>
</div>
{/* Filters */}
<div className="flex flex-wrap items-center gap-3">
{/* Search */}
<div className="relative flex-1 min-w-[200px] max-w-md">
<Search className="absolute left-3 top-1/2 h-4 w-4 -translate-y-1/2 text-gray-500" />
<input
type="text"
placeholder="Search actors, aliases..."
value={search}
onChange={(e) => { setSearch(e.target.value); setPage(0); }}
className="w-full rounded-lg border border-gray-700 bg-gray-800 py-2 pl-10 pr-4 text-sm text-gray-300 placeholder-gray-500 focus:border-cyan-500 focus:outline-none"
/>
</div>
{/* Motivation filter */}
<select
value={motivation}
onChange={(e) => { setMotivation(e.target.value); setPage(0); }}
className="rounded-lg border border-gray-700 bg-gray-800 px-3 py-2 text-sm text-gray-300 focus:border-cyan-500 focus:outline-none"
>
<option value="">All Motivations</option>
<option value="espionage">Espionage</option>
<option value="financial">Financial</option>
<option value="destruction">Destruction</option>
<option value="hacktivism">Hacktivism</option>
</select>
</div>
{/* Loading */}
{isLoading && (
<div className="flex items-center justify-center py-16">
<Loader2 className="h-8 w-8 animate-spin text-cyan-400" />
</div>
)}
{/* Error */}
{error && (
<div className="rounded-xl border border-red-500/30 bg-red-900/20 p-6 text-center">
<AlertCircle className="mx-auto h-8 w-8 text-red-400" />
<p className="mt-2 text-sm text-red-400">
Failed to load threat actors: {(error as Error)?.message}
</p>
</div>
)}
{/* Grid */}
{data && data.items.length > 0 && (
<>
<div className="grid gap-4 sm:grid-cols-2 lg:grid-cols-3 xl:grid-cols-4">
{data.items.map((actor: ThreatActorSummary) => (
<button
key={actor.id}
onClick={() => navigate(`/threat-actors/${actor.id}`)}
className="group rounded-xl border border-gray-800 bg-gray-900 p-5 text-left transition-all hover:border-purple-500/40 hover:bg-gray-900/80"
>
{/* Name + ID */}
<div className="flex items-start justify-between">
<div className="min-w-0">
<h3 className="truncate text-base font-semibold text-gray-200 group-hover:text-white">
{actor.name}
</h3>
{actor.mitre_id && (
<span className="text-xs font-mono text-purple-400">{actor.mitre_id}</span>
)}
</div>
<Crosshair className="h-5 w-5 shrink-0 text-gray-600 group-hover:text-purple-400 transition-colors" />
</div>
{/* Country + Motivation */}
<div className="mt-3 flex flex-wrap items-center gap-2">
{actor.country && (
<span className="inline-flex items-center gap-1 rounded-full border border-gray-700 bg-gray-800 px-2 py-0.5 text-[11px] text-gray-400">
<Globe className="h-3 w-3" />
{actor.country}
</span>
)}
{actor.motivation && (
<span className={`inline-flex rounded-full border px-2 py-0.5 text-[11px] font-medium ${motivationColor(actor.motivation)}`}>
{actor.motivation}
</span>
)}
</div>
{/* Sectors */}
{actor.target_sectors && actor.target_sectors.length > 0 && (
<div className="mt-2 flex items-center gap-1.5">
<Target className="h-3 w-3 text-gray-600 shrink-0" />
<span className="truncate text-[11px] text-gray-500">
{actor.target_sectors.slice(0, 3).join(", ")}
{actor.target_sectors.length > 3 && ` +${actor.target_sectors.length - 3}`}
</span>
</div>
)}
{/* Stats */}
<div className="mt-4 flex items-center justify-between border-t border-gray-800 pt-3">
<div className="flex items-center gap-1.5 text-xs text-gray-400">
<Shield className="h-3.5 w-3.5" />
{actor.technique_count} techniques
</div>
<div className="flex items-center gap-2">
<div className="h-1.5 w-16 overflow-hidden rounded-full bg-gray-800">
<div
className={`h-full rounded-full ${coverageBg(actor.coverage_pct)}`}
style={{ width: `${Math.min(actor.coverage_pct, 100)}%` }}
/>
</div>
<span className={`text-xs font-medium ${coverageColor(actor.coverage_pct)}`}>
{actor.coverage_pct}%
</span>
</div>
</div>
</button>
))}
</div>
{/* Pagination */}
{totalPages > 1 && (
<div className="flex items-center justify-between">
<span className="text-sm text-gray-400">
Showing {page * limit + 1}{Math.min((page + 1) * limit, data.total)} of{" "}
{data.total}
</span>
<div className="flex items-center gap-2">
<button
onClick={() => setPage(Math.max(0, page - 1))}
disabled={page === 0}
className="rounded-lg border border-gray-700 bg-gray-800 p-2 text-gray-400 hover:text-white disabled:opacity-40"
>
<ChevronLeft className="h-4 w-4" />
</button>
<span className="text-sm text-gray-400">
Page {page + 1} of {totalPages}
</span>
<button
onClick={() => setPage(Math.min(totalPages - 1, page + 1))}
disabled={page >= totalPages - 1}
className="rounded-lg border border-gray-700 bg-gray-800 p-2 text-gray-400 hover:text-white disabled:opacity-40"
>
<ChevronRight className="h-4 w-4" />
</button>
</div>
</div>
)}
</>
)}
{/* Empty */}
{data && data.items.length === 0 && (
<div className="rounded-xl border border-gray-800 bg-gray-900 p-12 text-center">
<Users className="mx-auto h-12 w-12 text-gray-600" />
<h3 className="mt-4 text-lg font-medium text-gray-300">No Threat Actors Found</h3>
<p className="mt-1 text-sm text-gray-500">
Import threat actors from MITRE CTI via the Data Sources panel.
</p>
</div>
)}
</div>
);
}