"""Scoring endpoints — technique, tactic, threat actor, and organization scores. Provides granular scoring with breakdowns and configurable weights. """ from typing import Optional from fastapi import APIRouter, Depends, Query from pydantic import BaseModel from sqlalchemy.orm import Session from app.database import get_db from app.dependencies.auth import get_current_user, require_role from app.domain.unit_of_work import UnitOfWork from app.models.user import User from app.services.scoring_service import ( score_technique_by_mitre_id, score_actor_by_id, calculate_tactic_score, calculate_organization_score, get_score_history, ) from app.services.scoring_config_service import ( get_weights_dict, update_scoring_weights, ) router = APIRouter(prefix="/scores", tags=["scores"]) # ── GET /scores/technique/{mitre_id} ───────────────────────────────── @router.get("/technique/{mitre_id}") def score_technique( mitre_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Get detailed score with breakdown for a specific technique.""" return score_technique_by_mitre_id(db, mitre_id) # ── GET /scores/tactic/{tactic} ────────────────────────────────────── @router.get("/tactic/{tactic}") def score_tactic( tactic: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Get average score for a tactic.""" return calculate_tactic_score(tactic, db) # ── GET /scores/threat-actor/{id} ──────────────────────────────────── @router.get("/threat-actor/{actor_id}") def score_threat_actor( actor_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Get coverage score against a specific threat actor.""" return score_actor_by_id(db, actor_id) # ── GET /scores/organization ───────────────────────────────────────── @router.get("/organization") def score_organization( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Get the overall organization security score (cached for 5 min).""" from app.services.score_cache import get_organization_score_cached return get_organization_score_cached(db) # ── GET /scores/history ────────────────────────────────────────────── @router.get("/history") def score_history( period: str = Query("90d", pattern="^(30d|90d|1y)$"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Get historical score data points (weekly).""" return get_score_history(db, period) # ── GET /scores/config ─────────────────────────────────────────────── @router.get("/config") def get_scoring_config( db: Session = Depends(get_db), current_user: User = Depends(require_role("admin")), ): """Get current scoring weights (admin only).""" return get_weights_dict(db) # ── PATCH /scores/config ───────────────────────────────────────────── class ScoringConfigUpdate(BaseModel): tests: Optional[float] = None detection_rules: Optional[float] = None d3fend: Optional[float] = None recency: Optional[float] = None severity: Optional[float] = None freshness: Optional[float] = None platform_diversity: Optional[float] = None @router.patch("/config") def update_scoring_config( payload: ScoringConfigUpdate, db: Session = Depends(get_db), current_user: User = Depends(require_role("admin")), ): """Update scoring weights (admin only). Weights are persisted in the database and survive restarts. Validation enforces that all weights are non-negative and sum to 100. """ with UnitOfWork(db) as uow: result = update_scoring_weights( db, tests=payload.tests, detection_rules=payload.detection_rules, d3fend=payload.d3fend, recency=payload.recency, severity=payload.severity, freshness=payload.freshness, platform_diversity=payload.platform_diversity, updated_by=current_user.id, ) uow.commit() from app.services.score_cache import invalidate invalidate() return {"message": "Scoring config updated", **result}