# File: Aegis/backend/app/seed_data_sources.py (Python, 181 lines, 6.2 KiB)

"""
Seed script — registers all known data sources in the data_sources table.

Usage:
    python -m app.seed_data_sources
"""
import logging
from app.database import SessionLocal
from app.models.data_source import DataSource
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Data source definitions
# ---------------------------------------------------------------------------
# Catalogue of data sources registered by the seed routine.
#
# Every entry carries the same seven keys:
#   name            - unique machine identifier (used to detect existing rows)
#   display_name    - human-readable label
#   type            - source category string
#   url             - project home page
#   description     - short free-text summary
#   sync_frequency  - "weekly" or "monthly" for the entries below
#   config          - source-specific settings; for the GitHub-hosted sources
#                     this holds the archive URL, the top-level folder name
#                     inside that archive, and the sub-directory(ies) of interest
INITIAL_SOURCES = [
    # --- Atomic Red Team -------------------------------------------------
    {
        "name": "atomic_red_team",
        "display_name": "Atomic Red Team",
        "type": "attack_procedure",
        "url": "https://github.com/redcanaryco/atomic-red-team",
        "description": (
            "Open-source library of atomic tests mapped to MITRE ATT&CK. "
            "Each test is a small, self-contained procedure for validating "
            "detection of a specific technique."
        ),
        "sync_frequency": "weekly",
        "config": {
            "zip_url": "https://github.com/redcanaryco/atomic-red-team/archive/refs/heads/master.zip",
            "root_prefix": "atomic-red-team-master",
            "atomics_dir": "atomics",
        },
    },
    # --- SigmaHQ ---------------------------------------------------------
    {
        "name": "sigma",
        "display_name": "SigmaHQ Rules",
        "type": "detection_rule",
        "url": "https://github.com/SigmaHQ/sigma",
        "description": (
            "Generic SIEM detection rules in YAML format. "
            "3 000+ rules with MITRE ATT&CK mappings."
        ),
        "sync_frequency": "weekly",
        "config": {
            "zip_url": "https://github.com/SigmaHQ/sigma/archive/refs/heads/main.zip",
            "root_prefix": "sigma-main",
            "rules_dir": "rules",
        },
    },
    # --- LOLBAS ----------------------------------------------------------
    {
        "name": "lolbas",
        "display_name": "LOLBAS (Windows)",
        "type": "attack_procedure",
        "url": "https://github.com/LOLBAS-Project/LOLBAS",
        "description": (
            "Living Off The Land Binaries, Scripts, and Libraries — "
            "legitimate Windows binaries that can be abused for attacks."
        ),
        "sync_frequency": "monthly",
        "config": {
            "zip_url": "https://github.com/LOLBAS-Project/LOLBAS/archive/refs/heads/master.zip",
            "root_prefix": "LOLBAS-master",
            "yaml_dirs": ["yml/OSBinaries", "yml/OSLibraries", "yml/OSScripts"],
        },
    },
    # --- GTFOBins --------------------------------------------------------
    {
        "name": "gtfobins",
        "display_name": "GTFOBins (Linux)",
        "type": "attack_procedure",
        "url": "https://gtfobins.github.io/",
        "description": (
            "Unix/Linux binaries that can be exploited for file transfer, "
            "shell escape, privilege escalation, and more."
        ),
        "sync_frequency": "monthly",
        "config": {
            "zip_url": "https://github.com/GTFOBins/GTFOBins.github.io/archive/refs/heads/master.zip",
            "root_prefix": "GTFOBins.github.io-master",
            "gtfobins_dir": "_gtfobins",
        },
    },
    # --- MITRE CALDERA ---------------------------------------------------
    {
        "name": "caldera",
        "display_name": "MITRE CALDERA",
        "type": "attack_procedure",
        "url": "https://github.com/mitre/caldera",
        "description": (
            "Automated adversary emulation platform by MITRE. "
            "400+ abilities (executable actions) mapped to ATT&CK."
        ),
        "sync_frequency": "monthly",
        "config": {
            "zip_url": "https://github.com/mitre/caldera/archive/refs/heads/master.zip",
            "root_prefix": "caldera-master",
            "abilities_dir": "data/abilities",
        },
    },
    # --- Elastic detection rules -----------------------------------------
    {
        "name": "elastic_rules",
        "display_name": "Elastic Detection Rules",
        "type": "detection_rule",
        "url": "https://github.com/elastic/detection-rules",
        "description": (
            "Open-source detection rules for Elastic SIEM. "
            "1 000+ rules in KQL with MITRE ATT&CK mappings."
        ),
        "sync_frequency": "weekly",
        "config": {
            "zip_url": "https://github.com/elastic/detection-rules/archive/refs/heads/main.zip",
            "root_prefix": "detection-rules-main",
            "rules_dir": "rules",
        },
    },
    # --- MITRE D3FEND (no archive settings: config is intentionally empty here)
    {
        "name": "d3fend",
        "display_name": "MITRE D3FEND",
        "type": "defensive_technique",
        "url": "https://d3fend.mitre.org/",
        "description": (
            "MITRE framework of defensive countermeasures. "
            "200+ defensive techniques mapped to ATT&CK."
        ),
        "sync_frequency": "monthly",
        "config": {},
    },
    # --- MITRE CTI -------------------------------------------------------
    {
        "name": "mitre_cti",
        "display_name": "MITRE CTI (Groups & Software)",
        "type": "threat_intel",
        "url": "https://github.com/mitre/cti",
        "description": (
            "MITRE ATT&CK STIX 2.0 data — threat actor groups, "
            "software, and campaigns with TTP mappings."
        ),
        "sync_frequency": "monthly",
        "config": {
            "zip_url": "https://github.com/mitre/cti/archive/refs/heads/master.zip",
            "root_prefix": "cti-master",
            "enterprise_dir": "enterprise-attack",
        },
    },
]
def seed_data_sources() -> dict:
    """Insert every ``INITIAL_SOURCES`` entry whose name is not yet stored.

    A single query collects the names already present in the table; entries
    with a matching name are left untouched, all others are added as enabled
    ``DataSource`` rows and committed together in one transaction.

    Returns:
        A dict ``{"created": <int>, "skipped": <int>}`` counting how many
        entries were inserted and how many were already present.

    Raises:
        Exception: any error raised while querying, building rows or
            committing is re-raised after the session has been rolled back.
            The session is closed in every case.
    """
    session = SessionLocal()
    try:
        # Names already in the table; each result row is a 1-tuple (name,).
        already_registered = {
            name for (name,) in session.query(DataSource.name).all()
        }

        # Only definitions that are not registered yet need a new row.
        pending = [
            spec for spec in INITIAL_SOURCES
            if spec["name"] not in already_registered
        ]

        for spec in pending:
            record = DataSource(
                name=spec["name"],
                display_name=spec["display_name"],
                type=spec["type"],
                url=spec.get("url"),
                description=spec.get("description"),
                # Entries without an explicit schedule fall back to "manual".
                sync_frequency=spec.get("sync_frequency", "manual"),
                config=spec.get("config"),
                is_enabled=True,
            )
            session.add(record)

        session.commit()

        summary = {
            "created": len(pending),
            "skipped": len(INITIAL_SOURCES) - len(pending),
        }
        logger.info("Data sources seed: %s", summary)
        return summary
    except Exception:
        # Undo any partially staged inserts before propagating the error.
        session.rollback()
        raise
    finally:
        session.close()
# Log line layout used when the module is run as a script.
# The fields are separated by single spaces; the original format had no
# separator between the logger name and the message, so the two were
# printed fused together (e.g. "__main__Data sources seed: ...").
LOG_FORMAT = "%(asctime)s %(levelname)-8s %(name)s %(message)s"


# Script entry point: configure INFO-level logging, run the seed once and
# print the resulting summary dict.
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format=LOG_FORMAT,
    )
    result = seed_data_sources()
    print(f"\nData sources seed complete: {result}")