feat(phase-23): add Threat Actor profiles with MITRE CTI import, API, heatmap and gap analysis (T-208 to T-212)

This commit is contained in:
2026-02-09 16:27:38 +01:00
parent f4c8cbf768
commit 2fc0e2cafd
12 changed files with 1798 additions and 2 deletions

View File

@@ -39,7 +39,8 @@ def _get_sync_handler(source_name: str):
"gtfobins": ("app.services.lolbas_import_service", "sync_gtfobins"),
"caldera": ("app.services.caldera_import_service", "sync"),
"elastic_rules": ("app.services.elastic_import_service", "sync"),
# d3fend and mitre_cti added in later phases
"mitre_cti": ("app.services.threat_actor_import_service", "sync"),
# d3fend added in later phases
}
if source_name not in handlers:

View File

@@ -0,0 +1,309 @@
"""Threat actor endpoints.
Provides listing, detail, coverage analysis, and gap analysis for
threat actor profiles imported from MITRE CTI.
"""
import logging
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, or_
from sqlalchemy.orm import Session, joinedload
from app.database import get_db
from app.dependencies.auth import get_current_user
from app.models.user import User
from app.models.threat_actor import ThreatActor, ThreatActorTechnique
from app.models.technique import Technique
from app.models.test import Test
from app.models.test_template import TestTemplate
from app.models.enums import TechniqueStatus
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/threat-actors", tags=["threat-actors"])
# ---------------------------------------------------------------------------
# GET /threat-actors — Listado con filtros
# ---------------------------------------------------------------------------
@router.get("")
def list_threat_actors(
search: Optional[str] = Query(None),
country: Optional[str] = Query(None),
motivation: Optional[str] = Query(None),
sophistication: Optional[str] = Query(None),
target_sectors: Optional[str] = Query(None),
offset: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""List threat actors with optional filters and pagination.
**Requires** authentication (any role).
"""
query = db.query(ThreatActor)
# Filters
if search:
pattern = f"%{search}%"
query = query.filter(
or_(
ThreatActor.name.ilike(pattern),
ThreatActor.description.ilike(pattern),
func.cast(ThreatActor.aliases, func.text()).ilike(pattern),
)
)
if country:
query = query.filter(ThreatActor.country == country)
if motivation:
query = query.filter(ThreatActor.motivation == motivation)
if sophistication:
query = query.filter(ThreatActor.sophistication == sophistication)
if target_sectors:
# JSONB contains check
query = query.filter(
func.cast(ThreatActor.target_sectors, func.text()).ilike(f"%{target_sectors}%")
)
# Total count
total = query.count()
# Paginate
actors = query.order_by(ThreatActor.name).offset(offset).limit(limit).all()
# For each actor, count techniques and calculate basic coverage
results = []
for actor in actors:
tech_count = (
db.query(ThreatActorTechnique)
.filter(ThreatActorTechnique.threat_actor_id == actor.id)
.count()
)
# Quick coverage calculation
covered = (
db.query(ThreatActorTechnique)
.join(Technique, ThreatActorTechnique.technique_id == Technique.id)
.filter(ThreatActorTechnique.threat_actor_id == actor.id)
.filter(Technique.status_global.in_([
TechniqueStatus.validated,
TechniqueStatus.partial,
]))
.count()
)
coverage_pct = round((covered / tech_count * 100), 1) if tech_count > 0 else 0.0
results.append({
"id": str(actor.id),
"mitre_id": actor.mitre_id,
"name": actor.name,
"aliases": actor.aliases or [],
"country": actor.country,
"target_sectors": actor.target_sectors or [],
"target_regions": actor.target_regions or [],
"motivation": actor.motivation,
"sophistication": actor.sophistication,
"mitre_url": actor.mitre_url,
"technique_count": tech_count,
"coverage_pct": coverage_pct,
"is_active": actor.is_active,
})
return {
"total": total,
"offset": offset,
"limit": limit,
"items": results,
}
# ---------------------------------------------------------------------------
# GET /threat-actors/{id} — Detalle
# ---------------------------------------------------------------------------
@router.get("/{actor_id}")
def get_threat_actor(
actor_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Get detailed info about a threat actor including techniques.
**Requires** authentication (any role).
"""
actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first()
if not actor:
raise HTTPException(status_code=404, detail="Threat actor not found")
# Get associated techniques with their coverage status
actor_techniques = (
db.query(ThreatActorTechnique, Technique)
.join(Technique, ThreatActorTechnique.technique_id == Technique.id)
.filter(ThreatActorTechnique.threat_actor_id == actor.id)
.order_by(Technique.mitre_id)
.all()
)
techniques_list = []
for at, tech in actor_techniques:
techniques_list.append({
"technique_id": str(tech.id),
"mitre_id": tech.mitre_id,
"name": tech.name,
"tactic": tech.tactic,
"status_global": tech.status_global.value if tech.status_global else None,
"usage_description": at.usage_description,
"first_seen_using": at.first_seen_using,
})
return {
"id": str(actor.id),
"mitre_id": actor.mitre_id,
"name": actor.name,
"aliases": actor.aliases or [],
"description": actor.description,
"country": actor.country,
"target_sectors": actor.target_sectors or [],
"target_regions": actor.target_regions or [],
"motivation": actor.motivation,
"sophistication": actor.sophistication,
"first_seen": actor.first_seen,
"last_seen": actor.last_seen,
"references": actor.references or [],
"mitre_url": actor.mitre_url,
"is_active": actor.is_active,
"techniques": techniques_list,
}
# ---------------------------------------------------------------------------
# GET /threat-actors/{id}/coverage — Cobertura
# ---------------------------------------------------------------------------
@router.get("/{actor_id}/coverage")
def get_threat_actor_coverage(
actor_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Calculate coverage percentage against a specific threat actor.
**Requires** authentication (any role).
Returns the percentage of the actor's techniques that have been
validated or partially validated, along with a breakdown.
"""
actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first()
if not actor:
raise HTTPException(status_code=404, detail="Threat actor not found")
# Get all techniques for this actor
actor_techniques = (
db.query(Technique)
.join(ThreatActorTechnique, ThreatActorTechnique.technique_id == Technique.id)
.filter(ThreatActorTechnique.threat_actor_id == actor.id)
.all()
)
total = len(actor_techniques)
if total == 0:
return {
"actor_id": str(actor.id),
"actor_name": actor.name,
"total_techniques": 0,
"coverage_pct": 0.0,
"breakdown": {},
}
breakdown = {}
for tech in actor_techniques:
status = tech.status_global.value if tech.status_global else "not_evaluated"
breakdown[status] = breakdown.get(status, 0) + 1
covered = breakdown.get("validated", 0) + breakdown.get("partial", 0)
coverage_pct = round((covered / total * 100), 1)
return {
"actor_id": str(actor.id),
"actor_name": actor.name,
"total_techniques": total,
"covered": covered,
"coverage_pct": coverage_pct,
"breakdown": breakdown,
}
# ---------------------------------------------------------------------------
# GET /threat-actors/{id}/gaps — Gap analysis
# ---------------------------------------------------------------------------
@router.get("/{actor_id}/gaps")
def get_threat_actor_gaps(
actor_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Identify techniques of this actor that are NOT fully validated.
**Requires** authentication (any role).
Returns list of gap techniques with available templates.
"""
actor = db.query(ThreatActor).filter(ThreatActor.id == actor_id).first()
if not actor:
raise HTTPException(status_code=404, detail="Threat actor not found")
# Get techniques NOT validated
gap_techniques = (
db.query(Technique, ThreatActorTechnique)
.join(ThreatActorTechnique, ThreatActorTechnique.technique_id == Technique.id)
.filter(ThreatActorTechnique.threat_actor_id == actor.id)
.filter(Technique.status_global != TechniqueStatus.validated)
.order_by(Technique.mitre_id)
.all()
)
gaps = []
for tech, at in gap_techniques:
# Count available templates for this technique
template_count = (
db.query(TestTemplate)
.filter(TestTemplate.mitre_technique_id == tech.mitre_id)
.filter(TestTemplate.is_active == True)
.count()
)
# Count existing tests
test_count = (
db.query(Test)
.filter(Test.technique_id == tech.id)
.count()
)
gaps.append({
"technique_id": str(tech.id),
"mitre_id": tech.mitre_id,
"name": tech.name,
"tactic": tech.tactic,
"status_global": tech.status_global.value if tech.status_global else None,
"usage_description": at.usage_description,
"available_templates": template_count,
"existing_tests": test_count,
"has_templates": template_count > 0,
})
return {
"actor_id": str(actor.id),
"actor_name": actor.name,
"total_gaps": len(gaps),
"gaps": gaps,
}