Files
Aegis/backend/app/routers/attack_paths.py
kitos 080ce56de7
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(attack-paths): Phase 10 — Attack Paths & Advanced Purple Team [FASE-10]
Models (5 tables):
  - AttackPath: named reusable attack scenario with template flag
  - AttackPathStep: ordered kill-chain step (technique + test link)
  - AttackPathExecution: a run with Red/Blue leads, timing, stored metrics
  - AttackPathStepResult: per-step detected/not_detected/skipped result
  - TimelineEntry: timestamped Red/Blue/system actions for MTTD/MTTR

Migration b036atk: raw SQL to avoid SQLAlchemy DDL hook issues

Service (attack_path_service.py):
  - Full CRUD for paths + steps (add, update, delete, reorder)
  - Execution lifecycle: create → start → execute steps → complete/abort
  - Pre-creates pending step results on execution creation
  - Auto-adds system timeline entries on key state transitions
  - complete_execution() computes: detection_rate, mttd_seconds,
    furthest_undetected_step, detected/not_detected/skipped counts
  - get_kill_chain_metrics(): per-step breakdown + phase summary

Router /api/v1/attack-paths (20 endpoints):
  POST/GET/PATCH/DELETE attack paths
  GET/POST/PATCH/DELETE steps + reorder
  POST/GET executions per path
  GET/POST/start/complete/abort executions
  POST/GET step results
  POST/GET timeline entries
  GET kill-chain metrics

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 13:11:01 +02:00

251 lines
8.0 KiB
Python

"""Phase 10: Attack Paths & Advanced Purple Team router."""
from typing import Optional
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session

from app.database import get_db
from app.dependencies.auth import get_current_user, require_any_role
from app.schemas.attack_path_schema import (
    AttackPathCreate, AttackPathUpdate, AttackPathOut,
    AttackPathStepCreate, AttackPathStepUpdate, AttackPathStepOut,
    ExecutionCreate, ExecutionOut,
    StepExecuteRequest, StepResultOut,
    TimelineEntryCreate, TimelineEntryOut,
    KillChainMetrics,
)
from app.services import attack_path_service as svc
# Router holding every endpoint defined in this module; each route path
# declared below is relative to the "/attack-paths" prefix.
router = APIRouter(prefix="/attack-paths", tags=["attack-paths"])
# ── Attack Paths CRUD ─────────────────────────────────────────────────────────
@router.post("", response_model=AttackPathOut, status_code=201)
def create_attack_path(
body: AttackPathCreate,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
return svc.create_attack_path(db, body.model_dump(), user.id)
@router.get("", response_model=list[AttackPathOut])
def list_attack_paths(
is_template: Optional[bool] = None,
technique_id: Optional[UUID] = None,
is_active: Optional[bool] = True,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
paths = svc.list_attack_paths(db, is_template=is_template,
technique_id=technique_id, is_active=is_active)
# Inject step_count
result = []
for p in paths:
d = AttackPathOut.model_validate(p)
d.step_count = len(p.steps)
result.append(d)
return result
@router.get("/{path_id}", response_model=AttackPathOut)
def get_attack_path(
path_id: UUID,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
p = svc.get_attack_path(db, path_id)
d = AttackPathOut.model_validate(p)
d.step_count = len(p.steps)
return d
@router.patch("/{path_id}", response_model=AttackPathOut)
def update_attack_path(
path_id: UUID,
body: AttackPathUpdate,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
return svc.update_attack_path(db, path_id, body.model_dump(exclude_unset=True), user.id)
@router.delete("/{path_id}", status_code=204)
def delete_attack_path(
path_id: UUID,
db: Session = Depends(get_db),
user=Depends(require_any_role("admin", "red_lead", "blue_lead")),
):
svc.delete_attack_path(db, path_id, user.id)
# ── Steps ─────────────────────────────────────────────────────────────────────
@router.get("/{path_id}/steps", response_model=list[AttackPathStepOut])
def list_steps(
path_id: UUID,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
path = svc.get_attack_path(db, path_id)
return path.steps
@router.post("/{path_id}/steps", response_model=AttackPathStepOut, status_code=201)
def add_step(
path_id: UUID,
body: AttackPathStepCreate,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
return svc.add_step(db, path_id, body.model_dump(), user.id)
@router.patch("/{path_id}/steps/{step_id}", response_model=AttackPathStepOut)
def update_step(
path_id: UUID,
step_id: UUID,
body: AttackPathStepUpdate,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
return svc.update_step(db, step_id, body.model_dump(exclude_unset=True), user.id)
@router.delete("/{path_id}/steps/{step_id}", status_code=204)
def delete_step(
path_id: UUID,
step_id: UUID,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
svc.delete_step(db, step_id, user.id)
@router.post("/{path_id}/steps/reorder", response_model=list[AttackPathStepOut])
def reorder_steps(
path_id: UUID,
step_ids: list[UUID],
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
"""Pass an ordered list of step UUIDs to reorder the steps."""
return svc.reorder_steps(db, path_id, step_ids, user.id)
# ── Executions ────────────────────────────────────────────────────────────────
@router.post("/{path_id}/executions", response_model=ExecutionOut, status_code=201)
def create_execution(
path_id: UUID,
body: ExecutionCreate,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
return svc.create_execution(db, path_id, body.model_dump(), user.id)
@router.get("/{path_id}/executions", response_model=list[ExecutionOut])
def list_executions(
path_id: UUID,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
return svc.list_executions(db, path_id)
@router.get("/executions/{execution_id}", response_model=ExecutionOut)
def get_execution(
execution_id: UUID,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
return svc.get_execution(db, execution_id)
@router.post("/executions/{execution_id}/start", response_model=ExecutionOut)
def start_execution(
execution_id: UUID,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
return svc.start_execution(db, execution_id, user.id)
@router.post("/executions/{execution_id}/steps/{step_id}", response_model=StepResultOut)
def execute_step(
execution_id: UUID,
step_id: UUID,
body: StepExecuteRequest,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
"""Record the result of one step (detected / not_detected / skipped)."""
return svc.execute_step(db, execution_id, step_id, body.model_dump(), user.id)
@router.get("/executions/{execution_id}/steps", response_model=list[StepResultOut])
def list_step_results(
execution_id: UUID,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
ex = svc.get_execution(db, execution_id)
return ex.step_results
@router.post("/executions/{execution_id}/complete", response_model=ExecutionOut)
def complete_execution(
execution_id: UUID,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
"""Mark execution as complete and compute kill-chain metrics."""
return svc.complete_execution(db, execution_id, user.id)
@router.post("/executions/{execution_id}/abort", response_model=ExecutionOut)
def abort_execution(
execution_id: UUID,
db: Session = Depends(get_db),
user=Depends(require_any_role("admin", "red_lead", "blue_lead")),
):
return svc.abort_execution(db, execution_id, user.id)
# ── Timeline ──────────────────────────────────────────────────────────────────
@router.post("/executions/{execution_id}/timeline",
response_model=TimelineEntryOut, status_code=201)
def add_timeline_entry(
execution_id: UUID,
body: TimelineEntryCreate,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
return svc.add_timeline_entry(db, execution_id, body.model_dump(), user.id)
@router.get("/executions/{execution_id}/timeline", response_model=list[TimelineEntryOut])
def get_timeline(
execution_id: UUID,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
return svc.get_timeline(db, execution_id)
# ── Kill-Chain Metrics ────────────────────────────────────────────────────────
@router.get("/executions/{execution_id}/metrics")
def get_metrics(
execution_id: UUID,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
"""Return full kill-chain metrics for a completed (or partial) execution."""
return svc.get_kill_chain_metrics(db, execution_id)