feat: Phase 3 - CRUD core for Techniques, Tests and Evidence (T-014 to T-017)

- Add Pydantic schemas for Technique, Test and Evidence
- Add CRUD endpoints for Techniques (list with filters, detail, create, update, review)
- Add CRUD endpoints for Tests (create, detail, update, validate, reject)
- Add evidence upload with SHA-256 integrity and presigned download URLs
- Add MinIO/S3 storage client with bucket auto-creation on startup
- Add status_service to recalculate technique coverage from test results
- Add require_any_role RBAC dependency for multi-role authorization
- Update README with API endpoints reference and project structure
This commit is contained in:
2026-02-06 13:52:27 +01:00
parent 508f0723af
commit 4f6dd838fd
12 changed files with 958 additions and 5 deletions

View File

@@ -89,6 +89,38 @@ Once the backend is running, access the interactive API documentation at:
- **Swagger UI**: http://localhost:8000/docs - **Swagger UI**: http://localhost:8000/docs
- **ReDoc**: http://localhost:8000/redoc - **ReDoc**: http://localhost:8000/redoc
## API Endpoints
### Auth
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| POST | `/api/v1/auth/login` | Public | Obtain JWT token |
| GET | `/api/v1/auth/me` | Authenticated | Current user profile |
### Techniques
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| GET | `/api/v1/techniques` | Authenticated | List all (filters: `?tactic=`, `?status=`, `?review_required=`) |
| GET | `/api/v1/techniques/{mitre_id}` | Authenticated | Detail with associated tests |
| POST | `/api/v1/techniques` | Admin | Create technique |
| PATCH | `/api/v1/techniques/{mitre_id}` | Admin | Update technique fields |
| PATCH | `/api/v1/techniques/{mitre_id}/review` | Lead, Admin | Mark as reviewed |
### Tests
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| POST | `/api/v1/tests` | Red Tech, Admin | Create test (state=draft) |
| GET | `/api/v1/tests/{id}` | Authenticated | Detail with evidences |
| PATCH | `/api/v1/tests/{id}` | Creator, Admin | Update (only draft/rejected) |
| POST | `/api/v1/tests/{id}/validate` | Lead, Admin | Validate + recalculate technique status |
| POST | `/api/v1/tests/{id}/reject` | Lead, Admin | Reject test |
### Evidence
| Method | Route | Auth | Description |
|--------|-------|------|-------------|
| POST | `/api/v1/tests/{test_id}/evidence` | Authenticated | Upload evidence file (SHA-256 verified) |
| GET | `/api/v1/evidence/{id}` | Authenticated | Get metadata + presigned download URL |
## Project Structure ## Project Structure
``` ```
@@ -117,14 +149,22 @@ Aegis/
│ │ ├── intel.py # Threat intelligence items │ │ ├── intel.py # Threat intelligence items
│ │ ├── audit.py # Audit logging │ │ ├── audit.py # Audit logging
│ │ └── enums.py # Shared enumerations │ │ └── enums.py # Shared enumerations
│ ├── storage.py # MinIO/S3 client (upload, presigned URLs)
│ ├── schemas/ # Pydantic request/response schemas │ ├── schemas/ # Pydantic request/response schemas
│ │ ── auth.py # LoginRequest, TokenResponse, UserOut │ │ ── auth.py # LoginRequest, TokenResponse, UserOut
│ │ ├── technique.py # TechniqueCreate/Update/Out/Summary
│ │ ├── test.py # TestCreate/Update/Out/Validate
│ │ └── evidence.py # EvidenceOut
│ ├── routers/ # API endpoint routers │ ├── routers/ # API endpoint routers
│ │ ── auth.py # POST /auth/login, GET /auth/me │ │ ── auth.py # POST /auth/login, GET /auth/me
│ │ ├── techniques.py # CRUD techniques (list, detail, create, update, review)
│ │ ├── tests.py # CRUD tests (create, detail, update, validate, reject)
│ │ └── evidence.py # Upload evidence, presigned download
│ ├── dependencies/ # FastAPI dependencies (DI) │ ├── dependencies/ # FastAPI dependencies (DI)
│ │ └── auth.py # get_current_user, require_role (RBAC) │ │ └── auth.py # get_current_user, require_role, require_any_role
│ └── services/ # Business logic services │ └── services/ # Business logic services
── audit_service.py ── audit_service.py
│ └── status_service.py # Recalculate technique status from tests
└── frontend/ # React frontend (coming soon) └── frontend/ # React frontend (coming soon)
``` ```

View File

@@ -87,3 +87,24 @@ def require_role(required_role: str):
return current_user return current_user
return role_checker return role_checker
def require_any_role(*roles: str):
"""Return a FastAPI dependency that enforces **any** of the given *roles*.
Admins always pass. Usage example::
@router.patch("/resource", dependencies=[Depends(require_any_role("red_lead", "blue_lead"))])
"""
async def role_checker(
current_user: User = Depends(get_current_user),
) -> User:
if current_user.role != "admin" and current_user.role not in roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions",
)
return current_user
return role_checker

View File

@@ -1,9 +1,22 @@
from contextlib import asynccontextmanager
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from app.routers import auth as auth_router from app.routers import auth as auth_router
from app.routers import techniques as techniques_router
from app.routers import tests as tests_router
from app.routers import evidence as evidence_router
from app.storage import ensure_bucket_exists
app = FastAPI(title="Attack Coverage Platform") @asynccontextmanager
async def lifespan(app: FastAPI):
"""Startup / shutdown logic."""
ensure_bucket_exists()
yield
app = FastAPI(title="Attack Coverage Platform", lifespan=lifespan)
# ── CORS ────────────────────────────────────────────────────────────────── # ── CORS ──────────────────────────────────────────────────────────────────
app.add_middleware( app.add_middleware(
@@ -16,6 +29,9 @@ app.add_middleware(
# ── Routers ────────────────────────────────────────────────────────────── # ── Routers ──────────────────────────────────────────────────────────────
app.include_router(auth_router.router, prefix="/api/v1") app.include_router(auth_router.router, prefix="/api/v1")
app.include_router(techniques_router.router, prefix="/api/v1")
app.include_router(tests_router.router, prefix="/api/v1")
app.include_router(evidence_router.router, prefix="/api/v1")
@app.get("/health") @app.get("/health")

View File

@@ -0,0 +1,132 @@
"""Evidence upload and download router."""
import hashlib
import uuid as _uuid
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user
from app.models.evidence import Evidence
from app.models.test import Test
from app.models.user import User
from app.schemas.evidence import EvidenceOut
from app.services.audit_service import log_action
from app.storage import get_presigned_url, upload_file
router = APIRouter(tags=["evidence"])
# ---------------------------------------------------------------------------
# POST /tests/{test_id}/evidence — upload
# ---------------------------------------------------------------------------
@router.post(
"/tests/{test_id}/evidence",
response_model=EvidenceOut,
status_code=status.HTTP_201_CREATED,
)
async def upload_evidence(
test_id: _uuid.UUID,
file: UploadFile = File(...),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Upload a file as evidence for the given test.
Steps:
1. Read file content and compute SHA-256.
2. Build an object key ``{test_id}/{uuid}_{filename}``.
3. Upload to MinIO.
4. Persist an :class:`Evidence` row in the database.
5. Write an audit-log entry.
"""
# Verify the parent test exists
test = db.query(Test).filter(Test.id == test_id).first()
if test is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Test not found",
)
# 1. Read content + hash
content = await file.read()
sha256 = hashlib.sha256(content).hexdigest()
# 2. Object key
file_name = file.filename or "unnamed"
key = f"{test_id}/{_uuid.uuid4()}_{file_name}"
# 3. Upload to MinIO
upload_file(content, key)
# 4. Persist metadata
evidence = Evidence(
test_id=test_id,
file_name=file_name,
file_path=key,
sha256_hash=sha256,
uploaded_by=current_user.id,
)
db.add(evidence)
db.commit()
db.refresh(evidence)
# 5. Audit
log_action(
db,
user_id=current_user.id,
action="upload_evidence",
entity_type="evidence",
entity_id=evidence.id,
details={
"file_name": file_name,
"sha256": sha256,
"test_id": str(test_id),
},
)
# Build response with download URL
return _evidence_to_out(evidence)
# ---------------------------------------------------------------------------
# GET /evidence/{id} — presigned download URL
# ---------------------------------------------------------------------------
@router.get("/evidence/{evidence_id}", response_model=EvidenceOut)
def get_evidence(
evidence_id: _uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return evidence metadata together with a presigned download URL."""
evidence = db.query(Evidence).filter(Evidence.id == evidence_id).first()
if evidence is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Evidence not found",
)
return _evidence_to_out(evidence)
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _evidence_to_out(evidence: Evidence) -> EvidenceOut:
"""Convert an ORM ``Evidence`` to the API schema, injecting a presigned URL."""
return EvidenceOut(
id=evidence.id,
test_id=evidence.test_id,
file_name=evidence.file_name,
sha256_hash=evidence.sha256_hash,
uploaded_by=evidence.uploaded_by,
uploaded_at=evidence.uploaded_at,
download_url=get_presigned_url(evidence.file_path),
)

View File

@@ -0,0 +1,206 @@
"""CRUD router for MITRE ATT&CK Techniques."""
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session, joinedload
from app.database import get_db
from app.dependencies.auth import get_current_user, require_role, require_any_role
from app.models.enums import TechniqueStatus
from app.models.technique import Technique
from app.models.user import User
from app.schemas.technique import (
TechniqueCreate,
TechniqueOut,
TechniqueSummary,
TechniqueUpdate,
)
from app.services.audit_service import log_action
router = APIRouter(prefix="/techniques", tags=["techniques"])
# ---------------------------------------------------------------------------
# GET /techniques — list (with optional filters)
# ---------------------------------------------------------------------------
@router.get("", response_model=list[TechniqueSummary])
def list_techniques(
tactic: str | None = Query(None, description="Filter by tactic name"),
status_global: TechniqueStatus | None = Query(
None, alias="status", description="Filter by global status"
),
review_required: bool | None = Query(None, description="Filter by review flag"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return a lightweight list of techniques, optionally filtered."""
query = db.query(Technique)
if tactic is not None:
query = query.filter(Technique.tactic == tactic)
if status_global is not None:
query = query.filter(Technique.status_global == status_global)
if review_required is not None:
query = query.filter(Technique.review_required == review_required)
return query.order_by(Technique.mitre_id).all()
# ---------------------------------------------------------------------------
# GET /techniques/{mitre_id} — detail (with tests)
# ---------------------------------------------------------------------------
@router.get("/{mitre_id}", response_model=TechniqueOut)
def get_technique(
mitre_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return full details for a single technique, including its tests."""
technique = (
db.query(Technique)
.options(joinedload(Technique.tests))
.filter(Technique.mitre_id == mitre_id)
.first()
)
if technique is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Technique {mitre_id} not found",
)
return technique
# ---------------------------------------------------------------------------
# POST /techniques — create (admin only)
# ---------------------------------------------------------------------------
@router.post(
"",
response_model=TechniqueOut,
status_code=status.HTTP_201_CREATED,
)
def create_technique(
payload: TechniqueCreate,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Create a new technique manually."""
# Ensure mitre_id is unique
existing = (
db.query(Technique).filter(Technique.mitre_id == payload.mitre_id).first()
)
if existing is not None:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Technique with mitre_id '{payload.mitre_id}' already exists",
)
technique = Technique(**payload.model_dump())
db.add(technique)
db.commit()
db.refresh(technique)
log_action(
db,
user_id=current_user.id,
action="create_technique",
entity_type="technique",
entity_id=technique.id,
details={"mitre_id": technique.mitre_id, "name": technique.name},
)
return technique
# ---------------------------------------------------------------------------
# PATCH /techniques/{mitre_id} — update (admin only)
# ---------------------------------------------------------------------------
@router.patch("/{mitre_id}", response_model=TechniqueOut)
def update_technique(
mitre_id: str,
payload: TechniqueUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
):
"""Update one or more fields of an existing technique."""
technique = (
db.query(Technique).filter(Technique.mitre_id == mitre_id).first()
)
if technique is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Technique {mitre_id} not found",
)
update_data = payload.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(technique, field, value)
db.commit()
db.refresh(technique)
log_action(
db,
user_id=current_user.id,
action="update_technique",
entity_type="technique",
entity_id=technique.id,
details={"mitre_id": mitre_id, "updated_fields": list(update_data.keys())},
)
return technique
# ---------------------------------------------------------------------------
# PATCH /techniques/{mitre_id}/review — mark as reviewed (leads + admin)
# ---------------------------------------------------------------------------
@router.patch("/{mitre_id}/review", response_model=TechniqueOut)
def review_technique(
mitre_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Mark a technique as reviewed.
Sets ``review_required`` to *False* and records the current timestamp
in ``last_review_date``.
"""
technique = (
db.query(Technique).filter(Technique.mitre_id == mitre_id).first()
)
if technique is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Technique {mitre_id} not found",
)
technique.review_required = False
technique.last_review_date = datetime.utcnow()
db.commit()
db.refresh(technique)
log_action(
db,
user_id=current_user.id,
action="review_technique",
entity_type="technique",
entity_id=technique.id,
details={"mitre_id": mitre_id},
)
return technique

View File

@@ -0,0 +1,248 @@
"""CRUD router for security Tests."""
import uuid
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session, joinedload
from app.database import get_db
from app.dependencies.auth import get_current_user, require_role, require_any_role
from app.models.enums import TestState
from app.models.technique import Technique
from app.models.test import Test
from app.models.user import User
from app.schemas.test import TestCreate, TestOut, TestUpdate, TestValidate
from app.services.audit_service import log_action
from app.services.status_service import recalculate_technique_status
router = APIRouter(prefix="/tests", tags=["tests"])
# ---------------------------------------------------------------------------
# POST /tests — create (red_tech or admin)
# ---------------------------------------------------------------------------
@router.post(
"",
response_model=TestOut,
status_code=status.HTTP_201_CREATED,
)
def create_test(
payload: TestCreate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_tech")),
):
"""Create a new test linked to an existing technique.
The ``created_by`` field is set automatically to the current user and
``state`` defaults to *draft*.
"""
# Verify the parent technique exists
technique = db.query(Technique).filter(Technique.id == payload.technique_id).first()
if technique is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Technique with id '{payload.technique_id}' not found",
)
test = Test(
**payload.model_dump(),
created_by=current_user.id,
state=TestState.draft,
)
db.add(test)
db.commit()
db.refresh(test)
log_action(
db,
user_id=current_user.id,
action="create_test",
entity_type="test",
entity_id=test.id,
details={"name": test.name, "technique_id": str(test.technique_id)},
)
return test
# ---------------------------------------------------------------------------
# GET /tests/{id} — detail (with evidences)
# ---------------------------------------------------------------------------
@router.get("/{test_id}", response_model=TestOut)
def get_test(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return full details for a single test, including its evidences."""
test = (
db.query(Test)
.options(joinedload(Test.evidences))
.filter(Test.id == test_id)
.first()
)
if test is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Test not found",
)
return test
# ---------------------------------------------------------------------------
# PATCH /tests/{id} — update (creator or admin, only in draft/rejected)
# ---------------------------------------------------------------------------
@router.patch("/{test_id}", response_model=TestOut)
def update_test(
test_id: uuid.UUID,
payload: TestUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Update one or more fields of an existing test.
Only the original creator or an admin can update.
The test must be in ``draft`` or ``rejected`` state.
"""
test = db.query(Test).filter(Test.id == test_id).first()
if test is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Test not found",
)
# Ownership / admin check
if current_user.role != "admin" and test.created_by != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions",
)
# State guard
if test.state not in (TestState.draft, TestState.rejected):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Cannot update a test in '{test.state.value}' state (must be draft or rejected)",
)
update_data = payload.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(test, field, value)
db.commit()
db.refresh(test)
log_action(
db,
user_id=current_user.id,
action="update_test",
entity_type="test",
entity_id=test.id,
details={"updated_fields": list(update_data.keys())},
)
return test
# ---------------------------------------------------------------------------
# POST /tests/{id}/validate — validate (leads + admin)
# ---------------------------------------------------------------------------
@router.post("/{test_id}/validate", response_model=TestOut)
def validate_test(
test_id: uuid.UUID,
payload: TestValidate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Mark a test as validated.
Sets ``state`` to *validated*, records ``validated_by`` / ``validated_at``,
stores the ``result``, and recalculates the parent technique's global status.
"""
test = (
db.query(Test)
.options(joinedload(Test.technique))
.filter(Test.id == test_id)
.first()
)
if test is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Test not found",
)
test.state = TestState.validated
test.result = payload.result
test.validated_by = current_user.id
test.validated_at = datetime.utcnow()
db.commit()
db.refresh(test)
# Recalculate the parent technique's global status
technique = test.technique
recalculate_technique_status(db, technique)
log_action(
db,
user_id=current_user.id,
action="validate_test",
entity_type="test",
entity_id=test.id,
details={
"result": payload.result.value,
"technique_id": str(test.technique_id),
},
)
return test
# ---------------------------------------------------------------------------
# POST /tests/{id}/reject — reject (leads + admin)
# ---------------------------------------------------------------------------
@router.post("/{test_id}/reject", response_model=TestOut)
def reject_test(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Reject a test, setting its state to *rejected*."""
test = db.query(Test).filter(Test.id == test_id).first()
if test is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Test not found",
)
test.state = TestState.rejected
db.commit()
db.refresh(test)
log_action(
db,
user_id=current_user.id,
action="reject_test",
entity_type="test",
entity_id=test.id,
details={"technique_id": str(test.technique_id)},
)
return test

View File

@@ -0,0 +1,38 @@
"""Pydantic schemas — re-exported for convenient imports."""
from app.schemas.auth import LoginRequest, TokenResponse, UserOut
from app.schemas.technique import (
TechniqueCreate,
TechniqueOut,
TechniqueSummary,
TechniqueUpdate,
)
from app.schemas.test import (
TestCreate,
TestOut,
TestUpdate,
TestValidate,
)
from app.schemas.evidence import EvidenceOut
__all__ = [
# Auth
"LoginRequest",
"TokenResponse",
"UserOut",
# Technique
"TechniqueCreate",
"TechniqueOut",
"TechniqueSummary",
"TechniqueUpdate",
# Test
"TestCreate",
"TestOut",
"TestUpdate",
"TestValidate",
# Evidence
"EvidenceOut",
]

View File

@@ -0,0 +1,23 @@
"""Pydantic schemas for Evidence endpoints."""
import uuid
from datetime import datetime
from pydantic import BaseModel, ConfigDict
class EvidenceOut(BaseModel):
"""Representation of an evidence record returned by the API.
``download_url`` is a presigned URL generated at response time.
"""
id: uuid.UUID
test_id: uuid.UUID
file_name: str
sha256_hash: str
uploaded_by: uuid.UUID | None = None
uploaded_at: datetime | None = None
download_url: str | None = None
model_config = ConfigDict(from_attributes=True)

View File

@@ -0,0 +1,69 @@
"""Pydantic schemas for Technique endpoints."""
import uuid
from datetime import datetime
from pydantic import BaseModel, ConfigDict
from app.models.enums import TechniqueStatus
# ── Create ──────────────────────────────────────────────────────────
class TechniqueCreate(BaseModel):
"""Payload for creating a new technique."""
mitre_id: str
name: str
description: str | None = None
tactic: str | None = None
platforms: list[str] | None = None
# ── Update ──────────────────────────────────────────────────────────
class TechniqueUpdate(BaseModel):
"""Payload for partially updating an existing technique.
Every field is optional so callers send only what changed."""
name: str | None = None
description: str | None = None
tactic: str | None = None
platforms: list[str] | None = None
status_global: TechniqueStatus | None = None
# ── Read (full) ─────────────────────────────────────────────────────
class TechniqueOut(BaseModel):
"""Complete representation returned by the API."""
id: uuid.UUID
mitre_id: str
name: str
description: str | None = None
tactic: str | None = None
platforms: list[str] | None = None
mitre_version: str | None = None
mitre_last_modified: datetime | None = None
is_subtechnique: bool = False
parent_mitre_id: str | None = None
status_global: TechniqueStatus = TechniqueStatus.not_evaluated
review_required: bool = False
last_review_date: datetime | None = None
model_config = ConfigDict(from_attributes=True)
# ── Read (summary) ──────────────────────────────────────────────────
class TechniqueSummary(BaseModel):
"""Lightweight representation used in list endpoints."""
id: uuid.UUID
mitre_id: str
name: str
tactic: str | None = None
status_global: TechniqueStatus = TechniqueStatus.not_evaluated
model_config = ConfigDict(from_attributes=True)

View File

@@ -0,0 +1,67 @@
"""Pydantic schemas for Test endpoints."""
import uuid
from datetime import datetime
from pydantic import BaseModel, ConfigDict
from app.models.enums import TestResult, TestState
# ── Create ──────────────────────────────────────────────────────────
class TestCreate(BaseModel):
"""Payload for creating a new test."""
technique_id: uuid.UUID
name: str
description: str | None = None
platform: str | None = None
procedure_text: str | None = None
tool_used: str | None = None
# ── Update ──────────────────────────────────────────────────────────
class TestUpdate(BaseModel):
"""Payload for partially updating an existing test.
Every field is optional so callers send only what changed."""
name: str | None = None
description: str | None = None
platform: str | None = None
procedure_text: str | None = None
tool_used: str | None = None
result: TestResult | None = None
# ── Read (full) ─────────────────────────────────────────────────────
class TestOut(BaseModel):
"""Complete representation returned by the API."""
id: uuid.UUID
technique_id: uuid.UUID
name: str
description: str | None = None
platform: str | None = None
procedure_text: str | None = None
tool_used: str | None = None
execution_date: datetime | None = None
created_by: uuid.UUID | None = None
result: TestResult | None = None
state: TestState = TestState.draft
validated_by: uuid.UUID | None = None
validated_at: datetime | None = None
created_at: datetime | None = None
model_config = ConfigDict(from_attributes=True)
# ── Validate ────────────────────────────────────────────────────────
class TestValidate(BaseModel):
"""Payload sent by a reviewer to validate / reject a test."""
result: TestResult
comments: str | None = None

View File

@@ -0,0 +1,36 @@
"""Service for recalculating the global status of a Technique
based on the state and result of its associated tests."""
from sqlalchemy.orm import Session
from app.models.enums import TechniqueStatus
from app.models.technique import Technique
def recalculate_technique_status(db: Session, technique: Technique) -> None:
"""Recompute ``technique.status_global`` from its tests and commit.
Rules
-----
- No tests → ``not_evaluated``
- Any test not yet ``validated`` → ``in_progress``
- All validated and all ``detected`` → ``validated``
- All validated and any ``partially_detected`` → ``partial``
- Otherwise → ``not_covered``
"""
tests = technique.tests
if not tests:
technique.status_global = TechniqueStatus.not_evaluated
elif any(t.state != "validated" for t in tests):
technique.status_global = TechniqueStatus.in_progress
else:
results = [t.result for t in tests]
if all(r == "detected" for r in results):
technique.status_global = TechniqueStatus.validated
elif any(r == "partially_detected" for r in results):
technique.status_global = TechniqueStatus.partial
else:
technique.status_global = TechniqueStatus.not_covered
db.commit()

57
backend/app/storage.py Normal file
View File

@@ -0,0 +1,57 @@
"""MinIO / S3-compatible object-storage helpers.
Provides thin wrappers around boto3 for bucket management, file upload
and presigned-URL generation.
"""
import boto3
from botocore.exceptions import ClientError
from app.config import settings
# ---------------------------------------------------------------------------
# Shared client (module-level singleton)
# ---------------------------------------------------------------------------
_client = boto3.client(
"s3",
endpoint_url=f"http://{settings.MINIO_ENDPOINT}",
aws_access_key_id=settings.MINIO_ACCESS_KEY,
aws_secret_access_key=settings.MINIO_SECRET_KEY,
region_name="us-east-1", # MinIO ignores this but boto3 requires it
)
# ---------------------------------------------------------------------------
# Public helpers
# ---------------------------------------------------------------------------
def ensure_bucket_exists() -> None:
"""Create the evidence bucket if it does not already exist."""
try:
_client.head_bucket(Bucket=settings.MINIO_BUCKET)
except ClientError:
_client.create_bucket(Bucket=settings.MINIO_BUCKET)
def upload_file(content: bytes, key: str) -> str:
"""Upload *content* to the evidence bucket under *key*.
Returns the key that was written (same as the input).
"""
_client.put_object(
Bucket=settings.MINIO_BUCKET,
Key=key,
Body=content,
)
return key
def get_presigned_url(key: str, expiration: int = 3600) -> str:
"""Return a presigned GET URL for *key* valid for *expiration* seconds."""
return _client.generate_presigned_url(
"get_object",
Params={"Bucket": settings.MINIO_BUCKET, "Key": key},
ExpiresIn=expiration,
)