"""Jira integration service. Authentication model -------------------- Each Aegis user authenticates to Jira with their own Atlassian email and personal API token. The email used is ``user.jira_email`` when set, falling back to ``user.email`` (the Aegis account email). This lets users specify a separate corporate Atlassian email without changing their Aegis login. The token is stored in ``user.jira_api_token``. Admin configuration ------------------- The Jira URL and default project key are stored in the ``system_configs`` table (keys ``jira.url`` and ``jira.project_key``) so the admin can update them at runtime without redeploying. These values override the legacy ``settings.JIRA_URL`` / ``settings.JIRA_DEFAULT_PROJECT`` env-vars which are kept for backwards-compatibility only. Lifecycle hooks --------------- ``push_test_event()`` is the single entry-point called from the test-workflow service on every state transition. It posts a rich comment to the linked Jira issue (if one exists) using the acting user's credentials. ``auto_create_test_issue()`` is called once after a test is created; it creates the Jira ticket and stores the link. """ from __future__ import annotations # Import logging import logging # Import datetime from datetime from datetime import datetime # Import Any, Optional from typing from typing import Optional # Import UUID from uuid from uuid import UUID # Import Session from sqlalchemy.orm from sqlalchemy.orm import Session # Import settings from app.config from app.config import settings # Import EntityNotFoundError from app.domain.errors from app.domain.errors import EntityNotFoundError # Import InvalidOperationError from app.domain.exceptions from app.domain.exceptions import InvalidOperationError # Import Campaign from app.models.campaign from app.models.campaign import Campaign # Import JiraLink, JiraLinkEntityType, JiraSyncDirection from app.models.jira_link from app.models.jira_link import JiraLink, JiraLinkEntityType, JiraSyncDirection # Import Technique from app.models.technique from app.models.technique import Technique # Import Test from app.models.test from app.models.test import Test from app.models.user import User # Assign logger = logging.getLogger(__name__) logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # System-config helpers (admin-configurable Jira settings) # --------------------------------------------------------------------------- _JIRA_KEYS = { "url": "jira.url", "project_key": "jira.project_key", "enabled": "jira.enabled", } def _read_system_config(db: Session, key: str) -> Optional[str]: """Return a value from system_configs, or None if not set.""" from app.models.system_config import SystemConfig # avoid circular at import time row = db.query(SystemConfig).filter(SystemConfig.key == key).first() return row.value if row else None def get_jira_url(db: Session) -> Optional[str]: """Return the admin-configured Jira URL, falling back to the env-var.""" return _read_system_config(db, _JIRA_KEYS["url"]) or settings.JIRA_URL or None def get_jira_project_key(db: Session) -> Optional[str]: """Return the admin-configured default project key, falling back to env-var.""" return ( _read_system_config(db, _JIRA_KEYS["project_key"]) or settings.JIRA_DEFAULT_PROJECT or None ) def is_jira_enabled(db: Session) -> bool: """Return True if Jira integration is enabled (DB setting or env-var).""" db_val = _read_system_config(db, _JIRA_KEYS["enabled"]) if db_val is not None: return db_val.lower() in ("true", "1", "yes") return settings.JIRA_ENABLED def get_jira_parent_ticket(db: Session) -> Optional[str]: """Return the configured parent ticket key for campaigns, or None if not set.""" return _read_system_config(db, "jira.parent_ticket") or None def get_jira_parent_ticket_standalone(db: Session) -> Optional[str]: """Return the parent ticket for standalone tests (not in a campaign). Falls back to get_jira_parent_ticket() if not explicitly configured. """ return ( _read_system_config(db, "jira.parent_ticket_standalone") or get_jira_parent_ticket(db) ) def upsert_jira_config(db: Session, key: str, value: str) -> None: """Persist a Jira config key-value pair.""" from app.models.system_config import SystemConfig row = db.query(SystemConfig).filter(SystemConfig.key == key).first() if row: row.value = value else: db.add(SystemConfig(key=key, value=value)) # --------------------------------------------------------------------------- # Per-user Jira client # --------------------------------------------------------------------------- def _effective_jira_email(user: User) -> Optional[str]: """Return the email to use for Jira auth: jira_email if set, otherwise email.""" return getattr(user, "jira_email", None) or user.email def get_user_jira_client(user: User, db: Session): """Build an Atlassian Jira client authenticated as *user*. Uses ``user.jira_email`` when set, otherwise falls back to ``user.email``. Raises ``InvalidOperationError`` when configuration is incomplete so callers can surface meaningful error messages. """ jira_url = get_jira_url(db) if not jira_url: raise InvalidOperationError( "Jira URL is not configured. Ask your administrator to set it in " "System Settings → Jira Configuration." ) auth_email = _effective_jira_email(user) if not auth_email: raise InvalidOperationError( "No email configured for Jira authentication. " "Set a Jira email in Settings → Profile → Jira Integration." ) if not user.jira_api_token: raise InvalidOperationError( "You have not configured a Jira API token. " "Go to Settings → Profile → Jira Integration and add your personal Atlassian token." ) from atlassian import Jira # Strip trailing slash — the Atlassian library appends paths like # /rest/api/2/myself and a trailing slash causes double-slash URLs. clean_url = jira_url.rstrip("/") return Jira( url=clean_url, username=auth_email, password=user.jira_api_token, cloud=True, ) def has_jira_configured(user: User, db: Session) -> bool: """Return True if *user* has everything needed to call Jira.""" return bool(get_jira_url(db) and _effective_jira_email(user) and user.jira_api_token) # --------------------------------------------------------------------------- # Ticket content builders (inspired by the pentest-to-Jira script) # --------------------------------------------------------------------------- _SEVERITY_TO_PRIORITY: dict[str, str] = { "critical": "Highest", "high": "High", "medium": "Medium", "low": "Low", "informational": "Lowest", } _STATE_EMOJI: dict[str, str] = { "draft": "📝 Draft", "red_executing": "🔴 Red Team Executing", "blue_evaluating": "🔵 Blue Team Evaluating", "in_review": "📋 In Review", "validated": "✅ Validated", "rejected": "❌ Rejected", } def _technique_severity(technique: Optional[Technique]) -> str: """Return a lowercase severity string from the technique, defaulting to medium.""" if technique and hasattr(technique, "severity") and technique.severity: return technique.severity.lower() return "medium" def _build_test_description(test: Test, technique: Optional[Technique]) -> str: """Build the initial Jira ticket description for a newly created test.""" mitre_id = technique.mitre_id if technique else "N/A" tech_name = technique.name if technique else "N/A" tactic = technique.tactic if technique else "N/A" severity = _technique_severity(technique).capitalize() lines = [ "h2. Aegis Security Test", "", f"*Test Name:* {test.name}", f"*MITRE Technique:* [{mitre_id}|https://attack.mitre.org/techniques/{mitre_id.replace('.', '/')}] — {tech_name}", f"*Tactic:* {tactic}", f"*Platform:* {test.platform or 'N/A'}", f"*Severity:* {severity}", f"*Data Classification:* {test.data_classification or 'N/A'}", "", "h3. Description", test.description or "_No description provided._", "", f"*Tool:* {test.tool_used or 'N/A'}", "", "----", f"_Created via Aegis at {datetime.utcnow().strftime('%Y-%m-%d %H:%M')} UTC_", ] return "\n".join(lines) def _build_state_comment( test: Test, new_state: str, actor: User, extra: dict | None = None, ) -> str: """Build a Jira comment body for a test state transition.""" label = _STATE_EMOJI.get(new_state, new_state) lines = [ f"h3. {label}", "", f"*Changed by:* {actor.username} ({actor.email or 'no email'})", f"*At:* {datetime.utcnow().strftime('%Y-%m-%d %H:%M')} UTC", "", ] if new_state == "red_executing": lines += [ "Red Team has started the attack execution.", ] elif new_state == "blue_evaluating": lines += [ "Red Team has finished execution and submitted evidence for Blue Team evaluation.", "", f"*Attack Success:* {test.attack_success if test.attack_success is not None else 'N/A'}", ] if test.red_summary: lines += ["", "h4. Red Team Summary", test.red_summary] elif new_state == "in_review": lines += [ "Blue Team has completed evaluation. Test is awaiting lead validation.", "", f"*Detection Result:* {test.detection_result or 'N/A'}", ] if test.blue_summary: lines += ["", "h4. Blue Team Summary", test.blue_summary] if test.remediation_steps: lines += ["", "h4. Remediation Steps", test.remediation_steps] elif new_state == "validated": lines += [ "Test has been *validated* by both leads.", "", f"*Red Lead Status:* {test.red_validation_status or 'N/A'}", f"*Blue Lead Status:* {test.blue_validation_status or 'N/A'}", ] if test.red_validation_notes: lines += ["", f"*Red Lead Notes:* {test.red_validation_notes}"] if test.blue_validation_notes: lines += ["", f"*Blue Lead Notes:* {test.blue_validation_notes}"] elif new_state == "rejected": lines += [ "Test has been *rejected* and must be reworked.", "", f"*Red Lead Status:* {test.red_validation_status or 'N/A'}", f"*Blue Lead Status:* {test.blue_validation_status or 'N/A'}", ] if test.red_validation_notes: lines += ["", f"*Red Lead Notes:* {test.red_validation_notes}"] if test.blue_validation_notes: lines += ["", f"*Blue Lead Notes:* {test.blue_validation_notes}"] elif new_state == "draft": lines += ["Test has been reopened for re-execution."] # Any caller-supplied extra data if extra: lines.append("") for k, v in extra.items(): lines.append(f"*{k}:* {v}") lines.append("") lines.append("_Synced from [Aegis|https://aegis.undiamagico.es]_") return "\n".join(lines) # --------------------------------------------------------------------------- # Public lifecycle hooks # --------------------------------------------------------------------------- def _build_campaign_description(campaign) -> str: """Build the Jira ticket description for a campaign.""" lines = [ "h2. Aegis Security Campaign", "", f"*Campaign Name:* {campaign.name}", f"*Type:* {campaign.type}", f"*Status:* {campaign.status}", ] if campaign.description: lines += ["", "h3. Description", campaign.description] if campaign.tags: lines += ["", f"*Tags:* {', '.join(campaign.tags)}"] lines += [ "", "----", f"_Created via Aegis at {datetime.utcnow().strftime('%Y-%m-%d %H:%M')} UTC_", ] return "\n".join(lines) def get_campaign_jira_key(db: Session, campaign_id) -> Optional[str]: """Return the Jira issue key for a campaign, or None if not linked.""" import uuid as _uuid try: cid = _uuid.UUID(str(campaign_id)) except (ValueError, AttributeError): return None link = ( db.query(JiraLink) .filter( JiraLink.entity_type == JiraLinkEntityType.campaign, JiraLink.entity_id == cid, ) .first() ) return link.jira_issue_key if link else None def get_test_jira_key(db: Session, test_id) -> Optional[str]: """Return the Jira issue key for a test, or None if not linked.""" import uuid as _uuid try: tid = _uuid.UUID(str(test_id)) except (ValueError, AttributeError): return None link = ( db.query(JiraLink) .filter( JiraLink.entity_type == JiraLinkEntityType.test, JiraLink.entity_id == tid, ) .first() ) return link.jira_issue_key if link else None def auto_create_campaign_issue( db: Session, campaign, actor: User, ) -> Optional[str]: """Create a Jira issue for *campaign* under the configured parent ticket. Returns the Jira issue key on success, or ``None`` if Jira is not configured for *actor* or if the operation fails (non-fatal). Called once right after a campaign is committed to the database. The created ticket is stored as a JiraLink with entity_type=campaign. """ if not has_jira_configured(actor, db): return None project_key = get_jira_project_key(db) if not project_key: logger.warning( "Jira project key not configured; skipping auto-create for campaign %s", campaign.id, ) return None parent_ticket = get_jira_parent_ticket(db) try: jira = get_user_jira_client(actor, db) fields: dict = { "project": {"key": project_key}, "summary": f"[Aegis Campaign] {campaign.name}", "description": _build_campaign_description(campaign), "issuetype": {"name": settings.JIRA_ISSUE_TYPE_CAMPAIGN}, "labels": ["aegis", "campaign"], # customfield_10011 = Epic Name (required for Epic type in classic Jira) "customfield_10011": campaign.name, } # Set start date: use campaign.start_date if set, otherwise today effective_start = campaign.start_date or campaign.created_at if effective_start: fields[settings.JIRA_START_DATE_FIELD] = effective_start.strftime("%Y-%m-%d") # Nest under the configured parent ticket (Initiative, e.g. OFS-20795) if parent_ticket: fields["parent"] = {"key": parent_ticket} result = jira.issue_create(fields=fields) issue_key = result["key"] issue_id = result.get("id", "") link = JiraLink( entity_type=JiraLinkEntityType.campaign, entity_id=campaign.id, jira_issue_key=issue_key, jira_issue_id=issue_id, jira_project_key=project_key, sync_direction=JiraSyncDirection.aegis_to_jira, created_by=actor.id, ) db.add(link) db.flush() logger.info("Auto-created Jira issue %s for campaign %s", issue_key, campaign.id) return issue_key except Exception as exc: # Non-fatal: Jira failures must never break the campaign creation flow logger.warning( "Failed to auto-create Jira issue for campaign %s: %s", campaign.id, exc, exc_info=True, ) return None def auto_create_test_issue( db: Session, test: Test, actor: User, *, technique: Optional[Technique] = None, parent_ticket_override: Optional[str] = None, campaign_start_date=None, # datetime | None — inherited from campaign when available ) -> Optional[str]: """Create a Jira issue for *test* and store the link. Returns the Jira issue key on success, or ``None`` if Jira is not configured for *actor* or if the operation fails (non-fatal). Called once right after a test is committed to the database. Args: parent_ticket_override: When set, use this as the Jira parent ticket instead of the system-configured parent (e.g. OFS-9107). Use this to nest test tickets under a campaign ticket. """ if not has_jira_configured(actor, db): return None project_key = get_jira_project_key(db) if not project_key: logger.warning("Jira project key not configured; skipping auto-create for test %s", test.id) return None # Resolve technique if not supplied if technique is None: technique = db.query(Technique).filter(Technique.id == test.technique_id).first() mitre_id = technique.mitre_id if technique else "N/A" try: jira = get_user_jira_client(actor, db) # All tests — whether inside a campaign or standalone — are created # as Task. Campaign tests use the campaign Jira key as parent # (passed via parent_ticket_override); standalone tests use the # configured standalone parent ticket (e.g. OFS-20798, which is an # Epic so it can parent Tasks). parent = parent_ticket_override or get_jira_parent_ticket_standalone(db) issue_type = settings.JIRA_ISSUE_TYPE_TEST # always Task poc = test.procedure_text or "N/A" fields: dict = { "project": {"key": project_key}, "summary": f"[Aegis] {mitre_id} — {test.name}", "description": _build_test_description(test, technique), "issuetype": {"name": issue_type}, "labels": ["aegis", "security-test", mitre_id.replace(".", "-")], # customfield_10309 = Proof of Concept field (required by team's Jira config) "customfield_10309": f"{{code}}{poc}{{code}}", } # Inherit campaign start date if available, otherwise use today from datetime import date as _date effective_start = campaign_start_date or _date.today() if hasattr(effective_start, "strftime"): fields[settings.JIRA_START_DATE_FIELD] = effective_start.strftime("%Y-%m-%d") if parent: fields["parent"] = {"key": parent} result = jira.issue_create(fields=fields) issue_key = result["key"] issue_id = result.get("id", "") link = JiraLink( entity_type=JiraLinkEntityType.test, entity_id=test.id, jira_issue_key=issue_key, jira_issue_id=issue_id, jira_project_key=project_key, sync_direction=JiraSyncDirection.aegis_to_jira, created_by=actor.id, ) db.add(link) db.flush() logger.info("Auto-created Jira issue %s for test %s", issue_key, test.id) return issue_key except Exception as exc: # Non-fatal: Jira failures must never break the test creation flow logger.warning( "Failed to auto-create Jira issue for test %s: %s", test.id, exc, exc_info=True, ) return None def push_test_event( db: Session, test: Test, actor: User, new_state: str, *, extra: dict | None = None, ) -> None: """Post a lifecycle comment to the Jira issue linked to *test*. Called from ``test_workflow_service`` after every state transition. Completely non-fatal — any Jira error is logged and swallowed so it never blocks the test workflow. """ if not has_jira_configured(actor, db): return link = ( db.query(JiraLink) .filter( JiraLink.entity_type == JiraLinkEntityType.test, JiraLink.entity_id == test.id, ) .first() ) if not link: return try: jira = get_user_jira_client(actor, db) comment = _build_state_comment(test, new_state, actor, extra) jira.issue_add_comment(link.jira_issue_key, comment) # When the operator starts execution: transition to "In Progress" # and assign the ticket to that operator. if new_state == "red_executing": try: jira.set_issue_status(link.jira_issue_key, "In Progress") logger.info( "Transitioned Jira ticket %s to In Progress", link.jira_issue_key ) except Exception as exc_t: logger.warning( "Could not transition %s to In Progress: %s", link.jira_issue_key, exc_t, ) jira_account_id = getattr(actor, "jira_account_id", None) if jira_account_id: try: jira.assign_issue(link.jira_issue_key, account_id=jira_account_id) logger.info( "Assigned Jira ticket %s to account %s", link.jira_issue_key, jira_account_id, ) except Exception as exc_a: logger.warning( "Could not assign %s to %s: %s", link.jira_issue_key, jira_account_id, exc_a, ) link.last_synced_at = datetime.utcnow() db.flush() logger.info( "Posted Jira comment to %s for test %s state=%s", link.jira_issue_key, test.id, new_state, ) except Exception as exc: logger.warning( "Failed to push Jira event for test %s (state=%s): %s", test.id, new_state, exc, exc_info=True, ) # --------------------------------------------------------------------------- # Legacy / generic helpers (kept for existing routes) # --------------------------------------------------------------------------- def get_jira_client(): """Return a shared Jira client using global credentials (legacy path). Raises ``InvalidOperationError`` when Jira is disabled or unconfigured. Prefer ``get_user_jira_client()`` for new code. """ if not settings.JIRA_ENABLED: # Raise InvalidOperationError raise InvalidOperationError("Jira integration is not enabled") if not settings.JIRA_URL or not settings.JIRA_USERNAME or not settings.JIRA_API_TOKEN: raise InvalidOperationError( "Jira is enabled but JIRA_URL / JIRA_USERNAME / JIRA_API_TOKEN are not set" ) from atlassian import Jira return Jira( url=settings.JIRA_URL, username=settings.JIRA_USERNAME, password=settings.JIRA_API_TOKEN, cloud=settings.JIRA_IS_CLOUD, ) # Define function search_jira_issues def search_jira_issues(query: str, max_results: int = 10) -> list[dict]: """Search Jira issues by JQL or free text (uses global credentials).""" jira = get_jira_client() # Assign jql = query if "=" in query or "~" in query else f'summary ~ "{query}"' jql = query if "=" in query or "~" in query else f'summary ~ "{query}"' # Assign results = jira.jql(jql, limit=max_results) results = jira.jql(jql, limit=max_results) # Return [ return [ { # Literal argument value "issue_key": issue["key"], # Literal argument value "summary": issue["fields"]["summary"], # Literal argument value "status": issue["fields"]["status"]["name"], # Literal argument value "assignee": (issue["fields"].get("assignee") or {}).get("displayName"), # Literal argument value "priority": (issue["fields"].get("priority") or {}).get("name"), } for issue in results.get("issues", []) ] # Define function create_jira_issue def create_jira_issue( # Entry: project_key project_key: str, # Entry: summary summary: str, # Entry: description description: str, # Entry: issue_type issue_type: str = "Task", # Entry: labels labels: Optional[list[str]] = None, # Entry: custom_fields custom_fields: Optional[dict] = None, ) -> dict: """Create a Jira issue and return its key + id (uses global credentials).""" jira = get_jira_client() # Assign fields = { fields: dict = { # Literal argument value "project": {"key": project_key}, # Literal argument value "summary": summary, # Literal argument value "description": description, # Literal argument value "issuetype": {"name": issue_type}, } # Check: labels if labels: # Assign fields["labels"] = labels fields["labels"] = labels # Check: custom_fields if custom_fields: # Call fields.update() fields.update(custom_fields) # Assign result = jira.issue_create(fields=fields) result = jira.issue_create(fields=fields) # Return {"issue_key": result["key"], "issue_id": result["id"]} return {"issue_key": result["key"], "issue_id": result["id"]} # Define function sync_jira_to_aegis def sync_jira_to_aegis(db: Session, link: JiraLink) -> None: """Pull current status from Jira into the local link record (global creds).""" jira = get_jira_client() # Assign issue = jira.issue(link.jira_issue_key) issue = jira.issue(link.jira_issue_key) # Assign fields = issue.get("fields", {}) fields = issue.get("fields", {}) # Assign link.jira_status = fields.get("status", {}).get("name") link.jira_status = fields.get("status", {}).get("name") # Assign link.jira_priority = (fields.get("priority") or {}).get("name") link.jira_priority = (fields.get("priority") or {}).get("name") # Assign link.jira_assignee = (fields.get("assignee") or {}).get("displayName") link.jira_assignee = (fields.get("assignee") or {}).get("displayName") # Assign link.jira_story_points = str(fields.get("customfield_10016", "")) link.jira_story_points = str(fields.get("customfield_10016", "")) # Assign link.last_synced_at = datetime.utcnow() link.last_synced_at = datetime.utcnow() # Flush changes to DB without committing the transaction db.flush() # Define function sync_aegis_to_jira def sync_aegis_to_jira(db: Session, link: JiraLink, entity_data: dict) -> None: """Push an Aegis status update as a Jira comment (global creds).""" jira = get_jira_client() # Assign comment_body = _build_sync_comment(entity_data) comment_body = _build_sync_comment(entity_data) # Call jira.issue_add_comment() jira.issue_add_comment(link.jira_issue_key, comment_body) # Assign link.last_synced_at = datetime.utcnow() link.last_synced_at = datetime.utcnow() # Flush changes to DB without committing the transaction db.flush() # Define function _build_sync_comment def _build_sync_comment(data: dict) -> str: lines = ["h3. Aegis Sync Update", ""] # Iterate over data.items() for key, value in data.items(): # Call lines.append() lines.append(f"*{key}:* {value}") # Call lines.append() lines.append(f"\n_Synced at {datetime.utcnow().isoformat()}_") # Return "\n".join(lines) return "\n".join(lines) # ── Link CRUD ──────────────────────────────────────────────────────────── def create_link( # Entry: db db: Session, *, # Entry: entity_type entity_type: JiraLinkEntityType, # Entry: entity_id entity_id: UUID, # Entry: jira_issue_key jira_issue_key: str, # Entry: sync_direction sync_direction: JiraSyncDirection, # Entry: created_by created_by: UUID, ) -> JiraLink: link = JiraLink( # Keyword argument: entity_type entity_type=entity_type, # Keyword argument: entity_id entity_id=entity_id, # Keyword argument: jira_issue_key jira_issue_key=jira_issue_key, # Keyword argument: sync_direction sync_direction=sync_direction, # Keyword argument: created_by created_by=created_by, ) # Stage new record(s) for database insertion db.add(link) # Flush changes to DB without committing the transaction db.flush() # Check: settings.JIRA_ENABLED if settings.JIRA_ENABLED: # Attempt the following; catch errors below try: # Call sync_jira_to_aegis() sync_jira_to_aegis(db, link) # Handle Exception except Exception as e: # Log warning: "Initial Jira sync failed for %s: %s", jira_issue_ logger.warning("Initial Jira sync failed for %s: %s", jira_issue_key, e) # Return link return link # Define function list_links def list_links( # Entry: db db: Session, *, # Entry: entity_type entity_type: Optional[JiraLinkEntityType] = None, # Entry: entity_id entity_id: Optional[UUID] = None, entity_ids: Optional[list[UUID]] = None, ) -> list[JiraLink]: query = db.query(JiraLink) # Check: entity_type if entity_type: # Assign query = query.filter(JiraLink.entity_type == entity_type) query = query.filter(JiraLink.entity_type == entity_type) # Check: entity_id if entity_id: # Assign query = query.filter(JiraLink.entity_id == entity_id) query = query.filter(JiraLink.entity_id == entity_id) elif entity_ids: query = query.filter(JiraLink.entity_id.in_(entity_ids)) return query.order_by(JiraLink.created_at.desc()).all() # Define function get_link_or_raise def get_link_or_raise(db: Session, link_id: UUID) -> JiraLink: link = db.query(JiraLink).filter(JiraLink.id == link_id).first() # Check: not link if not link: # Raise EntityNotFoundError raise EntityNotFoundError("JiraLink", str(link_id)) # Return link return link # Define function delete_link def delete_link(db: Session, link_id: UUID) -> JiraLink: link = get_link_or_raise(db, link_id) # Mark record for deletion on next commit db.delete(link) # Return link return link def build_issue_data( db: Session, entity_type: JiraLinkEntityType, entity_id: UUID ) -> tuple[str, str]: """Build Jira issue summary and description from an Aegis entity.""" # Check: entity_type == JiraLinkEntityType.test if entity_type == JiraLinkEntityType.test: # Assign entity = db.query(Test).filter(Test.id == entity_id).first() entity = db.query(Test).filter(Test.id == entity_id).first() # Check: not entity if not entity: # Raise EntityNotFoundError raise EntityNotFoundError("Test", str(entity_id)) technique = db.query(Technique).filter(Technique.id == entity.technique_id).first() return ( f"[Aegis] {technique.mitre_id if technique else 'N/A'} — {entity.name}", _build_test_description(entity, technique), ) # Alternative: entity_type == JiraLinkEntityType.campaign elif entity_type == JiraLinkEntityType.campaign: # Assign entity = db.query(Campaign).filter(Campaign.id == entity_id).first() entity = db.query(Campaign).filter(Campaign.id == entity_id).first() # Check: not entity if not entity: # Raise EntityNotFoundError raise EntityNotFoundError("Campaign", str(entity_id)) # Return ( return ( f"[Aegis Campaign] {entity.name}", f"Campaign: {entity.name}\nType: {entity.type}\nStatus: {entity.status}\n" f"Description: {entity.description or 'N/A'}", ) # Alternative: entity_type == JiraLinkEntityType.technique elif entity_type == JiraLinkEntityType.technique: # Assign entity = db.query(Technique).filter(Technique.id == entity_id).first() entity = db.query(Technique).filter(Technique.id == entity_id).first() # Check: not entity if not entity: # Raise EntityNotFoundError raise EntityNotFoundError("Technique", str(entity_id)) # Return ( return ( f"[Aegis Technique] {entity.mitre_id} - {entity.name}", f"MITRE ID: {entity.mitre_id}\nName: {entity.name}\n" f"Tactic: {entity.tactic or 'N/A'}\n" f"Description: {entity.description or 'N/A'}", ) # Fallback: handle remaining cases else: # Return f"[Aegis] Entity {entity_id}", f"Entity type: {entity_type.value}" return f"[Aegis] Entity {entity_id}", f"Entity type: {entity_type.value}" # Define function create_issue_and_link def create_issue_and_link( # Entry: db db: Session, *, # Entry: entity_type entity_type: JiraLinkEntityType, # Entry: entity_id entity_id: UUID, # Entry: created_by created_by: UUID, ) -> dict: """Create a Jira issue from an Aegis entity and link them (global creds).""" summary, description = build_issue_data(db, entity_type, entity_id) project_key = settings.JIRA_DEFAULT_PROJECT result = create_jira_issue( project_key=project_key, summary=summary, # Keyword argument: description description=description, # Keyword argument: labels labels=["aegis", entity_type.value], ) # Assign link = JiraLink( link = JiraLink( # Keyword argument: entity_type entity_type=entity_type, # Keyword argument: entity_id entity_id=entity_id, # Keyword argument: jira_issue_key jira_issue_key=result["issue_key"], # Keyword argument: jira_issue_id jira_issue_id=result["issue_id"], jira_project_key=project_key, created_by=created_by, ) # Stage new record(s) for database insertion db.add(link) # Return {"issue_key": result["issue_key"], "link_id": str(link.id)} return {"issue_key": result["issue_key"], "link_id": str(link.id)}