Files
Aegis/backend/app/services/scoring_config_service.py

123 lines
3.9 KiB
Python

"""Scoring configuration persistence service."""
from __future__ import annotations
import uuid
from typing import Any
from sqlalchemy.orm import Session
from app.config import settings
from app.domain.value_objects.scoring_weights import ScoringWeights
from app.models.scoring_config import ScoringConfig
def _row_recency(row: ScoringConfig) -> float:
return float(getattr(row, "weight_recency", None) or getattr(row, "weight_freshness", 10.0))
def _row_severity(row: ScoringConfig) -> float:
return float(
getattr(row, "weight_severity", None)
or getattr(row, "weight_platform_diversity", 10.0)
)
def get_scoring_weights(db: Session) -> ScoringWeights:
    """Return the active scoring weights from the database or env defaults.

    Args:
        db: Session used to query the first ``ScoringConfig`` row.

    Returns:
        A ``ScoringWeights`` built from the stored row when one exists,
        otherwise from the ``SCORING_WEIGHT_*`` values on ``settings``.

    Raises:
        AttributeError: If ``settings`` defines neither the current nor the
            legacy name for the recency or severity weight.
    """
    row = db.query(ScoringConfig).first()
    if row is not None:
        return ScoringWeights(
            tests=row.weight_tests,
            detection_rules=row.weight_detection_rules,
            d3fend=row.weight_d3fend,
            recency=_row_recency(row),
            severity=_row_severity(row),
        )

    # Resolve the legacy setting names lazily. Passing them as the default
    # argument of getattr() would evaluate them up front and raise
    # AttributeError whenever only the new names are configured.
    recency = getattr(settings, "SCORING_WEIGHT_RECENCY", None)
    if recency is None:
        recency = settings.SCORING_WEIGHT_FRESHNESS
    severity = getattr(settings, "SCORING_WEIGHT_SEVERITY", None)
    if severity is None:
        severity = settings.SCORING_WEIGHT_PLATFORM_DIVERSITY

    return ScoringWeights(
        tests=float(settings.SCORING_WEIGHT_TESTS),
        detection_rules=float(settings.SCORING_WEIGHT_DETECTION_RULES),
        d3fend=float(settings.SCORING_WEIGHT_D3FEND),
        recency=float(recency),
        severity=float(severity),
    )
def update_scoring_weights(
    db: Session,
    *,
    tests: float | None = None,
    detection_rules: float | None = None,
    d3fend: float | None = None,
    recency: float | None = None,
    severity: float | None = None,
    freshness: float | None = None,
    platform_diversity: float | None = None,
    updated_by: uuid.UUID | None = None,
) -> dict[str, Any]:
    """Insert or update the stored scoring weights without committing.

    Any weight left as ``None`` keeps its currently active value. The
    legacy keywords ``freshness`` and ``platform_diversity`` are accepted as
    aliases for ``recency`` and ``severity`` and are only used when the
    modern keyword was not supplied.

    Args:
        db: Session used to read and stage the ``ScoringConfig`` row.
        tests, detection_rules, d3fend, recency, severity: New weights.
        freshness: Legacy alias for ``recency``.
        platform_diversity: Legacy alias for ``severity``.
        updated_by: Recorded on the row when given and the row has an
            ``updated_by`` attribute.

    Returns:
        The resulting weights in the serialisable dict form.
    """
    # Map legacy keyword names onto their modern equivalents.
    if recency is None and freshness is not None:
        recency = freshness
    if severity is None and platform_diversity is not None:
        severity = platform_diversity

    active = get_scoring_weights(db)
    merged = ScoringWeights(
        tests=active.tests if tests is None else tests,
        detection_rules=(
            active.detection_rules if detection_rules is None else detection_rules
        ),
        d3fend=active.d3fend if d3fend is None else d3fend,
        recency=active.recency if recency is None else recency,
        severity=active.severity if severity is None else severity,
    )

    config_row = db.query(ScoringConfig).first()
    if config_row is None:
        # No configuration stored yet: stage a fresh row on the session.
        config_row = ScoringConfig()
        db.add(config_row)

    config_row.weight_tests = merged.tests
    config_row.weight_detection_rules = merged.detection_rules
    config_row.weight_d3fend = merged.d3fend

    # Write each renamed weight to the modern column when the row has it,
    # otherwise to the legacy column; skip it when the row has neither.
    renamed_columns = (
        ("weight_recency", "weight_freshness", merged.recency),
        ("weight_severity", "weight_platform_diversity", merged.severity),
    )
    for modern_name, legacy_name, value in renamed_columns:
        if hasattr(config_row, modern_name):
            setattr(config_row, modern_name, value)
        elif hasattr(config_row, legacy_name):
            setattr(config_row, legacy_name, value)

    if updated_by is not None and hasattr(config_row, "updated_by"):
        config_row.updated_by = updated_by

    return _weights_dict(merged)
def get_weights_dict(db: Session) -> dict[str, Any]:
    """Fetch the active scoring weights and return them in dict form."""
    active_weights = get_scoring_weights(db)
    return _weights_dict(active_weights)
def _weights_dict(w: ScoringWeights) -> dict[str, Any]:
weights = {
"tests": w.tests,
"detection_rules": w.detection_rules,
"d3fend": w.d3fend,
"recency": w.recency,
"severity": w.severity,
# Legacy keys for older clients
"freshness": w.recency,
"platform_diversity": w.severity,
}
return {
"weights": weights,
"total": sum(
[w.tests, w.detection_rules, w.d3fend, w.recency, w.severity]
),
}