Files
Aegis/backend/app/routers/worklogs.py
Kitos 9b98f60a9a
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(phase-35): Jira + Tempo integration with internal worklogs
Full Jira/Tempo pipeline: link Aegis entities to Jira issues, auto-sync
status hourly, log time internally with integrity hashing, and optionally
push worklogs to Tempo.

- 1.1 JiraLink model + Worklog model: Alembic migration b020 with indexes,
  enums (jiralinkentitytype, jirasyncdirection), and integrity_hash column
- 1.2 Jira service: atlassian-python-api wrapper with lazy singleton client,
  search/create/sync operations, feature-flagged via JIRA_ENABLED
- 1.3 Jira router: CRUD endpoints for /jira/links, /jira/search,
  /jira/create-issue with audit logging and entity-to-issue auto-creation
- 1.4 Tempo service: worklog push via tempo-api-python-client, auto-log from
  test completions when TEMPO_ENABLED, graceful fallback on failure
- 1.5 Worklog service + router: immutable internal time records with SHA-256
  integrity hash, CRUD at /worklogs, /worklogs/{id}/verify endpoint
- 1.6 Frontend: JiraLinkPanel component (search, link, sync, unlink) and
  WorklogTimeline component (timeline view, manual log form) integrated into
  TestDetailPage sidebar, CampaignDetailPage grid, TechniqueDetailPage
- 1.7 Jira sync job: APScheduler hourly job syncs all links from Jira,
  registered in background scheduler alongside existing jobs
2026-02-17 15:57:39 +01:00

120 lines
3.6 KiB
Python

"""Worklog router — internal time-tracking records with integrity verification."""
from datetime import datetime
from typing import Optional
from uuid import UUID

from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy.orm import Session

from app.database import get_db
from app.dependencies.auth import get_current_user
from app.domain.exceptions import EntityNotFoundError
from app.models.user import User
from app.models.worklog import Worklog
from app.services import worklog_service
# Every route registered on this router is served under the "/worklogs" prefix
# and grouped under the "worklogs" tag.
router = APIRouter(prefix="/worklogs", tags=["worklogs"])
# ── Schemas ──────────────────────────────────────────────────────────────
class WorklogCreate(BaseModel):
    """Request body accepted when creating a worklog.

    ``entity_type`` and ``activity_type`` are strings capped at 50 and 100
    characters respectively, ``duration_seconds`` must be strictly greater
    than zero, and ``ended_at`` / ``description`` may be left out (they
    default to ``None``).
    """

    # Fields without a default are required by validation.
    entity_type: str = Field(max_length=50)
    entity_id: UUID
    activity_type: str = Field(max_length=100)
    started_at: datetime
    ended_at: Optional[datetime] = Field(default=None)
    duration_seconds: int = Field(gt=0)
    description: Optional[str] = Field(default=None)
class WorklogOut(BaseModel):
    """Response schema used to serialise a worklog record.

    The endpoints in this module declare it as their ``response_model`` and
    return ORM ``Worklog`` objects, so the model is configured to read its
    values from object attributes rather than from a dict.
    """

    # Pydantic v2 configuration. ``from_attributes`` lets the model be built
    # from attribute access on an arbitrary object. This replaces the nested
    # ``class Config``, which is deprecated in Pydantic v2.
    model_config = ConfigDict(from_attributes=True)

    id: UUID
    entity_type: str
    entity_id: UUID
    user_id: UUID
    activity_type: str
    started_at: datetime
    ended_at: Optional[datetime] = None
    duration_seconds: int
    description: Optional[str] = None
    # NOTE(review): presumably the time the record was pushed to Tempo, and
    # None when it has not been pushed — confirm against the Worklog model.
    tempo_synced: Optional[datetime] = None
    integrity_hash: Optional[str] = None
    created_at: datetime
# ── Endpoints ────────────────────────────────────────────────────────────
@router.post("", response_model=WorklogOut, status_code=201)
def create(
    body: WorklogCreate,
    db: Session = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Record a manually-logged worklog entry for the current user.

    The validated request fields are handed to
    ``worklog_service.create_worklog`` together with the ID of the user
    resolved by ``get_current_user``; the service's return value is the
    response.
    """
    # Keyword arguments for the service call. user_id comes from the
    # resolved user dependency, never from the request body.
    record_fields = {
        "entity_type": body.entity_type,
        "entity_id": body.entity_id,
        "user_id": user.id,
        "activity_type": body.activity_type,
        "started_at": body.started_at,
        "ended_at": body.ended_at,
        "duration_seconds": body.duration_seconds,
        "description": body.description,
    }
    return worklog_service.create_worklog(db, **record_fields)
@router.get("", response_model=list[WorklogOut])
def list_all(
    entity_type: Optional[str] = None,
    entity_id: Optional[UUID] = None,
    user_id: Optional[UUID] = None,
    db: Session = Depends(get_db),
    _user: User = Depends(get_current_user),
):
    """Return worklogs, optionally narrowed by entity type, entity ID or user ID.

    Each filter defaults to ``None`` and is passed through unchanged to
    ``worklog_service.list_worklogs``, which performs the actual query.
    """
    # `_user` is not used in the body; declaring the dependency makes FastAPI
    # resolve get_current_user on every request (presumably rejecting
    # unauthenticated callers — confirm in app.dependencies.auth).
    criteria = {
        "entity_type": entity_type,
        "entity_id": entity_id,
        "user_id": user_id,
    }
    return worklog_service.list_worklogs(db, **criteria)
@router.get("/{worklog_id}", response_model=WorklogOut)
def get_one(
    worklog_id: UUID,
    db: Session = Depends(get_db),
    _user: User = Depends(get_current_user),
):
    """Fetch the worklog whose ``id`` equals ``worklog_id``.

    Raises:
        EntityNotFoundError: if the query finds no matching worklog.
    """
    lookup = db.query(Worklog).filter(Worklog.id == worklog_id)
    record = lookup.first()
    if record:
        return record
    # No row matched: report the missing entity by type name and requested ID.
    raise EntityNotFoundError("Worklog", str(worklog_id))
@router.get("/{worklog_id}/verify")
def verify_integrity(
    worklog_id: UUID,
    db: Session = Depends(get_db),
    _user: User = Depends(get_current_user),
):
    """Report whether the stored integrity hash of a worklog still checks out.

    Returns a dict with the worklog's ID as a string (``worklog_id``) and the
    result of ``worklog_service.verify_worklog_integrity`` for that record
    (``integrity_valid``).

    Raises:
        EntityNotFoundError: if the query finds no worklog with that ID.
    """
    matches = db.query(Worklog).filter(Worklog.id == worklog_id)
    record = matches.first()
    if not record:
        raise EntityNotFoundError("Worklog", str(worklog_id))

    # The actual hash check is delegated to the service layer.
    record_id = str(record.id)
    hash_ok = worklog_service.verify_worklog_integrity(record)
    return {
        "worklog_id": record_id,
        "integrity_valid": hash_ok,
    }