feat(phase-29): add compliance framework mapping, reports and UI (T-227 to T-229)

This commit is contained in:
2026-02-09 18:41:24 +01:00
parent 12f33307fd
commit 2ac8e7f4a5
12 changed files with 1516 additions and 0 deletions

View File

@@ -0,0 +1,92 @@
"""add_compliance_tables
Revision ID: b014compliance
Revises: b013campaigns
Create Date: 2026-02-09 20:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = "b014compliance"
down_revision: Union[str, None] = "b013campaigns"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ── compliance_frameworks ─────────────────────────────────────
op.create_table(
"compliance_frameworks",
sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
sa.Column("name", sa.String, unique=True, nullable=False),
sa.Column("version", sa.String, nullable=True),
sa.Column("description", sa.Text, nullable=True),
sa.Column("url", sa.String, nullable=True),
sa.Column("is_active", sa.Boolean, server_default="true"),
sa.Column("created_at", sa.DateTime, server_default=sa.func.now()),
)
# ── compliance_controls ───────────────────────────────────────
op.create_table(
"compliance_controls",
sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
sa.Column(
"framework_id",
postgresql.UUID(as_uuid=True),
sa.ForeignKey("compliance_frameworks.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("control_id", sa.String, nullable=False),
sa.Column("title", sa.String, nullable=False),
sa.Column("description", sa.Text, nullable=True),
sa.Column("category", sa.String, nullable=True),
)
op.create_index(
"ix_compliance_controls_framework",
"compliance_controls",
["framework_id"],
)
# ── compliance_control_mappings ───────────────────────────────
op.create_table(
"compliance_control_mappings",
sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
sa.Column(
"compliance_control_id",
postgresql.UUID(as_uuid=True),
sa.ForeignKey("compliance_controls.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column(
"technique_id",
postgresql.UUID(as_uuid=True),
sa.ForeignKey("techniques.id", ondelete="CASCADE"),
nullable=False,
),
)
op.create_index(
"ix_compliance_mappings_control",
"compliance_control_mappings",
["compliance_control_id"],
)
op.create_index(
"ix_compliance_mappings_technique",
"compliance_control_mappings",
["technique_id"],
)
op.create_unique_constraint(
"uq_control_technique",
"compliance_control_mappings",
["compliance_control_id", "technique_id"],
)
def downgrade() -> None:
op.drop_table("compliance_control_mappings")
op.drop_table("compliance_controls")
op.drop_table("compliance_frameworks")

View File

@@ -26,6 +26,7 @@ from app.routers import campaigns as campaigns_router
from app.routers import heatmap as heatmap_router
from app.routers import scores as scores_router
from app.routers import operational_metrics as operational_metrics_router
from app.routers import compliance as compliance_router
from app.storage import ensure_bucket_exists
from app.jobs.mitre_sync_job import start_scheduler, scheduler
@@ -76,6 +77,7 @@ app.include_router(campaigns_router.router, prefix="/api/v1")
app.include_router(heatmap_router.router, prefix="/api/v1")
app.include_router(scores_router.router, prefix="/api/v1")
app.include_router(operational_metrics_router.router, prefix="/api/v1")
app.include_router(compliance_router.router, prefix="/api/v1")
@app.get("/health")

View File

@@ -14,6 +14,7 @@ from app.models.defensive_technique import DefensiveTechnique, DefensiveTechniqu
from app.models.test_template_detection_rule import TestTemplateDetectionRule
from app.models.test_detection_result import TestDetectionResult
from app.models.campaign import Campaign, CampaignTest
from app.models.compliance import ComplianceFramework, ComplianceControl, ComplianceControlMapping
from app.models.enums import TechniqueStatus, TestState, TestResult, TeamSide
__all__ = [
@@ -23,5 +24,6 @@ __all__ = [
"DefensiveTechnique", "DefensiveTechniqueMapping",
"TestTemplateDetectionRule", "TestDetectionResult",
"Campaign", "CampaignTest",
"ComplianceFramework", "ComplianceControl", "ComplianceControlMapping",
"TechniqueStatus", "TestState", "TestResult", "TeamSide",
]

View File

@@ -0,0 +1,97 @@
"""Compliance models — frameworks, controls, and technique mappings.
Maps compliance frameworks (NIST 800-53, DORA, NIS2, ISO 27001) to
MITRE ATT&CK techniques, enabling compliance gap analysis.
"""
import uuid
from datetime import datetime
from sqlalchemy import (
Column, String, Text, Boolean, DateTime,
ForeignKey, Index, UniqueConstraint,
)
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from app.database import Base
class ComplianceFramework(Base):
"""A compliance framework (e.g. NIST 800-53, ISO 27001)."""
__tablename__ = "compliance_frameworks"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name = Column(String, unique=True, nullable=False)
version = Column(String, nullable=True)
description = Column(Text, nullable=True)
url = Column(String, nullable=True)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
# Relationships
controls = relationship(
"ComplianceControl",
back_populates="framework",
cascade="all, delete-orphan",
)
class ComplianceControl(Base):
"""A control within a compliance framework (e.g. AC-2, PR.AC-1)."""
__tablename__ = "compliance_controls"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
framework_id = Column(
UUID(as_uuid=True),
ForeignKey("compliance_frameworks.id", ondelete="CASCADE"),
nullable=False,
)
control_id = Column(String, nullable=False) # e.g. "AC-2"
title = Column(String, nullable=False)
description = Column(Text, nullable=True)
category = Column(String, nullable=True)
# Relationships
framework = relationship("ComplianceFramework", back_populates="controls")
technique_mappings = relationship(
"ComplianceControlMapping",
back_populates="compliance_control",
cascade="all, delete-orphan",
)
__table_args__ = (
Index('ix_compliance_controls_framework', 'framework_id'),
)
class ComplianceControlMapping(Base):
"""Maps a compliance control to a MITRE ATT&CK technique."""
__tablename__ = "compliance_control_mappings"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
compliance_control_id = Column(
UUID(as_uuid=True),
ForeignKey("compliance_controls.id", ondelete="CASCADE"),
nullable=False,
)
technique_id = Column(
UUID(as_uuid=True),
ForeignKey("techniques.id", ondelete="CASCADE"),
nullable=False,
)
# Relationships
compliance_control = relationship(
"ComplianceControl", back_populates="technique_mappings"
)
technique = relationship("Technique")
__table_args__ = (
Index('ix_compliance_mappings_control', 'compliance_control_id'),
Index('ix_compliance_mappings_technique', 'technique_id'),
UniqueConstraint(
'compliance_control_id', 'technique_id',
name='uq_control_technique',
),
)

View File

@@ -0,0 +1,380 @@
"""Compliance endpoints — framework status, reports, and gap analysis.
Provides compliance posture assessment by mapping MITRE ATT&CK technique
coverage to compliance framework controls.
"""
import csv
import io
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session, joinedload
from app.database import get_db
from app.dependencies.auth import get_current_user, require_role
from app.models.user import User
from app.models.compliance import (
ComplianceFramework,
ComplianceControl,
ComplianceControlMapping,
)
from app.models.technique import Technique
from app.models.test_template import TestTemplate
from app.models.threat_actor import ThreatActorTechnique
from app.services.scoring_service import calculate_technique_score
from app.services.compliance_import_service import import_nist_800_53_mappings
router = APIRouter(prefix="/compliance", tags=["compliance"])
# ── Helpers ───────────────────────────────────────────────────────────
def _classify_control(technique_scores: list[float]) -> str:
"""Classify a control status based on its technique scores."""
if not technique_scores:
return "not_evaluated"
all_above_70 = all(s >= 70 for s in technique_scores)
any_above_30 = any(s >= 30 for s in technique_scores)
all_below_30 = all(s < 30 for s in technique_scores)
all_zero = all(s == 0 for s in technique_scores)
if all_zero:
return "not_evaluated"
if all_above_70:
return "covered"
if all_below_30:
return "not_covered"
if any_above_30:
return "partially_covered"
return "not_covered"
def _get_control_status(control: ComplianceControl, db: Session) -> dict:
"""Compute the status and score for a single control."""
mappings = (
db.query(ComplianceControlMapping)
.filter(ComplianceControlMapping.compliance_control_id == control.id)
.all()
)
if not mappings:
return {
"control_id": control.control_id,
"title": control.title,
"category": control.category,
"status": "not_evaluated",
"score": 0,
"techniques_count": 0,
"techniques_covered": 0,
"techniques": [],
}
technique_ids = [m.technique_id for m in mappings]
techniques = (
db.query(Technique)
.filter(Technique.id.in_(technique_ids))
.all()
)
tech_details = []
scores = []
covered_count = 0
for tech in techniques:
result = calculate_technique_score(tech, db)
score = result["total_score"]
scores.append(score)
if score >= 50:
covered_count += 1
tech_details.append({
"mitre_id": tech.mitre_id,
"name": tech.name,
"score": score,
"status": tech.status_global.value if tech.status_global else "not_evaluated",
})
# Sort techniques by score ascending (worst first for priority)
tech_details.sort(key=lambda t: t["score"])
avg_score = round(sum(scores) / len(scores), 1) if scores else 0
status = _classify_control(scores)
return {
"control_id": control.control_id,
"title": control.title,
"category": control.category,
"status": status,
"score": avg_score,
"techniques_count": len(techniques),
"techniques_covered": covered_count,
"techniques": tech_details,
}
# ── GET /compliance/frameworks ────────────────────────────────────────
@router.get("/frameworks")
def list_frameworks(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""List all available compliance frameworks."""
frameworks = (
db.query(ComplianceFramework)
.filter(ComplianceFramework.is_active == True)
.all()
)
result = []
for fw in frameworks:
control_count = (
db.query(ComplianceControl)
.filter(ComplianceControl.framework_id == fw.id)
.count()
)
result.append({
"id": str(fw.id),
"name": fw.name,
"version": fw.version,
"description": fw.description,
"url": fw.url,
"is_active": fw.is_active,
"controls_count": control_count,
})
return result
# ── GET /compliance/frameworks/{id}/status ────────────────────────────
@router.get("/frameworks/{framework_id}/status")
def framework_status(
framework_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Get compliance status for each control in a framework."""
framework = (
db.query(ComplianceFramework)
.filter(ComplianceFramework.id == framework_id)
.first()
)
if not framework:
raise HTTPException(status_code=404, detail="Framework not found")
controls = (
db.query(ComplianceControl)
.filter(ComplianceControl.framework_id == framework.id)
.order_by(ComplianceControl.control_id)
.all()
)
control_statuses = []
summary = {
"total_controls": len(controls),
"covered": 0,
"partially_covered": 0,
"not_covered": 0,
"not_evaluated": 0,
}
for control in controls:
status_data = _get_control_status(control, db)
control_statuses.append(status_data)
status = status_data["status"]
if status in summary:
summary[status] += 1
# Compliance percentage: (covered + partially_covered*0.5) / total * 100
total = summary["total_controls"]
if total > 0:
compliance_pct = round(
(summary["covered"] + summary["partially_covered"] * 0.5) / total * 100,
1,
)
else:
compliance_pct = 0
summary["compliance_percentage"] = compliance_pct
return {
"framework": {"id": str(framework.id), "name": framework.name},
"summary": summary,
"controls": control_statuses,
}
# ── GET /compliance/frameworks/{id}/report ────────────────────────────
@router.get("/frameworks/{framework_id}/report")
def framework_report(
framework_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Get the full compliance report (same as status but marked as report)."""
return framework_status(framework_id, db=db, current_user=current_user)
# ── GET /compliance/frameworks/{id}/report/csv ────────────────────────
@router.get("/frameworks/{framework_id}/report/csv")
def framework_report_csv(
framework_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Export compliance report as CSV."""
framework = (
db.query(ComplianceFramework)
.filter(ComplianceFramework.id == framework_id)
.first()
)
if not framework:
raise HTTPException(status_code=404, detail="Framework not found")
controls = (
db.query(ComplianceControl)
.filter(ComplianceControl.framework_id == framework.id)
.order_by(ComplianceControl.control_id)
.all()
)
output = io.StringIO()
writer = csv.writer(output)
writer.writerow([
"control_id",
"title",
"category",
"status",
"score",
"techniques_total",
"techniques_covered",
"technique_ids",
])
for control in controls:
status_data = _get_control_status(control, db)
technique_ids = ",".join(t["mitre_id"] for t in status_data["techniques"])
writer.writerow([
status_data["control_id"],
status_data["title"],
status_data["category"] or "",
status_data["status"],
status_data["score"],
status_data["techniques_count"],
status_data["techniques_covered"],
technique_ids,
])
output.seek(0)
filename = f"compliance_{framework.name.replace(' ', '_')}.csv"
return StreamingResponse(
io.BytesIO(output.getvalue().encode("utf-8")),
media_type="text/csv",
headers={
"Content-Disposition": f"attachment; filename={filename}",
},
)
# ── GET /compliance/frameworks/{id}/gaps ──────────────────────────────
@router.get("/frameworks/{framework_id}/gaps")
def framework_gaps(
framework_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Get controls with techniques that are not adequately covered."""
framework = (
db.query(ComplianceFramework)
.filter(ComplianceFramework.id == framework_id)
.first()
)
if not framework:
raise HTTPException(status_code=404, detail="Framework not found")
controls = (
db.query(ComplianceControl)
.filter(ComplianceControl.framework_id == framework.id)
.order_by(ComplianceControl.control_id)
.all()
)
gaps = []
for control in controls:
status_data = _get_control_status(control, db)
if status_data["status"] in ("not_covered", "partially_covered"):
# Find uncovered techniques
uncovered_techniques = []
for tech_info in status_data["techniques"]:
if tech_info["score"] < 70:
# Count available templates
template_count = (
db.query(TestTemplate)
.filter(TestTemplate.mitre_technique_id == tech_info["mitre_id"])
.count()
)
# Count threat actors using this technique
technique = (
db.query(Technique)
.filter(Technique.mitre_id == tech_info["mitre_id"])
.first()
)
actor_count = 0
if technique:
actor_count = (
db.query(ThreatActorTechnique)
.filter(ThreatActorTechnique.technique_id == technique.id)
.count()
)
uncovered_techniques.append({
**tech_info,
"templates_available": template_count,
"threat_actors_using": actor_count,
})
if uncovered_techniques:
gaps.append({
"control_id": status_data["control_id"],
"title": status_data["title"],
"category": status_data["category"],
"status": status_data["status"],
"score": status_data["score"],
"uncovered_techniques": uncovered_techniques,
})
return {
"framework": {"id": str(framework.id), "name": framework.name},
"total_gaps": len(gaps),
"gaps": gaps,
}
# ── POST /compliance/import/nist-800-53 ──────────────────────────────
@router.post("/import/nist-800-53")
def import_nist(
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Import NIST 800-53 Rev 5 mappings (admin only)."""
result = import_nist_800_53_mappings(db)
return result

View File

@@ -0,0 +1,356 @@
"""Compliance import service — imports NIST 800-53 to ATT&CK mappings.
Downloads and parses the STIX bundle from the Center for Threat-Informed
Defense's attack_to_nist_mapping repository to create ComplianceFramework,
ComplianceControl, and ComplianceControlMapping records.
"""
import logging
import json
import re
from typing import Optional
import requests
from sqlalchemy.orm import Session
from app.models.compliance import (
ComplianceFramework,
ComplianceControl,
ComplianceControlMapping,
)
from app.models.technique import Technique
logger = logging.getLogger(__name__)
# URL for the NIST 800-53 Rev 5 to ATT&CK mapping
# This is the JSON STIX bundle that contains the relationships
NIST_MAPPING_URL = (
"https://raw.githubusercontent.com/center-for-threat-informed-defense/"
"attack_to_nist_mapping/main/data/attack-to-nist-rev5.json"
)
def import_nist_800_53_mappings(db: Session) -> dict:
"""Import NIST 800-53 Rev 5 mappings from MITRE CTI repository.
Steps:
1. Create or get the NIST 800-53 Rev 5 framework
2. Download the STIX bundle JSON
3. Parse controls and relationship objects
4. Create ComplianceControl records
5. Create ComplianceControlMapping records
Returns a summary dict with counts.
"""
# ── 1. Create or get framework ────────────────────────────────
framework = (
db.query(ComplianceFramework)
.filter(ComplianceFramework.name == "NIST 800-53 Rev 5")
.first()
)
if not framework:
framework = ComplianceFramework(
name="NIST 800-53 Rev 5",
version="5",
description="National Institute of Standards and Technology Special Publication 800-53 Revision 5 — Security and Privacy Controls for Information Systems and Organizations",
url="https://csrc.nist.gov/publications/detail/sp/800-53/rev-5/final",
is_active=True,
)
db.add(framework)
db.flush()
logger.info("Created NIST 800-53 Rev 5 framework")
else:
logger.info("NIST 800-53 Rev 5 framework already exists")
# ── 2. Download STIX bundle ───────────────────────────────────
try:
response = requests.get(NIST_MAPPING_URL, timeout=30)
response.raise_for_status()
stix_bundle = response.json()
except requests.RequestException as e:
logger.warning(f"Failed to download STIX bundle: {e}")
# Fallback: create a sample set of well-known NIST controls
return _import_sample_nist_mappings(db, framework)
# ── 3. Parse STIX objects ─────────────────────────────────────
objects = stix_bundle.get("objects", [])
# Build lookup maps
# STIX IDs -> control info
control_map = {} # stix_id -> {control_id, title, category}
technique_map = {} # stix_id -> mitre_technique_id
relationships = [] # (source_ref, target_ref) for "mitigates" relationships
for obj in objects:
obj_type = obj.get("type", "")
if obj_type == "course-of-action":
# This is a NIST control
name = obj.get("name", "")
desc = obj.get("description", "")
stix_id = obj.get("id", "")
# Extract control ID from name (e.g., "AC-2 Account Management")
match = re.match(r"^([A-Z]{2}-\d+(?:\.\d+)?)\s*(.*)", name)
if match:
control_id = match.group(1)
title = match.group(2) or name
else:
control_id = name
title = name
# Extract category from control family
category_match = re.match(r"^([A-Z]{2})", control_id)
category = _get_nist_category(category_match.group(1)) if category_match else None
control_map[stix_id] = {
"control_id": control_id,
"title": title,
"description": desc[:500] if desc else None,
"category": category,
}
elif obj_type == "attack-pattern":
# This is an ATT&CK technique
stix_id = obj.get("id", "")
ext_refs = obj.get("external_references", [])
for ref in ext_refs:
if ref.get("source_name") == "mitre-attack":
technique_map[stix_id] = ref.get("external_id", "")
break
elif obj_type == "relationship":
rel_type = obj.get("relationship_type", "")
if rel_type == "mitigates":
source_ref = obj.get("source_ref", "")
target_ref = obj.get("target_ref", "")
relationships.append((source_ref, target_ref))
# ── 4. Create controls ────────────────────────────────────────
controls_created = 0
controls_existing = 0
control_db_map = {} # control_id -> ComplianceControl
# Load existing controls for this framework
existing_controls = {
c.control_id: c
for c in db.query(ComplianceControl)
.filter(ComplianceControl.framework_id == framework.id)
.all()
}
for stix_id, info in control_map.items():
cid = info["control_id"]
if cid in existing_controls:
control_db_map[stix_id] = existing_controls[cid]
controls_existing += 1
else:
ctrl = ComplianceControl(
framework_id=framework.id,
control_id=cid,
title=info["title"],
description=info["description"],
category=info["category"],
)
db.add(ctrl)
db.flush()
control_db_map[stix_id] = ctrl
controls_created += 1
# ── 5. Create mappings ────────────────────────────────────────
mappings_created = 0
mappings_skipped = 0
# Build technique DB lookup (mitre_id -> Technique)
all_techniques = {t.mitre_id: t for t in db.query(Technique).all()}
# Load existing mappings
existing_mappings = set()
for m in db.query(ComplianceControlMapping).all():
existing_mappings.add((str(m.compliance_control_id), str(m.technique_id)))
for source_ref, target_ref in relationships:
control = control_db_map.get(source_ref)
mitre_id = technique_map.get(target_ref)
if not control or not mitre_id:
mappings_skipped += 1
continue
technique = all_techniques.get(mitre_id)
if not technique:
mappings_skipped += 1
continue
key = (str(control.id), str(technique.id))
if key in existing_mappings:
mappings_skipped += 1
continue
mapping = ComplianceControlMapping(
compliance_control_id=control.id,
technique_id=technique.id,
)
db.add(mapping)
existing_mappings.add(key)
mappings_created += 1
db.commit()
summary = {
"framework": framework.name,
"controls_created": controls_created,
"controls_existing": controls_existing,
"mappings_created": mappings_created,
"mappings_skipped": mappings_skipped,
"total_controls": controls_created + controls_existing,
"total_relationships_found": len(relationships),
}
logger.info(f"NIST 800-53 import complete: {summary}")
return summary
def _import_sample_nist_mappings(db: Session, framework: ComplianceFramework) -> dict:
"""Import a curated sample of NIST 800-53 controls when the download fails.
This ensures the feature works even without network access.
"""
SAMPLE_CONTROLS = [
{"control_id": "AC-2", "title": "Account Management", "category": "Access Control",
"techniques": ["T1078", "T1136", "T1098", "T1087", "T1069"]},
{"control_id": "AC-3", "title": "Access Enforcement", "category": "Access Control",
"techniques": ["T1078", "T1548", "T1134"]},
{"control_id": "AC-4", "title": "Information Flow Enforcement", "category": "Access Control",
"techniques": ["T1048", "T1041", "T1572"]},
{"control_id": "AC-6", "title": "Least Privilege", "category": "Access Control",
"techniques": ["T1078", "T1548", "T1134"]},
{"control_id": "AU-2", "title": "Event Logging", "category": "Audit and Accountability",
"techniques": ["T1562", "T1070"]},
{"control_id": "AU-6", "title": "Audit Record Review", "category": "Audit and Accountability",
"techniques": ["T1562", "T1070", "T1027"]},
{"control_id": "CA-7", "title": "Continuous Monitoring", "category": "Assessment, Authorization, and Monitoring",
"techniques": ["T1059", "T1053"]},
{"control_id": "CM-2", "title": "Baseline Configuration", "category": "Configuration Management",
"techniques": ["T1574", "T1546"]},
{"control_id": "CM-6", "title": "Configuration Settings", "category": "Configuration Management",
"techniques": ["T1574", "T1546", "T1112"]},
{"control_id": "CM-7", "title": "Least Functionality", "category": "Configuration Management",
"techniques": ["T1059", "T1218"]},
{"control_id": "IA-2", "title": "Identification and Authentication", "category": "Identification and Authentication",
"techniques": ["T1078", "T1110"]},
{"control_id": "IA-5", "title": "Authenticator Management", "category": "Identification and Authentication",
"techniques": ["T1078", "T1110", "T1003"]},
{"control_id": "IR-4", "title": "Incident Handling", "category": "Incident Response",
"techniques": ["T1059", "T1547"]},
{"control_id": "RA-5", "title": "Vulnerability Monitoring and Scanning", "category": "Risk Assessment",
"techniques": ["T1190", "T1203"]},
{"control_id": "SC-7", "title": "Boundary Protection", "category": "System and Communications Protection",
"techniques": ["T1048", "T1041", "T1071"]},
{"control_id": "SC-28", "title": "Protection of Information at Rest", "category": "System and Communications Protection",
"techniques": ["T1005", "T1114"]},
{"control_id": "SI-3", "title": "Malicious Code Protection", "category": "System and Information Integrity",
"techniques": ["T1059", "T1204", "T1566"]},
{"control_id": "SI-4", "title": "System Monitoring", "category": "System and Information Integrity",
"techniques": ["T1059", "T1053", "T1547"]},
{"control_id": "SI-7", "title": "Software, Firmware, and Information Integrity", "category": "System and Information Integrity",
"techniques": ["T1195", "T1553"]},
{"control_id": "PM-16", "title": "Threat Awareness Program", "category": "Program Management",
"techniques": ["T1566", "T1204"]},
]
# Build technique lookup
all_techniques = {t.mitre_id: t for t in db.query(Technique).all()}
existing_controls = {
c.control_id: c
for c in db.query(ComplianceControl)
.filter(ComplianceControl.framework_id == framework.id)
.all()
}
existing_mappings = set()
for m in db.query(ComplianceControlMapping).all():
existing_mappings.add((str(m.compliance_control_id), str(m.technique_id)))
controls_created = 0
mappings_created = 0
for sample in SAMPLE_CONTROLS:
# Create or get control
if sample["control_id"] in existing_controls:
control = existing_controls[sample["control_id"]]
else:
control = ComplianceControl(
framework_id=framework.id,
control_id=sample["control_id"],
title=sample["title"],
category=sample["category"],
)
db.add(control)
db.flush()
existing_controls[sample["control_id"]] = control
controls_created += 1
# Create mappings
for mitre_id in sample["techniques"]:
technique = all_techniques.get(mitre_id)
if not technique:
# Try with subtechnique prefix
for key, tech in all_techniques.items():
if key.startswith(mitre_id):
technique = tech
break
if not technique:
continue
key = (str(control.id), str(technique.id))
if key in existing_mappings:
continue
mapping = ComplianceControlMapping(
compliance_control_id=control.id,
technique_id=technique.id,
)
db.add(mapping)
existing_mappings.add(key)
mappings_created += 1
db.commit()
return {
"framework": framework.name,
"controls_created": controls_created,
"controls_existing": len(existing_controls) - controls_created,
"mappings_created": mappings_created,
"mappings_skipped": 0,
"total_controls": len(existing_controls),
"source": "sample_data",
}
def _get_nist_category(family_code: str) -> str:
"""Map NIST 800-53 family code to category name."""
categories = {
"AC": "Access Control",
"AT": "Awareness and Training",
"AU": "Audit and Accountability",
"CA": "Assessment, Authorization, and Monitoring",
"CM": "Configuration Management",
"CP": "Contingency Planning",
"IA": "Identification and Authentication",
"IR": "Incident Response",
"MA": "Maintenance",
"MP": "Media Protection",
"PE": "Physical and Environmental Protection",
"PL": "Planning",
"PM": "Program Management",
"PS": "Personnel Security",
"PT": "Personally Identifiable Information Processing and Transparency",
"RA": "Risk Assessment",
"SA": "System and Services Acquisition",
"SC": "System and Communications Protection",
"SI": "System and Information Integrity",
"SR": "Supply Chain Risk Management",
}
return categories.get(family_code, "Unknown")