Files
Aegis/backend/app/services/tempo_service.py
kitos 0e6cec4d07
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
fix(tempo): only log red team execution time, use pre-computed duration
Two bugs fixed:

1. Blue team evaluation was also sent to Tempo. Only operator (red team)
   execution time should be logged — blue team time is tracked internally
   in Aegis but does NOT represent billable operator work. Added a
   whitelist (_TEMPO_ACTIVITY_TYPES = {"red_team_execution"}).

2. _calculate_duration() re-computed duration from red_started_at to
   datetime.utcnow() at call time, without subtracting paused seconds.
   This caused inflated times (e.g. 45 min instead of 5 min) when there
   was any delay between the workflow transition and the Tempo call.
   Now the duration_seconds already computed by _create_phase_worklog
   (gross elapsed - paused) is passed directly to auto_log_test_worklog
   and used as-is, so Aegis and Tempo always agree on the duration.

Also: use red_started_at as the worklog date (not submission timestamp)
so the Tempo entry reflects when the work actually happened.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-27 11:38:44 +02:00

180 lines
5.9 KiB
Python

"""Tempo time-tracking integration service.
Authentication model
--------------------
Each user authenticates to Tempo with their own personal Tempo API token,
stored in ``user.tempo_api_token``. This is different from the Jira API token.
Obtain a Tempo token at: Jira → Apps → Tempo → Settings → API Integration.
The global ``settings.TEMPO_ENABLED`` flag acts as a kill-switch. When False,
all Tempo calls are silently skipped regardless of whether users have tokens.
What goes to Tempo
------------------
Only **red team execution** time is logged to Tempo. This reflects the time
the operator (red_tech) spends executing the attack technique — from the moment
they click "Start" to when they click "Done". Blue team evaluation time is
tracked internally in Aegis but is NOT sent to Tempo.
"""
import logging
from typing import Optional
from sqlalchemy.orm import Session
from app.config import settings
from app.domain.exceptions import InvalidOperationError
from app.models.jira_link import JiraLink, JiraLinkEntityType
logger = logging.getLogger(__name__)
# Whitelist of activity types that are forwarded to Tempo; any activity type
# not listed here is skipped by auto_log_test_worklog().
# "blue_team_evaluation" is intentionally excluded — it is tracked in Aegis
# but does not represent operator execution work.
_TEMPO_ACTIVITY_TYPES: set[str] = {"red_team_execution"}
def has_tempo_configured(user) -> bool:
    """Tell whether *user* carries a non-empty ``tempo_api_token``.

    A missing attribute, ``None`` and an empty string all count as
    "not configured".
    """
    token = getattr(user, "tempo_api_token", None)
    return True if token else False
def get_user_tempo_client(user):
    """Return a Tempo API v4 client authenticated as *user*.

    The token is read from ``user.tempo_api_token``. The ``tempoapiclient``
    package is imported lazily inside this function, so it is only required
    when a client is actually requested.

    Raises:
        InvalidOperationError: when the user has no token, or when the
            ``tempoapiclient`` package cannot be imported. In the latter
            case the original ``ImportError`` is attached as ``__cause__``.
    """
    token = getattr(user, "tempo_api_token", None)
    if not token:
        raise InvalidOperationError(
            "No Tempo API token configured. "
            "Add it in Settings → Profile → Tempo Integration."
        )

    # Guard only the import: an error raised while constructing the client
    # must not be misreported as "package not installed".
    try:
        from tempoapiclient import client_v4 as tempo_client
    except ImportError as exc:
        # Chain explicitly so the underlying import failure stays visible.
        raise InvalidOperationError(
            "tempo-api-python-client is not installed. "
            "Run: pip install tempo-api-python-client"
        ) from exc

    return tempo_client.Tempo(auth_token=token)
def log_worklog(
    user,
    jira_issue_id: int,
    author_account_id: str,
    date: str,
    time_spent_seconds: int,
    description: str,
    work_type: str | None = None,
) -> dict:
    """Submit one worklog entry to Tempo with *user*'s personal token.

    Args:
        user: Object whose ``tempo_api_token`` authenticates the request.
        jira_issue_id: Sent as ``issueId``.
        author_account_id: Sent as ``accountId``.
        date: Sent as ``dateFrom``.
        time_spent_seconds: Sent as ``timeSpentSeconds``.
        description: Sent as ``description``.
        work_type: Optional work type; when falsy,
            ``settings.TEMPO_DEFAULT_WORK_TYPE`` is used instead. If that is
            falsy too, no ``workType`` key is sent at all.

    Returns:
        Whatever the client's ``create_worklog`` call returns.

    Raises:
        InvalidOperationError: propagated from ``get_user_tempo_client``.
    """
    # Resolve the client first so a missing token fails before anything else.
    client = get_user_tempo_client(user)

    payload: dict = dict(
        accountId=author_account_id,
        issueId=jira_issue_id,
        dateFrom=date,
        timeSpentSeconds=time_spent_seconds,
        description=description,
    )

    effective_work_type = work_type or settings.TEMPO_DEFAULT_WORK_TYPE
    if effective_work_type:
        # NOTE(review): confirm the installed client's create_worklog()
        # accepts a ``workType`` keyword — TODO verify against the library.
        payload["workType"] = effective_work_type

    return client.create_worklog(**payload)
def auto_log_test_worklog(
    db: Session,
    test,
    user,
    activity_type: str,
    duration_seconds: int,
) -> Optional[dict]:
    """Forward an already-computed test duration to Tempo when eligible.

    ``duration_seconds`` is sent to Tempo unchanged; this function performs
    no duration calculation of its own.

    The entry is skipped (``None`` is returned) when, checked in this order:

    * ``activity_type`` is not in ``_TEMPO_ACTIVITY_TYPES``;
    * ``settings.TEMPO_ENABLED`` is falsy;
    * ``duration_seconds`` is zero or negative;
    * *user* has no Tempo token;
    * no ``JiraLink`` with a ``jira_issue_id`` exists for the test;
    * *user* has no ``jira_account_id``.

    Any exception raised while building or submitting the worklog is logged
    as a warning and swallowed. The Jira-link lookup runs before that guard,
    so errors from the query itself are not caught here.

    Returns:
        The response from ``log_worklog``, or ``None`` if the entry was
        skipped or the submission failed.
    """
    # Gate 1: only whitelisted activity types are ever sent.
    if activity_type not in _TEMPO_ACTIVITY_TYPES:
        logger.debug(
            "Skipping Tempo sync for activity_type=%s (not in whitelist)",
            activity_type,
        )
        return None

    # Gate 2: global kill-switch.
    if not settings.TEMPO_ENABLED:
        return None

    # Gate 3: nothing to report for non-positive durations.
    if duration_seconds <= 0:
        logger.debug(
            "Skipping Tempo sync for test %s: duration=%ds",
            test.id,
            duration_seconds,
        )
        return None

    # Gate 4: the user must have a personal Tempo token.
    if not has_tempo_configured(user):
        logger.debug(
            "User %s has no Tempo token; skipping worklog for test %s",
            getattr(user, "username", user),
            test.id,
        )
        return None

    # Gate 5: a Jira link carrying an issue id must exist for this test.
    link_query = db.query(JiraLink).filter(
        JiraLink.entity_id == test.id,
        JiraLink.entity_type == JiraLinkEntityType.test,
    )
    jira_link = link_query.first()
    if not (jira_link and jira_link.jira_issue_id):
        logger.debug("No Jira link for test %s, skipping Tempo worklog", test.id)
        return None

    # Gate 6: Tempo needs the author's Jira account id.
    account_id = getattr(user, "jira_account_id", "") or ""
    if not account_id:
        logger.debug(
            "User %s has no jira_account_id; skipping Tempo worklog",
            getattr(user, "username", user),
        )
        return None

    try:
        # Date the entry by when the work started, falling back to
        # updated_at and then created_at when red_started_at is unset.
        started = (
            test.red_started_at
            or getattr(test, "updated_at", None)
            or test.created_at
        )
        worked_on = started.strftime("%Y-%m-%d")

        response = log_worklog(
            user=user,
            jira_issue_id=int(jira_link.jira_issue_id),
            author_account_id=account_id,
            date=worked_on,
            time_spent_seconds=duration_seconds,
            description=f"[Aegis] Red Team execution: {test.name}",
        )
        logger.info(
            "Tempo worklog created for test %s by user %s: %ds on %s",
            test.id,
            getattr(user, "username", user),
            duration_seconds,
            worked_on,
        )
        return response
    except Exception as exc:
        # Best-effort integration: record the failure and carry on.
        logger.warning(
            "Tempo worklog failed for test %s (user %s): %s",
            test.id,
            getattr(user, "username", user),
            exc,
            exc_info=True,
        )
        return None