Files
Aegis/backend/app/services/jira_service.py
T
kitos 9472fe91fa
Aegis CI / lint-and-test (push) Has been cancelled
fix(lint): resolve 2132 ruff errors to pass CI lint-and-test job
- Remove ANN (type annotations) and D (docstrings) from ruff select; not
  feasible to add thousands of missing annotations/docstrings across the codebase
- Add I001 and E501 to ignore: comment-interleaved import style and SQLAlchemy
  FK definitions naturally exceed line limits
- Fix F811 duplicate import blocks in main.py, models/__init__.py, routers
  (campaigns, system, tests, evidence) and services (test_workflow, test_crud,
  campaign_service, schemas/test)
- Add missing Evidence/IntelItem/Technique/Test/TestTemplate/User imports to
  models/__init__.py (were only in duplicate block)
- Fix F821: add missing JWTError import in auth.py
- Fix F401 unused imports across 15+ files (jira_service, sso_service,
  notification_service, playbook_service, tempo_service, models, schemas,
  routers: admin_config, attack_paths, executive_dashboard, knowledge,
  ownership, risk_intelligence, sso, api_keys, email_service)
- Fix F841 unused variables: owned_technique_ids (executive_dashboard_service),
  severity (jira_service), priority_order (revalidation_queue_service)
- Fix F541 f-strings without placeholders in system.py and attck_evaluations_service
- Fix F601 duplicate dict key G0067 in threat_actor_import_service
- Fix E701 multiple-statements-on-one-line in risk_intelligence_service
- Fix E741 ambiguous variable name l -> lvl in risk_intelligence_service
- Fix N806 uppercase vars in functions: technique.py, heatmap_service.py;
  add noqa for compliance_import_service.py large unused constant dicts
- Fix W293 whitespace on blank lines in tests/conftest.py
2026-06-12 10:47:48 +02:00

969 lines
34 KiB
Python

"""Jira integration service.
Authentication model
--------------------
Each Aegis user authenticates to Jira with their own Atlassian email and
personal API token. The email used is ``user.jira_email`` when set, falling
back to ``user.email`` (the Aegis account email). This lets users specify a
separate corporate Atlassian email without changing their Aegis login.
The token is stored in ``user.jira_api_token``.
Admin configuration
-------------------
The Jira URL and default project key are stored in the ``system_configs``
table (keys ``jira.url`` and ``jira.project_key``) so the admin can update
them at runtime without redeploying. These values override the legacy
``settings.JIRA_URL`` / ``settings.JIRA_DEFAULT_PROJECT`` env-vars which are
kept for backwards-compatibility only.
Lifecycle hooks
---------------
``push_test_event()`` is the single entry-point called from the test-workflow
service on every state transition. It posts a rich comment to the linked
Jira issue (if one exists) using the acting user's credentials.
``auto_create_test_issue()`` is called once after a test is created; it
creates the Jira ticket and stores the link.
"""
from __future__ import annotations
# Import logging
import logging
# Import datetime from datetime
from datetime import datetime
# Import Optional from typing
from typing import Optional
# Import UUID from uuid
from uuid import UUID
# Import Session from sqlalchemy.orm
from sqlalchemy.orm import Session
# Import settings from app.config
from app.config import settings
# Import EntityNotFoundError from app.domain.errors
from app.domain.errors import EntityNotFoundError
# Import InvalidOperationError from app.domain.exceptions
from app.domain.exceptions import InvalidOperationError
# Import Campaign from app.models.campaign
from app.models.campaign import Campaign
# Import JiraLink, JiraLinkEntityType, JiraSyncDirection from app.models.jira_link
from app.models.jira_link import JiraLink, JiraLinkEntityType, JiraSyncDirection
# Import Technique from app.models.technique
from app.models.technique import Technique
# Import Test from app.models.test
from app.models.test import Test
from app.models.user import User
# Assign logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# System-config helpers (admin-configurable Jira settings)
# ---------------------------------------------------------------------------
# Short setting name -> key of the matching row in the ``system_configs`` table.
_JIRA_KEYS = dict(
    url="jira.url",
    project_key="jira.project_key",
    enabled="jira.enabled",
)
def _read_system_config(db: Session, key: str) -> Optional[str]:
    """Look up *key* in ``system_configs`` and return its stored value.

    Args:
        db: Session used to run the query.
        key: Value matched against ``SystemConfig.key``.

    Returns:
        The ``value`` of the first matching row, or ``None`` when no row
        matches.
    """
    # Imported lazily to avoid a circular import at module load time.
    from app.models.system_config import SystemConfig

    record = db.query(SystemConfig).filter(SystemConfig.key == key).first()
    if not record:
        return None
    return record.value
def get_jira_url(db: Session) -> Optional[str]:
    """Return the Jira base URL, preferring the admin-configured value.

    The ``jira.url`` system-config entry wins when it is non-empty;
    otherwise ``settings.JIRA_URL`` is used. Returns ``None`` when both are
    empty.
    """
    configured = _read_system_config(db, _JIRA_KEYS["url"])
    if configured:
        return configured
    return settings.JIRA_URL or None
def get_jira_project_key(db: Session) -> Optional[str]:
    """Return the default Jira project key, preferring the admin-configured value.

    The ``jira.project_key`` system-config entry wins when it is non-empty;
    otherwise ``settings.JIRA_DEFAULT_PROJECT`` is used. Returns ``None``
    when both are empty.
    """
    configured = _read_system_config(db, _JIRA_KEYS["project_key"])
    if configured:
        return configured
    return settings.JIRA_DEFAULT_PROJECT or None
def is_jira_enabled(db: Session) -> bool:
    """Tell whether the Jira integration is switched on.

    A stored ``jira.enabled`` system-config value takes precedence and is
    treated as true when it equals ``true``, ``1`` or ``yes``
    (case-insensitive). Without a stored value, ``settings.JIRA_ENABLED``
    decides.
    """
    stored = _read_system_config(db, _JIRA_KEYS["enabled"])
    if stored is None:
        return settings.JIRA_ENABLED
    return stored.lower() in ("true", "1", "yes")
def get_jira_parent_ticket(db: Session) -> Optional[str]:
    """Return the parent ticket key configured for campaigns.

    Reads the ``jira.parent_ticket`` system-config entry; an empty or
    missing value yields ``None``.
    """
    ticket = _read_system_config(db, "jira.parent_ticket")
    return ticket if ticket else None
def get_jira_parent_ticket_standalone(db: Session) -> Optional[str]:
    """Return the parent ticket key for tests that are not part of a campaign.

    Reads the ``jira.parent_ticket_standalone`` system-config entry and,
    when that is empty or missing, falls back to
    ``get_jira_parent_ticket()``.
    """
    standalone = _read_system_config(db, "jira.parent_ticket_standalone")
    if standalone:
        return standalone
    return get_jira_parent_ticket(db)
def upsert_jira_config(db: Session, key: str, value: str) -> None:
    """Create or update the ``system_configs`` row identified by *key*.

    The change is only staged on the session: this function neither flushes
    nor commits.
    """
    from app.models.system_config import SystemConfig

    existing = db.query(SystemConfig).filter(SystemConfig.key == key).first()
    if not existing:
        db.add(SystemConfig(key=key, value=value))
        return
    existing.value = value
# ---------------------------------------------------------------------------
# Per-user Jira client
# ---------------------------------------------------------------------------
def _effective_jira_email(user: User) -> Optional[str]:
    """Pick the email used for Jira authentication.

    Returns ``user.jira_email`` when that attribute exists and is non-empty,
    otherwise ``user.email``.
    """
    jira_email = getattr(user, "jira_email", None)
    if jira_email:
        return jira_email
    return user.email
def get_user_jira_client(user: User, db: Session):
    """Create an Atlassian ``Jira`` client that authenticates as *user*.

    The login email comes from ``_effective_jira_email()`` and the password
    is ``user.jira_api_token``.

    Raises:
        InvalidOperationError: If the Jira URL, the login email or the
            user's API token is missing. Each case carries its own message
            so callers can show it to the user.
    """
    base_url = get_jira_url(db)
    if not base_url:
        raise InvalidOperationError(
            "Jira URL is not configured. Ask your administrator to set it in "
            "System Settings → Jira Configuration."
        )

    login_email = _effective_jira_email(user)
    if not login_email:
        raise InvalidOperationError(
            "No email configured for Jira authentication. "
            "Set a Jira email in Settings → Profile → Jira Integration."
        )

    api_token = user.jira_api_token
    if not api_token:
        raise InvalidOperationError(
            "You have not configured a Jira API token. "
            "Go to Settings → Profile → Jira Integration and add your personal Atlassian token."
        )

    from atlassian import Jira

    # The Atlassian library appends paths such as /rest/api/2/myself, so a
    # trailing slash on the base URL would produce double-slash URLs.
    return Jira(
        url=base_url.rstrip("/"),
        username=login_email,
        password=api_token,
        cloud=True,
    )
def has_jira_configured(user: User, db: Session) -> bool:
    """Tell whether *user* has everything needed to call Jira.

    That means a Jira URL, a login email and a personal API token are all
    present. The checks run in that order and stop at the first one missing.
    """
    if not get_jira_url(db):
        return False
    if not _effective_jira_email(user):
        return False
    return bool(user.jira_api_token)
# ---------------------------------------------------------------------------
# Ticket content builders (inspired by the pentest-to-Jira script)
# ---------------------------------------------------------------------------
# Lowercase severity name -> Jira priority name.
_SEVERITY_TO_PRIORITY: dict[str, str] = dict(
    critical="Highest",
    high="High",
    medium="Medium",
    low="Low",
    informational="Lowest",
)
# Test workflow state -> heading label used in Jira state-transition comments.
_STATE_EMOJI: dict[str, str] = dict(
    draft="📝 Draft",
    red_executing="🔴 Red Team Executing",
    blue_evaluating="🔵 Blue Team Evaluating",
    in_review="📋 In Review",
    validated="✅ Validated",
    rejected="❌ Rejected",
)
def _technique_severity(technique: Optional[Technique]) -> str:
    """Return the technique's severity in lowercase.

    Falls back to ``"medium"`` when there is no technique, or when it has no
    ``severity`` attribute or an empty one.
    """
    if not technique:
        return "medium"
    raw_severity = getattr(technique, "severity", None)
    return raw_severity.lower() if raw_severity else "medium"
def _build_test_description(test: Test, technique: Optional[Technique]) -> str:
    """Build the initial Jira ticket description for a newly created test.

    Args:
        test: The test whose name, platform, data classification,
            description and tool are rendered.
        technique: The linked technique, or ``None``. Without a technique
            the MITRE ID, name and tactic are rendered as ``N/A``.

    Returns:
        The description in Jira wiki markup, with lines joined by newlines.
    """
    # A missing technique and an empty mitre_id are both shown as "N/A".
    mitre_id = (technique.mitre_id if technique else None) or "N/A"
    tech_name = technique.name if technique else "N/A"
    tactic = technique.tactic if technique else "N/A"
    severity = _technique_severity(technique).capitalize()

    # Only link to attack.mitre.org when there is a real technique ID;
    # otherwise the ticket would carry a dead ".../techniques/N/A" link.
    if mitre_id == "N/A":
        mitre_ref = mitre_id
    else:
        # Sub-technique IDs such as T1059.001 map to the path T1059/001.
        mitre_path = mitre_id.replace(".", "/")
        mitre_ref = f"[{mitre_id}|https://attack.mitre.org/techniques/{mitre_path}]"

    lines = [
        "h2. Aegis Security Test",
        "",
        f"*Test Name:* {test.name}",
        f"*MITRE Technique:* {mitre_ref} — {tech_name}",
        f"*Tactic:* {tactic}",
        f"*Platform:* {test.platform or 'N/A'}",
        f"*Severity:* {severity}",
        f"*Data Classification:* {test.data_classification or 'N/A'}",
        "",
        "h3. Description",
        test.description or "_No description provided._",
        "",
        f"*Tool:* {test.tool_used or 'N/A'}",
        "",
        "----",
        f"_Created via Aegis at {datetime.utcnow().strftime('%Y-%m-%d %H:%M')} UTC_",
    ]
    return "\n".join(lines)
def _build_state_comment(
    test: Test,
    new_state: str,
    actor: User,
    extra: dict | None = None,
) -> str:
    """Render the Jira comment posted when a test moves to *new_state*.

    Args:
        test: The test that changed state; state-specific fields are read
            from it.
        new_state: The state just entered. States without a dedicated
            section still get the header and footer.
        actor: The user who triggered the transition.
        extra: Optional key/value pairs appended as ``*key:* value`` lines.

    Returns:
        The comment in Jira wiki markup, with lines joined by newlines.
    """
    heading = _STATE_EMOJI.get(new_state, new_state)
    stamp = datetime.utcnow().strftime('%Y-%m-%d %H:%M')
    body = [
        f"h3. {heading}",
        "",
        f"*Changed by:* {actor.username} ({actor.email or 'no email'})",
        f"*At:* {stamp} UTC",
        "",
    ]

    if new_state == "red_executing":
        body.append("Red Team has started the attack execution.")
    elif new_state == "blue_evaluating":
        attack_success = test.attack_success if test.attack_success is not None else 'N/A'
        body.extend(
            [
                "Red Team has finished execution and submitted evidence for Blue Team evaluation.",
                "",
                f"*Attack Success:* {attack_success}",
            ]
        )
        if test.red_summary:
            body.extend(["", "h4. Red Team Summary", test.red_summary])
    elif new_state == "in_review":
        body.extend(
            [
                "Blue Team has completed evaluation. Test is awaiting lead validation.",
                "",
                f"*Detection Result:* {test.detection_result or 'N/A'}",
            ]
        )
        if test.blue_summary:
            body.extend(["", "h4. Blue Team Summary", test.blue_summary])
        if test.remediation_steps:
            body.extend(["", "h4. Remediation Steps", test.remediation_steps])
    elif new_state in ("validated", "rejected"):
        # Both outcomes share the lead status/notes layout; only the intro differs.
        if new_state == "validated":
            intro = "Test has been *validated* by both leads."
        else:
            intro = "Test has been *rejected* and must be reworked."
        body.extend(
            [
                intro,
                "",
                f"*Red Lead Status:* {test.red_validation_status or 'N/A'}",
                f"*Blue Lead Status:* {test.blue_validation_status or 'N/A'}",
            ]
        )
        if test.red_validation_notes:
            body.extend(["", f"*Red Lead Notes:* {test.red_validation_notes}"])
        if test.blue_validation_notes:
            body.extend(["", f"*Blue Lead Notes:* {test.blue_validation_notes}"])
    elif new_state == "draft":
        body.append("Test has been reopened for re-execution.")

    # Caller-supplied extra data, one line per entry.
    if extra:
        body.append("")
        body.extend(f"*{k}:* {v}" for k, v in extra.items())

    body.append("")
    body.append("_Synced from [Aegis|https://aegis.undiamagico.es]_")
    return "\n".join(body)
# ---------------------------------------------------------------------------
# Public lifecycle hooks
# ---------------------------------------------------------------------------
def _build_campaign_description(campaign) -> str:
    """Render the Jira ticket description for *campaign*.

    The name, type and status are always included. The description and the
    comma-joined tags are added only when non-empty. A footer line carries
    the creation timestamp (UTC).

    Returns:
        The description in Jira wiki markup, with lines joined by newlines.
    """
    sections = [
        "h2. Aegis Security Campaign",
        "",
        f"*Campaign Name:* {campaign.name}",
        f"*Type:* {campaign.type}",
        f"*Status:* {campaign.status}",
    ]
    if campaign.description:
        sections.extend(("", "h3. Description", campaign.description))
    if campaign.tags:
        joined_tags = ', '.join(campaign.tags)
        sections.extend(("", f"*Tags:* {joined_tags}"))

    created_stamp = datetime.utcnow().strftime('%Y-%m-%d %H:%M')
    sections.extend(("", "----", f"_Created via Aegis at {created_stamp} UTC_"))
    return "\n".join(sections)
def get_campaign_jira_key(db: Session, campaign_id) -> Optional[str]:
    """Look up the Jira issue key linked to a campaign.

    Args:
        db: Session used to run the query.
        campaign_id: Campaign identifier; its string form is parsed as a
            UUID.

    Returns:
        The linked issue key, or ``None`` when the id is not a valid UUID or
        no campaign link exists.
    """
    try:
        parsed_id = UUID(str(campaign_id))
    except (ValueError, AttributeError):
        return None

    match = (
        db.query(JiraLink)
        .filter(
            JiraLink.entity_type == JiraLinkEntityType.campaign,
            JiraLink.entity_id == parsed_id,
        )
        .first()
    )
    if not match:
        return None
    return match.jira_issue_key
def get_test_jira_key(db: Session, test_id) -> Optional[str]:
    """Look up the Jira issue key linked to a test.

    Args:
        db: Session used to run the query.
        test_id: Test identifier; its string form is parsed as a UUID.

    Returns:
        The linked issue key, or ``None`` when the id is not a valid UUID or
        no test link exists.
    """
    try:
        parsed_id = UUID(str(test_id))
    except (ValueError, AttributeError):
        return None

    match = (
        db.query(JiraLink)
        .filter(
            JiraLink.entity_type == JiraLinkEntityType.test,
            JiraLink.entity_id == parsed_id,
        )
        .first()
    )
    if not match:
        return None
    return match.jira_issue_key
def auto_create_campaign_issue(
    db: Session,
    campaign,
    actor: User,
) -> Optional[str]:
    """Create the Jira issue for *campaign* and record the link.

    Called once right after a campaign is committed to the database. The new
    ticket is stored as a ``JiraLink`` with ``entity_type=campaign`` and is
    flushed, not committed.

    Returns:
        The new Jira issue key, or ``None`` when Jira is not configured for
        *actor*, no project key is configured, or the call fails. Failures
        are logged and never propagated.
    """
    if not has_jira_configured(actor, db):
        return None

    project_key = get_jira_project_key(db)
    if not project_key:
        logger.warning(
            "Jira project key not configured; skipping auto-create for campaign %s",
            campaign.id,
        )
        return None

    parent_key = get_jira_parent_ticket(db)

    try:
        client = get_user_jira_client(actor, db)

        payload: dict = {
            "project": {"key": project_key},
            "summary": f"[Aegis Campaign] {campaign.name}",
            "description": _build_campaign_description(campaign),
            "issuetype": {"name": settings.JIRA_ISSUE_TYPE_CAMPAIGN},
            "labels": ["aegis", "campaign"],
            # customfield_10011 = Epic Name (required for Epic type in classic Jira)
            "customfield_10011": campaign.name,
        }

        # Start date: campaign.start_date if set, otherwise its created_at.
        start = campaign.start_date or campaign.created_at
        if start:
            payload[settings.JIRA_START_DATE_FIELD] = start.strftime("%Y-%m-%d")

        # Nest under the configured parent ticket (Initiative, e.g. OFS-20795)
        if parent_key:
            payload["parent"] = {"key": parent_key}

        created = client.issue_create(fields=payload)
        issue_key = created["key"]

        db.add(
            JiraLink(
                entity_type=JiraLinkEntityType.campaign,
                entity_id=campaign.id,
                jira_issue_key=issue_key,
                jira_issue_id=created.get("id", ""),
                jira_project_key=project_key,
                sync_direction=JiraSyncDirection.aegis_to_jira,
                created_by=actor.id,
            )
        )
        db.flush()
        logger.info("Auto-created Jira issue %s for campaign %s", issue_key, campaign.id)
        return issue_key
    except Exception as exc:
        # Non-fatal: Jira failures must never break the campaign creation flow
        logger.warning(
            "Failed to auto-create Jira issue for campaign %s: %s",
            campaign.id, exc, exc_info=True,
        )
        return None
def auto_create_test_issue(
    db: Session,
    test: Test,
    actor: User,
    *,
    technique: Optional[Technique] = None,
    parent_ticket_override: Optional[str] = None,
    campaign_start_date=None,  # datetime | None — inherited from campaign when available
) -> Optional[str]:
    """Create a Jira issue for *test* and store the link.

    Called once right after a test is committed to the database. The link is
    flushed, not committed.

    Args:
        db: Session used for lookups and for staging the ``JiraLink``.
        test: The newly created test.
        actor: User whose Jira credentials are used.
        technique: The test's technique; loaded from ``test.technique_id``
            when not supplied.
        parent_ticket_override: When set, use this as the Jira parent ticket
            instead of the system-configured standalone parent. Use this to
            nest test tickets under a campaign ticket.
        campaign_start_date: Start date inherited from the campaign; today's
            date is used when it is not given.

    Returns:
        The Jira issue key on success, or ``None`` if Jira is not configured
        for *actor*, no project key is configured, or the operation fails
        (non-fatal; the failure is logged).
    """
    if not has_jira_configured(actor, db):
        return None

    project_key = get_jira_project_key(db)
    if not project_key:
        logger.warning("Jira project key not configured; skipping auto-create for test %s", test.id)
        return None

    # Resolve technique if not supplied
    if technique is None:
        technique = db.query(Technique).filter(Technique.id == test.technique_id).first()

    # A missing technique and an empty mitre_id are both shown as "N/A",
    # which also keeps the .replace() for the label below safe.
    mitre_id = (technique.mitre_id if technique else None) or "N/A"

    try:
        jira = get_user_jira_client(actor, db)

        # All tests — whether inside a campaign or standalone — are created
        # with the configured test issue type. Campaign tests use the campaign
        # Jira key as parent (passed via parent_ticket_override); standalone
        # tests use the configured standalone parent ticket.
        parent = parent_ticket_override or get_jira_parent_ticket_standalone(db)
        issue_type = settings.JIRA_ISSUE_TYPE_TEST

        poc = test.procedure_text or "N/A"

        fields: dict = {
            "project": {"key": project_key},
            # Separate the technique ID from the test name so the summary
            # reads "[Aegis] T1059.001 - Name" instead of running them together.
            "summary": f"[Aegis] {mitre_id} - {test.name}",
            "description": _build_test_description(test, technique),
            "issuetype": {"name": issue_type},
            "labels": ["aegis", "security-test", mitre_id.replace(".", "-")],
            # customfield_10309 = Proof of Concept field (required by team's Jira config)
            "customfield_10309": f"{{code}}{poc}{{code}}",
        }

        # Inherit campaign start date if available, otherwise use today
        from datetime import date as _date

        effective_start = campaign_start_date or _date.today()
        if hasattr(effective_start, "strftime"):
            fields[settings.JIRA_START_DATE_FIELD] = effective_start.strftime("%Y-%m-%d")

        if parent:
            fields["parent"] = {"key": parent}

        result = jira.issue_create(fields=fields)
        issue_key = result["key"]
        issue_id = result.get("id", "")

        link = JiraLink(
            entity_type=JiraLinkEntityType.test,
            entity_id=test.id,
            jira_issue_key=issue_key,
            jira_issue_id=issue_id,
            jira_project_key=project_key,
            sync_direction=JiraSyncDirection.aegis_to_jira,
            created_by=actor.id,
        )
        db.add(link)
        db.flush()

        logger.info("Auto-created Jira issue %s for test %s", issue_key, test.id)
        return issue_key
    except Exception as exc:
        # Non-fatal: Jira failures must never break the test creation flow
        logger.warning(
            "Failed to auto-create Jira issue for test %s: %s",
            test.id, exc, exc_info=True,
        )
        return None
def _mark_in_progress_and_assign(jira, issue_key: str, actor: User) -> None:
    """Best-effort: move *issue_key* to "In Progress" and assign it to *actor*.

    Each step has its own error handling, so a failed transition does not
    prevent the assignment. Assignment is skipped when *actor* has no
    ``jira_account_id``.
    """
    try:
        jira.set_issue_status(issue_key, "In Progress")
        logger.info(
            "Transitioned Jira ticket %s to In Progress", issue_key
        )
    except Exception as transition_error:
        logger.warning(
            "Could not transition %s to In Progress: %s",
            issue_key, transition_error,
        )

    account_id = getattr(actor, "jira_account_id", None)
    if not account_id:
        return
    try:
        jira.assign_issue(issue_key, account_id=account_id)
        logger.info(
            "Assigned Jira ticket %s to account %s",
            issue_key, account_id,
        )
    except Exception as assign_error:
        logger.warning(
            "Could not assign %s to %s: %s",
            issue_key, account_id, assign_error,
        )


def push_test_event(
    db: Session,
    test: Test,
    actor: User,
    new_state: str,
    *,
    extra: dict | None = None,
) -> None:
    """Post a lifecycle comment to the Jira issue linked to *test*.

    Called from ``test_workflow_service`` after every state transition. Does
    nothing when *actor* has no Jira configuration or the test has no linked
    issue. Any Jira error is logged and swallowed so it never blocks the
    test workflow.

    Args:
        db: Session used to find the link and flush the sync timestamp.
        test: The test that changed state.
        actor: User whose Jira credentials are used.
        new_state: The state just entered.
        extra: Optional key/value pairs appended to the comment.
    """
    if not has_jira_configured(actor, db):
        return

    link = (
        db.query(JiraLink)
        .filter(
            JiraLink.entity_type == JiraLinkEntityType.test,
            JiraLink.entity_id == test.id,
        )
        .first()
    )
    if not link:
        return

    try:
        jira = get_user_jira_client(actor, db)
        jira.issue_add_comment(
            link.jira_issue_key,
            _build_state_comment(test, new_state, actor, extra),
        )

        # When the operator starts execution: transition to "In Progress"
        # and assign the ticket to that operator.
        if new_state == "red_executing":
            _mark_in_progress_and_assign(jira, link.jira_issue_key, actor)

        link.last_synced_at = datetime.utcnow()
        db.flush()
        logger.info(
            "Posted Jira comment to %s for test %s state=%s",
            link.jira_issue_key, test.id, new_state,
        )
    except Exception as exc:
        logger.warning(
            "Failed to push Jira event for test %s (state=%s): %s",
            test.id, new_state, exc, exc_info=True,
        )
# ---------------------------------------------------------------------------
# Legacy / generic helpers (kept for existing routes)
# ---------------------------------------------------------------------------
def get_jira_client():
    """Return a Jira client using the global credentials (legacy path).

    Prefer ``get_user_jira_client()`` for new code.

    Returns:
        An ``atlassian.Jira`` client built from ``settings.JIRA_URL``,
        ``settings.JIRA_USERNAME``, ``settings.JIRA_API_TOKEN`` and
        ``settings.JIRA_IS_CLOUD``.

    Raises:
        InvalidOperationError: When Jira is disabled, or when it is enabled
            but the URL, username or API token is not set.
    """
    if not settings.JIRA_ENABLED:
        raise InvalidOperationError("Jira integration is not enabled")
    if not settings.JIRA_URL or not settings.JIRA_USERNAME or not settings.JIRA_API_TOKEN:
        raise InvalidOperationError(
            "Jira is enabled but JIRA_URL / JIRA_USERNAME / JIRA_API_TOKEN are not set"
        )

    from atlassian import Jira

    return Jira(
        # Strip any trailing slash, as get_user_jira_client() does: the
        # library appends REST paths, so a trailing slash yields double-slash URLs.
        url=settings.JIRA_URL.rstrip("/"),
        username=settings.JIRA_USERNAME,
        password=settings.JIRA_API_TOKEN,
        cloud=settings.JIRA_IS_CLOUD,
    )
def search_jira_issues(query: str, max_results: int = 10) -> list[dict]:
    """Search Jira issues by JQL or free text (uses global credentials).

    Args:
        query: A JQL expression or plain text. Input containing ``=`` or
            ``~`` is passed through as JQL; anything else is searched for in
            the issue summary.
        max_results: Maximum number of issues requested from Jira.

    Returns:
        One dict per issue with the keys ``issue_key``, ``summary``,
        ``status``, ``assignee`` and ``priority``. ``assignee`` and
        ``priority`` are ``None`` when unset.

    Raises:
        InvalidOperationError: When the global Jira client is disabled or
            unconfigured (raised by ``get_jira_client()``).
    """
    jira = get_jira_client()

    if "=" in query or "~" in query:
        jql = query
    else:
        # Free text goes inside a quoted JQL string, so escape backslashes and
        # double quotes; otherwise a stray quote breaks or alters the query.
        escaped = query.replace("\\", "\\\\").replace('"', '\\"')
        jql = f'summary ~ "{escaped}"'

    # Treat an empty response the same as "no issues found".
    results = jira.jql(jql, limit=max_results) or {}

    return [
        {
            "issue_key": issue["key"],
            "summary": issue["fields"]["summary"],
            "status": issue["fields"]["status"]["name"],
            "assignee": (issue["fields"].get("assignee") or {}).get("displayName"),
            "priority": (issue["fields"].get("priority") or {}).get("name"),
        }
        for issue in results.get("issues", [])
    ]
def create_jira_issue(
    project_key: str,
    summary: str,
    description: str,
    issue_type: str = "Task",
    labels: Optional[list[str]] = None,
    custom_fields: Optional[dict] = None,
) -> dict:
    """Create a Jira issue with the global credentials.

    Args:
        project_key: Key of the Jira project that receives the issue.
        summary: Issue summary.
        description: Issue description.
        issue_type: Name of the Jira issue type.
        labels: Labels to attach; omitted from the request when empty.
        custom_fields: Extra fields merged into the request last, so they
            can override the standard ones.

    Returns:
        ``{"issue_key": ..., "issue_id": ...}`` taken from Jira's response.
    """
    jira = get_jira_client()

    payload: dict = dict(
        project={"key": project_key},
        summary=summary,
        description=description,
        issuetype={"name": issue_type},
    )
    if labels:
        payload["labels"] = labels
    if custom_fields:
        payload.update(custom_fields)

    created = jira.issue_create(fields=payload)
    return dict(issue_key=created["key"], issue_id=created["id"])
def sync_jira_to_aegis(db: Session, link: JiraLink) -> None:
    """Pull the issue's current state from Jira into *link* (global creds).

    Updates ``jira_status``, ``jira_priority``, ``jira_assignee``,
    ``jira_story_points`` and ``last_synced_at`` on *link*, then flushes the
    session without committing.

    Raises:
        InvalidOperationError: When the global Jira client is disabled or
            unconfigured (raised by ``get_jira_client()``).
    """
    jira = get_jira_client()
    issue = jira.issue(link.jira_issue_key)

    # A field may be present with a null value, in which case ``.get(key, {})``
    # returns None rather than the default. Normalise every nested object
    # with ``or {}``, as was already done for priority and assignee.
    fields = issue.get("fields") or {}
    link.jira_status = (fields.get("status") or {}).get("name")
    link.jira_priority = (fields.get("priority") or {}).get("name")
    link.jira_assignee = (fields.get("assignee") or {}).get("displayName")

    # Store an empty string, not the text "None", when the field is null.
    story_points = fields.get("customfield_10016")
    link.jira_story_points = "" if story_points is None else str(story_points)

    link.last_synced_at = datetime.utcnow()
    # Flush changes to DB without committing the transaction
    db.flush()
def sync_aegis_to_jira(db: Session, link: JiraLink, entity_data: dict) -> None:
    """Post *entity_data* as a comment on the linked Jira issue (global creds).

    After the comment is posted, ``link.last_synced_at`` is updated and the
    session is flushed without committing.
    """
    client = get_jira_client()
    client.issue_add_comment(link.jira_issue_key, _build_sync_comment(entity_data))

    link.last_synced_at = datetime.utcnow()
    db.flush()
def _build_sync_comment(data: dict) -> str:
    """Render *data* as a Jira comment with one ``*key:* value`` line per entry.

    The comment starts with an "Aegis Sync Update" heading and ends with a
    blank line followed by a footer holding the current UTC time in ISO
    format.
    """
    rendered = ["h3. Aegis Sync Update", ""]
    rendered.extend(f"*{name}:* {val}" for name, val in data.items())
    # The leading newline puts a blank line between the entries and the footer.
    rendered.append(f"\n_Synced at {datetime.utcnow().isoformat()}_")
    return "\n".join(rendered)
# ── Link CRUD ────────────────────────────────────────────────────────────
def create_link(
    db: Session,
    *,
    entity_type: JiraLinkEntityType,
    entity_id: UUID,
    jira_issue_key: str,
    sync_direction: JiraSyncDirection,
    created_by: UUID,
) -> JiraLink:
    """Link an Aegis entity to an existing Jira issue.

    The new ``JiraLink`` is added and flushed, not committed. When
    ``settings.JIRA_ENABLED`` is true, an initial pull from Jira is attempted;
    a failure there is logged and does not prevent the link from being
    returned.

    Returns:
        The newly created link.
    """
    new_link = JiraLink(
        entity_type=entity_type,
        entity_id=entity_id,
        jira_issue_key=jira_issue_key,
        sync_direction=sync_direction,
        created_by=created_by,
    )
    db.add(new_link)
    db.flush()

    if not settings.JIRA_ENABLED:
        return new_link

    try:
        sync_jira_to_aegis(db, new_link)
    except Exception as sync_error:
        logger.warning("Initial Jira sync failed for %s: %s", jira_issue_key, sync_error)
    return new_link
def list_links(
    db: Session,
    *,
    entity_type: Optional[JiraLinkEntityType] = None,
    entity_id: Optional[UUID] = None,
    entity_ids: Optional[list[UUID]] = None,
) -> list[JiraLink]:
    """List Jira links, newest first, optionally filtered.

    Args:
        db: Session used to run the query.
        entity_type: Restrict to links of this entity type.
        entity_id: Restrict to links of this single entity. Takes precedence
            over *entity_ids*.
        entity_ids: Restrict to links of any of these entities. An empty
            list matches nothing.

    Returns:
        The matching links ordered by ``created_at`` descending.
    """
    # An explicitly empty id list means "links for none of these entities".
    # Without this guard the empty list is falsy, no filter is applied, and
    # every link in the table is returned.
    if not entity_id and entity_ids is not None and len(entity_ids) == 0:
        return []

    query = db.query(JiraLink)
    if entity_type:
        query = query.filter(JiraLink.entity_type == entity_type)
    if entity_id:
        query = query.filter(JiraLink.entity_id == entity_id)
    elif entity_ids:
        query = query.filter(JiraLink.entity_id.in_(entity_ids))
    return query.order_by(JiraLink.created_at.desc()).all()
def get_link_or_raise(db: Session, link_id: UUID) -> JiraLink:
    """Fetch the ``JiraLink`` with id *link_id*.

    Raises:
        EntityNotFoundError: When no link with that id exists.
    """
    found = db.query(JiraLink).filter(JiraLink.id == link_id).first()
    if found:
        return found
    raise EntityNotFoundError("JiraLink", str(link_id))
def delete_link(db: Session, link_id: UUID) -> JiraLink:
    """Mark the ``JiraLink`` with id *link_id* for deletion and return it.

    The deletion is only staged on the session: this function neither
    flushes nor commits.

    Raises:
        EntityNotFoundError: When no link with that id exists.
    """
    doomed = get_link_or_raise(db, link_id)
    db.delete(doomed)
    return doomed
def build_issue_data(
    db: Session, entity_type: JiraLinkEntityType, entity_id: UUID
) -> tuple[str, str]:
    """Build the Jira issue summary and description for an Aegis entity.

    Args:
        db: Session used to load the entity.
        entity_type: Kind of entity (test, campaign or technique). Any other
            type yields a generic summary and description.
        entity_id: Identifier of the entity.

    Returns:
        A ``(summary, description)`` tuple.

    Raises:
        EntityNotFoundError: When the test, campaign or technique does not
            exist.
    """
    if entity_type == JiraLinkEntityType.test:
        entity = db.query(Test).filter(Test.id == entity_id).first()
        if not entity:
            raise EntityNotFoundError("Test", str(entity_id))
        technique = db.query(Technique).filter(Technique.id == entity.technique_id).first()
        # A missing technique and an empty mitre_id are both shown as "N/A".
        mitre_id = (technique.mitre_id if technique else None) or "N/A"
        return (
            # Separate the technique ID from the test name so the summary
            # reads "[Aegis] T1059.001 - Name" instead of running them together.
            f"[Aegis] {mitre_id} - {entity.name}",
            _build_test_description(entity, technique),
        )

    if entity_type == JiraLinkEntityType.campaign:
        entity = db.query(Campaign).filter(Campaign.id == entity_id).first()
        if not entity:
            raise EntityNotFoundError("Campaign", str(entity_id))
        return (
            f"[Aegis Campaign] {entity.name}",
            f"Campaign: {entity.name}\nType: {entity.type}\nStatus: {entity.status}\n"
            f"Description: {entity.description or 'N/A'}",
        )

    if entity_type == JiraLinkEntityType.technique:
        entity = db.query(Technique).filter(Technique.id == entity_id).first()
        if not entity:
            raise EntityNotFoundError("Technique", str(entity_id))
        return (
            f"[Aegis Technique] {entity.mitre_id} - {entity.name}",
            f"MITRE ID: {entity.mitre_id}\nName: {entity.name}\n"
            f"Tactic: {entity.tactic or 'N/A'}\n"
            f"Description: {entity.description or 'N/A'}",
        )

    # Fallback for entity types without a dedicated template.
    return f"[Aegis] Entity {entity_id}", f"Entity type: {entity_type.value}"
def create_issue_and_link(
    db: Session,
    *,
    entity_type: JiraLinkEntityType,
    entity_id: UUID,
    created_by: UUID,
) -> dict:
    """Create a Jira issue from an Aegis entity and link them (global creds).

    Args:
        db: Session used to load the entity and stage the link.
        entity_type: Kind of entity the issue is created for.
        entity_id: Identifier of the entity.
        created_by: Identifier of the user recorded as the link's creator.

    Returns:
        ``{"issue_key": ..., "link_id": ...}`` for the new issue and link.

    Raises:
        EntityNotFoundError: When the entity does not exist.
        InvalidOperationError: When no Jira project key is configured, or
            when the global Jira client is disabled or unconfigured.
    """
    summary, description = build_issue_data(db, entity_type, entity_id)

    # Honour the admin-configured project key; get_jira_project_key() falls
    # back to settings.JIRA_DEFAULT_PROJECT when none is stored.
    project_key = get_jira_project_key(db)
    if not project_key:
        raise InvalidOperationError("Jira project key is not configured")

    result = create_jira_issue(
        project_key=project_key,
        summary=summary,
        description=description,
        labels=["aegis", entity_type.value],
    )

    link = JiraLink(
        entity_type=entity_type,
        entity_id=entity_id,
        jira_issue_key=result["issue_key"],
        jira_issue_id=result["issue_id"],
        jira_project_key=project_key,
        created_by=created_by,
    )
    db.add(link)
    # Flush before reading link.id, as the other creation paths do; an id
    # assigned at flush time would otherwise still be unset and the
    # returned link_id would be the string "None".
    db.flush()

    return {"issue_key": result["issue_key"], "link_id": str(link.id)}