236 lines
7.9 KiB
Python
236 lines
7.9 KiB
Python
"""Jira integration service — wraps atlassian-python-api for Jira REST calls."""
|
|
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.config import settings
|
|
from app.domain.errors import EntityNotFoundError
|
|
from app.domain.exceptions import InvalidOperationError
|
|
from app.models.campaign import Campaign
|
|
from app.models.jira_link import JiraLink, JiraLinkEntityType, JiraSyncDirection
|
|
from app.models.technique import Technique
|
|
from app.models.test import Test
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_jira_client = None
|
|
|
|
|
|
def get_jira_client():
|
|
"""Return a lazily-initialised Jira client, or raise if disabled."""
|
|
global _jira_client
|
|
if not settings.JIRA_ENABLED:
|
|
raise InvalidOperationError("Jira integration is not enabled")
|
|
if _jira_client is None:
|
|
from atlassian import Jira
|
|
|
|
_jira_client = Jira(
|
|
url=settings.JIRA_URL,
|
|
username=settings.JIRA_USERNAME,
|
|
password=settings.JIRA_API_TOKEN,
|
|
cloud=settings.JIRA_IS_CLOUD,
|
|
)
|
|
return _jira_client
|
|
|
|
|
|
def search_jira_issues(query: str, max_results: int = 10) -> list[dict]:
|
|
"""Search Jira issues by JQL or free text."""
|
|
jira = get_jira_client()
|
|
jql = query if "=" in query or "~" in query else f'summary ~ "{query}"'
|
|
results = jira.jql(jql, limit=max_results)
|
|
return [
|
|
{
|
|
"issue_key": issue["key"],
|
|
"summary": issue["fields"]["summary"],
|
|
"status": issue["fields"]["status"]["name"],
|
|
"assignee": (issue["fields"].get("assignee") or {}).get("displayName"),
|
|
"priority": (issue["fields"].get("priority") or {}).get("name"),
|
|
}
|
|
for issue in results.get("issues", [])
|
|
]
|
|
|
|
|
|
def create_jira_issue(
|
|
project_key: str,
|
|
summary: str,
|
|
description: str,
|
|
issue_type: str = "Task",
|
|
labels: Optional[list[str]] = None,
|
|
custom_fields: Optional[dict] = None,
|
|
) -> dict:
|
|
"""Create a Jira issue and return its key + id."""
|
|
jira = get_jira_client()
|
|
fields: dict = {
|
|
"project": {"key": project_key},
|
|
"summary": summary,
|
|
"description": description,
|
|
"issuetype": {"name": issue_type},
|
|
}
|
|
if labels:
|
|
fields["labels"] = labels
|
|
if custom_fields:
|
|
fields.update(custom_fields)
|
|
|
|
result = jira.issue_create(fields=fields)
|
|
return {"issue_key": result["key"], "issue_id": result["id"]}
|
|
|
|
|
|
def sync_jira_to_aegis(db: Session, link: JiraLink) -> None:
|
|
"""Pull current status from Jira into the local link record."""
|
|
jira = get_jira_client()
|
|
issue = jira.issue(link.jira_issue_key)
|
|
fields = issue.get("fields", {})
|
|
link.jira_status = fields.get("status", {}).get("name")
|
|
link.jira_priority = (fields.get("priority") or {}).get("name")
|
|
link.jira_assignee = (fields.get("assignee") or {}).get("displayName")
|
|
link.jira_story_points = str(fields.get("customfield_10016", ""))
|
|
link.last_synced_at = datetime.utcnow()
|
|
db.flush()
|
|
|
|
|
|
def sync_aegis_to_jira(db: Session, link: JiraLink, entity_data: dict) -> None:
|
|
"""Push an Aegis status update as a Jira comment."""
|
|
jira = get_jira_client()
|
|
comment_body = _build_sync_comment(entity_data)
|
|
jira.issue_add_comment(link.jira_issue_key, comment_body)
|
|
link.last_synced_at = datetime.utcnow()
|
|
db.flush()
|
|
|
|
|
|
def _build_sync_comment(data: dict) -> str:
|
|
"""Build a formatted Jira comment from entity data."""
|
|
lines = ["h3. Aegis Sync Update", ""]
|
|
for key, value in data.items():
|
|
lines.append(f"*{key}:* {value}")
|
|
lines.append(f"\n_Synced at {datetime.utcnow().isoformat()}_")
|
|
return "\n".join(lines)
|
|
|
|
|
|
# ── Link CRUD ────────────────────────────────────────────────────────
|
|
|
|
|
|
def create_link(
|
|
db: Session,
|
|
*,
|
|
entity_type: JiraLinkEntityType,
|
|
entity_id: UUID,
|
|
jira_issue_key: str,
|
|
sync_direction: JiraSyncDirection,
|
|
created_by: UUID,
|
|
) -> JiraLink:
|
|
"""Create a Jira link and optionally pull initial data from Jira."""
|
|
link = JiraLink(
|
|
entity_type=entity_type,
|
|
entity_id=entity_id,
|
|
jira_issue_key=jira_issue_key,
|
|
sync_direction=sync_direction,
|
|
created_by=created_by,
|
|
)
|
|
db.add(link)
|
|
db.flush()
|
|
|
|
if settings.JIRA_ENABLED:
|
|
try:
|
|
sync_jira_to_aegis(db, link)
|
|
except Exception as e:
|
|
logger.warning("Initial Jira sync failed for %s: %s", jira_issue_key, e)
|
|
|
|
return link
|
|
|
|
|
|
def list_links(
|
|
db: Session,
|
|
*,
|
|
entity_type: Optional[JiraLinkEntityType] = None,
|
|
entity_id: Optional[UUID] = None,
|
|
) -> list[JiraLink]:
|
|
"""List Jira links with optional filters."""
|
|
query = db.query(JiraLink)
|
|
if entity_type:
|
|
query = query.filter(JiraLink.entity_type == entity_type)
|
|
if entity_id:
|
|
query = query.filter(JiraLink.entity_id == entity_id)
|
|
return query.order_by(JiraLink.created_at.desc()).all()
|
|
|
|
|
|
def get_link_or_raise(db: Session, link_id: UUID) -> JiraLink:
|
|
"""Get a Jira link by ID or raise EntityNotFoundError."""
|
|
link = db.query(JiraLink).filter(JiraLink.id == link_id).first()
|
|
if not link:
|
|
raise EntityNotFoundError("JiraLink", str(link_id))
|
|
return link
|
|
|
|
|
|
def delete_link(db: Session, link_id: UUID) -> JiraLink:
|
|
"""Delete a Jira link. Returns the deleted link (for audit)."""
|
|
link = get_link_or_raise(db, link_id)
|
|
db.delete(link)
|
|
return link
|
|
|
|
|
|
def build_issue_data(db: Session, entity_type: JiraLinkEntityType, entity_id: UUID) -> tuple[str, str]:
|
|
"""Build Jira issue summary and description from an Aegis entity."""
|
|
if entity_type == JiraLinkEntityType.test:
|
|
entity = db.query(Test).filter(Test.id == entity_id).first()
|
|
if not entity:
|
|
raise EntityNotFoundError("Test", str(entity_id))
|
|
return (
|
|
f"[Aegis Test] {entity.name}",
|
|
f"Test: {entity.name}\n"
|
|
f"State: {entity.state.value if entity.state else 'draft'}\n"
|
|
f"Description: {entity.description or 'N/A'}",
|
|
)
|
|
elif entity_type == JiraLinkEntityType.campaign:
|
|
entity = db.query(Campaign).filter(Campaign.id == entity_id).first()
|
|
if not entity:
|
|
raise EntityNotFoundError("Campaign", str(entity_id))
|
|
return (
|
|
f"[Aegis Campaign] {entity.name}",
|
|
f"Campaign: {entity.name}\n"
|
|
f"Type: {entity.type}\nStatus: {entity.status}\n"
|
|
f"Description: {entity.description or 'N/A'}",
|
|
)
|
|
elif entity_type == JiraLinkEntityType.technique:
|
|
entity = db.query(Technique).filter(Technique.id == entity_id).first()
|
|
if not entity:
|
|
raise EntityNotFoundError("Technique", str(entity_id))
|
|
return (
|
|
f"[Aegis Technique] {entity.mitre_id} - {entity.name}",
|
|
f"MITRE ID: {entity.mitre_id}\nName: {entity.name}\n"
|
|
f"Tactic: {entity.tactic or 'N/A'}\n"
|
|
f"Description: {entity.description or 'N/A'}",
|
|
)
|
|
else:
|
|
return f"[Aegis] Entity {entity_id}", f"Entity type: {entity_type.value}"
|
|
|
|
|
|
def create_issue_and_link(
|
|
db: Session,
|
|
*,
|
|
entity_type: JiraLinkEntityType,
|
|
entity_id: UUID,
|
|
created_by: UUID,
|
|
) -> dict:
|
|
"""Create a Jira issue from an Aegis entity and link them."""
|
|
summary, description = build_issue_data(db, entity_type, entity_id)
|
|
result = create_jira_issue(
|
|
project_key=settings.JIRA_DEFAULT_PROJECT,
|
|
summary=summary,
|
|
description=description,
|
|
labels=["aegis", entity_type.value],
|
|
)
|
|
link = JiraLink(
|
|
entity_type=entity_type,
|
|
entity_id=entity_id,
|
|
jira_issue_key=result["issue_key"],
|
|
jira_issue_id=result["issue_id"],
|
|
jira_project_key=settings.JIRA_DEFAULT_PROJECT,
|
|
created_by=created_by,
|
|
)
|
|
db.add(link)
|
|
return {"issue_key": result["issue_key"], "link_id": str(link.id)}
|