From 4f6dd838fd36b5bd818f46728da94fd9dd80f9c0 Mon Sep 17 00:00:00 2001 From: Kitos Date: Fri, 6 Feb 2026 13:52:27 +0100 Subject: [PATCH] feat: Phase 3 - CRUD core for Techniques, Tests and Evidence (T-014 to T-017) - Add Pydantic schemas for Technique, Test and Evidence - Add CRUD endpoints for Techniques (list with filters, detail, create, update, review) - Add CRUD endpoints for Tests (create, detail, update, validate, reject) - Add evidence upload with SHA-256 integrity and presigned download URLs - Add MinIO/S3 storage client with bucket auto-creation on startup - Add status_service to recalculate technique coverage from test results - Add require_any_role RBAC dependency for multi-role authorization - Update README with API endpoints reference and project structure --- README.md | 48 ++++- backend/app/dependencies/auth.py | 21 +++ backend/app/main.py | 18 +- backend/app/routers/evidence.py | 132 +++++++++++++ backend/app/routers/techniques.py | 206 ++++++++++++++++++++ backend/app/routers/tests.py | 248 +++++++++++++++++++++++++ backend/app/schemas/__init__.py | 38 ++++ backend/app/schemas/evidence.py | 23 +++ backend/app/schemas/technique.py | 69 +++++++ backend/app/schemas/test.py | 67 +++++++ backend/app/services/status_service.py | 36 ++++ backend/app/storage.py | 57 ++++++ 12 files changed, 958 insertions(+), 5 deletions(-) create mode 100644 backend/app/routers/evidence.py create mode 100644 backend/app/routers/techniques.py create mode 100644 backend/app/routers/tests.py create mode 100644 backend/app/schemas/evidence.py create mode 100644 backend/app/schemas/technique.py create mode 100644 backend/app/schemas/test.py create mode 100644 backend/app/services/status_service.py create mode 100644 backend/app/storage.py diff --git a/README.md b/README.md index d5b85fa..9ea0592 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,38 @@ Once the backend is running, access the interactive API documentation at: - **Swagger UI**: http://localhost:8000/docs - **ReDoc**: http://localhost:8000/redoc +## API Endpoints + +### Auth +| Method | Route | Auth | Description | +|--------|-------|------|-------------| +| POST | `/api/v1/auth/login` | Public | Obtain JWT token | +| GET | `/api/v1/auth/me` | Authenticated | Current user profile | + +### Techniques +| Method | Route | Auth | Description | +|--------|-------|------|-------------| +| GET | `/api/v1/techniques` | Authenticated | List all (filters: `?tactic=`, `?status=`, `?review_required=`) | +| GET | `/api/v1/techniques/{mitre_id}` | Authenticated | Detail with associated tests | +| POST | `/api/v1/techniques` | Admin | Create technique | +| PATCH | `/api/v1/techniques/{mitre_id}` | Admin | Update technique fields | +| PATCH | `/api/v1/techniques/{mitre_id}/review` | Lead, Admin | Mark as reviewed | + +### Tests +| Method | Route | Auth | Description | +|--------|-------|------|-------------| +| POST | `/api/v1/tests` | Red Tech, Admin | Create test (state=draft) | +| GET | `/api/v1/tests/{id}` | Authenticated | Detail with evidences | +| PATCH | `/api/v1/tests/{id}` | Creator, Admin | Update (only draft/rejected) | +| POST | `/api/v1/tests/{id}/validate` | Lead, Admin | Validate + recalculate technique status | +| POST | `/api/v1/tests/{id}/reject` | Lead, Admin | Reject test | + +### Evidence +| Method | Route | Auth | Description | +|--------|-------|------|-------------| +| POST | `/api/v1/tests/{test_id}/evidence` | Authenticated | Upload evidence file (SHA-256 verified) | +| GET | `/api/v1/evidence/{id}` | Authenticated | Get metadata + presigned download URL | + ## Project Structure ``` @@ -117,14 +149,22 @@ Aegis/ │ │ ├── intel.py # Threat intelligence items │ │ ├── audit.py # Audit logging │ │ └── enums.py # Shared enumerations +│ ├── storage.py # MinIO/S3 client (upload, presigned URLs) │ ├── schemas/ # Pydantic request/response schemas -│ │ └── auth.py # LoginRequest, TokenResponse, UserOut +│ │ ├── auth.py # LoginRequest, TokenResponse, UserOut +│ │ ├── technique.py # TechniqueCreate/Update/Out/Summary +│ │ ├── test.py # TestCreate/Update/Out/Validate +│ │ └── evidence.py # EvidenceOut │ ├── routers/ # API endpoint routers -│ │ └── auth.py # POST /auth/login, GET /auth/me +│ │ ├── auth.py # POST /auth/login, GET /auth/me +│ │ ├── techniques.py # CRUD techniques (list, detail, create, update, review) +│ │ ├── tests.py # CRUD tests (create, detail, update, validate, reject) +│ │ └── evidence.py # Upload evidence, presigned download │ ├── dependencies/ # FastAPI dependencies (DI) -│ │ └── auth.py # get_current_user, require_role (RBAC) +│ │ └── auth.py # get_current_user, require_role, require_any_role │ └── services/ # Business logic services -│ └── audit_service.py +│ ├── audit_service.py +│ └── status_service.py # Recalculate technique status from tests └── frontend/ # React frontend (coming soon) ``` diff --git a/backend/app/dependencies/auth.py b/backend/app/dependencies/auth.py index c1443a0..7775d33 100644 --- a/backend/app/dependencies/auth.py +++ b/backend/app/dependencies/auth.py @@ -87,3 +87,24 @@ def require_role(required_role: str): return current_user return role_checker + + +def require_any_role(*roles: str): + """Return a FastAPI dependency that enforces **any** of the given *roles*. + + Admins always pass. Usage example:: + + @router.patch("/resource", dependencies=[Depends(require_any_role("red_lead", "blue_lead"))]) + """ + + async def role_checker( + current_user: User = Depends(get_current_user), + ) -> User: + if current_user.role != "admin" and current_user.role not in roles: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Not enough permissions", + ) + return current_user + + return role_checker diff --git a/backend/app/main.py b/backend/app/main.py index c8e20e5..ff29ef7 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -1,9 +1,22 @@ +from contextlib import asynccontextmanager + from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from app.routers import auth as auth_router +from app.routers import techniques as techniques_router +from app.routers import tests as tests_router +from app.routers import evidence as evidence_router +from app.storage import ensure_bucket_exists -app = FastAPI(title="Attack Coverage Platform") +@asynccontextmanager +async def lifespan(app: FastAPI): + """Startup / shutdown logic.""" + ensure_bucket_exists() + yield + + +app = FastAPI(title="Attack Coverage Platform", lifespan=lifespan) # ── CORS ────────────────────────────────────────────────────────────────── app.add_middleware( @@ -16,6 +29,9 @@ app.add_middleware( # ── Routers ────────────────────────────────────────────────────────────── app.include_router(auth_router.router, prefix="/api/v1") +app.include_router(techniques_router.router, prefix="/api/v1") +app.include_router(tests_router.router, prefix="/api/v1") +app.include_router(evidence_router.router, prefix="/api/v1") @app.get("/health") diff --git a/backend/app/routers/evidence.py b/backend/app/routers/evidence.py new file mode 100644 index 0000000..4ea8120 --- /dev/null +++ b/backend/app/routers/evidence.py @@ -0,0 +1,132 @@ +"""Evidence upload and download router.""" + +import hashlib +import uuid as _uuid + +from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status +from sqlalchemy.orm import Session + +from app.database import get_db +from app.dependencies.auth import get_current_user +from app.models.evidence import Evidence +from app.models.test import Test +from app.models.user import User +from app.schemas.evidence import EvidenceOut +from app.services.audit_service import log_action +from app.storage import get_presigned_url, upload_file + +router = APIRouter(tags=["evidence"]) + + +# --------------------------------------------------------------------------- +# POST /tests/{test_id}/evidence — upload +# --------------------------------------------------------------------------- + + +@router.post( + "/tests/{test_id}/evidence", + response_model=EvidenceOut, + status_code=status.HTTP_201_CREATED, +) +async def upload_evidence( + test_id: _uuid.UUID, + file: UploadFile = File(...), + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Upload a file as evidence for the given test. + + Steps: + 1. Read file content and compute SHA-256. + 2. Build an object key ``{test_id}/{uuid}_{filename}``. + 3. Upload to MinIO. + 4. Persist an :class:`Evidence` row in the database. + 5. Write an audit-log entry. + """ + # Verify the parent test exists + test = db.query(Test).filter(Test.id == test_id).first() + if test is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Test not found", + ) + + # 1. Read content + hash + content = await file.read() + sha256 = hashlib.sha256(content).hexdigest() + + # 2. Object key + file_name = file.filename or "unnamed" + key = f"{test_id}/{_uuid.uuid4()}_{file_name}" + + # 3. Upload to MinIO + upload_file(content, key) + + # 4. Persist metadata + evidence = Evidence( + test_id=test_id, + file_name=file_name, + file_path=key, + sha256_hash=sha256, + uploaded_by=current_user.id, + ) + db.add(evidence) + db.commit() + db.refresh(evidence) + + # 5. Audit + log_action( + db, + user_id=current_user.id, + action="upload_evidence", + entity_type="evidence", + entity_id=evidence.id, + details={ + "file_name": file_name, + "sha256": sha256, + "test_id": str(test_id), + }, + ) + + # Build response with download URL + return _evidence_to_out(evidence) + + +# --------------------------------------------------------------------------- +# GET /evidence/{id} — presigned download URL +# --------------------------------------------------------------------------- + + +@router.get("/evidence/{evidence_id}", response_model=EvidenceOut) +def get_evidence( + evidence_id: _uuid.UUID, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Return evidence metadata together with a presigned download URL.""" + evidence = db.query(Evidence).filter(Evidence.id == evidence_id).first() + if evidence is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Evidence not found", + ) + + return _evidence_to_out(evidence) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _evidence_to_out(evidence: Evidence) -> EvidenceOut: + """Convert an ORM ``Evidence`` to the API schema, injecting a presigned URL.""" + return EvidenceOut( + id=evidence.id, + test_id=evidence.test_id, + file_name=evidence.file_name, + sha256_hash=evidence.sha256_hash, + uploaded_by=evidence.uploaded_by, + uploaded_at=evidence.uploaded_at, + download_url=get_presigned_url(evidence.file_path), + ) diff --git a/backend/app/routers/techniques.py b/backend/app/routers/techniques.py new file mode 100644 index 0000000..3101502 --- /dev/null +++ b/backend/app/routers/techniques.py @@ -0,0 +1,206 @@ +"""CRUD router for MITRE ATT&CK Techniques.""" + +from datetime import datetime + +from fastapi import APIRouter, Depends, HTTPException, Query, status +from sqlalchemy.orm import Session, joinedload + +from app.database import get_db +from app.dependencies.auth import get_current_user, require_role, require_any_role +from app.models.enums import TechniqueStatus +from app.models.technique import Technique +from app.models.user import User +from app.schemas.technique import ( + TechniqueCreate, + TechniqueOut, + TechniqueSummary, + TechniqueUpdate, +) +from app.services.audit_service import log_action + +router = APIRouter(prefix="/techniques", tags=["techniques"]) + + +# --------------------------------------------------------------------------- +# GET /techniques — list (with optional filters) +# --------------------------------------------------------------------------- + + +@router.get("", response_model=list[TechniqueSummary]) +def list_techniques( + tactic: str | None = Query(None, description="Filter by tactic name"), + status_global: TechniqueStatus | None = Query( + None, alias="status", description="Filter by global status" + ), + review_required: bool | None = Query(None, description="Filter by review flag"), + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Return a lightweight list of techniques, optionally filtered.""" + query = db.query(Technique) + + if tactic is not None: + query = query.filter(Technique.tactic == tactic) + if status_global is not None: + query = query.filter(Technique.status_global == status_global) + if review_required is not None: + query = query.filter(Technique.review_required == review_required) + + return query.order_by(Technique.mitre_id).all() + + +# --------------------------------------------------------------------------- +# GET /techniques/{mitre_id} — detail (with tests) +# --------------------------------------------------------------------------- + + +@router.get("/{mitre_id}", response_model=TechniqueOut) +def get_technique( + mitre_id: str, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Return full details for a single technique, including its tests.""" + technique = ( + db.query(Technique) + .options(joinedload(Technique.tests)) + .filter(Technique.mitre_id == mitre_id) + .first() + ) + + if technique is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Technique {mitre_id} not found", + ) + + return technique + + +# --------------------------------------------------------------------------- +# POST /techniques — create (admin only) +# --------------------------------------------------------------------------- + + +@router.post( + "", + response_model=TechniqueOut, + status_code=status.HTTP_201_CREATED, +) +def create_technique( + payload: TechniqueCreate, + db: Session = Depends(get_db), + current_user: User = Depends(require_role("admin")), +): + """Create a new technique manually.""" + # Ensure mitre_id is unique + existing = ( + db.query(Technique).filter(Technique.mitre_id == payload.mitre_id).first() + ) + if existing is not None: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=f"Technique with mitre_id '{payload.mitre_id}' already exists", + ) + + technique = Technique(**payload.model_dump()) + db.add(technique) + db.commit() + db.refresh(technique) + + log_action( + db, + user_id=current_user.id, + action="create_technique", + entity_type="technique", + entity_id=technique.id, + details={"mitre_id": technique.mitre_id, "name": technique.name}, + ) + + return technique + + +# --------------------------------------------------------------------------- +# PATCH /techniques/{mitre_id} — update (admin only) +# --------------------------------------------------------------------------- + + +@router.patch("/{mitre_id}", response_model=TechniqueOut) +def update_technique( + mitre_id: str, + payload: TechniqueUpdate, + db: Session = Depends(get_db), + current_user: User = Depends(require_role("admin")), +): + """Update one or more fields of an existing technique.""" + technique = ( + db.query(Technique).filter(Technique.mitre_id == mitre_id).first() + ) + + if technique is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Technique {mitre_id} not found", + ) + + update_data = payload.model_dump(exclude_unset=True) + for field, value in update_data.items(): + setattr(technique, field, value) + + db.commit() + db.refresh(technique) + + log_action( + db, + user_id=current_user.id, + action="update_technique", + entity_type="technique", + entity_id=technique.id, + details={"mitre_id": mitre_id, "updated_fields": list(update_data.keys())}, + ) + + return technique + + +# --------------------------------------------------------------------------- +# PATCH /techniques/{mitre_id}/review — mark as reviewed (leads + admin) +# --------------------------------------------------------------------------- + + +@router.patch("/{mitre_id}/review", response_model=TechniqueOut) +def review_technique( + mitre_id: str, + db: Session = Depends(get_db), + current_user: User = Depends(require_any_role("red_lead", "blue_lead")), +): + """Mark a technique as reviewed. + + Sets ``review_required`` to *False* and records the current timestamp + in ``last_review_date``. + """ + technique = ( + db.query(Technique).filter(Technique.mitre_id == mitre_id).first() + ) + + if technique is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Technique {mitre_id} not found", + ) + + technique.review_required = False + technique.last_review_date = datetime.utcnow() + + db.commit() + db.refresh(technique) + + log_action( + db, + user_id=current_user.id, + action="review_technique", + entity_type="technique", + entity_id=technique.id, + details={"mitre_id": mitre_id}, + ) + + return technique diff --git a/backend/app/routers/tests.py b/backend/app/routers/tests.py new file mode 100644 index 0000000..76dab76 --- /dev/null +++ b/backend/app/routers/tests.py @@ -0,0 +1,248 @@ +"""CRUD router for security Tests.""" + +import uuid +from datetime import datetime + +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy.orm import Session, joinedload + +from app.database import get_db +from app.dependencies.auth import get_current_user, require_role, require_any_role +from app.models.enums import TestState +from app.models.technique import Technique +from app.models.test import Test +from app.models.user import User +from app.schemas.test import TestCreate, TestOut, TestUpdate, TestValidate +from app.services.audit_service import log_action +from app.services.status_service import recalculate_technique_status + +router = APIRouter(prefix="/tests", tags=["tests"]) + + +# --------------------------------------------------------------------------- +# POST /tests — create (red_tech or admin) +# --------------------------------------------------------------------------- + + +@router.post( + "", + response_model=TestOut, + status_code=status.HTTP_201_CREATED, +) +def create_test( + payload: TestCreate, + db: Session = Depends(get_db), + current_user: User = Depends(require_any_role("red_tech")), +): + """Create a new test linked to an existing technique. + + The ``created_by`` field is set automatically to the current user and + ``state`` defaults to *draft*. + """ + # Verify the parent technique exists + technique = db.query(Technique).filter(Technique.id == payload.technique_id).first() + if technique is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Technique with id '{payload.technique_id}' not found", + ) + + test = Test( + **payload.model_dump(), + created_by=current_user.id, + state=TestState.draft, + ) + db.add(test) + db.commit() + db.refresh(test) + + log_action( + db, + user_id=current_user.id, + action="create_test", + entity_type="test", + entity_id=test.id, + details={"name": test.name, "technique_id": str(test.technique_id)}, + ) + + return test + + +# --------------------------------------------------------------------------- +# GET /tests/{id} — detail (with evidences) +# --------------------------------------------------------------------------- + + +@router.get("/{test_id}", response_model=TestOut) +def get_test( + test_id: uuid.UUID, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Return full details for a single test, including its evidences.""" + test = ( + db.query(Test) + .options(joinedload(Test.evidences)) + .filter(Test.id == test_id) + .first() + ) + + if test is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Test not found", + ) + + return test + + +# --------------------------------------------------------------------------- +# PATCH /tests/{id} — update (creator or admin, only in draft/rejected) +# --------------------------------------------------------------------------- + + +@router.patch("/{test_id}", response_model=TestOut) +def update_test( + test_id: uuid.UUID, + payload: TestUpdate, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Update one or more fields of an existing test. + + Only the original creator or an admin can update. + The test must be in ``draft`` or ``rejected`` state. + """ + test = db.query(Test).filter(Test.id == test_id).first() + + if test is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Test not found", + ) + + # Ownership / admin check + if current_user.role != "admin" and test.created_by != current_user.id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Not enough permissions", + ) + + # State guard + if test.state not in (TestState.draft, TestState.rejected): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Cannot update a test in '{test.state.value}' state (must be draft or rejected)", + ) + + update_data = payload.model_dump(exclude_unset=True) + for field, value in update_data.items(): + setattr(test, field, value) + + db.commit() + db.refresh(test) + + log_action( + db, + user_id=current_user.id, + action="update_test", + entity_type="test", + entity_id=test.id, + details={"updated_fields": list(update_data.keys())}, + ) + + return test + + +# --------------------------------------------------------------------------- +# POST /tests/{id}/validate — validate (leads + admin) +# --------------------------------------------------------------------------- + + +@router.post("/{test_id}/validate", response_model=TestOut) +def validate_test( + test_id: uuid.UUID, + payload: TestValidate, + db: Session = Depends(get_db), + current_user: User = Depends(require_any_role("red_lead", "blue_lead")), +): + """Mark a test as validated. + + Sets ``state`` to *validated*, records ``validated_by`` / ``validated_at``, + stores the ``result``, and recalculates the parent technique's global status. + """ + test = ( + db.query(Test) + .options(joinedload(Test.technique)) + .filter(Test.id == test_id) + .first() + ) + + if test is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Test not found", + ) + + test.state = TestState.validated + test.result = payload.result + test.validated_by = current_user.id + test.validated_at = datetime.utcnow() + + db.commit() + db.refresh(test) + + # Recalculate the parent technique's global status + technique = test.technique + recalculate_technique_status(db, technique) + + log_action( + db, + user_id=current_user.id, + action="validate_test", + entity_type="test", + entity_id=test.id, + details={ + "result": payload.result.value, + "technique_id": str(test.technique_id), + }, + ) + + return test + + +# --------------------------------------------------------------------------- +# POST /tests/{id}/reject — reject (leads + admin) +# --------------------------------------------------------------------------- + + +@router.post("/{test_id}/reject", response_model=TestOut) +def reject_test( + test_id: uuid.UUID, + db: Session = Depends(get_db), + current_user: User = Depends(require_any_role("red_lead", "blue_lead")), +): + """Reject a test, setting its state to *rejected*.""" + test = db.query(Test).filter(Test.id == test_id).first() + + if test is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Test not found", + ) + + test.state = TestState.rejected + + db.commit() + db.refresh(test) + + log_action( + db, + user_id=current_user.id, + action="reject_test", + entity_type="test", + entity_id=test.id, + details={"technique_id": str(test.technique_id)}, + ) + + return test diff --git a/backend/app/schemas/__init__.py b/backend/app/schemas/__init__.py index e69de29..421ab11 100644 --- a/backend/app/schemas/__init__.py +++ b/backend/app/schemas/__init__.py @@ -0,0 +1,38 @@ +"""Pydantic schemas — re-exported for convenient imports.""" + +from app.schemas.auth import LoginRequest, TokenResponse, UserOut + +from app.schemas.technique import ( + TechniqueCreate, + TechniqueOut, + TechniqueSummary, + TechniqueUpdate, +) + +from app.schemas.test import ( + TestCreate, + TestOut, + TestUpdate, + TestValidate, +) + +from app.schemas.evidence import EvidenceOut + +__all__ = [ + # Auth + "LoginRequest", + "TokenResponse", + "UserOut", + # Technique + "TechniqueCreate", + "TechniqueOut", + "TechniqueSummary", + "TechniqueUpdate", + # Test + "TestCreate", + "TestOut", + "TestUpdate", + "TestValidate", + # Evidence + "EvidenceOut", +] diff --git a/backend/app/schemas/evidence.py b/backend/app/schemas/evidence.py new file mode 100644 index 0000000..60ec6e7 --- /dev/null +++ b/backend/app/schemas/evidence.py @@ -0,0 +1,23 @@ +"""Pydantic schemas for Evidence endpoints.""" + +import uuid +from datetime import datetime + +from pydantic import BaseModel, ConfigDict + + +class EvidenceOut(BaseModel): + """Representation of an evidence record returned by the API. + + ``download_url`` is a presigned URL generated at response time. + """ + + id: uuid.UUID + test_id: uuid.UUID + file_name: str + sha256_hash: str + uploaded_by: uuid.UUID | None = None + uploaded_at: datetime | None = None + download_url: str | None = None + + model_config = ConfigDict(from_attributes=True) diff --git a/backend/app/schemas/technique.py b/backend/app/schemas/technique.py new file mode 100644 index 0000000..9cb8042 --- /dev/null +++ b/backend/app/schemas/technique.py @@ -0,0 +1,69 @@ +"""Pydantic schemas for Technique endpoints.""" + +import uuid +from datetime import datetime + +from pydantic import BaseModel, ConfigDict + +from app.models.enums import TechniqueStatus + + +# ── Create ────────────────────────────────────────────────────────── + +class TechniqueCreate(BaseModel): + """Payload for creating a new technique.""" + + mitre_id: str + name: str + description: str | None = None + tactic: str | None = None + platforms: list[str] | None = None + + +# ── Update ────────────────────────────────────────────────────────── + +class TechniqueUpdate(BaseModel): + """Payload for partially updating an existing technique. + Every field is optional so callers send only what changed.""" + + name: str | None = None + description: str | None = None + tactic: str | None = None + platforms: list[str] | None = None + status_global: TechniqueStatus | None = None + + +# ── Read (full) ───────────────────────────────────────────────────── + +class TechniqueOut(BaseModel): + """Complete representation returned by the API.""" + + id: uuid.UUID + mitre_id: str + name: str + description: str | None = None + tactic: str | None = None + platforms: list[str] | None = None + mitre_version: str | None = None + mitre_last_modified: datetime | None = None + is_subtechnique: bool = False + parent_mitre_id: str | None = None + status_global: TechniqueStatus = TechniqueStatus.not_evaluated + review_required: bool = False + last_review_date: datetime | None = None + + model_config = ConfigDict(from_attributes=True) + + +# ── Read (summary) ────────────────────────────────────────────────── + +class TechniqueSummary(BaseModel): + """Lightweight representation used in list endpoints.""" + + id: uuid.UUID + mitre_id: str + name: str + tactic: str | None = None + status_global: TechniqueStatus = TechniqueStatus.not_evaluated + + model_config = ConfigDict(from_attributes=True) diff --git a/backend/app/schemas/test.py b/backend/app/schemas/test.py new file mode 100644 index 0000000..e93cac3 --- /dev/null +++ b/backend/app/schemas/test.py @@ -0,0 +1,67 @@ +"""Pydantic schemas for Test endpoints.""" + +import uuid +from datetime import datetime + +from pydantic import BaseModel, ConfigDict + +from app.models.enums import TestResult, TestState + + +# ── Create ────────────────────────────────────────────────────────── + +class TestCreate(BaseModel): + """Payload for creating a new test.""" + + technique_id: uuid.UUID + name: str + description: str | None = None + platform: str | None = None + procedure_text: str | None = None + tool_used: str | None = None + + +# ── Update ────────────────────────────────────────────────────────── + +class TestUpdate(BaseModel): + """Payload for partially updating an existing test. + Every field is optional so callers send only what changed.""" + + name: str | None = None + description: str | None = None + platform: str | None = None + procedure_text: str | None = None + tool_used: str | None = None + result: TestResult | None = None + + +# ── Read (full) ───────────────────────────────────────────────────── + +class TestOut(BaseModel): + """Complete representation returned by the API.""" + + id: uuid.UUID + technique_id: uuid.UUID + name: str + description: str | None = None + platform: str | None = None + procedure_text: str | None = None + tool_used: str | None = None + execution_date: datetime | None = None + created_by: uuid.UUID | None = None + result: TestResult | None = None + state: TestState = TestState.draft + validated_by: uuid.UUID | None = None + validated_at: datetime | None = None + created_at: datetime | None = None + + model_config = ConfigDict(from_attributes=True) + + +# ── Validate ──────────────────────────────────────────────────────── + +class TestValidate(BaseModel): + """Payload sent by a reviewer to validate / reject a test.""" + + result: TestResult + comments: str | None = None diff --git a/backend/app/services/status_service.py b/backend/app/services/status_service.py new file mode 100644 index 0000000..82f0fb1 --- /dev/null +++ b/backend/app/services/status_service.py @@ -0,0 +1,36 @@ +"""Service for recalculating the global status of a Technique +based on the state and result of its associated tests.""" + +from sqlalchemy.orm import Session + +from app.models.enums import TechniqueStatus +from app.models.technique import Technique + + +def recalculate_technique_status(db: Session, technique: Technique) -> None: + """Recompute ``technique.status_global`` from its tests and commit. + + Rules + ----- + - No tests → ``not_evaluated`` + - Any test not yet ``validated`` → ``in_progress`` + - All validated and all ``detected`` → ``validated`` + - All validated and any ``partially_detected`` → ``partial`` + - Otherwise → ``not_covered`` + """ + tests = technique.tests + + if not tests: + technique.status_global = TechniqueStatus.not_evaluated + elif any(t.state != "validated" for t in tests): + technique.status_global = TechniqueStatus.in_progress + else: + results = [t.result for t in tests] + if all(r == "detected" for r in results): + technique.status_global = TechniqueStatus.validated + elif any(r == "partially_detected" for r in results): + technique.status_global = TechniqueStatus.partial + else: + technique.status_global = TechniqueStatus.not_covered + + db.commit() diff --git a/backend/app/storage.py b/backend/app/storage.py new file mode 100644 index 0000000..03603b8 --- /dev/null +++ b/backend/app/storage.py @@ -0,0 +1,57 @@ +"""MinIO / S3-compatible object-storage helpers. + +Provides thin wrappers around boto3 for bucket management, file upload +and presigned-URL generation. +""" + +import boto3 +from botocore.exceptions import ClientError + +from app.config import settings + +# --------------------------------------------------------------------------- +# Shared client (module-level singleton) +# --------------------------------------------------------------------------- + +_client = boto3.client( + "s3", + endpoint_url=f"http://{settings.MINIO_ENDPOINT}", + aws_access_key_id=settings.MINIO_ACCESS_KEY, + aws_secret_access_key=settings.MINIO_SECRET_KEY, + region_name="us-east-1", # MinIO ignores this but boto3 requires it +) + + +# --------------------------------------------------------------------------- +# Public helpers +# --------------------------------------------------------------------------- + + +def ensure_bucket_exists() -> None: + """Create the evidence bucket if it does not already exist.""" + try: + _client.head_bucket(Bucket=settings.MINIO_BUCKET) + except ClientError: + _client.create_bucket(Bucket=settings.MINIO_BUCKET) + + +def upload_file(content: bytes, key: str) -> str: + """Upload *content* to the evidence bucket under *key*. + + Returns the key that was written (same as the input). + """ + _client.put_object( + Bucket=settings.MINIO_BUCKET, + Key=key, + Body=content, + ) + return key + + +def get_presigned_url(key: str, expiration: int = 3600) -> str: + """Return a presigned GET URL for *key* valid for *expiration* seconds.""" + return _client.generate_presigned_url( + "get_object", + Params={"Bucket": settings.MINIO_BUCKET, "Key": key}, + ExpiresIn=expiration, + )