From 022c4f288689fb258a34e989a20ef50bd0301c4c Mon Sep 17 00:00:00 2001 From: Kitos Date: Mon, 9 Feb 2026 16:06:44 +0100 Subject: [PATCH] feat(phase-21): add V3 demo seed, DataSource and DetectionRule models (T-200, T-201, T-202) --- .../versions/b008_add_data_sources_table.py | 48 ++ .../b009_add_detection_rules_table.py | 52 +++ backend/app/models/__init__.py | 5 +- backend/app/models/data_source.py | 39 ++ backend/app/models/detection_rule.py | 42 ++ backend/app/seed_data_sources.py | 180 ++++++++ backend/app/seed_demo.py | 431 ++++++++++++++++++ 7 files changed, 796 insertions(+), 1 deletion(-) create mode 100644 backend/alembic/versions/b008_add_data_sources_table.py create mode 100644 backend/alembic/versions/b009_add_detection_rules_table.py create mode 100644 backend/app/models/data_source.py create mode 100644 backend/app/models/detection_rule.py create mode 100644 backend/app/seed_data_sources.py create mode 100644 backend/app/seed_demo.py diff --git a/backend/alembic/versions/b008_add_data_sources_table.py b/backend/alembic/versions/b008_add_data_sources_table.py new file mode 100644 index 0000000..e95287e --- /dev/null +++ b/backend/alembic/versions/b008_add_data_sources_table.py @@ -0,0 +1,48 @@ +"""add_data_sources_table + +Revision ID: b008datasources +Revises: b007remediation +Create Date: 2026-02-09 14:00:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import UUID, JSONB + + +# revision identifiers, used by Alembic. +revision: str = 'b008datasources' +down_revision: Union[str, Sequence[str], None] = 'b007remediation' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create data_sources table.""" + op.create_table( + 'data_sources', + sa.Column('id', UUID(as_uuid=True), primary_key=True), + sa.Column('name', sa.String(), unique=True, nullable=False), + sa.Column('display_name', sa.String(), nullable=False), + sa.Column('type', sa.String(), nullable=False), + sa.Column('url', sa.String(), nullable=True), + sa.Column('description', sa.Text(), nullable=True), + sa.Column('is_enabled', sa.Boolean(), server_default='true'), + sa.Column('last_sync_at', sa.DateTime(), nullable=True), + sa.Column('last_sync_status', sa.String(), nullable=True), + sa.Column('last_sync_stats', JSONB(), nullable=True), + sa.Column('sync_frequency', sa.String(), nullable=True), + sa.Column('config', JSONB(), nullable=True), + sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()), + ) + op.create_index('ix_data_sources_type', 'data_sources', ['type']) + op.create_index('ix_data_sources_is_enabled', 'data_sources', ['is_enabled']) + + +def downgrade() -> None: + """Drop data_sources table.""" + op.drop_index('ix_data_sources_is_enabled', table_name='data_sources') + op.drop_index('ix_data_sources_type', table_name='data_sources') + op.drop_table('data_sources') diff --git a/backend/alembic/versions/b009_add_detection_rules_table.py b/backend/alembic/versions/b009_add_detection_rules_table.py new file mode 100644 index 0000000..67747e8 --- /dev/null +++ b/backend/alembic/versions/b009_add_detection_rules_table.py @@ -0,0 +1,52 @@ +"""add_detection_rules_table + +Revision ID: b009detectionrules +Revises: b008datasources +Create Date: 2026-02-09 14:10:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import UUID, JSONB + + +# revision identifiers, used by Alembic. +revision: str = 'b009detectionrules' +down_revision: Union[str, Sequence[str], None] = 'b008datasources' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create detection_rules table.""" + op.create_table( + 'detection_rules', + sa.Column('id', UUID(as_uuid=True), primary_key=True), + sa.Column('mitre_technique_id', sa.String(), nullable=False), + sa.Column('title', sa.String(), nullable=False), + sa.Column('description', sa.Text(), nullable=True), + sa.Column('source', sa.String(), nullable=False), + sa.Column('source_id', sa.String(), nullable=True), + sa.Column('source_url', sa.String(), nullable=True), + sa.Column('rule_content', sa.Text(), nullable=False), + sa.Column('rule_format', sa.String(), nullable=False), + sa.Column('severity', sa.String(), nullable=True), + sa.Column('platforms', JSONB(), nullable=True), + sa.Column('log_sources', JSONB(), nullable=True), + sa.Column('false_positive_rate', sa.String(), nullable=True), + sa.Column('is_active', sa.Boolean(), server_default='true'), + sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()), + ) + op.create_index('ix_detection_rules_mitre_technique_id', 'detection_rules', ['mitre_technique_id']) + op.create_index('ix_detection_rules_source', 'detection_rules', ['source']) + op.create_index('ix_detection_rules_severity', 'detection_rules', ['severity']) + + +def downgrade() -> None: + """Drop detection_rules table.""" + op.drop_index('ix_detection_rules_severity', table_name='detection_rules') + op.drop_index('ix_detection_rules_source', table_name='detection_rules') + op.drop_index('ix_detection_rules_mitre_technique_id', table_name='detection_rules') + op.drop_table('detection_rules') diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index b6fafc7..b9dafe7 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -7,10 +7,13 @@ from app.models.evidence import Evidence from app.models.intel import IntelItem from app.models.audit import AuditLog from app.models.notification import Notification +from app.models.data_source import DataSource +from app.models.detection_rule import DetectionRule from app.models.enums import TechniqueStatus, TestState, TestResult, TeamSide __all__ = [ "User", "Technique", "Test", "TestTemplate", "Evidence", - "IntelItem", "AuditLog", "Notification", + "IntelItem", "AuditLog", "Notification", "DataSource", + "DetectionRule", "TechniqueStatus", "TestState", "TestResult", "TeamSide", ] diff --git a/backend/app/models/data_source.py b/backend/app/models/data_source.py new file mode 100644 index 0000000..13559c4 --- /dev/null +++ b/backend/app/models/data_source.py @@ -0,0 +1,39 @@ +"""DataSource model — registry of external data sources for import.""" + +import uuid +from datetime import datetime + +from sqlalchemy import Column, String, Text, Boolean, DateTime, Index +from sqlalchemy.dialects.postgresql import UUID, JSONB + +from app.database import Base + + +class DataSource(Base): + """ + Unified registry of all external data sources (attack procedures, + detection rules, threat intel, defensive techniques). + + Each source can be independently enabled/disabled and tracks its own + synchronisation state. + """ + __tablename__ = "data_sources" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + name = Column(String, unique=True, nullable=False) # e.g. "atomic_red_team" + display_name = Column(String, nullable=False) # e.g. "Atomic Red Team" + type = Column(String, nullable=False) # attack_procedure / detection_rule / threat_intel / defensive_technique + url = Column(String, nullable=True) # URL base of repo/API + description = Column(Text, nullable=True) + is_enabled = Column(Boolean, default=True) + last_sync_at = Column(DateTime, nullable=True) + last_sync_status = Column(String, nullable=True) # success / error / in_progress + last_sync_stats = Column(JSONB, nullable=True) # {"imported": X, "updated": Y, ...} + sync_frequency = Column(String, nullable=True) # daily / weekly / monthly / manual + config = Column(JSONB, nullable=True) # source-specific configuration + created_at = Column(DateTime, default=datetime.utcnow) + + __table_args__ = ( + Index('ix_data_sources_type', 'type'), + Index('ix_data_sources_is_enabled', 'is_enabled'), + ) diff --git a/backend/app/models/detection_rule.py b/backend/app/models/detection_rule.py new file mode 100644 index 0000000..5f34595 --- /dev/null +++ b/backend/app/models/detection_rule.py @@ -0,0 +1,42 @@ +"""DetectionRule model — detection rules from multiple sources.""" + +import uuid +from datetime import datetime + +from sqlalchemy import Column, String, Text, Boolean, DateTime, Index +from sqlalchemy.dialects.postgresql import UUID, JSONB + +from app.database import Base + + +class DetectionRule(Base): + """ + Detection rule from an external source (Sigma, Elastic, Splunk, custom). + + Each rule is mapped to one MITRE ATT&CK technique via + ``mitre_technique_id`` and stores the complete rule content in + ``rule_content``. + """ + __tablename__ = "detection_rules" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + mitre_technique_id = Column(String, nullable=False) # e.g. "T1059.001" + title = Column(String, nullable=False) + description = Column(Text, nullable=True) + source = Column(String, nullable=False) # sigma / elastic / splunk / custom + source_id = Column(String, nullable=True) # ID in the source repo (for dedup) + source_url = Column(String, nullable=True) + rule_content = Column(Text, nullable=False) # YAML / KQL / SPL content + rule_format = Column(String, nullable=False) # sigma_yaml / kql / spl / custom + severity = Column(String, nullable=True) # informational / low / medium / high / critical + platforms = Column(JSONB, nullable=True, default=[]) + log_sources = Column(JSONB, nullable=True) # e.g. {"product": "windows", "service": "sysmon"} + false_positive_rate = Column(String, nullable=True) # low / medium / high + is_active = Column(Boolean, default=True) + created_at = Column(DateTime, default=datetime.utcnow) + + __table_args__ = ( + Index('ix_detection_rules_mitre_technique_id', 'mitre_technique_id'), + Index('ix_detection_rules_source', 'source'), + Index('ix_detection_rules_severity', 'severity'), + ) diff --git a/backend/app/seed_data_sources.py b/backend/app/seed_data_sources.py new file mode 100644 index 0000000..d8ef621 --- /dev/null +++ b/backend/app/seed_data_sources.py @@ -0,0 +1,180 @@ +""" +Seed script — registers all known data sources in the data_sources table. + +Usage: + python -m app.seed_data_sources +""" + +import logging + +from app.database import SessionLocal +from app.models.data_source import DataSource + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Data source definitions +# --------------------------------------------------------------------------- + +INITIAL_SOURCES = [ + { + "name": "atomic_red_team", + "display_name": "Atomic Red Team", + "type": "attack_procedure", + "url": "https://github.com/redcanaryco/atomic-red-team", + "description": "Open-source library of atomic tests mapped to MITRE ATT&CK. " + "Each test is a small, self-contained procedure for validating " + "detection of a specific technique.", + "sync_frequency": "weekly", + "config": { + "zip_url": "https://github.com/redcanaryco/atomic-red-team/archive/refs/heads/master.zip", + "root_prefix": "atomic-red-team-master", + "atomics_dir": "atomics", + }, + }, + { + "name": "sigma", + "display_name": "SigmaHQ Rules", + "type": "detection_rule", + "url": "https://github.com/SigmaHQ/sigma", + "description": "Generic SIEM detection rules in YAML format. " + "3 000+ rules with MITRE ATT&CK mappings.", + "sync_frequency": "weekly", + "config": { + "zip_url": "https://github.com/SigmaHQ/sigma/archive/refs/heads/main.zip", + "root_prefix": "sigma-main", + "rules_dir": "rules", + }, + }, + { + "name": "lolbas", + "display_name": "LOLBAS (Windows)", + "type": "attack_procedure", + "url": "https://github.com/LOLBAS-Project/LOLBAS", + "description": "Living Off The Land Binaries, Scripts, and Libraries — " + "legitimate Windows binaries that can be abused for attacks.", + "sync_frequency": "monthly", + "config": { + "zip_url": "https://github.com/LOLBAS-Project/LOLBAS/archive/refs/heads/master.zip", + "root_prefix": "LOLBAS-master", + "yaml_dirs": ["yml/OSBinaries", "yml/OSLibraries", "yml/OSScripts"], + }, + }, + { + "name": "gtfobins", + "display_name": "GTFOBins (Linux)", + "type": "attack_procedure", + "url": "https://gtfobins.github.io/", + "description": "Unix/Linux binaries that can be exploited for file transfer, " + "shell escape, privilege escalation, and more.", + "sync_frequency": "monthly", + "config": { + "zip_url": "https://github.com/GTFOBins/GTFOBins.github.io/archive/refs/heads/master.zip", + "root_prefix": "GTFOBins.github.io-master", + "gtfobins_dir": "_gtfobins", + }, + }, + { + "name": "caldera", + "display_name": "MITRE CALDERA", + "type": "attack_procedure", + "url": "https://github.com/mitre/caldera", + "description": "Automated adversary emulation platform by MITRE. " + "400+ abilities (executable actions) mapped to ATT&CK.", + "sync_frequency": "monthly", + "config": { + "zip_url": "https://github.com/mitre/caldera/archive/refs/heads/master.zip", + "root_prefix": "caldera-master", + "abilities_dir": "data/abilities", + }, + }, + { + "name": "elastic_rules", + "display_name": "Elastic Detection Rules", + "type": "detection_rule", + "url": "https://github.com/elastic/detection-rules", + "description": "Open-source detection rules for Elastic SIEM. " + "1 000+ rules in KQL with MITRE ATT&CK mappings.", + "sync_frequency": "weekly", + "config": { + "zip_url": "https://github.com/elastic/detection-rules/archive/refs/heads/main.zip", + "root_prefix": "detection-rules-main", + "rules_dir": "rules", + }, + }, + { + "name": "d3fend", + "display_name": "MITRE D3FEND", + "type": "defensive_technique", + "url": "https://d3fend.mitre.org/", + "description": "MITRE framework of defensive countermeasures. " + "200+ defensive techniques mapped to ATT&CK.", + "sync_frequency": "monthly", + "config": {}, + }, + { + "name": "mitre_cti", + "display_name": "MITRE CTI (Groups & Software)", + "type": "threat_intel", + "url": "https://github.com/mitre/cti", + "description": "MITRE ATT&CK STIX 2.0 data — threat actor groups, " + "software, and campaigns with TTP mappings.", + "sync_frequency": "monthly", + "config": { + "zip_url": "https://github.com/mitre/cti/archive/refs/heads/master.zip", + "root_prefix": "cti-master", + "enterprise_dir": "enterprise-attack", + }, + }, +] + + +def seed_data_sources() -> dict: + """Register all known data sources. Existing entries are skipped.""" + db = SessionLocal() + try: + created = 0 + skipped = 0 + + existing_names = { + row[0] for row in db.query(DataSource.name).all() + } + + for src in INITIAL_SOURCES: + if src["name"] in existing_names: + skipped += 1 + continue + + ds = DataSource( + name=src["name"], + display_name=src["display_name"], + type=src["type"], + url=src.get("url"), + description=src.get("description"), + sync_frequency=src.get("sync_frequency", "manual"), + config=src.get("config"), + is_enabled=True, + ) + db.add(ds) + created += 1 + + db.commit() + + summary = {"created": created, "skipped": skipped} + logger.info("Data sources seed: %s", summary) + return summary + + except Exception: + db.rollback() + raise + finally: + db.close() + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)-8s %(name)s — %(message)s", + ) + result = seed_data_sources() + print(f"\nData sources seed complete: {result}") diff --git a/backend/app/seed_demo.py b/backend/app/seed_demo.py new file mode 100644 index 0000000..4e18009 --- /dev/null +++ b/backend/app/seed_demo.py @@ -0,0 +1,431 @@ +""" +Seed script — generates a realistic volume of demo data for V3 validation. + +Usage: + python -m app.seed_demo + +**Prerequisite**: The MITRE sync must have been completed first so that +real techniques exist in the database. + +Running twice is safe — the script detects existing demo data (by username +prefix ``demo_``) and deletes it before re-creating, ensuring idempotency. +""" + +import logging +import random +import uuid +from datetime import datetime, timedelta + +from app.auth import hash_password +from app.database import SessionLocal +from app.models.user import User +from app.models.technique import Technique +from app.models.test import Test +from app.models.test_template import TestTemplate +from app.models.evidence import Evidence +from app.models.audit import AuditLog +from app.models.notification import Notification +from app.models.enums import TechniqueStatus, TestState, TestResult, TeamSide + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +DEMO_PREFIX = "demo_" + +ROLES = ["red_tech", "blue_tech", "red_lead", "blue_lead", "admin"] + +TECHNIQUE_STATUSES = [ + TechniqueStatus.validated, + TechniqueStatus.partial, + TechniqueStatus.not_covered, + TechniqueStatus.in_progress, + TechniqueStatus.not_evaluated, +] + +TEST_STATES = [ + TestState.draft, + TestState.red_executing, + TestState.blue_evaluating, + TestState.in_review, + TestState.validated, + TestState.rejected, +] + +TEST_RESULTS = [ + TestResult.detected, + TestResult.not_detected, + TestResult.partially_detected, +] + +NOTIFICATION_TYPES = [ + "test_assigned", + "validation_needed", + "test_rejected", + "test_validated", + "test_state_changed", +] + +AUDIT_ACTIONS = [ + "create_test", + "update_test", + "validate_technique", + "upload_evidence", + "create_user", + "import_atomic_red_team", + "sync_mitre", + "login", + "reject_test", + "approve_test", +] + +PLATFORMS = ["windows", "linux", "macos"] + +TEMPLATE_NAMES = [ + "Manual Credential Dumping Test", + "Custom Phishing Payload Delivery", + "Lateral Movement via RDP", + "Persistence via Registry Run Keys", + "Data Exfiltration over DNS", + "Process Injection via DLL", + "Privilege Escalation with Token Impersonation", + "Custom C2 Beacon Communication Test", + "Kerberoasting Attack Procedure", + "Living Off The Land Binaries Test", +] + + +# --------------------------------------------------------------------------- +# Cleanup +# --------------------------------------------------------------------------- + + +def _cleanup_demo_data(db) -> None: + """Remove all previously seeded demo data.""" + # Delete in order to respect FK constraints + demo_users = db.query(User).filter(User.username.like(f"{DEMO_PREFIX}%")).all() + demo_user_ids = [u.id for u in demo_users] + + if demo_user_ids: + # Notifications for demo users + db.query(Notification).filter( + Notification.user_id.in_(demo_user_ids) + ).delete(synchronize_session=False) + + # Audit logs for demo users + db.query(AuditLog).filter( + AuditLog.user_id.in_(demo_user_ids) + ).delete(synchronize_session=False) + + # Evidences for tests created by demo users + demo_tests = db.query(Test).filter( + Test.created_by.in_(demo_user_ids) + ).all() + demo_test_ids = [t.id for t in demo_tests] + + if demo_test_ids: + db.query(Evidence).filter( + Evidence.test_id.in_(demo_test_ids) + ).delete(synchronize_session=False) + + db.query(Test).filter( + Test.id.in_(demo_test_ids) + ).delete(synchronize_session=False) + + # Delete demo templates (by source = "demo") + db.query(TestTemplate).filter( + TestTemplate.source == "demo" + ).delete(synchronize_session=False) + + # Delete demo users + if demo_user_ids: + db.query(User).filter( + User.id.in_(demo_user_ids) + ).delete(synchronize_session=False) + + db.commit() + logger.info("Cleaned up existing demo data.") + + +# --------------------------------------------------------------------------- +# Seeders +# --------------------------------------------------------------------------- + + +def _seed_users(db) -> list[User]: + """Create 5 users per role (25 total).""" + users = [] + for role in ROLES: + for i in range(1, 6): + user = User( + username=f"{DEMO_PREFIX}{role}_{i}", + email=f"{DEMO_PREFIX}{role}_{i}@aegis-demo.local", + hashed_password=hash_password("demo123"), + role=role, + is_active=True, + ) + db.add(user) + users.append(user) + db.flush() + logger.info("Created %d demo users.", len(users)) + return users + + +def _seed_technique_statuses(db, count: int = 50) -> list[Technique]: + """Set varied statuses on up to *count* techniques.""" + techniques = db.query(Technique).limit(count).all() + if not techniques: + logger.warning("No techniques found — run MITRE sync first!") + return [] + + for tech in techniques: + tech.status_global = random.choice(TECHNIQUE_STATUSES) + if tech.status_global == TechniqueStatus.validated: + tech.last_review_date = datetime.utcnow() - timedelta( + days=random.randint(1, 30) + ) + + db.flush() + logger.info("Updated status on %d techniques.", len(techniques)) + return techniques + + +def _seed_tests(db, users: list[User], techniques: list[Technique], count: int = 100) -> list[Test]: + """Create *count* tests in various pipeline states.""" + if not techniques: + logger.warning("No techniques available — skipping test seeding.") + return [] + + red_techs = [u for u in users if u.role == "red_tech"] + blue_techs = [u for u in users if u.role == "blue_tech"] + red_leads = [u for u in users if u.role == "red_lead"] + blue_leads = [u for u in users if u.role == "blue_lead"] + + tests = [] + for i in range(count): + technique = random.choice(techniques) + state = random.choice(TEST_STATES) + creator = random.choice(red_techs + blue_techs) + + test = Test( + technique_id=technique.id, + name=f"Demo Test {i + 1} — {technique.name[:40]}", + description=f"Automated demo test #{i + 1} for {technique.mitre_id}.", + platform=random.choice(PLATFORMS), + procedure_text=f"Step 1: Prepare environment.\nStep 2: Execute {technique.mitre_id} procedure.\nStep 3: Observe results.", + tool_used=random.choice(["powershell", "bash", "cmd", "python", "caldera", "metasploit"]), + execution_date=datetime.utcnow() - timedelta(days=random.randint(0, 60)), + created_by=creator.id, + result=random.choice(TEST_RESULTS) if state not in (TestState.draft, TestState.red_executing) else None, + state=state, + created_at=datetime.utcnow() - timedelta(days=random.randint(0, 90)), + ) + + # Populate team fields based on state + if state in (TestState.blue_evaluating, TestState.in_review, TestState.validated, TestState.rejected): + test.red_summary = f"Attack executed successfully using {test.tool_used}." + test.attack_success = random.choice([True, True, True, False]) + + if state in (TestState.in_review, TestState.validated, TestState.rejected): + test.blue_summary = "Detection observed in SIEM. Alert fired." + test.detection_result = random.choice(TEST_RESULTS) + + if state == TestState.validated: + rv = random.choice(red_leads) + bv = random.choice(blue_leads) + test.red_validated_by = rv.id + test.red_validated_at = datetime.utcnow() - timedelta(days=random.randint(0, 10)) + test.red_validation_status = "approved" + test.blue_validated_by = bv.id + test.blue_validated_at = datetime.utcnow() - timedelta(days=random.randint(0, 10)) + test.blue_validation_status = "approved" + + if state == TestState.rejected: + rejector = random.choice(red_leads + blue_leads) + if rejector.role == "red_lead": + test.red_validated_by = rejector.id + test.red_validated_at = datetime.utcnow() - timedelta(days=random.randint(0, 5)) + test.red_validation_status = "rejected" + test.red_validation_notes = "Insufficient evidence of attack success." + else: + test.blue_validated_by = rejector.id + test.blue_validated_at = datetime.utcnow() - timedelta(days=random.randint(0, 5)) + test.blue_validation_status = "rejected" + test.blue_validation_notes = "Detection evidence not conclusive." + + db.add(test) + tests.append(test) + + db.flush() + logger.info("Created %d demo tests.", len(tests)) + return tests + + +def _seed_evidences(db, tests: list[Test], users: list[User], count: int = 50) -> list[Evidence]: + """Create *count* dummy evidence records.""" + if not tests: + return [] + + # Pick tests that are past draft state + eligible = [t for t in tests if t.state != TestState.draft] + if not eligible: + eligible = tests + + evidences = [] + red_blue = [u for u in users if u.role in ("red_tech", "blue_tech")] + + for i in range(count): + test = random.choice(eligible) + uploader = random.choice(red_blue) + team = TeamSide.red if uploader.role == "red_tech" else TeamSide.blue + ext = random.choice(["png", "log", "pcap", "csv", "txt", "json"]) + fname = f"evidence_{i + 1}.{ext}" + + evidence = Evidence( + test_id=test.id, + file_name=fname, + file_path=f"{test.id}/{uuid.uuid4()}_{fname}", + sha256_hash=uuid.uuid4().hex + uuid.uuid4().hex, # dummy hash + uploaded_by=uploader.id, + uploaded_at=datetime.utcnow() - timedelta(days=random.randint(0, 30)), + team=team, + notes=f"Auto-generated demo evidence #{i + 1}.", + ) + db.add(evidence) + evidences.append(evidence) + + db.flush() + logger.info("Created %d demo evidences.", len(evidences)) + return evidences + + +def _seed_audit_logs(db, users: list[User], count: int = 20) -> None: + """Create *count* varied audit log entries.""" + for i in range(count): + user = random.choice(users) + log = AuditLog( + user_id=user.id, + action=random.choice(AUDIT_ACTIONS), + entity_type=random.choice(["test", "technique", "user", "test_template"]), + entity_id=str(uuid.uuid4()), + timestamp=datetime.utcnow() - timedelta(days=random.randint(0, 60)), + details={"demo": True, "index": i}, + ) + db.add(log) + + db.flush() + logger.info("Created %d demo audit logs.", count) + + +def _seed_notifications(db, users: list[User], count: int = 30) -> None: + """Create *count* notifications spread across demo users.""" + for i in range(count): + user = random.choice(users) + ntype = random.choice(NOTIFICATION_TYPES) + notif = Notification( + user_id=user.id, + type=ntype, + title=f"Demo notification: {ntype.replace('_', ' ').title()} #{i + 1}", + message=f"This is an auto-generated demo notification ({ntype}).", + entity_type="test", + entity_id=uuid.uuid4(), + read=random.choice([True, False]), + created_at=datetime.utcnow() - timedelta(days=random.randint(0, 30)), + ) + db.add(notif) + + db.flush() + logger.info("Created %d demo notifications.", count) + + +def _seed_templates(db, techniques: list[Technique], count: int = 10) -> None: + """Create *count* manual demo templates.""" + if not techniques: + return + + for i, name in enumerate(TEMPLATE_NAMES[:count]): + technique = techniques[i % len(techniques)] + template = TestTemplate( + mitre_technique_id=technique.mitre_id, + name=name, + description=f"Demo template: {name}. Targets {technique.mitre_id} ({technique.name}).", + source="demo", + source_url=None, + attack_procedure=f"1. Set up environment for {technique.mitre_id}.\n2. Execute the procedure.\n3. Record observations.", + expected_detection=f"SIEM should alert on {technique.mitre_id} indicators.", + platform=random.choice(PLATFORMS), + tool_suggested=random.choice(["powershell", "cmd", "bash", "python"]), + severity=random.choice(["low", "medium", "high", "critical"]), + is_active=True, + ) + db.add(template) + + db.flush() + logger.info("Created %d demo templates.", count) + + +# --------------------------------------------------------------------------- +# Main entry point +# --------------------------------------------------------------------------- + + +def seed_demo() -> dict: + """Generate all demo data. Returns a summary dict.""" + db = SessionLocal() + try: + logger.info("=== Starting V3 demo seed ===") + + # Step 0: cleanup previous run + _cleanup_demo_data(db) + + # Step 1: users + users = _seed_users(db) + + # Step 2: technique statuses + techniques = _seed_technique_statuses(db, count=50) + + # Step 3: tests + tests = _seed_tests(db, users, techniques, count=100) + + # Step 4: evidences + evidences = _seed_evidences(db, tests, users, count=50) + + # Step 5: audit logs + _seed_audit_logs(db, users, count=20) + + # Step 6: notifications + _seed_notifications(db, users, count=30) + + # Step 7: templates + _seed_templates(db, techniques, count=10) + + db.commit() + + summary = { + "users": len(users), + "techniques_updated": len(techniques), + "tests": len(tests), + "evidences": len(evidences), + "audit_logs": 20, + "notifications": 30, + "templates": 10, + } + logger.info("=== Demo seed complete: %s ===", summary) + return summary + + except Exception: + db.rollback() + raise + finally: + db.close() + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)-8s %(name)s — %(message)s", + ) + result = seed_demo() + print(f"\nSeed complete: {result}")