feat(phase-39): role-based access control overhaul + forced password change
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled

- Add must_change_password field to User model with migration b023

- Add POST /auth/change-password endpoint with password policy validation

- Add require_password_changed dependency to block requests until password is changed

- Add ChangePasswordModal with live password policy checklist (forced on first login)

- Show password policy in CreateUserModal and EditUserModal

- Fix backend permissions: tests, campaigns, templates, reports, evidence, worklogs

- red_tech/blue_tech: execute only, cannot create tests/campaigns/templates

- red_lead/blue_lead: create/edit tests/campaigns/templates, generate reports, no system access

- viewer: read-only everywhere, can generate reports

- Fix frontend role checks across TestDetailPage, TestDetailHeader, TeamTabs, TestsPage, CampaignsPage, CampaignDetailPage, Sidebar
This commit is contained in:
2026-02-18 10:37:02 +01:00
parent 8f764d8e39
commit a4a2adccee
24 changed files with 338 additions and 72 deletions

View File

@@ -91,6 +91,22 @@ async def get_current_user(
# ---------------------------------------------------------------------------
async def require_password_changed(
current_user: User = Depends(get_current_user),
) -> User:
"""Block all requests when the user still needs to change their password.
Only ``/auth/change-password`` and ``/auth/me`` are exempt — those
endpoints do **not** depend on this function.
"""
if getattr(current_user, "must_change_password", False):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="PASSWORD_CHANGE_REQUIRED",
)
return current_user
def require_role(required_role: str):
"""Return a FastAPI dependency that enforces *required_role*.

View File

@@ -27,5 +27,6 @@ class User(Base):
hashed_password = Column(String, nullable=False)
role = Column(String, nullable=False, default="viewer")
is_active = Column(Boolean, default=True)
must_change_password = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
last_login = Column(DateTime, nullable=True)

View File

@@ -17,12 +17,13 @@ from sqlalchemy.orm import Session
from jose import jwt, JWTError
from app.auth import verify_password, create_access_token, blacklist_token
from app.auth import verify_password, hash_password, create_access_token, blacklist_token
from app.config import settings
from app.database import get_db
from app.dependencies.auth import get_current_user
from app.models.user import User
from app.schemas.auth import TokenResponse, UserOut
from app.schemas.user import PasswordChange
# Rate limiter instance (shares backend state via app.state.limiter)
limiter = Limiter(key_func=get_remote_address)
@@ -137,3 +138,33 @@ def logout(
def read_current_user(current_user: User = Depends(get_current_user)):
"""Return the profile of the currently authenticated user."""
return current_user
# ---------------------------------------------------------------------------
# POST /auth/change-password
# ---------------------------------------------------------------------------
@router.post("/change-password")
def change_password(
body: PasswordChange,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Change the current user's password.
Requires the current password for verification. On success the
``must_change_password`` flag is cleared so the user can proceed
normally.
"""
if not verify_password(body.current_password, current_user.hashed_password):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Current password is incorrect",
)
current_user.hashed_password = hash_password(body.new_password)
current_user.must_change_password = False
db.commit()
return {"detail": "Password changed successfully"}

View File

@@ -198,7 +198,7 @@ def list_campaigns(
def create_campaign(
payload: CampaignCreate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_tech", "admin")),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Create a new campaign."""
campaign = Campaign(
@@ -254,7 +254,7 @@ def update_campaign(
campaign_id: str,
payload: CampaignUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_tech", "admin")),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Update a campaign. Only allowed in draft or active state."""
campaign = db.query(Campaign).filter(Campaign.id == campaign_id).first()
@@ -299,7 +299,7 @@ def add_test_to_campaign(
campaign_id: str,
payload: AddTestPayload,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_tech", "admin")),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Add a test to a campaign with optional ordering and dependency."""
campaign = db.query(Campaign).filter(Campaign.id == campaign_id).first()
@@ -370,7 +370,7 @@ def remove_test_from_campaign(
campaign_id: str,
campaign_test_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_tech", "admin")),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Remove a test from a campaign."""
campaign = db.query(Campaign).filter(Campaign.id == campaign_id).first()
@@ -414,7 +414,7 @@ def remove_test_from_campaign(
def activate_campaign(
campaign_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_tech", "admin")),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Activate a campaign, moving it from draft to active."""
campaign = db.query(Campaign).filter(Campaign.id == campaign_id).first()
@@ -524,7 +524,7 @@ def get_campaign_progress_endpoint(
def generate_campaign_from_actor(
actor_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_tech", "admin")),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Auto-generate a campaign from a threat actor's uncovered techniques.
@@ -558,7 +558,7 @@ def schedule_campaign(
campaign_id: str,
payload: SchedulePayload,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("admin")),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Configure or update the recurrence schedule for a campaign.

View File

@@ -98,11 +98,10 @@ def _validate_upload_permission(
return
if team == TeamSide.red:
# Only red_tech can upload red evidence
if user.role != "red_tech":
if user.role not in ("red_tech", "red_lead"):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only red_tech or admin can upload red evidence",
detail="Only red_tech, red_lead or admin can upload red evidence",
)
if test.state not in _RED_EDITABLE_STATES:
raise HTTPException(
@@ -111,11 +110,10 @@ def _validate_upload_permission(
f"(allowed in: draft, red_executing)",
)
elif team == TeamSide.blue:
# Only blue_tech can upload blue evidence
if user.role != "blue_tech":
if user.role not in ("blue_tech", "blue_lead"):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only blue_tech or admin can upload blue evidence",
detail="Only blue_tech, blue_lead or admin can upload blue evidence",
)
if test.state not in _BLUE_EDITABLE_STATES:
raise HTTPException(
@@ -150,7 +148,7 @@ def _validate_delete_permission(
status_code=status.HTTP_403_FORBIDDEN,
detail="Cannot delete red evidence outside draft/red_executing",
)
if user.role != "red_tech" and evidence.uploaded_by != user.id:
if user.role not in ("red_tech", "red_lead") and evidence.uploaded_by != user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to delete this evidence",
@@ -161,7 +159,7 @@ def _validate_delete_permission(
status_code=status.HTTP_403_FORBIDDEN,
detail="Cannot delete blue evidence outside blue_evaluating",
)
if user.role != "blue_tech" and evidence.uploaded_by != user.id:
if user.role not in ("blue_tech", "blue_lead") and evidence.uploaded_by != user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to delete this evidence",

View File

@@ -25,7 +25,7 @@ def generate_purple_report(
campaign_id: UUID,
format: str = Query("pdf", pattern="^(pdf|docx|html)$"),
db: Session = Depends(get_db),
user: User = Depends(require_any_role("red_lead", "blue_lead")),
user: User = Depends(require_any_role("red_lead", "blue_lead", "viewer")),
):
"""Generate a Purple Team campaign assessment report."""
filepath = report_generation_service.generate_purple_campaign_report(
@@ -42,7 +42,7 @@ def generate_purple_report(
def generate_coverage_report(
format: str = Query("pdf", pattern="^(pdf|docx|html)$"),
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
user: User = Depends(require_any_role("red_lead", "blue_lead", "viewer")),
):
"""Generate an organization-wide MITRE ATT&CK coverage report."""
filepath = report_generation_service.generate_coverage_report(
@@ -59,7 +59,7 @@ def generate_coverage_report(
def generate_executive_report(
format: str = Query("pdf", pattern="^(pdf|docx|html)$"),
db: Session = Depends(get_db),
user: User = Depends(require_any_role("red_lead", "blue_lead")),
user: User = Depends(require_any_role("red_lead", "blue_lead", "viewer")),
):
"""Generate an executive security summary report."""
filepath = report_generation_service.generate_executive_summary(

View File

@@ -30,7 +30,7 @@ from sqlalchemy import func, or_
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user, require_role
from app.dependencies.auth import get_current_user, require_role, require_any_role
from app.models.test_template import TestTemplate
from app.models.user import User
from app.schemas.test_template import (
@@ -103,7 +103,7 @@ def list_templates(
@router.get("/stats")
def template_stats(
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Return catalog statistics: totals by source, platform, active/inactive."""
@@ -151,9 +151,9 @@ def template_stats(
def bulk_activate_templates(
activate: bool = Query(True, description="True to activate all, False to deactivate all"),
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Set all templates to active or inactive. Admin only."""
"""Set all templates to active or inactive."""
count = (
db.query(TestTemplate)
.filter(TestTemplate.is_active != activate)
@@ -235,9 +235,9 @@ def get_template(
def create_template(
payload: TestTemplateCreate,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Create a custom test template. Admin only."""
"""Create a custom test template."""
template = TestTemplate(**payload.model_dump())
db.add(template)
db.commit()
@@ -269,9 +269,9 @@ def update_template(
template_id: uuid.UUID,
payload: TestTemplateCreate,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Update fields of an existing test template. Admin only."""
"""Update fields of an existing test template."""
template = db.query(TestTemplate).filter(TestTemplate.id == template_id).first()
if template is None:
raise HTTPException(
@@ -307,9 +307,9 @@ def update_template(
def toggle_template_active(
template_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Toggle a template between active and inactive. Admin only."""
"""Toggle a template between active and inactive."""
template = db.query(TestTemplate).filter(TestTemplate.id == template_id).first()
if template is None:
raise HTTPException(
@@ -342,9 +342,9 @@ def toggle_template_active(
def delete_template(
template_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(require_role("admin")),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Soft-delete a test template by setting ``is_active=False``. Admin only."""
"""Soft-delete a test template by setting ``is_active=False``."""
template = db.query(TestTemplate).filter(TestTemplate.id == template_id).first()
if template is None:
raise HTTPException(

View File

@@ -143,7 +143,7 @@ def list_tests(
def create_test(
payload: TestCreate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_tech")),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Create a new test linked to an existing technique.
@@ -190,7 +190,7 @@ def create_test(
def create_test_from_template(
payload: TestTemplateInstantiate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_tech")),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Instantiate a real Test from an existing TestTemplate.
@@ -289,11 +289,11 @@ def update_test(
test_id: uuid.UUID,
payload: TestUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Update one or more fields of an existing test.
Only the original creator or an admin can update.
Only leads or admins can update general test fields.
The test must be in ``draft`` or ``rejected`` state.
"""
test = _get_test_or_404(db, test_id)
@@ -343,7 +343,7 @@ def update_test_red(
test_id: uuid.UUID,
payload: TestRedUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_tech")),
current_user: User = Depends(require_any_role("red_tech", "red_lead")),
):
"""Red Team updates their fields (allowed in ``draft`` and ``red_executing``)."""
test = _get_test_or_404(db, test_id)
@@ -387,7 +387,7 @@ def update_test_blue(
test_id: uuid.UUID,
payload: TestBlueUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("blue_tech")),
current_user: User = Depends(require_any_role("blue_tech", "blue_lead")),
):
"""Blue Team updates their fields (allowed only in ``blue_evaluating``)."""
test = _get_test_or_404(db, test_id)
@@ -430,7 +430,7 @@ def update_test_blue(
def start_execution(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_tech")),
current_user: User = Depends(require_any_role("red_tech", "red_lead")),
):
"""Move a test from ``draft`` to ``red_executing``."""
test = _get_test_or_404(db, test_id)
@@ -448,7 +448,7 @@ def start_execution(
def submit_red(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("red_tech")),
current_user: User = Depends(require_any_role("red_tech", "red_lead")),
):
"""Red Team finalises — move from ``red_executing`` to ``blue_evaluating``."""
test = _get_test_or_404(db, test_id)
@@ -466,7 +466,7 @@ def submit_red(
def submit_blue(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(require_any_role("blue_tech")),
current_user: User = Depends(require_any_role("blue_tech", "blue_lead")),
):
"""Blue Team finalises — move from ``blue_evaluating`` to ``in_review``."""
test = _get_test_or_404(db, test_id)
@@ -484,7 +484,7 @@ def submit_blue(
def pause_timer(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
current_user: User = Depends(require_any_role("red_tech", "blue_tech", "red_lead", "blue_lead")),
):
"""Pause the running timer for the current phase (red_executing or blue_evaluating)."""
test = _get_test_or_404(db, test_id)
@@ -502,7 +502,7 @@ def pause_timer(
def resume_timer(
test_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
current_user: User = Depends(require_any_role("red_tech", "blue_tech", "red_lead", "blue_lead")),
):
"""Resume the paused timer for the current phase."""
test = _get_test_or_404(db, test_id)
@@ -595,9 +595,9 @@ def update_remediation(
test_id: uuid.UUID,
payload: TestRemediationUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
current_user: User = Depends(require_any_role("red_lead", "blue_lead")),
):
"""Update remediation fields on a test (any authenticated user).
"""Update remediation fields on a test.
When ``remediation_status`` transitions to ``'completed'``, an automatic
re-test is created (subject to ``MAX_RETEST_COUNT``).

View File

@@ -9,7 +9,7 @@ from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies.auth import get_current_user
from app.dependencies.auth import get_current_user, require_any_role
from app.domain.exceptions import EntityNotFoundError
from app.models.user import User
from app.models.worklog import Worklog
@@ -56,7 +56,7 @@ class WorklogOut(BaseModel):
def create(
body: WorklogCreate,
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
user: User = Depends(require_any_role("red_tech", "blue_tech", "red_lead", "blue_lead")),
):
"""Create a manually-logged worklog entry."""
wl = worklog_service.create_worklog(

View File

@@ -28,6 +28,7 @@ class UserOut(BaseModel):
email: str | None = None
role: str
is_active: bool
must_change_password: bool = True
class Config:
from_attributes = True

View File

@@ -83,6 +83,18 @@ class UserUpdate(BaseModel):
# ── Read (full) ─────────────────────────────────────────────────────
class PasswordChange(BaseModel):
"""Payload for changing the current user's password."""
current_password: str
new_password: str
@field_validator("new_password")
@classmethod
def new_password_strength(cls, v: str) -> str:
return _validate_password_strength(v)
class UserOut(BaseModel):
"""Complete representation returned by the API."""
@@ -91,6 +103,7 @@ class UserOut(BaseModel):
email: str | None = None
role: str
is_active: bool
must_change_password: bool = True
created_at: datetime | None = None
last_login: datetime | None = None