851100d8ec
Two bugs fixed:
1. Blue team evaluation was also sent to Tempo. Only operator (red team)
execution time should be logged — blue team time is tracked internally
in Aegis but does NOT represent billable operator work. Added a
whitelist (_TEMPO_ACTIVITY_TYPES = {"red_team_execution"}).
2. _calculate_duration() re-computed duration from red_started_at to
datetime.utcnow() at call time, without subtracting paused seconds.
This caused inflated times (e.g. 45 min instead of 5 min) when there
was any delay between the workflow transition and the Tempo call.
Now the duration_seconds already computed by _create_phase_worklog
(gross elapsed - paused) is passed directly to auto_log_test_worklog
and used as-is, so Aegis and Tempo always agree on the duration.
Also: use red_started_at as the worklog date (not submission timestamp)
so the Tempo entry reflects when the work actually happened.
626 lines
20 KiB
Python
626 lines
20 KiB
Python
"""Test workflow service — state-machine transitions for the Red/Blue validation flow.
|
|
|
|
Controls which state transitions are valid and exposes high-level helpers
|
|
for each step in the test lifecycle:
|
|
|
|
draft → red_executing → blue_evaluating → in_review → validated / rejected
|
|
↓
|
|
rejected → draft
|
|
|
|
Every public function validates the transition, mutates the test, and writes
|
|
an audit-log entry. The caller (router) is responsible for committing the
|
|
session via the Unit of Work pattern.
|
|
"""
|
|
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.config import settings
|
|
from app.domain.exceptions import InvalidOperationError, InvalidTransitionError
|
|
from app.domain.test_entity import TestEntity
|
|
from app.models.enums import TestState
|
|
from app.models.test import Test
|
|
from app.models.user import User
|
|
from app.services.audit_service import log_action
|
|
from app.services.notification_service import notify_test_state_change, create_notification
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Valid transition map
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Allowed target states keyed by current state. An empty list means the
# state has no outgoing transitions. Consulted by ``can_transition()``.
VALID_TRANSITIONS: dict[TestState, list[TestState]] = {
    TestState.draft: [TestState.red_executing],
    TestState.red_executing: [TestState.blue_evaluating],
    TestState.blue_evaluating: [TestState.in_review],
    TestState.in_review: [TestState.validated, TestState.rejected],
    TestState.rejected: [TestState.draft],
    TestState.validated: [],  # terminal state
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Core helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def can_transition(test: Test, target_state: TestState) -> bool:
    """Tell whether *test* may move to *target_state*.

    The test's current state is coerced to :class:`TestState` when it is
    stored as a raw value, then looked up in ``VALID_TRANSITIONS``.  A state
    missing from the table allows no transitions.
    """
    state = test.state
    if not isinstance(state, TestState):
        state = TestState(state)
    allowed_targets = VALID_TRANSITIONS.get(state, [])
    return target_state in allowed_targets
|
|
|
|
|
|
def transition_state(
    db: Session,
    test: Test,
    target_state: TestState,
    user: User,
    *,
    action_name: str = "transition_state",
    extra_details: dict | None = None,
) -> Test:
    """Move *test* to *target_state*, flush, write an audit entry and notify.

    The transition is performed by :class:`TestEntity`; whatever its
    ``transition_to`` returns is recorded as ``previous_state`` in the audit
    entry.  Per the module's existing documentation the entity raises
    ``InvalidStateTransition`` (aliased as ``InvalidTransitionError``) for an
    illegal move and is the authority on legality, while the module-level
    ``VALID_TRANSITIONS`` table only backs ``can_transition()`` for backward
    compatibility.

    Args:
        db: Session used for the flush, the audit log and the notification.
        test: ORM test whose ``state`` is updated in place.
        target_state: State to move to.
        user: Actor recorded on the audit entry.
        action_name: Audit action label.
        extra_details: Optional extra keys merged into the audit details
            (they override the default keys on collision).

    Returns:
        The same *test* instance, after mutation.
    """
    domain_test = TestEntity.from_orm(test)
    old_state = domain_test.transition_to(target_state)

    # Copy the entity's resulting state onto the ORM row and push it to the DB.
    test.state = domain_test.state
    db.flush()

    new_state = target_state.value
    audit_details: dict = dict(
        previous_state=old_state,
        new_state=new_state,
        test_name=test.name,
        technique_id=str(test.technique_id),
    )
    if extra_details:
        audit_details.update(extra_details)

    log_action(
        db,
        user_id=user.id,
        action=action_name,
        entity_type="test",
        entity_id=test.id,
        details=audit_details,
    )

    # Notifications are best-effort: a failure is logged, never raised.
    try:
        notify_test_state_change(db, test, new_state)
    except Exception as exc:
        logger.warning("Notification failed for test %s: %s", test.id, exc, exc_info=True)

    return test
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Lifecycle convenience functions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def start_execution(db: Session, test: Test, user: User) -> Test:
    """Begin the Red Team phase (``draft`` → ``red_executing``).

    The state change is delegated to :meth:`TestEntity.start_execution` and
    copied back onto the ORM row.  The function then flushes, writes a
    ``start_execution`` audit entry, and makes a best-effort attempt to send
    a notification and a Jira event (failures are only logged).

    Returns:
        The same *test* instance, after mutation.
    """
    domain_test = TestEntity.from_orm(test)
    domain_test.start_execution()
    domain_test.apply_to(test)
    db.flush()

    new_state = test.state.value
    audit_details = dict(
        previous_state="draft",
        new_state=new_state,
        test_name=test.name,
        technique_id=str(test.technique_id),
    )
    log_action(
        db,
        user_id=user.id,
        action="start_execution",
        entity_type="test",
        entity_id=test.id,
        details=audit_details,
    )

    try:
        notify_test_state_change(db, test, new_state)
    except Exception as exc:
        logger.warning("Notification failed for test %s: %s", test.id, exc, exc_info=True)

    # Imported inside the try so that an import failure is swallowed as well.
    try:
        from app.services.jira_service import push_test_event

        push_test_event(db, test, user, "red_executing")
    except Exception as exc:
        logger.warning("Jira push failed for test %s: %s", test.id, exc, exc_info=True)

    return test
|
|
|
|
|
|
def submit_red_evidence(db: Session, test: Test, user: User) -> Test:
    """Finish the Red Team phase (``red_executing`` → ``blue_evaluating``).

    Steps, in order:

    1. If the timer is paused, the still-open pause is measured up to now
       and ``paused_at`` is cleared (implicit resume).
    2. The state transition is performed and audit-logged via
       :func:`transition_state`.
    3. An automatic ``red_team_execution`` worklog is created spanning
       ``red_started_at`` to now, with the accumulated paused seconds plus
       the still-open pause passed along for subtraction.
    4. The Blue Team timer is started (``blue_started_at`` set to now,
       ``blue_paused_seconds`` reset to 0).
    5. A Jira event is pushed on a best-effort basis.

    Returns:
        The same *test* instance, after mutation.
    """
    now = datetime.utcnow()

    # Pause still open at submission time: count it, then close it.
    open_pause_seconds = 0
    pause_began = test.paused_at
    if pause_began is not None:
        open_pause_seconds = max(int((now - pause_began).total_seconds()), 0)
        test.paused_at = None

    test = transition_state(
        db,
        test,
        TestState.blue_evaluating,
        user,
        action_name="submit_red_evidence",
    )

    total_paused = (test.red_paused_seconds or 0) + open_pause_seconds
    _create_phase_worklog(
        db,
        test=test,
        user=user,
        phase_started_at=test.red_started_at,
        phase_ended_at=now,
        paused_seconds=total_paused,
        activity_type="red_team_execution",
        description=f"Red Team execution: {test.name}",
    )

    # Hand over to the Blue Team: their clock starts at the same instant.
    test.blue_started_at = now
    test.blue_paused_seconds = 0

    try:
        from app.services.jira_service import push_test_event

        push_test_event(db, test, user, "blue_evaluating")
    except Exception as exc:
        logger.warning("Jira push failed for test %s: %s", test.id, exc, exc_info=True)

    return test
|
|
|
|
|
|
def submit_blue_evidence(db: Session, test: Test, user: User) -> Test:
    """Finish the Blue Team phase (``blue_evaluating`` → ``in_review``).

    Steps, in order:

    1. If the timer is paused, the still-open pause is measured up to now
       and ``paused_at`` is cleared (implicit resume).
    2. The state transition is performed and audit-logged via
       :func:`transition_state`.
    3. An automatic ``blue_team_evaluation`` worklog is created spanning
       ``blue_started_at`` to now, with the accumulated paused seconds plus
       the still-open pause passed along for subtraction.
    4. A Jira event is pushed on a best-effort basis.

    Returns:
        The same *test* instance, after mutation.
    """
    now = datetime.utcnow()

    # Pause still open at submission time: count it, then close it.
    open_pause_seconds = 0
    pause_began = test.paused_at
    if pause_began is not None:
        open_pause_seconds = max(int((now - pause_began).total_seconds()), 0)
        test.paused_at = None

    test = transition_state(
        db,
        test,
        TestState.in_review,
        user,
        action_name="submit_blue_evidence",
    )

    total_paused = (test.blue_paused_seconds or 0) + open_pause_seconds
    _create_phase_worklog(
        db,
        test=test,
        user=user,
        phase_started_at=test.blue_started_at,
        phase_ended_at=now,
        paused_seconds=total_paused,
        activity_type="blue_team_evaluation",
        description=f"Blue Team evaluation: {test.name}",
    )

    try:
        from app.services.jira_service import push_test_event

        push_test_event(db, test, user, "in_review")
    except Exception as exc:
        logger.warning("Jira push failed for test %s: %s", test.id, exc, exc_info=True)

    return test
|
|
|
|
|
|
def pause_timer(db: Session, test: Test, user: User) -> Test:
    """Pause the timer of the phase currently in progress.

    Records the current UTC time in ``test.paused_at`` and writes a
    ``pause_timer`` audit entry.  No flush is issued here.

    Raises:
        InvalidOperationError: if the test is in neither ``red_executing``
            nor ``blue_evaluating``, or if the timer is already paused.
    """
    timed_states = (TestState.red_executing, TestState.blue_evaluating)
    if test.state not in timed_states:
        raise InvalidOperationError(
            f"Cannot pause timer in '{test.state.value}' state"
        )

    already_paused = test.paused_at is not None
    if already_paused:
        raise InvalidOperationError("Timer is already paused")

    test.paused_at = datetime.utcnow()

    audit_details = {"state": test.state.value}
    log_action(
        db,
        user_id=user.id,
        action="pause_timer",
        entity_type="test",
        entity_id=test.id,
        details=audit_details,
    )
    return test
|
|
|
|
|
|
def resume_timer(db: Session, test: Test, user: User) -> Test:
    """Resume a paused phase timer.

    The time elapsed since ``paused_at`` (whole seconds, never negative) is
    added to ``red_paused_seconds`` or ``blue_paused_seconds`` depending on
    the current state; in any other state no counter is touched.
    ``paused_at`` is then cleared and a ``resume_timer`` audit entry is
    written.  No flush is issued here.

    Raises:
        InvalidOperationError: if the timer is not currently paused.
    """
    pause_began = test.paused_at
    if pause_began is None:
        raise InvalidOperationError("Timer is not paused")

    elapsed = datetime.utcnow() - pause_began
    pause_length = max(int(elapsed.total_seconds()), 0)

    current_state = test.state
    if current_state == TestState.red_executing:
        test.red_paused_seconds = (test.red_paused_seconds or 0) + pause_length
    elif current_state == TestState.blue_evaluating:
        test.blue_paused_seconds = (test.blue_paused_seconds or 0) + pause_length

    test.paused_at = None

    audit_details = {"paused_seconds": pause_length, "state": test.state.value}
    log_action(
        db,
        user_id=user.id,
        action="resume_timer",
        entity_type="test",
        entity_id=test.id,
        details=audit_details,
    )
    return test
|
|
|
|
|
|
def _create_phase_worklog(
    db: Session,
    *,
    test: Test,
    user: User,
    phase_started_at: datetime | None,
    phase_ended_at: datetime,
    paused_seconds: int = 0,
    activity_type: str,
    description: str,
) -> None:
    """Record an automatic worklog for a finished phase and forward it to Tempo.

    The logged duration is the whole-second gap between *phase_started_at*
    and *phase_ended_at* minus *paused_seconds*, clamped to at least one
    second.  That same duration is handed to
    ``tempo_service.auto_log_test_worklog`` so both systems receive one
    figure.  The Tempo call is made for every activity type; any filtering
    of which types are actually synced is presumably done inside the Tempo
    service — confirm there.

    This function never raises for worklog or Tempo failures: they are
    logged and swallowed.  A missing *phase_started_at* is logged as a
    warning and nothing is created.
    """
    if not phase_started_at:
        logger.warning(
            "No phase start timestamp for test %s (%s), skipping worklog",
            test.id, activity_type,
        )
        return

    elapsed = phase_ended_at - phase_started_at
    gross_seconds = int(elapsed.total_seconds())
    # Never record a zero or negative duration.
    duration_seconds = max(gross_seconds - paused_seconds, 1)

    try:
        from app.services.worklog_service import create_worklog

        worklog_fields = dict(
            entity_type="test",
            entity_id=test.id,
            user_id=user.id,
            activity_type=activity_type,
            started_at=phase_started_at,
            ended_at=phase_ended_at,
            duration_seconds=duration_seconds,
            description=description,
        )
        worklog = create_worklog(db, **worklog_fields)
        logger.info(
            "Auto-worklog created for test %s: %s, %ds (worklog %s)",
            test.id, activity_type, duration_seconds, worklog.id,
        )

        # Tempo receives the already-computed duration rather than
        # recalculating it, so it matches the worklog created above.
        try:
            from app.services.tempo_service import auto_log_test_worklog

            auto_log_test_worklog(db, test, user, activity_type, duration_seconds)
        except Exception as tempo_exc:
            logger.warning("Tempo sync failed for worklog: %s", tempo_exc, exc_info=True)

    except Exception as exc:
        logger.error("Failed to create auto-worklog for test %s: %s", test.id, exc, exc_info=True)
|
|
|
|
|
|
def validate_as_red_lead(
    db: Session,
    test: Test,
    user: User,
    validation_status: str,
    notes: str | None = None,
) -> Test:
    """Store the Red Lead's validation verdict on *test*.

    Rule checking and mutation are handled by
    :meth:`TestEntity.validate_red`; the result is copied back onto the ORM
    row and flushed.  A ``validate_as_red_lead`` audit entry is written, and
    any domain events the entity emitted are handed to
    :func:`_dispatch_dual_validation_effects`.  Per the entity's documented
    contract, once both leads have voted it also moves the test to
    ``validated`` or ``rejected``.

    Returns:
        The same *test* instance, after mutation.
    """
    domain_test = TestEntity.from_orm(test)
    domain_test.validate_red(validation_status, by=user.id, notes=notes)
    domain_test.apply_to(test)
    db.flush()

    audit_details = dict(
        validation_status=validation_status,
        notes=notes,
        technique_id=str(test.technique_id),
    )
    log_action(
        db,
        user_id=user.id,
        action="validate_as_red_lead",
        entity_type="test",
        entity_id=test.id,
        details=audit_details,
    )

    _dispatch_dual_validation_effects(db, test, domain_test, actor=user)
    return test
|
|
|
|
|
|
def validate_as_blue_lead(
    db: Session,
    test: Test,
    user: User,
    validation_status: str,
    notes: str | None = None,
) -> Test:
    """Store the Blue Lead's validation verdict on *test*.

    Rule checking and mutation are handled by
    :meth:`TestEntity.validate_blue`; the result is copied back onto the ORM
    row and flushed.  A ``validate_as_blue_lead`` audit entry is written, and
    any domain events the entity emitted are handed to
    :func:`_dispatch_dual_validation_effects`.  Per the entity's documented
    contract, once both leads have voted it also moves the test to
    ``validated`` or ``rejected``.

    Returns:
        The same *test* instance, after mutation.
    """
    domain_test = TestEntity.from_orm(test)
    domain_test.validate_blue(validation_status, by=user.id, notes=notes)
    domain_test.apply_to(test)
    db.flush()

    audit_details = dict(
        validation_status=validation_status,
        notes=notes,
        technique_id=str(test.technique_id),
    )
    log_action(
        db,
        user_id=user.id,
        action="validate_as_blue_lead",
        entity_type="test",
        entity_id=test.id,
        details=audit_details,
    )

    _dispatch_dual_validation_effects(db, test, domain_test, actor=user)
    return test
|
|
|
|
|
|
def check_dual_validation(db: Session, test: Test) -> Test:
    """Re-evaluate the two leads' verdicts and apply the outcome to *test*.

    The decision and any state change come from
    :meth:`TestEntity.check_dual_validation`; this function only copies the
    entity back onto the ORM row (it never assigns ``test.state`` itself)
    and dispatches the resulting side effects.  No actor is passed on, so
    the dispatcher skips its Jira push.  No flush is issued here.

    Returns:
        The same *test* instance, after mutation.
    """
    domain_test = TestEntity.from_orm(test)
    domain_test.check_dual_validation()
    domain_test.apply_to(test)

    _dispatch_dual_validation_effects(db, test, domain_test)
    return test
|
|
|
|
|
|
def _dispatch_dual_validation_effects(
    db: Session, test: Test, entity: TestEntity, actor: User | None = None
) -> None:
    """Run the side effects for each dual-validation event on *entity*.

    * ``dual_validation_approved``: invalidate the score cache, send a
      ``validated`` notification and, when *actor* is given, push a
      ``validated`` Jira event.
    * ``dual_validation_rejected``: send a ``rejected`` notification and,
      when *actor* is given, push a ``rejected`` Jira event.

    Every side effect is best-effort: failures are logged as warnings and
    never propagate.  Events with any other name are ignored.
    """

    def _announce(outcome: str, notify_failure_msg: str) -> None:
        # Notification first, then the optional Jira push; each is isolated
        # so that one failing does not prevent the other.
        try:
            notify_test_state_change(db, test, outcome)
        except Exception as exc:
            logger.warning(
                notify_failure_msg,
                test.id, exc, exc_info=True,
            )
        if actor:
            try:
                from app.services.jira_service import push_test_event

                push_test_event(db, test, actor, outcome)
            except Exception as exc:
                logger.warning("Jira push failed for test %s: %s", test.id, exc, exc_info=True)

    for event in entity.events:
        if event.name == "dual_validation_approved":
            try:
                from app.services.score_cache import invalidate

                invalidate()
            except Exception as exc:
                logger.warning("Score cache invalidation failed: %s", exc, exc_info=True)
            _announce("validated", "Notification failed for test %s (validated): %s")

        elif event.name == "dual_validation_rejected":
            _announce("rejected", "Notification failed for test %s (rejected): %s")
|
|
|
|
|
|
def handle_remediation_completed(db: Session, test: Test, user: User) -> Test | None:
    """Create a re-test once remediation of *test* has been completed.

    A new draft test is created with the same base data (technique, name,
    description, platform, procedure, tool, creator) so the fix can be
    verified.  The retest always points at the *original* test through
    ``retest_of``, even when *test* is itself a retest, and its
    ``retest_count`` is one higher than the source test's.

    The chain is bounded by ``settings.MAX_RETEST_COUNT``: when the source
    test has already reached that count, no retest is created; instead the
    creator (if any) is notified and a ``max_retests_reached`` audit entry is
    written.

    Args:
        db: Session used to persist the retest, audit entries and
            notifications.  The session is flushed but not committed.
        test: The test whose remediation was completed.
        user: Actor recorded on the audit entries.

    Returns:
        The newly created retest, or ``None`` if the retest limit was reached.
    """
    # Always reference the original test, not an intermediate retest.
    original_test_id = test.retest_of or test.id
    # Treat a missing counter as "never retested" instead of failing on the
    # comparison below.
    current_count = test.retest_count or 0

    if current_count >= settings.MAX_RETEST_COUNT:
        # Max retests reached — notify and bail out.
        if test.created_by:
            create_notification(
                db,
                user_id=test.created_by,
                type="max_retests_reached",
                title="Maximum retests reached",
                message=(
                    f'Test "{test.name}" has reached the maximum of '
                    f'{settings.MAX_RETEST_COUNT} retests. Manual review required.'
                ),
                entity_type="test",
                entity_id=test.id,
            )

        log_action(
            db,
            user_id=user.id,
            action="max_retests_reached",
            entity_type="test",
            entity_id=test.id,
            details={
                "retest_count": current_count,
                "max_allowed": settings.MAX_RETEST_COUNT,
                "original_test_id": str(original_test_id),
            },
        )
        return None

    next_count = current_count + 1
    # Drop only a *leading* marker left by the previous retest; the same text
    # appearing elsewhere in the name is part of the name and must be kept.
    base_name = test.name.removeprefix(f"[Retest #{current_count}] ")

    retest = Test(
        technique_id=test.technique_id,
        name=f"[Retest #{next_count}] {base_name}",
        description=test.description,
        platform=test.platform,
        procedure_text=test.procedure_text,
        tool_used=test.tool_used,
        state=TestState.draft,
        created_by=test.created_by,
        retest_of=original_test_id,
        retest_count=next_count,
    )
    db.add(retest)
    # Flush so the retest has an id for the audit entry and notification.
    db.flush()

    log_action(
        db,
        user_id=user.id,
        action="create_retest",
        entity_type="test",
        entity_id=retest.id,
        details={
            "original_test_id": str(original_test_id),
            "retest_number": retest.retest_count,
            "source_test_id": str(test.id),
        },
    )

    # Notify the test creator.
    if test.created_by:
        create_notification(
            db,
            user_id=test.created_by,
            type="retest_created",
            title="Re-test created",
            message=(
                f'A re-test has been automatically created for "{test.name}" '
                f'after remediation was completed.'
            ),
            entity_type="test",
            entity_id=retest.id,
        )

    db.flush()
    return retest
|
|
|
|
|
|
def get_retest_chain(db: Session, test_id) -> list[Test]:
    """Return the original test followed by all of its retests.

    *test_id* may identify the original or any retest in the chain.  Retests
    are ordered by ``retest_count``.

    Args:
        db: Session used for the lookups.
        test_id: A ``uuid.UUID`` or anything whose ``str()`` is a valid UUID.

    Returns:
        ``[original, *retests]``; an empty list if *test_id* matches no test;
        ``[test]`` alone if the test points at an original that no longer
        exists.

    Raises:
        ValueError: if *test_id* is not a valid UUID string.
    """
    import uuid as _uuid

    lookup_id = test_id if isinstance(test_id, _uuid.UUID) else _uuid.UUID(str(test_id))

    test = db.query(Test).filter(Test.id == lookup_id).first()
    if not test:
        return []

    original_id = test.retest_of or test.id

    if original_id == test.id:
        # The looked-up test is the original: reuse the loaded instance
        # rather than issuing a second query for the same row.
        original = test
    else:
        original = db.query(Test).filter(Test.id == original_id).first()
        if not original:
            return [test]

    retests = (
        db.query(Test)
        .filter(Test.retest_of == original_id)
        .order_by(Test.retest_count)
        .all()
    )

    return [original] + retests
|
|
|
|
|
|
def reopen_test(db: Session, test: Test, user: User) -> Test:
    """Send a ``rejected`` test back to ``draft`` so the cycle can be redone.

    After the transition (performed and audit-logged by
    :func:`transition_state`) both leads' validation fields and all phase
    timing fields are reset, and a ``draft`` Jira event is pushed on a
    best-effort basis.  The resets happen after the flush inside
    :func:`transition_state`; persisting them is left to the caller.

    Returns:
        The same *test* instance, after mutation.
    """
    test = transition_state(
        db,
        test,
        TestState.draft,
        user,
        action_name="reopen_test",
    )

    # Fields reset to None: the dual-validation verdicts, then the timers.
    cleared_fields = (
        "red_validation_status",
        "red_validated_by",
        "red_validated_at",
        "red_validation_notes",
        "blue_validation_status",
        "blue_validated_by",
        "blue_validated_at",
        "blue_validation_notes",
        "red_started_at",
        "blue_started_at",
        "paused_at",
    )
    for field_name in cleared_fields:
        setattr(test, field_name, None)

    # Pause accumulators restart from zero rather than None.
    for counter_name in ("red_paused_seconds", "blue_paused_seconds"):
        setattr(test, counter_name, 0)

    try:
        from app.services.jira_service import push_test_event

        push_test_event(db, test, user, "draft")
    except Exception as exc:
        logger.warning("Jira push failed for test %s: %s", test.id, exc, exc_info=True)

    return test
|