feat(phase-21): add V3 demo seed, DataSource and DetectionRule models (T-200, T-201, T-202)

This commit is contained in:
2026-02-09 16:06:44 +01:00
parent 29eab4ef77
commit 022c4f2886
7 changed files with 796 additions and 1 deletions

View File

@@ -0,0 +1,48 @@
"""add_data_sources_table
Revision ID: b008datasources
Revises: b007remediation
Create Date: 2026-02-09 14:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB
# revision identifiers, used by Alembic.
revision: str = 'b008datasources'
down_revision: Union[str, Sequence[str], None] = 'b007remediation'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Create data_sources table."""
op.create_table(
'data_sources',
sa.Column('id', UUID(as_uuid=True), primary_key=True),
sa.Column('name', sa.String(), unique=True, nullable=False),
sa.Column('display_name', sa.String(), nullable=False),
sa.Column('type', sa.String(), nullable=False),
sa.Column('url', sa.String(), nullable=True),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('is_enabled', sa.Boolean(), server_default='true'),
sa.Column('last_sync_at', sa.DateTime(), nullable=True),
sa.Column('last_sync_status', sa.String(), nullable=True),
sa.Column('last_sync_stats', JSONB(), nullable=True),
sa.Column('sync_frequency', sa.String(), nullable=True),
sa.Column('config', JSONB(), nullable=True),
sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()),
)
op.create_index('ix_data_sources_type', 'data_sources', ['type'])
op.create_index('ix_data_sources_is_enabled', 'data_sources', ['is_enabled'])
def downgrade() -> None:
"""Drop data_sources table."""
op.drop_index('ix_data_sources_is_enabled', table_name='data_sources')
op.drop_index('ix_data_sources_type', table_name='data_sources')
op.drop_table('data_sources')

View File

@@ -0,0 +1,52 @@
"""add_detection_rules_table
Revision ID: b009detectionrules
Revises: b008datasources
Create Date: 2026-02-09 14:10:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB
# revision identifiers, used by Alembic.
revision: str = 'b009detectionrules'
down_revision: Union[str, Sequence[str], None] = 'b008datasources'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Create detection_rules table."""
op.create_table(
'detection_rules',
sa.Column('id', UUID(as_uuid=True), primary_key=True),
sa.Column('mitre_technique_id', sa.String(), nullable=False),
sa.Column('title', sa.String(), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('source', sa.String(), nullable=False),
sa.Column('source_id', sa.String(), nullable=True),
sa.Column('source_url', sa.String(), nullable=True),
sa.Column('rule_content', sa.Text(), nullable=False),
sa.Column('rule_format', sa.String(), nullable=False),
sa.Column('severity', sa.String(), nullable=True),
sa.Column('platforms', JSONB(), nullable=True),
sa.Column('log_sources', JSONB(), nullable=True),
sa.Column('false_positive_rate', sa.String(), nullable=True),
sa.Column('is_active', sa.Boolean(), server_default='true'),
sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()),
)
op.create_index('ix_detection_rules_mitre_technique_id', 'detection_rules', ['mitre_technique_id'])
op.create_index('ix_detection_rules_source', 'detection_rules', ['source'])
op.create_index('ix_detection_rules_severity', 'detection_rules', ['severity'])
def downgrade() -> None:
"""Drop detection_rules table."""
op.drop_index('ix_detection_rules_severity', table_name='detection_rules')
op.drop_index('ix_detection_rules_source', table_name='detection_rules')
op.drop_index('ix_detection_rules_mitre_technique_id', table_name='detection_rules')
op.drop_table('detection_rules')

View File

@@ -7,10 +7,13 @@ from app.models.evidence import Evidence
from app.models.intel import IntelItem from app.models.intel import IntelItem
from app.models.audit import AuditLog from app.models.audit import AuditLog
from app.models.notification import Notification from app.models.notification import Notification
from app.models.data_source import DataSource
from app.models.detection_rule import DetectionRule
from app.models.enums import TechniqueStatus, TestState, TestResult, TeamSide from app.models.enums import TechniqueStatus, TestState, TestResult, TeamSide
__all__ = [ __all__ = [
"User", "Technique", "Test", "TestTemplate", "Evidence", "User", "Technique", "Test", "TestTemplate", "Evidence",
"IntelItem", "AuditLog", "Notification", "IntelItem", "AuditLog", "Notification", "DataSource",
"DetectionRule",
"TechniqueStatus", "TestState", "TestResult", "TeamSide", "TechniqueStatus", "TestState", "TestResult", "TeamSide",
] ]

View File

@@ -0,0 +1,39 @@
"""DataSource model — registry of external data sources for import."""
import uuid
from datetime import datetime
from sqlalchemy import Column, String, Text, Boolean, DateTime, Index
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.database import Base
class DataSource(Base):
"""
Unified registry of all external data sources (attack procedures,
detection rules, threat intel, defensive techniques).
Each source can be independently enabled/disabled and tracks its own
synchronisation state.
"""
__tablename__ = "data_sources"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name = Column(String, unique=True, nullable=False) # e.g. "atomic_red_team"
display_name = Column(String, nullable=False) # e.g. "Atomic Red Team"
type = Column(String, nullable=False) # attack_procedure / detection_rule / threat_intel / defensive_technique
url = Column(String, nullable=True) # URL base of repo/API
description = Column(Text, nullable=True)
is_enabled = Column(Boolean, default=True)
last_sync_at = Column(DateTime, nullable=True)
last_sync_status = Column(String, nullable=True) # success / error / in_progress
last_sync_stats = Column(JSONB, nullable=True) # {"imported": X, "updated": Y, ...}
sync_frequency = Column(String, nullable=True) # daily / weekly / monthly / manual
config = Column(JSONB, nullable=True) # source-specific configuration
created_at = Column(DateTime, default=datetime.utcnow)
__table_args__ = (
Index('ix_data_sources_type', 'type'),
Index('ix_data_sources_is_enabled', 'is_enabled'),
)

View File

@@ -0,0 +1,42 @@
"""DetectionRule model — detection rules from multiple sources."""
import uuid
from datetime import datetime
from sqlalchemy import Column, String, Text, Boolean, DateTime, Index
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.database import Base
class DetectionRule(Base):
"""
Detection rule from an external source (Sigma, Elastic, Splunk, custom).
Each rule is mapped to one MITRE ATT&CK technique via
``mitre_technique_id`` and stores the complete rule content in
``rule_content``.
"""
__tablename__ = "detection_rules"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
mitre_technique_id = Column(String, nullable=False) # e.g. "T1059.001"
title = Column(String, nullable=False)
description = Column(Text, nullable=True)
source = Column(String, nullable=False) # sigma / elastic / splunk / custom
source_id = Column(String, nullable=True) # ID in the source repo (for dedup)
source_url = Column(String, nullable=True)
rule_content = Column(Text, nullable=False) # YAML / KQL / SPL content
rule_format = Column(String, nullable=False) # sigma_yaml / kql / spl / custom
severity = Column(String, nullable=True) # informational / low / medium / high / critical
platforms = Column(JSONB, nullable=True, default=[])
log_sources = Column(JSONB, nullable=True) # e.g. {"product": "windows", "service": "sysmon"}
false_positive_rate = Column(String, nullable=True) # low / medium / high
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
__table_args__ = (
Index('ix_detection_rules_mitre_technique_id', 'mitre_technique_id'),
Index('ix_detection_rules_source', 'source'),
Index('ix_detection_rules_severity', 'severity'),
)

View File

@@ -0,0 +1,180 @@
"""
Seed script — registers all known data sources in the data_sources table.
Usage:
python -m app.seed_data_sources
"""
import logging
from app.database import SessionLocal
from app.models.data_source import DataSource
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Data source definitions
# ---------------------------------------------------------------------------
INITIAL_SOURCES = [
{
"name": "atomic_red_team",
"display_name": "Atomic Red Team",
"type": "attack_procedure",
"url": "https://github.com/redcanaryco/atomic-red-team",
"description": "Open-source library of atomic tests mapped to MITRE ATT&CK. "
"Each test is a small, self-contained procedure for validating "
"detection of a specific technique.",
"sync_frequency": "weekly",
"config": {
"zip_url": "https://github.com/redcanaryco/atomic-red-team/archive/refs/heads/master.zip",
"root_prefix": "atomic-red-team-master",
"atomics_dir": "atomics",
},
},
{
"name": "sigma",
"display_name": "SigmaHQ Rules",
"type": "detection_rule",
"url": "https://github.com/SigmaHQ/sigma",
"description": "Generic SIEM detection rules in YAML format. "
"3 000+ rules with MITRE ATT&CK mappings.",
"sync_frequency": "weekly",
"config": {
"zip_url": "https://github.com/SigmaHQ/sigma/archive/refs/heads/main.zip",
"root_prefix": "sigma-main",
"rules_dir": "rules",
},
},
{
"name": "lolbas",
"display_name": "LOLBAS (Windows)",
"type": "attack_procedure",
"url": "https://github.com/LOLBAS-Project/LOLBAS",
"description": "Living Off The Land Binaries, Scripts, and Libraries — "
"legitimate Windows binaries that can be abused for attacks.",
"sync_frequency": "monthly",
"config": {
"zip_url": "https://github.com/LOLBAS-Project/LOLBAS/archive/refs/heads/master.zip",
"root_prefix": "LOLBAS-master",
"yaml_dirs": ["yml/OSBinaries", "yml/OSLibraries", "yml/OSScripts"],
},
},
{
"name": "gtfobins",
"display_name": "GTFOBins (Linux)",
"type": "attack_procedure",
"url": "https://gtfobins.github.io/",
"description": "Unix/Linux binaries that can be exploited for file transfer, "
"shell escape, privilege escalation, and more.",
"sync_frequency": "monthly",
"config": {
"zip_url": "https://github.com/GTFOBins/GTFOBins.github.io/archive/refs/heads/master.zip",
"root_prefix": "GTFOBins.github.io-master",
"gtfobins_dir": "_gtfobins",
},
},
{
"name": "caldera",
"display_name": "MITRE CALDERA",
"type": "attack_procedure",
"url": "https://github.com/mitre/caldera",
"description": "Automated adversary emulation platform by MITRE. "
"400+ abilities (executable actions) mapped to ATT&CK.",
"sync_frequency": "monthly",
"config": {
"zip_url": "https://github.com/mitre/caldera/archive/refs/heads/master.zip",
"root_prefix": "caldera-master",
"abilities_dir": "data/abilities",
},
},
{
"name": "elastic_rules",
"display_name": "Elastic Detection Rules",
"type": "detection_rule",
"url": "https://github.com/elastic/detection-rules",
"description": "Open-source detection rules for Elastic SIEM. "
"1 000+ rules in KQL with MITRE ATT&CK mappings.",
"sync_frequency": "weekly",
"config": {
"zip_url": "https://github.com/elastic/detection-rules/archive/refs/heads/main.zip",
"root_prefix": "detection-rules-main",
"rules_dir": "rules",
},
},
{
"name": "d3fend",
"display_name": "MITRE D3FEND",
"type": "defensive_technique",
"url": "https://d3fend.mitre.org/",
"description": "MITRE framework of defensive countermeasures. "
"200+ defensive techniques mapped to ATT&CK.",
"sync_frequency": "monthly",
"config": {},
},
{
"name": "mitre_cti",
"display_name": "MITRE CTI (Groups & Software)",
"type": "threat_intel",
"url": "https://github.com/mitre/cti",
"description": "MITRE ATT&CK STIX 2.0 data — threat actor groups, "
"software, and campaigns with TTP mappings.",
"sync_frequency": "monthly",
"config": {
"zip_url": "https://github.com/mitre/cti/archive/refs/heads/master.zip",
"root_prefix": "cti-master",
"enterprise_dir": "enterprise-attack",
},
},
]
def seed_data_sources() -> dict:
"""Register all known data sources. Existing entries are skipped."""
db = SessionLocal()
try:
created = 0
skipped = 0
existing_names = {
row[0] for row in db.query(DataSource.name).all()
}
for src in INITIAL_SOURCES:
if src["name"] in existing_names:
skipped += 1
continue
ds = DataSource(
name=src["name"],
display_name=src["display_name"],
type=src["type"],
url=src.get("url"),
description=src.get("description"),
sync_frequency=src.get("sync_frequency", "manual"),
config=src.get("config"),
is_enabled=True,
)
db.add(ds)
created += 1
db.commit()
summary = {"created": created, "skipped": skipped}
logger.info("Data sources seed: %s", summary)
return summary
except Exception:
db.rollback()
raise
finally:
db.close()
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-8s %(name)s%(message)s",
)
result = seed_data_sources()
print(f"\nData sources seed complete: {result}")

431
backend/app/seed_demo.py Normal file
View File

@@ -0,0 +1,431 @@
"""
Seed script — generates a realistic volume of demo data for V3 validation.
Usage:
python -m app.seed_demo
**Prerequisite**: The MITRE sync must have been completed first so that
real techniques exist in the database.
Running twice is safe — the script detects existing demo data (by username
prefix ``demo_``) and deletes it before re-creating, ensuring idempotency.
"""
import logging
import random
import uuid
from datetime import datetime, timedelta
from app.auth import hash_password
from app.database import SessionLocal
from app.models.user import User
from app.models.technique import Technique
from app.models.test import Test
from app.models.test_template import TestTemplate
from app.models.evidence import Evidence
from app.models.audit import AuditLog
from app.models.notification import Notification
from app.models.enums import TechniqueStatus, TestState, TestResult, TeamSide
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
DEMO_PREFIX = "demo_"
ROLES = ["red_tech", "blue_tech", "red_lead", "blue_lead", "admin"]
TECHNIQUE_STATUSES = [
TechniqueStatus.validated,
TechniqueStatus.partial,
TechniqueStatus.not_covered,
TechniqueStatus.in_progress,
TechniqueStatus.not_evaluated,
]
TEST_STATES = [
TestState.draft,
TestState.red_executing,
TestState.blue_evaluating,
TestState.in_review,
TestState.validated,
TestState.rejected,
]
TEST_RESULTS = [
TestResult.detected,
TestResult.not_detected,
TestResult.partially_detected,
]
NOTIFICATION_TYPES = [
"test_assigned",
"validation_needed",
"test_rejected",
"test_validated",
"test_state_changed",
]
AUDIT_ACTIONS = [
"create_test",
"update_test",
"validate_technique",
"upload_evidence",
"create_user",
"import_atomic_red_team",
"sync_mitre",
"login",
"reject_test",
"approve_test",
]
PLATFORMS = ["windows", "linux", "macos"]
TEMPLATE_NAMES = [
"Manual Credential Dumping Test",
"Custom Phishing Payload Delivery",
"Lateral Movement via RDP",
"Persistence via Registry Run Keys",
"Data Exfiltration over DNS",
"Process Injection via DLL",
"Privilege Escalation with Token Impersonation",
"Custom C2 Beacon Communication Test",
"Kerberoasting Attack Procedure",
"Living Off The Land Binaries Test",
]
# ---------------------------------------------------------------------------
# Cleanup
# ---------------------------------------------------------------------------
def _cleanup_demo_data(db) -> None:
"""Remove all previously seeded demo data."""
# Delete in order to respect FK constraints
demo_users = db.query(User).filter(User.username.like(f"{DEMO_PREFIX}%")).all()
demo_user_ids = [u.id for u in demo_users]
if demo_user_ids:
# Notifications for demo users
db.query(Notification).filter(
Notification.user_id.in_(demo_user_ids)
).delete(synchronize_session=False)
# Audit logs for demo users
db.query(AuditLog).filter(
AuditLog.user_id.in_(demo_user_ids)
).delete(synchronize_session=False)
# Evidences for tests created by demo users
demo_tests = db.query(Test).filter(
Test.created_by.in_(demo_user_ids)
).all()
demo_test_ids = [t.id for t in demo_tests]
if demo_test_ids:
db.query(Evidence).filter(
Evidence.test_id.in_(demo_test_ids)
).delete(synchronize_session=False)
db.query(Test).filter(
Test.id.in_(demo_test_ids)
).delete(synchronize_session=False)
# Delete demo templates (by source = "demo")
db.query(TestTemplate).filter(
TestTemplate.source == "demo"
).delete(synchronize_session=False)
# Delete demo users
if demo_user_ids:
db.query(User).filter(
User.id.in_(demo_user_ids)
).delete(synchronize_session=False)
db.commit()
logger.info("Cleaned up existing demo data.")
# ---------------------------------------------------------------------------
# Seeders
# ---------------------------------------------------------------------------
def _seed_users(db) -> list[User]:
"""Create 5 users per role (25 total)."""
users = []
for role in ROLES:
for i in range(1, 6):
user = User(
username=f"{DEMO_PREFIX}{role}_{i}",
email=f"{DEMO_PREFIX}{role}_{i}@aegis-demo.local",
hashed_password=hash_password("demo123"),
role=role,
is_active=True,
)
db.add(user)
users.append(user)
db.flush()
logger.info("Created %d demo users.", len(users))
return users
def _seed_technique_statuses(db, count: int = 50) -> list[Technique]:
"""Set varied statuses on up to *count* techniques."""
techniques = db.query(Technique).limit(count).all()
if not techniques:
logger.warning("No techniques found — run MITRE sync first!")
return []
for tech in techniques:
tech.status_global = random.choice(TECHNIQUE_STATUSES)
if tech.status_global == TechniqueStatus.validated:
tech.last_review_date = datetime.utcnow() - timedelta(
days=random.randint(1, 30)
)
db.flush()
logger.info("Updated status on %d techniques.", len(techniques))
return techniques
def _seed_tests(db, users: list[User], techniques: list[Technique], count: int = 100) -> list[Test]:
"""Create *count* tests in various pipeline states."""
if not techniques:
logger.warning("No techniques available — skipping test seeding.")
return []
red_techs = [u for u in users if u.role == "red_tech"]
blue_techs = [u for u in users if u.role == "blue_tech"]
red_leads = [u for u in users if u.role == "red_lead"]
blue_leads = [u for u in users if u.role == "blue_lead"]
tests = []
for i in range(count):
technique = random.choice(techniques)
state = random.choice(TEST_STATES)
creator = random.choice(red_techs + blue_techs)
test = Test(
technique_id=technique.id,
name=f"Demo Test {i + 1}{technique.name[:40]}",
description=f"Automated demo test #{i + 1} for {technique.mitre_id}.",
platform=random.choice(PLATFORMS),
procedure_text=f"Step 1: Prepare environment.\nStep 2: Execute {technique.mitre_id} procedure.\nStep 3: Observe results.",
tool_used=random.choice(["powershell", "bash", "cmd", "python", "caldera", "metasploit"]),
execution_date=datetime.utcnow() - timedelta(days=random.randint(0, 60)),
created_by=creator.id,
result=random.choice(TEST_RESULTS) if state not in (TestState.draft, TestState.red_executing) else None,
state=state,
created_at=datetime.utcnow() - timedelta(days=random.randint(0, 90)),
)
# Populate team fields based on state
if state in (TestState.blue_evaluating, TestState.in_review, TestState.validated, TestState.rejected):
test.red_summary = f"Attack executed successfully using {test.tool_used}."
test.attack_success = random.choice([True, True, True, False])
if state in (TestState.in_review, TestState.validated, TestState.rejected):
test.blue_summary = "Detection observed in SIEM. Alert fired."
test.detection_result = random.choice(TEST_RESULTS)
if state == TestState.validated:
rv = random.choice(red_leads)
bv = random.choice(blue_leads)
test.red_validated_by = rv.id
test.red_validated_at = datetime.utcnow() - timedelta(days=random.randint(0, 10))
test.red_validation_status = "approved"
test.blue_validated_by = bv.id
test.blue_validated_at = datetime.utcnow() - timedelta(days=random.randint(0, 10))
test.blue_validation_status = "approved"
if state == TestState.rejected:
rejector = random.choice(red_leads + blue_leads)
if rejector.role == "red_lead":
test.red_validated_by = rejector.id
test.red_validated_at = datetime.utcnow() - timedelta(days=random.randint(0, 5))
test.red_validation_status = "rejected"
test.red_validation_notes = "Insufficient evidence of attack success."
else:
test.blue_validated_by = rejector.id
test.blue_validated_at = datetime.utcnow() - timedelta(days=random.randint(0, 5))
test.blue_validation_status = "rejected"
test.blue_validation_notes = "Detection evidence not conclusive."
db.add(test)
tests.append(test)
db.flush()
logger.info("Created %d demo tests.", len(tests))
return tests
def _seed_evidences(db, tests: list[Test], users: list[User], count: int = 50) -> list[Evidence]:
"""Create *count* dummy evidence records."""
if not tests:
return []
# Pick tests that are past draft state
eligible = [t for t in tests if t.state != TestState.draft]
if not eligible:
eligible = tests
evidences = []
red_blue = [u for u in users if u.role in ("red_tech", "blue_tech")]
for i in range(count):
test = random.choice(eligible)
uploader = random.choice(red_blue)
team = TeamSide.red if uploader.role == "red_tech" else TeamSide.blue
ext = random.choice(["png", "log", "pcap", "csv", "txt", "json"])
fname = f"evidence_{i + 1}.{ext}"
evidence = Evidence(
test_id=test.id,
file_name=fname,
file_path=f"{test.id}/{uuid.uuid4()}_{fname}",
sha256_hash=uuid.uuid4().hex + uuid.uuid4().hex, # dummy hash
uploaded_by=uploader.id,
uploaded_at=datetime.utcnow() - timedelta(days=random.randint(0, 30)),
team=team,
notes=f"Auto-generated demo evidence #{i + 1}.",
)
db.add(evidence)
evidences.append(evidence)
db.flush()
logger.info("Created %d demo evidences.", len(evidences))
return evidences
def _seed_audit_logs(db, users: list[User], count: int = 20) -> None:
"""Create *count* varied audit log entries."""
for i in range(count):
user = random.choice(users)
log = AuditLog(
user_id=user.id,
action=random.choice(AUDIT_ACTIONS),
entity_type=random.choice(["test", "technique", "user", "test_template"]),
entity_id=str(uuid.uuid4()),
timestamp=datetime.utcnow() - timedelta(days=random.randint(0, 60)),
details={"demo": True, "index": i},
)
db.add(log)
db.flush()
logger.info("Created %d demo audit logs.", count)
def _seed_notifications(db, users: list[User], count: int = 30) -> None:
"""Create *count* notifications spread across demo users."""
for i in range(count):
user = random.choice(users)
ntype = random.choice(NOTIFICATION_TYPES)
notif = Notification(
user_id=user.id,
type=ntype,
title=f"Demo notification: {ntype.replace('_', ' ').title()} #{i + 1}",
message=f"This is an auto-generated demo notification ({ntype}).",
entity_type="test",
entity_id=uuid.uuid4(),
read=random.choice([True, False]),
created_at=datetime.utcnow() - timedelta(days=random.randint(0, 30)),
)
db.add(notif)
db.flush()
logger.info("Created %d demo notifications.", count)
def _seed_templates(db, techniques: list[Technique], count: int = 10) -> None:
"""Create *count* manual demo templates."""
if not techniques:
return
for i, name in enumerate(TEMPLATE_NAMES[:count]):
technique = techniques[i % len(techniques)]
template = TestTemplate(
mitre_technique_id=technique.mitre_id,
name=name,
description=f"Demo template: {name}. Targets {technique.mitre_id} ({technique.name}).",
source="demo",
source_url=None,
attack_procedure=f"1. Set up environment for {technique.mitre_id}.\n2. Execute the procedure.\n3. Record observations.",
expected_detection=f"SIEM should alert on {technique.mitre_id} indicators.",
platform=random.choice(PLATFORMS),
tool_suggested=random.choice(["powershell", "cmd", "bash", "python"]),
severity=random.choice(["low", "medium", "high", "critical"]),
is_active=True,
)
db.add(template)
db.flush()
logger.info("Created %d demo templates.", count)
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
def seed_demo() -> dict:
"""Generate all demo data. Returns a summary dict."""
db = SessionLocal()
try:
logger.info("=== Starting V3 demo seed ===")
# Step 0: cleanup previous run
_cleanup_demo_data(db)
# Step 1: users
users = _seed_users(db)
# Step 2: technique statuses
techniques = _seed_technique_statuses(db, count=50)
# Step 3: tests
tests = _seed_tests(db, users, techniques, count=100)
# Step 4: evidences
evidences = _seed_evidences(db, tests, users, count=50)
# Step 5: audit logs
_seed_audit_logs(db, users, count=20)
# Step 6: notifications
_seed_notifications(db, users, count=30)
# Step 7: templates
_seed_templates(db, techniques, count=10)
db.commit()
summary = {
"users": len(users),
"techniques_updated": len(techniques),
"tests": len(tests),
"evidences": len(evidences),
"audit_logs": 20,
"notifications": 30,
"templates": 10,
}
logger.info("=== Demo seed complete: %s ===", summary)
return summary
except Exception:
db.rollback()
raise
finally:
db.close()
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-8s %(name)s%(message)s",
)
result = seed_demo()
print(f"\nSeed complete: {result}")