Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
- TechniqueRiskProfile model: per-technique risk scoring (0-100)
- 4-factor weighted scoring: detection_gap(35%) + threat_actors(30%) + osint(20%) + test_failures(15%)
- Risk levels: critical(≥75) / high(≥50) / medium(≥25) / low(≥10) / info
- Detailed scoring_breakdown (JSONB) + actionable recommendations per technique
- Router /api/v1/risk: compute-all, compute-one, list, matrix, summary, recommendations, top
- Alembic migration b038risk (raw SQL, idempotent)
- QA script: 60+ tests across all endpoints

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
115 lines
3.6 KiB
Python
115 lines
3.6 KiB
Python
"""Phase 12: Risk Intelligence router."""
|
|
|
|
from typing import List, Optional
|
|
from uuid import UUID
|
|
|
|
from fastapi import APIRouter, Depends, Query
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.database import get_db
|
|
from app.dependencies.auth import get_current_user, require_any_role
|
|
from app.schemas.risk_schema import (
|
|
TechniqueRiskProfileOut,
|
|
RiskSummary,
|
|
ComputeResult,
|
|
)
|
|
from app.services import risk_intelligence_service as svc
|
|
|
|
router = APIRouter(prefix="/risk", tags=["risk-intelligence"])
|
|
|
|
|
|
# ── Compute ──────────────────────────────────────────────────────────────────
|
|
|
|
@router.post("/compute", response_model=ComputeResult, status_code=202)
def compute_all(
    db: Session = Depends(get_db),
    user=Depends(require_any_role("admin", "red_lead", "blue_lead")),
):
    """Recompute risk scores for ALL techniques (admin / leads only).

    The ``require_any_role`` dependency is configured with the ``admin``,
    ``red_lead`` and ``blue_lead`` roles. Whatever the service returns is
    handed back unchanged and serialised through ``ComputeResult``; the
    route is declared with HTTP status 202.
    """
    # ``user`` is never read in the body; it is declared only so that the
    # role dependency is resolved before the recomputation is triggered.
    return svc.compute_all_risk_scores(db)
|
|
|
|
|
|
@router.post("/profiles/{technique_id}/compute", response_model=TechniqueRiskProfileOut)
def compute_one(
    technique_id: UUID,
    db: Session = Depends(get_db),
    user=Depends(get_current_user),
):
    """Compute (or refresh) the risk profile for a single technique.

    ``technique_id`` is taken from the URL path and parsed as a UUID. The
    profile produced by the service is serialised through
    ``TechniqueRiskProfileOut``.
    """
    # NOTE(review): unlike the bulk ``/compute`` route, this one depends only
    # on ``get_current_user`` rather than a role check — confirm that any
    # authenticated user is meant to be able to trigger a recompute.
    refreshed_profile = svc.compute_technique_risk(db, technique_id)
    return refreshed_profile
|
|
|
|
|
|
# ── Read ─────────────────────────────────────────────────────────────────────
|
|
|
|
@router.get("/profiles", response_model=List[TechniqueRiskProfileOut])
def list_profiles(
    risk_level: Optional[str] = None,
    min_score: Optional[float] = None,
    max_score: Optional[float] = None,
    stale_only: bool = False,
    limit: int = Query(100, ge=1, le=500),
    offset: int = Query(0, ge=0),
    db: Session = Depends(get_db),
    user=Depends(get_current_user),
):
    """List risk profiles with optional filters.

    All filter and paging values arrive as query parameters and are passed
    to the service untouched. ``limit`` is validated to 1..500 (default 100)
    and ``offset`` to be non-negative (default 0); ``risk_level``,
    ``min_score`` and ``max_score`` are not validated at this layer.
    """
    # Gather the query parameters once so the service call stays compact;
    # the keyword names match the service's parameters one-to-one.
    filters = {
        "risk_level": risk_level,
        "min_score": min_score,
        "max_score": max_score,
        "stale_only": stale_only,
        "limit": limit,
        "offset": offset,
    }
    return svc.list_risk_profiles(db, **filters)
|
|
|
|
|
|
@router.get("/profiles/{technique_id}", response_model=TechniqueRiskProfileOut)
def get_profile(
    technique_id: UUID,
    db: Session = Depends(get_db),
    user=Depends(get_current_user),
):
    """Get the current risk profile for a technique.

    ``technique_id`` is taken from the URL path and parsed as a UUID; the
    lookup itself is delegated to the risk intelligence service and the
    result is serialised through ``TechniqueRiskProfileOut``.
    """
    # ``user`` is unused in the body; the dependency is declared only so that
    # ``get_current_user`` is resolved for this route.
    current_profile = svc.get_risk_profile(db, technique_id)
    return current_profile
|
|
|
|
|
|
@router.get("/matrix")
def risk_matrix(
    db: Session = Depends(get_db),
    user=Depends(get_current_user),
):
    """All profiled techniques with likelihood/impact coordinates for matrix view.

    No response model is declared on this route, so the service's return
    value is sent back as produced.
    """
    # Deliberately no return annotation: FastAPI would treat one as a
    # response model and start filtering the payload.
    matrix_payload = svc.get_risk_matrix(db)
    return matrix_payload
|
|
|
|
|
|
@router.get("/summary")
def risk_summary(
    db: Session = Depends(get_db),
    user=Depends(get_current_user),
):
    """Aggregate risk statistics: counts by level, average score, top risks.

    The aggregation is done entirely by the service; this route declares no
    response model and forwards the result unchanged.
    """
    # NOTE(review): ``RiskSummary`` is imported by this module but not used
    # as a response model here — confirm whether that is intentional.
    summary_payload = svc.get_risk_summary(db)
    return summary_payload
|
|
|
|
|
|
@router.get("/recommendations")
def recommendations(
    limit: int = Query(20, ge=1, le=100),
    db: Session = Depends(get_db),
    user=Depends(get_current_user),
):
    """Prioritised list of techniques with actionable recommendations.

    ``limit`` is a query parameter validated to 1..100 (default 20) and is
    forwarded to the service, which builds the list.
    """
    # No response model is declared, so the service output is returned as-is.
    prioritised = svc.get_recommendations(db, limit=limit)
    return prioritised
|
|
|
|
|
|
@router.get("/top", response_model=List[TechniqueRiskProfileOut])
def top_risks(
    limit: int = Query(10, ge=1, le=50),
    db: Session = Depends(get_db),
    user=Depends(get_current_user),
):
    """Top N highest-risk techniques (sorted by risk score desc).

    ``limit`` is a query parameter validated to 1..50 (default 10). The
    profiles come from the same service call that backs ``GET /profiles``,
    so they are serialised through the same ``TechniqueRiskProfileOut``
    schema to keep both endpoints' payloads consistent.
    """
    # NOTE(review): no ordering is requested here; the "highest-risk first"
    # result relies on ``svc.list_risk_profiles`` sorting by score descending
    # by default — confirm against the service implementation.
    return svc.list_risk_profiles(db, limit=limit)
|