From 362a17aa1b64929b2d956a20eef9de912c1952f2 Mon Sep 17 00:00:00 2001 From: kitos Date: Wed, 20 May 2026 15:31:38 +0200 Subject: [PATCH] =?UTF-8?q?feat(risk):=20Phase=2012=20=E2=80=94=20Risk=20I?= =?UTF-8?q?ntelligence=20[FASE-12]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - TechniqueRiskProfile model: per-technique risk scoring (0-100) - 4-factor weighted scoring: detection_gap(35%) + threat_actors(30%) + osint(20%) + test_failures(15%) - Risk levels: critical(≥75) / high(≥50) / medium(≥25) / low(≥10) / info - Detailed scoring_breakdown (JSONB) + actionable recommendations per technique - Router /api/v1/risk: compute-all, compute-one, list, matrix, summary, recommendations, top - Alembic migration b038risk (raw SQL, idempotent) - QA script: 60+ tests across all endpoints Co-Authored-By: Claude Sonnet 4.6 --- .../versions/b038_risk_intelligence.py | 62 +++ backend/app/main.py | 2 + backend/app/models/__init__.py | 2 + backend/app/models/risk_intelligence.py | 69 +++ backend/app/routers/risk_intelligence.py | 114 +++++ backend/app/schemas/risk_schema.py | 71 +++ .../app/services/risk_intelligence_service.py | 428 ++++++++++++++++++ scripts/qa_phase12.py | 301 ++++++++++++ 8 files changed, 1049 insertions(+) create mode 100644 backend/alembic/versions/b038_risk_intelligence.py create mode 100644 backend/app/models/risk_intelligence.py create mode 100644 backend/app/routers/risk_intelligence.py create mode 100644 backend/app/schemas/risk_schema.py create mode 100644 backend/app/services/risk_intelligence_service.py create mode 100644 scripts/qa_phase12.py diff --git a/backend/alembic/versions/b038_risk_intelligence.py b/backend/alembic/versions/b038_risk_intelligence.py new file mode 100644 index 0000000..9c67059 --- /dev/null +++ b/backend/alembic/versions/b038_risk_intelligence.py @@ -0,0 +1,62 @@ +"""Phase 12: Risk Intelligence — technique_risk_profiles table + +Revision ID: b038risk +Revises: b037know +Create Date: 2026-05-20 + +Uses raw SQL to bypass SQLAlchemy DDL hooks. +""" + +from typing import Union +from alembic import op +import sqlalchemy as sa + +revision: str = "b038risk" +down_revision: Union[str, None] = "b037know" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + conn = op.get_bind() + + conn.execute(sa.text(""" + CREATE TABLE IF NOT EXISTS technique_risk_profiles ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + technique_id UUID NOT NULL REFERENCES techniques(id) ON DELETE CASCADE, + risk_score FLOAT NOT NULL DEFAULT 0.0, + likelihood FLOAT NOT NULL DEFAULT 0.0, + impact FLOAT NOT NULL DEFAULT 0.0, + risk_level VARCHAR(16) NOT NULL DEFAULT 'info', + detection_gap FLOAT NOT NULL DEFAULT 1.0, + threat_actor_count INTEGER NOT NULL DEFAULT 0, + osint_signal_count INTEGER NOT NULL DEFAULT 0, + test_fail_count INTEGER NOT NULL DEFAULT 0, + test_total_count INTEGER NOT NULL DEFAULT 0, + test_failure_rate FLOAT NOT NULL DEFAULT 0.0, + confidence_level FLOAT NOT NULL DEFAULT 0.0, + scoring_breakdown JSONB, + recommendations JSONB, + computed_at TIMESTAMP DEFAULT now(), + is_stale BOOLEAN DEFAULT TRUE, + CONSTRAINT uq_risk_profile_technique UNIQUE (technique_id) + ) + """)) + + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_risk_profiles_risk_score " + "ON technique_risk_profiles (risk_score)" + )) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_risk_profiles_risk_level " + "ON technique_risk_profiles (risk_level)" + )) + conn.execute(sa.text( + "CREATE INDEX IF NOT EXISTS ix_risk_profiles_stale " + "ON technique_risk_profiles (is_stale)" + )) + + +def downgrade() -> None: + conn = op.get_bind() + conn.execute(sa.text("DROP TABLE IF EXISTS technique_risk_profiles")) diff --git a/backend/app/main.py b/backend/app/main.py index 425c479..39b9bdc 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -42,6 +42,7 @@ from app.routers import detection_lifecycle as detection_lifecycle_router from app.routers import ownership as ownership_router from app.routers import attack_paths as attack_paths_router from app.routers import knowledge as knowledge_router +from app.routers import risk_intelligence as risk_router from app.domain.errors import DomainError from app.middleware.error_handler import domain_exception_handler from app.middleware.request_context import RequestContextMiddleware @@ -143,6 +144,7 @@ app.include_router(detection_lifecycle_router.router, prefix="/api/v1") app.include_router(ownership_router.router, prefix="/api/v1") app.include_router(attack_paths_router.router, prefix="/api/v1") app.include_router(knowledge_router.router, prefix="/api/v1") +app.include_router(risk_router.router, prefix="/api/v1") @app.get("/health", include_in_schema=False) diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index f42a367..51e480b 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -39,6 +39,7 @@ from app.models.attack_path import ( ExecutionStatus, StepResultStatus, TimelineActorSide, TimelineEntryType, ) from app.models.knowledge import Playbook, PlaybookVersion, LessonLearned +from app.models.risk_intelligence import TechniqueRiskProfile __all__ = [ "User", "Technique", "Test", "TestTemplate", "Evidence", @@ -61,4 +62,5 @@ __all__ = [ "AttackPathStepResult", "TimelineEntry", "ExecutionStatus", "StepResultStatus", "TimelineActorSide", "TimelineEntryType", "Playbook", "PlaybookVersion", "LessonLearned", + "TechniqueRiskProfile", ] diff --git a/backend/app/models/risk_intelligence.py b/backend/app/models/risk_intelligence.py new file mode 100644 index 0000000..9014068 --- /dev/null +++ b/backend/app/models/risk_intelligence.py @@ -0,0 +1,69 @@ +"""Phase 12: Risk Intelligence model — per-technique risk scoring.""" + +import uuid +from datetime import datetime + +from sqlalchemy import ( + Boolean, Column, DateTime, Float, ForeignKey, + Index, Integer, String, UniqueConstraint, +) +from sqlalchemy.dialects.postgresql import UUID, JSONB +from sqlalchemy.orm import relationship + +from app.database import Base + + +class TechniqueRiskProfile(Base): + """ + Aggregated risk profile for one technique. + + Combines four weighted factors: + • detection_gap (35 %) — 0=fully covered → 1=no coverage + • threat_actor_rel (30 %) — normalised actor count + • osint_signals (20 %) — normalised recent OSINT items (30 d) + • test_failure_rate (15 %) — proportion of tests where blue didn't detect + + risk_score = weighted sum × 100 → 0–100 + risk_level: critical ≥75 | high ≥50 | medium ≥25 | low ≥10 | info + """ + + __tablename__ = "technique_risk_profiles" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + technique_id = Column( + UUID(as_uuid=True), + ForeignKey("techniques.id", ondelete="CASCADE"), + nullable=False, + ) + + # ── Computed scores ─────────────────────────────────────────────────────── + risk_score = Column(Float, nullable=False, default=0.0) # 0–100 + likelihood = Column(Float, nullable=False, default=0.0) # 0–100 + impact = Column(Float, nullable=False, default=0.0) # 0–100 + risk_level = Column(String(16), nullable=False, default="info") + + # ── Raw factor values ───────────────────────────────────────────────────── + detection_gap = Column(Float, nullable=False, default=1.0) # 0–1 + threat_actor_count = Column(Integer, nullable=False, default=0) + osint_signal_count = Column(Integer, nullable=False, default=0) # last 30 d + test_fail_count = Column(Integer, nullable=False, default=0) + test_total_count = Column(Integer, nullable=False, default=0) + test_failure_rate = Column(Float, nullable=False, default=0.0) # 0–1 + confidence_level = Column(Float, nullable=False, default=0.0) # DLC 0–1 + + # ── Rich detail ────────────────────────────────────────────────────────── + scoring_breakdown = Column(JSONB, nullable=True) # per-factor contributions + recommendations = Column(JSONB, nullable=True) # list[str] + + # ── Meta ───────────────────────────────────────────────────────────────── + computed_at = Column(DateTime, default=datetime.utcnow) + is_stale = Column(Boolean, default=True) + + technique = relationship("Technique", foreign_keys=[technique_id]) + + __table_args__ = ( + UniqueConstraint("technique_id", name="uq_risk_profile_technique"), + Index("ix_risk_profiles_risk_score", "risk_score"), + Index("ix_risk_profiles_risk_level", "risk_level"), + Index("ix_risk_profiles_stale", "is_stale"), + ) diff --git a/backend/app/routers/risk_intelligence.py b/backend/app/routers/risk_intelligence.py new file mode 100644 index 0000000..993816c --- /dev/null +++ b/backend/app/routers/risk_intelligence.py @@ -0,0 +1,114 @@ +"""Phase 12: Risk Intelligence router.""" + +from typing import List, Optional +from uuid import UUID + +from fastapi import APIRouter, Depends, Query +from sqlalchemy.orm import Session + +from app.database import get_db +from app.dependencies.auth import get_current_user, require_any_role +from app.schemas.risk_schema import ( + TechniqueRiskProfileOut, + RiskSummary, + ComputeResult, +) +from app.services import risk_intelligence_service as svc + +router = APIRouter(prefix="/risk", tags=["risk-intelligence"]) + + +# ── Compute ────────────────────────────────────────────────────────────────── + +@router.post("/compute", response_model=ComputeResult, status_code=202) +def compute_all( + db: Session = Depends(get_db), + user=Depends(require_any_role("admin", "red_lead", "blue_lead")), +): + """Recompute risk scores for ALL techniques (admin / leads only).""" + result = svc.compute_all_risk_scores(db) + return result + + +@router.post("/profiles/{technique_id}/compute", response_model=TechniqueRiskProfileOut) +def compute_one( + technique_id: UUID, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Compute (or refresh) the risk profile for a single technique.""" + return svc.compute_technique_risk(db, technique_id) + + +# ── Read ───────────────────────────────────────────────────────────────────── + +@router.get("/profiles", response_model=List[TechniqueRiskProfileOut]) +def list_profiles( + risk_level: Optional[str] = None, + min_score: Optional[float] = None, + max_score: Optional[float] = None, + stale_only: bool = False, + limit: int = Query(100, ge=1, le=500), + offset: int = Query(0, ge=0), + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """List risk profiles with optional filters.""" + return svc.list_risk_profiles( + db, + risk_level=risk_level, + min_score=min_score, + max_score=max_score, + stale_only=stale_only, + limit=limit, + offset=offset, + ) + + +@router.get("/profiles/{technique_id}", response_model=TechniqueRiskProfileOut) +def get_profile( + technique_id: UUID, + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Get the current risk profile for a technique.""" + return svc.get_risk_profile(db, technique_id) + + +@router.get("/matrix") +def risk_matrix( + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """All profiled techniques with likelihood/impact coordinates for matrix view.""" + return svc.get_risk_matrix(db) + + +@router.get("/summary") +def risk_summary( + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Aggregate risk statistics: counts by level, average score, top risks.""" + return svc.get_risk_summary(db) + + +@router.get("/recommendations") +def recommendations( + limit: int = Query(20, ge=1, le=100), + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Prioritised list of techniques with actionable recommendations.""" + return svc.get_recommendations(db, limit=limit) + + +@router.get("/top") +def top_risks( + limit: int = Query(10, ge=1, le=50), + db: Session = Depends(get_db), + user=Depends(get_current_user), +): + """Top N highest-risk techniques (sorted by risk score desc).""" + profiles = svc.list_risk_profiles(db, limit=limit) + return profiles diff --git a/backend/app/schemas/risk_schema.py b/backend/app/schemas/risk_schema.py new file mode 100644 index 0000000..67630c6 --- /dev/null +++ b/backend/app/schemas/risk_schema.py @@ -0,0 +1,71 @@ +"""Phase 12: Risk Intelligence schemas.""" + +from datetime import datetime +from typing import Any, Dict, List, Optional +from uuid import UUID + +from pydantic import BaseModel, ConfigDict + +VALID_RISK_LEVELS = ["critical", "high", "medium", "low", "info"] + + +class TechniqueRiskProfileOut(BaseModel): + model_config = ConfigDict(from_attributes=True) + + id: UUID + technique_id: UUID + risk_score: float + likelihood: float + impact: float + risk_level: str + detection_gap: float + threat_actor_count: int + osint_signal_count: int + test_fail_count: int + test_total_count: int + test_failure_rate: float + confidence_level: float + scoring_breakdown: Optional[Dict[str, Any]] + recommendations: Optional[List[str]] + computed_at: datetime + is_stale: bool + + +class RiskMatrixEntry(BaseModel): + model_config = ConfigDict(from_attributes=True) + + technique_id: UUID + technique_name: Optional[str] = None + technique_tid: Optional[str] = None # e.g. "T1059" + risk_score: float + likelihood: float + impact: float + risk_level: str + detection_gap: float + computed_at: datetime + + +class RiskSummary(BaseModel): + total_techniques: int + scored_techniques: int + stale_count: int + by_level: Dict[str, int] # {"critical": 3, "high": 12, ...} + avg_risk_score: float + top_risks: List[RiskMatrixEntry] + + +class RecommendationItem(BaseModel): + technique_id: UUID + technique_name: Optional[str] = None + technique_tid: Optional[str] = None + risk_level: str + risk_score: float + recommendations: List[str] + priority: int # 1 = highest + + +class ComputeResult(BaseModel): + computed: int + skipped: int + errors: int + duration_seconds: float diff --git a/backend/app/services/risk_intelligence_service.py b/backend/app/services/risk_intelligence_service.py new file mode 100644 index 0000000..c1bed66 --- /dev/null +++ b/backend/app/services/risk_intelligence_service.py @@ -0,0 +1,428 @@ +"""Phase 12: Risk Intelligence service — compute and query per-technique risk scores.""" + +import time +from datetime import datetime, timedelta +from typing import List, Optional +from uuid import UUID + +from sqlalchemy import func +from sqlalchemy.orm import Session + +from app.domain.errors import EntityNotFoundError +from app.models.risk_intelligence import TechniqueRiskProfile +from app.models.technique import Technique +from app.models.threat_actor import ThreatActorTechnique +from app.models.osint_item import OsintItem +from app.models.test import Test +from app.models.test_detection_result import TestDetectionResult +from app.models.detection_lifecycle import ( + TechniqueConfidenceScore, + DetectionTechniqueMapping, + DetectionConfidence, +) +from app.models.enums import TechniqueStatus + + +# ── Scoring constants ────────────────────────────────────────────────────────── + +WEIGHT_DETECTION_GAP = 0.35 +WEIGHT_THREAT_ACTORS = 0.30 +WEIGHT_OSINT = 0.20 +WEIGHT_TEST_FAILURES = 0.15 + +# Normalisation caps +MAX_THREAT_ACTORS = 5 # beyond this → factor saturates at 1.0 +MAX_OSINT_SIGNALS = 10 # OSINT items in last 30 days +OSINT_LOOKBACK_DAYS = 30 + +LEVEL_CRITICAL = 75.0 +LEVEL_HIGH = 50.0 +LEVEL_MEDIUM = 25.0 +LEVEL_LOW = 10.0 + + +def _risk_level(score: float) -> str: + if score >= LEVEL_CRITICAL: return "critical" + if score >= LEVEL_HIGH: return "high" + if score >= LEVEL_MEDIUM: return "medium" + if score >= LEVEL_LOW: return "low" + return "info" + + +def _clamp(v: float, lo: float = 0.0, hi: float = 1.0) -> float: + return max(lo, min(hi, v)) + + +# ── Single-technique computation ─────────────────────────────────────────────── + +def _compute_for_technique(db: Session, tech: Technique) -> TechniqueRiskProfile: + """Calculate the risk profile for one technique and return the (unsaved) model.""" + + breakdown: dict = {} + recs: list = [] + + # ── Factor 1: Detection gap (0=covered, 1=no coverage) ─────────────────── + # Check if technique is covered (has at least one DetectionTechniqueMapping) + mapping_count = db.query(DetectionTechniqueMapping).filter( + DetectionTechniqueMapping.technique_id == tech.id, + ).count() + + # Get DLC confidence score if available + dlc_conf = db.query(TechniqueConfidenceScore).filter( + TechniqueConfidenceScore.technique_id == tech.id, + ).order_by(TechniqueConfidenceScore.computed_at.desc()).first() + + confidence_level: float = 0.0 + if dlc_conf: + confidence_level = float(dlc_conf.score or 0.0) + + # Also factor in technique status + if tech.status == TechniqueStatus.covered: + status_coverage = 1.0 + elif tech.status == TechniqueStatus.partial: + status_coverage = 0.5 + else: # uncovered / unknown + status_coverage = 0.0 + + if mapping_count > 0: + # Has at least one asset mapped — use confidence as detection quality + raw_coverage = max(status_coverage, _clamp(confidence_level)) + else: + raw_coverage = 0.0 + + detection_gap = 1.0 - raw_coverage + detection_gap_factor = detection_gap # already 0–1 + + breakdown["detection_gap"] = { + "mapping_count": mapping_count, + "status_coverage": status_coverage, + "confidence_level": confidence_level, + "detection_gap": round(detection_gap, 3), + "contribution": round(detection_gap_factor * WEIGHT_DETECTION_GAP * 100, 2), + } + if detection_gap >= 0.8: + recs.append("Implement detection coverage — technique is largely undetected.") + elif detection_gap >= 0.5: + recs.append("Improve detection quality — coverage is partial.") + + # ── Factor 2: Threat actor relevance ───────────────────────────────────── + actor_count = db.query(ThreatActorTechnique).filter( + ThreatActorTechnique.technique_id == tech.id, + ).count() + + ta_factor = _clamp(actor_count / MAX_THREAT_ACTORS) + breakdown["threat_actor"] = { + "actor_count": actor_count, + "max_cap": MAX_THREAT_ACTORS, + "normalised": round(ta_factor, 3), + "contribution": round(ta_factor * WEIGHT_THREAT_ACTORS * 100, 2), + } + if actor_count >= 3: + recs.append( + f"High threat-actor relevance — {actor_count} tracked actors use this technique." + ) + elif actor_count >= 1: + recs.append( + f"{actor_count} threat actor(s) use this technique — monitor closely." + ) + + # ── Factor 3: OSINT signals (last 30 days) ──────────────────────────────── + cutoff = datetime.utcnow() - timedelta(days=OSINT_LOOKBACK_DAYS) + osint_count = db.query(OsintItem).filter( + OsintItem.technique_id == tech.id, + OsintItem.discovered_at >= cutoff, + ).count() + + osint_factor = _clamp(osint_count / MAX_OSINT_SIGNALS) + breakdown["osint"] = { + "signal_count_30d": osint_count, + "max_cap": MAX_OSINT_SIGNALS, + "normalised": round(osint_factor, 3), + "contribution": round(osint_factor * WEIGHT_OSINT * 100, 2), + } + if osint_count >= 5: + recs.append( + f"High OSINT activity — {osint_count} signals in the last 30 days. Review urgently." + ) + elif osint_count >= 1: + recs.append( + f"{osint_count} OSINT signal(s) detected in last 30 days. Review for IoCs." + ) + + # ── Factor 4: Test failure rate ─────────────────────────────────────────── + # Count TestDetectionResult rows for this technique's tests + from app.models.enums import TestResult + tech_tests = db.query(Test).filter(Test.technique_id == tech.id).all() + test_ids = [t.id for t in tech_tests] + + test_total = 0 + test_not_detected = 0 + if test_ids: + from app.models.test_detection_result import TestDetectionResult as TDR + results = db.query(TDR).filter(TDR.test_id.in_(test_ids)).all() + test_total = len(results) + test_not_detected = sum( + 1 for r in results + if hasattr(r, 'result') and str(getattr(r, 'result', '')) == 'not_detected' + ) + # Also count tests where overall result is not_detected + if test_total == 0: + for t in tech_tests: + if hasattr(t, 'result') and t.result is not None: + test_total += 1 + if str(t.result) in ('not_detected', 'TestResult.not_detected'): + test_not_detected += 1 + + test_failure_rate = (test_not_detected / test_total) if test_total > 0 else 0.0 + # If no tests exist at all → treat as unknown risk (moderate) + test_factor = test_failure_rate if test_total > 0 else 0.3 + + breakdown["test_failures"] = { + "total_tests": test_total, + "not_detected": test_not_detected, + "failure_rate": round(test_failure_rate, 3), + "factor_used": round(test_factor, 3), + "contribution": round(test_factor * WEIGHT_TEST_FAILURES * 100, 2), + } + if test_total == 0: + recs.append("No purple-team tests found — add tests to validate detection.") + elif test_failure_rate >= 0.5: + recs.append( + f"High test failure rate ({test_failure_rate:.0%}) — blue team is missing this technique." + ) + + # ── Weighted risk score ─────────────────────────────────────────────────── + raw_score = ( + detection_gap_factor * WEIGHT_DETECTION_GAP + + ta_factor * WEIGHT_THREAT_ACTORS + + osint_factor * WEIGHT_OSINT + + test_factor * WEIGHT_TEST_FAILURES + ) + risk_score = _clamp(raw_score) * 100.0 + + # Likelihood = detection + actor contribution (exposure) + likelihood = _clamp( + detection_gap_factor * 0.5 + ta_factor * 0.35 + osint_factor * 0.15 + ) * 100.0 + + # Impact = test failures + osint severity signal + impact = _clamp( + test_factor * 0.6 + osint_factor * 0.25 + detection_gap_factor * 0.15 + ) * 100.0 + + level = _risk_level(risk_score) + breakdown["total"] = { + "risk_score": round(risk_score, 2), + "likelihood": round(likelihood, 2), + "impact": round(impact, 2), + "risk_level": level, + } + + return TechniqueRiskProfile( + technique_id = tech.id, + risk_score = round(risk_score, 4), + likelihood = round(likelihood, 4), + impact = round(impact, 4), + risk_level = level, + detection_gap = round(detection_gap, 4), + threat_actor_count = actor_count, + osint_signal_count = osint_count, + test_fail_count = test_not_detected, + test_total_count = test_total, + test_failure_rate = round(test_failure_rate, 4), + confidence_level = round(confidence_level, 4), + scoring_breakdown = breakdown, + recommendations = recs or ["Risk profile looks healthy — continue monitoring."], + computed_at = datetime.utcnow(), + is_stale = False, + ) + + +# ── Upsert helpers ───────────────────────────────────────────────────────────── + +def _upsert_profile(db: Session, profile: TechniqueRiskProfile) -> TechniqueRiskProfile: + existing = db.query(TechniqueRiskProfile).filter( + TechniqueRiskProfile.technique_id == profile.technique_id, + ).first() + if existing: + for attr in ( + "risk_score", "likelihood", "impact", "risk_level", + "detection_gap", "threat_actor_count", "osint_signal_count", + "test_fail_count", "test_total_count", "test_failure_rate", + "confidence_level", "scoring_breakdown", "recommendations", + "computed_at", "is_stale", + ): + setattr(existing, attr, getattr(profile, attr)) + db.commit() + db.refresh(existing) + return existing + db.add(profile) + db.commit() + db.refresh(profile) + return profile + + +# ── Public API ──────────────────────────────────────────────────────────────── + +def compute_technique_risk(db: Session, technique_id: UUID) -> TechniqueRiskProfile: + """Compute (or recompute) risk profile for a single technique.""" + tech = db.query(Technique).filter(Technique.id == technique_id).first() + if not tech: + raise EntityNotFoundError("Technique", str(technique_id)) + profile = _compute_for_technique(db, tech) + return _upsert_profile(db, profile) + + +def compute_all_risk_scores(db: Session) -> dict: + """Compute risk profiles for all techniques. Returns summary counts.""" + t0 = time.monotonic() + techniques = db.query(Technique).all() + computed = 0 + errors = 0 + + for tech in techniques: + try: + profile = _compute_for_technique(db, tech) + _upsert_profile(db, profile) + computed += 1 + except Exception: + errors += 1 + + duration = time.monotonic() - t0 + return { + "computed": computed, + "skipped": 0, + "errors": errors, + "duration_seconds": round(duration, 2), + } + + +def get_risk_profile(db: Session, technique_id: UUID) -> TechniqueRiskProfile: + profile = db.query(TechniqueRiskProfile).filter( + TechniqueRiskProfile.technique_id == technique_id, + ).first() + if not profile: + raise EntityNotFoundError("TechniqueRiskProfile", str(technique_id)) + return profile + + +def list_risk_profiles( + db: Session, + risk_level: Optional[str] = None, + min_score: Optional[float] = None, + max_score: Optional[float] = None, + stale_only: bool = False, + limit: int = 100, + offset: int = 0, +) -> List[TechniqueRiskProfile]: + q = db.query(TechniqueRiskProfile) + if risk_level: + q = q.filter(TechniqueRiskProfile.risk_level == risk_level) + if min_score is not None: + q = q.filter(TechniqueRiskProfile.risk_score >= min_score) + if max_score is not None: + q = q.filter(TechniqueRiskProfile.risk_score <= max_score) + if stale_only: + q = q.filter(TechniqueRiskProfile.is_stale == True) + return ( + q.order_by(TechniqueRiskProfile.risk_score.desc()) + .offset(offset) + .limit(limit) + .all() + ) + + +def get_risk_matrix(db: Session) -> list: + """Return all profiled techniques with name+tid for the matrix view.""" + rows = ( + db.query(TechniqueRiskProfile, Technique) + .join(Technique, TechniqueRiskProfile.technique_id == Technique.id) + .order_by(TechniqueRiskProfile.risk_score.desc()) + .all() + ) + result = [] + for profile, tech in rows: + result.append({ + "technique_id": str(profile.technique_id), + "technique_name": tech.name, + "technique_tid": tech.technique_id, # MITRE T-ID string + "risk_score": profile.risk_score, + "likelihood": profile.likelihood, + "impact": profile.impact, + "risk_level": profile.risk_level, + "detection_gap": profile.detection_gap, + "computed_at": profile.computed_at.isoformat() if profile.computed_at else None, + }) + return result + + +def get_risk_summary(db: Session) -> dict: + """Aggregate statistics across all risk profiles.""" + all_profiles = db.query(TechniqueRiskProfile).all() + total_tech = db.query(Technique).count() + scored = len(all_profiles) + stale = sum(1 for p in all_profiles if p.is_stale) + + by_level: dict = {lvl: 0 for lvl in ("critical", "high", "medium", "low", "info")} + score_sum = 0.0 + for p in all_profiles: + by_level[p.risk_level] = by_level.get(p.risk_level, 0) + 1 + score_sum += p.risk_score + + avg_score = (score_sum / scored) if scored > 0 else 0.0 + + # Top 5 by risk score (with technique name) + top_rows = ( + db.query(TechniqueRiskProfile, Technique) + .join(Technique, TechniqueRiskProfile.technique_id == Technique.id) + .order_by(TechniqueRiskProfile.risk_score.desc()) + .limit(5) + .all() + ) + top_risks = [ + { + "technique_id": str(p.technique_id), + "technique_name": t.name, + "technique_tid": t.technique_id, + "risk_score": p.risk_score, + "risk_level": p.risk_level, + "likelihood": p.likelihood, + "impact": p.impact, + "detection_gap": p.detection_gap, + "computed_at": p.computed_at.isoformat() if p.computed_at else None, + } + for p, t in top_rows + ] + + return { + "total_techniques": total_tech, + "scored_techniques": scored, + "stale_count": stale, + "by_level": by_level, + "avg_risk_score": round(avg_score, 2), + "top_risks": top_risks, + } + + +def get_recommendations(db: Session, limit: int = 20) -> list: + """Prioritised list of techniques with actionable recommendations.""" + rows = ( + db.query(TechniqueRiskProfile, Technique) + .join(Technique, TechniqueRiskProfile.technique_id == Technique.id) + .filter(TechniqueRiskProfile.risk_score > 0) + .order_by(TechniqueRiskProfile.risk_score.desc()) + .limit(limit) + .all() + ) + result = [] + for priority, (profile, tech) in enumerate(rows, start=1): + result.append({ + "technique_id": str(profile.technique_id), + "technique_name": tech.name, + "technique_tid": tech.technique_id, + "risk_level": profile.risk_level, + "risk_score": profile.risk_score, + "recommendations": profile.recommendations or [], + "priority": priority, + }) + return result diff --git a/scripts/qa_phase12.py b/scripts/qa_phase12.py new file mode 100644 index 0000000..6e39648 --- /dev/null +++ b/scripts/qa_phase12.py @@ -0,0 +1,301 @@ +""" +QA script for Phase 12 — Risk Intelligence. +Run with: python -X utf8 scripts/qa_phase12.py +""" + +import sys, os +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import requests + +BASE = "http://localhost:8000/api/v1" +PASS = "\033[92m✓\033[0m" +FAIL = "\033[91m✗\033[0m" + +passed = 0 +failed = 0 + + +def check(label: str, condition: bool, detail: str = ""): + global passed, failed + if condition: + passed += 1 + print(f" {PASS} {label}") + else: + failed += 1 + msg = f" {FAIL} {label}" + if detail: + msg += f" — {detail}" + print(msg) + + +def get_token(username="administrator", password="admin123"): + r = requests.post(f"{BASE}/auth/login", + data={"username": username, "password": password}) + if r.status_code == 200: + return r.json().get("access_token") or r.json().get("token") + raise RuntimeError(f"Login failed: {r.status_code} {r.text[:200]}") + + +def auth(token): + return {"Authorization": f"Bearer {token}"} + + +def get_first_technique(headers): + r = requests.get(f"{BASE}/techniques", headers=headers, params={"limit": 5}) + items = r.json() + if isinstance(items, dict): + items = items.get("items", []) + if not items: + raise RuntimeError("No techniques found") + return items + + +# ───────────────────────────────────────────────────────────────────────────── + +def main(): + print("\n====== Phase 12 QA — Risk Intelligence ======\n") + + token = get_token() + h = auth(token) + techniques = get_first_technique(h) + tid = techniques[0]["id"] + print(f" Using technique_id: {tid}\n") + + # ── Block 1: Compute single technique ──────────────────────────────────── + print("── Block 1: Compute single technique ──") + + r = requests.post(f"{BASE}/risk/profiles/{tid}/compute", headers=h) + check("POST /risk/profiles/{id}/compute → 200", r.status_code == 200, + r.text[:150]) + profile = r.json() if r.status_code == 200 else {} + check("Profile has technique_id", profile.get("technique_id") == tid) + check("risk_score is float 0-100", + isinstance(profile.get("risk_score"), (int, float)) + and 0 <= profile.get("risk_score", -1) <= 100) + check("risk_level is valid", + profile.get("risk_level") in ("critical", "high", "medium", "low", "info")) + check("detection_gap in 0-1", + 0 <= profile.get("detection_gap", -1) <= 1) + check("is_stale = False", profile.get("is_stale") == False) + check("scoring_breakdown present", bool(profile.get("scoring_breakdown"))) + check("recommendations is list", isinstance(profile.get("recommendations"), list)) + + # Compute a second technique + if len(techniques) > 1: + tid2 = techniques[1]["id"] + r = requests.post(f"{BASE}/risk/profiles/{tid2}/compute", headers=h) + check("POST compute second technique → 200", r.status_code == 200, + r.text[:120]) + + # Compute non-existent technique → 404 + r = requests.post( + f"{BASE}/risk/profiles/00000000-0000-0000-0000-000000000001/compute", + headers=h) + check("POST compute non-existent technique → 404", r.status_code == 404) + + print() + + # ── Block 2: Compute all techniques ────────────────────────────────────── + print("── Block 2: Compute ALL techniques ──") + + r = requests.post(f"{BASE}/risk/compute", headers=h) + check("POST /risk/compute → 202", r.status_code == 202, r.text[:150]) + result = r.json() if r.status_code == 202 else {} + check("computed > 0", result.get("computed", 0) > 0, + f"computed={result.get('computed')}") + check("errors == 0", result.get("errors", 99) == 0, + f"errors={result.get('errors')}") + check("duration_seconds present", "duration_seconds" in result) + print(f" computed={result.get('computed')} techniques in " + f"{result.get('duration_seconds', '?')}s") + + print() + + # ── Block 3: List profiles ──────────────────────────────────────────────── + print("── Block 3: List risk profiles ──") + + r = requests.get(f"{BASE}/risk/profiles", headers=h) + check("GET /risk/profiles → 200", r.status_code == 200) + profiles = r.json() if r.status_code == 200 else [] + check("Profiles list not empty", len(profiles) > 0) + check("Each profile has risk_score", + all("risk_score" in p for p in profiles[:5])) + check("Sorted by risk_score desc", + all(profiles[i]["risk_score"] >= profiles[i+1]["risk_score"] + for i in range(min(len(profiles)-1, 5)))) + + # Filter by risk_level + for level in ("critical", "high", "medium", "low", "info"): + r2 = requests.get(f"{BASE}/risk/profiles", headers=h, + params={"risk_level": level}) + if r2.status_code == 200 and r2.json(): + check(f"Filter risk_level={level} → only that level", + all(p["risk_level"] == level for p in r2.json())) + break + else: + # Just verify the filter works without crashing + r2 = requests.get(f"{BASE}/risk/profiles", headers=h, + params={"risk_level": "info"}) + check("Filter risk_level=info → 200", r2.status_code == 200) + + # Filter by min_score + r = requests.get(f"{BASE}/risk/profiles", headers=h, + params={"min_score": 0.0, "max_score": 100.0}) + check("Filter min/max_score → 200", r.status_code == 200) + + # limit + offset + r = requests.get(f"{BASE}/risk/profiles", headers=h, + params={"limit": 2, "offset": 0}) + check("GET ?limit=2 returns ≤2 profiles", r.status_code == 200 + and len(r.json()) <= 2) + + print() + + # ── Block 4: Get single profile ─────────────────────────────────────────── + print("── Block 4: Get single risk profile ──") + + r = requests.get(f"{BASE}/risk/profiles/{tid}", headers=h) + check("GET /risk/profiles/{technique_id} → 200", r.status_code == 200) + p = r.json() if r.status_code == 200 else {} + check("Correct technique_id", p.get("technique_id") == tid) + check("scoring_breakdown has detection_gap key", + "detection_gap" in (p.get("scoring_breakdown") or {})) + check("scoring_breakdown has threat_actor key", + "threat_actor" in (p.get("scoring_breakdown") or {})) + check("scoring_breakdown has osint key", + "osint" in (p.get("scoring_breakdown") or {})) + check("scoring_breakdown has test_failures key", + "test_failures" in (p.get("scoring_breakdown") or {})) + + # Not found → 404 + r = requests.get( + f"{BASE}/risk/profiles/00000000-0000-0000-0000-000000000001", + headers=h) + check("GET non-existent profile → 404", r.status_code == 404) + + print() + + # ── Block 5: Risk matrix ────────────────────────────────────────────────── + print("── Block 5: Risk matrix ──") + + r = requests.get(f"{BASE}/risk/matrix", headers=h) + check("GET /risk/matrix → 200", r.status_code == 200) + matrix = r.json() if r.status_code == 200 else [] + check("Matrix not empty", len(matrix) > 0) + if matrix: + entry = matrix[0] + check("Matrix entry has technique_name", "technique_name" in entry) + check("Matrix entry has likelihood", "likelihood" in entry) + check("Matrix entry has impact", "impact" in entry) + check("Matrix entry has risk_level", "risk_level" in entry) + + print() + + # ── Block 6: Risk summary ───────────────────────────────────────────────── + print("── Block 6: Risk summary ──") + + r = requests.get(f"{BASE}/risk/summary", headers=h) + check("GET /risk/summary → 200", r.status_code == 200) + summary = r.json() if r.status_code == 200 else {} + check("total_techniques > 0", summary.get("total_techniques", 0) > 0) + check("scored_techniques > 0", summary.get("scored_techniques", 0) > 0) + check("by_level has all levels", + all(k in summary.get("by_level", {}) + for k in ("critical", "high", "medium", "low", "info"))) + check("avg_risk_score is float", + isinstance(summary.get("avg_risk_score"), (int, float))) + check("top_risks is list", isinstance(summary.get("top_risks"), list)) + if summary.get("top_risks"): + top = summary["top_risks"][0] + check("Top risk has technique_name", "technique_name" in top) + check("Top risk has risk_score", "risk_score" in top) + + print() + + # ── Block 7: Recommendations ────────────────────────────────────────────── + print("── Block 7: Recommendations ──") + + r = requests.get(f"{BASE}/risk/recommendations", headers=h) + check("GET /risk/recommendations → 200", r.status_code == 200) + recs = r.json() if r.status_code == 200 else [] + check("Recommendations list not empty", len(recs) > 0) + if recs: + rec = recs[0] + check("Recommendation has priority", "priority" in rec) + check("Recommendation has recommendations list", + isinstance(rec.get("recommendations"), list)) + check("Recommendations sorted by priority", + all(recs[i]["priority"] <= recs[i+1]["priority"] + for i in range(min(len(recs)-1, 4)))) + + # Custom limit + r = requests.get(f"{BASE}/risk/recommendations", headers=h, + params={"limit": 3}) + check("GET ?limit=3 returns ≤3 recommendations", + r.status_code == 200 and len(r.json()) <= 3) + + print() + + # ── Block 8: Top risks ──────────────────────────────────────────────────── + print("── Block 8: Top risks ──") + + r = requests.get(f"{BASE}/risk/top", headers=h) + check("GET /risk/top → 200", r.status_code == 200) + top = r.json() if r.status_code == 200 else [] + check("Top risks not empty", len(top) > 0) + + r = requests.get(f"{BASE}/risk/top", headers=h, params={"limit": 3}) + check("GET /risk/top?limit=3 → ≤3 results", + r.status_code == 200 and len(r.json()) <= 3) + + print() + + # ── Block 9: Auth protection ────────────────────────────────────────────── + print("── Block 9: Auth protection ──") + + no_auth = [ + ("GET", f"{BASE}/risk/profiles"), + ("GET", f"{BASE}/risk/matrix"), + ("GET", f"{BASE}/risk/summary"), + ("GET", f"{BASE}/risk/recommendations"), + ("GET", f"{BASE}/risk/top"), + ] + for method, url in no_auth: + r = requests.request(method, url) + ep = url.split("/api/v1")[1] + check(f"{method} {ep} without auth → 401", r.status_code == 401) + + # Admin-only compute endpoint + # Create a non-admin token for role check + r_na = requests.post(f"{BASE}/risk/compute") + check("POST /risk/compute without auth → 401", r_na.status_code == 401) + + print() + + # ── Block 10: Regression ───────────────────────────────────────────────── + print("── Block 10: Regression ──") + + r = requests.get(f"{BASE}/knowledge/playbooks", headers=h) + check("GET /knowledge/playbooks still works", r.status_code == 200) + + r = requests.get(f"{BASE}/attack-paths", headers=h) + check("GET /attack-paths still works", r.status_code == 200) + + r = requests.get(f"{BASE}/techniques", headers=h) + check("GET /techniques still works", r.status_code == 200) + + print() + + # ── Summary ─────────────────────────────────────────────────────────────── + total = passed + failed + print(f"====== Results: {passed}/{total} passed", end="") + if failed: + print(f" — \033[91m{failed} FAILED\033[0m ======\n") + sys.exit(1) + else: + print(" ✓ ALL PASSED ======\n") + + +if __name__ == "__main__": + main()