"""CRUD router for MITRE ATT&CK Techniques. Uses the TechniqueRepository for data access and domain exceptions for error signaling. The error_handler middleware maps domain exceptions to HTTP responses automatically. """ from fastapi import APIRouter, Depends, Query, status from sqlalchemy.orm import Session from app.database import get_db from app.dependencies.auth import get_current_user, require_role, require_any_role from app.dependencies.repositories import get_technique_repository from app.domain.entities.technique import TechniqueEntity from app.domain.errors import DuplicateEntityError, EntityNotFoundError from app.domain.enums import TechniqueStatus from app.domain.unit_of_work import UnitOfWork from app.infrastructure.persistence.repositories.sa_technique_repository import ( SATechniqueRepository, ) from app.models.user import User from app.schemas.technique import ( TechniqueCreate, TechniqueOut, TechniqueSummary, TechniqueUpdate, ) from app.services.audit_service import log_action from app.services.technique_query_service import get_technique_detail router = APIRouter(prefix="/techniques", tags=["techniques"]) # --------------------------------------------------------------------------- # GET /techniques — list (with optional filters) # --------------------------------------------------------------------------- @router.get("", response_model=list[TechniqueSummary]) def list_techniques( tactic: str | None = Query(None, description="Filter by tactic name"), status_global: TechniqueStatus | None = Query( None, alias="status", description="Filter by global status" ), review_required: bool | None = Query(None, description="Filter by review flag"), repo: SATechniqueRepository = Depends(get_technique_repository), current_user: User = Depends(get_current_user), ): """Return a lightweight list of techniques, optionally filtered.""" return repo.list_all( tactic=tactic, status=status_global, review_required=review_required, ) # --------------------------------------------------------------------------- # GET /techniques/{mitre_id} — detail (with tests + D3FEND) # --------------------------------------------------------------------------- @router.get("/{mitre_id}") def get_technique( mitre_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Return full details for a single technique, including its tests and D3FEND defenses.""" return get_technique_detail(db, mitre_id) # --------------------------------------------------------------------------- # POST /techniques — create (admin only) # --------------------------------------------------------------------------- @router.post( "", response_model=TechniqueOut, status_code=status.HTTP_201_CREATED, ) def create_technique( payload: TechniqueCreate, db: Session = Depends(get_db), repo: SATechniqueRepository = Depends(get_technique_repository), current_user: User = Depends(require_role("admin")), ): """Create a new technique manually.""" if repo.exists_by_mitre_id(payload.mitre_id): raise DuplicateEntityError("Technique", "mitre_id", payload.mitre_id) entity = TechniqueEntity.create( mitre_id=payload.mitre_id, name=payload.name, description=payload.description, tactic=payload.tactic, platforms=payload.platforms, ) with UnitOfWork(db) as uow: saved = repo.save(entity) log_action( db, user_id=current_user.id, action="create_technique", entity_type="technique", entity_id=saved.id, details={"mitre_id": saved.mitre_id, "name": saved.name}, ) uow.commit() return saved # --------------------------------------------------------------------------- # PATCH /techniques/{mitre_id} — update (admin only) # --------------------------------------------------------------------------- @router.patch("/{mitre_id}", response_model=TechniqueOut) def update_technique( mitre_id: str, payload: TechniqueUpdate, db: Session = Depends(get_db), repo: SATechniqueRepository = Depends(get_technique_repository), current_user: User = Depends(require_role("admin")), ): """Update one or more fields of an existing technique.""" entity = repo.find_by_mitre_id(mitre_id) if entity is None: raise EntityNotFoundError("Technique", mitre_id) update_data = payload.model_dump(exclude_unset=True) for field, value in update_data.items(): setattr(entity, field, value) with UnitOfWork(db) as uow: saved = repo.save(entity) log_action( db, user_id=current_user.id, action="update_technique", entity_type="technique", entity_id=saved.id, details={"mitre_id": mitre_id, "updated_fields": list(update_data.keys())}, ) uow.commit() return saved # --------------------------------------------------------------------------- # PATCH /techniques/{mitre_id}/review — mark as reviewed (leads + admin) # --------------------------------------------------------------------------- @router.patch("/{mitre_id}/review", response_model=TechniqueOut) def review_technique( mitre_id: str, db: Session = Depends(get_db), repo: SATechniqueRepository = Depends(get_technique_repository), current_user: User = Depends(require_any_role("red_lead", "blue_lead")), ): """Mark a technique as reviewed. Sets ``review_required`` to *False* and records the current timestamp in ``last_review_date``. """ entity = repo.find_by_mitre_id(mitre_id) if entity is None: raise EntityNotFoundError("Technique", mitre_id) entity.mark_reviewed() with UnitOfWork(db) as uow: saved = repo.save(entity) log_action( db, user_id=current_user.id, action="review_technique", entity_type="technique", entity_id=saved.id, details={"mitre_id": mitre_id}, ) uow.commit() return saved