"""Scheduled background jobs. Registers periodic tasks on an APScheduler ``BackgroundScheduler``: * **MITRE sync** — every 24 hours (see :func:`sync_mitre`) * **Intel scan** — every 7 days (see :func:`scan_intel`) Each job manages its own database session (created on entry, closed in ``finally``) so it is fully independent from FastAPI's request-scoped sessions. """ import logging from datetime import datetime, timedelta, timezone from apscheduler.schedulers.background import BackgroundScheduler from app.database import SessionLocal from app.services.mitre_sync_service import sync_mitre from app.services.intel_service import scan_intel from app.services.notification_service import cleanup_old_notifications from app.services.snapshot_service import create_snapshot, cleanup_old_snapshots from app.services.campaign_scheduler_service import check_and_run_recurring_campaigns from app.jobs.jira_sync_job import sync_all_jira_links from app.services.osint_enrichment_service import enrich_all_techniques from app.services.stale_detection_service import detect_stale_coverage from app.jobs.retention_job import run_retention_job logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Module-level scheduler instance # --------------------------------------------------------------------------- scheduler = BackgroundScheduler() # --------------------------------------------------------------------------- # Job functions # --------------------------------------------------------------------------- def _run_mitre_sync() -> None: """Execute a MITRE sync inside its own DB session.""" from app.services.webhook_service import dispatch_webhook logger.info("Scheduled MITRE sync job starting...") db = SessionLocal() try: summary = sync_mitre(db) logger.info("Scheduled MITRE sync job finished — %s", summary) dispatch_webhook("mitre.synced", {"created": summary.get("created", 0), "updated": summary.get("updated", 0)}) except Exception: logger.exception("Scheduled MITRE sync job failed") finally: db.close() def _run_notification_cleanup() -> None: """Clean up old read notifications.""" logger.info("Scheduled notification cleanup job starting...") db = SessionLocal() try: deleted = cleanup_old_notifications(db, days=90) logger.info("Notification cleanup finished — deleted %d old notifications", deleted) except Exception: logger.exception("Notification cleanup job failed") finally: db.close() def _run_weekly_snapshot() -> None: """Create a weekly coverage snapshot and clean up old ones.""" logger.info("Scheduled weekly snapshot job starting...") db = SessionLocal() try: snapshot = create_snapshot(db, name="Auto-weekly") logger.info( "Weekly snapshot created — score %.1f, %d techniques", snapshot.organization_score, snapshot.total_techniques, ) deleted = cleanup_old_snapshots(db, keep_last=52) if deleted: logger.info("Cleaned up %d old snapshots", deleted) except Exception: logger.exception("Weekly snapshot job failed") finally: db.close() def _run_recurring_campaigns() -> None: """Check and run any due recurring campaigns.""" logger.info("Scheduled recurring campaigns check starting...") db = SessionLocal() try: spawned = check_and_run_recurring_campaigns(db) logger.info("Recurring campaigns check finished — spawned %d campaigns", spawned) except Exception: logger.exception("Recurring campaigns check failed") finally: db.close() def _run_scheduled_campaign_activation() -> None: """Auto-activate campaigns whose start_date has arrived. Finds all campaigns in 'draft' state with a start_date <= now, activates them, creates Jira tickets, and notifies the red_tech team. Runs every hour so campaigns activate within ~1 hour of their scheduled time. """ logger.info("Scheduled campaign auto-activation check starting...") db = SessionLocal() try: from datetime import datetime as _dt from app.models.campaign import Campaign from app.models.user import User from app.services.campaign_crud_service import activate_campaign as _activate from app.services.notification_service import notify_role from app.services.audit_service import log_action now = _dt.utcnow() due_campaigns = ( db.query(Campaign) .filter( Campaign.status == "draft", Campaign.start_date != None, # noqa: E711 Campaign.start_date <= now, ) .all() ) activated = 0 for campaign in due_campaigns: try: _activate(db, str(campaign.id)) notify_role( db, role="red_tech", type="campaign_activated", title="Campaign auto-activated", message=f'Campaign "{campaign.name}" has been automatically activated on its scheduled start date.', entity_type="campaign", entity_id=campaign.id, ) log_action( db, user_id=None, action="auto_activate_campaign", entity_type="campaign", entity_id=campaign.id, details={"name": campaign.name, "start_date": str(campaign.start_date)}, ) # Create Jira tickets non-fatally try: from app.services.jira_service import ( auto_create_campaign_issue, auto_create_test_issue, get_campaign_jira_key, get_test_jira_key, ) # Use first admin user as actor for Jira auth admin_user = db.query(User).filter(User.role == "admin").first() if admin_user: db.refresh(campaign) campaign_jira_key = get_campaign_jira_key(db, str(campaign.id)) if not campaign_jira_key: campaign_jira_key = auto_create_campaign_issue(db, campaign, admin_user) if campaign_jira_key: for ct in campaign.campaign_tests: if ct.test and not get_test_jira_key(db, ct.test.id): auto_create_test_issue( db, ct.test, admin_user, parent_ticket_override=campaign_jira_key, campaign_start_date=campaign.start_date, ) except Exception: logger.exception("Jira auto-create failed for auto-activated campaign %s", campaign.id) db.commit() activated += 1 logger.info("Auto-activated campaign %s (%s)", campaign.id, campaign.name) except Exception: logger.exception("Failed to auto-activate campaign %s", campaign.id) db.rollback() logger.info("Campaign auto-activation check finished — activated %d campaigns", activated) except Exception: logger.exception("Campaign auto-activation job failed") finally: db.close() def _run_intel_scan() -> None: """Execute an intel scan inside its own DB session.""" logger.info("Scheduled intel scan job starting...") db = SessionLocal() try: summary = scan_intel(db) logger.info("Scheduled intel scan job finished — %s", summary) except Exception: logger.exception("Scheduled intel scan job failed") finally: db.close() def _run_evaluation_round_check() -> None: """Weekly job: check if a new ATT&CK Evaluation round is available. If a new round is found it is imported automatically and an admin notification is created so the team knows new baseline data is available. """ logger.info("ATT&CK Evaluations new-round check starting...") db = SessionLocal() try: from app.services.attck_evaluations_service import check_for_new_round, import_evaluation_round from app.models.user import User as UserModel result = check_for_new_round(db) if result.get("error"): logger.warning("ATT&CK Evaluations check failed: %s", result["error"]) return if not result.get("new_round_available"): logger.info( "ATT&CK Evaluations check — latest round '%s' already imported", result.get("latest_round", {}).get("display_name", "?"), ) return latest = result["latest_round"] logger.info( "New ATT&CK Evaluation round detected: %s (round %d) — starting auto-import", latest["display_name"], latest["eval_round"], ) # Use the first admin user as the importer (system action) admin = db.query(UserModel).filter(UserModel.role == "admin").first() if not admin: logger.warning("ATT&CK Evaluations auto-import: no admin user found — skipping") return summary = import_evaluation_round( db, latest["name"], latest["display_name"], latest["eval_round"], admin, ) logger.info( "ATT&CK Evaluations auto-import complete — round %d (%s): %d tests created", latest["eval_round"], latest["display_name"], summary["created"], ) # Notify all admins try: from app.services.notification_service import create_notification admins = db.query(UserModel).filter(UserModel.role == "admin").all() for adm in admins: create_notification( db, user_id=adm.id, title="New ATT&CK Evaluation round imported", message=( f"Round {latest['eval_round']} — {latest['display_name']} — " f"has been automatically imported. " f"{summary['created']} tests created in In Review state. " f"Blue Leads must validate each result before it counts as coverage." ), notification_type="eval_import", entity_type="evaluation", entity_id=None, ) db.commit() except Exception: logger.warning("Failed to send eval import notifications", exc_info=True) except Exception: logger.exception("ATT&CK Evaluations round check job failed") finally: db.close() def _run_osint_enrichment() -> None: """Execute weekly OSINT enrichment inside its own DB session.""" logger.info("Scheduled OSINT enrichment job starting...") db = SessionLocal() try: total = enrich_all_techniques(db) logger.info("OSINT enrichment finished — %d new items", total) except Exception: logger.exception("OSINT enrichment job failed") finally: db.close() _FREQUENCY_INTERVALS: dict[str, timedelta] = { "daily": timedelta(days=1), "weekly": timedelta(weeks=1), "monthly": timedelta(days=30), } def _run_data_sources_sync() -> None: """Check all enabled data sources and sync those that are overdue.""" logger.info("Scheduled data sources sync check starting...") db = SessionLocal() try: from app.models.data_source import DataSource from app.services.data_source_service import sync_source now = datetime.now(timezone.utc) sources = ( db.query(DataSource) .filter(DataSource.is_enabled == True) # noqa: E712 .all() ) synced = 0 for ds in sources: freq = ds.sync_frequency if not freq or freq == "manual": continue interval = _FREQUENCY_INTERVALS.get(freq) if interval is None: continue last = ds.last_sync_at if last is None: # Never synced — run it now overdue = True else: # Make last timezone-aware if needed if last.tzinfo is None: last = last.replace(tzinfo=timezone.utc) overdue = now - last >= interval if overdue: logger.info( "Data source '%s' is overdue (freq=%s, last=%s) — syncing", ds.name, freq, last, ) try: sync_source(db, str(ds.id)) synced += 1 except Exception: logger.exception("Failed to sync data source '%s'", ds.name) logger.info("Data sources sync check finished — %d source(s) synced", synced) except Exception: logger.exception("Data sources sync check failed") finally: db.close() def _run_stale_detection() -> None: """Execute daily stale coverage detection inside its own DB session.""" logger.info("Scheduled stale coverage detection starting...") db = SessionLocal() try: count = detect_stale_coverage(db) logger.info("Stale detection finished — %d techniques flagged", count) except Exception: logger.exception("Stale coverage detection job failed") finally: db.close() def _run_decay_engine() -> None: """Execute the decay engine inside its own DB session.""" logger.info("Scheduled decay engine job starting...") db = SessionLocal() try: from app.services.decay_engine_service import run_decay_engine results = run_decay_engine(db) logger.info("Decay engine job finished — %s", results) except Exception: logger.exception("Decay engine job failed") finally: db.close() def _run_queue_generation() -> None: """Generate revalidation queue items for analysts — runs after decay engine.""" logger.info("Scheduled revalidation queue generation starting...") db = SessionLocal() try: from app.services.revalidation_queue_service import generate_queue_items results = generate_queue_items(db) logger.info("Queue generation finished — %s", results) except Exception: logger.exception("Queue generation job failed") finally: db.close() def _run_alert_evaluation() -> None: """Evaluate all enabled operational alert rules (hourly).""" logger.info("Scheduled alert evaluation job starting...") db = SessionLocal() try: from app.services.operational_alert_service import evaluate_all_rules result = evaluate_all_rules(db) logger.info( "Alert evaluation finished — %d rules, %d alerts fired in %.3fs", result["rules_evaluated"], result["alerts_fired"], result["duration_seconds"], ) except Exception: logger.exception("Alert evaluation job failed") finally: db.close() # --------------------------------------------------------------------------- # Scheduler bootstrap # --------------------------------------------------------------------------- def start_scheduler() -> None: """Register all periodic jobs and start the background scheduler. Jobs registered: * ``mitre_sync`` — every **24 hours** * ``intel_scan`` — every **7 days** Neither job fires immediately on startup. """ scheduler.add_job( _run_mitre_sync, trigger="interval", hours=24, id="mitre_sync", name="MITRE ATT&CK sync (every 24h)", replace_existing=True, ) scheduler.add_job( _run_intel_scan, trigger="interval", weeks=1, id="intel_scan", name="Intel scan (every 7d)", replace_existing=True, ) scheduler.add_job( _run_notification_cleanup, trigger="interval", hours=24, id="notification_cleanup", name="Notification cleanup (daily)", replace_existing=True, ) scheduler.add_job( _run_weekly_snapshot, trigger="cron", day_of_week="sun", hour=0, minute=0, id="weekly_snapshot", name="Weekly coverage snapshot (Sundays 00:00)", replace_existing=True, ) scheduler.add_job( _run_scheduled_campaign_activation, trigger="interval", hours=1, id="scheduled_campaign_activation", name="Auto-activate campaigns on start_date (hourly)", replace_existing=True, ) scheduler.add_job( _run_recurring_campaigns, trigger="interval", hours=24, id="recurring_campaigns", name="Recurring campaigns check (daily)", replace_existing=True, ) scheduler.add_job( sync_all_jira_links, trigger="interval", hours=1, id="jira_sync", name="Jira link sync (hourly)", replace_existing=True, ) scheduler.add_job( _run_osint_enrichment, trigger="interval", weeks=1, id="osint_enrichment", name="OSINT enrichment (weekly)", replace_existing=True, ) scheduler.add_job( _run_stale_detection, trigger="interval", hours=24, id="stale_detection", name="Stale coverage detection (daily)", replace_existing=True, ) scheduler.add_job( run_retention_job, trigger="interval", hours=24, id="retention_policies", name="Data retention policies (daily)", replace_existing=True, ) scheduler.add_job( _run_data_sources_sync, trigger="interval", hours=6, id="data_sources_sync", name="Data sources auto-sync (every 6h)", replace_existing=True, ) scheduler.add_job( _run_decay_engine, trigger="cron", hour=2, minute=0, id="decay_engine", name="Detection decay engine (daily 02:00)", replace_existing=True, ) scheduler.add_job( _run_queue_generation, trigger="cron", hour=2, minute=30, id="queue_generation", name="Revalidation queue generation (daily 02:30)", replace_existing=True, ) scheduler.add_job( _run_alert_evaluation, trigger="interval", hours=1, id="alert_evaluation", name="Operational alert evaluation (hourly)", replace_existing=True, ) scheduler.add_job( _run_evaluation_round_check, trigger="cron", day_of_week="mon", hour=6, minute=0, id="attck_evaluation_check", name="ATT&CK Evaluations new-round check (Mondays 06:00)", replace_existing=True, ) scheduler.start() logger.info( "Background scheduler started — mitre_sync (24h), intel_scan (7d), " "notification_cleanup (24h), weekly_snapshot (Sundays 00:00), " "recurring_campaigns (daily), jira_sync (1h), " "osint_enrichment (weekly), stale_detection (daily), " "retention_policies (daily), data_sources_sync (6h), " "alert_evaluation (1h), attck_evaluation_check (Mondays 06:00)" )