Aegis/backend/app/services/score_cache.py
kitos ec26183e2e refactor(pep8): enforce full PEP8 compliance across backend Python codebase
- ruff.toml: select E/W/F/I/N rules, line-length=120, drop legacy ignores
- Auto-fix: sort 82 import blocks (isort), remove 29 unused imports,
  strip 6 trailing-whitespace blank lines in docstrings
- main.py: move setup_logging and settings imports to top (E402)
- errors.py: noqa N818 on DDD exception names (96 call sites, safe)
- intel_service.py: noqa N817 for universal ET alias
- atomic/elastic/sigma import services: move _MAX_UNCOMPRESSED_SIZE and
  _MAX_ENTRIES to module level (N806)
- compliance_import_service.py: move SAMPLE_CONTROLS / CIS_CONTROLS to
  module level; wrap long description strings (N806 + E501)
- snapshot_service.py: move STATUS_ORDER dict to module level (N806)
- sigma_import_service.py: remove dead dedup_key expression (F841)
- threat_actor_import_service.py: remove dead stix_to_actor expression (F841)
- data_source.py, seed_demo.py, campaign_scheduler_service.py,
  lolbas_import_service.py: wrap lines exceeding 120 chars (E501)
- d3fend_import_service.py: per-file E501 ignore (data file with long strings)

All 439 unit tests pass. ruff check app/ → All checks passed!

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-09 16:40:14 +02:00

85 lines
2.5 KiB
Python

"""In-memory TTL cache for expensive scoring and metrics calculations.

The cache is a simple dict with timestamps. It is invalidated when tests
are validated, scores change, or an explicit ``invalidate`` call is made.

Process-local: each worker process has its own dict, so an ``invalidate``
call only clears the worker that runs it. The TTL ensures stale data does
not persist longer than ``CACHE_TTL`` seconds.
"""

import time
from typing import Any, Optional

CACHE_TTL = 300  # 5 minutes

_cache: dict[str, dict[str, Any]] = {}


def get(key: str) -> Optional[Any]:
    """Return cached value if present and not expired, else None."""
    entry = _cache.get(key)
    if entry is None:
        return None
    if time.time() - entry["ts"] > CACHE_TTL:
        _cache.pop(key, None)
        return None
    return entry["data"]


def put(key: str, data: Any) -> None:
    """Store *data* under *key* with the current timestamp."""
    _cache[key] = {"data": data, "ts": time.time()}


def invalidate(key: Optional[str] = None) -> None:
    """Remove one key or clear the whole cache."""
    if key is None:
        _cache.clear()
    else:
        _cache.pop(key, None)


# ── High-level helpers ────────────────────────────────────────────────


def get_organization_score_cached(db):
    """Cached wrapper around ``calculate_organization_score``."""
    from app.services.scoring_service import calculate_organization_score

    cached = get("org_score")
    if cached is not None:
        return cached
    result = calculate_organization_score(db)
    put("org_score", result)
    return result


def get_operational_metrics_cached(db):
    """Cached wrapper around operational metrics (MTTD, MTTR, efficacy)."""
    from app.services.operational_metrics_service import (
        calculate_alert_fidelity,
        calculate_coverage_velocity,
        calculate_detection_efficacy,
        calculate_mttd,
        calculate_mttr,
        calculate_rejection_rate,
        calculate_validation_throughput,
    )

    cached = get("op_metrics")
    if cached is not None:
        return cached
    result = {
        "mttd": calculate_mttd(db),
        "mttr": calculate_mttr(db),
        "detection_efficacy": calculate_detection_efficacy(db),
        "alert_fidelity": calculate_alert_fidelity(db),
        "coverage_velocity": calculate_coverage_velocity(db),
        "validation_throughput": calculate_validation_throughput(db),
        "rejection_rate": calculate_rejection_rate(db),
    }
    put("op_metrics", result)
    return result
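
The get / compute / put pattern used by both high-level helpers, together with the TTL expiry and explicit invalidation, can be exercised without a database. The following is a minimal sketch, not part of the repository: `TTLCache`, `get_or_compute`, `FakeClock`, and `demo` are hypothetical names introduced only for illustration, and the injectable clock is an assumption that makes expiry testable without sleeping.

```python
import time
from typing import Any, Callable, Optional


class TTLCache:
    """Hypothetical stand-in for the module-level dict cache above."""

    def __init__(self, ttl: float = 300, clock: Callable[[], float] = time.time) -> None:
        self._ttl = ttl
        self._clock = clock  # injectable so a test can control time
        self._entries: dict[str, dict[str, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        if self._clock() - entry["ts"] > self._ttl:
            # Expired: drop the entry so the next caller recomputes it.
            self._entries.pop(key, None)
            return None
        return entry["data"]

    def put(self, key: str, data: Any) -> None:
        self._entries[key] = {"data": data, "ts": self._clock()}

    def invalidate(self, key: Optional[str] = None) -> None:
        if key is None:
            self._entries.clear()
        else:
            self._entries.pop(key, None)

    def get_or_compute(self, key: str, compute: Callable[[], Any]) -> Any:
        """Return the cached value, or call *compute* and cache its result."""
        cached = self.get(key)
        if cached is not None:
            return cached
        result = compute()
        self.put(key, result)
        return result


class FakeClock:
    """Manually advanced clock, used only for this demonstration."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


def demo() -> list[int]:
    clock = FakeClock()
    cache = TTLCache(ttl=300, clock=clock)
    calls: list[int] = []

    def expensive() -> int:
        # Stands in for a scoring call; returns how many times it has run.
        calls.append(1)
        return len(calls)

    seen = [cache.get_or_compute("org_score", expensive)]  # miss: computes
    seen.append(cache.get_or_compute("org_score", expensive))  # hit: reuses value
    clock.now = 301.0  # move past the 300-second TTL
    seen.append(cache.get_or_compute("org_score", expensive))  # expired: recomputes
    cache.invalidate("org_score")
    seen.append(cache.get_or_compute("org_score", expensive))  # invalidated: recomputes
    return seen


if __name__ == "__main__":
    print(demo())
```

As in the helpers above, a cached value of `None` is indistinguishable from a miss, so a computation that legitimately returns `None` would be re-run on every call.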