feat(phase-35): Jira + Tempo integration with internal worklogs
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled

Full Jira/Tempo pipeline: link Aegis entities to Jira issues, auto-sync
status hourly, log time internally with integrity hashing, and optionally
push worklogs to Tempo.

- 1.1 JiraLink model + Worklog model: Alembic migration b020 with indexes,
  enums (jiralinkentitytype, jirasyncdirection), and integrity_hash column
- 1.2 Jira service: atlassian-python-api wrapper with lazy singleton client,
  search/create/sync operations, feature-flagged via JIRA_ENABLED
- 1.3 Jira router: CRUD endpoints for /jira/links, /jira/search,
  /jira/create-issue with audit logging and entity-to-issue auto-creation
- 1.4 Tempo service: worklog push via tempo-api-python-client, auto-log from
  test completions when TEMPO_ENABLED, graceful fallback on failure
- 1.5 Worklog service + router: immutable internal time records with SHA-256
  integrity hash, CRUD at /worklogs, /worklogs/{id}/verify endpoint
- 1.6 Frontend: JiraLinkPanel component (search, link, sync, unlink) and
  WorklogTimeline component (timeline view, manual log form) integrated into
  TestDetailPage sidebar, CampaignDetailPage grid, TechniqueDetailPage
- 1.7 Jira sync job: APScheduler hourly job syncs all links from Jira,
  registered in background scheduler alongside existing jobs
This commit is contained in:
2026-02-17 15:57:39 +01:00
parent 6d18a5417d
commit 9b98f60a9a
23 changed files with 1605 additions and 1 deletions

View File

@@ -0,0 +1,105 @@
"""Jira integration service — wraps atlassian-python-api for Jira REST calls."""
import logging
from datetime import datetime
from typing import Optional
from sqlalchemy.orm import Session
from app.config import settings
from app.domain.exceptions import InvalidOperationError
from app.models.jira_link import JiraLink
logger = logging.getLogger(__name__)
_jira_client = None
def get_jira_client():
"""Return a lazily-initialised Jira client, or raise if disabled."""
global _jira_client
if not settings.JIRA_ENABLED:
raise InvalidOperationError("Jira integration is not enabled")
if _jira_client is None:
from atlassian import Jira
_jira_client = Jira(
url=settings.JIRA_URL,
username=settings.JIRA_USERNAME,
password=settings.JIRA_API_TOKEN,
cloud=settings.JIRA_IS_CLOUD,
)
return _jira_client
def search_jira_issues(query: str, max_results: int = 10) -> list[dict]:
"""Search Jira issues by JQL or free text."""
jira = get_jira_client()
jql = query if "=" in query or "~" in query else f'summary ~ "{query}"'
results = jira.jql(jql, limit=max_results)
return [
{
"issue_key": issue["key"],
"summary": issue["fields"]["summary"],
"status": issue["fields"]["status"]["name"],
"assignee": (issue["fields"].get("assignee") or {}).get("displayName"),
"priority": (issue["fields"].get("priority") or {}).get("name"),
}
for issue in results.get("issues", [])
]
def create_jira_issue(
project_key: str,
summary: str,
description: str,
issue_type: str = "Task",
labels: Optional[list[str]] = None,
custom_fields: Optional[dict] = None,
) -> dict:
"""Create a Jira issue and return its key + id."""
jira = get_jira_client()
fields: dict = {
"project": {"key": project_key},
"summary": summary,
"description": description,
"issuetype": {"name": issue_type},
}
if labels:
fields["labels"] = labels
if custom_fields:
fields.update(custom_fields)
result = jira.issue_create(fields=fields)
return {"issue_key": result["key"], "issue_id": result["id"]}
def sync_jira_to_aegis(db: Session, link: JiraLink) -> None:
"""Pull current status from Jira into the local link record."""
jira = get_jira_client()
issue = jira.issue(link.jira_issue_key)
fields = issue.get("fields", {})
link.jira_status = fields.get("status", {}).get("name")
link.jira_priority = (fields.get("priority") or {}).get("name")
link.jira_assignee = (fields.get("assignee") or {}).get("displayName")
link.jira_story_points = str(fields.get("customfield_10016", ""))
link.last_synced_at = datetime.utcnow()
db.flush()
def sync_aegis_to_jira(db: Session, link: JiraLink, entity_data: dict) -> None:
"""Push an Aegis status update as a Jira comment."""
jira = get_jira_client()
comment_body = _build_sync_comment(entity_data)
jira.issue_add_comment(link.jira_issue_key, comment_body)
link.last_synced_at = datetime.utcnow()
db.flush()
def _build_sync_comment(data: dict) -> str:
"""Build a formatted Jira comment from entity data."""
lines = ["h3. Aegis Sync Update", ""]
for key, value in data.items():
lines.append(f"*{key}:* {value}")
lines.append(f"\n_Synced at {datetime.utcnow().isoformat()}_")
return "\n".join(lines)

View File

@@ -0,0 +1,96 @@
"""Tempo time-tracking integration service."""
import logging
from typing import Optional
from sqlalchemy.orm import Session
from app.config import settings
from app.domain.exceptions import InvalidOperationError
from app.models.jira_link import JiraLink
logger = logging.getLogger(__name__)
def get_tempo_client():
"""Return a Tempo API client, or raise if disabled."""
if not settings.TEMPO_ENABLED:
raise InvalidOperationError("Tempo integration is not enabled")
try:
from tempoapiclient import client_v4 as tempo_client
return tempo_client.Tempo(auth_token=settings.TEMPO_API_TOKEN)
except ImportError:
raise InvalidOperationError(
"tempo-api-python-client is not installed. "
"Install it with: pip install tempo-api-python-client"
)
def log_worklog(
jira_issue_id: int,
author_account_id: str,
date: str,
time_spent_seconds: int,
description: str,
) -> dict:
"""Create a worklog entry in Tempo."""
tempo = get_tempo_client()
worklog = tempo.create_worklog(
accountId=author_account_id,
issueId=jira_issue_id,
dateFrom=date,
timeSpentSeconds=time_spent_seconds,
description=description,
)
return worklog
def auto_log_test_worklog(
db: Session,
test,
user,
activity_type: str,
) -> Optional[dict]:
"""If the test has a Jira link, log time to Tempo automatically.
Returns the Tempo worklog response, or None if skipped.
"""
if not settings.TEMPO_ENABLED:
return None
link = (
db.query(JiraLink)
.filter(JiraLink.entity_id == test.id, JiraLink.entity_type == "test")
.first()
)
if not link or not link.jira_issue_id:
logger.debug("No Jira link for test %s, skipping Tempo worklog", test.id)
return None
duration = _calculate_duration(test, activity_type)
if duration <= 0:
return None
try:
result = log_worklog(
jira_issue_id=int(link.jira_issue_id),
author_account_id=getattr(user, "jira_account_id", "") or "",
date=test.created_at.strftime("%Y-%m-%d"),
time_spent_seconds=duration,
description=f"[Aegis] {activity_type}: {test.name}",
)
logger.info("Tempo worklog created for test %s, %ds", test.id, duration)
return result
except Exception as e:
logger.warning("Tempo worklog failed for test %s: %s", test.id, e, exc_info=True)
return None
def _calculate_duration(test, activity_type: str) -> int:
"""Estimate duration in seconds based on test timestamps and activity type."""
if activity_type == "execution" and test.execution_date and test.created_at:
delta = test.execution_date - test.created_at
return max(int(delta.total_seconds()), 0)
return 3600 # default 1 hour if no timestamps available

View File

@@ -0,0 +1,75 @@
"""Internal worklog service — CRUD with integrity hashing."""
import hashlib
import logging
from datetime import datetime
from typing import Optional
from uuid import UUID
from sqlalchemy.orm import Session
from app.models.worklog import Worklog
logger = logging.getLogger(__name__)
def create_worklog(
db: Session,
*,
entity_type: str,
entity_id: UUID,
user_id: UUID,
activity_type: str,
started_at: datetime,
duration_seconds: int,
ended_at: Optional[datetime] = None,
description: Optional[str] = None,
) -> Worklog:
"""Create a worklog with an auto-computed integrity hash."""
wl = Worklog(
entity_type=entity_type,
entity_id=entity_id,
user_id=user_id,
activity_type=activity_type,
started_at=started_at,
ended_at=ended_at,
duration_seconds=duration_seconds,
description=description,
)
wl.integrity_hash = _compute_hash(wl)
db.add(wl)
db.commit()
db.refresh(wl)
return wl
def list_worklogs(
db: Session,
*,
entity_type: Optional[str] = None,
entity_id: Optional[UUID] = None,
user_id: Optional[UUID] = None,
) -> list[Worklog]:
"""List worklogs with optional filters."""
query = db.query(Worklog)
if entity_type:
query = query.filter(Worklog.entity_type == entity_type)
if entity_id:
query = query.filter(Worklog.entity_id == entity_id)
if user_id:
query = query.filter(Worklog.user_id == user_id)
return query.order_by(Worklog.started_at.desc()).all()
def verify_worklog_integrity(wl: Worklog) -> bool:
"""Return True if the worklog has not been tampered with."""
return wl.integrity_hash == _compute_hash(wl)
def _compute_hash(wl: Worklog) -> str:
"""SHA-256 of the immutable fields for audit integrity."""
data = (
f"{wl.entity_type}:{wl.entity_id}:{wl.user_id}:"
f"{wl.activity_type}:{wl.started_at}:{wl.duration_seconds}"
)
return hashlib.sha256(data.encode()).hexdigest()