432 lines
14 KiB
Python
432 lines
14 KiB
Python
"""
|
|
Seed script — generates a realistic volume of demo data for V3 validation.
|
|
|
|
Usage:
|
|
python -m app.seed_demo
|
|
|
|
**Prerequisite**: The MITRE sync must have been completed first so that
|
|
real techniques exist in the database.
|
|
|
|
Running twice is safe — the script detects existing demo data (by username
|
|
prefix ``demo_``) and deletes it before re-creating, ensuring idempotency.
|
|
"""
|
|
|
|
import logging
|
|
import random
|
|
import uuid
|
|
from datetime import datetime, timedelta
|
|
|
|
from app.auth import hash_password
|
|
from app.database import SessionLocal
|
|
from app.models.user import User
|
|
from app.models.technique import Technique
|
|
from app.models.test import Test
|
|
from app.models.test_template import TestTemplate
|
|
from app.models.evidence import Evidence
|
|
from app.models.audit import AuditLog
|
|
from app.models.notification import Notification
|
|
from app.models.enums import TechniqueStatus, TestState, TestResult, TeamSide
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Constants
|
|
# ---------------------------------------------------------------------------
|
|
|
|
DEMO_PREFIX = "demo_"
|
|
|
|
ROLES = ["red_tech", "blue_tech", "red_lead", "blue_lead", "admin"]
|
|
|
|
TECHNIQUE_STATUSES = [
|
|
TechniqueStatus.validated,
|
|
TechniqueStatus.partial,
|
|
TechniqueStatus.not_covered,
|
|
TechniqueStatus.in_progress,
|
|
TechniqueStatus.not_evaluated,
|
|
]
|
|
|
|
TEST_STATES = [
|
|
TestState.draft,
|
|
TestState.red_executing,
|
|
TestState.blue_evaluating,
|
|
TestState.in_review,
|
|
TestState.validated,
|
|
TestState.rejected,
|
|
]
|
|
|
|
TEST_RESULTS = [
|
|
TestResult.detected,
|
|
TestResult.not_detected,
|
|
TestResult.partially_detected,
|
|
]
|
|
|
|
NOTIFICATION_TYPES = [
|
|
"test_assigned",
|
|
"validation_needed",
|
|
"test_rejected",
|
|
"test_validated",
|
|
"test_state_changed",
|
|
]
|
|
|
|
AUDIT_ACTIONS = [
|
|
"create_test",
|
|
"update_test",
|
|
"validate_technique",
|
|
"upload_evidence",
|
|
"create_user",
|
|
"import_atomic_red_team",
|
|
"sync_mitre",
|
|
"login",
|
|
"reject_test",
|
|
"approve_test",
|
|
]
|
|
|
|
PLATFORMS = ["windows", "linux", "macos"]
|
|
|
|
TEMPLATE_NAMES = [
|
|
"Manual Credential Dumping Test",
|
|
"Custom Phishing Payload Delivery",
|
|
"Lateral Movement via RDP",
|
|
"Persistence via Registry Run Keys",
|
|
"Data Exfiltration over DNS",
|
|
"Process Injection via DLL",
|
|
"Privilege Escalation with Token Impersonation",
|
|
"Custom C2 Beacon Communication Test",
|
|
"Kerberoasting Attack Procedure",
|
|
"Living Off The Land Binaries Test",
|
|
]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Cleanup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _cleanup_demo_data(db) -> None:
|
|
"""Remove all previously seeded demo data."""
|
|
# Delete in order to respect FK constraints
|
|
demo_users = db.query(User).filter(User.username.like(f"{DEMO_PREFIX}%")).all()
|
|
demo_user_ids = [u.id for u in demo_users]
|
|
|
|
if demo_user_ids:
|
|
# Notifications for demo users
|
|
db.query(Notification).filter(
|
|
Notification.user_id.in_(demo_user_ids)
|
|
).delete(synchronize_session=False)
|
|
|
|
# Audit logs for demo users
|
|
db.query(AuditLog).filter(
|
|
AuditLog.user_id.in_(demo_user_ids)
|
|
).delete(synchronize_session=False)
|
|
|
|
# Evidences for tests created by demo users
|
|
demo_tests = db.query(Test).filter(
|
|
Test.created_by.in_(demo_user_ids)
|
|
).all()
|
|
demo_test_ids = [t.id for t in demo_tests]
|
|
|
|
if demo_test_ids:
|
|
db.query(Evidence).filter(
|
|
Evidence.test_id.in_(demo_test_ids)
|
|
).delete(synchronize_session=False)
|
|
|
|
db.query(Test).filter(
|
|
Test.id.in_(demo_test_ids)
|
|
).delete(synchronize_session=False)
|
|
|
|
# Delete demo templates (by source = "demo")
|
|
db.query(TestTemplate).filter(
|
|
TestTemplate.source == "demo"
|
|
).delete(synchronize_session=False)
|
|
|
|
# Delete demo users
|
|
if demo_user_ids:
|
|
db.query(User).filter(
|
|
User.id.in_(demo_user_ids)
|
|
).delete(synchronize_session=False)
|
|
|
|
db.commit()
|
|
logger.info("Cleaned up existing demo data.")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Seeders
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _seed_users(db) -> list[User]:
|
|
"""Create 5 users per role (25 total)."""
|
|
users = []
|
|
for role in ROLES:
|
|
for i in range(1, 6):
|
|
user = User(
|
|
username=f"{DEMO_PREFIX}{role}_{i}",
|
|
email=f"{DEMO_PREFIX}{role}_{i}@aegis-demo.local",
|
|
hashed_password=hash_password("demo123"),
|
|
role=role,
|
|
is_active=True,
|
|
)
|
|
db.add(user)
|
|
users.append(user)
|
|
db.flush()
|
|
logger.info("Created %d demo users.", len(users))
|
|
return users
|
|
|
|
|
|
def _seed_technique_statuses(db, count: int = 50) -> list[Technique]:
|
|
"""Set varied statuses on up to *count* techniques."""
|
|
techniques = db.query(Technique).limit(count).all()
|
|
if not techniques:
|
|
logger.warning("No techniques found — run MITRE sync first!")
|
|
return []
|
|
|
|
for tech in techniques:
|
|
tech.status_global = random.choice(TECHNIQUE_STATUSES)
|
|
if tech.status_global == TechniqueStatus.validated:
|
|
tech.last_review_date = datetime.utcnow() - timedelta(
|
|
days=random.randint(1, 30)
|
|
)
|
|
|
|
db.flush()
|
|
logger.info("Updated status on %d techniques.", len(techniques))
|
|
return techniques
|
|
|
|
|
|
def _seed_tests(db, users: list[User], techniques: list[Technique], count: int = 100) -> list[Test]:
|
|
"""Create *count* tests in various pipeline states."""
|
|
if not techniques:
|
|
logger.warning("No techniques available — skipping test seeding.")
|
|
return []
|
|
|
|
red_techs = [u for u in users if u.role == "red_tech"]
|
|
blue_techs = [u for u in users if u.role == "blue_tech"]
|
|
red_leads = [u for u in users if u.role == "red_lead"]
|
|
blue_leads = [u for u in users if u.role == "blue_lead"]
|
|
|
|
tests = []
|
|
for i in range(count):
|
|
technique = random.choice(techniques)
|
|
state = random.choice(TEST_STATES)
|
|
creator = random.choice(red_techs + blue_techs)
|
|
|
|
test = Test(
|
|
technique_id=technique.id,
|
|
name=f"Demo Test {i + 1} — {technique.name[:40]}",
|
|
description=f"Automated demo test #{i + 1} for {technique.mitre_id}.",
|
|
platform=random.choice(PLATFORMS),
|
|
procedure_text=f"Step 1: Prepare environment.\nStep 2: Execute {technique.mitre_id} procedure.\nStep 3: Observe results.",
|
|
tool_used=random.choice(["powershell", "bash", "cmd", "python", "caldera", "metasploit"]),
|
|
execution_date=datetime.utcnow() - timedelta(days=random.randint(0, 60)),
|
|
created_by=creator.id,
|
|
result=random.choice(TEST_RESULTS) if state not in (TestState.draft, TestState.red_executing) else None,
|
|
state=state,
|
|
created_at=datetime.utcnow() - timedelta(days=random.randint(0, 90)),
|
|
)
|
|
|
|
# Populate team fields based on state
|
|
if state in (TestState.blue_evaluating, TestState.in_review, TestState.validated, TestState.rejected):
|
|
test.red_summary = f"Attack executed successfully using {test.tool_used}."
|
|
test.attack_success = random.choice([True, True, True, False])
|
|
|
|
if state in (TestState.in_review, TestState.validated, TestState.rejected):
|
|
test.blue_summary = "Detection observed in SIEM. Alert fired."
|
|
test.detection_result = random.choice(TEST_RESULTS)
|
|
|
|
if state == TestState.validated:
|
|
rv = random.choice(red_leads)
|
|
bv = random.choice(blue_leads)
|
|
test.red_validated_by = rv.id
|
|
test.red_validated_at = datetime.utcnow() - timedelta(days=random.randint(0, 10))
|
|
test.red_validation_status = "approved"
|
|
test.blue_validated_by = bv.id
|
|
test.blue_validated_at = datetime.utcnow() - timedelta(days=random.randint(0, 10))
|
|
test.blue_validation_status = "approved"
|
|
|
|
if state == TestState.rejected:
|
|
rejector = random.choice(red_leads + blue_leads)
|
|
if rejector.role == "red_lead":
|
|
test.red_validated_by = rejector.id
|
|
test.red_validated_at = datetime.utcnow() - timedelta(days=random.randint(0, 5))
|
|
test.red_validation_status = "rejected"
|
|
test.red_validation_notes = "Insufficient evidence of attack success."
|
|
else:
|
|
test.blue_validated_by = rejector.id
|
|
test.blue_validated_at = datetime.utcnow() - timedelta(days=random.randint(0, 5))
|
|
test.blue_validation_status = "rejected"
|
|
test.blue_validation_notes = "Detection evidence not conclusive."
|
|
|
|
db.add(test)
|
|
tests.append(test)
|
|
|
|
db.flush()
|
|
logger.info("Created %d demo tests.", len(tests))
|
|
return tests
|
|
|
|
|
|
def _seed_evidences(db, tests: list[Test], users: list[User], count: int = 50) -> list[Evidence]:
|
|
"""Create *count* dummy evidence records."""
|
|
if not tests:
|
|
return []
|
|
|
|
# Pick tests that are past draft state
|
|
eligible = [t for t in tests if t.state != TestState.draft]
|
|
if not eligible:
|
|
eligible = tests
|
|
|
|
evidences = []
|
|
red_blue = [u for u in users if u.role in ("red_tech", "blue_tech")]
|
|
|
|
for i in range(count):
|
|
test = random.choice(eligible)
|
|
uploader = random.choice(red_blue)
|
|
team = TeamSide.red if uploader.role == "red_tech" else TeamSide.blue
|
|
ext = random.choice(["png", "log", "pcap", "csv", "txt", "json"])
|
|
fname = f"evidence_{i + 1}.{ext}"
|
|
|
|
evidence = Evidence(
|
|
test_id=test.id,
|
|
file_name=fname,
|
|
file_path=f"{test.id}/{uuid.uuid4()}_{fname}",
|
|
sha256_hash=uuid.uuid4().hex + uuid.uuid4().hex, # dummy hash
|
|
uploaded_by=uploader.id,
|
|
uploaded_at=datetime.utcnow() - timedelta(days=random.randint(0, 30)),
|
|
team=team,
|
|
notes=f"Auto-generated demo evidence #{i + 1}.",
|
|
)
|
|
db.add(evidence)
|
|
evidences.append(evidence)
|
|
|
|
db.flush()
|
|
logger.info("Created %d demo evidences.", len(evidences))
|
|
return evidences
|
|
|
|
|
|
def _seed_audit_logs(db, users: list[User], count: int = 20) -> None:
|
|
"""Create *count* varied audit log entries."""
|
|
for i in range(count):
|
|
user = random.choice(users)
|
|
log = AuditLog(
|
|
user_id=user.id,
|
|
action=random.choice(AUDIT_ACTIONS),
|
|
entity_type=random.choice(["test", "technique", "user", "test_template"]),
|
|
entity_id=str(uuid.uuid4()),
|
|
timestamp=datetime.utcnow() - timedelta(days=random.randint(0, 60)),
|
|
details={"demo": True, "index": i},
|
|
)
|
|
db.add(log)
|
|
|
|
db.flush()
|
|
logger.info("Created %d demo audit logs.", count)
|
|
|
|
|
|
def _seed_notifications(db, users: list[User], count: int = 30) -> None:
|
|
"""Create *count* notifications spread across demo users."""
|
|
for i in range(count):
|
|
user = random.choice(users)
|
|
ntype = random.choice(NOTIFICATION_TYPES)
|
|
notif = Notification(
|
|
user_id=user.id,
|
|
type=ntype,
|
|
title=f"Demo notification: {ntype.replace('_', ' ').title()} #{i + 1}",
|
|
message=f"This is an auto-generated demo notification ({ntype}).",
|
|
entity_type="test",
|
|
entity_id=uuid.uuid4(),
|
|
read=random.choice([True, False]),
|
|
created_at=datetime.utcnow() - timedelta(days=random.randint(0, 30)),
|
|
)
|
|
db.add(notif)
|
|
|
|
db.flush()
|
|
logger.info("Created %d demo notifications.", count)
|
|
|
|
|
|
def _seed_templates(db, techniques: list[Technique], count: int = 10) -> None:
|
|
"""Create *count* manual demo templates."""
|
|
if not techniques:
|
|
return
|
|
|
|
for i, name in enumerate(TEMPLATE_NAMES[:count]):
|
|
technique = techniques[i % len(techniques)]
|
|
template = TestTemplate(
|
|
mitre_technique_id=technique.mitre_id,
|
|
name=name,
|
|
description=f"Demo template: {name}. Targets {technique.mitre_id} ({technique.name}).",
|
|
source="demo",
|
|
source_url=None,
|
|
attack_procedure=f"1. Set up environment for {technique.mitre_id}.\n2. Execute the procedure.\n3. Record observations.",
|
|
expected_detection=f"SIEM should alert on {technique.mitre_id} indicators.",
|
|
platform=random.choice(PLATFORMS),
|
|
tool_suggested=random.choice(["powershell", "cmd", "bash", "python"]),
|
|
severity=random.choice(["low", "medium", "high", "critical"]),
|
|
is_active=True,
|
|
)
|
|
db.add(template)
|
|
|
|
db.flush()
|
|
logger.info("Created %d demo templates.", count)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def seed_demo() -> dict:
|
|
"""Generate all demo data. Returns a summary dict."""
|
|
db = SessionLocal()
|
|
try:
|
|
logger.info("=== Starting V3 demo seed ===")
|
|
|
|
# Step 0: cleanup previous run
|
|
_cleanup_demo_data(db)
|
|
|
|
# Step 1: users
|
|
users = _seed_users(db)
|
|
|
|
# Step 2: technique statuses
|
|
techniques = _seed_technique_statuses(db, count=50)
|
|
|
|
# Step 3: tests
|
|
tests = _seed_tests(db, users, techniques, count=100)
|
|
|
|
# Step 4: evidences
|
|
evidences = _seed_evidences(db, tests, users, count=50)
|
|
|
|
# Step 5: audit logs
|
|
_seed_audit_logs(db, users, count=20)
|
|
|
|
# Step 6: notifications
|
|
_seed_notifications(db, users, count=30)
|
|
|
|
# Step 7: templates
|
|
_seed_templates(db, techniques, count=10)
|
|
|
|
db.commit()
|
|
|
|
summary = {
|
|
"users": len(users),
|
|
"techniques_updated": len(techniques),
|
|
"tests": len(tests),
|
|
"evidences": len(evidences),
|
|
"audit_logs": 20,
|
|
"notifications": 30,
|
|
"templates": 10,
|
|
}
|
|
logger.info("=== Demo seed complete: %s ===", summary)
|
|
return summary
|
|
|
|
except Exception:
|
|
db.rollback()
|
|
raise
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s %(levelname)-8s %(name)s — %(message)s",
|
|
)
|
|
result = seed_demo()
|
|
print(f"\nSeed complete: {result}")
|