refactor(scoring): persist weights in DB table, replace mutable Settings with scoring_config_service

This commit is contained in:
2026-02-19 17:46:02 +01:00
parent 93fde55389
commit 4e3787d091
6 changed files with 210 additions and 83 deletions

View File

@@ -19,6 +19,7 @@ from app.models.coverage_snapshot import CoverageSnapshot, SnapshotTechniqueStat
from app.models.jira_link import JiraLink, JiraLinkEntityType, JiraSyncDirection
from app.models.worklog import Worklog
from app.models.osint_item import OsintItem
from app.models.scoring_config import ScoringConfig
from app.models.enums import TechniqueStatus, TestState, TestResult, TeamSide
__all__ = [
@@ -31,6 +32,6 @@ __all__ = [
"ComplianceFramework", "ComplianceControl", "ComplianceControlMapping",
"CoverageSnapshot", "SnapshotTechniqueState",
"JiraLink", "JiraLinkEntityType", "JiraSyncDirection",
"Worklog", "OsintItem",
"Worklog", "OsintItem", "ScoringConfig",
"TechniqueStatus", "TestState", "TestResult", "TeamSide",
]

View File

@@ -0,0 +1,24 @@
"""ScoringConfig — single-row table for persisted scoring weights.
Replaces the mutable-settings approach where PATCH /scores/config
mutated the in-process ``Settings`` object (lost on restart).
"""
import uuid
from sqlalchemy import Column, Float, DateTime, func
from sqlalchemy.dialects.postgresql import UUID
from app.database import Base
class ScoringConfig(Base):
__tablename__ = "scoring_config"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
weight_tests = Column(Float, nullable=False, default=40.0)
weight_detection_rules = Column(Float, nullable=False, default=20.0)
weight_d3fend = Column(Float, nullable=False, default=15.0)
weight_freshness = Column(Float, nullable=False, default=15.0)
weight_platform_diversity = Column(Float, nullable=False, default=10.0)
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

View File

@@ -14,7 +14,6 @@ from app.dependencies.auth import get_current_user, require_role
from app.models.user import User
from app.models.technique import Technique
from app.models.threat_actor import ThreatActor
from app.config import settings
from app.services.scoring_service import (
calculate_technique_score,
calculate_tactic_score,
@@ -22,6 +21,10 @@ from app.services.scoring_service import (
calculate_organization_score,
get_score_history,
)
from app.services.scoring_config_service import (
get_weights_dict,
update_scoring_weights,
)
router = APIRouter(prefix="/scores", tags=["scores"])
@@ -117,79 +120,45 @@ def score_history(
@router.get("/config")
def get_scoring_config(
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Get current scoring weights (admin only)."""
return {
"weights": {
"tests": settings.SCORING_WEIGHT_TESTS,
"detection_rules": settings.SCORING_WEIGHT_DETECTION_RULES,
"d3fend": settings.SCORING_WEIGHT_D3FEND,
"freshness": settings.SCORING_WEIGHT_FRESHNESS,
"platform_diversity": settings.SCORING_WEIGHT_PLATFORM_DIVERSITY,
},
"total": (
settings.SCORING_WEIGHT_TESTS
+ settings.SCORING_WEIGHT_DETECTION_RULES
+ settings.SCORING_WEIGHT_D3FEND
+ settings.SCORING_WEIGHT_FRESHNESS
+ settings.SCORING_WEIGHT_PLATFORM_DIVERSITY
),
}
return get_weights_dict(db)
# ── PATCH /scores/config ─────────────────────────────────────────────
class ScoringConfigUpdate(BaseModel):
tests: Optional[int] = None
detection_rules: Optional[int] = None
d3fend: Optional[int] = None
freshness: Optional[int] = None
platform_diversity: Optional[int] = None
tests: Optional[float] = None
detection_rules: Optional[float] = None
d3fend: Optional[float] = None
freshness: Optional[float] = None
platform_diversity: Optional[float] = None
@router.patch("/config")
def update_scoring_config(
payload: ScoringConfigUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Update scoring weights (admin only).
Note: Since we're using Opcion A (env vars / Settings), changes
are applied at runtime but won't persist across restarts unless
the .env file is also updated. For production, consider migrating
to Option B (database table).
Weights are persisted in the database and survive restarts.
Validation enforces that all weights are non-negative and sum to 100.
"""
if payload.tests is not None:
settings.SCORING_WEIGHT_TESTS = payload.tests
if payload.detection_rules is not None:
settings.SCORING_WEIGHT_DETECTION_RULES = payload.detection_rules
if payload.d3fend is not None:
settings.SCORING_WEIGHT_D3FEND = payload.d3fend
if payload.freshness is not None:
settings.SCORING_WEIGHT_FRESHNESS = payload.freshness
if payload.platform_diversity is not None:
settings.SCORING_WEIGHT_PLATFORM_DIVERSITY = payload.platform_diversity
result = update_scoring_weights(
db,
tests=payload.tests,
detection_rules=payload.detection_rules,
d3fend=payload.d3fend,
freshness=payload.freshness,
platform_diversity=payload.platform_diversity,
)
# Weights changed — bust the score cache
from app.services.score_cache import invalidate
invalidate()
return {
"message": "Scoring config updated",
"weights": {
"tests": settings.SCORING_WEIGHT_TESTS,
"detection_rules": settings.SCORING_WEIGHT_DETECTION_RULES,
"d3fend": settings.SCORING_WEIGHT_D3FEND,
"freshness": settings.SCORING_WEIGHT_FRESHNESS,
"platform_diversity": settings.SCORING_WEIGHT_PLATFORM_DIVERSITY,
},
"total": (
settings.SCORING_WEIGHT_TESTS
+ settings.SCORING_WEIGHT_DETECTION_RULES
+ settings.SCORING_WEIGHT_D3FEND
+ settings.SCORING_WEIGHT_FRESHNESS
+ settings.SCORING_WEIGHT_PLATFORM_DIVERSITY
),
}
return {"message": "Scoring config updated", **result}

View File

@@ -0,0 +1,107 @@
"""Scoring configuration persistence service.
Reads and writes scoring weights from the ``scoring_config`` table.
Falls back to environment-variable defaults (from ``Settings``) when
no row has been persisted yet.
This module is framework-agnostic: no FastAPI imports.
"""
from __future__ import annotations
from typing import Any
from sqlalchemy.orm import Session
from app.config import settings
from app.domain.value_objects.scoring_weights import ScoringWeights
from app.models.scoring_config import ScoringConfig
def get_scoring_weights(db: Session) -> ScoringWeights:
"""Return the active scoring weights.
Reads the single ``scoring_config`` row. If the table is empty
(first run or migration just applied), falls back to the values
from the environment / ``Settings``.
"""
row = db.query(ScoringConfig).first()
if row is not None:
return ScoringWeights(
tests=row.weight_tests,
detection_rules=row.weight_detection_rules,
d3fend=row.weight_d3fend,
freshness=row.weight_freshness,
platform_diversity=row.weight_platform_diversity,
)
return ScoringWeights(
tests=float(settings.SCORING_WEIGHT_TESTS),
detection_rules=float(settings.SCORING_WEIGHT_DETECTION_RULES),
d3fend=float(settings.SCORING_WEIGHT_D3FEND),
freshness=float(settings.SCORING_WEIGHT_FRESHNESS),
platform_diversity=float(settings.SCORING_WEIGHT_PLATFORM_DIVERSITY),
)
def update_scoring_weights(
db: Session,
*,
tests: float | None = None,
detection_rules: float | None = None,
d3fend: float | None = None,
freshness: float | None = None,
platform_diversity: float | None = None,
) -> dict[str, Any]:
"""Upsert scoring weights into the database.
Only provided fields are overwritten; ``None`` values keep the
current (or default) value. Validates via ``ScoringWeights``
before persisting.
Returns a dict with ``weights`` and ``total``.
"""
current = get_scoring_weights(db)
new = ScoringWeights(
tests=tests if tests is not None else current.tests,
detection_rules=detection_rules if detection_rules is not None else current.detection_rules,
d3fend=d3fend if d3fend is not None else current.d3fend,
freshness=freshness if freshness is not None else current.freshness,
platform_diversity=platform_diversity if platform_diversity is not None else current.platform_diversity,
)
row = db.query(ScoringConfig).first()
if row is None:
row = ScoringConfig()
db.add(row)
row.weight_tests = new.tests
row.weight_detection_rules = new.detection_rules
row.weight_d3fend = new.d3fend
row.weight_freshness = new.freshness
row.weight_platform_diversity = new.platform_diversity
db.commit()
db.refresh(row)
return _weights_dict(new)
def get_weights_dict(db: Session) -> dict[str, Any]:
"""Return current weights as a serialisable dict."""
return _weights_dict(get_scoring_weights(db))
def _weights_dict(w: ScoringWeights) -> dict[str, Any]:
weights = {
"tests": w.tests,
"detection_rules": w.detection_rules,
"d3fend": w.d3fend,
"freshness": w.freshness,
"platform_diversity": w.platform_diversity,
}
return {
"weights": weights,
"total": sum(weights.values()),
}

View File

@@ -1,7 +1,8 @@
"""Scoring service — granular 0-100 scoring for techniques, tactics, actors, and org.
Uses configurable weights from Settings to compute coverage scores with
detailed breakdowns.
Reads configurable weights from the ``scoring_config`` table (falling
back to env-var defaults) to compute coverage scores with detailed
breakdowns.
Bulk helpers (``bulk_technique_scores``) pre-fetch all scoring data in a
fixed number of aggregated queries so that organisation-wide calculations
@@ -14,7 +15,6 @@ from typing import Optional
from sqlalchemy import case, func
from sqlalchemy.orm import Session
from app.config import settings
from app.models.technique import Technique
from app.models.test import Test
from app.models.detection_rule import DetectionRule
@@ -22,20 +22,12 @@ from app.models.test_detection_result import TestDetectionResult
from app.models.defensive_technique import DefensiveTechniqueMapping
from app.models.threat_actor import ThreatActor, ThreatActorTechnique
from app.models.enums import TestState, TestResult
from app.services.scoring_config_service import get_scoring_weights
# ── Bulk scoring helpers (5 queries for ALL techniques) ───────────────
def _build_empty_stats():
return {
"validated": 0,
"detected": 0,
"platforms": set(),
"latest_validated_at": None,
}
def bulk_technique_scores(db: Session) -> dict:
"""Pre-fetch all scoring data and compute per-technique scores in memory.
@@ -48,11 +40,12 @@ def bulk_technique_scores(db: Session) -> dict:
Returns ``{technique_id: {"total_score": float, "breakdown": dict}}``.
"""
w_tests = settings.SCORING_WEIGHT_TESTS
w_detection = settings.SCORING_WEIGHT_DETECTION_RULES
w_d3fend = settings.SCORING_WEIGHT_D3FEND
w_freshness = settings.SCORING_WEIGHT_FRESHNESS
w_diversity = settings.SCORING_WEIGHT_PLATFORM_DIVERSITY
w = get_scoring_weights(db)
w_tests = w.tests
w_detection = w.detection_rules
w_d3fend = w.d3fend
w_freshness = w.freshness
w_diversity = w.platform_diversity
# Q1: test stats grouped by technique_id
test_rows = (
@@ -242,18 +235,14 @@ def bulk_technique_scores(db: Session) -> dict:
def calculate_technique_score(technique: Technique, db: Session) -> dict:
"""Calculate a 0-100 score for a technique with detailed breakdown.
Weights (configurable via settings):
- tests_validated: weight from SCORING_WEIGHT_TESTS
- detection_rules: weight from SCORING_WEIGHT_DETECTION_RULES
- d3fend_coverage: weight from SCORING_WEIGHT_D3FEND
- freshness: weight from SCORING_WEIGHT_FRESHNESS
- platform_diversity: weight from SCORING_WEIGHT_PLATFORM_DIVERSITY
Weights are read from the ``scoring_config`` table (or env defaults).
"""
w_tests = settings.SCORING_WEIGHT_TESTS
w_detection = settings.SCORING_WEIGHT_DETECTION_RULES
w_d3fend = settings.SCORING_WEIGHT_D3FEND
w_freshness = settings.SCORING_WEIGHT_FRESHNESS
w_diversity = settings.SCORING_WEIGHT_PLATFORM_DIVERSITY
w = get_scoring_weights(db)
w_tests = w.tests
w_detection = w.detection_rules
w_d3fend = w.d3fend
w_freshness = w.freshness
w_diversity = w.platform_diversity
breakdown = {}