Compare commits

...

6 Commits

Author SHA1 Message Date
kitos f45b7ea926 ci: add GitHub Actions lint and test pipeline [FASE-0.6]
Aegis CI / lint-and-test (push) Has been cancelled
Run ruff and pytest against Postgres and Redis service containers; document CI in README.
2026-05-18 13:19:29 +02:00
kitos 6b28934f05 test: stabilize Phase 0 API and workflow tests [FASE-0.4]
Assert INVALID_TRANSITION JSON code on duplicate start, remove sys.modules stubs from T-106 tests, and complete boto3 stubs in integration tests.
2026-05-18 13:19:27 +02:00
kitos 6f35d85a97 feat(db): add Phase 0 composite indexes migration [FASE-0.3]
Add idempotent Alembic revision b028 for campaign_tests (campaign_id, test_id) to support campaign-scoped queries.
2026-05-18 13:19:20 +02:00
kitos c5eb6f6dc1 feat(auth): move JWT blacklist to Redis with TTL [FASE-0.2]
Revoke tokens by jti in a dedicated Redis DB, honor TTL from JWT exp on logout, reject revoked tokens in get_current_user, and add FakeRedis-backed API tests.
2026-05-18 13:19:15 +02:00
kitos 9b70655b7e feat(infra): add Redis service and client for Phase 0 [FASE-0.1]
Add Redis 7 to Docker Compose with healthcheck and persistence, separate logical DBs for blacklist and cache, singleton redis client helpers, and unit tests with fakeredis.
2026-05-18 13:18:45 +02:00
kitos 821c4ac5ec test(jira): add JiraLink model and jira_service tests [FASE-1.1]
Model and migration b020 were already present; adds regression coverage for persistence, schema validation, and link CRUD with Jira disabled.
2026-05-18 12:02:21 +02:00
18 changed files with 413 additions and 155 deletions
+1 -1
View File
@@ -54,7 +54,7 @@ jobs:
pip install ruff
- name: Lint
run: ruff check app/
run: ruff check app/ tests/
- name: Test
env:
+2
View File
@@ -1,5 +1,7 @@
# Aegis — MITRE ATT&CK Coverage Platform
Continuous integration (lint + tests against PostgreSQL and Redis) is defined in [`.github/workflows/ci.yml`](.github/workflows/ci.yml).
Aegis is a comprehensive platform for tracking and managing security coverage against the MITRE ATT&CK framework. It enables security teams to document, validate, and visualize their defensive capabilities against known adversary techniques through a structured Red Team / Blue Team validation workflow.
## Features
@@ -0,0 +1,35 @@
"""phase0 SR-006 — campaign_tests composite index
Most SR-006 indexes already ship in b005, b009, b018, b019, and b026.
``tests`` has no ``campaign_id`` column (membership is ``campaign_tests``),
so this revision adds a composite index to speed “tests in campaign” joins.
Revision ID: b028phase0
Revises: b027scorecfg
Create Date: 2026-05-18 12:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
revision: str = "b028phase0"
down_revision: Union[str, None] = "b027scorecfg"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_index(
"ix_campaign_tests_campaign_id_test_id",
"campaign_tests",
["campaign_id", "test_id"],
unique=False,
)
def downgrade() -> None:
op.drop_index(
"ix_campaign_tests_campaign_id_test_id",
table_name="campaign_tests",
)
+4 -4
View File
@@ -80,11 +80,11 @@ def blacklist_token(jti: str, exp: float) -> None:
to ``exp - now`` so the key vanishes when the token would have expired
naturally.
"""
from app.infrastructure.redis_client import get_redis
from app.infrastructure.redis_client import get_redis_blacklist
ttl = max(int(exp - datetime.now(timezone.utc).timestamp()), 1)
try:
r = get_redis()
r = get_redis_blacklist()
r.setex(f"{_BLACKLIST_PREFIX}{jti}", ttl, "1")
except Exception:
logger.warning("Failed to blacklist token %s in Redis", jti, exc_info=True)
@@ -92,10 +92,10 @@ def blacklist_token(jti: str, exp: float) -> None:
def is_token_blacklisted(jti: str) -> bool:
"""Return ``True`` if *jti* has been revoked (exists in Redis)."""
from app.infrastructure.redis_client import get_redis
from app.infrastructure.redis_client import get_redis_blacklist
try:
r = get_redis()
r = get_redis_blacklist()
return r.exists(f"{_BLACKLIST_PREFIX}{jti}") > 0
except Exception:
logger.warning("Failed to check blacklist for %s in Redis", jti, exc_info=True)
+3
View File
@@ -24,6 +24,9 @@ class Settings(BaseSettings):
# ── Redis ─────────────────────────────────────────────────────────
REDIS_URL: str = "redis://redis:6379/0"
# Logical DB indices on the same Redis instance (PATH in URL is overridden).
REDIS_TOKEN_BLACKLIST_DB: int = 1
REDIS_CACHE_DB: int = 2
# ── CORS ─────────────────────────────────────────────────────────
# Comma-separated list of allowed origins, or a JSON array.
+8 -3
View File
@@ -15,7 +15,7 @@ from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from app.auth import is_token_blacklisted
from app import auth as auth_lib
from app.config import settings
from app.database import get_db
from app.models.user import User
@@ -57,6 +57,11 @@ async def get_current_user(
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
revoked_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has been revoked",
headers={"WWW-Authenticate": "Bearer"},
)
# Prefer cookie, fall back to header
token = aegis_token or bearer_token
@@ -74,8 +79,8 @@ async def get_current_user(
raise credentials_exception
# Check token blacklist (revoked tokens)
jti: str | None = payload.get("jti")
if jti and is_token_blacklisted(jti):
raise credentials_exception
if jti and auth_lib.is_token_blacklisted(jti):
raise revoked_exception
except JWTError:
raise credentials_exception
+48 -16
View File
@@ -1,17 +1,22 @@
"""Redis client singleton.
"""Redis client factories.
Provides a lazily-initialised Redis connection that is reused across
the application. The connection URL is read from ``settings.REDIS_URL``.
``settings.REDIS_URL`` selects the default logical database (usually ``0``).
Token blacklist and application cache use separate logical DBs on the same
Redis instance (``REDIS_TOKEN_BLACKLIST_DB``, ``REDIS_CACHE_DB``) so keys never
collide and TTL policies can differ per workload.
Usage::
from app.infrastructure.redis_client import get_redis
from app.infrastructure.redis_client import get_redis, get_redis_blacklist
r = get_redis()
r.set("key", "value", ex=300)
get_redis().set("key", "value", ex=300)
get_redis_blacklist().setex("blacklist:…", ttl, "1")
"""
from __future__ import annotations
import logging
from urllib.parse import urlparse, urlunparse
import redis
@@ -19,16 +24,43 @@ from app.config import settings
logger = logging.getLogger(__name__)
_redis_client: redis.Redis | None = None
_clients: dict[str, redis.Redis] = {}
def _redis_url_with_db(base_url: str, db_index: int) -> str:
"""Return *base_url* with its path replaced by ``/{db_index}``."""
parsed = urlparse(base_url)
path = f"/{db_index}"
return urlunparse(
(parsed.scheme, parsed.netloc, path, "", "", ""),
)
def _get_client(url: str) -> redis.Redis:
if url not in _clients:
_clients[url] = redis.from_url(url, decode_responses=True)
logger.info("Redis client connected to %s", url)
return _clients[url]
def get_redis() -> redis.Redis:
"""Return a shared Redis client, creating it on first call."""
global _redis_client
if _redis_client is None:
_redis_client = redis.from_url(
settings.REDIS_URL,
decode_responses=True,
)
logger.info("Redis client connected to %s", settings.REDIS_URL)
return _redis_client
"""Default Redis connection (URL from ``settings.REDIS_URL``)."""
return _get_client(settings.REDIS_URL)
def get_redis_blacklist() -> redis.Redis:
"""Redis DB used for JWT revocation (``jti`` keys with TTL)."""
url = _redis_url_with_db(
settings.REDIS_URL,
settings.REDIS_TOKEN_BLACKLIST_DB,
)
return _get_client(url)
def get_redis_cache() -> redis.Redis:
"""Redis DB reserved for shared cache (scores, queues, etc.)."""
url = _redis_url_with_db(
settings.REDIS_URL,
settings.REDIS_CACHE_DB,
)
return _get_client(url)
+18 -5
View File
@@ -96,13 +96,26 @@ def logout(
The token's ``jti`` is added to the Redis blacklist so it cannot
be reused even if the cookie has already been copied elsewhere.
The blacklist entry auto-expires when the token's ``exp`` is reached.
When both HttpOnly cookie and ``Authorization: Bearer`` are present
(typical for API clients), **both** are revoked so the session cannot
survive on whichever credential the next request prefers.
"""
# Attempt to blacklist the token's jti
token = aegis_token or request.headers.get("Authorization", "").removeprefix("Bearer ").strip()
if token:
bearer = (
request.headers.get("Authorization")
or request.headers.get("authorization")
or ""
)
bearer = bearer.removeprefix("Bearer ").removeprefix("bearer ").strip()
seen: set[str] = set()
for raw in (aegis_token, bearer):
if not raw or raw in seen:
continue
seen.add(raw)
try:
payload = jwt.decode(
token,
raw,
settings.SECRET_KEY,
algorithms=[settings.ALGORITHM],
)
@@ -111,7 +124,7 @@ def logout(
if jti:
blacklist_token(jti, float(exp))
except JWTError:
pass # token already invalid — nothing to revoke
pass # token already invalid — nothing to revoke for this raw value
response.delete_cookie(
key=_COOKIE_NAME,
+1
View File
@@ -26,3 +26,4 @@ docxtpl>=0.18.0
pytest
pytest-asyncio
httpx
fakeredis>=2.23.0
+31 -1
View File
@@ -80,12 +80,37 @@ def db():
@pytest.fixture(scope="function")
def client(db):
def client(db, monkeypatch):
"""Create a test client with database override.
Imports ``app.main`` lazily to avoid pulling in boto3 / APScheduler
when only the ``db`` fixture is needed.
MinIO and the background scheduler are no-ops here so tests do not
require Docker services for application startup.
JWT blacklist uses an in-memory FakeRedis so tests do not require a
real Redis instance.
"""
import fakeredis
_fake_redis = fakeredis.FakeRedis(decode_responses=True)
def _blacklist_conn():
return _fake_redis
monkeypatch.setattr(
"app.infrastructure.redis_client.get_redis_blacklist",
_blacklist_conn,
)
monkeypatch.setattr("app.main.ensure_bucket_exists", lambda: None)
monkeypatch.setattr("app.main.start_scheduler", lambda: None)
monkeypatch.setattr(
"app.main.scheduler.shutdown",
lambda wait=False: None,
)
from app.main import app
from app.database import get_db
import app.database as _db_mod
@@ -118,6 +143,7 @@ def admin_user(db):
hashed_password=hash_password("admin123"),
role="admin",
is_active=True,
must_change_password=False,
)
db.add(user)
db.commit()
@@ -134,6 +160,7 @@ def red_tech_user(db):
hashed_password=hash_password("redtech123"),
role="red_tech",
is_active=True,
must_change_password=False,
)
db.add(user)
db.commit()
@@ -150,6 +177,7 @@ def blue_tech_user(db):
hashed_password=hash_password("bluetech123"),
role="blue_tech",
is_active=True,
must_change_password=False,
)
db.add(user)
db.commit()
@@ -166,6 +194,7 @@ def red_lead_user(db):
hashed_password=hash_password("redlead123"),
role="red_lead",
is_active=True,
must_change_password=False,
)
db.add(user)
db.commit()
@@ -182,6 +211,7 @@ def blue_lead_user(db):
hashed_password=hash_password("bluelead123"),
role="blue_lead",
is_active=True,
must_change_password=False,
)
db.add(user)
db.commit()
+43
View File
@@ -79,3 +79,46 @@ def test_get_me_invalid_token(client):
headers={"Authorization": "Bearer invalidtoken"},
)
assert response.status_code == 401
def test_logout_revokes_token(client, admin_user):
"""After logout, the same JWT must be rejected (Redis blacklist).
Prefer Authorization over the HttpOnly cookie so logout blacklists the
same token the client sends on the next request; clear cookies to avoid
stale jar state across calls.
"""
login = client.post(
"/api/v1/auth/login",
data={"username": "admin", "password": "admin123"},
)
assert login.status_code == 200
token = login.json()["access_token"]
client.cookies.clear()
out = client.post(
"/api/v1/auth/logout",
headers={"Authorization": f"Bearer {token}"},
)
assert out.status_code == 200
from jose import jwt
from app.config import settings
from app.infrastructure.redis_client import get_redis_blacklist
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM],
)
jti = payload.get("jti")
assert jti
assert get_redis_blacklist().exists(f"blacklist:{jti}")
client.cookies.clear()
assert not len(client.cookies), "cookie jar must be empty to force Authorization Bearer"
me = client.get(
"/api/v1/auth/me",
headers={"Authorization": f"Bearer {token}"},
)
assert me.status_code == 401
assert me.json()["detail"] == "Token has been revoked"
+6 -1
View File
@@ -115,7 +115,12 @@ for _mod in [
"passlib", "passlib.context",
]:
if _mod not in sys.modules:
sys.modules[_mod] = ModuleType(_mod)
m = ModuleType(_mod)
if _mod == "boto3":
m.client = MagicMock()
elif _mod == "botocore.exceptions":
m.ClientError = Exception
sys.modules[_mod] = m
# Now safe to import
from app.models.enums import TestState, TestResult, TechniqueStatus
+93
View File
@@ -0,0 +1,93 @@
"""Tests for Jira link model and jira_service link helpers (FASE-1.1).
Verifies persistence with SQLite test DB and link creation when Jira is disabled.
API routes are covered indirectly via jira_service (same code path as the router).
"""
import uuid
import pytest
from pydantic import ValidationError
from app.models.jira_link import JiraLink, JiraLinkEntityType, JiraSyncDirection
from app.schemas.jira_schema import JiraLinkCreate
from app.services import jira_service
def test_jira_link_persist_and_query(db, admin_user):
entity_id = uuid.uuid4()
link = JiraLink(
entity_type=JiraLinkEntityType.test,
entity_id=entity_id,
jira_issue_key="PROJ-42",
jira_issue_id="10001",
sync_direction=JiraSyncDirection.bidirectional,
created_by=admin_user.id,
sync_metadata={"foo": "bar"},
)
db.add(link)
db.commit()
db.refresh(link)
loaded = db.query(JiraLink).filter(JiraLink.id == link.id).one()
assert loaded.entity_type == JiraLinkEntityType.test
assert loaded.entity_id == entity_id
assert loaded.jira_issue_key == "PROJ-42"
assert loaded.sync_metadata == {"foo": "bar"}
assert loaded.created_by == admin_user.id
def test_jira_link_create_schema_accepts_valid_issue_key():
body = JiraLinkCreate(
entity_type=JiraLinkEntityType.campaign,
entity_id=uuid.uuid4(),
jira_issue_key="ABC-1",
)
assert body.jira_issue_key == "ABC-1"
assert body.sync_direction == JiraSyncDirection.bidirectional
def test_jira_link_create_schema_rejects_invalid_issue_key():
with pytest.raises(ValidationError):
JiraLinkCreate(
entity_type=JiraLinkEntityType.test,
entity_id=uuid.uuid4(),
jira_issue_key="proj-123",
)
def test_create_link_via_service_persists(db, admin_user):
eid = uuid.uuid4()
link = jira_service.create_link(
db,
entity_type=JiraLinkEntityType.test,
entity_id=eid,
jira_issue_key="TST-99",
sync_direction=JiraSyncDirection.bidirectional,
created_by=admin_user.id,
)
db.commit()
db.refresh(link)
assert link.jira_issue_key == "TST-99"
assert link.entity_id == eid
def test_list_links_filters_by_entity(db, admin_user):
eid = uuid.uuid4()
jira_service.create_link(
db,
entity_type=JiraLinkEntityType.technique,
entity_id=eid,
jira_issue_key="TECH-7",
sync_direction=JiraSyncDirection.jira_to_aegis,
created_by=admin_user.id,
)
db.commit()
rows = jira_service.list_links(
db,
entity_type=JiraLinkEntityType.technique,
entity_id=eid,
)
assert len(rows) == 1
assert rows[0].jira_issue_key == "TECH-7"
assert rows[0].sync_direction == JiraSyncDirection.jira_to_aegis
+61
View File
@@ -0,0 +1,61 @@
"""Redis URL helpers and blacklist DB wiring (FASE 0.1)."""
import time
import uuid
from unittest.mock import MagicMock
import pytest
from app.config import settings
from app.infrastructure import redis_client as rc
@pytest.fixture
def fakeredis():
import fakeredis as fr
return fr.FakeRedis(decode_responses=True)
def test_redis_url_with_db_replaces_index():
assert (
rc._redis_url_with_db("redis://localhost:6379/0", 2)
== "redis://localhost:6379/2"
)
def test_get_redis_blacklist_uses_configured_db(monkeypatch):
monkeypatch.setattr(rc, "_clients", {})
captured: list[str] = []
def fake_from_url(url, **kwargs):
captured.append(url)
m = MagicMock()
m.setex = MagicMock()
return m
monkeypatch.setattr("redis.from_url", fake_from_url)
monkeypatch.setattr(settings, "REDIS_URL", "redis://redis:6379/0")
monkeypatch.setattr(settings, "REDIS_TOKEN_BLACKLIST_DB", 9)
c = rc.get_redis_blacklist()
c.setex("k", 10, "v")
assert captured == ["redis://redis:6379/9"]
c.setex.assert_called_once_with("k", 10, "v")
def test_redis_set_get_ttl(fakeredis, monkeypatch):
"""Integration-style check against FakeRedis (no real server)."""
monkeypatch.setattr(rc, "_clients", {})
monkeypatch.setattr(
"app.infrastructure.redis_client.get_redis_blacklist",
lambda: fakeredis,
)
from app.auth import blacklist_token, is_token_blacklisted
jti = str(uuid.uuid4())
exp = time.time() + 3600
blacklist_token(jti, exp)
assert is_token_blacklisted(jti) is True
ttl = fakeredis.ttl(f"blacklist:{jti}")
assert 0 < ttl <= 3600
+12 -123
View File
@@ -1,139 +1,28 @@
"""Validation tests for T-106: Test Workflow Service.
Uses mock objects to avoid needing a running database.
The database module is stubbed before any app imports.
Mocked DB sessions exercise the workflow service without PostgreSQL.
Uses the same import stack as the rest of the suite (``tests.conftest``) so
this module must **not** replace entries in ``sys.modules`` — doing so broke
JWT verification in API tests by splitting :mod:`app.auth` (loaded early via
conftest) from :mod:`app.routers.auth` / :mod:`app.dependencies.auth`.
"""
import sys
import os
import uuid
from unittest.mock import MagicMock, patch
from types import ModuleType
from datetime import datetime
# ---------------------------------------------------------------------------
# 0. Stub heavy dependencies BEFORE importing any app modules
# ---------------------------------------------------------------------------
# Ensure backend/ is on sys.path
backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if backend_dir not in sys.path:
sys.path.insert(0, backend_dir)
# Stub pydantic_settings so config doesn't fail
if "pydantic_settings" not in sys.modules:
pydantic_settings_mock = ModuleType("pydantic_settings")
class _BaseSettings:
def __init__(self, **kwargs):
pass
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
pydantic_settings_mock.BaseSettings = _BaseSettings
sys.modules["pydantic_settings"] = pydantic_settings_mock
# Stub app.config
config_mod = ModuleType("app.config")
class _FakeSettings:
DATABASE_URL = "sqlite:///:memory:"
SECRET_KEY = "test"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60
REDIS_URL = "redis://localhost:6379/0"
MINIO_ENDPOINT = "localhost:9000"
MINIO_ACCESS_KEY = "test"
MINIO_SECRET_KEY = "test"
MINIO_BUCKET = "test"
MINIO_SECURE = False
MAX_RETEST_COUNT = 3
CORS_ORIGINS = "http://localhost:3000"
SCORING_WEIGHT_TESTS = 40
SCORING_WEIGHT_DETECTION_RULES = 20
SCORING_WEIGHT_D3FEND = 15
SCORING_WEIGHT_FRESHNESS = 15
SCORING_WEIGHT_PLATFORM_DIVERSITY = 10
REPORT_TEMPLATES_DIR = "app/templates/reports"
REPORT_OUTPUT_DIR = "/tmp/aegis_reports"
COMPANY_NAME = "Test Org"
COMPANY_LOGO_PATH = "app/templates/reports/assets/logo.png"
JIRA_ENABLED = False
JIRA_URL = ""
JIRA_USERNAME = ""
JIRA_API_TOKEN = ""
JIRA_IS_CLOUD = True
JIRA_DEFAULT_PROJECT = ""
JIRA_ISSUE_TYPE_TEST = "Task"
JIRA_ISSUE_TYPE_CAMPAIGN = "Epic"
TEMPO_ENABLED = False
TEMPO_API_TOKEN = ""
TEMPO_DEFAULT_WORK_TYPE = "Red Team"
NVD_API_KEY = ""
STALE_THRESHOLD_DAYS = 365
config_mod.settings = _FakeSettings()
sys.modules["app.config"] = config_mod
# Stub app.database so no real engine is created
db_mod = ModuleType("app.database")
db_mod.Base = type("Base", (), {"metadata": MagicMock()})
db_mod.get_db = MagicMock()
sys.modules["app.database"] = db_mod
# Stub taxii2client
taxii_v20 = ModuleType("taxii2client.v20")
taxii_v20.Server = MagicMock
sys.modules["taxii2client"] = ModuleType("taxii2client")
sys.modules["taxii2client.v20"] = taxii_v20
# Stub jose
jose_mod = ModuleType("jose")
jose_mod.JWTError = Exception
jose_mod.jwt = MagicMock()
sys.modules["jose"] = jose_mod
# Stub boto3
boto3_mod = ModuleType("boto3")
boto3_mod.client = MagicMock()
sys.modules["boto3"] = boto3_mod
sys.modules["botocore"] = ModuleType("botocore")
sys.modules["botocore.exceptions"] = ModuleType("botocore.exceptions")
sys.modules["botocore.exceptions"].ClientError = Exception
# Stub apscheduler
sys.modules["apscheduler"] = ModuleType("apscheduler")
sys.modules["apscheduler.schedulers"] = ModuleType("apscheduler.schedulers")
sys.modules["apscheduler.schedulers.background"] = ModuleType("apscheduler.schedulers.background")
sys.modules["apscheduler.schedulers.background"].BackgroundScheduler = MagicMock
sys.modules["apscheduler.triggers"] = ModuleType("apscheduler.triggers")
sys.modules["apscheduler.triggers.cron"] = ModuleType("apscheduler.triggers.cron")
sys.modules["apscheduler.triggers.cron"].CronTrigger = MagicMock
# ---------------------------------------------------------------------------
# Now we can safely import
# ---------------------------------------------------------------------------
from app.domain.exceptions import InvalidTransitionError
from app.models.enums import TestState
from app.services.test_workflow_service import (
VALID_TRANSITIONS,
can_transition,
transition_state,
start_execution,
submit_red_evidence,
submit_blue_evidence,
validate_as_red_lead,
validate_as_blue_lead,
check_dual_validation,
reopen_test,
start_execution,
submit_blue_evidence,
submit_red_evidence,
transition_state,
validate_as_blue_lead,
validate_as_red_lead,
)
# We need the domain exceptions for assertions
from app.domain.exceptions import InvalidTransitionError
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
+42
View File
@@ -75,3 +75,45 @@ def test_get_test_by_id(client, auth_headers, technique):
response = client.get(f"/api/v1/tests/{test_id}", headers=auth_headers)
assert response.status_code == 200
assert response.json()["id"] == test_id
def test_start_execution_twice_returns_invalid_transition(
client, auth_headers, technique, red_tech_user
):
"""Invalid workflow transition surfaces domain error JSON (FASE 0.4).
HttpOnly login cookies take precedence over the Authorization header.
Clear cookies before each phase so Bearer tokens match the intended user.
"""
client.cookies.clear()
create_response = client.post(
"/api/v1/tests",
json={"technique_id": technique["id"], "name": "Workflow dup start"},
headers=auth_headers,
)
assert create_response.status_code == 201
test_id = create_response.json()["id"]
rl = client.post(
"/api/v1/auth/login",
data={"username": "redtech", "password": "redtech123"},
)
assert rl.status_code == 200
red_headers = {"Authorization": f"Bearer {rl.json()['access_token']}"}
client.cookies.clear()
first = client.post(
f"/api/v1/tests/{test_id}/start-execution",
headers=red_headers,
)
assert first.status_code == 200
client.cookies.clear()
second = client.post(
f"/api/v1/tests/{test_id}/start-execution",
headers=red_headers,
)
assert second.status_code == 400
body = second.json()
assert body.get("code") == "INVALID_TRANSITION"
assert "detail" in body
+2
View File
@@ -80,6 +80,8 @@ services:
MINIO_BUCKET: ${MINIO_BUCKET:-evidence}
MINIO_SECURE: ${MINIO_SECURE:-false}
REDIS_URL: redis://redis:6379/0
REDIS_TOKEN_BLACKLIST_DB: ${REDIS_TOKEN_BLACKLIST_DB:-1}
REDIS_CACHE_DB: ${REDIS_CACHE_DB:-2}
CORS_ORIGINS: ${CORS_ORIGINS:-}
AEGIS_ENV: ${AEGIS_ENV:-production}
ADMIN_USERNAME: ${ADMIN_USERNAME:-admin}
+3 -1
View File
@@ -69,7 +69,7 @@ services:
interval: 5s
timeout: 3s
retries: 5
restart: unless-stopped
restart: always
# ── FastAPI Backend ────────────────────────────────────────────────────────
backend:
@@ -89,6 +89,8 @@ services:
ACCESS_TOKEN_EXPIRE_MINUTES: 60
# Redis
REDIS_URL: redis://redis:6379/0
REDIS_TOKEN_BLACKLIST_DB: "1"
REDIS_CACHE_DB: "2"
# MinIO
MINIO_ENDPOINT: minio:9000
MINIO_ACCESS_KEY: minioadmin