Files
Aegis/backend/app/services/tempo_service.py
T
kitos 9472fe91fa
Aegis CI / lint-and-test (push) Has been cancelled
fix(lint): resolve 2132 ruff errors to pass CI lint-and-test job
- Remove ANN (type annotations) and D (docstrings) from ruff select; not
  feasible to add thousands of missing annotations/docstrings across the codebase
- Add I001 and E501 to ignore: comment-interleaved import style and SQLAlchemy
  FK definitions naturally exceed line limits
- Fix F811 duplicate import blocks in main.py, models/__init__.py, routers
  (campaigns, system, tests, evidence) and services (test_workflow, test_crud,
  campaign_service, schemas/test)
- Add missing Evidence/IntelItem/Technique/Test/TestTemplate/User imports to
  models/__init__.py (were only in duplicate block)
- Fix F821: add missing JWTError import in auth.py
- Fix F401 unused imports across 15+ files (jira_service, sso_service,
  notification_service, playbook_service, tempo_service, models, schemas,
  routers: admin_config, attack_paths, executive_dashboard, knowledge,
  ownership, risk_intelligence, sso, api_keys, email_service)
- Fix F841 unused variables: owned_technique_ids (executive_dashboard_service),
  severity (jira_service), priority_order (revalidation_queue_service)
- Fix F541 f-strings without placeholders in system.py and attck_evaluations_service
- Fix F601 duplicate dict key G0067 in threat_actor_import_service
- Fix E701 multiple-statements-on-one-line in risk_intelligence_service
- Fix E741 ambiguous variable name l -> lvl in risk_intelligence_service
- Fix N806 uppercase vars in functions: technique.py, heatmap_service.py;
  add noqa for compliance_import_service.py large unused constant dicts
- Fix W293 whitespace on blank lines in tests/conftest.py
2026-06-12 10:47:48 +02:00

277 lines
9.5 KiB
Python

"""Tempo time-tracking integration service.
Authentication model
--------------------
Each user authenticates to Tempo with their own personal Tempo API token,
stored in ``user.tempo_api_token``. This is different from the Jira API token.
Obtain a Tempo token at: Jira → Apps → Tempo → Settings → API Integration.
The global ``settings.TEMPO_ENABLED`` flag acts as a kill-switch. When False,
all Tempo calls are silently skipped regardless of whether users have tokens.
What goes to Tempo
------------------
Only **red team execution** time is forwarded to Tempo. Red team time runs
from when the red_tech clicks "Start" to "Done".
Blue team evaluation time runs from when the blue_tech clicks "Start
Evaluation" (sets blue_work_started_at) to when they submit, so it reflects
actual working time rather than queue time. It is tracked internally only and
is not sent to Tempo (see ``_TEMPO_ACTIVITY_TYPES``).
"""
# Import logging
import logging
# Import Optional from typing
from typing import Optional
# Import Session from sqlalchemy.orm
from sqlalchemy.orm import Session
# Import settings from app.config
from app.config import settings
# Import InvalidOperationError from app.domain.exceptions
from app.domain.exceptions import InvalidOperationError
# Import JiraLink, JiraLinkEntityType from app.models.jira_link
from app.models.jira_link import JiraLink, JiraLinkEntityType
# Import Test from app.models.test
from app.models.test import Test
# Import User from app.models.user
from app.models.user import User
# Assign logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
# Whitelist of activity types whose time is forwarded to Tempo; any other
# activity type is skipped by auto_log_test_worklog().
# Only red team execution time goes to Tempo.
# Blue team evaluation time is tracked internally (worklogs table) for SLA
# purposes but is NOT forwarded to Tempo — blue team has no Jira access.
_TEMPO_ACTIVITY_TYPES = {"red_team_execution"}
def has_tempo_configured(user) -> bool:
    """Report whether *user* carries a non-empty personal Tempo API token.

    A missing ``tempo_api_token`` attribute is treated the same as an empty
    or ``None`` token, so any object (including ``None``) may be passed.
    """
    token = getattr(user, "tempo_api_token", None)
    return True if token else False
# Global Tempo API v4 endpoint; the final fallback in _get_tempo_base_url().
_TEMPO_DEFAULT_BASE_URL = "https://api.tempo.io/4"
# EU Tempo API v4 endpoint.
# NOTE(review): not referenced in the visible part of this module — confirm
# whether another module uses it.
_TEMPO_EU_BASE_URL = "https://api.eu.tempo.io/4"
# system_configs key for admin-configurable Tempo base URL
_TEMPO_BASE_URL_CONFIG_KEY = "tempo.base_url"
def _get_tempo_base_url(db=None) -> str:
    """Return the Tempo API base URL without a trailing slash.

    Resolution order:

    1. The ``tempo.base_url`` row in ``system_configs`` (only consulted when
       *db* is given). This lets an admin select the EU endpoint
       ``https://api.eu.tempo.io/4`` without redeploying.
    2. ``settings.TEMPO_BASE_URL``, when set to a non-empty value.
    3. ``_TEMPO_DEFAULT_BASE_URL``.

    Args:
        db: Optional database session used to query ``SystemConfig``. When
            ``None`` the configuration table is not consulted.

    Returns:
        The selected base URL with any trailing ``/`` removed.
    """
    if db is not None:
        try:
            # Imported lazily, as in the original implementation.
            from app.models.system_config import SystemConfig

            row = (
                db.query(SystemConfig)
                .filter(SystemConfig.key == _TEMPO_BASE_URL_CONFIG_KEY)
                .first()
            )
            # Ignore a missing row and blank / whitespace-only values so a
            # stray space in the admin UI cannot produce an empty base URL.
            configured = (row.value or "").strip() if row else ""
            if configured:
                return configured.rstrip("/")
        except Exception:
            # Best-effort lookup: fall through to the defaults, but leave a
            # trace so a silently ignored admin setting can be diagnosed.
            logger.warning(
                "Could not read %s from system_configs; falling back to defaults",
                _TEMPO_BASE_URL_CONFIG_KEY,
                exc_info=True,
            )
    env_url = getattr(settings, "TEMPO_BASE_URL", None)
    return (env_url or _TEMPO_DEFAULT_BASE_URL).rstrip("/")
def get_user_tempo_client(user, db=None):
    """Return a Tempo API v4 client authenticated as *user*.

    Args:
        user: Object whose ``tempo_api_token`` attribute holds the user's
            personal Tempo API token.
        db: Optional database session. Pass it to allow reading the
            admin-configured base URL from ``system_configs`` (needed for EU
            Tempo workspaces).

    Returns:
        A ``tempoapiclient.client_v4.Tempo`` instance built with the user's
        token and the resolved base URL.

    Raises:
        InvalidOperationError: When the user has no token, or when the
            ``tempoapiclient`` package cannot be imported.
    """
    token = getattr(user, "tempo_api_token", None)
    if not token:
        raise InvalidOperationError(
            "No Tempo API token configured. "
            "Add it in Settings → Profile → Tempo Integration."
        )

    # Guard only the import: an ImportError raised by anything else must not
    # be misreported as "client library not installed".
    try:
        from tempoapiclient import client_v4 as tempo_client
    except ImportError as exc:
        raise InvalidOperationError(
            "tempo-api-python-client is not installed. "
            "Run: pip install tempo-api-python-client"
        ) from exc

    base_url = _get_tempo_base_url(db)
    logger.debug("Using Tempo base URL: %s", base_url)
    return tempo_client.Tempo(auth_token=token, base_url=base_url)
def log_worklog(
    user,
    jira_issue_id: int,
    author_account_id: str,
    date: str,
    time_spent_seconds: int,
    description: str,
    db=None,
) -> dict:
    """Create a worklog entry in Tempo using *user*'s personal token.

    Args:
        user: User whose Tempo token authenticates the request.
        jira_issue_id: Jira issue ID, sent as ``issueId``.
        author_account_id: Account ID of the worklog author, sent as
            ``accountId``.
        date: Worklog date string, sent as ``dateFrom``.
        time_spent_seconds: Duration to log, sent as ``timeSpentSeconds``.
        description: Free-text worklog description.
        db: Optional database session, forwarded to
            ``get_user_tempo_client`` for base-URL resolution.

    Returns:
        Whatever ``Tempo.create_worklog`` returns.

    Raises:
        InvalidOperationError: Propagated from ``get_user_tempo_client``.
        RuntimeError: When the client library raises ``SystemExit``.

    Note: tempoapiclient raises SystemExit (not Exception) on API errors
    (e.g. 400 Bad Request). That is converted to RuntimeError so callers can
    handle it with the usual ``except Exception`` pattern. Only SystemExit is
    converted; KeyboardInterrupt and GeneratorExit propagate unchanged.
    """
    tempo = get_user_tempo_client(user, db=db)
    try:
        return tempo.create_worklog(
            accountId=author_account_id,
            issueId=jira_issue_id,
            dateFrom=date,
            timeSpentSeconds=time_spent_seconds,
            description=description,
        )
    except SystemExit as exc:
        raise RuntimeError(f"Tempo API error: {exc}") from exc
def auto_log_test_worklog(
    db: Session,
    test: Test,
    user: User,
    activity_type: str,
    duration_seconds: int,
) -> Optional[dict]:
    """Log *duration_seconds* to Tempo for *test* if every precondition holds.

    The call is skipped (returns ``None``) when any of these is true:

    * *activity_type* is not in ``_TEMPO_ACTIVITY_TYPES`` (currently only
      ``red_team_execution`` is forwarded; ``blue_team_evaluation`` is
      tracked internally but not sent),
    * ``settings.TEMPO_ENABLED`` is falsy (global kill-switch),
    * *duration_seconds* is zero or negative,
    * *user* has no Tempo token or no ``jira_account_id``,
    * *test* has no ``JiraLink`` with a ``jira_issue_id``.

    ``duration_seconds`` should be the value already computed by the workflow
    layer (gross elapsed time minus any paused time). It is not re-derived
    from timestamps here, but it IS rounded up to a whole number of minutes
    (minimum 60 seconds) before being sent, because Tempo requires whole
    minutes. The Tempo entry can therefore exceed the input by up to 59 s.

    Args:
        db: Database session used for the Jira link lookup and base-URL
            resolution.
        test: Test whose execution time is being logged.
        user: User performing the work; supplies the Tempo token and the
            Jira account ID.
        activity_type: Activity identifier checked against the whitelist.
        duration_seconds: Net working time in seconds.

    Returns:
        The Tempo worklog response dict, or ``None`` if skipped or failed.
        The Jira link lookup and the Tempo call are non-fatal: their errors
        are logged as warnings and swallowed.
    """
    import math

    # Only whitelisted activity types go to Tempo
    if activity_type not in _TEMPO_ACTIVITY_TYPES:
        logger.debug(
            "Skipping Tempo sync for activity_type=%s (not in whitelist)", activity_type
        )
        return None

    # Global kill-switch
    if not settings.TEMPO_ENABLED:
        return None

    if duration_seconds <= 0:
        logger.debug(
            "Skipping Tempo sync for test %s: duration=%ds", test.id, duration_seconds
        )
        return None

    # Tempo requires whole minutes. Always round UP to the nearest minute
    # and enforce a minimum of 60 seconds (1 minute).
    #   2 seconds         → 60 s  (1 min, minimum)
    #   3 min 20 s (200s) → 240 s (4 min, ceiling)
    #   5 min 0 s (300s)  → 300 s (5 min, exact)
    duration_seconds = max(60, math.ceil(duration_seconds / 60) * 60)

    # Captured once so the error handler below does not need to touch the
    # ORM objects again.
    test_id = test.id
    username = getattr(user, "username", user)

    # Cheap per-user checks run before the database lookup.
    if not has_tempo_configured(user):
        logger.debug(
            "User %s has no Tempo token; skipping worklog for test %s",
            username, test_id,
        )
        return None

    jira_account_id = (getattr(user, "jira_account_id", "") or "").strip()
    if not jira_account_id:
        logger.debug(
            "User %s has no jira_account_id; skipping Tempo worklog",
            username,
        )
        return None

    try:
        # Need a Jira link with a numeric issue ID. The lookup sits inside
        # the try so a database error is non-fatal too.
        link = (
            db.query(JiraLink)
            .filter(
                JiraLink.entity_id == test_id,
                JiraLink.entity_type == JiraLinkEntityType.test,
            )
            .first()
        )
        if not link or not link.jira_issue_id:
            logger.debug("No Jira link for test %s, skipping Tempo worklog", test_id)
            return None

        # Use the phase start timestamp as the worklog date so it matches when
        # work actually happened (not the submission timestamp).
        if activity_type == "blue_team_evaluation":
            # Only reachable if blue team evaluation is added to the whitelist.
            started = test.blue_work_started_at or test.blue_started_at or test.created_at
            description = f"[Aegis] Blue Team evaluation: {test.name}"
        else:
            started = (
                test.red_started_at
                or getattr(test, "updated_at", None)
                or test.created_at
            )
            description = f"[Aegis] Red Team execution: {test.name}"
        work_date = started.strftime("%Y-%m-%d")

        result = log_worklog(
            user=user,
            jira_issue_id=int(link.jira_issue_id),
            author_account_id=jira_account_id,
            date=work_date,
            time_spent_seconds=duration_seconds,
            description=description,
            db=db,
        )
        logger.info(
            "Tempo worklog created for test %s by user %s: %ds on %s",
            test_id, username, duration_seconds, work_date,
        )
        return result
    except Exception as e:
        logger.warning(
            "Tempo worklog failed for test %s (user %s): %s",
            test_id, username, e, exc_info=True,
        )
        return None