Files
Aegis/backend/app/routers/risk_intelligence.py
kitos 362a17aa1b
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(risk): Phase 12 — Risk Intelligence [FASE-12]
- TechniqueRiskProfile model: per-technique risk scoring (0-100)
- 4-factor weighted scoring: detection_gap(35%) + threat_actors(30%) + osint(20%) + test_failures(15%)
- Risk levels: critical(≥75) / high(≥50) / medium(≥25) / low(≥10) / info
- Detailed scoring_breakdown (JSONB) + actionable recommendations per technique
- Router /api/v1/risk: compute-all, compute-one, list, matrix, summary, recommendations, top
- Alembic migration b038risk (raw SQL, idempotent)
- QA script: 60+ tests across all endpoints

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 15:31:38 +02:00

115 lines
3.6 KiB
Python

"""Phase 12: Risk Intelligence router."""
from typing import List, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user, require_any_role
from app.schemas.risk_schema import (
TechniqueRiskProfileOut,
RiskSummary,
ComputeResult,
)
from app.services import risk_intelligence_service as svc
router = APIRouter(prefix="/risk", tags=["risk-intelligence"])
# ── Compute ──────────────────────────────────────────────────────────────────
@router.post("/compute", response_model=ComputeResult, status_code=202)
def compute_all(
db: Session = Depends(get_db),
user=Depends(require_any_role("admin", "red_lead", "blue_lead")),
):
"""Recompute risk scores for ALL techniques (admin / leads only)."""
result = svc.compute_all_risk_scores(db)
return result
@router.post("/profiles/{technique_id}/compute", response_model=TechniqueRiskProfileOut)
def compute_one(
technique_id: UUID,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
"""Compute (or refresh) the risk profile for a single technique."""
return svc.compute_technique_risk(db, technique_id)
# ── Read ─────────────────────────────────────────────────────────────────────
@router.get("/profiles", response_model=List[TechniqueRiskProfileOut])
def list_profiles(
risk_level: Optional[str] = None,
min_score: Optional[float] = None,
max_score: Optional[float] = None,
stale_only: bool = False,
limit: int = Query(100, ge=1, le=500),
offset: int = Query(0, ge=0),
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
"""List risk profiles with optional filters."""
return svc.list_risk_profiles(
db,
risk_level=risk_level,
min_score=min_score,
max_score=max_score,
stale_only=stale_only,
limit=limit,
offset=offset,
)
@router.get("/profiles/{technique_id}", response_model=TechniqueRiskProfileOut)
def get_profile(
technique_id: UUID,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
"""Get the current risk profile for a technique."""
return svc.get_risk_profile(db, technique_id)
@router.get("/matrix")
def risk_matrix(
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
"""All profiled techniques with likelihood/impact coordinates for matrix view."""
return svc.get_risk_matrix(db)
@router.get("/summary")
def risk_summary(
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
"""Aggregate risk statistics: counts by level, average score, top risks."""
return svc.get_risk_summary(db)
@router.get("/recommendations")
def recommendations(
limit: int = Query(20, ge=1, le=100),
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
"""Prioritised list of techniques with actionable recommendations."""
return svc.get_recommendations(db, limit=limit)
@router.get("/top")
def top_risks(
limit: int = Query(10, ge=1, le=50),
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
"""Top N highest-risk techniques (sorted by risk score desc)."""
profiles = svc.list_risk_profiles(db, limit=limit)
return profiles