Files
Aegis/backend/app/services/jira_service.py
kitos eeee17d260
Some checks failed
Aegis CI / lint-and-test (push) Has been cancelled
feat(jira+tests): 5 improvements from review
1. Jira status → In Progress on Start Execution
   - push_test_event calls set_issue_status("In Progress") when
     new_state == "red_executing" (non-fatal, separate try/except)

2. Jira assignee set on Start Execution
   - assign_issue() called with actor.jira_account_id when operator
     clicks Start (non-fatal)

3. Standalone tests parent ticket (OFS-20798)
   - New jira.parent_ticket_standalone config key
   - get_jira_parent_ticket_standalone() falls back to parent_ticket
   - auto_create_test_issue uses standalone parent for non-campaign tests
   - Exposed in /system/jira-config GET+PATCH and SettingsPage UI

4. Tests table: Created + Updated columns
   - Add Created column (created_at), fix Updated to show updated_at
   - Both use UTC-aware date parsing (append Z if no tz suffix)
   - updated_at added to Test TypeScript interface

5. Sortable columns in tests table
   - All 7 columns sortable: Name, Technique, State, Current Team,
     Platform, Created, Updated
   - Click to sort asc, click again to reverse; ChevronUp/Down indicator
   - Default sort: Created desc (newest first)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-27 13:07:46 +02:00

823 lines
28 KiB
Python

"""Jira integration service.
Authentication model
--------------------
Each Aegis user authenticates to Jira with their own Atlassian email and
personal API token. The email used is ``user.jira_email`` when set, falling
back to ``user.email`` (the Aegis account email). This lets users specify a
separate corporate Atlassian email without changing their Aegis login.
The token is stored in ``user.jira_api_token``.
Admin configuration
-------------------
The Jira URL and default project key are stored in the ``system_configs``
table (keys ``jira.url`` and ``jira.project_key``) so the admin can update
them at runtime without redeploying. These values override the legacy
``settings.JIRA_URL`` / ``settings.JIRA_DEFAULT_PROJECT`` env-vars which are
kept for backwards-compatibility only.
Lifecycle hooks
---------------
``push_test_event()`` is the single entry-point called from the test-workflow
service on every state transition. It posts a rich comment to the linked
Jira issue (if one exists) using the acting user's credentials.
``auto_create_test_issue()`` is called once after a test is created; it
creates the Jira ticket and stores the link.
"""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Optional
from uuid import UUID
from sqlalchemy.orm import Session
from app.config import settings
from app.domain.errors import EntityNotFoundError
from app.domain.exceptions import InvalidOperationError
from app.models.campaign import Campaign
from app.models.jira_link import JiraLink, JiraLinkEntityType, JiraSyncDirection
from app.models.technique import Technique
from app.models.test import Test
from app.models.user import User
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# System-config helpers (admin-configurable Jira settings)
# ---------------------------------------------------------------------------
_JIRA_KEYS = {
"url": "jira.url",
"project_key": "jira.project_key",
"enabled": "jira.enabled",
}
def _read_system_config(db: Session, key: str) -> Optional[str]:
"""Return a value from system_configs, or None if not set."""
from app.models.system_config import SystemConfig # avoid circular at import time
row = db.query(SystemConfig).filter(SystemConfig.key == key).first()
return row.value if row else None
def get_jira_url(db: Session) -> Optional[str]:
"""Return the admin-configured Jira URL, falling back to the env-var."""
return _read_system_config(db, _JIRA_KEYS["url"]) or settings.JIRA_URL or None
def get_jira_project_key(db: Session) -> Optional[str]:
"""Return the admin-configured default project key, falling back to env-var."""
return (
_read_system_config(db, _JIRA_KEYS["project_key"])
or settings.JIRA_DEFAULT_PROJECT
or None
)
def is_jira_enabled(db: Session) -> bool:
"""Return True if Jira integration is enabled (DB setting or env-var)."""
db_val = _read_system_config(db, _JIRA_KEYS["enabled"])
if db_val is not None:
return db_val.lower() in ("true", "1", "yes")
return settings.JIRA_ENABLED
def get_jira_parent_ticket(db: Session) -> Optional[str]:
"""Return the configured parent ticket key for campaigns, or None if not set."""
return _read_system_config(db, "jira.parent_ticket") or None
def get_jira_parent_ticket_standalone(db: Session) -> Optional[str]:
"""Return the parent ticket for standalone tests (not in a campaign).
Falls back to get_jira_parent_ticket() if not explicitly configured.
"""
return (
_read_system_config(db, "jira.parent_ticket_standalone")
or get_jira_parent_ticket(db)
)
def upsert_jira_config(db: Session, key: str, value: str) -> None:
"""Persist a Jira config key-value pair."""
from app.models.system_config import SystemConfig
row = db.query(SystemConfig).filter(SystemConfig.key == key).first()
if row:
row.value = value
else:
db.add(SystemConfig(key=key, value=value))
# ---------------------------------------------------------------------------
# Per-user Jira client
# ---------------------------------------------------------------------------
def _effective_jira_email(user: User) -> Optional[str]:
"""Return the email to use for Jira auth: jira_email if set, otherwise email."""
return getattr(user, "jira_email", None) or user.email
def get_user_jira_client(user: User, db: Session):
"""Build an Atlassian Jira client authenticated as *user*.
Uses ``user.jira_email`` when set, otherwise falls back to ``user.email``.
Raises ``InvalidOperationError`` when configuration is incomplete so
callers can surface meaningful error messages.
"""
jira_url = get_jira_url(db)
if not jira_url:
raise InvalidOperationError(
"Jira URL is not configured. Ask your administrator to set it in "
"System Settings → Jira Configuration."
)
auth_email = _effective_jira_email(user)
if not auth_email:
raise InvalidOperationError(
"No email configured for Jira authentication. "
"Set a Jira email in Settings → Profile → Jira Integration."
)
if not user.jira_api_token:
raise InvalidOperationError(
"You have not configured a Jira API token. "
"Go to Settings → Profile → Jira Integration and add your personal Atlassian token."
)
from atlassian import Jira
# Strip trailing slash — the Atlassian library appends paths like
# /rest/api/2/myself and a trailing slash causes double-slash URLs.
clean_url = jira_url.rstrip("/")
return Jira(
url=clean_url,
username=auth_email,
password=user.jira_api_token,
cloud=True,
)
def has_jira_configured(user: User, db: Session) -> bool:
"""Return True if *user* has everything needed to call Jira."""
return bool(get_jira_url(db) and _effective_jira_email(user) and user.jira_api_token)
# ---------------------------------------------------------------------------
# Ticket content builders (inspired by the pentest-to-Jira script)
# ---------------------------------------------------------------------------
_SEVERITY_TO_PRIORITY: dict[str, str] = {
"critical": "Highest",
"high": "High",
"medium": "Medium",
"low": "Low",
"informational": "Lowest",
}
_STATE_EMOJI: dict[str, str] = {
"draft": "📝 Draft",
"red_executing": "🔴 Red Team Executing",
"blue_evaluating": "🔵 Blue Team Evaluating",
"in_review": "📋 In Review",
"validated": "✅ Validated",
"rejected": "❌ Rejected",
}
def _technique_severity(technique: Optional[Technique]) -> str:
"""Return a lowercase severity string from the technique, defaulting to medium."""
if technique and hasattr(technique, "severity") and technique.severity:
return technique.severity.lower()
return "medium"
def _build_test_description(test: Test, technique: Optional[Technique]) -> str:
"""Build the initial Jira ticket description for a newly created test."""
mitre_id = technique.mitre_id if technique else "N/A"
tech_name = technique.name if technique else "N/A"
tactic = technique.tactic if technique else "N/A"
severity = _technique_severity(technique).capitalize()
lines = [
"h2. Aegis Security Test",
"",
f"*Test Name:* {test.name}",
f"*MITRE Technique:* [{mitre_id}|https://attack.mitre.org/techniques/{mitre_id.replace('.', '/')}] — {tech_name}",
f"*Tactic:* {tactic}",
f"*Platform:* {test.platform or 'N/A'}",
f"*Severity:* {severity}",
f"*Data Classification:* {test.data_classification or 'N/A'}",
"",
"h3. Description",
test.description or "_No description provided._",
"",
"h3. Proof of Concept",
f"{{code}}{test.procedure_text or 'N/A'}{{code}}",
"",
f"*Tool:* {test.tool_used or 'N/A'}",
"",
"----",
f"_Created via Aegis at {datetime.utcnow().strftime('%Y-%m-%d %H:%M')} UTC_",
]
return "\n".join(lines)
def _build_state_comment(
test: Test,
new_state: str,
actor: User,
extra: dict | None = None,
) -> str:
"""Build a Jira comment body for a test state transition."""
label = _STATE_EMOJI.get(new_state, new_state)
lines = [
f"h3. {label}",
"",
f"*Changed by:* {actor.username} ({actor.email or 'no email'})",
f"*At:* {datetime.utcnow().strftime('%Y-%m-%d %H:%M')} UTC",
"",
]
if new_state == "red_executing":
lines += [
"Red Team has started the attack execution.",
]
elif new_state == "blue_evaluating":
lines += [
"Red Team has finished execution and submitted evidence for Blue Team evaluation.",
"",
f"*Attack Success:* {test.attack_success if test.attack_success is not None else 'N/A'}",
]
if test.red_summary:
lines += ["", "h4. Red Team Summary", test.red_summary]
elif new_state == "in_review":
lines += [
"Blue Team has completed evaluation. Test is awaiting lead validation.",
"",
f"*Detection Result:* {test.detection_result or 'N/A'}",
]
if test.blue_summary:
lines += ["", "h4. Blue Team Summary", test.blue_summary]
if test.remediation_steps:
lines += ["", "h4. Remediation Steps", test.remediation_steps]
elif new_state == "validated":
lines += [
"Test has been *validated* by both leads.",
"",
f"*Red Lead Status:* {test.red_validation_status or 'N/A'}",
f"*Blue Lead Status:* {test.blue_validation_status or 'N/A'}",
]
if test.red_validation_notes:
lines += ["", f"*Red Lead Notes:* {test.red_validation_notes}"]
if test.blue_validation_notes:
lines += ["", f"*Blue Lead Notes:* {test.blue_validation_notes}"]
elif new_state == "rejected":
lines += [
"Test has been *rejected* and must be reworked.",
"",
f"*Red Lead Status:* {test.red_validation_status or 'N/A'}",
f"*Blue Lead Status:* {test.blue_validation_status or 'N/A'}",
]
if test.red_validation_notes:
lines += ["", f"*Red Lead Notes:* {test.red_validation_notes}"]
if test.blue_validation_notes:
lines += ["", f"*Blue Lead Notes:* {test.blue_validation_notes}"]
elif new_state == "draft":
lines += ["Test has been reopened for re-execution."]
# Any caller-supplied extra data
if extra:
lines.append("")
for k, v in extra.items():
lines.append(f"*{k}:* {v}")
lines.append("")
lines.append("_Synced from [Aegis|https://aegis.undiamagico.es]_")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Public lifecycle hooks
# ---------------------------------------------------------------------------
def _build_campaign_description(campaign) -> str:
"""Build the Jira ticket description for a campaign."""
lines = [
"h2. Aegis Security Campaign",
"",
f"*Campaign Name:* {campaign.name}",
f"*Type:* {campaign.type}",
f"*Status:* {campaign.status}",
]
if campaign.description:
lines += ["", "h3. Description", campaign.description]
if campaign.tags:
lines += ["", f"*Tags:* {', '.join(campaign.tags)}"]
lines += [
"",
"----",
f"_Created via Aegis at {datetime.utcnow().strftime('%Y-%m-%d %H:%M')} UTC_",
]
return "\n".join(lines)
def get_campaign_jira_key(db: Session, campaign_id) -> Optional[str]:
"""Return the Jira issue key for a campaign, or None if not linked."""
import uuid as _uuid
try:
cid = _uuid.UUID(str(campaign_id))
except (ValueError, AttributeError):
return None
link = (
db.query(JiraLink)
.filter(
JiraLink.entity_type == JiraLinkEntityType.campaign,
JiraLink.entity_id == cid,
)
.first()
)
return link.jira_issue_key if link else None
def get_test_jira_key(db: Session, test_id) -> Optional[str]:
"""Return the Jira issue key for a test, or None if not linked."""
import uuid as _uuid
try:
tid = _uuid.UUID(str(test_id))
except (ValueError, AttributeError):
return None
link = (
db.query(JiraLink)
.filter(
JiraLink.entity_type == JiraLinkEntityType.test,
JiraLink.entity_id == tid,
)
.first()
)
return link.jira_issue_key if link else None
def auto_create_campaign_issue(
db: Session,
campaign,
actor: User,
) -> Optional[str]:
"""Create a Jira issue for *campaign* under the configured parent ticket.
Returns the Jira issue key on success, or ``None`` if Jira is not
configured for *actor* or if the operation fails (non-fatal).
Called once right after a campaign is committed to the database.
The created ticket is stored as a JiraLink with entity_type=campaign.
"""
if not has_jira_configured(actor, db):
return None
project_key = get_jira_project_key(db)
if not project_key:
logger.warning(
"Jira project key not configured; skipping auto-create for campaign %s",
campaign.id,
)
return None
parent_ticket = get_jira_parent_ticket(db)
try:
jira = get_user_jira_client(actor, db)
fields: dict = {
"project": {"key": project_key},
"summary": f"[Aegis Campaign] {campaign.name}",
"description": _build_campaign_description(campaign),
"issuetype": {"name": settings.JIRA_ISSUE_TYPE_CAMPAIGN},
"labels": ["aegis", "campaign"],
}
# Always nest under the configured parent ticket (e.g. OFS-9107)
if parent_ticket:
fields["parent"] = {"key": parent_ticket}
result = jira.issue_create(fields=fields)
issue_key = result["key"]
issue_id = result.get("id", "")
link = JiraLink(
entity_type=JiraLinkEntityType.campaign,
entity_id=campaign.id,
jira_issue_key=issue_key,
jira_issue_id=issue_id,
jira_project_key=project_key,
sync_direction=JiraSyncDirection.aegis_to_jira,
created_by=actor.id,
)
db.add(link)
db.flush()
logger.info("Auto-created Jira issue %s for campaign %s", issue_key, campaign.id)
return issue_key
except Exception as exc:
# Non-fatal: Jira failures must never break the campaign creation flow
logger.warning(
"Failed to auto-create Jira issue for campaign %s: %s",
campaign.id, exc, exc_info=True,
)
return None
def auto_create_test_issue(
db: Session,
test: Test,
actor: User,
*,
technique: Optional[Technique] = None,
parent_ticket_override: Optional[str] = None,
) -> Optional[str]:
"""Create a Jira issue for *test* and store the link.
Returns the Jira issue key on success, or ``None`` if Jira is not
configured for *actor* or if the operation fails (non-fatal).
Called once right after a test is committed to the database.
Args:
parent_ticket_override: When set, use this as the Jira parent ticket
instead of the system-configured parent (e.g. OFS-9107).
Use this to nest test tickets under a campaign ticket.
"""
if not has_jira_configured(actor, db):
return None
project_key = get_jira_project_key(db)
if not project_key:
logger.warning("Jira project key not configured; skipping auto-create for test %s", test.id)
return None
# Resolve technique if not supplied
if technique is None:
technique = db.query(Technique).filter(Technique.id == test.technique_id).first()
severity = _technique_severity(technique)
mitre_id = technique.mitre_id if technique else "N/A"
try:
jira = get_user_jira_client(actor, db)
# Tests nested under a campaign are Sub-tasks; standalone tests are Tasks
issue_type = (
settings.JIRA_ISSUE_TYPE_SUBTASK
if parent_ticket_override
else settings.JIRA_ISSUE_TYPE_TEST
)
fields: dict = {
"project": {"key": project_key},
"summary": f"[Aegis] {mitre_id}{test.name}",
"description": _build_test_description(test, technique),
"issuetype": {"name": issue_type},
"labels": ["aegis", "security-test", mitre_id.replace(".", "-")],
}
# Use campaign ticket as parent when provided; otherwise use the
# standalone-tests parent (e.g. OFS-20798), falling back to the
# general parent ticket if the standalone one is not configured.
parent = parent_ticket_override or get_jira_parent_ticket_standalone(db)
if parent:
fields["parent"] = {"key": parent}
result = jira.issue_create(fields=fields)
issue_key = result["key"]
issue_id = result.get("id", "")
link = JiraLink(
entity_type=JiraLinkEntityType.test,
entity_id=test.id,
jira_issue_key=issue_key,
jira_issue_id=issue_id,
jira_project_key=project_key,
sync_direction=JiraSyncDirection.aegis_to_jira,
created_by=actor.id,
)
db.add(link)
db.flush()
logger.info("Auto-created Jira issue %s for test %s", issue_key, test.id)
return issue_key
except Exception as exc:
# Non-fatal: Jira failures must never break the test creation flow
logger.warning(
"Failed to auto-create Jira issue for test %s: %s",
test.id, exc, exc_info=True,
)
return None
def push_test_event(
db: Session,
test: Test,
actor: User,
new_state: str,
*,
extra: dict | None = None,
) -> None:
"""Post a lifecycle comment to the Jira issue linked to *test*.
Called from ``test_workflow_service`` after every state transition.
Completely non-fatal — any Jira error is logged and swallowed so it
never blocks the test workflow.
"""
if not has_jira_configured(actor, db):
return
link = (
db.query(JiraLink)
.filter(
JiraLink.entity_type == JiraLinkEntityType.test,
JiraLink.entity_id == test.id,
)
.first()
)
if not link:
return
try:
jira = get_user_jira_client(actor, db)
comment = _build_state_comment(test, new_state, actor, extra)
jira.issue_add_comment(link.jira_issue_key, comment)
# When the operator starts execution: transition to "In Progress"
# and assign the ticket to that operator.
if new_state == "red_executing":
try:
jira.set_issue_status(link.jira_issue_key, "In Progress")
logger.info(
"Transitioned Jira ticket %s to In Progress", link.jira_issue_key
)
except Exception as exc_t:
logger.warning(
"Could not transition %s to In Progress: %s",
link.jira_issue_key, exc_t,
)
jira_account_id = getattr(actor, "jira_account_id", None)
if jira_account_id:
try:
jira.assign_issue(link.jira_issue_key, account_id=jira_account_id)
logger.info(
"Assigned Jira ticket %s to account %s",
link.jira_issue_key, jira_account_id,
)
except Exception as exc_a:
logger.warning(
"Could not assign %s to %s: %s",
link.jira_issue_key, jira_account_id, exc_a,
)
link.last_synced_at = datetime.utcnow()
db.flush()
logger.info(
"Posted Jira comment to %s for test %s state=%s",
link.jira_issue_key, test.id, new_state,
)
except Exception as exc:
logger.warning(
"Failed to push Jira event for test %s (state=%s): %s",
test.id, new_state, exc, exc_info=True,
)
# ---------------------------------------------------------------------------
# Legacy / generic helpers (kept for existing routes)
# ---------------------------------------------------------------------------
def get_jira_client():
"""Return a shared Jira client using global credentials (legacy path).
Raises ``InvalidOperationError`` when Jira is disabled or unconfigured.
Prefer ``get_user_jira_client()`` for new code.
"""
if not settings.JIRA_ENABLED:
raise InvalidOperationError("Jira integration is not enabled")
if not settings.JIRA_URL or not settings.JIRA_USERNAME or not settings.JIRA_API_TOKEN:
raise InvalidOperationError(
"Jira is enabled but JIRA_URL / JIRA_USERNAME / JIRA_API_TOKEN are not set"
)
from atlassian import Jira
return Jira(
url=settings.JIRA_URL,
username=settings.JIRA_USERNAME,
password=settings.JIRA_API_TOKEN,
cloud=settings.JIRA_IS_CLOUD,
)
def search_jira_issues(query: str, max_results: int = 10) -> list[dict]:
"""Search Jira issues by JQL or free text (uses global credentials)."""
jira = get_jira_client()
jql = query if "=" in query or "~" in query else f'summary ~ "{query}"'
results = jira.jql(jql, limit=max_results)
return [
{
"issue_key": issue["key"],
"summary": issue["fields"]["summary"],
"status": issue["fields"]["status"]["name"],
"assignee": (issue["fields"].get("assignee") or {}).get("displayName"),
"priority": (issue["fields"].get("priority") or {}).get("name"),
}
for issue in results.get("issues", [])
]
def create_jira_issue(
project_key: str,
summary: str,
description: str,
issue_type: str = "Task",
labels: Optional[list[str]] = None,
custom_fields: Optional[dict] = None,
) -> dict:
"""Create a Jira issue and return its key + id (uses global credentials)."""
jira = get_jira_client()
fields: dict = {
"project": {"key": project_key},
"summary": summary,
"description": description,
"issuetype": {"name": issue_type},
}
if labels:
fields["labels"] = labels
if custom_fields:
fields.update(custom_fields)
result = jira.issue_create(fields=fields)
return {"issue_key": result["key"], "issue_id": result["id"]}
def sync_jira_to_aegis(db: Session, link: JiraLink) -> None:
"""Pull current status from Jira into the local link record (global creds)."""
jira = get_jira_client()
issue = jira.issue(link.jira_issue_key)
fields = issue.get("fields", {})
link.jira_status = fields.get("status", {}).get("name")
link.jira_priority = (fields.get("priority") or {}).get("name")
link.jira_assignee = (fields.get("assignee") or {}).get("displayName")
link.jira_story_points = str(fields.get("customfield_10016", ""))
link.last_synced_at = datetime.utcnow()
db.flush()
def sync_aegis_to_jira(db: Session, link: JiraLink, entity_data: dict) -> None:
"""Push an Aegis status update as a Jira comment (global creds)."""
jira = get_jira_client()
comment_body = _build_sync_comment(entity_data)
jira.issue_add_comment(link.jira_issue_key, comment_body)
link.last_synced_at = datetime.utcnow()
db.flush()
def _build_sync_comment(data: dict) -> str:
lines = ["h3. Aegis Sync Update", ""]
for key, value in data.items():
lines.append(f"*{key}:* {value}")
lines.append(f"\n_Synced at {datetime.utcnow().isoformat()}_")
return "\n".join(lines)
# ── Link CRUD ────────────────────────────────────────────────────────────
def create_link(
db: Session,
*,
entity_type: JiraLinkEntityType,
entity_id: UUID,
jira_issue_key: str,
sync_direction: JiraSyncDirection,
created_by: UUID,
) -> JiraLink:
link = JiraLink(
entity_type=entity_type,
entity_id=entity_id,
jira_issue_key=jira_issue_key,
sync_direction=sync_direction,
created_by=created_by,
)
db.add(link)
db.flush()
if settings.JIRA_ENABLED:
try:
sync_jira_to_aegis(db, link)
except Exception as e:
logger.warning("Initial Jira sync failed for %s: %s", jira_issue_key, e)
return link
def list_links(
db: Session,
*,
entity_type: Optional[JiraLinkEntityType] = None,
entity_id: Optional[UUID] = None,
) -> list[JiraLink]:
query = db.query(JiraLink)
if entity_type:
query = query.filter(JiraLink.entity_type == entity_type)
if entity_id:
query = query.filter(JiraLink.entity_id == entity_id)
return query.order_by(JiraLink.created_at.desc()).all()
def get_link_or_raise(db: Session, link_id: UUID) -> JiraLink:
link = db.query(JiraLink).filter(JiraLink.id == link_id).first()
if not link:
raise EntityNotFoundError("JiraLink", str(link_id))
return link
def delete_link(db: Session, link_id: UUID) -> JiraLink:
link = get_link_or_raise(db, link_id)
db.delete(link)
return link
def build_issue_data(
db: Session, entity_type: JiraLinkEntityType, entity_id: UUID
) -> tuple[str, str]:
"""Build Jira issue summary and description from an Aegis entity."""
if entity_type == JiraLinkEntityType.test:
entity = db.query(Test).filter(Test.id == entity_id).first()
if not entity:
raise EntityNotFoundError("Test", str(entity_id))
technique = db.query(Technique).filter(Technique.id == entity.technique_id).first()
return (
f"[Aegis] {technique.mitre_id if technique else 'N/A'}{entity.name}",
_build_test_description(entity, technique),
)
elif entity_type == JiraLinkEntityType.campaign:
entity = db.query(Campaign).filter(Campaign.id == entity_id).first()
if not entity:
raise EntityNotFoundError("Campaign", str(entity_id))
return (
f"[Aegis Campaign] {entity.name}",
f"Campaign: {entity.name}\nType: {entity.type}\nStatus: {entity.status}\n"
f"Description: {entity.description or 'N/A'}",
)
elif entity_type == JiraLinkEntityType.technique:
entity = db.query(Technique).filter(Technique.id == entity_id).first()
if not entity:
raise EntityNotFoundError("Technique", str(entity_id))
return (
f"[Aegis Technique] {entity.mitre_id} - {entity.name}",
f"MITRE ID: {entity.mitre_id}\nName: {entity.name}\n"
f"Tactic: {entity.tactic or 'N/A'}\n"
f"Description: {entity.description or 'N/A'}",
)
else:
return f"[Aegis] Entity {entity_id}", f"Entity type: {entity_type.value}"
def create_issue_and_link(
db: Session,
*,
entity_type: JiraLinkEntityType,
entity_id: UUID,
created_by: UUID,
) -> dict:
"""Create a Jira issue from an Aegis entity and link them (global creds)."""
summary, description = build_issue_data(db, entity_type, entity_id)
project_key = settings.JIRA_DEFAULT_PROJECT
result = create_jira_issue(
project_key=project_key,
summary=summary,
description=description,
labels=["aegis", entity_type.value],
)
link = JiraLink(
entity_type=entity_type,
entity_id=entity_id,
jira_issue_key=result["issue_key"],
jira_issue_id=result["issue_id"],
jira_project_key=project_key,
created_by=created_by,
)
db.add(link)
return {"issue_key": result["issue_key"], "link_id": str(link.id)}