"""CRUD router for MITRE ATT&CK Techniques. Uses the TechniqueRepository for data access and domain exceptions for error signaling. The error_handler middleware maps domain exceptions to HTTP responses automatically. """ from fastapi import APIRouter, Depends, Query, status from sqlalchemy.orm import Session, joinedload from app.database import get_db from app.dependencies.auth import get_current_user, require_role, require_any_role from app.dependencies.repositories import get_technique_repository from app.domain.entities.technique import TechniqueEntity from app.domain.errors import DuplicateEntityError, EntityNotFoundError from app.domain.enums import TechniqueStatus from app.domain.unit_of_work import UnitOfWork from app.infrastructure.persistence.repositories.sa_technique_repository import ( SATechniqueRepository, ) from app.models.technique import Technique from app.models.user import User from app.schemas.technique import ( TechniqueCreate, TechniqueOut, TechniqueSummary, TechniqueUpdate, ) from app.services.audit_service import log_action from app.services.d3fend_import_service import get_defenses_for_technique router = APIRouter(prefix="/techniques", tags=["techniques"]) # --------------------------------------------------------------------------- # GET /techniques — list (with optional filters) # --------------------------------------------------------------------------- @router.get("", response_model=list[TechniqueSummary]) def list_techniques( tactic: str | None = Query(None, description="Filter by tactic name"), status_global: TechniqueStatus | None = Query( None, alias="status", description="Filter by global status" ), review_required: bool | None = Query(None, description="Filter by review flag"), repo: SATechniqueRepository = Depends(get_technique_repository), current_user: User = Depends(get_current_user), ): """Return a lightweight list of techniques, optionally filtered.""" return repo.list_all( tactic=tactic, status=status_global, review_required=review_required, ) # --------------------------------------------------------------------------- # GET /techniques/{mitre_id} — detail (with tests + D3FEND) # --------------------------------------------------------------------------- @router.get("/{mitre_id}") def get_technique( mitre_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Return full details for a single technique, including its tests and D3FEND defenses.""" technique = ( db.query(Technique) .options(joinedload(Technique.tests)) .filter(Technique.mitre_id == mitre_id) .first() ) if technique is None: raise EntityNotFoundError("Technique", mitre_id) defenses = get_defenses_for_technique(db, technique.id) return { "id": str(technique.id), "mitre_id": technique.mitre_id, "name": technique.name, "description": technique.description, "tactic": technique.tactic, "platforms": technique.platforms or [], "mitre_version": technique.mitre_version, "mitre_last_modified": technique.mitre_last_modified, "is_subtechnique": technique.is_subtechnique, "parent_mitre_id": technique.parent_mitre_id, "status_global": technique.status_global.value if technique.status_global else "not_evaluated", "review_required": technique.review_required, "last_review_date": technique.last_review_date, "tests": [ { "id": str(t.id), "name": t.name, "state": t.state.value if t.state else None, "result": t.result.value if t.result else None, "platform": t.platform, "created_at": t.created_at.isoformat() if t.created_at else None, } for t in technique.tests ], "d3fend_defenses": defenses, } # --------------------------------------------------------------------------- # POST /techniques — create (admin only) # --------------------------------------------------------------------------- @router.post( "", response_model=TechniqueOut, status_code=status.HTTP_201_CREATED, ) def create_technique( payload: TechniqueCreate, db: Session = Depends(get_db), repo: SATechniqueRepository = Depends(get_technique_repository), current_user: User = Depends(require_role("admin")), ): """Create a new technique manually.""" if repo.exists_by_mitre_id(payload.mitre_id): raise DuplicateEntityError("Technique", "mitre_id", payload.mitre_id) entity = TechniqueEntity.create( mitre_id=payload.mitre_id, name=payload.name, description=payload.description, tactic=payload.tactic, platforms=payload.platforms, ) with UnitOfWork(db) as uow: saved = repo.save(entity) uow.commit() log_action( db, user_id=current_user.id, action="create_technique", entity_type="technique", entity_id=saved.id, details={"mitre_id": saved.mitre_id, "name": saved.name}, ) return saved # --------------------------------------------------------------------------- # PATCH /techniques/{mitre_id} — update (admin only) # --------------------------------------------------------------------------- @router.patch("/{mitre_id}", response_model=TechniqueOut) def update_technique( mitre_id: str, payload: TechniqueUpdate, db: Session = Depends(get_db), repo: SATechniqueRepository = Depends(get_technique_repository), current_user: User = Depends(require_role("admin")), ): """Update one or more fields of an existing technique.""" entity = repo.find_by_mitre_id(mitre_id) if entity is None: raise EntityNotFoundError("Technique", mitre_id) update_data = payload.model_dump(exclude_unset=True) for field, value in update_data.items(): setattr(entity, field, value) with UnitOfWork(db) as uow: saved = repo.save(entity) uow.commit() log_action( db, user_id=current_user.id, action="update_technique", entity_type="technique", entity_id=saved.id, details={"mitre_id": mitre_id, "updated_fields": list(update_data.keys())}, ) return saved # --------------------------------------------------------------------------- # PATCH /techniques/{mitre_id}/review — mark as reviewed (leads + admin) # --------------------------------------------------------------------------- @router.patch("/{mitre_id}/review", response_model=TechniqueOut) def review_technique( mitre_id: str, db: Session = Depends(get_db), repo: SATechniqueRepository = Depends(get_technique_repository), current_user: User = Depends(require_any_role("red_lead", "blue_lead")), ): """Mark a technique as reviewed. Sets ``review_required`` to *False* and records the current timestamp in ``last_review_date``. """ entity = repo.find_by_mitre_id(mitre_id) if entity is None: raise EntityNotFoundError("Technique", mitre_id) entity.mark_reviewed() with UnitOfWork(db) as uow: saved = repo.save(entity) uow.commit() log_action( db, user_id=current_user.id, action="review_technique", entity_type="technique", entity_id=saved.id, details={"mitre_id": mitre_id}, ) return saved