diff --git a/backend/alembic/versions/b027_add_scoring_config.py b/backend/alembic/versions/b027_add_scoring_config.py new file mode 100644 index 0000000..4458239 --- /dev/null +++ b/backend/alembic/versions/b027_add_scoring_config.py @@ -0,0 +1,37 @@ +"""add_scoring_config + +Single-row table to persist scoring weights in the database, +replacing the mutable in-process Settings approach. + +Revision ID: b027scorecfg +Revises: b026techidx +Create Date: 2026-02-19 10:00:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +revision: str = "b027scorecfg" +down_revision: Union[str, None] = "b026techidx" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "scoring_config", + sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True), + sa.Column("weight_tests", sa.Float(), nullable=False, server_default="40.0"), + sa.Column("weight_detection_rules", sa.Float(), nullable=False, server_default="20.0"), + sa.Column("weight_d3fend", sa.Float(), nullable=False, server_default="15.0"), + sa.Column("weight_freshness", sa.Float(), nullable=False, server_default="15.0"), + sa.Column("weight_platform_diversity", sa.Float(), nullable=False, server_default="10.0"), + sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()), + ) + + +def downgrade() -> None: + op.drop_table("scoring_config") diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 6634c9b..2780d6d 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -19,6 +19,7 @@ from app.models.coverage_snapshot import CoverageSnapshot, SnapshotTechniqueStat from app.models.jira_link import JiraLink, JiraLinkEntityType, JiraSyncDirection from app.models.worklog import Worklog from app.models.osint_item import OsintItem +from app.models.scoring_config import ScoringConfig from app.models.enums import TechniqueStatus, TestState, TestResult, TeamSide __all__ = [ @@ -31,6 +32,6 @@ __all__ = [ "ComplianceFramework", "ComplianceControl", "ComplianceControlMapping", "CoverageSnapshot", "SnapshotTechniqueState", "JiraLink", "JiraLinkEntityType", "JiraSyncDirection", - "Worklog", "OsintItem", + "Worklog", "OsintItem", "ScoringConfig", "TechniqueStatus", "TestState", "TestResult", "TeamSide", ] diff --git a/backend/app/models/scoring_config.py b/backend/app/models/scoring_config.py new file mode 100644 index 0000000..e6c1b78 --- /dev/null +++ b/backend/app/models/scoring_config.py @@ -0,0 +1,24 @@ +"""ScoringConfig — single-row table for persisted scoring weights. + +Replaces the mutable-settings approach where PATCH /scores/config +mutated the in-process ``Settings`` object (lost on restart). +""" + +import uuid + +from sqlalchemy import Column, Float, DateTime, func +from sqlalchemy.dialects.postgresql import UUID + +from app.database import Base + + +class ScoringConfig(Base): + __tablename__ = "scoring_config" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + weight_tests = Column(Float, nullable=False, default=40.0) + weight_detection_rules = Column(Float, nullable=False, default=20.0) + weight_d3fend = Column(Float, nullable=False, default=15.0) + weight_freshness = Column(Float, nullable=False, default=15.0) + weight_platform_diversity = Column(Float, nullable=False, default=10.0) + updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now()) diff --git a/backend/app/routers/scores.py b/backend/app/routers/scores.py index a755ea9..0e9621a 100644 --- a/backend/app/routers/scores.py +++ b/backend/app/routers/scores.py @@ -14,7 +14,6 @@ from app.dependencies.auth import get_current_user, require_role from app.models.user import User from app.models.technique import Technique from app.models.threat_actor import ThreatActor -from app.config import settings from app.services.scoring_service import ( calculate_technique_score, calculate_tactic_score, @@ -22,6 +21,10 @@ from app.services.scoring_service import ( calculate_organization_score, get_score_history, ) +from app.services.scoring_config_service import ( + get_weights_dict, + update_scoring_weights, +) router = APIRouter(prefix="/scores", tags=["scores"]) @@ -117,79 +120,45 @@ def score_history( @router.get("/config") def get_scoring_config( + db: Session = Depends(get_db), current_user: User = Depends(require_role("admin")), ): """Get current scoring weights (admin only).""" - return { - "weights": { - "tests": settings.SCORING_WEIGHT_TESTS, - "detection_rules": settings.SCORING_WEIGHT_DETECTION_RULES, - "d3fend": settings.SCORING_WEIGHT_D3FEND, - "freshness": settings.SCORING_WEIGHT_FRESHNESS, - "platform_diversity": settings.SCORING_WEIGHT_PLATFORM_DIVERSITY, - }, - "total": ( - settings.SCORING_WEIGHT_TESTS - + settings.SCORING_WEIGHT_DETECTION_RULES - + settings.SCORING_WEIGHT_D3FEND - + settings.SCORING_WEIGHT_FRESHNESS - + settings.SCORING_WEIGHT_PLATFORM_DIVERSITY - ), - } + return get_weights_dict(db) # ── PATCH /scores/config ───────────────────────────────────────────── class ScoringConfigUpdate(BaseModel): - tests: Optional[int] = None - detection_rules: Optional[int] = None - d3fend: Optional[int] = None - freshness: Optional[int] = None - platform_diversity: Optional[int] = None + tests: Optional[float] = None + detection_rules: Optional[float] = None + d3fend: Optional[float] = None + freshness: Optional[float] = None + platform_diversity: Optional[float] = None @router.patch("/config") def update_scoring_config( payload: ScoringConfigUpdate, + db: Session = Depends(get_db), current_user: User = Depends(require_role("admin")), ): """Update scoring weights (admin only). - Note: Since we're using Opcion A (env vars / Settings), changes - are applied at runtime but won't persist across restarts unless - the .env file is also updated. For production, consider migrating - to Option B (database table). + Weights are persisted in the database and survive restarts. + Validation enforces that all weights are non-negative and sum to 100. """ - if payload.tests is not None: - settings.SCORING_WEIGHT_TESTS = payload.tests - if payload.detection_rules is not None: - settings.SCORING_WEIGHT_DETECTION_RULES = payload.detection_rules - if payload.d3fend is not None: - settings.SCORING_WEIGHT_D3FEND = payload.d3fend - if payload.freshness is not None: - settings.SCORING_WEIGHT_FRESHNESS = payload.freshness - if payload.platform_diversity is not None: - settings.SCORING_WEIGHT_PLATFORM_DIVERSITY = payload.platform_diversity + result = update_scoring_weights( + db, + tests=payload.tests, + detection_rules=payload.detection_rules, + d3fend=payload.d3fend, + freshness=payload.freshness, + platform_diversity=payload.platform_diversity, + ) - # Weights changed — bust the score cache from app.services.score_cache import invalidate invalidate() - return { - "message": "Scoring config updated", - "weights": { - "tests": settings.SCORING_WEIGHT_TESTS, - "detection_rules": settings.SCORING_WEIGHT_DETECTION_RULES, - "d3fend": settings.SCORING_WEIGHT_D3FEND, - "freshness": settings.SCORING_WEIGHT_FRESHNESS, - "platform_diversity": settings.SCORING_WEIGHT_PLATFORM_DIVERSITY, - }, - "total": ( - settings.SCORING_WEIGHT_TESTS - + settings.SCORING_WEIGHT_DETECTION_RULES - + settings.SCORING_WEIGHT_D3FEND - + settings.SCORING_WEIGHT_FRESHNESS - + settings.SCORING_WEIGHT_PLATFORM_DIVERSITY - ), - } + return {"message": "Scoring config updated", **result} diff --git a/backend/app/services/scoring_config_service.py b/backend/app/services/scoring_config_service.py new file mode 100644 index 0000000..95cc62a --- /dev/null +++ b/backend/app/services/scoring_config_service.py @@ -0,0 +1,107 @@ +"""Scoring configuration persistence service. + +Reads and writes scoring weights from the ``scoring_config`` table. +Falls back to environment-variable defaults (from ``Settings``) when +no row has been persisted yet. + +This module is framework-agnostic: no FastAPI imports. +""" + +from __future__ import annotations + +from typing import Any + +from sqlalchemy.orm import Session + +from app.config import settings +from app.domain.value_objects.scoring_weights import ScoringWeights +from app.models.scoring_config import ScoringConfig + + +def get_scoring_weights(db: Session) -> ScoringWeights: + """Return the active scoring weights. + + Reads the single ``scoring_config`` row. If the table is empty + (first run or migration just applied), falls back to the values + from the environment / ``Settings``. + """ + row = db.query(ScoringConfig).first() + if row is not None: + return ScoringWeights( + tests=row.weight_tests, + detection_rules=row.weight_detection_rules, + d3fend=row.weight_d3fend, + freshness=row.weight_freshness, + platform_diversity=row.weight_platform_diversity, + ) + + return ScoringWeights( + tests=float(settings.SCORING_WEIGHT_TESTS), + detection_rules=float(settings.SCORING_WEIGHT_DETECTION_RULES), + d3fend=float(settings.SCORING_WEIGHT_D3FEND), + freshness=float(settings.SCORING_WEIGHT_FRESHNESS), + platform_diversity=float(settings.SCORING_WEIGHT_PLATFORM_DIVERSITY), + ) + + +def update_scoring_weights( + db: Session, + *, + tests: float | None = None, + detection_rules: float | None = None, + d3fend: float | None = None, + freshness: float | None = None, + platform_diversity: float | None = None, +) -> dict[str, Any]: + """Upsert scoring weights into the database. + + Only provided fields are overwritten; ``None`` values keep the + current (or default) value. Validates via ``ScoringWeights`` + before persisting. + + Returns a dict with ``weights`` and ``total``. + """ + current = get_scoring_weights(db) + + new = ScoringWeights( + tests=tests if tests is not None else current.tests, + detection_rules=detection_rules if detection_rules is not None else current.detection_rules, + d3fend=d3fend if d3fend is not None else current.d3fend, + freshness=freshness if freshness is not None else current.freshness, + platform_diversity=platform_diversity if platform_diversity is not None else current.platform_diversity, + ) + + row = db.query(ScoringConfig).first() + if row is None: + row = ScoringConfig() + db.add(row) + + row.weight_tests = new.tests + row.weight_detection_rules = new.detection_rules + row.weight_d3fend = new.d3fend + row.weight_freshness = new.freshness + row.weight_platform_diversity = new.platform_diversity + + db.commit() + db.refresh(row) + + return _weights_dict(new) + + +def get_weights_dict(db: Session) -> dict[str, Any]: + """Return current weights as a serialisable dict.""" + return _weights_dict(get_scoring_weights(db)) + + +def _weights_dict(w: ScoringWeights) -> dict[str, Any]: + weights = { + "tests": w.tests, + "detection_rules": w.detection_rules, + "d3fend": w.d3fend, + "freshness": w.freshness, + "platform_diversity": w.platform_diversity, + } + return { + "weights": weights, + "total": sum(weights.values()), + } diff --git a/backend/app/services/scoring_service.py b/backend/app/services/scoring_service.py index 5209816..25436e1 100644 --- a/backend/app/services/scoring_service.py +++ b/backend/app/services/scoring_service.py @@ -1,7 +1,8 @@ """Scoring service — granular 0-100 scoring for techniques, tactics, actors, and org. -Uses configurable weights from Settings to compute coverage scores with -detailed breakdowns. +Reads configurable weights from the ``scoring_config`` table (falling +back to env-var defaults) to compute coverage scores with detailed +breakdowns. Bulk helpers (``bulk_technique_scores``) pre-fetch all scoring data in a fixed number of aggregated queries so that organisation-wide calculations @@ -14,7 +15,6 @@ from typing import Optional from sqlalchemy import case, func from sqlalchemy.orm import Session -from app.config import settings from app.models.technique import Technique from app.models.test import Test from app.models.detection_rule import DetectionRule @@ -22,20 +22,12 @@ from app.models.test_detection_result import TestDetectionResult from app.models.defensive_technique import DefensiveTechniqueMapping from app.models.threat_actor import ThreatActor, ThreatActorTechnique from app.models.enums import TestState, TestResult +from app.services.scoring_config_service import get_scoring_weights # ── Bulk scoring helpers (5 queries for ALL techniques) ─────────────── -def _build_empty_stats(): - return { - "validated": 0, - "detected": 0, - "platforms": set(), - "latest_validated_at": None, - } - - def bulk_technique_scores(db: Session) -> dict: """Pre-fetch all scoring data and compute per-technique scores in memory. @@ -48,11 +40,12 @@ def bulk_technique_scores(db: Session) -> dict: Returns ``{technique_id: {"total_score": float, "breakdown": dict}}``. """ - w_tests = settings.SCORING_WEIGHT_TESTS - w_detection = settings.SCORING_WEIGHT_DETECTION_RULES - w_d3fend = settings.SCORING_WEIGHT_D3FEND - w_freshness = settings.SCORING_WEIGHT_FRESHNESS - w_diversity = settings.SCORING_WEIGHT_PLATFORM_DIVERSITY + w = get_scoring_weights(db) + w_tests = w.tests + w_detection = w.detection_rules + w_d3fend = w.d3fend + w_freshness = w.freshness + w_diversity = w.platform_diversity # Q1: test stats grouped by technique_id test_rows = ( @@ -242,18 +235,14 @@ def bulk_technique_scores(db: Session) -> dict: def calculate_technique_score(technique: Technique, db: Session) -> dict: """Calculate a 0-100 score for a technique with detailed breakdown. - Weights (configurable via settings): - - tests_validated: weight from SCORING_WEIGHT_TESTS - - detection_rules: weight from SCORING_WEIGHT_DETECTION_RULES - - d3fend_coverage: weight from SCORING_WEIGHT_D3FEND - - freshness: weight from SCORING_WEIGHT_FRESHNESS - - platform_diversity: weight from SCORING_WEIGHT_PLATFORM_DIVERSITY + Weights are read from the ``scoring_config`` table (or env defaults). """ - w_tests = settings.SCORING_WEIGHT_TESTS - w_detection = settings.SCORING_WEIGHT_DETECTION_RULES - w_d3fend = settings.SCORING_WEIGHT_D3FEND - w_freshness = settings.SCORING_WEIGHT_FRESHNESS - w_diversity = settings.SCORING_WEIGHT_PLATFORM_DIVERSITY + w = get_scoring_weights(db) + w_tests = w.tests + w_detection = w.detection_rules + w_d3fend = w.d3fend + w_freshness = w.freshness + w_diversity = w.platform_diversity breakdown = {}